In [None]:
import os
# Switch the working directory to the project root so that the relative paths used
# further down (./data, ./deep_learning) resolve on both Google Colab and a local kernel.
try:
    from google.colab import drive
    drive.mount("/content/gdrive", force_remount=True)
    # Shell equivalent: cd gdrive/MyDrive/'Colab Notebooks'/Innovative-Approaches-to-Asset-Prediction/
    # The quotes are shell syntax only; os.chdir needs the directory name without them.
    os.chdir("/content/gdrive/MyDrive/Colab Notebooks/Innovative-Approaches-to-Asset-Prediction/")
    print("Working on Google Colab...")
except Exception:
    # Not on Colab (or the Drive setup failed): fall back to the parent of the directory
    # that holds this notebook. `__vsc_ipynb_file__` is only defined when the hosting
    # front end injects it, so a NameError here also lands in the handler below.
    try:
        os.chdir(os.path.abspath(os.path.join(os.path.abspath(os.path.dirname(__vsc_ipynb_file__)), os.pardir)))
        print("Working on local machine...")
    except Exception:
        print("Can't change directory. Quitting...")
        exit(1)

In [2]:
# !pip install -r requirements.txt

In [None]:
import random
# Identifier attached to every artefact written by this run (drawn before any seeding).
run_id = random.randint(10_000, 100_000)

# Hyper-parameters and data settings shared by the whole notebook.
config = dict(
    data_filename='US.RANDOM.10.5.64.3.RANGE.VOL',
    data_length=10_000,
    batch_size=128,
    epochs=200,
    in_channels=3,
    output_size=4,
    # Each entry is (out_channels, kernel_size, padding) for one convolution.
    conv_layers=[(channels, 3, 1) for channels in (16, 64, 256, 1024, 1024, 1024)],
    # Optional pooling kernel per convolution; None (or a missing entry) means no pooling.
    pool_layers=[(2, 2)] * 2 + [None] * 2,
    fc_layers=[256, 64],
    lstm_hidden_size=64,
    lstm_layers=1,
    leak=0.1,
    dropout=0.5,
    img_size=(64, 64),
    learning_rate=1e-4,  # learning rate handed to Adam
    num_candles=5,       # number of candle crops per input sequence
)
print(f"Run ID: {run_id}")

In [4]:
import torch
import json
import numpy as np
import pandas as pd

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import GradScaler, autocast

import torchvision

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import matplotlib.pyplot as plt
from livelossplot import PlotLosses

import seaborn as sns


