## Imports:

In [None]:
import numpy as np
import yfinance as yf # probably easiest for financial data
import pandas as pd
import math

## Defining the Discrete MDP class:

In [None]:
class MDP:
  """Tabular MDP over discretised states, solved by synchronous value iteration.

  The value table is indexed as ``values[i][j][k][l][inventory][a]``: four
  bucket indices, an inventory level, and a position in ``action``. All loop
  bounds are taken from the table's own shape.

  Class attributes:
    max_iter: upper bound on the number of sweeps.
    convergence_threshold: sweeping stops once the largest per-entry change
      drops below this value.
    gamma: discount factor passed to the reward function.
  """

  max_iter = 20
  convergence_threshold = 0.1
  gamma = 0.9913 #realistic day to day discount based on risk free rate

  def __init__(self, action, tp, reward_function, pricedict, values):
    """Store the model ingredients.

    Args:
      action: sequence of action names; position = index on the last table axis.
      tp: transition table, passed through untouched to ``reward_function``.
      reward_function: callable ``(state, action_name, pricedict, tp, gamma,
        values)`` returning the new value of a state/action pair.
      pricedict: passed through untouched to ``reward_function``.
      values: initial value table.
    """
    self.action = action
    self.tp = tp
    self.reward = reward_function
    self.pricedict = pricedict

    self.values = values

  def value_iteration(self):
    """Sweep the whole table until convergence or ``max_iter`` sweeps.

    Each sweep writes into a fresh table, so every update in a sweep reads the
    previous sweep's values. "buy" at the highest inventory level and "sell"
    at inventory 0 are skipped; their entries stay 0 in the new table.
    Prints ``sweep_number delta`` after every sweep.
    """
    sweep = 0
    delta = 1
    while sweep != self.max_iter and delta >= self.convergence_threshold:
      sweep += 1
      delta = 0
      current = np.asarray(self.values)
      new_vals = np.zeros(current.shape)
      top_inventory = current.shape[-2] - 1

      # np.ndindex walks every (i, j, k, l, inventory) combination.
      for state in np.ndindex(*current.shape[:-1]):
        inventory = state[-1]
        for a, name in enumerate(self.action):
          if (name == "buy" and inventory == top_inventory) or (name == "sell" and inventory == 0):
            continue

          reward = self.reward(state, name, self.pricedict, self.tp, self.gamma, self.values)
          delta = max(delta, abs(current[state + (a,)] - reward))
          new_vals[state + (a,)] = reward

      print(sweep, delta)
      self.values = new_vals

    return

  def best_action(self, s):
    """Return the decision for state ``s = (i, j, k, l, inventory)``.

    The highest-valued action is chosen (first one wins ties). The result is
      * "hold"  if that action is infeasible (sell at inventory 0, or buy at
        the highest inventory level),
      * the action's name if its value is >= 0,
      * "clear" otherwise.
    """
    table = np.asarray(self.values)
    action_values = table[tuple(s[:5])]
    best = int(np.argmax(action_values))
    name = self.action[best]
    inventory = s[4]
    top_inventory = table.shape[-2] - 1

    if (inventory == 0 and name == 'sell') or (inventory == top_inventory and name == 'buy'):
      return "hold"
    if action_values[best] >= 0:
      return name
    return "clear"

## Cleaning and sorting data

In [None]:
# Ticker symbol used for the data downloads and as the stem of the saved value-table file.
stock = "NEM"

In [None]:
# Fetch the ticker's full available price history and print its recorded stock splits.
tkr = yf.Ticker(stock)
hist = tkr.history(period="max")
print((tkr.splits))

Date
1987-06-11 00:00:00-04:00    2.00
1987-10-09 00:00:00-04:00    1.50
1994-04-22 00:00:00-04:00    1.25
Name: Stock Splits, dtype: float64


In [None]:
# Training frame: daily history plus two rolling features of the opening price.
# auto_adjust=False is requested explicitly because the drop below expects an
# 'Adj Close' column, which yfinance only returns for unadjusted downloads.
data = yf.download(stock, start='1999-01-01', end='2024-10-01', auto_adjust=False)
data['MA_5'] = data['Open'].rolling(window=5).mean()  # 5-day moving average
data['Variance'] = data['Open'].rolling(window=5).var()  # 5-day rolling variance
data = data.iloc[20:]  # skip the first 20 rows, which include the NaN warm-up of the 5-day windows
data = data.drop(columns=['Adj Close', 'High', 'Low', 'Close'])

