<a href="https://colab.research.google.com/github/oneonethree113/Advanced-Python-Learning/blob/main/Attention_Model_Predict_Price.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
from google.colab import drive

# The CSV lives on Google Drive, so the drive must be mounted before the file
# is read; otherwise this cell fails on a fresh kernel (Restart & Run All).
drive.mount('/content/drive')

# Path of the input CSV file (edit this to point at your own data)
csv_file = '/content/drive/MyDrive/Colab Notebooks/Experiment/data.csv'

# Read the CSV file using pandas and drop the 'Date' column
data = pd.read_csv(csv_file)
data = data.drop('Date', axis=1)

# Label horizon in rows. The value is negative so that Series.shift() pulls
# values from |shiftDay| rows ahead.
shiftDay=-60
# Sliding-window length.
# NOTE(review): a later cell reassigns sequence_length before it is first used.
sequence_length = 200

# Display the first 5 rows of the data (rich display of the last expression)
data.head()

In [None]:
# Forward-looking price ratio of the 'Asset' column:
#   ratio[t] = Asset[t + |shiftDay|] / Asset[t]
# shiftDay is negative, so shift() moves future rows up and the last |shiftDay|
# rows of the ratio are NaN.
column1 = data['Asset']
shifted_column1 = column1.shift(shiftDay)

# Calculate the new column values
new_column_values = shifted_column1/ column1 

# Add the new column to the DataFrame.
# NOTE: the column is called '5daysChange', but its horizon is |shiftDay| rows.
data['5daysChange'] = new_column_values
# Binary labels: 1 when the ratio exceeds 1.03 / 1.06. NaN ratios compare as
# False and are therefore labelled 0.
data['C103'] = (data['5daysChange'] > 1.03).astype(int)
data['C106'] = (data['5daysChange'] > 1.06).astype(int)
# Display the first 10 rows of the updated DataFrame
data.head(10)

In [None]:
# Mount Google Drive at /content/drive. The CSV path used earlier in this
# notebook lives under this mount point, so the mount has to exist before
# that file is read.
from google.colab import drive
drive.mount('/content/drive')

2. Preprocess the data: Prepare the input and output sequences for the model using a sliding-window approach. Each input is a window of `sequence_length` (10) consecutive rows, and the corresponding output is the pair of labels (`C103`, `C106`) of the row immediately after that window.


In [None]:
def create_sequences(input_data, target_data, seq_length):
    """Build sliding windows over `input_data` and pair each with its target.

    Window `k` covers rows `k` .. `k + seq_length - 1` of `input_data`; its
    target is `target_data[k + seq_length]`, i.e. the row right after the
    window. One window is produced for every start index in
    `range(len(input_data) - seq_length)`.

    Returns:
        A tuple `(inputs, targets)` of NumPy arrays built from the collected
        windows and targets.
    """
    window_starts = range(len(input_data) - seq_length)
    windows = [input_data[start:start + seq_length] for start in window_starts]
    labels = [target_data[start + seq_length] for start in window_starts]
    return np.array(windows), np.array(labels)

# Separate the input features and target values.
# Only the 'Asset' column is fed to the model; the two binary label columns
# are the targets. (A previous assignment that selected every non-label column
# was immediately overwritten and has been removed as dead code.)
input_features =  data[['Asset']].to_numpy()
target_values = data[['C103', 'C106']].to_numpy()

# Discard the last |shiftDay| rows in both arrays: shiftDay is negative, so
# [:shiftDay] trims the tail, which is exactly where the forward-looking ratio
# is NaN and the labels are therefore not meaningful.
input_features = input_features[:shiftDay]
target_values = target_values[:shiftDay]

# NOTE(review): this overrides the sequence_length assigned in the first cell;
# confirm which window length is intended.
sequence_length = 10
inputs, targets = create_sequences(input_features, target_values, sequence_length)

In [None]:
# Preview the first 10 rows of the DataFrame
data.head(10)

In [None]:
# Shape of the windowed inputs returned by create_sequences:
# (number of windows, sequence_length, number of input features)
inputs.shape


3. Split the data into training and testing sets.

In [None]:
# Split in row order without shuffling: the first 75% of the windows are used
# for training and the remaining 25% for testing.
split_index = int(0.75 * len(inputs))