In [5]:
def set_seed(seed):
    """
    Seed the Python, NumPy and PyTorch (CPU and all CUDA devices) random number
    generators with the same value and turn cuDNN off.

    Always returns True.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

    # cuDNN is disabled outright; the auto-tuner flag is cleared as well so it cannot
    # pick different convolution algorithms between runs.
    torch.backends.cudnn.enabled = False
    torch.backends.cudnn.benchmark = False
    return True

In [None]:
# Report the CUDA version string exposed by the installed PyTorch build.
print(torch.version.cuda)

In [None]:
# Fix every RNG before any stochastic step below runs.
set_seed(42)

# Use CUDA only when at least one GPU is visible and usable; otherwise stay on the CPU.
gpu_ready = torch.cuda.device_count() > 0 and torch.cuda.is_available()
device = 'cuda' if gpu_ready else 'cpu'
print("Cuda installed! Running on GPU!" if gpu_ready else "No GPU available! Using CPU!")

In [8]:
from PIL import Image
# Custom dataset to handle image and target loading
class ImageSequenceDataset(Dataset):
    """
    Dataset that turns one chart image into a sequence of equally wide vertical strips.

    Each item is `(sequence, target)`: `sequence` stacks `num_candles` crops of the image
    (after the optional `transform`), and `target` is the matching entry of `targets`.
    """

    def __init__(self, images, targets, transform=None, num_candles=10):
        self.images, self.targets = images, targets
        self.transform = transform
        self.num_candles = num_candles

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        picture = Image.fromarray(self.images[idx])
        full_width, full_height = picture.size
        # Integer division: columns left over on the right edge are not used.
        strip_width = full_width // self.num_candles

        strips = []
        for position in range(self.num_candles):
            start = position * strip_width
            strip = picture.crop((start, 0, start + strip_width, full_height))
            strips.append(self.transform(strip) if self.transform else strip)

        # Stacked result has the strips along the first dimension.
        return torch.stack(strips), self.targets[idx]

In [None]:
# Load the data
filename = config["data_filename"]
# NOTE: allow_pickle=True unpickles arbitrary Python objects; only load files from a trusted source.
full_data = np.load(f"./data/processed/{filename}/data.npy", allow_pickle=True)   # noqa

full_data = pd.DataFrame(full_data)
print("Full shape: ", full_data.shape)

# Draw a random subset of at most `data_length` rows (None keeps every row). The request is
# capped at the number of available rows because DataFrame.sample raises when asked for
# more rows than exist (sampling is without replacement).
requested_rows = config["data_length"]
sample_size = len(full_data) if requested_rows is None else min(requested_rows, len(full_data))
full_data = full_data.sample(n=sample_size)

# Rows with missing values are dropped after sampling, so fewer than `sample_size` rows may remain.
full_data = full_data.dropna()
# min_sample = full_data[1].value_counts().min()

# full_data = full_data.groupby(by=[1]).sample(n=min_sample, random_state=1)

print("Used shape: ", full_data.shape)
print(full_data.head())

# Plot the distribution of each column (except the image column)
for col in full_data.columns[1:]:
    plt.figure(figsize=(10, 6))
    sns.histplot(full_data[col], bins=config["output_size"], kde=True)
    plt.title(f'Distribution of {col}')
    plt.xlabel(col)
    plt.ylabel('Frequency')
    plt.show()

In [None]:
# Convert the sampled frame to a NumPy array and show its dimensions.
data = full_data.to_numpy()
print(data.shape)

In [11]:
from sklearn.model_selection import train_test_split

def train_val_test_split(data, train_size=0.6, val_size=0.3):
    """
    Split `data` into train, validation and test parts with two chained
    `train_test_split` calls (both use random_state=42).

    `train_size` and `val_size` are fractions of the full data; whatever is left
    over (1 - train_size - val_size) becomes the test part.

    Returns:
        (train_data, val_data, test_data)
    """
    kept_fraction = train_size + val_size

    # First cut: peel the test part off the full data.
    remainder, test_data = train_test_split(data, test_size=(1 - kept_fraction), random_state=42)

    # Second cut: validation share expressed relative to what remains.
    train_data, val_data = train_test_split(remainder, test_size=val_size / kept_fraction, random_state=42)

    return train_data, val_data, test_data

In [None]:
# Partition the sampled frame into the three subsets.
train_data, val_data, test_data = train_val_test_split(full_data)

# Only the test subset is balanced: every value of column 1 is down-sampled to the size
# of the rarest one. Train and validation keep their natural distribution.
min_sample = test_data[1].value_counts().min()
test_data = test_data.groupby(by=[1]).sample(n=min_sample, random_state=1)

train_data, val_data, test_data = (split.to_numpy() for split in (train_data, val_data, test_data))

# Shape and target histogram for train, validation and test, in that order.
for split in (train_data, val_data, test_data):
    print(split.shape)
    plt.hist(split[:, 1:], bins=config["output_size"])
    plt.show()

# Column 0 holds the image arrays; the remaining columns are the numeric targets.
train_images, train_targets = train_data[:, 0], np.asarray(train_data[:, 1:], dtype=np.float64)
val_images, val_targets = val_data[:, 0], np.asarray(val_data[:, 1:], dtype=np.float64)
test_images, test_targets = test_data[:, 0], np.asarray(test_data[:, 1:], dtype=np.float64)

In [None]:
def quantize_labels(targets, num_classes=2):
    """
    Map continuous targets from [-1, 1] onto integer class ids 0 .. num_classes-1.

    Args:
        targets (numpy.ndarray): Values expected to lie in [-1, 1].
        num_classes (int): How many equally wide buckets to use.

    Returns:
        numpy.ndarray: int64 array of the same shape; values outside [-1, 1]
        (and the upper edge itself) are clamped to the nearest valid class.
    """
    # Shift/scale to [0, 1], then stretch to [0, num_classes] and take the bucket index.
    unit_interval = (targets + 1) / 2
    bucket = np.floor(unit_interval * num_classes).astype(np.int64)

    # Clamp so that out-of-range inputs still yield a valid class id.
    return np.minimum(np.maximum(bucket, 0), num_classes - 1)

def _plot_target_distribution(values, title):
    """Show a histogram of `values` with config["output_size"] bins under `title`."""
    plt.figure(figsize=(10, 6))
    sns.histplot(values, bins=config["output_size"])
    plt.title(title)
    # Fixed axis label: this cell no longer relies on a loop variable left over
    # from an earlier cell, so it runs regardless of which cells ran before it.
    plt.xlabel('Target')
    plt.ylabel('Frequency')
    plt.show()


_plot_target_distribution(train_targets, 'Distribution of original range')

# train_targets = quantize_labels(train_targets, num_classes=config['output_size'])
# val_targets = quantize_labels(val_targets, num_classes=config['output_size'])
# test_targets = quantize_labels(test_targets, num_classes=config['output_size'])

# The targets are cast straight to integers (truncating any fractional part) and used
# as class ids; the quantization above is left disabled.
train_targets = train_targets.astype(np.int64)
val_targets = val_targets.astype(np.int64)
test_targets = test_targets.astype(np.int64)

_plot_target_distribution(train_targets, f'Distribution from 0 to {config["output_size"]-1}')

In [14]:
from torchvision import transforms

# Per-crop preprocessing: resize to the configured size, convert to a tensor, then normalize.
img_height, img_width = config["img_size"][0], config["img_size"][1]
transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),   # Normalize to [-1, 1]
])


def _as_dataset(images, targets):
    """Wrap one split in an ImageSequenceDataset using the shared transform and candle count."""
    return ImageSequenceDataset(images, targets, transform=transform, num_candles=config['num_candles'])


# One dataset per split.
trainset = _as_dataset(train_images, train_targets)
valset = _as_dataset(val_images, val_targets)
testset = _as_dataset(test_images, test_targets)

# Only the training loader shuffles; the test loader yields one sample at a time.
trainloader = DataLoader(trainset, shuffle=True, batch_size=config["batch_size"])
valloader = DataLoader(valset, shuffle=False, batch_size=config["batch_size"])
testloader = DataLoader(testset, shuffle=False, batch_size=1)

In [15]:
def matplotlib_imshow(img, one_channel=False):
    """
    Draw a normalized image tensor on the current matplotlib axes.

    The tensor is rescaled with `img / 2 + 0.5` before plotting. With `one_channel=True`
    the first dimension is averaged away and the result is shown with the "Greys"
    colormap; otherwise the first dimension is moved last and shown as a colour image.
    """
    if one_channel:
        img = img.mean(dim=0)
    pixels = (img / 2 + 0.5).numpy()     # unnormalize

    if not one_channel:
        plt.imshow(np.transpose(pixels, (1, 2, 0)))
        return
    plt.imshow(pixels, cmap="Greys")

In [None]:
# Preview the first training sample (index 0) as a row of candle crops
sequence, label = trainloader.dataset[0]

num_steps = sequence.size(0)
plt.figure(figsize=(15, 3))
for position, candle in enumerate(sequence, start=1):
    plt.subplot(1, num_steps, position)
    matplotlib_imshow(candle)
    plt.axis('off')
plt.suptitle(f'Label: {label}')
plt.show()

In [None]:
# helper function
def select_n_random_sequences(dataset, n=5):
    '''
    Return up to n items of `dataset`, picked at distinct random positions.

    Positions come from a random permutation of all indices, so no item repeats
    and at most len(dataset) items are returned.
    '''
    chosen = torch.randperm(len(dataset))[:n].tolist()
    return [dataset[position] for position in chosen]

# Draw three random (sequence, label) pairs from the training set
sequences_labels = select_n_random_sequences(trainloader.dataset, n=3)

# One figure per pair, with the candle crops laid out left to right
for idx, (sequence, label) in enumerate(sequences_labels):
    num_steps = sequence.size(0)
    plt.figure(figsize=(15, 3))
    for position, candle in enumerate(sequence, start=1):
        plt.subplot(1, num_steps, position)
        matplotlib_imshow(candle)
        plt.axis('off')
    plt.suptitle(f'Sequence {idx+1} - Label: {label}')
    plt.show()

In [18]:
class FlexibleSequenceNet(nn.Module):
    """
    Sequence classifier: a shared CNN encodes every image of a sequence, an LSTM reads
    the encodings in order, and a fully connected head classifies the last LSTM output.

    Expected `config` keys:
        num_candles, in_channels, img_size (H, W), leak, dropout, output_size,
        conv_layers  - list of (out_channels, kernel_size, padding), one per convolution,
        pool_layers  - list aligned with conv_layers; a truthy entry is used as the
                       MaxPool2d kernel size after that convolution, a falsy or missing
                       entry means no pooling,
        lstm_hidden_size, lstm_layers, fc_layers (hidden widths of the head).

    `forward` returns log-probabilities (log_softmax), so pair it with `nn.NLLLoss`.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.num_candles = config['num_candles']

        # Convolutional encoder: Conv -> BatchNorm -> LeakyReLU [-> MaxPool] per entry.
        in_channels = config['in_channels']
        pool_layers = config['pool_layers']
        layers = []
        for idx, (out_channels, kernel_size, padding) in enumerate(config['conv_layers']):
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.LeakyReLU(negative_slope=config['leak']))
            if idx < len(pool_layers) and pool_layers[idx]:
                layers.append(nn.MaxPool2d(kernel_size=pool_layers[idx]))
            in_channels = out_channels
        self.cnn = nn.Sequential(*layers)

        # Measure the flattened encoder output with a dummy forward pass. The encoder is
        # switched to eval mode for the probe so the random sample does not leak into the
        # BatchNorm running statistics, then put back into training mode.
        self.cnn.eval()
        with torch.no_grad():
            sample_input = torch.randn(1, config['in_channels'], *config['img_size'])
            conv_output = self.cnn(sample_input)
            self.feature_size = conv_output.view(-1).size(0)
        self.cnn.train()

        # LSTM to process sequence of features
        self.lstm = nn.LSTM(
            input_size=self.feature_size,
            hidden_size=config['lstm_hidden_size'],
            num_layers=config['lstm_layers'],
            batch_first=True,
        )

        # Classification head: Linear -> BatchNorm -> LeakyReLU -> Dropout per hidden
        # width, followed by the output projection.
        fc_layers = []
        input_dim = config['lstm_hidden_size']
        for hidden_size in config['fc_layers']:
            fc_layers.append(nn.Linear(input_dim, hidden_size))
            fc_layers.append(nn.BatchNorm1d(hidden_size))
            fc_layers.append(nn.LeakyReLU(negative_slope=config['leak']))
            fc_layers.append(nn.Dropout(config['dropout']))
            input_dim = hidden_size

        fc_layers.append(nn.Linear(input_dim, config['output_size']))
        self.fc_layers = nn.Sequential(*fc_layers)

    def forward(self, x):
        """
        Args:
            x: tensor of shape (batch_size, sequence_length, channels, height, width).

        Returns:
            Tensor of shape (batch_size, output_size) holding log-probabilities.
        """
        batch_size, seq_len, C, H, W = x.size()
        # reshape (rather than view) so non-contiguous inputs are accepted too
        x = x.reshape(batch_size * seq_len, C, H, W)  # Merge batch and sequence dimensions
        x = self.cnn(x)  # Apply CNN
        x = x.reshape(batch_size, seq_len, -1)  # Back to (batch_size, seq_len, feature_size)
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # Get the output of the last time step
        x = self.fc_layers(x)
        x = F.log_softmax(x, dim=1)
        return x


