In [None]:
import yfinance as yf
import datetime
import pandas as pd
from sklearn.preprocessing import scale
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import DataLoader, Dataset, TensorDataset
import numpy as np
from src.CNN import CNN
import torch.nn.functional as F

In [None]:
# Download configuration: the symbol to fetch and how many years of history to request.
ticker: str = "AAPL"
years: int = 40

In [None]:
# Build the "YYYY-MM-DD" bounds of the download window, ending today.
# A year is approximated as 365 days, so leap days are not accounted for.
DATE_FORMAT = "%Y-%m-%d"
lookback = datetime.timedelta(days=365 * years)

end_date = datetime.date.today().strftime(DATE_FORMAT)
start_date = (datetime.date.today() - lookback).strftime(DATE_FORMAT)

In [None]:
# Fetch the price history for `ticker` between the two date bounds.
data = yf.download(
    ticker,
    start=start_date,
    end=end_date,
)

# EDA

### Get sample of data


In [None]:
# Preview the first five rows of the downloaded frame.
data.head(n=5)

In [None]:
### Shape of data

In [None]:
# (number of rows, number of columns) of the downloaded frame
data.shape

In [None]:
### Get summary of data and check for nulls

In [None]:
# Column names, dtypes and non-null counts. DataFrame.info() writes its report
# straight to stdout and returns None, so it must not be wrapped in print()
# (doing so appends a stray "None" line to the output).
data.info()

# Summary statistics for the numeric columns.
print(data.describe())

# Number of missing values per column.
print(data.isnull().sum())

Get X and Y

In [None]:
# Label every row with the close of the following row (next trading day).
# The final row has no successor, so its Target is NaN; dropna() then removes
# it (along with any other row that contains a NaN).
next_day_close = data["Close"].shift(periods=-1)
data["Target"] = next_day_close
data.dropna(inplace=True)

In [None]:
# Names of the feature columns and of the label column.
input_vars = ["Close", "High", "Low", "Open"]
output_var = "Target"

# Split the frame into the feature table and the label series.
X, y = data[input_vars], data[output_var]

In [None]:
# Inspect the selected feature columns
X

In [None]:
# Inspect the label series (next-day close)
y

In [None]:
# Standardise each feature column to zero mean / unit variance (axis=0 scales
# per column). The result is stored in a new frame instead of being written
# back into `data`, so the raw prices stay available and re-running the cell
# that derives "Target" from "Close" keeps producing labels in price units.
# NOTE(review): the scaling statistics are computed over the full history,
# i.e. before any train/test split — consider fitting on training rows only.
features_raw = data[input_vars]
data_in = pd.DataFrame(
    scale(features_raw, axis=0),
    index=features_raw.index,
    columns=features_raw.columns,
)
data_out = data[output_var]  # labels are left unscaled

In [None]:
# Inspect the standardised feature columns
data_in

In [None]:
# Inspect the label series that pairs with data_in
data_out

In [None]:
def create_time_series_windows(data, window_size=30, targets=None):
    """Slice a feature table into overlapping look-back windows with labels.

    Window ``i`` covers rows ``i .. i + window_size - 1`` of ``data`` and is
    labelled with ``targets[i + window_size - 1]``, i.e. the label attached to
    the *last* row of the window. The notebook's ``Target`` column already
    holds the following day's close, so this pairs every window with the close
    of the day right after it. (Indexing the labels at ``i + window_size``
    would skip a day and predict two days ahead.)

    Args:
        data: Array-like of shape ``(n_rows, n_features)`` holding the model
            inputs in chronological row order.
        window_size: Number of consecutive rows per window.
        targets: Array-like with exactly one label per row of ``data``. When
            omitted, the notebook-level ``data_out`` series is used.

    Returns:
        Tuple ``(X, y)`` of float32 NumPy arrays with shapes
        ``(n_windows, window_size, n_features)`` and ``(n_windows,)``, where
        ``n_windows = max(n_rows - window_size + 1, 0)``.

    Raises:
        ValueError: If ``window_size`` is not positive, or if ``targets`` does
            not contain one label per row of ``data``.
    """
    if window_size <= 0:
        raise ValueError("window_size must be a positive integer")

    features = np.asarray(data, dtype=np.float32)
    if targets is None:
        targets = data_out  # fall back to the notebook-level label series
    # Converting to a flat NumPy vector gives purely positional indexing (no
    # label-based lookup on a date index) and also accepts a one-column frame.
    labels = np.asarray(targets, dtype=np.float32).reshape(-1)
    if len(labels) != len(features):
        raise ValueError(
            f"targets has {len(labels)} entries but data has {len(features)} rows"
        )

    n_windows = len(features) - window_size + 1
    if n_windows <= 0:
        # Not enough rows for a single window: return correctly shaped empties.
        empty_X = np.empty((0, window_size) + features.shape[1:], dtype=np.float32)
        return empty_X, np.empty((0,), dtype=np.float32)

    X = np.stack([features[i:i + window_size] for i in range(n_windows)])
    y = labels[window_size - 1:].copy()
    return X, y


