In [1]:
import csv
import os, sys
import random

import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import DataLoader


def set_all_seeds(seed):
    """Seed every random number generator this notebook relies on.

    Exports ``seed`` as the ``PL_GLOBAL_SEED`` environment variable
    (presumably picked up by PyTorch Lightning -- confirm) and then seeds
    Python's ``random``, NumPy's global generator, and PyTorch's CPU and CUDA
    generators.

    Args:
        seed: Integer seed applied to all generators.
    """
    os.environ["PL_GLOBAL_SEED"] = str(seed)
    # Seeding order: stdlib, NumPy, torch (CPU), torch (all CUDA devices).
    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all)
    for apply_seed in seeders:
        apply_seed(seed)
    
# Hyperparameters
in_channel = 3          # number of image channels (3 = RGB) -- TODO confirm against the dataset
num_classes = 2         # Approved vs. NonApproved
learning_rate = 1e-3
batch_size = 32
num_epochs = 10
random_seed = 1

set_all_seeds(random_seed)

# Find folder paths

base_path = r"C:/Users/kaspe/OneDrive - Aarhus Universitet/Skrivebord/BI/4. semester/Data/LBST/Danish Challenge/2023 J#/"
# os.listdir returns entries in arbitrary order; sort so that positional
# lookups into target_dirs give the same entry on every run and machine.
target_dirs = sorted(os.listdir(base_path))
print(target_dirs)

['All', 'Approved', 'NonApproved', 'test_set.csv', 'train_set.csv', 'valid_set.csv', 'xlbst.csv']


In [None]:
# Create 2 separate df's: one for Approved images, one for NonApproved, containing file_name and label

# Address the class folders by name instead of by position in target_dirs:
# the order of os.listdir is not guaranteed, and the listing also contains
# 'All' and the generated .csv files.
approved_dir = os.path.join(base_path, 'Approved')
nonapproved_dir = os.path.join(base_path, 'NonApproved')

# Assign label = 0 to Approved images (listing sorted so the output is reproducible)
approved = pd.DataFrame(data = sorted(os.listdir(approved_dir)), columns = ['file_name'])
approved = approved.assign(label = 0)

# Assign label = 1 to NonApproved images
nonapproved = pd.DataFrame(data = sorted(os.listdir(nonapproved_dir)), columns = ['file_name'])
nonapproved = nonapproved.assign(label = 1)

# Merge into 1 df; ignore_index avoids duplicate row labels (both inputs start at 0)
df = pd.concat([approved, nonapproved], ignore_index=True)

# Add parcel_id column: the 7 characters at 0-based positions 3..9 of file_name
df['parcel_id'] = df['file_name'].str[3:10]

# Write .csv next to the image folders
df.to_csv(os.path.join(base_path, 'xlbst.csv'), sep = ',', encoding='utf-8', index=False)

In [None]:
# Split group by: hold out a test set while keeping every image of a parcel
# on the same side of the split (groups = parcel_id).

from sklearn.model_selection import GroupShuffleSplit

set_all_seeds(random_seed)

# Only the first split is consumed below, so request exactly one.
# Note: test_size is the fraction of *groups* (parcels), not of rows.
splitter = GroupShuffleSplit(test_size=0.20, n_splits=1, random_state=random_seed)
split = splitter.split(df, groups=df.parcel_id)
train_inds, test_inds = next(split)

train_set = df.iloc[train_inds]
test_set = df.iloc[test_inds]

# Guard against leakage: no parcel may appear in both sets.
assert set(train_set.parcel_id).isdisjoint(test_set.parcel_id), "parcel_id shared between train and test"


print(test_set.to_string())
print('Test set length:',len(test_set), 'Train set length:', len(train_set), sep='\n')

In [None]:
# Split train into 80/20 train/valid, again grouped by parcel_id

splitter2 = GroupShuffleSplit(test_size=0.2, n_splits =1, random_state=random_seed)
split2 = splitter2.split(train_set, groups=train_set.parcel_id)
train_inds2, valid_inds = next(split2)

# Indices are positional within train_set, hence iloc on train_set for both parts.
train_set2 = train_set.iloc[train_inds2]
valid_set = train_set.iloc[valid_inds]