In [19]:
def rmse(pred, target):
    """Root-mean-square error between two tensors, returned as a scalar tensor."""
    residual = pred - target
    return residual.pow(2).mean().sqrt()


def print_examples(targets, predictions, num_examples=5):
    """
    Print a few target/prediction pairs followed by summary classification metrics.

    Args:
        targets: sequence of true labels.
        predictions: sequence of predicted labels, aligned with `targets`.
        num_examples: maximum number of pairs to list (fewer if `targets` is shorter).

    A pair is tagged "OK" when target and prediction agree within 0.1. Accuracy and
    weighted precision / recall / F1 are computed over all pairs, not only those listed.
    """
    print("Samples (Target -> Prediction):")
    for i in range(min(num_examples, len(targets))):
        correct = np.allclose(targets[i], predictions[i], atol=0.1)
        print(f"{targets[i]} -> {predictions[i]} {'OK' if correct else ''}")

    # zero_division=0 matches the other metric calls in this notebook and keeps sklearn
    # from warning every time a class is never predicted.
    accuracy = accuracy_score(targets, predictions)
    precision = precision_score(targets, predictions, average='weighted', zero_division=0)
    recall = recall_score(targets, predictions, average='weighted', zero_division=0)
    f1 = f1_score(targets, predictions, average='weighted', zero_division=0)

    print("\nStatistics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

In [20]:
def save_model(model: nn.Module, config: dict, val_accuracy: float, run_id, path='./deep_learning/models/'):
    """
    Write the model weights and its config into `path`, replacing this run's earlier files.

    Files are named `<run_id>.<data_filename>.<output_size>.<val_accuracy in %>` with the
    extensions `.pth` (state dict) and `.json` (config).

    Args:
        model: network to save; a `nn.DataParallel` wrapper is unwrapped first so the
            saved keys carry no "module." prefix and load into a plain model.
        config: configuration dict; must be JSON-serialisable and contain
            "data_filename" and "output_size".
        val_accuracy: validation accuracy as a fraction; rounded to whole percent in the name.
        run_id: identifier of the current run.
        path: target directory (created if missing).

    Returns:
        The file name stem (no directory, no extension).
    """
    os.makedirs(path, exist_ok=True)
    stem = f'{run_id}.{config["data_filename"]}.{config["output_size"]}.{val_accuracy*100:.0f}'

    # Remove earlier files of this run. Matching on the "<run_id>." prefix (instead of a
    # substring test) leaves files alone that merely contain the same digits.
    run_prefix = f'{run_id}.'
    for entry in os.listdir(path):
        if entry.startswith(run_prefix):
            os.remove(os.path.join(path, entry))

    unwrapped = model.module if isinstance(model, nn.DataParallel) else model
    torch.save(unwrapped.state_dict(), os.path.join(path, f'{stem}.pth'))
    with open(os.path.join(path, f'{stem}.json'), 'w') as config_file:
        json.dump(config, config_file)

    return stem

In [21]:
def calculate_class_weights(targets):
    """
    Compute inverse-frequency class weights from an array of integer labels.

    The labels are flattened and counted; class i receives
    total_samples / (num_classes * count_i), where num_classes is the highest label
    plus one. Classes that never occur get weight 0.

    Returns:
        torch.Tensor: float32 tensor with one weight per class.
    """
    labels = targets.flatten()
    counts = np.bincount(labels)
    num_classes = len(counts)

    # Divide only where a class actually occurs; the zero-initialised output keeps
    # absent classes at weight 0.
    weights = np.zeros(num_classes, dtype=np.float64)
    np.divide(labels.shape[0], num_classes * counts, out=weights, where=counts > 0)

    return torch.tensor(weights.astype(np.float32), dtype=torch.float32)

In [None]:
# Build the network and move it to the selected device
model = FlexibleSequenceNet(config).to(device)

# Class-weighted negative log-likelihood loss (the network outputs log-probabilities)
target_weights = calculate_class_weights(train_targets).to(device)
print(target_weights)
criterion = nn.NLLLoss(weight=target_weights, reduction='mean')

# Adam with weight decay
optimizer = optim.Adam(model.parameters(), lr=config['learning_rate'], weight_decay=1e-4)

scaler = GradScaler()

# Learning rate is multiplied by 0.1 every 5 scheduler steps
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# Spread batches over all GPUs when more than one is present
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    print(f"Device name: {device}")
    model = nn.DataParallel(model)

# Size of the model in parameters
total_params = sum(map(torch.numel, model.parameters()))
print(f'{total_params:,} total parameters.')

In [None]:
# Training loop
import numpy as np
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

epochs = config['epochs']
best_val_loss = float('inf')
no_improvement_epochs = 0
max_no_improvement_epochs = 20 # Maximum number of epochs to wait for improvement
min_improvement = 0.001 # Minimum improvement to reset the counter
checkpoint_saved = False  # becomes True once an improving epoch has been written to disk
val_accuracy = 0.0  # defined up front so the fallback save below works even if no epoch runs

# Initialize livelossplot
liveloss = PlotLosses(outputs=['MatplotlibPlot'], groups={'Loss': ['loss', 'val_loss'], 'Accuracy': ['acc', 'val_acc'], 'Metrics': ['val_precision', 'val_recall', 'val_f1']})

for epoch in range(epochs):
    logs = {}

    # ---- Training pass ----
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for sequences, targets in trainloader:
        sequences = sequences.to(device)
        # Flatten the labels to one id per sample while always keeping the batch
        # dimension (a bare squeeze() would turn a batch of one into a 0-d tensor).
        # Assumes one target column per sample; reshape raises otherwise.
        targets = targets.reshape(targets.size(0)).to(device)
        optimizer.zero_grad()
        outputs = model(sequences)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * sequences.size(0)
        _, predicted = torch.max(outputs, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

    train_loss = running_loss / len(trainloader.dataset)
    train_accuracy = correct / total

    # ---- Validation pass ----
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    all_val_targets = []
    all_val_predictions = []

    with torch.no_grad():
        for sequences, targets in valloader:
            sequences = sequences.to(device)
            targets = targets.reshape(targets.size(0)).to(device)
            outputs = model(sequences)
            loss = criterion(outputs, targets)
            val_loss += loss.item() * sequences.size(0)
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
            all_val_targets.extend(targets.cpu().numpy())
            all_val_predictions.extend(predicted.cpu().numpy())

    val_loss = val_loss / len(valloader.dataset)
    val_accuracy = correct / total

    # Calculate additional metrics
    val_precision = precision_score(all_val_targets, all_val_predictions, average='weighted', zero_division=0)
    val_recall = recall_score(all_val_targets, all_val_predictions, average='weighted', zero_division=0)
    val_f1 = f1_score(all_val_targets, all_val_predictions, average='weighted', zero_division=0)

    # Log the values
    logs['loss'] = train_loss
    logs['acc'] = train_accuracy
    logs['val_loss'] = val_loss
    logs['val_acc'] = val_accuracy
    logs['val_precision'] = val_precision
    logs['val_recall'] = val_recall
    logs['val_f1'] = val_f1

    # Send the logs to livelossplot
    liveloss.update(logs)
    liveloss.send()

    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    # Print example predictions
    print_examples(all_val_targets, all_val_predictions, num_examples=10)

    # Compute and plot confusion matrix
    cm = confusion_matrix(all_val_targets, all_val_predictions)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'Confusion Matrix - Epoch {epoch+1}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

    # Step the scheduler
    scheduler.step()

    # Early stopping bookkeeping: only a drop of more than `min_improvement` in the
    # validation loss counts as progress and triggers a checkpoint.
    if val_loss < best_val_loss - min_improvement:
        best_val_loss = val_loss
        no_improvement_epochs = 0
        filename = save_model(model, config, val_accuracy, run_id)
        checkpoint_saved = True
    else:
        no_improvement_epochs += 1
        print(f"No improvement for {no_improvement_epochs} epochs.")

    if no_improvement_epochs >= max_no_improvement_epochs:
        print("Stopping training.")
        break

    if train_accuracy >= 0.9999:
        print("Training accuracy reached 100%.")
        break

# save_model deletes this run's previous files, so saving unconditionally here would
# replace the best checkpoint with the last-epoch weights. Save the final weights only
# when no epoch ever produced a checkpoint (e.g. the validation loss was never finite).
if not checkpoint_saved:
    filename = save_model(model, config, val_accuracy, run_id)
print('Finished Training')

In [None]:
# Load the best model
# filename = f'{run_id}.{config["data_filename"]}.{config["output_size"]}.{val_accuracy*100:.0f}'

print(f'Loading model: {filename}')
# Rebuild the network from the config stored next to the weights.
with open(f'./deep_learning/models/{filename}.json') as config_file:
    config = json.load(config_file)
model = FlexibleSequenceNet(config).to(device)

# map_location keeps loading working when the checkpoint was written on another device.
state_dict = torch.load(f'./deep_learning/models/{filename}.pth', map_location=device)
# Checkpoints written from a DataParallel wrapper prefix every key with "module.";
# strip it so the weights fit the plain model built above.
state_dict = {
    (key[len('module.'):] if key.startswith('module.') else key): value
    for key, value in state_dict.items()
}
model.load_state_dict(state_dict)

# Test the model
model.eval()
test_loss = 0.0
correct = 0
total = 0

all_test_targets = []
all_test_predictions = []

with torch.no_grad():
    for sequences, targets in testloader:
        sequences = sequences.to(device)
        # The test loader uses batch_size=1: drop only the batch dimension's extra axis.
        targets = targets.squeeze(0).to(device).long()
        outputs = model(sequences)
        loss = criterion(outputs, targets)
        test_loss += loss.item() * sequences.size(0)
        _, predicted = torch.max(outputs, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
        all_test_targets.extend(targets.cpu().numpy())
        all_test_predictions.extend(predicted.cpu().numpy())

test_loss = test_loss / len(testloader.dataset)
test_accuracy = correct / total

print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

# Calculate additional metrics
test_precision = precision_score(all_test_targets, all_test_predictions, average='weighted', zero_division=0)
test_recall = recall_score(all_test_targets, all_test_predictions, average='weighted', zero_division=0)
test_f1 = f1_score(all_test_targets, all_test_predictions, average='weighted', zero_division=0)

print("\nTest Statistics:")
print(f"Precision: {test_precision:.4f}")
print(f"Recall: {test_recall:.4f}")
print(f"F1 Score: {test_f1:.4f}")

# Add test accuracy to txt database file
with open('./deep_learning/models/results.txt', 'a') as f:
    f.write(f'{filename}: {test_accuracy:.4f}\n')