[*********************100%***********************]  1 of 1 completed


## Defining functions to customize MDP

In [None]:
def transform_data(data, num_buckets):
  """Quantile-bucket every column of ``data``.

  Returns a pair ``(binned, edges_by_column)``: ``binned`` is a copy of
  ``data`` whose columns hold bucket labels ``0 .. num_buckets - 1``, and
  ``edges_by_column`` maps each column to the bin edges returned by
  ``pd.qcut`` with the first (lowest) edge removed.
  """
  labels = range(num_buckets)
  edges_by_column = {}
  binned = data.copy()

  for column in data.columns:
    codes, edges = pd.qcut(data[column], q=num_buckets, labels=labels, retbins=True)
    binned[column] = codes
    edges_by_column[column] = edges[1:]

  return binned, edges_by_column


def bucket_state(s, bucket_boundaries, c):
  """Map raw feature values to bucket indices.

  Args:
    s: sequence of raw values, positionally aligned with ``c``.
    bucket_boundaries: mapping column -> ascending upper bucket edges.
    c: sequence of column keys into ``bucket_boundaries``.

  Returns:
    A list with one bucket index per column. A value belongs to the first
    bucket whose upper edge is >= the value; values above every edge (and
    NaN) fall into the last bucket. The comparison is inclusive so that a
    value lying exactly on an edge gets the same bucket that ``pd.qcut``
    (whose bins are closed on the right) assigns it. Columns with no edges
    contribute nothing to the result.
  """
  r = []
  for value, column in zip(s, c):
    upper_edges = bucket_boundaries[column]
    last = len(upper_edges) - 1
    if last < 0:
      continue
    # side="left" -> index of the first edge that is >= value.
    position = int(np.searchsorted(upper_edges, value, side="left"))
    r.append(min(position, last))
  return r

def create_pricedict(data, close_bucket):
  """Average the first column of ``data`` within each price bucket.

  Args:
    data: DataFrame whose first column holds the prices.
    close_bucket: ascending upper bucket edges for that column.

  Returns:
    1-D array ``r`` where ``r[b]`` is the mean price of bucket ``b``. A price
    belongs to the first bucket whose upper edge is >= the price (inclusive,
    matching the right-closed bins produced by ``pd.qcut``); prices above
    every edge go to the last bucket. Buckets that receive no price are NaN.
  """
  edges = np.asarray(close_bucket, dtype=float)
  n_buckets = len(edges)
  if n_buckets == 0:
    return np.zeros(0)

  prices = data.iloc[:, 0].to_numpy(dtype=float)
  # side="left" -> index of the first edge that is >= price; clip the overflow.
  bucket = np.minimum(np.searchsorted(edges, prices, side="left"), n_buckets - 1)

  totals = np.bincount(bucket, weights=prices, minlength=n_buckets)
  counts = np.bincount(bucket, minlength=n_buckets)

  r = np.full(n_buckets, np.nan)
  np.divide(totals, counts, out=r, where=counts > 0)
  return r # r[state_price] returns average price for that bucket

def tp(transformed_data, num_buckets=5):
  """Estimate state-transition probabilities from consecutive rows.

  Args:
    transformed_data: DataFrame whose first four columns hold integer bucket
      indices in ``0 .. num_buckets - 1``; each row is one state and rows are
      in time order.
    num_buckets: number of buckets per feature (size of every table axis).

  Returns:
    Array of shape ``(num_buckets,) * 8`` where entry ``[i,j,k,l,o,p,q,r]`` is
    the observed frequency of moving from state ``(i,j,k,l)`` to ``(o,p,q,r)``.
    Each state's probabilities are normalised by its number of observed
    outgoing transitions, so the final row (which has no successor) does not
    deflate them. States that were never left keep an all-zero row.
  """
  axis_shape = (num_buckets,) * 4
  transitions = np.zeros(axis_shape + axis_shape)

  states = [tuple(int(v) for v in row[1:5]) for row in transformed_data.itertuples()]
  for previous, current in zip(states, states[1:]):
    transitions[previous + current] += 1

  outgoing = transitions.sum(axis=(4, 5, 6, 7), keepdims=True)
  # `where` leaves the zero-initialised output untouched for unseen states.
  return np.divide(transitions, outgoing, out=np.zeros_like(transitions), where=outgoing > 0)

