In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import json

In [2]:
# Names of every entry in 'data_torun' whose file name ends with '.csv'.
file_list = [entry for entry in os.listdir('data_torun') if entry.endswith('.csv')]

In [3]:
file_list

['CHD_data.csv',
 'CVX_data.csv',
 'DUK_data.csv',
 'DVA_data.csv',
 'EQR_data.csv',
 'ESS_data.csv',
 'FOX_data.csv',
 'GILD_data.csv',
 'GPS_data.csv',
 'HAL_data.csv']

In [4]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.metrics import RootMeanSquaredError
from tensorflow.keras.optimizers import Adam
from keras.models import load_model

2024-04-30 22:34:35.078604: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [11]:
def inverse_scaling(arr, mean, std):
    """Undo standardisation by computing ``arr * std + mean``.

    Works for scalars and for anything that supports elementwise
    arithmetic with ``std`` and ``mean`` (e.g. NumPy arrays).
    """
    rescaled = arr * std
    return rescaled + mean

In [12]:
# Sorted list of the text before the first underscore in each '.csv' file name
# found in 'data_torun' (e.g. 'CHD_data.csv' -> 'CHD').
test_list = sorted(
    entry.split('_')[0]
    for entry in os.listdir('data_torun')
    if entry.endswith('.csv')
)

In [13]:
test_list

['CHD', 'CVX', 'DUK', 'DVA', 'EQR', 'ESS', 'FOX', 'GILD', 'GPS', 'HAL']

In [14]:
# Read the mean and std dictionaries from their JSON files (paths are
# relative to the working directory). They are later indexed as
# means_dict[stock_name][column] / std_dict[stock_name][column].
with open('means_torun/means_dict_torun.json') as f:
    means_dict = json.load(f)

with open('stds_torun/stds_dict_torun.json') as f:
    std_dict = json.load(f)

In [15]:
# Exponential Moving Average (EMA)
def add_ema(df, alpha=0.2):
    """Add an 'EMA' column: the exponentially weighted mean of 'close'.

    Args:
        df: DataFrame with a 'close' column. It is modified in place.
        alpha: Smoothing factor forwarded to ``Series.ewm``.

    Returns:
        The same DataFrame object, now carrying the 'EMA' column.
    """
    closing_prices = df['close']
    smoothed = closing_prices.ewm(alpha=alpha, min_periods=1).mean()
    df['EMA'] = smoothed
    return df

# Relative Strength Index (RSI)
def add_rsi(df, window=14):
    """Add an 'RSI' column computed from rolling mean gains and losses of 'close'.

    Args:
        df: DataFrame with a 'close' column. It is modified in place.
        window: Number of rows in the rolling window (a shorter prefix is
            averaged as-is because ``min_periods=1``).

    Returns:
        The same DataFrame object, now carrying the 'RSI' column.

    Note:
        The first row is always NaN: its price change is undefined, so both
        the gain and the loss are 0 and the ratio is 0/0. Rows whose window
        contains no losses evaluate to 100.
    """
    price_change = df['close'].diff()
    # Positive moves count as gains, negative moves (sign-flipped) as losses;
    # everything else, including the leading NaN, contributes 0.
    upward = price_change.where(price_change > 0, 0)
    downward = -price_change.where(price_change < 0, 0)

    def _rolling_average(series):
        return series.rolling(window=window, min_periods=1).mean()

    relative_strength = _rolling_average(upward) / _rolling_average(downward)
    df['RSI'] = 100 - (100 / (1 + relative_strength))
    return df


### GUI 

In [17]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import ipywidgets as widgets
from IPython.display import display, clear_output
from tensorflow.keras.models import load_model


# Read the mean and std dictionaries from their JSON files (paths are
# relative to the working directory). This repeats the load done in the
# earlier cell, re-binding the same two names.
with open('means_torun/means_dict_torun.json') as f:
    means_dict = json.load(f)

with open('stds_torun/stds_dict_torun.json') as f:
    std_dict = json.load(f)
    
