In [None]:
import pandas as pd

# Seed every source of randomness with the same constant (221) so runs are repeatable.
# NOTE(review): Python fixes string-hash randomization at interpreter startup, so
# assigning PYTHONHASHSEED here likely only affects child processes — confirm whether
# it needs to be set before the kernel launches instead.
import os
os.environ['PYTHONHASHSEED']=str(221)

# Seed the standard-library random module.
import random
random.seed(221)

# Seed NumPy's legacy global random state.
import numpy as np
np.random.seed(221)

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers

# Seed TensorFlow's global random generator.
tf.random.set_seed(221)

import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import yfinance as yf
# Project-local module supplying GA.Individual and GA.GeneticAlgorithm used further down.
import genetic_algo as GA

In [None]:
# Fetch MSFT price data through yfinance and keep only the 'Close' column
# (double brackets keep the result a DataFrame rather than a Series).
# NOTE(review): requires network access, and the result changes as new trading days
# are published — consider pinning start/end dates for reproducibility.
df = yf.download('MSFT')[['Close']]
df

In [None]:
def convertStringtoDate(s):
    """Parse a dash-separated 'YYYY-MM-DD' string into a ``datetime``.

    The string is split on '-' and the first three fields are read as year,
    month and day; any further fields are ignored. The time part of the
    result is left at its default (midnight).
    """
    fields = s.split('-')
    return datetime(
        year=int(fields[0]),
        month=int(fields[1]),
        day=int(fields[2]),
    )

In [None]:
# Quick visual check of the downloaded closing prices.
df.plot()

In [None]:
def getNextDateFromData(df, curr_date):
    """Return the index label of the row that follows ``curr_date`` in ``df``.

    ``curr_date`` must be an existing label of ``df.index`` (otherwise the
    index lookup raises ``KeyError``), and it must not be the last row
    (otherwise indexing one past the end raises ``IndexError``).
    """
    row_labels = df.index
    following_position = row_labels.get_loc(curr_date) + 1
    return row_labels[following_position]


In [None]:
def createWindowedDataframe(df, start, end, w):
    """Build a table of sliding windows over the 'Close' column of ``df``.

    Each output row corresponds to one row of ``df`` whose index label lies in
    ``[start, end]`` and holds that row's closing value ('Target') together
    with the ``w`` closing values immediately before it ('Target-w' ...
    'Target-1', oldest first).

    Parameters
    ----------
    df : DataFrame with a 'Close' column and a date-like index sorted in
        ascending order.
    start, end : inclusive date bounds, either 'YYYY-MM-DD' strings or
        date-like objects comparable with ``df.index``. Bounds that do not
        coincide with a row of ``df`` (for example weekends) are snapped
        inward to the nearest rows inside the range, instead of running past
        the end of the data as the previous row-by-row walk did.
    w : number of preceding closing values used as features (>= 0).

    Returns
    -------
    DataFrame with columns 'Target Date', 'Target-w' ... 'Target-1', 'Target'.

    Raises
    ------
    ValueError : negative ``w``, unsorted index, ambiguous 'Close' columns, or
        no rows of ``df`` inside ``[start, end]``.
    Exception : fewer than ``w`` rows precede the first target date.
    """
    if w < 0:
        raise ValueError(f'Window length must be non-negative, got {w}')

    # Strings go through the notebook's parser; date objects are used as given.
    start_date = convertStringtoDate(start) if isinstance(start, str) else start
    end_date = convertStringtoDate(end) if isinstance(end, str) else end

    closes = df['Close']
    if isinstance(closes, pd.DataFrame):
        # Selecting 'Close' from two-level columns yields a sub-frame; accept it
        # only when it is unambiguous (exactly one column).
        if closes.shape[1] != 1:
            raise ValueError("Expected exactly one 'Close' column")
        closes = closes.iloc[:, 0]

    index = closes.index
    if not index.is_monotonic_increasing:
        raise ValueError('df must be sorted by date in ascending order')

    # Half-open positional range [first, stop) of rows with start <= date <= end.
    first = int(index.searchsorted(start_date, side='left'))
    stop = int(index.searchsorted(end_date, side='right'))
    if first >= stop:
        raise ValueError(f'No rows found between {start_date} and {end_date}')

    # Only the earliest target can lack history; later ones have strictly more.
    if first < w:
        raise Exception(f'Window size too large for date {index[first]}')

    values = closes.to_numpy()
    columns = {'Target Date': list(index[first:stop])}
    # 'Target-k' is the close k rows before the target, i.e. the same slice
    # shifted back by k positions.
    for lag in range(w, 0, -1):
        columns[f'Target-{lag}'] = values[first - lag:stop - lag]
    columns['Target'] = values[first:stop]
    return pd.DataFrame(columns)

