In [1]:
import torch

# Rely on transformation functions in torch
import torchvision.transforms as transforms
from torchvision.io import read_image
import pandas as pd # for data manipulation
import torch.nn as nn # nn: neural network module
import os
import warnings
import numpy as np
import seaborn as sns # to visualize data
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
from torchmetrics.classification import MulticlassF1Score

# Note: if there is no nvidia gpu, set cuda:0 to cpu
warnings.filterwarnings('ignore')
DEVICE = 'cuda:0'



In [2]:
class CustomImageDataset(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None, filter_fn=None):
        
        # Read images using pandas
        self.img_labels = pd.read_csv(annotations_file, sep=' ', header=None)
        if filter_fn is not None:
            self.img_labels = self.img_labels[self.img_labels[19] == filter_fn]
        
        # Save image directory
        self.img_dir = img_dir
        
        # Saving transformation
        self.transform = transform
        self.target_transform = target_transform

    # Return the number of images 
    def __len__(self):
        return len(self.img_labels)
    
    # Constructs the image path using 
    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = read_image(img_path)
        label = self.img_labels.iloc[idx, 2]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        cat = {'mono': 0, 'poly': 1, np.nan: 3}[self.img_labels.iloc[idx, 19]]
        return image, label, cat

In [3]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride = 1, downsample = None):
        # forced to call the constructor of the parent class to get ResidualBlock
        # working as a pytorch module
        super(ResidualBlock, self).__init__()
        
        # Self.conv1 is the main computation
        # 3x3 convolution batch normalization and ReLU 
        self.conv1 = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1),
                        nn.BatchNorm2d(out_channels),
                        nn.ReLU()) # Using ReLU to fix vanishing gradient problem
        
        # Also a 3x3 convolution batch but takes in conv1's output to refine conv1's features
        self.conv2 = nn.Sequential(
                        nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1),
                        nn.BatchNorm2d(out_channels))
        
        self.downsample = downsample # Optional downsample
        self.relu = nn.ReLU()
        self.out_channels = out_channels

    # Transform input data into output predictions     
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

In [4]:
class ResNet(nn.Module):
    def __init__(self, block, layers):
        super(ResNet, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Sequential(
                        nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3),
                        nn.BatchNorm2d(64),
                        nn.ReLU())
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)

        # Constructed 4 residual layers
        self.layer0 = self._make_layer(block, 64, layers[0], stride = 1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride = 2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride = 2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride = 2)
        self.avgpool = nn.AvgPool2d(7, stride=1)
        self.fc = nn.Linear(512, 4)
        
    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes:
            
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes),
            )
        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)
    
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        m_x = torch.softmax(self.fc(x), dim=-1)

        return m_x

In [None]:
dataset = CustomImageDataset('/kaggle/input/elpv-dataset-master/labels.csv', '/kaggle/input/elpv-dataset-master/images', transform=transforms.Compose([
                transforms.ToPILImage(), 
                transforms.Grayscale(num_output_channels=3),
                transforms.Resize((224, 224)),
                transforms.ToTensor()
            ]),
                            target_transform=lambda x: int(x * 3))
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [int(len(dataset) * 0.75), int(len(dataset) * 0.25)])
train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)

# Create an instance of the resNet model which is moved to DEVICE (idk how to get this working for AMD)
model = ResNet(ResidualBlock, [3, 4, 3, 4]).to(DEVICE)

# Learning rate (common starting point, too high and the model might fail coverage)
lr = 0.001
# Set to 100 passes through entire dataset
epochs = 100

loss = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr, weight_decay = 0.001, momentum=0.9) 

In [6]:
def process(model, loss_fn, optimizer, epochs, train_dataloader, test_datalodaer):
    for epoch in range(epochs):
        metric = MulticlassF1Score(num_classes=4, average='macro').to('cuda:0')
        steps = 0
        acc_loss = 0.0
        for image, label, cat in train_dataloader:
            image = image.to('cuda:0')
            label = label.to('cuda:0')
            optimizer.zero_grad()
            outputs = model(image)
            loss = loss_fn(outputs, label)
            loss.backward()
            optimizer.step()
            steps += 1
            acc_loss += loss.item()
        print(f'Epoch: {epoch} | Train Loss: {acc_loss / steps}')
        
        with torch.no_grad():
            steps = 0
            acc_loss = 0.0
            for image, label, cat in test_dataloader:
                image = image.to('cuda:0')
                label = label.to('cuda:0')
                outputs = model(image)
                loss = loss_fn(outputs, label)
                steps += 1
                acc_loss += loss.item()
                metric.update(torch.argmax(outputs, dim=-1), label)
            print(f'Epoch: {epoch} | Test Loss: {acc_loss / steps} | Test F1: {metric.compute()}')

        