# Split the data into training and testing sets
X_train, X_test = inputs[:split_index], inputs[split_index:]
y_train, y_test = targets[:split_index], targets[split_index:]

# A notebook cell only renders its final expression, so all four shapes are
# reported in a single tuple instead of two separate expression lines.
X_train.shape, X_test.shape, y_train.shape, y_test.shape


4. Implement the attention model using PyTorch.




In [None]:

import torch
import torch.nn as nn

class AttentionModel(nn.Module):
    """LSTM encoder followed by three stacked self-attention layers and an MLP head.

    Args:
        input_size: number of features per time step.
        hidden_size: LSTM hidden width; also the attention embedding dimension,
            so it must be divisible by `num_heads`.
        num_heads: number of heads in each multi-head attention layer.
        num_layers: number of stacked LSTM layers.
        output_size: number of output units.
        downsample_stride: keep every n-th time step of the input before the
            LSTM. Defaults to 5, the value that used to be hard-coded.

    `forward` takes a batch-first tensor `(batch, seq_len, input_size)` and
    returns a tensor of shape `(batch, output_size)`. No sigmoid is applied, so
    the outputs are raw scores: apply a sigmoid before `nn.BCELoss`, or use
    `nn.BCEWithLogitsLoss`.
    """

    def __init__(self, input_size, hidden_size, num_heads, num_layers, output_size,
                 downsample_stride=5):
        super(AttentionModel, self).__init__()
        self.downsample_stride = downsample_stride
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        # Three Multi-Head Attention layers.
        # batch_first=True matches the (batch, seq, hidden) layout produced by
        # the LSTM. With the default (batch_first=False) the attention would
        # treat the batch axis as the sequence axis and mix information between
        # unrelated samples.
        self.multi_head_attention1 = nn.MultiheadAttention(
            embed_dim=hidden_size, num_heads=num_heads, batch_first=True)
        self.multi_head_attention2 = nn.MultiheadAttention(
            embed_dim=hidden_size, num_heads=num_heads, batch_first=True)
        self.multi_head_attention3 = nn.MultiheadAttention(
            embed_dim=hidden_size, num_heads=num_heads, batch_first=True)

        # MLP head: each hidden layer is two thirds the width of the previous one
        fc_hidden_size = int(2/3 * hidden_size)
        fc_bottleneck_size = int(2/3 * fc_hidden_size)
        self.fc1 = nn.Linear(hidden_size, fc_hidden_size)
        self.fc2 = nn.Linear(fc_hidden_size, fc_bottleneck_size)
        self.fc3 = nn.Linear(fc_bottleneck_size, output_size)

    def forward(self, x):
        # Temporal down-sampling: keep every `downsample_stride`-th time step
        x = x[:, ::self.downsample_stride, :]

        # LSTM
        lstm_out, _ = self.lstm(x)

        # Multi-Head self-attention layers (query = key = value)
        attention_out1, _ = self.multi_head_attention1(lstm_out, lstm_out, lstm_out)
        attention_out2, _ = self.multi_head_attention2(attention_out1, attention_out1, attention_out1)
        attention_out3, _ = self.multi_head_attention3(attention_out2, attention_out2, attention_out2)

        # Fully connected layers, applied to the last time step only
        fc_out = torch.relu(self.fc1(attention_out3[:, -1, :]))
        fc_out = torch.relu(self.fc2(fc_out))
        out = self.fc3(fc_out)
        return out