def df_to_X_y2(df, window_size=5, target_col=3):
    """Turn a frame into sliding input windows and next-row labels.

    Window ``i`` holds rows ``i .. i + window_size - 1`` (all columns); its
    label is the value in column ``target_col`` of row ``i + window_size``.

    Args:
        df: DataFrame whose ``to_numpy()`` yields the feature matrix.
        window_size: Number of consecutive rows per input window.
        target_col: Positional index of the column used as the label.
            Defaults to 3, the index the original code hard-coded.

    Returns:
        ``(X, y)`` where ``X`` has shape
        ``(n_windows, window_size, n_columns)`` and ``y`` has shape
        ``(n_windows,)``, with ``n_windows = max(0, len(df) - window_size)``.
        When there are not enough rows for a single window both arrays are
        empty but still carry those shapes, so downstream ``X[:, :, k]``
        indexing does not fail.
    """
    values = df.to_numpy()
    n_windows = len(values) - window_size
    if n_windows <= 0:
        empty_X = np.empty((0, window_size) + values.shape[1:], dtype=values.dtype)
        empty_y = np.empty((0,), dtype=values.dtype)
        return empty_X, empty_y

    X = [values[start:start + window_size] for start in range(n_windows)]
    y = [values[start + window_size][target_col] for start in range(n_windows)]
    return np.array(X), np.array(y)

# standardise the test data
def preprocess_X(X, means, stds):
    """Standardise each feature slice of ``X`` in place and return ``X``.

    Slice ``X[:, :, k]`` is replaced by ``(slice - mean) / std`` using the
    statistics stored under the k-th name of the fixed feature order
    'open', 'high', 'low', 'close', 'EMA', 'RSI'.

    Args:
        X: 3-D array whose last axis has at least six entries.
        means: Mapping from feature name to mean.
        stds: Mapping from feature name to standard deviation.
    """
    feature_order = ('open', 'high', 'low', 'close', 'EMA', 'RSI')
    for position, feature in enumerate(feature_order):
        X[:, :, position] = (X[:, :, position] - means[feature]) / stds[feature]
    return X

# standardise y in case we wish to train
def preprocess_y(y, mean, std):
    """Standardise ``y`` in place as ``(y - mean) / std`` and return it."""
    standardized = (y[:] - mean) / std
    y[:] = standardized
    return y

# Load data and preprocess
def load_data(stock_name):
    """Read one stock's CSV and append the 'EMA' and 'RSI' feature columns.

    The file is read from ``data_test_torun/{stock_name}_data.csv``. The
    'open', 'high', 'close' and 'low' columns are cast to float, 'EMA' is
    the exponentially weighted mean of 'close' (alpha=0.3), and 'RSI' is
    derived from 14-row rolling mean gains and losses of 'close'.

    Note:
        The first 'RSI' value is NaN because the first row has no previous
        close, which makes both the average gain and average loss zero.

    Returns:
        The loaded DataFrame with the two extra columns.
    """
    csv_path = f'data_test_torun/{stock_name}_data.csv'
    df = pd.read_csv(csv_path)

    for price_column in ('open', 'high', 'close', 'low'):
        df[price_column] = df[price_column].astype(float)

    closing = df['close']
    df['EMA'] = closing.ewm(alpha=0.3, min_periods=1).mean()

    # Split day-over-day changes into gains and (sign-flipped) losses.
    change = closing.diff()
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)
    mean_up = ups.rolling(window=14, min_periods=1).mean()
    mean_down = downs.rolling(window=14, min_periods=1).mean()
    relative_strength = mean_up / mean_down
    df['RSI'] = 100 - (100 / (1 + relative_strength))
    return df

