In [None]:
!pip install torchsummary



In [None]:
import torch
import torch.nn as nn
import torch.nn.init as init
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from sklearn.metrics import accuracy_score, f1_score
import pandas as pd



# Assuming your CSV files are named train.csv, test.csv, and validate.csv
train_csv_path = '/kaggle/input/future-5-and-10-excel-data-scenario17/scenario17_dev_series_train_5.csv'
test_csv_path = '/kaggle/input/future-5-and-10-excel-data-scenario17/scenario17_dev_series_test_5.csv'
validate_csv_path = '/kaggle/input/future-5-and-10-excel-data-scenario17/scenario17_dev_series_val_5.csv'

class CustomDataset(Dataset):
    def __init__(self, csv_path):
        self.data = pd.read_csv(csv_path)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Load and preprocess the images
        image_paths = ['/kaggle/input/scenario-17-dataset/scenario17_vision/dev_data/' + str(path).lstrip('.') for path in self.data.iloc[idx, 1:9].values]
        images = [Image.open(path).convert('RGB') for path in image_paths]

        transform = transforms.Compose([
            transforms.Resize((64, 64)),  # Resize images to match the expected input size
            transforms.ToTensor(),
        ])
        images = [transform(img) for img in images]
        images = torch.stack(images)

        # Load the target
        target_value = self.data.iloc[idx,15]

        target = torch.tensor(int(target_value))

#         print("images - ",images)
        return images, target

train_dataset = CustomDataset(train_csv_path)
test_dataset = CustomDataset(test_csv_path)
validate_dataset = CustomDataset(validate_csv_path)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
validate_loader = DataLoader(validate_dataset, batch_size=64, shuffle=False)


In [None]:
import torch.nn.functional as F

class CNNBlock(nn.Module):
    def __init__(self):
        super(CNNBlock, self).__init__()
        self.cnn_layers = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, padding=1),
                nn.BatchNorm2d(64),
                nn.Conv2d(64, 128, kernel_size=3, padding=1),
                nn.BatchNorm2d(128),
                nn.ReLU(),
                nn.MaxPool2d(8),
                nn.MaxPool2d(8),
            ) for _ in range(8)  # 8 parallel CNN layers
        ])

        self.dense_layers = nn.ModuleList([
            nn.Linear(1 * 1 * 128, 4) for _ in range(8)  # Dense layer for each CNN layer
        ])

    def forward(self, x):
        cnn_outputs = []
        for i in range(8):
            cnn_output = self.cnn_layers[i](x[:, i, :, :, :])
#             print(cnn_output)
#             print(cnn_output.size())
            cnn_output = cnn_output.view(cnn_output.size(0), -1)  # Flatten the output
#             print(cnn_output.size())
            dense_output = self.dense_layers[i](cnn_output)
#             print(dense_output)
#             print(dense_output.size())
            cnn_outputs.append(dense_output)


        return cnn_outputs


class LSTMModel(nn.Module):
    def __init__(self):
        super(LSTMModel, self).__init__()
        self.cnn_block = CNNBlock()
        self.lstm = nn.LSTM(input_size=4, hidden_size=64, batch_first=True)
        self.fc = nn.Linear(64, 1)

    def forward(self, x):
        cnn_outputs = self.cnn_block(x)
        #print(len(cnn_outputs))

        cnn_outputs = torch.stack(cnn_outputs, dim=1)  # Stack along the sequence dimension

        lstm_out, _ = self.lstm(cnn_outputs)

        lstm_out = lstm_out[:, -1, :]  # Take the last time step output
        #print(lstm_out)
        output = self.fc(lstm_out)

        return output

# Instantiate the model
model = LSTMModel()


In [None]:
print(model)

