# Model Test
Uses a pre-trained model to make predictions for dates that occur after the last date included in the training data.

In [None]:
# Load IPython's autoreload extension and switch it to mode 2
# (per the IPython docs, mode 2 reloads modules before each cell executes,
# so edits to imported modules are picked up without restarting the kernel).
%load_ext autoreload
%autoreload 2

In [None]:
import torch
# Run on the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")

In [None]:
# ------------------------------------------------------------------
# PARAMETERS: which ticker to test and how its quotes are sampled
# ------------------------------------------------------------------
TICKER = "TSLA"
# Quote interval in minutes; only 5 or 15 are expected.
DATA_INTERVAL_MINUTES = 15
# Passed to the downloader; presumably toggles after-hours quotes (verify in tiingo_api).
DATA_AFTER_HOURS = False
# ------------------------------------------------------------------
# END PARAMETERS
# ------------------------------------------------------------------

In [None]:
import os
import sys

# Make the directory two levels above the working directory importable so the
# project modules below resolve. The path is built with os.path.join because a
# hard-coded backslash separator ('..\\..') only resolves on Windows.
sys.path.append(os.path.join(os.pardir, os.pardir))

from datetime import datetime

import settings
import apis.tiingo_api as tiingo

# API key for the Tiingo data service, read from the project's secret store.
secret_key = settings.get_secret("tiingo-key")

# The first prediction needs 2584 ticks of history:
#   2584 / 26 ticks per day ~ 100 trading days -> 100 * 7 / 5 / 30 ~ 4.6 months
# (presumably why the 2023 download starts in August).
csv_data2023 = tiingo.download_ticker(
    secret_key,
    TICKER,
    datetime(2023, 8, 1),
    datetime(2023, 12, 31),
    DATA_INTERVAL_MINUTES,
    DATA_AFTER_HOURS,
)
csv_data2024 = tiingo.download_ticker(
    secret_key,
    TICKER,
    datetime(2024, 1, 1),
    datetime(2024, 12, 31),
    DATA_INTERVAL_MINUTES,
    DATA_AFTER_HOURS,
)


In [None]:
import io
import pandas as pd


# Build the evaluation frame from recent data: parse each yearly CSV download,
# then stack them (2023 first) under a fresh 0..n-1 index.
df2023, df2024 = (
    pd.read_csv(io.StringIO(raw_csv)) for raw_csv in (csv_data2023, csv_data2024)
)
df = pd.concat([df2023, df2024], axis=0, ignore_index=True)


In [None]:
# Sanity check on the concatenation: the dates must be strictly ascending,
# i.e. sorted and free of duplicates.
date_column = df["date"]
dates_strictly_ascending = date_column.is_monotonic_increasing and date_column.is_unique
order_message = (
    "Correct: DataFrame is in ascending order."
    if dates_strictly_ascending
    else "Error: DataFrame is not in ascending order."
)
print(order_message)


In [None]:
# Preview the first and last five rows, restricted to the date and close columns.
date_close = df[['date', 'close']]
print(f"Data first:\n{date_close.iloc[:5]}")
print(f"Data last:\n{date_close.iloc[-5:]}")

In [None]:
# ------------------------------------------------------------------
# PARAMETERS: prediction horizon and percentage thresholds
# ------------------------------------------------------------------
# Number of trading days inspected after each tick.
DAYS_PREDICT = 4

# Known trade-offs when choosing DOWN_PCTS_PREDICT:
#  - Too high: a stock that slides steadily by less than the threshold never
#    triggers a sell (e.g. with 5%: down 4%, then 3%, then 1%, ... and it ends
#    far down without ever selling).
#  - Too low (e.g. 1): it is very unlikely the price never dips 1% within 52
#    periods, even on a day that finishes up 7%.
# The down threshold is written as a positive number although it describes a
# negative move.
DOWN_PCTS_PREDICT = [4]
UP_PCTS_PREDICT = [7]
# ------------------------------------------------------------------
# END PARAMETERS
# ------------------------------------------------------------------

In [None]:
# Label the test series so its class distribution can be compared with training.

# Quotes per trading day: 26 with 15-minute quotes, otherwise 78 (5-minute quotes).
if DATA_INTERVAL_MINUTES == 15:
    TICKS_IN_DAY = 26