# Action names; an action's position in this tuple is its index on the last axis of the value table.
actions = ("buy","hold","sell")

def reward_function(s,a,price_dict,tp,gamma,values):
  """Expected one-step return plus discounted future value of taking ``a`` in ``s``.

  Args:
    s: state ``(i, j, k, l, inventory)``; ``s[0]`` is the price bucket.
    a: "buy" (inventory + 1), "sell" (inventory - 1) or "hold" (unchanged).
    price_dict: average price per price bucket.
    tp: transition table indexed ``[i,j,k,l,i',j',k',l']``.
    gamma: discount factor.
    values: value table indexed ``[i,j,k,l,inventory,action]``.

  Returns:
    Sum over next states of
    ``P(next | s) * ((price[next] - price[s]) * (inv - 5) + gamma * max_a values[next][inv])``,
    where ``inv`` is the inventory level after the action. The ``- 5`` offset
    is applied to the inventory index before it multiplies the price change.

  Raises:
    ValueError: if the action would move inventory outside the value table.
  """
  table = np.asarray(values)
  inv = s[4] + (a == "buy") - (a == "sell")
  if not 0 <= inv < table.shape[4]:
    raise ValueError(f"action {a!r} moves inventory {s[4]} outside the value table")

  probabilities = np.asarray(tp)[s[0], s[1], s[2], s[3]]
  prices = np.asarray(price_dict, dtype=float)

  # Price change depends only on the next price bucket (first next-state axis).
  price_change = (prices[:probabilities.shape[0]] - prices[s[0]]) * (inv - 5)
  future_value = table[:, :, :, :, inv].max(axis=-1)

  expected = probabilities * (price_change[:, None, None, None] + gamma * future_value)
  return float(expected.sum())

# Resume from a previously saved value table when one exists; otherwise start
# from an all-zero table.
try:
  values = np.load(stock+'.npy')
  print("success loading values")
except (OSError, ValueError):
  # OSError: file missing or unreadable; ValueError: file cannot be loaded as a plain array.
  # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
  values = np.zeros((5,5,5,5,11,3))
  print("failed loading values")

success loading values


In [None]:
# Bucketise every feature column into 5 quantile buckets, then build the
# per-bucket average price (from the first column's edges) and the transition table.
d, b = transform_data(data, 5)
pricedict = create_pricedict(data, b[d.columns[0]])
_tp = tp(d)

## Creating Test class

In [None]:
class BackTest:
    """Minimal single-ticker trading simulator stepped one price row at a time."""

    def __init__(self, initial_cash, initial_inventory):
        """Start with the given cash and share count; no price is loaded yet."""
        self.inventory = initial_inventory
        self.cash = initial_cash
        self.current_index = -1 #Placeholder Value
        self.current_price = -1 #Placeholder value

    def download_data(self,ticker,start,end,interval):
        """Download opening prices for ``ticker`` and drop the first 20 rows.

        Unadjusted prices are requested explicitly so the series is the same
        raw 'Open' column regardless of the yfinance default.
        """
        self.ticker = ticker
        frame = yf.download(ticker, start=start, end=end, interval=interval, auto_adjust=False)
        self.data = frame['Open'].iloc[20:]
        return

    def buy(self,n):
        """Buy ``n`` shares at the current price.

        Prints "not enough cash" and changes nothing when the cost exceeds
        the available cash.
        """
        cost = self.current_price*n
        if cost > self.cash:
            print("not enough cash")
            return

        self.cash -= cost
        self.inventory += n

    def sell(self,n):
        """Sell ``n`` shares at the current price; inventory may go negative."""
        self.cash += self.current_price*n
        self.inventory -= n

    def step(self):
        """Advance to the next row and make its price the current price.

        ``self.data`` may hold one column per ticker (DataFrame) or be a plain
        price Series; both layouts are handled.
        """
        self.current_index += 1
        row = self.data.iloc[self.current_index]
        # A DataFrame row is a Series keyed by ticker; a Series row is already the price.
        self.current_price = row[self.ticker] if isinstance(row, pd.Series) else row
        return

    def getInv(self):
        """Return the current share count."""
        return self.inventory

    def get_portfolio_value(self):
        """Return cash plus the holdings valued at the current price."""
        return self.current_price*self.inventory + self.cash