# Guard against leakage: no parcel may appear in both train and valid.
assert set(train_set2.parcel_id).isdisjoint(valid_set.parcel_id), "parcel_id shared between train and valid"

# Print train, valid, test
print(train_set.to_string())
print(train_set2)
print(valid_set)
print(len(train_set), len(train_set2), len(valid_set))

# Save to csv (paths derived from base_path so the location is defined in one place)

train_set2.to_csv(os.path.join(base_path, 'train_set.csv'), sep = ',', encoding='utf-8', index=False)
valid_set.to_csv(os.path.join(base_path, 'valid_set.csv'), sep = ',', encoding='utf-8', index=False)
test_set.to_csv(os.path.join(base_path, 'test_set.csv'), sep = ',', encoding='utf-8', index=False)

In [None]:
# https://www.youtube.com/watch?v=ZoZHd0Zm3RY&ab_channel=AladdinPersson

from customDataset import LBSTDataset

set_all_seeds(random_seed)

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Create PyTorch compatible datasets: From images to tensors

# All three splits read their images from the shared 'All' folder; only the
# csv listing the file names and labels differs per split.
image_dir = os.path.join(base_path, 'All')


def _resize_to_tensor():
    """Return a fresh transform pipeline: resize to 224x224, then convert to a tensor.

    Augmentation (RandomHorizontalFlip, RandomResizedCrop) and Normalize are
    currently disabled; if enabled, augmentation belongs on the train split only.
    """
    return transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])


train_set = LBSTDataset(csv_file = os.path.join(base_path, 'train_set.csv'),
                        root_dir = image_dir,
                        transform = _resize_to_tensor())

valid_set = LBSTDataset(csv_file = os.path.join(base_path, 'valid_set.csv'),
                        root_dir = image_dir,
                        transform = _resize_to_tensor())

test_set = LBSTDataset(csv_file = os.path.join(base_path, 'test_set.csv'),
                       root_dir = image_dir,
                       transform = _resize_to_tensor())

# A bare expression in the middle of a cell is not displayed, so print the length explicitly.
print('Train dataset length:', len(train_set))
print(train_set)
print('The shape of tensor for 50th image in train dataset: ',train_set[49][0].shape)
print('The label for 50th image in train dataset: ',train_set[49][1])

In [None]:
set_all_seeds(random_seed)

# Train, valid and test loader

# Only the training data is shuffled. Validation and test batches keep the
# dataset order so evaluation is repeatable and predictions line up with the
# rows of the corresponding csv file.
train_loader = DataLoader(dataset=train_set,
                          batch_size=batch_size,
                          shuffle=True)

valid_loader = DataLoader(dataset=valid_set,
                          batch_size=batch_size,
                          shuffle=False)

test_loader = DataLoader(dataset=test_set,
                         batch_size=batch_size,
                         shuffle=False)

# print batch of image tensor
# Fetch a single batch so the image and label shapes come from the same batch;
# each iter(train_loader) call would start a new, differently shuffled pass.
first_batch = next(iter(train_loader))
print('print 1st batch of image tensor:', first_batch[0].shape)
print('print batch of corresponding labels:', first_batch[1].shape)

In [None]:
class NN(nn.Module):
    """Two-layer fully connected classifier (plain ``nn.Module`` variant).

    Maps a flat feature vector of length ``input_size`` to ``num_classes``
    raw scores through a hidden layer of 50 units with ReLU.

    NOTE(review): another class named ``NN`` is defined right after this one
    in the same cell, so this binding is replaced as soon as the cell has run.
    """

    def __init__(self, input_size, num_classes):
        super().__init__()
        # Layers are created in the order fc1, fc2.
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, num_classes)

    def forward(self, x):
        """Return the unnormalised class scores for ``x``."""
        hidden = F.relu(self.fc1(x))
        return self.fc2(hidden)
    
