# Stock Price Prediction

In [1]:
%load_ext autoreload
%autoreload 2
import numpy as np
import copy 
import random
from tqdm import tqdm
from scipy.stats import bernoulli
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from Q_learning import *
import yfinance as yf
import itertools

# Data Treatment

Load Data from Yahoo Finance

In [2]:
AAPL_data = yf.download("AAPL", start="2010-01-02", end="2021-01-01")
# MSFT_data = yf.download("MSFT", start="2010-01-02", end="2021-01-01")
# GOOGL_data = yf.download("GOOGL", start="2010-01-02", end="2021-01-01")
# EBAY_data = yf.download("EBAY", start="2010-01-02", end="2021-01-01")
# AMZN_data = yf.download("AMZN", start="2010-01-02", end="2021-01-01")

AAPL_returns = np.sign(np.diff(AAPL_data["Close"]))

[*********************100%***********************]  1 of 1 completed


We map the returns to [-2,-1,1,2]

In [3]:
small_return = 0.01
ind_0_pos = (np.abs(np.diff(AAPL_data["Close"])/(AAPL_data["Close"])[:-1])<small_return)&((np.diff(AAPL_data["Close"])/(AAPL_data["Close"])[:-1])>=0)
ind_pos = ((np.diff(AAPL_data["Close"])/(AAPL_data["Close"])[:-1])>=small_return)
ind_0_neg = (np.abs(np.diff(AAPL_data["Close"])/(AAPL_data["Close"])[:-1])<small_return)&((np.diff(AAPL_data["Close"])/(AAPL_data["Close"])[:-1])<0)
ind_neg = ((np.diff(AAPL_data["Close"])/(AAPL_data["Close"])[:-1])<= - small_return)
AAPL_returns = np.zeros(len(AAPL_data)-1)
AAPL_returns[ind_0_pos] = 1
AAPL_returns[ind_pos] = 2
AAPL_returns[ind_0_neg] = -1
AAPL_returns[ind_neg] = -2

We define the training and the  testing period

In [39]:
training_end = 2200
AAPL_returns_train = AAPL_returns[:2200]

#Testing Period 1
test_period_1_start = 2201
test_period_1_end = 2301




AAPL_returns_test1 = AAPL_returns[test_period_1_start:test_period_1_end]


# Setting 

Fix the setting

In [18]:
h = 5 # Length of history
returns = AAPL_returns_train

# State Space
T = [-2,-1,1,2]
X = np.array(list(itertools.product(T, repeat=h)))
# Action Space
A = np.array(T)

def c(x,y):
    return 100*np.linalg.norm(x[:-1]-y[:-1])+np.abs(x[-1]-y[-1])

def r(x,a,y):
    return int(y[-1]==a)
epsilon = 0.1 # Radius of the Wasserstein Ball
alpha = 0.45 # Discount Factor

To speed up the computations we compute the Values p_u, p_d, p_u0, p_d0 in dependence of each state once!

In [19]:
p_list = []
for x in X:
    eps = 1e-8
    x_u = np.concatenate([x[1:],[2]])
    x_d = np.concatenate([x[1:],[-2]])
    x_u0 = np.concatenate([x[1:],[1]])
    x_d0 = np.concatenate([x[1:],[-1]])    
    p_u_raw = np.sum([np.all(returns[i:(h+i)]==x_u) for i in range(len(returns)-h-1)])
    p_d_raw = np.sum([np.all(returns[i:(h+i)]==x_d) for i in range(len(returns)-h-1)])
    p_u0_raw = np.sum([np.all(returns[i:(h+i)]==x_u0) for i in range(len(returns)-h-1)])
    p_d0_raw = np.sum([np.all(returns[i:(h+i)]==x_d0) for i in range(len(returns)-h-1)])
    p_u = (eps/4+p_u_raw)/(p_u_raw+p_d_raw+p_u0_raw+p_d0_raw+eps)
    p_d = (eps/4+p_d_raw)/(p_u_raw+p_d_raw+p_u0_raw+p_d0_raw+eps)
    p_u0= (eps/4+p_u0_raw)/(p_u_raw+p_d_raw+p_u0_raw+p_d0_raw+eps)
    p_d0= (eps/4+p_d0_raw)/(p_u_raw+p_d_raw+p_u0_raw+p_d0_raw+eps)
    p_list.append([p_u,p_d,p_u0,p_d0])

Define density and a "prediction" of the next state

In [20]:
def p_0(k,x,a):
    ind = np.flatnonzero((x_0==X).all(1))[0]
    p_u,p_d,p_u0,p_d0 = p_list[ind]
    return p_d*np.all(k==np.concatenate([x[1:],[-2]]))+p_d0*np.all(k==np.concatenate([x[1:],[-1]]))+ p_u0*np.all(k==np.concatenate([x[1:],[1]]))+ p_u*np.all(k==np.concatenate([x[1:],[2]]))
def P_0(x,a):
    ind = np.flatnonzero((x_0==X).all(1))[0]
    p_u,p_d,p_u0,p_d0 = p_list[ind]
    rand_unif=(np.random.random_sample(size=1))
    rand = -2*int(rand_unif<p_d)-1*int((rand_unif>(p_d))*(rand_unif<(p_d+p_d0)))+1*int((rand_unif>(p_d+p_d0))*(rand_unif<(p_d+p_d0+p_u0)))+2*int(rand_unif>(p_d+p_d0+p_u0))
    return np.concatenate([x[1:],[rand]])    

Determine the initial value (randomly chosen)

In [22]:
rng = np.random.default_rng()
x_0 = rng.choice(np.array([returns[i:(h+i)]  for i in range(len(returns)-h-1)]),axis = 0)

# Training