## Model Init and Training

In [None]:
# Build the model from the tables prepared earlier and run value iteration;
# each sweep prints its number and the largest value change.
model = MDP(actions, _tp, reward_function, pricedict, values)
model.value_iteration()

1 73.26995109536979
2 28.80492067978596
3 9.171624852392386
4 5.4247562187232
5 2.7327286750642372
6 1.9657331182473583
7 1.583012020337125
8 1.2905354521676298
9 1.2554575082386474
10 1.2246440253450679
11 1.1987118882470824
12 1.1720367002760383
13 1.145052004904235
14 1.1151265675937498
15 1.0823443616922148
16 1.0501957828831543


## Back Test:

In [None]:
# Back-test frame: daily history plus two rolling features of the opening price.
# auto_adjust=False is requested explicitly because the drop below expects an
# 'Adj Close' column, which yfinance only returns for unadjusted downloads.
data = yf.download(stock, start='1999-01-01', end='2024-10-01', auto_adjust=False)
data['MA_5'] = data['Open'].rolling(window=5).mean()  # 5-day moving average
data['Variance'] = data['Open'].rolling(window=5).var()  # 5-day rolling variance
data = data.iloc[20:]  # skip the first 20 rows, which include the NaN warm-up of the 5-day windows
data = data.drop(columns=['Adj Close', 'High', 'Low', 'Close'])

NameError: name 'yf' is not defined

In [None]:
# Replay the model's decisions over the back-test window, starting with 1000
# in cash and no shares.
test = BackTest(1000, 0)
test.download_data(stock, "1999-01-01", "2024-10-01", "1d")
trades = [0, 0, 0, 0]  # decision counts, in the order: buy, sell, clear, hold
trade_slot = {"buy": 0, "sell": 1, "clear": 2, "hold": 3}

for row in data.itertuples():
  test.step()
  s = bucket_state(row[1:], b, d.columns)
  s.append(test.getInv() + 5)  # holdings shifted by 5 become the inventory index
  a = model.best_action(s)

  if a == "buy":
    test.buy(1)
  elif a == "sell":
    test.sell(1)
  elif a == "clear":
    test.sell(test.getInv())  # sell the entire current holding

  # Any decision outside the four known names is neither executed nor counted.
  if a in trade_slot:
    trades[trade_slot[a]] += 1

print('')
print(test.get_portfolio_value() - 1000)
print(trades)
print(test.getInv())

NameError: name 'BackTest' is not defined

## Forward Test:

In [None]:
# Forward-test frame: daily history plus two rolling features of the opening price.
# auto_adjust=False is requested explicitly because the drop below expects an
# 'Adj Close' column, which yfinance only returns for unadjusted downloads.
data = yf.download(stock, start="2024-10-01", end="2024-12-03", auto_adjust=False)
data['MA_5'] = data['Open'].rolling(window=5).mean()  # 5-day moving average
data['Variance'] = data['Open'].rolling(window=5).var()  # 5-day rolling variance
data = data.iloc[20:]  # skip the first 20 rows, which include the NaN warm-up of the 5-day windows
data = data.drop(columns=['Adj Close', 'High', 'Low', 'Close'])

In [None]:
# Replay the model's decisions over the forward-test window, starting with
# 1000 in cash and no shares.
test = BackTest(1000, 0)
test.download_data(stock, "2024-10-01", "2024-12-03", "1d")
trades = [0, 0, 0, 0]  # decision counts, in the order: buy, sell, clear, hold
trade_slot = {"buy": 0, "sell": 1, "clear": 2, "hold": 3}

for row in data.itertuples():
  test.step()
  s = bucket_state(row[1:], b, d.columns)
  s.append(test.getInv() + 5)  # holdings shifted by 5 become the inventory index
  a = model.best_action(s)

  if a == "buy":
    test.buy(1)
  elif a == "sell":
    test.sell(1)
  elif a == "clear":
    test.sell(test.getInv())  # sell the entire current holding

  # Any decision outside the four known names is neither executed nor counted.
  if a in trade_slot:
    trades[trade_slot[a]] += 1

print('')
print(test.get_portfolio_value() - 1000)
print(trades)
print(test.getInv())

In [None]:
# Persist the model's current value table to "<stock>.npy".
np.save(stock+'.npy', model.values)

NameError: name 'np' is not defined