In [None]:
# Import necessary libraries
import os
import json
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch import optim
from torch import nn
from torch.utils.data import DataLoader

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Function to randomly shuffle the data
def shuffle_data(data, labels, SEED):
    """Shuffle samples and labels together with one seeded permutation.

    The same random permutation is applied to both arrays along their first
    axis, so ``data[i]`` and ``labels[i]`` still describe the same sample
    after shuffling. (Shuffling the two arrays with separate
    ``np.random.shuffle`` calls advances the RNG between the calls and gives
    each array a different order, which destroys the pairing.)

    Parameters
    ----------
    data : array-like
        Samples, indexed along the first axis.
    labels : array-like
        One label per sample; must have the same length as ``data``.
    SEED : int
        Seed passed to ``np.random.seed`` (the global NumPy RNG is seeded).

    Returns
    -------
    tuple
        ``(shuffled_data, shuffled_labels)`` as new NumPy arrays; the inputs
        are not modified.

    Raises
    ------
    ValueError
        If ``data`` and ``labels`` differ in length.
    """
    data = np.asarray(data)
    labels = np.asarray(labels)

    if len(data) != len(labels):
        raise ValueError(
            f'data and labels must have the same length, got {len(data)} and {len(labels)}'
        )

    np.random.seed(SEED)

    # One permutation shared by both arrays keeps every sample with its label
    permutation = np.random.permutation(len(data))

    return data[permutation], labels[permutation]

In [None]:
# Function to split the data into training, validation, and testing set
def split_data(data, labels):
    """Cut the data and labels into train/validation/test parts (80:10:10).

    The split is positional: the first 80% of the entries form the training
    set, the next 10% the validation set and the remainder the testing set.
    ``data`` is sliced along its first axis and must be 3-dimensional.

    Returns
    -------
    tuple
        ``(data_train, labels_train, data_val, labels_val, data_test, labels_test)``
    """
    # Cut points for the samples
    n_samples = len(data)
    data_cut_train = int(0.8 * n_samples)
    data_cut_val = int(0.9 * n_samples)

    # Cut points for the labels
    n_labels = len(labels)
    labels_cut_train = int(0.8 * n_labels)
    labels_cut_val = int(0.9 * n_labels)

    # Training / validation / testing samples
    data_train = data[:data_cut_train, :, :]
    data_val = data[data_cut_train:data_cut_val, :, :]
    data_test = data[data_cut_val:, :, :]

    # Training / validation / testing labels
    labels_train = labels[:labels_cut_train]
    labels_val = labels[labels_cut_train:labels_cut_val]
    labels_test = labels[labels_cut_val:]

    return data_train, labels_train, data_val, labels_val, data_test, labels_test

In [None]:
# CNN model
class CNN(nn.Module):
    """Small two-layer convolutional network followed by one linear layer.

    Architecture: conv(3x3, 8) -> ReLU -> maxpool(2) -> conv(3x3, 16) -> ReLU
    -> maxpool(2) -> flatten -> linear(num_classes).

    Parameters
    ----------
    in_channels : int
        Number of channels of the input images.
    num_classes : int
        Number of output units of the final linear layer.
    input_size : int or tuple of (int, int)
        Spatial size of the input images, either one side length for square
        images or ``(height, width)``. Defaults to 371, which yields the
        16*92*92 flattened features of the original fixed-size model.

    Raises
    ------
    ValueError
        If the input is too small to survive the two pooling layers.
    """

    # Here we are basically defining the layers of the model
    # To then use them in the forward pass
    def __init__(self, in_channels=1, num_classes=1, input_size=371):

        super(CNN, self).__init__()

        if isinstance(input_size, int):
            height = width = input_size
        else:
            height, width = input_size

        # First convolutional layer
        # kernel_size = (3, 3), stride = (1, 1), padding = (1, 1) is a "same convolution":
        # input and output have the same spatial dimensions,
        # as given by n_out = (n_in - k + 2*p)/s + 1
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

        # Max pooling layer
        # Halves the spatial dimensions (rounding down)
        self.pool = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # Second convolutional layer
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

        # Spatial size after the two pooling layers,
        # e.g. 371 -> 185 -> 92 (each pooling floors the halved size)
        pooled_height = height // 2 // 2
        pooled_width = width // 2 // 2
        if pooled_height < 1 or pooled_width < 1:
            raise ValueError(f'input_size {input_size} is too small for two 2x2 pooling layers')

        # Fully connected layer
        self.fc1 = nn.Linear(in_features=16*pooled_height*pooled_width, out_features=num_classes)

    # Forward pass
    def forward(self, x):
        """Return the raw output of the linear layer for a batch shaped (N, in_channels, H, W)."""

        # First convolutional layer
        x = F.relu(self.conv1(x))

        # Max pooling layer
        x = self.pool(x)

        # Second convolutional layer
        x = F.relu(self.conv2(x))

        # Max pooling layer
        x = self.pool(x)

        # Flatten everything except the batch dimension
        x = x.reshape(x.shape[0], -1)

        # Fully connected layer
        x = self.fc1(x)

        return x