else:
    TICKS_IN_DAY = 78
# Number of ticks inspected to decide whether the price moved down or up by
# the configured percentages.
TICKS_PREDICT = TICKS_IN_DAY * DAYS_PREDICT

import classificators.series_classificator as classificator

classes_calc = classificator.SeriesClassificator(
    classificator.find_first_down_up,
    TICKS_PREDICT,
    DOWN_PCTS_PREDICT,
    UP_PCTS_PREDICT,
)

# Closing prices as plain Python floats, in chronological order.
close_list = df['close'].astype(float).tolist()

classes = classes_calc.classify(close_list)

# Two labels straddling the point TICKS_PREDICT ticks before the end of the data.
print(f"classes: {classes[-TICKS_PREDICT-1:-TICKS_PREDICT+1]}")
# Pair each price with its label for a fixed sample range of ticks.
price_classes = list(zip(close_list[2650:3000], classes[2650:3000]))
print(f"prices vs classes (window={TICKS_PREDICT}): {price_classes}")


In [None]:
# Histogram of the class labels, with each bar's count written above it.

import matplotlib.pyplot as plt

hist_values, bin_edges, _ = plt.hist(classes, bins=21, edgecolor='black')

plt.title('Histogram of Data')
plt.xlabel('Value')
plt.ylabel('Frequency')

# bin_edges has one more entry than hist_values; the last edge is dropped so
# every count is anchored at the left edge of its bar.
for left_edge, bar_count in zip(bin_edges[:-1], hist_values):
    plt.text(left_edge, bar_count, str(int(bar_count)), color='black')

plt.show()

In [None]:
# Report the share of each class value (percentages, per the helper's purpose).
import classificators.list_utils as lu

lu.display_frequency_numbers(
    classes,
    DOWN_PCTS_PREDICT,
    UP_PCTS_PREDICT,
)

In [None]:

# Line plot of the class label over the most recent ticks (at most 500).
graph_ticks = 500
recent_classes = classes[-graph_ticks:]
x = range(len(recent_classes))

plt.figure(figsize=(20,5))
plt.plot(x, recent_classes, linestyle='-')

plt.title('Plot of Classes')
plt.xlabel('Index')
plt.ylabel('Class')

plt.show()

In [None]:
# ------------------------------------------------------------------
# PARAMETERS: averaging windows (in ticks) used to build the input signals
# ------------------------------------------------------------------
# Sixteen consecutive Fibonacci numbers, from 2 up to 2584.
signal_avg = [
    2, 3, 5, 8,
    13, 21, 34, 55,
    89, 144, 233, 377,
    610, 987, 1597, 2584,
]
# ------------------------------------------------------------------
# END PARAMETERS
# ------------------------------------------------------------------

In [None]:

# Turn the closing prices into the network's input signals: one series of
# proportions per averaging window in signal_avg.
import classificators.proportions_calc as proportions

signals_calculator = proportions.ProportionsCalc(signal_avg)
proportions_avg = signals_calculator.calculate(close_list)


In [None]:
# Diagnostics: sizes and boundary values of the prices, classes and proportions.
print(f"Prices length: {len(close_list)}")
print(f"Proportions length: {len(proportions_avg[-1])}")

print(f"Last 10 close: {close_list[-10:]}")
print(f"Last 10 proportions(avg={signal_avg[0]}): {proportions_avg[0][-10:]}")

print(f"Proportions avgs: Length: {len(signal_avg)} Last: {signal_avg[-1]}")
# Near the end of the data there are fewer ticks left than the prediction
# window needs, so no class can be assigned there ("nan").
print(f"Classes last non-nan: {classes[-TICKS_PREDICT-1:-TICKS_PREDICT+1]} len: {len(classes)}")
print(f"Proportions first non-nan(avg={signal_avg[-1]}): {proportions_avg[-1][signal_avg[-1]-2:signal_avg[-1]]} len: {len(proportions_avg[-1])}")