# Example run: 3-step windows for targets between 2021-08-20 and 2023-10-05.
# The name is reassigned later with the window length chosen by the search.
windowed_dataframe = createWindowedDataframe(df, '2021-08-20', '2023-10-05', w=3)
windowed_dataframe


In [None]:
def train_val_test_split(windowed_dataframe, train_ratio, test_ratio):
    """Split a windowed table chronologically into train / validation / test parts.

    Rows are kept in their existing order: the first ``train_ratio`` share goes
    to training, the last ``test_ratio`` share to testing, and whatever lies in
    between to validation.

    Parameters
    ----------
    windowed_dataframe : DataFrame with a 'Target Date' column, any number of
        feature columns, and the target as the last column. It is not modified.
    train_ratio, test_ratio : fractions in [0, 1] whose sum must not exceed 1.

    Returns
    -------
    (train_dates, train_X, train_y, val_dates, val_X, val_y,
     test_dates, test_X, test_y)
        ``*_X`` are float32 arrays of shape (rows, n_features, 1), ``*_y`` are
        float32 vectors and ``*_dates`` are slices of the 'Target Date' column.

    Raises
    ------
    ValueError : a ratio lies outside [0, 1] or the two ratios sum to more than 1
        (which would make the test slice overlap the training slice).
    """
    if not (0 <= train_ratio <= 1 and 0 <= test_ratio <= 1):
        raise ValueError(
            f'Ratios must lie in [0, 1], got train_ratio={train_ratio}, test_ratio={test_ratio}')
    # Small tolerance so that pairs such as 0.7 + 0.3 are not rejected by rounding.
    if train_ratio + test_ratio > 1 + 1e-9:
        raise ValueError(
            f'train_ratio + test_ratio must not exceed 1, got {train_ratio + test_ratio}')

    dates = windowed_dataframe['Target Date']
    values = windowed_dataframe.drop(columns='Target Date').to_numpy()
    n_rows = len(values)

    features = values[:, :-1]
    # Trailing axis of size 1: one value per time step, as the LSTM input expects.
    features = features.reshape((n_rows, features.shape[1], 1)).astype(np.float32)
    target = values[:, -1].astype(np.float32)

    train_split_point = int(n_rows * train_ratio)
    # Float rounding in (1 - test_ratio) can land one row before the train
    # boundary; clamp so the test slice never reaches back into training rows.
    test_split_point = max(int(n_rows * (1 - test_ratio)), train_split_point)

    train_dates = dates[:train_split_point]
    train_X = features[:train_split_point]
    train_y = target[:train_split_point]

    val_dates = dates[train_split_point:test_split_point]
    val_X = features[train_split_point:test_split_point]
    val_y = target[train_split_point:test_split_point]

    test_dates = dates[test_split_point:]
    test_X = features[test_split_point:]
    test_y = target[test_split_point:]

    return train_dates, train_X, train_y, val_dates, val_X, val_y, test_dates, test_X, test_y


In [None]:
def train(parameters, train_X, train_y, epochs=150, verbose=0, learning_rate=0.001):
    """Build and fit a two-layer LSTM regressor.

    The network is: input of shape (window_len, 1) -> LSTM(hidden_units_1,
    returning the full sequence) -> LSTM(hidden_units_2, relu activation)
    -> Dense(1). It is compiled with mean-squared-error loss and the Adam
    optimizer.

    Parameters
    ----------
    parameters : mapping with integer entries 'window_len', 'hidden_units_1'
        and 'hidden_units_2'.
    train_X, train_y : training inputs and targets, passed straight to
        ``model.fit``.
    epochs : number of training epochs.
    verbose : verbosity flag forwarded to ``model.fit``.
    learning_rate : Adam learning rate; the default matches the value that
        was previously hard-coded.

    Returns
    -------
    The fitted model.
    """
    model = Sequential([
        layers.Input((parameters['window_len'], 1)),
        # return_sequences=True so the second LSTM receives one vector per step.
        layers.LSTM(parameters['hidden_units_1'], return_sequences=True),
        layers.LSTM(parameters['hidden_units_2'], activation='relu'),
        layers.Dense(1)
    ])

    model.compile(loss='mse', optimizer=Adam(learning_rate=learning_rate))
    model.fit(train_X, train_y, epochs=epochs, verbose=verbose)
    return model