In [None]:
# Project directories, resolved relative to the notebook's working directory
current_directory = os.getcwd()
parent_directory = os.path.dirname(current_directory)
grandparent_directory = os.path.dirname(parent_directory)

# Inputs live under <grandparent>/data, with the .npy arrays in its npy/ subfolder
data_directory = os.path.join(grandparent_directory, 'data')
npy_files_directory = os.path.join(data_directory, 'npy')

# Outputs of this notebook
results_directory = os.path.join(
    grandparent_directory,
    'results',
    'binary_classification',
)

In [None]:
# Load the class names from classes.json
# The encoding is set explicitly so the read does not depend on the platform's locale default
with open(os.path.join(data_directory, 'classes.json'), 'r', encoding='utf-8') as file:
    classes = json.load(file)

In [None]:
# Load both classes from their .npy files
household_objects, animals = (
    np.load(os.path.join(npy_files_directory, npy_name))
    for npy_name in ('household_objects.npy', 'animals.npy')
)

# Number of samples per class (samples are indexed along the third axis of the loaded arrays)
num_household_objects = household_objects.shape[2]
print(f'Number of household objects: {num_household_objects}')

num_animals = animals.shape[2]
print(f'Number of animals: {num_animals}')

In [None]:
# The values are already between 0 and 1, so no normalization is applied
# Move the sample axis to the front: (H, W, N) -> (N, H, W)
# NOTE: this cell rebinds its own inputs, so run it only once per kernel session
sample_first_axes = (2, 0, 1)
household_objects = household_objects.transpose(sample_first_axes)
animals = animals.transpose(sample_first_axes)

# Print the shape of the data
print(f'Household objects shape: {household_objects.shape}')
print(f'Animals shape: {animals.shape}')

In [None]:
# Binary labels: 0 for every household object, 1 for every animal
household_objects_labels = np.zeros(shape=num_household_objects)
animals_labels = np.ones(shape=num_animals)

In [None]:
# Combine both classes: household objects first, animals second
# (samples are stacked along the first axis; labels are joined in the same order)
data = np.vstack([household_objects, animals])
labels = np.hstack([household_objects_labels, animals_labels])

In [None]:
# Shuffle the samples with a fixed seed
# NOTE: this cell rebinds its own inputs, so re-running it shuffles the already-shuffled data
SEED = 42
data, labels = shuffle_data(data=data, labels=labels, SEED=SEED)

In [None]:
# Training / validation / testing sets in ratio 80:10:10
(
    data_train, labels_train,
    data_val, labels_val,
    data_test, labels_test,
) = split_data(data, labels)

In [None]:
# Report the (data, labels) shapes of each split, one line per split
print(
    f'Training set shape: {data_train.shape}, {labels_train.shape}',
    f'Validation set shape: {data_val.shape}, {labels_val.shape}',
    f'Testing set shape: {data_test.shape}, {labels_test.shape}',
    sep='\n',
)

In [None]:
# Set device: prefer Apple MPS, then CUDA, and fall back to the CPU
# (selecting 'cuda' unconditionally fails later on machines without a CUDA GPU)
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Build the model with its default arguments, then move its parameters to the selected device
model = CNN()
model = model.to(device)