# filtering data from the previous dates
def prepare_data(df, target_date, means, stds, lookback=50):
    """Build standardised model inputs from the rows preceding ``target_date``.

    The first row whose 'date' equals ``str(target_date)`` marks the target.
    Up to ``lookback`` rows immediately before it (the target row itself is
    excluded) are stripped of the 'date', 'Name' and 'volume' columns,
    windowed with ``df_to_X_y2`` and standardised with ``preprocess_X`` /
    ``preprocess_y``.

    Args:
        df: DataFrame containing 'date', 'Name', 'volume' and the feature
            columns expected by ``preprocess_X``.
        target_date: Date to look up; compared as ``str(target_date)``.
        means: Mapping of feature name to mean for this stock.
        stds: Mapping of feature name to standard deviation for this stock.
        lookback: Maximum number of rows before the target to use.
            Defaults to 50, the value the original code hard-coded.

    Returns:
        ``(X, y)`` on success. ``(None, None)`` after printing a message when
        the date is absent or when the available history is too short to
        form a single window.
    """
    # Locate the target by position. The previous version indexed the empty
    # match with [0], which raised IndexError before its "not found" branch
    # could ever run.
    is_target = (df['date'] == str(target_date)).to_numpy()
    if not is_target.any():
        print(f"Date {target_date} not found in the dataframe.")
        return None, None
    target_pos = int(is_target.argmax())  # first matching row

    # Clamp at 0: a negative start would make iloc slice from the end of the
    # frame instead of from its beginning.
    start_pos = max(0, target_pos - lookback)
    history = df.iloc[start_pos:target_pos].drop(columns=['date', 'Name', 'volume'])

    X, y = df_to_X_y2(history)
    if len(X) == 0:
        print(f"Not enough rows before {target_date} to build a window.")
        return None, None

    X = preprocess_X(X, means, stds)
    y = preprocess_y(y, means['close'], stds['close'])
    return X, y

#  import model and predict closing values
def predict_closing(model_path, X):
    """Load the saved model at ``model_path`` and return its predictions for ``X``."""
    return load_model(model_path).predict(X)

# inverse-transform the predicted values back to the original scale
def postprocess(y,mean,std):
    """Map standardised values back to the original scale: ``y * std + mean``."""
    return y * std + mean


# online training 
def retrain(stock_name, target_date, epochs=100, learning_rate=0.01):
    """Fine-tune a stock's saved model on the data preceding ``target_date``.

    The base model is loaded from ``modelstorun_/{stock_name}_model.keras``,
    recompiled with Adam / mean-squared-error, fitted on the windows returned
    by ``prepare_data`` and written to
    ``models_onlinetrained/{stock_name}_model.keras``.

    Args:
        stock_name: Ticker used to locate the CSV, the statistics and the model.
        target_date: Only rows dated on or before this date are considered;
            ``prepare_data`` then uses the rows before it.
        epochs: Number of training epochs (default 100, as before).
        learning_rate: Adam learning rate (default 0.01, as before).

    Returns:
        None. Prints a message and returns early when ``prepare_data`` yields
        no training data.
    """
    df = load_data(stock_name)
    # String comparison; relies on 'date' sorting chronologically as text.
    df = df[df['date'] <= str(target_date)]

    X, y = prepare_data(df, target_date, means_dict[stock_name], std_dict[stock_name])
    if X is None or y is None:
        print(f"Skipping retraining of {stock_name}: no training data for {target_date}.")
        return

    model = load_model(f'modelstorun_/{stock_name}_model.keras')

    output_dir = 'models_onlinetrained'
    os.makedirs(output_dir, exist_ok=True)  # saving fails if the folder is missing

    # ModelCheckpoint watches 'val_loss' unless told otherwise. fit() below has
    # no validation data, so with save_best_only=True the callback never saved
    # anything. Monitor the training loss so the best epoch is actually kept.
    # The former unconditional model.save() after fit() is dropped because it
    # would overwrite that best checkpoint with the last-epoch weights.
    checkpoint = ModelCheckpoint(
        f'{output_dir}/{stock_name}_model.keras',
        monitor='loss',
        save_best_only=True,
    )
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss=MeanSquaredError(),
        metrics=[RootMeanSquaredError()],
    )
    model.fit(X, y, epochs=epochs, callbacks=[checkpoint], verbose=0)