Train the policies by Q learning

In [23]:
Q_opt_nonrobust = q_learning(X,
               A,
               r,
               P_0, # Simulation of next state in dependence of x and a
               alpha,
               x_0, 
               eps_greedy = 0.1,
               Nr_iter = 50000,
               gamma_t_tilde = lambda t: 1/(t+1),
               Q_0 = 2*np.ones([len(X),len(A)]))

100%|██████████| 50000/50000 [00:09<00:00, 5318.76it/s]


In [24]:
Q_opt_robust = robust_q_learning(X,
               A,
               r,
               c,
               P_0, # Simulation of next state in dependence of x and a
               p_0, # The probability mass function
               epsilon,
               alpha,
               x_0, 
               eps_greedy = 0.1,
               Nr_iter = 50000,
               q =1,
               gamma_t_tilde =  lambda t: 1/(t+1),
               time_series = True,
               T=T,
               Q_0 = 2*np.ones([len(X),len(A)]))

100%|██████████| 50000/50000 [10:27:33<00:00,  1.33it/s]   


 Derive the optimal strategies from the optimal Q value functions

In [25]:
if np.ndim(A)>1:
    A_list = A
else:
    A_list = np.array([[a] for a in A])
if np.ndim(X)>1:
    X_list = X
else:
    X_list = np.array([[x] for x in X])
def a_index(a):
    return np.flatnonzero((a==A_list).all(1))[0]
def x_index(x):
    return np.flatnonzero((x==X_list).all(1))[0]
def a_opt_robust(x):
    return A[np.argmax(Q_opt_robust[x_index(x),:])]
def a_opt_nonrobust(x):
    return A[np.argmax(Q_opt_nonrobust[x_index(x),:])]

Compare with trivial strategy (constant with highest success ratio)

In [26]:
def a_trivial(x):
    m = np.max([np.sum(AAPL_returns ==-2 ),np.sum(AAPL_returns ==-1 ),np.sum(AAPL_returns == 1),np.sum(AAPL_returns == 2)])
    return int(np.sum(AAPL_returns == 1)==m)-int(np.sum(AAPL_returns == -1)==m)+2*int(np.sum(AAPL_returns == 2)==m)-2*int(np.sum(AAPL_returns == -2)==m)

# Evaluation

### Training Period

Test on training period (to see whether strategy was properly trained)

In [27]:
test_returns0 = AAPL_returns_train
X_t = np.array([test_returns0[i:(h+i)]  for i in range(len(test_returns0)-h-1)])
print("Test on Training Period:\n\nDays:             {}\nNegative Returns:   {}\nSmall Neg. Returns: {}\nSmall Pos. Returns: {}\nPositive Returns:   {}".format(len(test_returns0),
                                                                            np.sum(test_returns0==-2),
                                                                            np.sum(test_returns0==-1),                                             
                                                                            np.sum(test_returns0==1),
                                                                            np.sum(test_returns0==2)))
non_robust_rewards = np.array([r(X_t[i],a_opt_nonrobust(X_t[i]),X_t[i+1]) for i in range(len(X_t)-1)])
robust_rewards = np.array([r(X_t[i],a_opt_robust(X_t[i]),X_t[i+1]) for i in range(len(X_t)-1)])
trivial_rewards = np.array([r(X_t[i],a_trivial(X_t[i]),X_t[i+1]) for i in range(len(X_t)-1)])
print("Non-Robust:       {0:0.4f}% correct \nRobust:           {1:0.4f}% correct\nTrivial:          {2:0.4f}% correct  ".format(100*np.sum(non_robust_rewards>0)/len(X_t),
                                                                          100*np.sum(robust_rewards>0)/len(X_t),
                                                                                     100*np.sum(trivial_rewards>0)/len(X_t)))

Test on Training Period:

Days:             2200
Negative Returns:   404
Small Neg. Returns: 637
Small Pos. Returns: 627
Positive Returns:   532
Non-Robust:       28.7147% correct 
Robust:           22.3336% correct
Trivial:          28.9426% correct  


### Test Period

In [41]:
test_returns1 = AAPL_returns_test1
X_t = np.array([test_returns1[i:(h+i)]  for i in range(len(test_returns1)-h-1)])
print("Test Period 1:\n\nDays:             {}\nNegative Returns:   {}\nSmall Neg. Returns: {}\nSmall Pos. Returns: {}\nPositive Returns:   {}".format(len(test_returns1),
                                                                            np.sum(test_returns1==-2),
                                                                            np.sum(test_returns1==-1),                                             
                                                                            np.sum(test_returns1==1),
                                                                            np.sum(test_returns1==2)))
non_robust_rewards = np.array([r(X_t[i],a_opt_nonrobust(X_t[i]),X_t[i+1]) for i in range(len(X_t)-1)])
robust_rewards = np.array([r(X_t[i],a_opt_robust(X_t[i]),X_t[i+1]) for i in range(len(X_t)-1)])
trivial_rewards = np.array([r(X_t[i],a_trivial(X_t[i]),X_t[i+1]) for i in range(len(X_t)-1)])
print("Non-Robust:       {0:0.4f}% correct \nRobust:           {1:0.4f}% correct\nTrivial:          {2:0.4f}% correct  ".format(100*np.sum(non_robust_rewards>0)/len(X_t),
                                                                          100*np.sum(robust_rewards>0)/len(X_t),
                                                                                     100*np.sum(trivial_rewards>0)/len(X_t)))

Test Period 1:

Days:             100
Negative Returns:   29
Small Neg. Returns: 21
Small Pos. Returns: 22
Positive Returns:   28
Non-Robust:       23.4043% correct 
Robust:           28.7234% correct
Trivial:          21.2766% correct  
