# This script trains a ResNet-style regression model that predicts mobility variables from images.



In [29]:
# Install required libraries
# !pip install torch torchvision scikit-learn matplotlib pandas


In [39]:
# Import necessary libraries and modules
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.metrics import classification_report, mean_absolute_error, mean_squared_error, r2_score
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.init as init
from CustomDataset import CustomDataset


In [40]:
# Module-level state shared across cells:
#   loss_list - per-batch training losses appended by trainloop (plotted later)
#   residuals - placeholder, overwritten when the prediction residuals are computed
#   l1_loss   - L1 criterion used to build the weight-regularization term
loss_list, residuals = [], []
l1_loss = nn.L1Loss()

In [41]:
# Define the training loop function: trainloop
def trainloop(train_loader, num_epoch=10, l1_lambda=0.001):
  """Train the global ``net`` on ``train_loader`` and print the training R-squared.

  The function relies on these module-level names being defined before it is
  called: ``net``, ``optimizer``, ``criterion``, ``device``, ``l1_loss`` and
  ``loss_list``. Every per-batch loss (data loss plus L1 penalty) is appended
  to ``loss_list``.

  Args:
    train_loader: iterable yielding ``(inputs, labels)`` batches.
    num_epoch: number of passes over ``train_loader`` (default 10, the value
      that used to be hard-coded).
    l1_lambda: weight applied to the L1 penalty on the model parameters
      (default 0.001, the value that used to be hard-coded).

  Returns:
    None. Progress and the final training R-squared are printed.
  """
  # Make sure layers such as BatchNorm are in training mode, even if the
  # network was switched to eval mode by an earlier evaluation run.
  net.train()

  for epoch in range(num_epoch):
    running_loss = 0.0

    for inputs, labels in train_loader:
      # Move the batch to the target device as float tensors
      inputs = inputs.to(device).float()
      labels = labels.to(device).float()

      optimizer.zero_grad()

      # Forward pass; labels are reshaped to (batch, 1) to match the outputs
      outputs = net(inputs)
      loss = criterion(outputs, labels.view(-1, 1))

      # L1 penalty: sum over parameter tensors of mean(|param|)
      l1_reg = 0.0
      for param in net.parameters():
        l1_reg += l1_loss(param, torch.zeros_like(param))
      loss = loss + l1_lambda * l1_reg

      # Backpropagation and parameter update
      loss.backward()
      optimizer.step()

      batch_loss = loss.item()
      running_loss += batch_loss
      loss_list.append(batch_loss)

    # Print training statistics for the epoch
    print(f"Epoch {epoch + 1}/{num_epoch} - Loss: {running_loss / len(train_loader):.4f} ")

  # Calculate and print R-squared on training data.
  # Evaluate in eval mode so BatchNorm uses (and does not keep updating) its
  # running statistics while predictions are collected.
  predicted_train_labels = []
  true_train_labels = []
  net.eval()
  with torch.no_grad():
    for inputs, labels in train_loader:
      inputs = inputs.to(device).float()
      labels = labels.to(device).float()
      outputs = net(inputs)
      # reshape(-1) always yields a 1-D tensor, so tolist() returns a list even
      # for a batch of one sample (squeeze() would produce a bare float there).
      predicted_train_labels += outputs.reshape(-1).tolist()
      true_train_labels += labels.reshape(-1).tolist()
  # Leave the network in training mode, as it was after the training epochs.
  net.train()

  train_r2 = r2_score(true_train_labels, predicted_train_labels)
  print("Train R-squared:", train_r2)