class NN(pl.LightningModule): # pl.LightningModule inherits from nn.Module and adds extra functionality
    """Two-layer fully connected classifier wrapped as a LightningModule.

    Batches are expected to unpack as ``(x, y)``; ``x`` is flattened to
    ``(batch, -1)`` before the linear layers, so ``input_size`` must equal the
    number of values per sample.

    Args:
        input_size: Number of input features per flattened sample.
        num_classes: Number of output scores.
        learning_rate: Step size handed to Adam (defaults to 0.001).
    """

    def __init__(self, input_size, num_classes, learning_rate=0.001):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, num_classes)
        self.loss_fn = nn.CrossEntropyLoss()
        self.learning_rate = learning_rate

    def forward(self, x):
        """Return the unnormalised class scores for already-flattened input ``x``."""
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

    def training_step(self, batch, batch_idx):
        """Compute the cross-entropy loss for one training batch and log it as 'train_loss'."""
        x, y = batch
        x = x.reshape(x.size(0), -1)
        scores = self(x) # compute scores
        # The criterion lives on the module (self.loss_fn), not in torch.nn.functional.
        loss = self.loss_fn(scores, y) # compute losses
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute the cross-entropy loss for one validation batch and log it as 'val_loss'."""
        x, y = batch
        x = x.reshape(x.size(0), -1)
        scores = self(x) # compute scores
        loss = self.loss_fn(scores, y) # compute losses
        self.log('val_loss', loss)
        return loss

    def test_step(self, batch, batch_idx):
        """Compute the cross-entropy loss for one test batch and log it as 'test_loss'."""
        x, y = batch
        x = x.reshape(x.size(0), -1)
        scores = self(x) # compute scores
        loss = self.loss_fn(scores, y) # compute losses
        self.log('test_loss', loss)
        return loss

    def predict_step(self, batch, batch_idx):
        """Return the index of the highest-scoring class for every sample in the batch."""
        x, y = batch
        x = x.reshape(x.size(0), -1)
        scores = self(x)
        preds = torch.argmax(scores, dim=1)
        return preds

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters."""
        return optim.Adam(self.parameters(), lr=self.learning_rate)

In [None]:
# Reduce duplication: training_step, validation_step and test_step all delegate to a shared _common_step helper instead of repeating the same code


class NN(pl.LightningModule): # pl.LightningModule inherits from nn.Module and adds extra functionality
    """Two-layer fully connected classifier with a shared step implementation.

    Batches are expected to unpack as ``(x, y)``; ``x`` is flattened to
    ``(batch, -1)`` before the linear layers, so ``input_size`` must equal the
    number of values per sample.

    Args:
        input_size: Number of input features per flattened sample.
        num_classes: Number of output scores.
        learning_rate: Step size handed to Adam (defaults to 0.001).
    """

    def __init__(self, input_size, num_classes, learning_rate=0.001):
        # All layers used by the model are declared in the constructor.
        super().__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, num_classes)
        self.loss_fn = nn.CrossEntropyLoss()
        self.learning_rate = learning_rate

    def forward(self, x):
        """Compute the output scores from an already-flattened input tensor.

        Defines how the model runs from input to output; it takes a single
        input tensor.
        """
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

    def training_step(self, batch, batch_idx):
        """Return the loss for one training batch and log it as 'train_loss'."""
        loss, scores, y = self._common_step(batch, batch_idx)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Return the loss for one validation batch and log it as 'val_loss'."""
        loss, scores, y = self._common_step(batch, batch_idx)
        self.log('val_loss', loss)
        return loss

    def test_step(self, batch, batch_idx):
        """Return the loss for one test batch and log it as 'test_loss'."""
        loss, scores, y = self._common_step(batch, batch_idx)
        self.log('test_loss', loss)
        return loss

    def _common_step(self, batch, batch_idx):
        """Flatten the inputs, run the model, and return ``(loss, scores, y)``."""
        x, y = batch
        x = x.reshape(x.size(0), -1)
        # Call the module itself rather than .forward() so registered hooks run.
        scores = self(x)
        loss = self.loss_fn(scores, y)
        return loss, scores, y

    def predict_step(self, batch, batch_idx):
        """Return the index of the highest-scoring class for every sample in the batch."""
        x, y = batch
        x = x.reshape(x.size(0), -1)
        scores = self(x)
        preds = torch.argmax(scores, dim=1)
        return preds

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters."""
        return optim.Adam(self.parameters(), lr=self.learning_rate)