In [7]:
process(model, loss, optimizer, epochs, train_dataloader, test_dataloader)

Epoch: 0 | Train Loss: 1.2607151418924332
Epoch: 0 | Test Loss: 1.1511670748392742 | Test F1: 0.22577610611915588
Epoch: 1 | Train Loss: 1.110164724290371
Epoch: 1 | Test Loss: 1.1081768075625102 | Test F1: 0.31094592809677124
Epoch: 2 | Train Loss: 1.0787195712327957
Epoch: 2 | Test Loss: 1.0914395054181416 | Test F1: 0.3334586024284363
Epoch: 3 | Train Loss: 1.0681684985756874
Epoch: 3 | Test Loss: 1.079145888487498 | Test F1: 0.34498846530914307
Epoch: 4 | Train Loss: 1.05283909663558
Epoch: 4 | Test Loss: 1.0678186615308125 | Test F1: 0.35438072681427
Epoch: 5 | Train Loss: 1.04617602750659
Epoch: 5 | Test Loss: 1.0602370301882427 | Test F1: 0.35645750164985657
Epoch: 6 | Train Loss: 1.0394245125353336
Epoch: 6 | Test Loss: 1.056873857975006 | Test F1: 0.36235252022743225
Epoch: 7 | Train Loss: 1.0375838726758957
Epoch: 7 | Test Loss: 1.0587756236394246 | Test F1: 0.3534063994884491
Epoch: 8 | Train Loss: 1.0264788568019867
Epoch: 8 | Test Loss: 1.0374229152997334 | Test F1: 0.3777

In [8]:
with torch.no_grad():
    all_preds = []
    all_truth = []
    mono_preds = []
    mono_truth = []
    poly_preds = []
    poly_truth = []
    for image, label, cat in test_dataloader:
        image = image.to(DEVICE)
        label = label.to(DEVICE)
        outputs = model(image)
        preds = torch.argmax(outputs, dim=-1)
        all_preds += preds.reshape(-1).tolist()
        all_truth += label.reshape(-1).tolist()
        for idx, c in enumerate(cat):
            if c == 0:
                mono_preds.append(preds[idx].cpu().item())
                mono_truth.append(label[idx].cpu().item())
            elif c == 1:
                poly_preds.append(preds[idx].cpu().item())
                poly_truth.append(label[idx].cpu().item())
                
acc = accuracy_score(all_truth, all_preds)
f1 = f1_score(all_truth, all_preds, average='macro')
cm = confusion_matrix(all_truth, all_preds)

print('All Result:')
print('Accurcay is: ', acc)
print('F1 Score is: ', f1)
print('Confusion Matrix is: \n', cm)
print()
                

acc = accuracy_score(poly_truth, poly_preds)
f1 = f1_score(poly_truth, poly_preds, average='macro')
cm = confusion_matrix(poly_truth, poly_preds)

print('Poly Result:')
print('Accurcay is: ', acc)
print('F1 Score is: ', f1)
print('Confusion Matrix is: \n', cm)
print()
        
        
acc = accuracy_score(mono_truth, mono_preds)
f1 = f1_score(mono_truth, mono_preds, average='macro')
cm = confusion_matrix(mono_truth, mono_preds)

print('Mono Result:')
print('Accurcay is: ', acc)
print('F1 Score is: ', f1)
print('Confusion Matrix is: \n', cm)
print()

All Result:
Accurcay is:  0.6722560975609756
F1 Score is:  0.3572219531880549
Confusion Matrix is: 
 [[317   0   0  45]
 [ 64   0   0  13]
 [ 24   0   0  10]
 [ 59   0   0 124]]

Poly Result:
Accurcay is:  0.8102409638554217
F1 Score is:  0.7764978682025582
Confusion Matrix is: 
 [[199  26]
 [ 37  70]]

Mono Result:
Accurcay is:  0.8075117370892019
F1 Score is:  0.7884088871659437
Confusion Matrix is: 
 [[118  19]
 [ 22  54]]