# Min and Max must describe the same range. The Min slice used to stop one
# element earlier (-TICKS_PREDICT-1) than the Max slice (-TICKS_PREDICT);
# both now stop at -TICKS_PREDICT, the same end point as the cut applied to
# the model inputs.
shortest_avg_window = proportions_avg[0][signal_avg[0]-1:-TICKS_PREDICT]
longest_avg_window = proportions_avg[-1][signal_avg[-1]-1:-TICKS_PREDICT]
print(f"Proportions (avg={signal_avg[0]}) Min: {min(shortest_avg_window)} Max: {max(shortest_avg_window)}")
print(f"Proportions (avg={signal_avg[-1]}) Min: {min(longest_avg_window)} Max: {max(longest_avg_window)}")

In [None]:
# Trim the "nan" regions so targets and inputs cover the same ticks:
#   - start: the first signal_avg[-1] - 1 entries are dropped, because the
#     longest average needs that much history before its first value;
#   - end: the last TICKS_PREDICT entries are dropped, because no prediction
#     is made when the remaining period is shorter than the prediction window.
valid_window = slice(signal_avg[-1]-1, -TICKS_PREDICT)

targets = classes[valid_window]
inputs = []
for series in proportions_avg:
    trimmed = series[valid_window]
    print(trimmed[:2])
    inputs.append(trimmed)

print(f"First target: {targets[0]} and last target: {targets[-1]}")
print(f"Classes: {len(classes)} after cut to targets: {len(targets)}")
print(f"Inputs len: {len(inputs[len(signal_avg)-1])}")
print(f"Distinct targets: {list(set(targets))}")


In [None]:
# ------------------------------------------------------------------
# PARAMETERS: which class becomes the positive (1) target
# ------------------------------------------------------------------
# index_keep is the class the network should learn (mapped to 1); the two
# index_remove_* values are the remaining classes.
#
# Alternative - learn when "down":
# index_keep= 0
# index_remove_a= 1
# index_remove_b= 2

# Active choice - learn when "up":
index_keep = 2
index_remove_a = 0
index_remove_b = 1
# ------------------------------------------------------------------
# END PARAMETERS
# ------------------------------------------------------------------

In [None]:
from collections import Counter

print("After removing the nan at the begining and the end")
lu.display_frequency_numbers(targets, DOWN_PCTS_PREDICT, UP_PCTS_PREDICT)

# Occurrences of every class among the trimmed targets.
targets_frequency = Counter(targets)
print("VALIDATE removing should be POSITIVE?")

# For each non-kept class: its count, minus the kept-class count, plus half
# of the kept-class count (floor division).
keep_count = targets_frequency[index_keep]
half_keep_count = keep_count // 2
count_remove_a = targets_frequency[index_remove_a] - keep_count + half_keep_count
count_remove_b = targets_frequency[index_remove_b] - keep_count + half_keep_count

for removed_class, removed_count in (
    (index_remove_a, count_remove_a),
    (index_remove_b, count_remove_b),
):
    print(f"Removing {removed_class}: {removed_count}")


In [None]:
# ------------------------------------------------------------------
# PARAMETERS / DECISION: which samples to drop to rebalance the classes
# ------------------------------------------------------------------
# Since 2024-03-01 nothing is removed, so both lists stay empty. The earlier
# rebalancing option was, for each class whose count_remove_* is positive:
#     indexes_remove_a = get_indexes_value(targets, index_remove_a, count_remove_a)
#     indexes_remove_b = get_indexes_value(targets, index_remove_b, count_remove_b)
indexes_remove_a = []
indexes_remove_b = []

indexes_remove = indexes_remove_a + indexes_remove_b
targets_clean = lu.remove_indexes(targets, indexes_remove)

lu.display_frequency_numbers(targets_clean, DOWN_PCTS_PREDICT, UP_PCTS_PREDICT)
removed_total = len(targets) - len(targets_clean)
print(f"Targets len: {len(targets)} Targets clean: {len(targets_clean)} Difference: {removed_total}")

# Apply the same removal to every input series so they stay aligned with the targets.
inputs_clean = [lu.remove_indexes(series, indexes_remove) for series in inputs]

# Position of the first kept-class target, and of the last one counted from the end.
first_keep = targets_clean.index(index_keep)
last_keep_from_end = targets_clean[::-1].index(index_keep)
print(f"targets_clean positions(Keep={index_keep})(First:{first_keep},Last:-{last_keep_from_end})")


