In [None]:
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
import os
from torchvision.io import read_image, ImageReadMode
from torch.utils.data import Dataset


# Create Custom Dataset
class CustomDataset(Dataset):
    def __init__(self, data_dir, csv_file, transform=None):
        self.data_dir = data_dir
        self.csv_file = csv_file
        self.transform = transform
        self.data = csv_file
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        img_path = os.path.join(self.data_dir, self.data['Filename'][idx])
        image = read_image(img_path, mode=ImageReadMode.RGB)
        
        if self.transform:
            # ByteTensor to FloatTensor, scaling is automatically applied
            image = transforms.functional.convert_image_dtype(image)
            image = self.transform(image)
        
        label = self.data['Label'][idx]
        return image, label





In [None]:
data_dir = 'Deepweed Images'
img_height = 256
img_width = 256
batch_size = 32
num_channels = 3 

# Load CSV files
train_df = pd.read_csv('labels/train.csv')
val_df = pd.read_csv('labels/val.csv')
test_df = pd.read_csv('labels/test.csv')

# Define the transformations
transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
])

train_transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(degrees=45),
    transforms.RandomGrayscale(p=0.1)
])

train_dataset = CustomDataset(data_dir, train_df, train_transform)
val_dataset = CustomDataset(data_dir, val_df, transform)
test_dataset = CustomDataset(data_dir, test_df, transform)



In [None]:
image_datasets = {'train': train_dataset, 'val': val_dataset}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4, shuffle=True)
              for x in ['train', 'val']}
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import random

X_train, y = train_dataset[random.randint(0, len(train_dataset) - 1)]
transforms.functional.to_pil_image(X_train, mode="RGB")

In [None]:
import torch
import torch.nn as nn

class CustomModel(nn.Module):
    def __init__(self, img_height, img_width, num_channels):
        super(CustomModel, self).__init__()

        # Block 1
        self.block1 = nn.Sequential(
            nn.Conv2d(num_channels, 32, kernel_size=3, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        # Block 2
        self.block2 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3),
            nn.Conv2d(64, 128, kernel_size=3),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )

        # Block 3
        self.block3 = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=3),
            nn.Conv2d(128, 128, kernel_size=3),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )

        # Block 4
        self.block4 = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=3,stride=2),
            nn.Conv2d(128, 64, kernel_size=3, stride=2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(256, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, 9)

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Create an instance of the CustomModel
custom_model = CustomModel(img_height, img_width, num_channels)
print(custom_model)


In [None]:
from torchinfo import summary 

summary(custom_model)

In [None]:
custom_model = CustomModel(img_height, img_width, num_channels).to(device)

In [None]:
import time 
from tempfile import TemporaryDirectory
from tqdm import tqdm

def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    # Create a temporary directory to save training checkpoints
    with TemporaryDirectory() as tempdir:
        best_model_params_path = os.path.join(tempdir, 'best_model_params.pt')

        torch.save(model.state_dict(), best_model_params_path)
        best_acc = 0.0

        for epoch in tqdm(range(num_epochs), desc='Epochs'):
            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                if phase == 'train':
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for inputs, labels in tqdm(dataloaders[phase], desc=phase.capitalize(), leave=False):
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                    
                if phase == 'train':
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]
                
                print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), best_model_params_path)

            print()

        time_elapsed = time.time() - since
        print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
        print(f'Best val Acc: {best_acc:4f}')

        # load best model weights
        model.load_state_dict(torch.load(best_model_params_path))
    return model

In [None]:
from torch.optim import lr_scheduler, Adam

criterion = nn.CrossEntropyLoss()
learning_rate = 0.001


In [None]:
optimizer = Adam(custom_model.parameters(), lr=learning_rate)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
trained_custom_model = train_model(custom_model, criterion, optimizer, exp_lr_scheduler, num_epochs=50)

In [None]:
custom_model_preds = []
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in iter(test_loader):
        images = images.to(device)
        
        outputs = trained_custom_model(images)
        _, predicted = torch.max(outputs.data, 1)
        predicted = predicted.tolist()
        for item in predicted:
            custom_model_preds.append(item)
        del images, labels, outputs
        

In [None]:
from sklearn.metrics import classification_report

print(classification_report(y_true=test_df['Label'], y_pred=custom_model_preds))

In [None]:
# Transfer Learning
import torchvision.models as models

# Define ResNet50 model
resnet18 = models.resnet18(pretrained=True)
resnet18 = nn.Sequential(*list(resnet18.children())[:-2])  # Remove final fully connected layers

for param in resnet18.parameters():
    param.requires_grad = False

class CustomResNetModel(nn.Module):
    def __init__(self):
        super(CustomResNetModel, self).__init__()
        self.resnet18 = resnet18
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(32768, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, 9)

    def forward(self, x):
        x = self.resnet18(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Create an instance of the CustomResNetModel
resnet_model = CustomResNetModel()

# Move the model to the same device as the input
resnet_model = resnet_model.to(device)

print(resnet_model)

In [None]:
from torchinfo import summary 

# Check if parameters are trainable
for name, param in resnet_model.named_parameters():
    if param.requires_grad:
        print(f"Parameter: {name}, Requires Gradient: {param.requires_grad}")

summary(resnet_model, input_size=(1, 3, 256, 256))

In [None]:
optimizer = Adam(resnet_model.parameters(), lr=learning_rate)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

trained_resnet_model = train_model(resnet_model, criterion, optimizer, exp_lr_scheduler, num_epochs=50)

In [None]:
resnet_preds = []
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in iter(test_loader):
        images = images.to(device)
        
        outputs = trained_resnet_model(images)
        _, predicted = torch.max(outputs.data, 1)
        predicted = predicted.tolist()
        for item in predicted:
            resnet_preds.append(item)
        del images, labels, outputs
        

In [None]:
print(classification_report(y_true=test_df['Label'], y_pred=resnet_preds))

In [None]:
alexnet = models.alexnet(pretrained=True)
alexnet = nn.Sequential(*list(alexnet.children())[:-1])  # Remove final fully connected layers
for param in alexnet.parameters():
    param.requires_grad = False

class CustomAlexNetModel(nn.Module):
    def __init__(self):
        super(CustomAlexNetModel, self).__init__()
        self.alexnet = alexnet
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(9216, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, 9)

    def forward(self, x):
        x = self.alexnet(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Create an instance of the CustomAlexNetModel
alexnet_model = CustomAlexNetModel()

# Move the model to the same device as the input
alexnet_model = alexnet_model.to(device)

print(alexnet_model)


In [None]:
from torchinfo import summary

# Check if parameters are trainable
for name, param in alexnet_model.named_parameters():
    if param.requires_grad:
        print(f"Parameter: {name}, Requires Gradient: {param.requires_grad}")


summary(alexnet_model, input_size=(1, 3, 256, 256))

In [None]:
optimizer = Adam(alexnet_model.parameters(), lr=learning_rate)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

trained_inception_model = train_model(alexnet_model, criterion, optimizer, exp_lr_scheduler, num_epochs=50)

In [None]:
alexnet_preds = []
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in iter(test_loader):
        images = images.to(device)
        
        outputs = trained_inception_model(images)
        _, predicted = torch.max(outputs.data, 1)
        predicted = predicted.tolist()
        for item in predicted:
            alexnet_preds.append(item)
        del images, labels, outputs


In [None]:
print(classification_report(y_true=test_df['Label'], y_pred=alexnet_preds))