In [45]:
if __name__ == '__main__':
  # Read the regression targets for the training set from the 'target' column
  train_label_df = pd.read_excel('SampleDataset/trainlabel1.xlsx')
  train_labels = train_label_df['target'].tolist()

  # Read the regression targets for the test set from the 'target' column
  test_label_df = pd.read_excel('SampleDataset/testlabel1.xlsx')
  test_labels = test_label_df['target'].tolist()

  # Check if GPU is available, otherwise use CPU
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

  # Per-channel normalization shared by both pipelines
  normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

  # Training pipeline: resize plus random augmentation (flip / small rotation)
  train_transform = transforms.Compose([
      transforms.Resize((224, 224)),
      transforms.RandomHorizontalFlip(),  # randomly flip the image horizontally
      transforms.RandomRotation(10),  # randomly rotate the image by up to 10 degrees
      transforms.ToTensor(),  # convert the image to a PyTorch tensor
      normalize,
  ])

  # Test pipeline: deterministic preprocessing only. Random augmentation is a
  # training-time regularizer; applying it at test time would make the
  # reported metrics change from run to run.
  test_transform = transforms.Compose([
      transforms.Resize((224, 224)),
      transforms.ToTensor(),  # convert the image to a PyTorch tensor
      normalize,
  ])

  # Create custom datasets for training and test sets
  train_dataset = CustomDataset(root_dir='SampleDataset/trainset', targets=train_labels, transform=train_transform)
  test_dataset = CustomDataset(root_dir='SampleDataset/testset', targets=test_labels, transform=test_transform)

  # Create data loaders to load the data in batches (only the training data is shuffled)
  train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=8)
  test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=8)

  class ResidualBlock(nn.Module):
    """Two-convolution residual block with an additive shortcut.

    The main path is conv3x3 -> BatchNorm -> ReLU -> conv3x3 -> BatchNorm.
    Its result is added to the shortcut branch and passed through a final ReLU.
    The shortcut is an empty ``nn.Sequential`` (identity) unless the stride or
    the channel count changes, in which case a 1x1 convolution followed by
    BatchNorm projects the input to the output shape.

    Args:
      in_channels: number of channels of the input feature map.
      out_channels: number of channels produced by the block.
      stride: stride of the first convolution (and of the projection shortcut).
    """

    def __init__(self, in_channels, out_channels, stride=1):
      super().__init__()

      # Main path: the first convolution carries the (optional) stride
      self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
      self.bn1 = nn.BatchNorm2d(out_channels)
      self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
      self.bn2 = nn.BatchNorm2d(out_channels)

      # Shortcut path: identity when shapes already match, projection otherwise
      if stride == 1 and in_channels == out_channels:
        self.shortcut = nn.Sequential()
      else:
        self.shortcut = nn.Sequential(
          nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
          nn.BatchNorm2d(out_channels)
        )

      # Kaiming-normal initialization for every convolution in the block
      conv_layers = [module for module in self.modules() if isinstance(module, nn.Conv2d)]
      for conv in conv_layers:
        init.kaiming_normal_(conv.weight, mode='fan_in', nonlinearity='relu')

    def forward(self, x):
      """Return ``relu(main_path(x) + shortcut(x))``."""
      hidden = self.conv1(x)
      hidden = self.bn1(hidden)
      hidden = F.relu(hidden)

      merged = self.bn2(self.conv2(hidden))
      merged += self.shortcut(x)
      return F.relu(merged)


  class ResNet(nn.Module):
    """ResNet-style network with a single linear output head.

    Layout: a 3x3 stem convolution with BatchNorm and ReLU, four stages of
    ``ResidualBlock`` (64, 128, 256 and 512 channels; stages 2-4 use stride 2
    in their first block), global average pooling to 1x1, and a fully
    connected layer mapping 512 features to ``num_classes`` outputs.

    Args:
      num_layers: number of residual blocks in each stage.
      num_channels: output channels of the stem convolution.
      num_classes: size of the final linear layer's output.
    """

    def __init__(self, num_layers=2, num_channels=64, num_classes=1):
      super().__init__()

      # Channel count fed into the next stage; make_layer updates it
      self.in_channels = num_channels

      # Stem: 3-channel image -> num_channels feature maps
      self.conv1 = nn.Conv2d(3, num_channels, kernel_size=3, stride=1, padding=1, bias=False)
      self.bn1 = nn.BatchNorm2d(num_channels)

      # Residual stages, registered as layer1 .. layer4 in this order
      stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
      for index, (width, stride) in enumerate(stage_specs, start=1):
        setattr(self, f"layer{index}", self.make_layer(width, num_layers, stride=stride))

      # Head: global average pooling followed by the linear output layer
      self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
      self.fc = nn.Linear(512, num_classes)

      # Kaiming-normal initialization for every convolution in the network
      conv_layers = [module for module in self.modules() if isinstance(module, nn.Conv2d)]
      for conv in conv_layers:
        init.kaiming_normal_(conv.weight, mode='fan_in', nonlinearity='relu')

    def make_layer(self, out_channels, num_blocks, stride=1):
      """Build one stage: a (possibly strided) first block plus ``num_blocks - 1`` more.

      Also sets ``self.in_channels`` to ``out_channels`` for the next stage.
      """
      first_block = ResidualBlock(self.in_channels, out_channels, stride)
      self.in_channels = out_channels

      remaining_blocks = [ResidualBlock(out_channels, out_channels) for _ in range(1, num_blocks)]
      return nn.Sequential(first_block, *remaining_blocks)

    def forward(self, x):
      """Run the stem, the four stages, pooling and the linear head on ``x``."""
      features = F.relu(self.bn1(self.conv1(x)))

      for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
        features = stage(features)

      # Pool to 1x1 and flatten to (batch, features) for the linear layer
      pooled = self.avg_pool(features)
      flat = pooled.view(pooled.size(0), -1)
      return self.fc(flat)

  # Build the network with its default configuration and place it on the chosen device
  net = ResNet()
  net = net.to(device)

  # L1 (mean absolute error) data loss, optimized with SGD plus momentum
  criterion = nn.L1Loss()
  optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

  # Run training on the training loader
  trainloop(train_loader)

