## Early merge with hard parameter sharing ##

In [2]:
# imports
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from torchvision import transforms
import random
from sklearn.metrics import accuracy_score

In [None]:
class hard_dataloader(Dataset):
    """Dataset of paired grayscale images labelled ``NotSnow`` (0) or ``Snow`` (1).

    ``data_dir`` must contain one folder per class (``NotSnow`` and ``Snow``).
    Every sub-folder of a class folder is one sample and is expected to hold
    the two images ``0.png`` and ``1.png``.

    The samples are shuffled deterministically and split 80/20. Because the
    permutation depends only on the directory contents and ``seed``, a
    ``train=True`` instance and a ``train=False`` instance built over the same
    directory with the same seed never share a sample.

    Args:
        data_dir: root folder that contains the class folders.
        train: ``True`` selects the first 80 % of the shuffled samples,
            ``False`` selects the remaining 20 %.
        seed: seed of the private shuffle generator; use the same value for
            the training and the testing instance.
    """

    def __init__(self, data_dir, train, seed=0):
        self.data_dir = data_dir
        self.seed = seed
        self.all_classes = ['NotSnow', 'Snow']
        self.class_indexes = {cls: idx for idx, cls in enumerate(self.all_classes)}
        self.dataset = self.dataset_maker(train)
        self.length = len(self.dataset)

    def dataset_maker(self, train):
        """Return the ``(sample_folder, class_index)`` tuples of the requested split."""
        data_list = []
        for class_name in self.all_classes:
            class_dir = os.path.join(self.data_dir, class_name)
            # os.listdir returns entries in arbitrary order; sorting makes the
            # seeded shuffle below reproducible across instances and runs.
            for fold in sorted(os.listdir(class_dir)):
                sample_path = os.path.join(class_dir, fold)
                # Samples are folders; this skips stray files such as .DS_Store.
                if not os.path.isdir(sample_path):
                    continue
                data_list.append((sample_path, self.class_indexes[class_name]))
        # A private, seeded generator gives the train and the test instance the
        # same permutation, so the two slices below are disjoint.
        random.Random(self.seed).shuffle(data_list)
        split = int(0.8 * len(data_list))
        return data_list[:split] if train else data_list[split:]

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        """Return ``(image0, image1, class_index)`` for the sample at ``index``.

        Each image is converted to single-channel grayscale, resized to
        224x224 and given a leading dimension of size 1, i.e. it has shape
        ``(1, 1, 224, 224)``.
        """
        sample_path, sample_class = self.dataset[index]
        pic1 = self._load_image(os.path.join(sample_path, '0.png'))
        pic2 = self._load_image(os.path.join(sample_path, '1.png'))
        return pic1.unsqueeze(0), pic2.unsqueeze(0), sample_class

    @staticmethod
    def _load_image(path):
        """Load ``path`` as a grayscale tensor resized to 224x224."""
        # The context manager closes the file handle once the pixels are read.
        with Image.open(path) as img:
            tensor = transforms.ToTensor()(img.convert('L'))
        return transforms.Resize((224, 224))(tensor)

In [None]:
# Build the training split and the held-out split from the same folder.
dataset_root = "./dataset/"
e_hard_training = hard_dataloader(data_dir=dataset_root, train=True)
e_hard_testing = hard_dataloader(data_dir=dataset_root, train=False)

In [None]:
# Sanity check: print the shapes of both images and the label of one sample.
sample2 = e_hard_training[321]
first_image, second_image, sample_label = sample2
print(first_image.shape)
print(second_image.shape)
print(sample_label)