LSTMModel(
  (cnn_block): CNNBlock(
    (cnn_layers): ModuleList(
      (0-7): 8 x Sequential(
        (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): ReLU()
        (5): MaxPool2d(kernel_size=8, stride=8, padding=0, dilation=1, ceil_mode=False)
        (6): MaxPool2d(kernel_size=8, stride=8, padding=0, dilation=1, ceil_mode=False)
      )
    )
    (dense_layers): ModuleList(
      (0-7): 8 x Linear(in_features=128, out_features=4, bias=True)
    )
  )
  (lstm): LSTM(4, 64, batch_first=True)
  (fc): Linear(in_features=64, out_features=1, bias=True)
)


In [None]:
image_shape = (3, 64, 64)
num_classes = 2

sequence_length = 8
num_epochs = 25
learning_rate = 0.001
hidden_size = 64
num_layers = 1

In [None]:
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score

# Assuming binary classification, change num_classes accordingly if needed
num_classes = 2

# Instantiate the model
model = LSTMModel()

# Check for GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

max_gradient_norm = 1.0

# Training loop
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    total_loss = 0
    correct = 0
    total_samples = 0
    all_predictions = []
    all_labels = []

    for inputs, labels in train_loader:
        # Move data to GPU if available
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Assuming labels are 0 or 1, make sure they are float tensors
        labels = labels.float().view(-1, 1)

        # Calculate loss
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_gradient_norm)
        optimizer.step()

        # Update statistics
        total_loss += loss.item()
        predictions = (torch.sigmoid(outputs) > 0.5).float()  # Convert logits to probabilities and threshold at 0.5
        correct += (predictions == labels).sum().item()
        total_samples += labels.size(0)

        # Collect predictions and labels for later evaluation
        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

        # Debugging information during training
#         print("Training Predictions:", predictions)
#         print("Training Labels:", labels)

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    f1 = f1_score(all_labels, all_predictions)

    # Print training statistics
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / len(train_loader)}, Accuracy: {accuracy}, F1 Score: {f1}')


torch.save(model.state_dict(), 'lstm_model_weights.pth')

Epoch [1/25], Loss: 0.6063473701477051, Accuracy: 0.7037037037037037, F1 Score: 0.7059569074778199


In [None]:
for test_inputs, test_labels in test_loader:
    print("inputs ",test_inputs)
    print("labels ",test_labels)

In [None]:
import numpy as np

model.eval()  # Set the model to evaluation mode
test_correct = 0
test_total_samples = 0
test_all_predictions = []
test_all_labels = []

with torch.no_grad():
    for test_inputs, test_labels in test_loader:
        # Move data to GPU if available
        test_inputs, test_labels = test_inputs.to(device), test_labels.to(device)

        # Forward pass
        test_outputs = model(test_inputs)

        # Assuming labels are 0 or 1, make sure they are float tensors
#         test_labels = test_labels.float().view(-1, 1)

        # Apply sigmoid activation to get probabilities
        test_probs = torch.sigmoid(test_outputs)

        # Apply threshold at 0.5 to get binary predictions
        test_predictions = (test_probs > 0.5).float()

        test_correct += (test_predictions == test_labels).sum().item()
        test_total_samples += test_labels.size(0)

        # Collect test predictions and labels for later evaluation
        test_all_predictions.extend(test_predictions.cpu().numpy())
        test_all_labels.extend(test_labels.cpu().numpy())


# Convert true labels to binary format
test_all_labels_binary = np.array(test_all_labels).astype(int)
test_all_predictions_binary = np.array(test_all_predictions).astype(int)

# Debugging information
print("Binary Test Labels:", test_all_labels_binary)
print("Binary Test Predictions:", test_all_predictions_binary)

# Calculate test metrics with the correct binary format
test_accuracy = accuracy_score(test_all_labels_binary, test_all_predictions_binary)
test_f1 = f1_score(test_all_labels_binary, test_all_predictions_binary, average='macro')

# Print test statistics
print(f'Test Accuracy: {test_accuracy}, Test F1 Score: {test_f1}')