In [None]:
# Collapse the classes to a binary target: the 'index_keep' class becomes the
# positive target and every other class the negative one.
targets_binary = lu.convert_binary(targets_clean, index_keep)

first_true = targets_binary.index(True)
last_true_from_end = targets_binary[::-1].index(True)
print(f"targets_binary First {first_true} and Last(counting from end) {last_true_from_end} position with True")

print(f"targets_binary len: {len(targets_binary)} Input clean[0]: {len(inputs_clean[0])} Input clean[-1]: {len(inputs_clean[-1])}")

In [None]:
import torch

# inputs_clean holds one list per averaging window, so the raw tensor has one
# row per window; swap the two axes to get one row per sample instead.
inputs_by_window = torch.Tensor(inputs_clean)
print(f"inputs_tensor: {inputs_by_window.size()}")
inputs_tensor = inputs_by_window.transpose(0, 1)
print(f"inputs_tensor: {inputs_tensor.size()}")

targets_tensor = torch.Tensor(targets_binary)

print(f"inputs_clean len0 x len1: {len(inputs_clean)} x {len(inputs_clean[0])} -> inputs_tensor.shape: {inputs_tensor.shape}")
print(f"targets_binary.shape: {len(targets_binary)} -> targets_tensor.shape: {targets_tensor.shape}")
print(f"inputs_tensor: {inputs_tensor}")
print(f"targets_tensor: {targets_tensor}")

In [None]:
# Shuffle inputs and targets with one shared permutation so each sample stays
# paired with its target; the fixed seed makes the order reproducible.
torch.manual_seed(42)
sample_count = inputs_tensor.size(0)
permutation = torch.randperm(sample_count)

inputs_tensor_shuffle = inputs_tensor[permutation]
targets_tensor_shuffle = targets_tensor[permutation]

print(f"inputs_tensor.size(0): {sample_count}")
print(f"inputs_tensor.shape: {inputs_tensor.shape} -> inputs_tensor_shuffle.shape: {inputs_tensor_shuffle.shape}")
print(f"targets_tensor.shape: {targets_tensor.shape} -> targets_tensor_shuffle.shape: {targets_tensor_shuffle.shape}")

In [None]:

# All of the shuffled data is used as the test set; nothing is held out.
inputs_tensor_test = inputs_tensor_shuffle
targets_tensor_test = targets_tensor_shuffle

print(f"inputs_tensor_test: {inputs_tensor_test.size(0)} targets_tensor_test: {targets_tensor_test.size(0)}")

In [None]:
from typing import Tuple
import torch
from torch.utils.data import Dataset

class StockDataset(Dataset):
    """Map-style dataset that pairs each input row with its target by index."""

    def __init__(self, inputs, targets):
        """Store the two parallel, indexable collections.

        Args:
            inputs: Indexable collection of input samples.
            targets: Indexable collection of targets, one per input sample.
        """
        self.inputs = inputs
        self.targets = targets

    def __len__(self) -> int:
        """Return the number of samples, taken from the targets."""
        sample_count = len(self.targets)
        return sample_count

    def __getitem__(self, index: int) -> Tuple[torch.Tensor, int]:
        """Return the ``(input, target)`` pair stored at ``index``."""
        sample = self.inputs[index]
        label = self.targets[index]
        return sample, label


In [None]:
from torch.utils.data import DataLoader

# TODO: GPU utilisation is only 33-38% while executing. Try other BATCH_SIZE
# values to see whether parallelism increases (and whether learning suffers
# from having fewer batches).
BATCH_SIZE = 32

test_dataset = StockDataset(inputs_tensor_test, targets_tensor_test)

print(f"First input vector:\n{test_dataset[0]}")

# Order is kept as-is here; the tensors were already shuffled earlier.
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Pull the first batch to inspect what the loader yields.
first_batch = next(iter(test_dataloader))
test_input0, test_target0 = first_batch
print(f"Dataloader batch={BATCH_SIZE}\nInput:\n{test_input0}\nTargets:\n{test_target0}")


In [None]:
# ------------------------------------------------------------------
# PARAMETERS: base width of the network's hidden layers
# ------------------------------------------------------------------
HIDDEN_UNITS = 12
# ------------------------------------------------------------------
# END PARAMETERS
# ------------------------------------------------------------------