# visualize predictions and actual values
def visualize(stock_name, df, target_date):
    """Plot actual vs. predicted closing values for the rows before ``target_date``.

    Inputs are built with ``prepare_data``. Predictions come from the
    online-retrained model in ``models_onlinetrained/`` when that file exists,
    otherwise from the original model in ``modelstorun_/``. Both series are
    converted back to the original scale before plotting.

    Args:
        stock_name: Ticker used to look up statistics and model files.
        df: DataFrame for that stock, including a 'date' column.
        target_date: Date whose preceding rows are evaluated.

    Returns:
        None. Nothing is drawn when ``prepare_data`` returns ``(None, None)``.
    """
    close_mean = means_dict[stock_name]['close']
    close_std = std_dict[stock_name]['close']

    X, y = prepare_data(df, target_date, means_dict[stock_name], std_dict[stock_name])
    if X is None or y is None:
        # prepare_data uses (None, None) to signal that there is nothing to plot.
        return

    trained_model_path = f'models_onlinetrained/{stock_name}_model.keras'
    if os.path.exists(trained_model_path):
        model_path = trained_model_path
        print('taking trained model')
    else:
        model_path = f'modelstorun_/{stock_name}_model.keras'
        print('taking original model')
    y_pred = predict_closing(model_path, X)

    # Each label is the close of the row right after its input window, and the
    # windows stop just before the target row. The labels therefore belong to
    # the len(y) rows ending immediately before the target date. The previous
    # slice ended on the target row itself, shifting every point by one row.
    target_index = df[df['date'] == str(target_date)].index[0]
    start_index = max(0, target_index - len(y))
    # 'date' holds strings (it is matched against str(target_date)); convert to
    # real datetimes so the date locator/formatter below label the axis
    # correctly instead of treating the strings as categories.
    plot_dates = pd.to_datetime(df['date'].iloc[start_index:target_index])

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(plot_dates, postprocess(y, close_mean, close_std), label='Actual', color='blue')
    ax.plot(plot_dates, postprocess(y_pred.flatten(), close_mean, close_std), label='Predicted', color='red')

    # Formatting x-axis date labels: one tick per day, then keep every other one.
    ax.xaxis.set_major_locator(mdates.DayLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d'))
    ax.set_xticks(ax.get_xticks()[::2])
    plt.xticks(rotation=45)

    ax.set_title("Predictions vs Actual Closing Values")
    ax.set_xlabel("Date")
    ax.set_ylabel("Closing Value")
    ax.legend()
    plt.grid(True)
    plt.show()

# GUI function
def visualize_stock():
    """Build and display the GUI for plotting and retraining.

    The GUI has a stock dropdown (first ten entries of ``test_list``), a date
    picker, "Visualize", "Clear Graph" and "Retrain" buttons, and an output
    area. "Visualize" loads the selected stock and calls ``visualize``;
    "Retrain" calls ``retrain``; "Clear Graph" empties the output area.
    """
    stock_dropdown = widgets.Dropdown(options=test_list[:10],
                                      description='Select Stock:')
    date_input = widgets.DatePicker(description='Select Date:')
    visualize_button = widgets.Button(description="Visualize")
    clear_button = widgets.Button(description="Clear Graph")
    train_button = widgets.Button(description="Retrain")
    output_graph = widgets.Output()

    def _selected_date():
        # The picker's value is None until the user chooses a date; report it
        # here rather than passing None into the date lookups downstream.
        target_date = date_input.value
        if target_date is None:
            print("Please select a date first.")
        return target_date

    def on_visualize_button_clicked(b):
        # Everything runs inside the output context so that messages and
        # errors from loading the data are shown in the output area as well.
        with output_graph:
            target_date = _selected_date()
            if target_date is None:
                return
            stock_name = stock_dropdown.value
            df = load_data(stock_name)
            visualize(stock_name, df, target_date)

    def on_clear_button_clicked(b):
        output_graph.clear_output()

    def on_train_button_clicked(b):
        with output_graph:
            target_date = _selected_date()
            if target_date is None:
                return
            retrain(stock_dropdown.value, target_date)

    visualize_button.on_click(on_visualize_button_clicked)
    clear_button.on_click(on_clear_button_clicked)
    train_button.on_click(on_train_button_clicked)
    display(widgets.VBox([stock_dropdown, date_input, visualize_button,clear_button,train_button, output_graph]))

# Run GUI: build the widgets and display them as this cell's output.
visualize_stock()


VBox(children=(Dropdown(description='Select Stock:', options=('CHD', 'CVX', 'DUK', 'DVA', 'EQR', 'ESS', 'FOX',…