In [None]:
import pandas as pd
import numpy as np
import networkx as nx

# Load the AAPL price history; the rest of the notebook reads the 'Date' and
# 'Close' columns of this frame.
df = pd.read_csv('AAPL.csv')

# Feature matrix with one row per CSV row:
#   column 0 -> whole days elapsed since the date in the first row
#   column 1 -> closing price
# Built with vectorized column operations rather than a per-row iterrows()
# loop, which was slow and used the index *label* as if it were a row position.
dates = pd.to_datetime(df['Date'])
start_date = dates.iloc[0]
X = np.zeros((len(df), 2))
X[:, 0] = (dates - start_date).dt.days.to_numpy()
X[:, 1] = df['Close'].to_numpy()


# Undirected graph with one node per row of df, labelled by row position.
G = nx.Graph()
G.add_nodes_from(range(len(df)))

# Link each day `src` (all rows except the final `window_size` ones) to each of
# the next `window_size` days `dst`. The edge sign follows the close-to-close
# move from day dst-1 to day dst, the magnitude is 1 / (dst - src), and a flat
# move creates no edge at all.
window_size = 5
closes = df['Close'].to_numpy()
for src in range(len(df) - window_size):
    for offset in range(1, window_size + 1):
        dst = src + offset
        step = closes[dst] - closes[dst - 1]
        if step > 0:
            G.add_edge(src, dst, weight=1 / offset)
        elif step < 0:
            G.add_edge(src, dst, weight=-1 / offset)


# Dense weighted adjacency matrix of G (rows/columns follow G's node order).
A = nx.to_numpy_array(G)

# Put a self loop of weight 1 on every node.
adj = np.eye(len(df)) + A


# Graph Laplacian L = D - adj, with D the diagonal matrix of adj's row sums.
D = np.diag(adj.sum(axis=1))
L = D - adj



In [None]:
from sklearn.model_selection import train_test_split
# Split the data into training and testing sets.
# The rows form a time series and the graph construction that follows links
# *adjacent rows* as "consecutive days", so the split has to preserve row order:
# the first 80% of rows are used for training and the final 20% are held out.
# The default shuffled split would scatter neighbouring days across both sets
# and mix future prices into the training data.
train_df, test_df = train_test_split(df, test_size=0.2, shuffle=False)



def _build_feature_matrix(frame):
    """Build the two-column feature matrix for a price frame.

    Args:
        frame: DataFrame with a 'Date' column parseable by ``pd.to_datetime``
            and a 'Close' column.

    Returns:
        ``(features, start_date)`` where ``features`` has shape
        ``(len(frame), 2)``: column 0 is the number of whole days between each
        row's date and the date of the frame's first row, column 1 is the
        closing price. ``start_date`` is the first row's parsed date.

    Raises:
        IndexError: if ``frame`` has no rows.
    """
    dates = pd.to_datetime(frame['Date'])
    start_date = dates.iloc[0]
    features = np.zeros((len(frame), 2))
    features[:, 0] = (dates - start_date).dt.days.to_numpy()
    features[:, 1] = frame['Close'].to_numpy()
    return features, start_date


def _build_chain_graph(frame):
    """Build a chain graph over the rows of a price frame.

    One node is created per row (labelled by row position). Rows ``k`` and
    ``k + 1`` are joined with weight ``+1`` when the close rises from row ``k``
    to row ``k + 1``, with weight ``-1`` when it falls, and are left
    unconnected when it does not change. Adjacent rows are consecutive trading
    days only if ``frame`` is in chronological order.
    """
    closes = frame['Close'].to_numpy()
    graph = nx.Graph()
    graph.add_nodes_from(range(len(frame)))
    for pos in range(len(frame) - 1):
        step = closes[pos + 1] - closes[pos]
        if step > 0:
            graph.add_edge(pos, pos + 1, weight=1)
        elif step < 0:
            graph.add_edge(pos, pos + 1, weight=-1)
    return graph


# --- Training data -----------------------------------------------------------
# Feature matrix, chain graph and adjacency (plus a weight-1 self loop per node).
X_train, start_date_train = _build_feature_matrix(train_df)
G_train = _build_chain_graph(train_df)
A_train = nx.to_numpy_array(G_train)
train_adj = A_train + np.eye(len(train_df))  # Add self loop

# --- Testing data ------------------------------------------------------------
# Same construction as for training, measured from the test frame's first row.
X_test, start_date_test = _build_feature_matrix(test_df)
G_test = _build_chain_graph(test_df)
A_test = nx.to_numpy_array(G_test)
test_adj = A_test + np.eye(len(test_df))  # Add self loop