class ConvLSTMModel(nn.Module):
    """Three strided Conv1d + max-pool stages, an LSTM, and an MLP head.

    Args:
        input_size: number of features per time step (Conv1d input channels).
        hidden_size: channel width of the conv layers and hidden width of the LSTM.
        num_layers: number of stacked LSTM layers. (This argument used to be
            ignored in favour of a hard-coded 2.)
        output_size: number of output units.

    `forward` takes a batch-first tensor `(batch, seq_len, input_size)` and
    returns sigmoid probabilities of shape `(batch, output_size)`, suitable for
    `nn.BCELoss`. The sequence must be long enough to survive the three
    strided conv / pooling stages.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(ConvLSTMModel, self).__init__()
        
        # Convolutional layers and max-pooling
        self.conv1 = nn.Conv1d(input_size, hidden_size, kernel_size=10,  padding=3,stride=3)
        self.pool1 = nn.MaxPool1d(3)
        self.conv2 = nn.Conv1d(hidden_size, hidden_size, kernel_size=10, padding=9,stride=3)
        self.pool2 = nn.MaxPool1d(3)
        self.conv3 = nn.Conv1d(hidden_size, hidden_size, kernel_size=10, padding=9,stride=3)
        self.pool3 = nn.MaxPool1d(3)
        
        # LSTM layers: honour the caller-supplied depth
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, batch_first=True)
        
        # Fully connected layers
        fc_hidden_size = int(2/3 * hidden_size)
        self.fc1 = nn.Linear(hidden_size, fc_hidden_size)
        self.fc2 = nn.Linear(fc_hidden_size, fc_hidden_size)
        self.fc3 = nn.Linear(fc_hidden_size, output_size)

    def forward(self, x):
        # Conv1d expects (batch, channels, time), so swap time and feature dimensions
        x = x.permute(0, 2, 1)
        x = self.pool1(torch.relu(self.conv1(x)))
        x = self.pool2(torch.relu(self.conv2(x)))
        x = self.pool3(torch.relu(self.conv3(x)))
        
        x = x.permute(0, 2, 1)  # Swap time and feature dimensions back
        
        # LSTM layers
        lstm_out, _ = self.lstm(x)
        
        # Fully connected layers, applied to the last time step only
        fc_out = torch.relu(self.fc1(lstm_out[:, -1, :]))
        fc_out = torch.relu(self.fc2(fc_out))
        out = torch.sigmoid(self.fc3(fc_out))  # Use sigmoid activation for multi-label classification
        
        return out



# Seed PyTorch's global RNG so that weight initialisation (and the shuffling
# of any DataLoader created afterwards) is reproducible across runs.
torch.manual_seed(42)

# Set the input size, hidden size, number of layers, and output size
input_size = inputs.shape[2]  # Number of features in your data
hidden_size = 64
num_layers = 2
output_size = 2  # There are two output classes (C103 and C106)

# Create the model
model = ConvLSTMModel(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    output_size=output_size,
)



5. Train the model using the training data.



In [None]:

# Set hyperparameters
learning_rate = 0.001
num_epochs = 1000
batch_size = 32

# Create data loaders
# Training batches are reshuffled on every pass over the loader.
train_data = torch.utils.data.TensorDataset(torch.tensor(X_train).float(), torch.tensor(y_train).float())
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)

# The test loader keeps the original row order and yields one sample per batch.
test_data = torch.utils.data.TensorDataset(torch.tensor(X_test).float(), torch.tensor(y_test).float())
test_loader = torch.utils.data.DataLoader(test_data, batch_size=1, shuffle=False)

# Set the loss function and optimizer
criterion = nn.BCELoss()  # Use Binary Cross-Entropy Loss for multi-label classification
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


from torch.optim.lr_scheduler import StepLR
step_size = 10  # Decrease learning rate every 10 scheduler steps
gamma = 0.3     # Multiply the learning rate by 0.3 at each decrease (reduce it to 30%)
# NOTE(review): if the scheduler is stepped once per epoch, num_epochs = 1000
# means the rate is multiplied by 0.3 a hundred times and becomes vanishingly
# small long before training ends; confirm this is intended.

# Create the scheduler
scheduler = StepLR(optimizer, step_size=step_size, gamma=gamma)



def test_loss(model, criterion, test_loader):
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            running_loss += loss.item()
    return running_loss / len(test_loader)

# Train the model and record losses (one entry per epoch in each list)
training_losses = []
testing_losses = []

for epoch in range(num_epochs):
    # Re-enter training mode at the start of every epoch: evaluating on the
    # test set may switch the model to eval mode.
    model.train()
    running_loss = 0.0
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        # No squeeze(): outputs already has the same (batch, n_labels) shape as
        # batch_y. Squeezing would drop the batch axis when the final batch
        # holds a single sample and make BCELoss reject the shape mismatch.
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Mean per-batch training loss for this epoch
    train_loss = running_loss / len(train_loader)
    training_losses.append(train_loss)
    
    test_loss_value = test_loss(model, criterion, test_loader)
    testing_losses.append(test_loss_value)
    # Advance the learning-rate schedule once per epoch
    scheduler.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss_value:.4f}, learning rate: {optimizer.param_groups[0]["lr"]:.8f}')

In [None]:
import matplotlib.pyplot as plt


def plot_losses(training_losses, testing_losses):
    """Plot training and testing loss curves, skipping the first epoch.

    Args:
        training_losses: sequence of per-epoch training losses.
        testing_losses: sequence of per-epoch testing losses.

    The first entry of each sequence is left out of the plot (presumably
    because the first epoch's loss would dominate the y-axis; TODO confirm).
    The x-axis carries the real 1-based epoch numbers, so the first plotted
    point is labelled epoch 2 rather than 0.
    """
    train_curve = training_losses[1:]
    test_curve = testing_losses[1:]

    fig, ax = plt.subplots()
    ax.plot(range(2, 2 + len(train_curve)), train_curve, label='Training Loss')
    ax.plot(range(2, 2 + len(test_curve)), test_curve, label='Testing Loss')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Loss')
    ax.set_title('Training and Testing Losses')
    ax.legend()
    plt.show()

# Plot the training and testing losses collected in the
# training_losses / testing_losses lists
plot_losses(training_losses, testing_losses)


6. Evaluate the model using the testing data.

In [None]:
import matplotlib.pyplot as plt
def plot_predictions(actuals, predictions, class_labels, title):
    """Plot actual vs. predicted values, one subplot per class.

    Args:
        actuals: list of per-batch arrays of shape (batch, n_classes).
        predictions: list of per-batch arrays of shape (batch, n_classes).
        class_labels: one label per class; its length sets the number of subplots.
        title: prefix used in every subplot title.

    The per-batch arrays are concatenated along the batch axis, so every
    sample is plotted whatever batch size produced the lists (the previous
    implementation only read the first row of each batch).
    """
    num_classes = len(class_labels)
    # squeeze=False keeps `axes` two-dimensional even when there is one class;
    # otherwise plt.subplots returns a bare Axes that cannot be indexed.
    fig, axes = plt.subplots(num_classes, figsize=(10, 4 * num_classes), squeeze=False)

    # Stack batches into (n_samples, n_classes); tolerate empty input lists
    empty = np.empty((0, num_classes))
    actual_matrix = np.concatenate(actuals, axis=0) if len(actuals) else empty
    predicted_matrix = np.concatenate(predictions, axis=0) if len(predictions) else empty

    for i, label in enumerate(class_labels):
        ax = axes[i, 0]
        ax.plot(actual_matrix[:, i], label='Actual')
        ax.plot(predicted_matrix[:, i], label='Predicted')
        ax.set_xlabel('Time steps')
        ax.set_ylabel(f'Class: {label}')
        ax.set_title(f'{title} - Class: {label}')
        ax.legend()

    plt.tight_layout()
    plt.show()
# Evaluate the model and create graphs for the training set
# Put the model in eval mode explicitly instead of relying on whatever mode an
# earlier cell happened to leave it in.
model.eval()
train_predictions = []
train_actuals = []
with torch.no_grad():
    # Unshuffled, one sample per batch, so predictions stay in row order
    for batch_X, batch_y in torch.utils.data.DataLoader(train_data, batch_size=1, shuffle=False):
        output = model(batch_X)
        train_predictions.append(output.numpy())
        train_actuals.append(batch_y.numpy())
plot_predictions(train_actuals, train_predictions, ['C103', 'C106'], 'Training set: Actual vs. Predicted')

In [None]:

# Evaluate the model and create graphs for the testing set
# Put the model in eval mode explicitly instead of relying on whatever mode an
# earlier cell happened to leave it in.
model.eval()
test_predictions = []
test_actuals = []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        output = model(batch_X)
        test_predictions.append(output.numpy())
        test_actuals.append(batch_y.numpy())
plot_predictions(test_actuals, test_predictions, ['C103', 'C106'], 'Testing set: Actual vs. Predicted')