In [None]:
# EXECUTE FROM THIS STEP To CREATE A NETWORK WITH RANDOM WEIGHTS

import torch
from torch import nn

class StockModelBinaryV0(nn.Module):
    """Fully connected binary classifier that emits one raw output per sample.

    Four hidden linear layers narrow from ``hidden_units * 16`` down to
    ``hidden_units`` features, each followed by LeakyReLU with slope 0.1.
    A final linear layer maps to a single output with no activation.
    """

    def __init__(self, input_features, hidden_units):
        """Build the sequential layer stack.

        Args:
            input_features: Number of features in each input sample.
            hidden_units: Base width; the hidden layers use 16x, 8x, 4x and
                1x this value.
        """
        super().__init__()
        widths = [input_features] + [hidden_units * factor for factor in (16, 8, 4, 1)]

        layers = []
        # Layers are created in the same order they are applied, so their
        # positions inside the Sequential container run 0..8.
        for fan_in, fan_out in zip(widths, widths[1:]):
            layers.append(nn.Linear(in_features=fan_in, out_features=fan_out))
            layers.append(nn.LeakyReLU(negative_slope=0.1))
        layers.append(nn.Linear(in_features=widths[-1], out_features=1))

        self.linear_layer_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the sequential stack to ``x`` and return the result."""
        return self.linear_layer_stack(x)

# Build the network with one input feature per averaging window, then move
# it to the selected device.
model_0 = StockModelBinaryV0(input_features=len(signal_avg), hidden_units=HIDDEN_UNITS)
model_0 = model_0.to(device)


In [None]:
# Load the pre-trained weights from file into model_0.
from pathlib import Path

# Directory holding the saved models (created if it does not exist yet).
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, exist_ok=True)
MODEL_NAME = "2024-04-29-TSLA-predictUP-dates20190101-20240101-in16-hid12-down4-up7-prec9107pct-fp137tp1397-imbalance.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

# model_0 was built from the same class, so load_state_dict() (inherited from
# nn.Module) can populate its weights from the checkpoint.
# map_location remaps the stored tensors onto the current device; without it
# a checkpoint saved on a GPU cannot be loaded on a CPU-only machine.
model_0.load_state_dict(torch.load(f=MODEL_SAVE_PATH, map_location=device))
model_0.to(device)

In [None]:
# Evaluate the loaded model on the test tensors: confusion matrix, accuracy
# and precision.
from mlxtend.plotting import plot_confusion_matrix
# Accuracy and Precision are imported explicitly: the cell previously called
# torchmetrics.Accuracy / torchmetrics.Precision without the torchmetrics
# module itself being imported, which raises NameError.
from torchmetrics import Accuracy, ConfusionMatrix, Precision

model_0.eval()
with torch.inference_mode():
    X = inputs_tensor_test.to(device)
    y = targets_tensor_test.to(device)

    # Predict for the test data. Only the trailing size-1 output dimension is
    # squeezed, so a single-sample test set keeps its batch dimension.
    test_logits = model_0(X).squeeze(dim=-1)
    # Sigmoid turns the raw outputs into probabilities; rounding gives 0/1 labels.
    test_pred = torch.round(torch.sigmoid(test_logits))

confmat = ConfusionMatrix(task='binary')

# Compare the predictions against the expected test targets (on the CPU).
confmat_tensor = confmat(
    preds=test_pred.cpu(),
    target=targets_tensor_test.cpu(),
)

# Plot the confusion matrix
fig, ax = plot_confusion_matrix(
    conf_mat=confmat_tensor.numpy(),
    figsize=(10, 7),
)

accuracy_fn = Accuracy(task='binary').to(device)
test_accuracy = accuracy_fn(test_pred, y)
print(f"Test Accuracy: {test_accuracy*100:.2f}%")
print(f"Confusion matrix:\n{confmat_tensor}")

precision_fn = Precision(task='binary').to(device)
test_precision = precision_fn(test_pred, y)
print(f"\nTest Precision: {test_precision*100:.2f}%")
# Rows of the matrix are the true class and columns the predicted class, so
# [0, 1] counts false positives and [1, 1] true positives.
false_positives = confmat_tensor[0, 1].item()
true_positives = confmat_tensor[1, 1].item()
print(f"false_positives: {false_positives} true_positives: {true_positives}")