# --- Normalization -----------------------------------------------------------
# Standardize both feature matrices with the *training* statistics only, so no
# information from the test set enters the scaling.
train_mean = np.mean(X_train, axis=0)
train_std = np.std(X_train, axis=0)

X_train_normalized = (X_train - train_mean) / train_std
X_test_normalized = (X_test - train_mean) / train_std



In [None]:
!pip install torch_geometric

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torch_geometric
  Downloading torch_geometric-2.3.1.tar.gz (661 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/661.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━[0m [32m358.4/661.6 kB[0m [31m10.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m661.6/661.6 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: torch_geometric
  Building wheel for torch_geometric (pyproject.toml) ... [?25l[?25hdone
  Created wheel for torch_geometric: filename=torch_geometric-2.3.1-py3-none-any.whl size=910459 sha256=3db580f28c7d22512d706b0d63c27fa2e54

In [None]:
import torch
import torch_geometric
from torch_geometric.data import Data

# Define the target variable for training data.
# Node i of the training graph is row i of train_df, so the targets are read
# from train_df (not from the full df): this keeps one label per training node,
# in node order, as a column vector of shape (num_train_nodes, 1).
y = train_df['Close'].values
train_labels = torch.tensor(y, dtype=torch.float).view(-1, 1)




def _adjacency_to_edges(adjacency):
    """Convert a dense adjacency matrix into PyG edge tensors.

    Args:
        adjacency: square 2-D NumPy array; every nonzero entry ``[r, c]`` is
            treated as an edge from node ``r`` to node ``c``.

    Returns:
        ``(edge_index, edge_attr)``: a ``long`` tensor of shape
        ``(2, num_edges)`` holding the row/column indices of the nonzero
        entries, and a ``float`` tensor with the matching entry values.
    """
    rows, cols = adjacency.nonzero()
    edge_index = torch.tensor(np.array((rows, cols)), dtype=torch.long)
    edge_attr = torch.tensor(adjacency[rows, cols], dtype=torch.float)
    return edge_index, edge_attr


# Training graph: edges, normalized node features and the PyG data object.
train_edge_index, train_edge_attr = _adjacency_to_edges(train_adj)
train_features = torch.tensor(X_train_normalized, dtype=torch.float)
train_data = Data(x=train_features, edge_index=train_edge_index, edge_attr=train_edge_attr)

# Testing graph: built the same way. The edge values are now attached to
# test_data as well, so both data objects carry the same set of fields.
test_edge_index, test_edge_attr = _adjacency_to_edges(test_adj)
test_features = torch.tensor(X_test_normalized, dtype=torch.float)
test_data = Data(x=test_features, edge_index=test_edge_index, edge_attr=test_edge_attr)




In [None]:
import torch
import torch.nn as nn
from torch_geometric.nn import GCNConv

class GCNModel(nn.Module):
    """Four-layer graph convolutional network producing one output row per node.

    The first three GCNConv layers are each followed by batch normalization,
    ReLU and dropout; the fourth GCNConv layer maps to ``output_dim`` with no
    activation. Only the graph connectivity is used: no edge weights are passed
    to the convolutions.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate):
        """Create the layers.

        Args:
            input_dim: number of features per input node.
            hidden_dim: width of the three hidden layers.
            output_dim: number of values produced per node.
            dropout_rate: dropout probability applied after each hidden layer.
        """
        super().__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.bn2 = nn.BatchNorm1d(hidden_dim)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.conv3 = GCNConv(hidden_dim, hidden_dim)
        self.bn3 = nn.BatchNorm1d(hidden_dim)
        self.dropout3 = nn.Dropout(dropout_rate)
        self.conv4 = GCNConv(hidden_dim, output_dim)

    def forward(self, data):
        """Run the network on a graph.

        Args:
            data: object exposing ``x`` (node features) and ``edge_index``
                (edge list) attributes.

        Returns:
            Tensor with one row of ``output_dim`` values per node.
        """
        # The previously unpacked `data.edge_weight` was never used, so it is
        # no longer read here.
        x, edge_index = data.x, data.edge_index

        hidden_stages = (
            (self.conv1, self.bn1, self.dropout1),
            (self.conv2, self.bn2, self.dropout2),
            (self.conv3, self.bn3, self.dropout3),
        )
        for conv, norm, dropout in hidden_stages:
            # conv -> batch norm -> ReLU -> dropout
            x = dropout(torch.relu(norm(conv(x, edge_index))))

        return self.conv4(x, edge_index)



In [None]:
import torch.optim as optim
# Model hyperparameters
input_dim = train_data.num_node_features
hidden_dim = 256
output_dim = 1
dropout_rate = 0.5

# Model, optimizer (Adam with weight decay) and mean-squared-error objective
model = GCNModel(input_dim, hidden_dim, output_dim, dropout_rate)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
criterion = nn.MSELoss()

# Training mode: dropout active, batch norm uses per-batch statistics
model.train()

num_epochs = 50
for epoch in range(num_epochs):
    optimizer.zero_grad()

    # Full-graph forward pass: one prediction per training node
    output = model(train_data)

    # Trim the labels to the number of predictions (no-op once lengths match)
    train_labels = train_labels[:output.size(0)]

    # Objective = MSE + 0.001 * (sum of the L2 norms of all parameter tensors)
    l2_penalty = sum(torch.norm(param, 2) for param in model.parameters())
    loss = criterion(output, train_labels) + 0.001 * l2_penalty

    # Backpropagate and update the parameters
    loss.backward()
    optimizer.step()

    # Report progress
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss.item()))



Epoch [1/50], Loss: 33.5081
Epoch [2/50], Loss: 32.6628
Epoch [3/50], Loss: 32.2260
Epoch [4/50], Loss: 31.6709
Epoch [5/50], Loss: 30.8315
Epoch [6/50], Loss: 30.3133
Epoch [7/50], Loss: 30.3780
Epoch [8/50], Loss: 29.6241
Epoch [9/50], Loss: 29.1646
Epoch [10/50], Loss: 28.8056
Epoch [11/50], Loss: 28.6197
Epoch [12/50], Loss: 28.0996
Epoch [13/50], Loss: 28.0115
Epoch [14/50], Loss: 27.7532
Epoch [15/50], Loss: 27.6234
Epoch [16/50], Loss: 27.2869
Epoch [17/50], Loss: 26.8681
Epoch [18/50], Loss: 27.2096
Epoch [19/50], Loss: 26.9898
Epoch [20/50], Loss: 26.9279
Epoch [21/50], Loss: 27.0497
Epoch [22/50], Loss: 26.8787
Epoch [23/50], Loss: 26.7331
Epoch [24/50], Loss: 26.5651
Epoch [25/50], Loss: 26.7645
Epoch [26/50], Loss: 26.7727
Epoch [27/50], Loss: 26.7111
Epoch [28/50], Loss: 26.6647
Epoch [29/50], Loss: 26.6924
Epoch [30/50], Loss: 26.6534
Epoch [31/50], Loss: 26.5838
Epoch [32/50], Loss: 26.4544
Epoch [33/50], Loss: 26.8054
Epoch [34/50], Loss: 26.8392
Epoch [35/50], Loss: 26

In [None]:
# Set the model to evaluation mode (dropout off, batch norm uses running statistics)
model.eval()

# Pass the testing data through the model. This is inference only, so gradient
# tracking is disabled to avoid building an autograd graph.
with torch.no_grad():
    output = model(test_data)

# Convert the predicted values to a NumPy array (one row per test node)
predictions = output.numpy()

# Print the predicted values
print(predictions)

[[4.2587705]
 [5.368486 ]
 [5.404531 ]
 ...
 [5.2375436]
 [5.7244043]
 [4.857492 ]]


In [None]:
# Define the test labels.
# Node i of the test graph is row i of test_df, so the actual closing prices are
# read directly from test_df. The earlier lookup went through `test_nodes`, a
# name that is never defined in this notebook, and raised NameError. All test
# rows are kept so the labels line up one-to-one with the model's predictions.
test_labels = torch.tensor(test_df['Close'].values, dtype=torch.float)

# Convert the test labels tensor to a NumPy array
test_labels = test_labels.numpy()

NameError: ignored

In [None]:
# Set the model to evaluation mode
model.eval()

# Pass the testing data through the model (inference only, no gradient tracking)
with torch.no_grad():
    output = model(test_data)

# Calculate the average predicted price over all test nodes
average_predicted_price = torch.mean(output)

# Calculate the average actual price over the same rows. The closes come from
# test_df because the previously used `test_nodes` is not defined anywhere.
average_actual_price = torch.mean(torch.tensor(test_df['Close'].values, dtype=torch.float))

# Absolute percentage error between the two *averages* (not a per-node error)
absolute_percentage_error = torch.abs((average_predicted_price - average_actual_price) / average_actual_price) * 100

# Print the average predicted price and the absolute percentage error
print("Average Predicted Price: {:.2f}".format(average_predicted_price.item()))
print("Average Actual Price: {:.2f}".format(average_actual_price.item()))
print("Absolute Percentage Error: {:.2f}%".format(absolute_percentage_error.item()))

In [None]:
# torch.save(model, 'trained_modellllll.pt')