In [3]:
class early_hard_NN(nn.Module):
    """Two-input CNN with an early merge and a shared (hard-shared) first stage.

    Both inputs go through the same ``convolution2`` + ``pool2`` stem, so the
    stem weights are shared between the two images. The two 2-channel feature
    maps are concatenated into 4 channels and passed through two more
    conv/pool stages and a linear classifier with two output logits.

    ``fc1`` expects 1352 features, which corresponds to 1-channel 224x224
    inputs: 224 -> 222 -> 111 -> 110 -> 107 -> 105 -> 26, and 2 * 26 * 26 = 1352.
    """

    def __init__(self):
        super().__init__()
        self.convolution2 = nn.Conv2d(1, 2, kernel_size=3, stride=1)
        self.convolution3 = nn.Conv2d(4, 2, kernel_size=2, stride=1)
        self.convolution4 = nn.Conv2d(2, 2, kernel_size=3, stride=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.pool3 = nn.MaxPool2d(kernel_size=4, stride=1)
        self.pool4 = nn.MaxPool2d(kernel_size=5, stride=4)
        self.fc1 = nn.Linear(1352, 2)

    def _stem(self, x):
        """Apply the shared first conv/ReLU/pool stage to one input."""
        return self.pool2(F.relu(self.convolution2(x)))

    def forward(self, x1, x2):
        """Return class logits of shape ``(batch, 2)``.

        Args:
            x1: first image batch, shape ``(batch, 1, 224, 224)``.
            x2: second image batch, same shape as ``x1``.
        """
        # Both inputs must pass through the stem: it brings them to the same
        # spatial size and yields the 2 + 2 = 4 channels convolution3 expects.
        x1 = self._stem(x1)
        x2 = self._stem(x2)
        x = torch.cat((x1, x2), dim=1)
        x = self.pool3(F.relu(self.convolution3(x)))
        x = self.pool4(F.relu(self.convolution4(x)))
        x = torch.flatten(x, 1)
        return self.fc1(x)

In [5]:
# Training hyperparameters: Adam step size and number of passes over the data.
learning_rate3, epochs3 = 1e-3, 5

In [6]:
# Cross-entropy on the two output logits, optimised with Adam.
criterion3 = torch.nn.CrossEntropyLoss()
model3 = early_hard_NN()
optimiser3 = torch.optim.Adam(params=model3.parameters(), lr=learning_rate3)

In [7]:
# Count every element of every parameter tensor registered on the model.
total_params = 0
for param in model3.parameters():
    total_params += param.numel()
print(f'the total number of parameters in this model are {total_params}')

the total number of parameters in this model are 2798


In [None]:
# Train on one sample at a time (each dataset item already carries a leading
# dimension of size 1) and report running statistics at a fixed interval.
log_every3 = 1000  # number of samples between two progress reports
model3.train()
for epoch in range(epochs3):
    running_loss = 0.0
    # Predictions and labels accumulate over the whole epoch, so the reported
    # accuracy is the running accuracy since the start of the epoch.
    predicted_list = []
    labels_list = []
    print(f'this is epoch number {epoch}')
    for i, (input1, input2, label) in enumerate(e_hard_training):
        if i % log_every3 == 0:
            print(f'this is the {i} iteration')
        labels_list.append(label)
        # CrossEntropyLoss expects a 1-D tensor of class indices.
        label = torch.tensor(label).view(-1)
        optimiser3.zero_grad()
        outputs = model3(input1, input2)
        loss = criterion3(outputs, label)
        loss.backward()
        optimiser3.step()
        predicted = outputs.argmax(dim=1)
        predicted_list.append(predicted.item())
        running_loss += loss.item()
        if i % log_every3 == log_every3 - 1:
            # Average over exactly the samples accumulated since the last
            # report (the window length, not an unrelated constant).
            print('loss: %.3f accuracy: %.3f' % (
                running_loss / log_every3,
                100 * accuracy_score(labels_list, predicted_list),
            ))
            running_loss = 0.0

In [None]:
# Evaluate on the held-out split without tracking gradients. `loss` is the
# sum (not the mean) of the per-sample losses.
model3.eval()
loss = 0
predicted_list = []
labels_list = []
with torch.no_grad():
    for i, (input1, input2, label) in enumerate(e_hard_testing):
        if i % 1000 == 0:
            print(f'this is the {i} iteration')
        labels_list.append(label)
        # Convert once: wrapping an existing tensor in torch.tensor() again
        # copy-constructs it and emits a UserWarning on every sample.
        label = torch.tensor(label).view(-1)
        outputs = model3(input1, input2)
        loss += criterion3(outputs, label).item()
        predicted = outputs.argmax(dim=1, keepdim=True)
        predicted_list.append(predicted.item())

In [None]:
# Fraction of held-out samples whose predicted class matches the label.
test_accuracy = accuracy_score(labels_list, predicted_list)
print(f'the model accuracy is {test_accuracy}')