# Features come from the standardised frame (one column per entry of
# input_vars); labels come from the unscaled "Target" series.
X, y = create_time_series_windows(
    data_in.to_numpy(),
    window_size=30,
    targets=data_out.to_numpy(),
)
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32)

print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")

In [None]:
# Hold out the last 20% of the windows (in the order they were built) for
# testing. shuffle=False matters for this data: consecutive windows overlap
# in all but one row, so a random split would place near-duplicates of every
# test window in the training set and make the test score meaningless.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=False
)

In [None]:
# Pair inputs with labels, then wrap both splits in mini-batch iterators that
# share the same batching options.
loader_kwargs = dict(batch_size=32, shuffle=True)

train_dataset, test_dataset = (
    TensorDataset(X_train, y_train),
    TensorDataset(X_test, y_test),
)

train_loader = DataLoader(train_dataset, **loader_kwargs)
test_loader = DataLoader(test_dataset, **loader_kwargs)

# Manually build dataloaders

# Train the model

In [None]:
# --- Model / training hyperparameters ---
input_dim = 4      # number of feature columns per time step
n_embedding = 64   # NOTE(review): not referenced by the CNN set-up in the cells shown here
n_layers = 6       # NOTE(review): not referenced by the CNN set-up in the cells shown here
block_size = 30    # time steps per input window
batch_size = 16    # NOTE(review): the DataLoaders are built with a literal batch_size=32

# Pick the best available accelerator. torch.backends.mps.is_available() is
# the documented MPS availability check; CUDA is tried next so the notebook
# also uses a GPU on non-Apple machines, with CPU as the final fallback.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

lr = 1e-4       # Adam learning rate
n_epochs = 100  # full passes over the training loader

In [None]:
# Instantiate the model from input_dim and block_size, then move it to the selected device.
# NOTE(review): CNN is imported from src.CNN and is not shown here — confirm the meaning of
# the two positional arguments and of conv_layers=4 against that module.
cnn = CNN(input_dim, block_size, conv_layers=4)
cnn.to(device)

In [None]:
# Regression objective: squared error averaged over all elements.
criterion = torch.nn.MSELoss(reduction="mean")

In [None]:
# Adam over every parameter of the CNN, using the configured learning rate.
optimizer = torch.optim.Adam(params=cnn.parameters(), lr=lr)

In [None]:
# Train the model: one full pass over train_loader per epoch, reporting the
# mean per-batch loss after each epoch.
for epoch in range(n_epochs):
    cnn.train()  # switch the model to training mode
    total_loss = 0

    # The loop variables are deliberately not called `data`: that name is the
    # notebook-level price DataFrame, and rebinding it to a tensor here would
    # break any earlier cell that is re-run after training.
    for batch_inputs, batch_targets in train_loader:
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)

        # Forward pass
        outputs = cnn(batch_inputs)
        # If the model emits (batch, 1) while the labels are (batch,), drop
        # the trailing axis; otherwise MSELoss would broadcast both tensors
        # to (batch, batch) and silently optimise the wrong quantity.
        if outputs.dim() == batch_targets.dim() + 1 and outputs.shape[-1] == 1:
            outputs = outputs.squeeze(-1)

        # MSELoss expects (input, target): prediction first, label second.
        loss = criterion(outputs, batch_targets)

        # Backward pass
        optimizer.zero_grad()  # clear gradients from the previous step
        loss.backward()        # compute gradients
        optimizer.step()       # update weights

        total_loss += loss.item()

    print(f"Epoch [{epoch+1}/{n_epochs}], Loss: {total_loss/len(train_loader):.4f}")

In [None]:
def eval_model(model, dataloader, device=None):
    """Return the mean squared error of ``model`` over ``dataloader``.

    The squared errors are summed over every label and divided by the number
    of labels, so the result is the exact MSE regardless of batch sizes (a
    smaller final batch does not skew the average).

    Args:
        model: A ``torch.nn.Module`` mapping a batch of inputs to predictions.
        dataloader: Iterable of ``(inputs, labels)`` tensor pairs. This can be
            a ``DataLoader`` (batched tensors) or a map-style dataset such as
            ``TensorDataset`` (single samples with scalar labels); single
            samples get a leading batch axis before the forward pass.
        device: Device the tensors are moved to. Defaults to the device of the
            model's first parameter, or CPU for a parameter-free model.

    Returns:
        The mean squared error as a Python float.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    squared_error_sum = 0.0
    n_values = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            # A scalar label means we were handed one un-batched sample
            # (iterating a Dataset directly): add the batch axis to both.
            if labels.dim() == 0:
                inputs, labels = inputs.unsqueeze(0), labels.unsqueeze(0)

            output = model(inputs)
            # Align (batch, 1) predictions with (batch,) labels so the loss is
            # element-wise rather than broadcast to (batch, batch).
            if output.dim() == labels.dim() + 1 and output.shape[-1] == 1:
                output = output.squeeze(-1)

            squared_error_sum += F.mse_loss(output, labels, reduction="sum").item()
            n_values += labels.numel()

    if n_values == 0:
        raise ValueError("eval_model received no samples to evaluate")
    return squared_error_sum / n_values

In [None]:
# Score the trained model on the held-out dataset.
eval_model(model=cnn, dataloader=test_dataset)