In [None]:
# Sanity check (disabled): pass a random batch through the model to confirm the output shape
# x = torch.randn(64, 1, 371, 371).to(device)
# print(model(x).shape)

In [None]:
# Hyperparameters
# NOTE(review): the visible cells build the model as `CNN()` without passing
# `in_channels` / `num_classes` - confirm whether num_classes = 2 is intended.
in_channels, num_classes = 1, 2

# Optimization settings
learning_rate = 1e-3
batch_size = 64
num_epochs = 2

In [None]:
# Loss: binary cross-entropy computed directly on the model's raw scores
criterion = nn.BCEWithLogitsLoss()

# Optimizer: Adam over all model parameters
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Data loaders (one per split); only the training loader reshuffles its samples
# Each dataset is a list of (sample, label) pairs
train_dataset, val_dataset, test_dataset = (
    DataLoader(dataset=list(zip(split_samples, split_labels)), batch_size=batch_size, shuffle=reshuffle)
    for split_samples, split_labels, reshuffle in (
        (data_train, labels_train, True),
        (data_val, labels_val, False),
        (data_test, labels_test, False),
    )
)

In [None]:
# Empty dictionaries to store the losses and metrics
# (the training loop keys the losses by epoch, e.g. 'Epoch 1')
train_loss = {}
train_metrics = {
    metric_name: {}
    for metric_name in ('accuracy', 'precision', 'recall', 'f1_score')
}

val_loss = {}
val_train_metrics = {
    metric_name: {}
    for metric_name in ('accuracy', 'precision', 'recall', 'f1_score')
}

In [None]:
# Train the network
for epoch in range(num_epochs):

    # Set the model to training mode
    model.train()

    # Key under which this epoch's losses and metrics are stored
    epoch_key = f'Epoch {epoch+1}'

    # Start an empty list to collect the loss of every batch
    train_loss[epoch_key] = []

    # Empty lists to store the predictions and labels
    train_predictions = []
    train_labels = []

    # The loop variables are deliberately NOT named `data` / `labels`: those names
    # hold the full dataset arrays, and overwriting them with the last batch would
    # break any earlier cell that is re-run afterwards
    for batch_idx, (batch_data, batch_labels) in enumerate(train_dataset):

        # Add the channel dimension and cast to float32 BEFORE moving to the device:
        # NumPy arrays are float64 by default, which does not match the model's
        # float32 weights (the cast is a no-op if the data is already float32)
        batch_data = batch_data.unsqueeze(1).float().to(device)
        batch_labels = batch_labels.unsqueeze(1).float().to(device)

        # Forward pass
        scores = model(batch_data)
        loss = criterion(scores, batch_labels)

        # Record the loss of this batch
        train_loss[epoch_key].append(loss.item())

        # Store predictions and labels
        # A sigmoid output greater than 0.5 is predicted as class 1 (animal),
        # otherwise as class 0 (household object)
        predictions = torch.sigmoid(scores)
        predictions = (predictions > 0.5).float()

        train_predictions.append(predictions.cpu().detach().numpy())
        train_labels.append(batch_labels.cpu().detach().numpy())

        # Backward pass
        optimizer.zero_grad()
        loss.backward()

        # Gradient descent
        optimizer.step()

    # Calculate the metrics at the end of each epoch
    train_predictions = np.vstack(train_predictions)
    train_labels = np.vstack(train_labels)

    train_metrics['accuracy'][epoch_key] = accuracy_score(train_labels, train_predictions)
    train_metrics['precision'][epoch_key] = precision_score(train_labels, train_predictions)
    train_metrics['recall'][epoch_key] = recall_score(train_labels, train_predictions)
    train_metrics['f1_score'][epoch_key] = f1_score(train_labels, train_predictions)

    # Print the epoch, mean loss, and F1-score
    print(f'{epoch_key}, Loss: {np.mean(train_loss[epoch_key]):.3f}, F1-score: {train_metrics["f1_score"][epoch_key]*100:.0f}%')