Epoch 1/10 - Loss: 0.8546 
Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/autumnstar/opt/anaconda3/envs/Python_Tutorials/lib/python3.11/multiprocessing/queues.py", line 239, in _feed
    reader_close()
  File "/Users/autumnstar/opt/anaconda3/envs/Python_Tutorials/lib/python3.11/multiprocessing/connection.py", line 177, in close
    self._close()
  File "/Users/autumnstar/opt/anaconda3/envs/Python_Tutorials/lib/python3.11/multiprocessing/connection.py", line 360, in _close
    _close(self._handle)
OSError: [Errno 9] Bad file descriptor
  File "/Users/autumnstar/opt/anaconda3/envs/Python_Tutorials/lib/python3.11/multiprocessing/queues.py", line 239, in _feed
    reader_close()
  File "/Users/autumnstar/opt/anaconda3/envs/Python_Tutorials/lib/python3.11/multiprocessing/connection.py", line 177, in close
    self._close()
  File "/Users/autumnstar/opt/anaconda3/envs/Python_Tutorials/lib/python3.11/multiprocessing/connection.py", line 360, in _close
    _close(self._handle)
OSErr

In [None]:
def r_squared(y_pred, y_true):
  """Return the coefficient of determination (R^2) as a 0-dim tensor.

  Computes ``1 - SS_res / SS_tot`` where ``SS_res = sum((y_true - y_pred)^2)``
  and ``SS_tot = sum((y_true - mean(y_true))^2)``.

  Args:
    y_pred: predicted values (tensor or array-like, any shape).
    y_true: ground-truth values (tensor or array-like, any shape).

  Returns:
    A scalar tensor holding R^2. When ``y_true`` is constant (``SS_tot == 0``)
    the score is 1.0 for a perfect prediction and 0.0 otherwise, instead of
    NaN / -inf; this mirrors scikit-learn's ``r2_score`` default.
  """
  # Flatten both inputs so that e.g. (N, 1) predictions and (N,) targets are
  # compared element-wise instead of silently broadcasting to an (N, N) matrix.
  y_pred = torch.as_tensor(y_pred).reshape(-1)
  y_true = torch.as_tensor(y_true).reshape(-1)

  # torch.mean needs a floating-point dtype; integer inputs are promoted.
  if not torch.is_floating_point(y_pred):
    y_pred = y_pred.to(torch.get_default_dtype())
  if not torch.is_floating_point(y_true):
    y_true = y_true.to(torch.get_default_dtype())

  # Calculate the total sum of squares and the residual sum of squares
  total_sum_of_squares = torch.sum((y_true - torch.mean(y_true)) ** 2)
  residual_sum_of_squares = torch.sum((y_true - y_pred) ** 2)

  # Constant targets: the ratio is undefined, so return a finite score.
  # Empty input is left to fall through and yields NaN, as before.
  if y_true.numel() > 0 and total_sum_of_squares == 0:
    if residual_sum_of_squares == 0:
      return torch.ones_like(residual_sum_of_squares)
    return torch.zeros_like(residual_sum_of_squares)

  # Calculate the R-squared value
  return 1 - (residual_sum_of_squares / total_sum_of_squares)