In [None]:
def plot(dates, observed, predicted, label):
    """Show predicted and observed series against ``dates`` on one 12x7 figure.

    The predicted curve is drawn first and the observed curve second; the
    legend entries are ``label`` followed by ' Predictions' and
    ' Observations' respectively.
    """
    _, axes = plt.subplots(figsize=(12, 7))
    axes.plot(dates, predicted)
    axes.plot(dates, observed)

    legend_entries = [
        label + ' Predictions',
        label + ' Observations',
    ]
    axes.legend(legend_entries)
    plt.show()


In [None]:
def decode_parameters(s):
    """Decode a bit string into the model hyper-parameters.

    Three consecutive fields of ``s`` are read as base-2 integers:
    characters 0-2 give 'window_len', characters 3-10 give 'hidden_units_1'
    and characters 11-17 give 'hidden_units_2'. Characters from position 18
    onward are ignored.
    """
    bit_fields = (
        ('window_len', 0, 3),
        ('hidden_units_1', 3, 11),
        ('hidden_units_2', 11, 18),
    )
    return {name: int(s[lo:hi], 2) for name, lo, hi in bit_fields}


In [None]:
# Cache of validation losses keyed by gene, filled by fitness() so a gene that
# was already trained is not trained again. Re-running this cell empties it.
fitness_values = {}

In [None]:
def fitness(x : GA.Individual):
    """Score an individual by the validation loss of the model its gene encodes.

    The gene is decoded with ``decode_parameters``; a model is trained on the
    training split of the 2021-08-20 .. 2023-10-05 window table and evaluated
    on the validation split. Results are memoised in ``fitness_values``.

    Returns the validation MSE, or ``inf`` when any decoded parameter is zero
    (no model can be built) or when the loss is not a finite number.
    NOTE(review): this presumes the genetic algorithm minimises fitness —
    confirm against genetic_algo.
    """
    if x.gene in fitness_values:
        return fitness_values[x.gene]

    parameters = decode_parameters(x.gene)

    # A zero window length or zero-width layer cannot be built; reject it
    # without training (and without caching, since it is cheap to re-check).
    if parameters['window_len']==0 or parameters['hidden_units_1']==0 or parameters['hidden_units_2']==0:
        return float('inf')

    print(parameters)
    windowed_dataframe = createWindowedDataframe(df, '2021-08-20', '2023-10-05', w=parameters['window_len'])
    train_dates, train_X, train_y, val_dates, val_X, val_y, test_dates, test_X, test_y = train_val_test_split(windowed_dataframe, 0.8, 0.1)

    # Many models are built during the search; drop Keras' accumulated global
    # state from earlier ones so memory does not grow with every evaluation.
    tf.keras.backend.clear_session()
    model = train(parameters, train_X=train_X, train_y=train_y)

    # verbose=0 keeps one progress bar per candidate out of the notebook output.
    loss = model.evaluate(val_X, val_y, verbose=0)
    # A diverged model yields NaN, which breaks ordering comparisons; treat it
    # as the worst possible score instead.
    if not np.isfinite(loss):
        loss = float('inf')

    fitness_values[x.gene] = loss
    return loss


# Running Genetic Algorithm


In [None]:
# Run the genetic search with `fitness` as the scoring callback, then decode the
# gene of the individual it returns.
# NOTE(review): the meaning of the positional arguments (3, 20, 18, 0.1) is defined in
# genetic_algo, which is not visible here. 18 equals the number of bits that
# decode_parameters reads, so it is presumably the gene length — confirm.
ga = GA.GeneticAlgorithm(3, 20, 18, 0.1, fitness)
ind = ga.run()
best_param = decode_parameters(ind.gene)
print("Best params :")
print(best_param)
# NOTE(review): fitness() only stores genes it actually trained, so this lookup raises
# KeyError if the returned gene decodes to a zero-valued parameter.
print("Val loss 1 :", fitness_values[ind.gene])

In [None]:
# Rebuild the window table with the selected window length and split it with the
# same 0.8 / 0.1 ratios that fitness() uses.
windowed_dataframe = createWindowedDataframe(df, '2021-08-20', '2023-10-05', w=best_param['window_len'])
train_dates, train_X, train_y, val_dates, val_X, val_y, test_dates, test_X, test_y = train_val_test_split(windowed_dataframe, 0.8, 0.1)
# Retrain from scratch with the best parameters, for 200 epochs instead of the
# 150 used by default during the search.
best_model = train(best_param, train_X=train_X, train_y=train_y, verbose=0, epochs=200)
# The values returned by evaluate() are not captured here; only the progress output
# Keras prints is visible.
best_model.evaluate(val_X, val_y)
best_model.evaluate(test_X, test_y)
# Predict on the held-out test split and plot predictions against observations.
test_predictions = best_model.predict(test_X)
plot(dates=test_dates, observed=test_y, predicted=test_predictions, label='Test')