In [None]:
# Set the network to evaluation mode (BatchNorm uses its running statistics)
net.eval()

# Initialize empty lists to store true and predicted labels
true_labels = []
predicted_labels = []

# Use torch.no_grad() to disable gradient calculation during testing
with torch.no_grad():
  # Iterate over the test data
  for inputs, labels in test_loader:
    # Move input data and labels to the specified device and convert to float tensors
    inputs = inputs.to(device).float()
    labels = labels.to(device).float()

    # Perform a forward pass through the network to get predicted outputs
    outputs = net(inputs)

    # Append true labels and predicted labels to their respective lists.
    # reshape(-1) always yields a 1-D tensor, so tolist() returns a list even
    # when the final batch holds a single sample (squeeze() would return a
    # bare float there and the list concatenation would fail).
    true_labels += labels.reshape(-1).tolist()
    predicted_labels += outputs.reshape(-1).tolist()


In [None]:
"""
This code block calculates and prints three regression evaluation metrics:

1. Mean Absolute Error (MAE):
The average absolute difference between the predicted and true labels.

2. Mean Squared Error (MSE):
The average of the squared differences between predicted and true labels.

3. R-squared (R^2):
The proportion of variance in the dependent variable that is explained by the independent variable(s).

"""
# Turn the collected Python lists into tensors (predictions first, then targets)
predicted_labels, true_labels = torch.tensor(predicted_labels), torch.tensor(true_labels)

# MAE via scikit-learn
mae = mean_absolute_error(y_true=true_labels, y_pred=predicted_labels)
print("Mean Absolute Error (MAE):", mae)

# MSE via scikit-learn
mse = mean_squared_error(y_true=true_labels, y_pred=predicted_labels)
print("Mean Squared Error (MSE):", mse)

# R^2 via the r_squared helper, which takes predictions as its first argument
r2 = r_squared(y_pred=predicted_labels, y_true=true_labels)
print("R-squared (R^2):", r2)




In [None]:
"""
This code block generates plots for visualizing the model's performance:

1. Training Loss Curve:
Plots the loss values over iterations during training.
It helps visualize the convergence and progress of the training process.

2. Model Prediction Residuals:
Creates a scatter plot of true labels against residuals (differences between true and predicted labels).
This plot helps analyze the distribution and patterns of prediction errors.

3. Regression Line:
Calculates and plots the regression line by fitting a linear regression model to the true and
predicted labels.
This helps visualize how well the predicted labels align with the true labels.

"""

# Residual = true value minus predicted value, element-wise
residuals = np.subtract(np.array(true_labels), np.array(predicted_labels))

# Figure 1: per-batch training loss recorded in loss_list
plt.plot(loss_list)
plt.title('Training Loss Curve')
plt.xlabel('Iterations')
plt.ylabel('Loss')
plt.show()

# Figure 2: residuals against the true labels
plt.scatter(true_labels, residuals, s=5)
plt.title('Model Prediction Residuals')
plt.xlabel('True Labels')
plt.ylabel('Prediction Residuals')
plt.show()

# Figure 3: predictions against true labels with a degree-1 least-squares fit
regression_line = np.polyfit(true_labels, predicted_labels, deg=1)
regression_line_fn = np.poly1d(regression_line)
plt.scatter(true_labels, predicted_labels, s=5)
plt.plot(true_labels, regression_line_fn(true_labels), color='red')
plt.title('Regression Line')
plt.xlabel('True Labels')
plt.ylabel('Predicted Labels')
plt.show()