In [None]:
!gdown --fuzzy https://drive.google.com/file/d/1j0taLQAqvSYjArb6jm40FhzFbG1yxhRs/view?usp=sharing

Downloading...
From (original): https://drive.google.com/uc?id=1j0taLQAqvSYjArb6jm40FhzFbG1yxhRs
From (redirected): https://drive.google.com/uc?id=1j0taLQAqvSYjArb6jm40FhzFbG1yxhRs&confirm=t&uuid=3bc50f53-e48f-4896-aece-a600aa0bee19
To: /content/archive.zip
100% 753M/753M [00:10<00:00, 71.2MB/s]


In [None]:
!apt install p7zip-full -y
!7z x /content/archive.zip -o/content/dataset


In [None]:
!pip install torchsampler
!pip install torchmetrics
!pip install timm
!pip install split-folders
!pip install thop
!pip install einops
!pip install mmcv-full -f https://download.openmmlab.com/mmcv/dist/cpu/torch1.11.0/index.html

In [None]:
import os
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
#Load libraries
import numpy as np
import glob
from torch.optim import Adam
from torch.autograd import Variable
import torchvision
import pathlib
import torchvision.models as models
import torch.nn.functional as F
from sklearn.metrics import accuracy_score,f1_score,precision_score,recall_score,confusion_matrix, classification_report,roc_auc_score,roc_curve, auc
from torchsampler import ImbalancedDatasetSampler
from torch.utils.data.sampler import BatchSampler
import random
import timm
from torch.autograd import Variable
from torch.nn import Linear, ReLU, LeakyReLU, CrossEntropyLoss, Sequential, Conv2d, MaxPool2d, Module, Softmax, BatchNorm2d, Dropout
#import cupy as cp
import seaborn as sns
import torchmetrics
from torch.cuda.amp import autocast, GradScaler
from tqdm.notebook import tqdm, trange
import time
import matplotlib.pyplot as plt
import pandas as pd
from mmcv.cnn import constant_init, kaiming_init
from thop import profile
from einops import rearrange
from einops.layers.torch import Rearrange, Reduce
from timm.models.layers import trunc_normal_, DropPath
import splitfolders

In [None]:
# Seed every random number generator the notebook relies on (PyTorch CPU and
# CUDA, NumPy, and Python's `random`) with the same value.
seed = 123
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
# cuDNN settings: disable autotuned kernel selection and request deterministic
# algorithms, in line with the fixed seeds above.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [None]:
# Define input dataset path (adjust accordingly)
# NOTE(review): the archive is extracted with `-o/content/dataset` in the setup
# cell, so this path may need to point inside /content/dataset — confirm that
# the folder below actually exists before running.
input_folder = "/content/Potato_Leaf_Disease_Dataset"

# Define output directory where split data will be saved
output_folder = "/content/data"

# Split dataset into train (70%), validation (20%), and test (10%).
# The split uses its own seed (1337), independent of the global seed cell.
splitfolders.ratio(input_folder, output=output_folder, seed=1337, ratio=(0.7, 0.2, 0.1))

# Paths of the three split folders under output_folder
train_folder = "/content/data/train"
val_folder = "/content/data/val"
test_folder = "/content/data/test"

print("Dataset split completed!")


In [None]:
# Transforms
# Training pipeline: resize to 256x256, then augment with a random horizontal
# flip and a random rotation (degrees=90) before converting to a tensor.
# NOTE(review): none of the pipelines applies transforms.Normalize; the model
# uses a pretrained backbone, which usually expects ImageNet mean/std
# normalisation — confirm the omission is intentional.
train_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(90),
    transforms.ToTensor(),
])

# Validation pipeline: deterministic resize + tensor conversion only.
val_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

# Test pipeline: identical to the validation pipeline.
test_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

In [None]:
# Datasets: one ImageFolder per split, rooted at the directories produced by the
# dataset-split cell (train_folder / val_folder / test_folder). The previous
# names train_path / val_path / test_path were never defined anywhere in the
# notebook, so this cell failed with a NameError on a fresh kernel.
train_dataset = torchvision.datasets.ImageFolder(root=train_folder, transform=train_transforms)
val_dataset = torchvision.datasets.ImageFolder(root=val_folder, transform=val_transforms)
test_dataset = torchvision.datasets.ImageFolder(root=test_folder, transform=test_transforms)

In [None]:
# Display the training dataset's class_to_idx mapping as the cell output.
train_dataset.class_to_idx

In [None]:
class BalancedBatchSampler(torch.utils.data.sampler.Sampler):
    """Sampler that yields dataset indices class by class in round-robin order.

    At construction time the indices of `dataset` are grouped by label, and
    every class with fewer samples than the largest class is oversampled (with
    replacement, via `random.choice`) until all classes have `balanced_max`
    indices. Iteration then emits one index per class in turn, so any window
    of `len(keys)` consecutive indices contains each class exactly once.

    Args:
        dataset: sized object whose indices are sampled. When `labels` is not
            given, it must expose a `targets` sequence (as torchvision's
            ImageFolder and MNIST do).
        labels: optional indexable of per-sample labels (e.g. a 1-D tensor).

    Raises:
        Exception: if `labels` is None and `dataset` has no `targets`.
    """

    def __init__(self, dataset, labels=None):
        self.labels = labels
        self.dataset = dict()
        self.balanced_max = 0
        # Save all the indices for all the classes
        for idx in range(0, len(dataset)):
            label = self._get_label(dataset, idx)
            self.dataset.setdefault(label, []).append(idx)
            self.balanced_max = max(self.balanced_max, len(self.dataset[label]))

        # Oversample the classes with fewer elements than the max
        for label in self.dataset:
            while len(self.dataset[label]) < self.balanced_max:
                self.dataset[label].append(random.choice(self.dataset[label]))
        self.keys = list(self.dataset.keys())
        # Kept only for backward compatibility with code that inspects these
        # attributes; iteration no longer stores its cursor on the instance.
        self.currentkey = 0
        self.indices = [-1]*len(self.keys)

    def __iter__(self):
        # The cursor lives in local loop variables, so an iterator that is
        # abandoned part-way cannot corrupt the next pass, and an empty dataset
        # simply yields nothing instead of raising IndexError.
        for position in range(self.balanced_max):
            for key in self.keys:
                yield self.dataset[key][position]

    @staticmethod
    def _as_scalar(label):
        """Convert a 0-d tensor/array label to a Python scalar; pass others through."""
        return label.item() if hasattr(label, "item") else label

    def _get_label(self, dataset, idx, labels = None):
        """Return the label of sample `idx` (the `labels` argument is unused and
        kept only so the signature stays unchanged)."""
        if self.labels is not None:
            return self._as_scalar(self.labels[idx])
        # Fall back to the dataset's own `targets` sequence rather than
        # checking exact torchvision types behind a module-level flag.
        targets = getattr(dataset, "targets", None)
        if targets is None:
            raise Exception("You should pass the tensor of labels to the constructor as second argument")
        return self._as_scalar(targets[idx])

    def __len__(self):
        return self.balanced_max*len(self.keys)

In [None]:
# Training batches are drawn through torchsampler's ImbalancedDatasetSampler
# (presumably to rebalance class frequencies — confirm against the torchsampler
# docs). The BalancedBatchSampler class defined above is not used here.
train_loader = torch.utils.data.DataLoader(dataset = train_dataset, sampler=ImbalancedDatasetSampler(train_dataset), batch_size=4)
# Validation and test loaders use the same batch size (4) with shuffling enabled.
val_loader = torch.utils.data.DataLoader(dataset = val_dataset, batch_size=4, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset = test_dataset, batch_size=4, shuffle=True)

#train_eg_loader = torch.utils.data.DataLoader(dataset = train_eg, batch_size=1, shuffle=False)

In [None]:
# Module-level flag stating that torchvision is importable (torchvision is
# imported unconditionally in the imports cell).
is_torchvision_installed = True

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
def weights_init(m):
    """Xavier-initialise the weights of a Conv2d/Linear layer and zero its bias.

    Intended to be used with ``module.apply(weights_init)``; any module that
    is not an ``nn.Conv2d`` or ``nn.Linear`` is left untouched.

    Args:
        m: the module to (possibly) initialise in place.
    """
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        nn.init.xavier_uniform_(m.weight.data)
        # Layers built with bias=False (BasicConv's default in this notebook)
        # have ``m.bias is None``; skip them instead of raising AttributeError.
        if m.bias is not None:
            nn.init.zeros_(m.bias.data)

In [None]:
def last_zero_init(m):
    """Zero-initialise a module via mmcv's ``constant_init``.

    For an ``nn.Sequential`` only its last layer is initialised; any other
    module is initialised directly.
    """
    target = m[-1] if isinstance(m, nn.Sequential) else m
    constant_init(target, val=0)


In [None]:
class GDSW(nn.Module):
    """Grouped conv -> channel shuffle -> depthwise-separable conv -> strided grouped conv.

    Both grouped convolutions use 3 groups and 3x3 kernels. The internal
    widths are fixed at 1668 and 1671 channels; the final convolution has
    stride 2 and produces ``dim_out`` channels.

    Args:
        dim_in: number of input channels (must be divisible by 3).
        dim_out: number of output channels (must be divisible by 3).
    """

    def __init__(self, dim_in, dim_out):
        super().__init__()
        width_a, width_b = 1668, 1671
        self.gc1 = nn.Conv2d(dim_in, width_a, kernel_size=3, padding=1, groups=3)
        self.cs = channel_shuffle(groups=3)
        self.DSWC = depthwise_separable_conv(width_a, width_b)
        self.gc2 = nn.Conv2d(width_b, dim_out, kernel_size=3, stride=2, padding=1, groups=3)

    def forward(self, x):
        # Apply the four stages in order; only gc2 (stride 2) changes H and W.
        shuffled = self.cs(self.gc1(x))
        return self.gc2(self.DSWC(shuffled))


In [None]:

class GDSW2(nn.Module):
    """Second GDSW stage: same layout as GDSW with internal widths 1686 and 1689.

    Pipeline: grouped 3x3 conv (3 groups) -> channel shuffle -> depthwise
    separable conv -> grouped 3x3 conv with stride 2 (3 groups).

    Args:
        dim_in: number of input channels (must be divisible by 3).
        dim_out: number of output channels (must be divisible by 3).
    """

    def __init__(self, dim_in, dim_out):
        super().__init__()
        width_a, width_b = 1686, 1689
        self.gc1 = nn.Conv2d(dim_in, width_a, kernel_size=3, padding=1, groups=3)
        self.cs = channel_shuffle(groups=3)
        self.DSWC = depthwise_separable_conv(width_a, width_b)
        self.gc2 = nn.Conv2d(width_b, dim_out, kernel_size=3, stride=2, padding=1, groups=3)

    def forward(self, x):
        # Apply the four stages in order; only gc2 (stride 2) changes H and W.
        shuffled = self.cs(self.gc1(x))
        return self.gc2(self.DSWC(shuffled))


In [None]:
class GDSW3(nn.Module):
    """Third GDSW stage: same layout as GDSW with internal widths 1692 and 1767.

    Pipeline: grouped 3x3 conv (3 groups) -> channel shuffle -> depthwise
    separable conv -> grouped 3x3 conv with stride 2 (3 groups).

    Args:
        dim_in: number of input channels (must be divisible by 3).
        dim_out: number of output channels (must be divisible by 3).
    """

    def __init__(self, dim_in, dim_out):
        super().__init__()
        width_a, width_b = 1692, 1767
        self.gc1 = nn.Conv2d(dim_in, width_a, kernel_size=3, padding=1, groups=3)
        self.cs = channel_shuffle(groups=3)
        self.DSWC = depthwise_separable_conv(width_a, width_b)
        self.gc2 = nn.Conv2d(width_b, dim_out, kernel_size=3, stride=2, padding=1, groups=3)

    def forward(self, x):
        # Apply the four stages in order; only gc2 (stride 2) changes H and W.
        shuffled = self.cs(self.gc1(x))
        return self.gc2(self.DSWC(shuffled))

In [None]:
class depthwise_separable_conv(nn.Module):
    """Depthwise 3x3 convolution followed by a pointwise 1x1 convolution.

    Args:
        nin: number of input channels (also the depthwise group count).
        nout: number of output channels produced by the pointwise conv.
    """

    def __init__(self, nin, nout):
        super().__init__()
        # groups == channels makes each filter see exactly one input channel.
        self.depthwise = nn.Conv2d(
            in_channels=nin, out_channels=nin, kernel_size=3, padding=1, groups=nin
        )
        # 1x1 conv mixes channels and maps nin -> nout.
        self.pointwise = nn.Conv2d(in_channels=nin, out_channels=nout, kernel_size=1)

    def forward(self, x):
        return self.pointwise(self.depthwise(x))

In [None]:
class channel_shuffle(nn.Module):
    """Interleave the channels of `groups` equally sized channel groups.

    The channel axis is viewed as (groups, channels_per_group), the two axes
    are swapped, and the result is flattened back to a single channel axis.

    Args:
        groups: number of channel groups; must divide the channel count.
    """

    def __init__(self, groups):
        super().__init__()
        self.groups = groups

    def forward(self, x):
        n, c, h, w = x.size()
        per_group = c // self.groups
        grouped = x.view(n, self.groups, per_group, h, w)
        # Swapping the group and per-group axes performs the shuffle;
        # contiguous() is needed before the tensor can be viewed again.
        swapped = grouped.transpose(1, 2).contiguous()
        return swapped.view(n, -1, h, w)

In [None]:
class BasicConv(nn.Module):
    """Conv2d optionally followed by BatchNorm2d and ReLU.

    Args:
        in_planes: input channel count.
        out_planes: output channel count (also stored as `out_channels`).
        kernel_size, stride, padding, dilation, groups, bias: forwarded to
            ``nn.Conv2d``.
        relu: append a ReLU when True.
        bn: append BatchNorm2d (eps=1e-5, momentum=0.01, affine) when True.
    """

    def __init__(self, in_planes, out_planes, kernel_size, stride=1, padding=0, dilation=1, groups=1, relu=True, bn=True, bias=False):
        super().__init__()
        self.out_channels = out_planes
        self.conv = nn.Conv2d(
            in_planes,
            out_planes,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=groups,
            bias=bias,
        )
        # Optional stages are stored as None when disabled.
        if bn:
            self.bn = nn.BatchNorm2d(out_planes, eps=1e-5, momentum=0.01, affine=True)
        else:
            self.bn = None
        if relu:
            self.relu = nn.ReLU()
        else:
            self.relu = None

    def forward(self, x):
        out = self.conv(x)
        for stage in (self.bn, self.relu):
            if stage is not None:
                out = stage(out)
        return out

class ZPool(nn.Module):
    """Reduce dim 1 to two channels: its maximum and its mean, in that order."""

    def forward(self, x):
        peak = x.max(dim=1, keepdim=True).values
        average = x.mean(dim=1, keepdim=True)
        return torch.cat((peak, average), dim=1)

class AttentionGate(nn.Module):
    """Gate the input with a sigmoid map computed from its ZPool summary.

    The input is compressed by ZPool to two channels, passed through a 7x7
    BasicConv (2 -> 1 channel, no ReLU), squashed with a sigmoid and
    multiplied back onto the input.
    """

    def __init__(self):
        super().__init__()
        kernel_size = 7
        same_padding = (kernel_size - 1) // 2
        self.compress = ZPool()
        self.conv = BasicConv(2, 1, kernel_size, stride=1, padding=same_padding, relu=False)

    def forward(self, x):
        gate = self.conv(self.compress(x))
        # In-place sigmoid, as in the original implementation.
        gate = gate.sigmoid_()
        return x * gate

class TripletAttention(nn.Module):
    """Average of two or three AttentionGate branches over permuted views.

    Two branches always run on views of the input with dim 1 swapped with
    dim 2 and with dim 3 respectively (and are permuted back afterwards).
    Unless ``no_spatial`` is set, a third branch runs on the unpermuted input.

    Args:
        no_spatial: when True, skip the branch on the unpermuted input and
            average only the two permuted branches.
    """

    def __init__(self, no_spatial=False):
        super().__init__()
        self.cw = AttentionGate()
        self.hc = AttentionGate()
        self.no_spatial = no_spatial
        if not no_spatial:
            self.hw = AttentionGate()

    def forward(self, x):
        # Branch 1: swap dims 1 and 2, gate, swap back.
        swap_12 = (0, 2, 1, 3)
        branch_cw = self.cw(x.permute(*swap_12).contiguous())
        branch_cw = branch_cw.permute(*swap_12).contiguous()

        # Branch 2: swap dims 1 and 3, gate, swap back.
        swap_13 = (0, 3, 2, 1)
        branch_hc = self.hc(x.permute(*swap_13).contiguous())
        branch_hc = branch_hc.permute(*swap_13).contiguous()

        if self.no_spatial:
            return 1/2 * (branch_cw + branch_hc)

        # Branch 3: gate the input in its original layout.
        branch_hw = self.hw(x)
        return 1/3 * (branch_hw + branch_cw + branch_hc)

In [None]:
# ---- Model ----
class PotatoDiseaseModel(nn.Module):
    """DenseNet-169 feature extractor followed by three GDSW stages, triplet
    attention and a 7-way linear classifier.

    Forward pipeline:
      1. ``self.features`` (the DenseNet-169 feature extractor).
      2. 1x1 conv lifting 1664 -> 1668 channels.
      3. Bilinear resize to 64x64.
      4. GDSW (-> 1680 ch) + BatchNorm, GDSW2 (-> 1689 ch) + BatchNorm,
         GDSW3 (-> 1767 ch) + BatchNorm; each stage ends in a stride-2 conv.
      5. TripletAttention.
      6. Global average pool, flatten, Linear(1767, 7).

    Returns raw class scores of shape [batch, 7].
    """
    def __init__(self):
        super(PotatoDiseaseModel, self).__init__()
        self.backbone = models.densenet169(pretrained=True)
        # num_ftrs is not used anywhere else in this class.
        num_ftrs = self.backbone.classifier.in_features

        # Only the backbone's feature extractor is used in forward(). Note that
        # self.backbone itself (including its classifier) stays registered as a
        # submodule, so its parameters remain part of model.parameters().
        self.features = self.backbone.features
        self.sh = GDSW(1668,1680)
        self.batchn = nn.BatchNorm2d(1680)
        self.sh2 = GDSW2(1680,1689) #1686
        self.batchn2 = nn.BatchNorm2d(1689)
        self.sh3 = GDSW3(1689,1767) #1692
        self.batchn3 = nn.BatchNorm2d(1767)
        #self.gcn = GlobalContextBlock(1664,2)
        self.triplet = TripletAttention()
        self.channel_upscale = nn.Conv2d(1664, 1668, kernel_size=1, stride=1, padding=0)
        # self.pool is not referenced in forward(); the classifier below has
        # its own AdaptiveAvgPool2d.
        self.pool = nn.AdaptiveAvgPool2d((1,1))
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1,1)),
            nn.Flatten(),
            nn.Linear(1767, 7))

    def forward(self, x):
        x = self.features(x)       # [batch, 1664, H, W]
        x1 = self.channel_upscale(x)  # 1x1 conv: channels 1664 -> 1668
        # Resize the feature map to a fixed 64x64 grid regardless of H, W.
        x1 = F.interpolate(x1, size=(64, 64), mode='bilinear', align_corners=True)
        # GDSW block 1: [batch, 1680, 32, 32] after its stride-2 conv
        x1 = self.sh(x1)
        #print("x1 first GSDW:",x1.shape)
        x1 = self.batchn(x1)

        # GDSW block 2: [batch, 1689, 16, 16]
        x1 = self.sh2(x1)
        #print("x1 second GSDW:",x1.shape)
        x1 = self.batchn2(x1)

        # GDSW block 3: [batch, 1767, 8, 8]
        x1 = self.sh3(x1)
        #print("x1 third GSDW:",x1.shape)
        x1 = self.batchn3(x1)
       # x = self.gcn(x)            # disabled global-context block
        x1 = self.triplet(x1)        # shape preserved: [batch, 1767, 8, 8]
        x1 = self.classifier(x1)     # [batch, 7] class scores
        return x1




In [None]:
# Instantiate the model on the selected device.
model=PotatoDiseaseModel().to(device)
#Optmizer and loss function
#optimizer=Adam(model.parameters(),lr=0.001,weight_decay=0.0001)
# SGD with momentum over every model parameter.
optimizer = torch.optim.SGD(model.parameters(), lr=1e-4, momentum=0.9, weight_decay=0.001)
# `criterion` is the loss passed to train()/evaluate() in the training loop.
criterion = torch.nn.CrossEntropyLoss()
# `loss_function` is a second, identical loss that the visible cells never use.
loss_function=torch.nn.CrossEntropyLoss()  #change to focal loss
# Gradient scaler for mixed precision; train() reads it as a module-level global.
scaler = GradScaler(enabled=True)
# NOTE(review): use_cuda is not referenced by any visible cell.
use_cuda = True

In [None]:
# Per-epoch history, appended to by every call to train().
train_accu = []
training_loss = []

def train(model, iterator, optimizer, criterion, device):
    """Run one training epoch with mixed precision.

    Relies on the module-level ``scaler`` (GradScaler), ``calculate_accuracy``
    and the ``train_accu`` / ``training_loss`` history lists.

    Args:
        model: network to train (switched to train mode here).
        iterator: data loader yielding ``(inputs, labels)`` batches.
        optimizer: optimizer stepping ``model``'s parameters.
        criterion: loss taking ``(logits, labels)``.
        device: device the batches are moved to.

    Returns:
        ``(mean_batch_loss, mean_batch_accuracy, train_accu, training_loss)``
        where the two lists are the shared history (accuracy in percent,
        computed over all samples; loss averaged over batches).
    """

    epoch_loss = 0
    epoch_acc = 0
    image_preds_all = []
    image_targets_all = []

    model.train()

    for (x, y) in tqdm(iterator, desc="Training", leave=False):

        x = x.to(device).float()
        y = y.to(device).long()

        # Clear gradients before the forward pass so that anything left over
        # from an interrupted previous step can never be accumulated.
        optimizer.zero_grad()

        # Only the forward pass and the loss belong inside autocast; running
        # backward() and the optimizer step under autocast is not recommended.
        with autocast():
            y_pred = model(x)
            loss = criterion(y_pred, y)

        scaler.scale(loss).backward()
        #torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
        scaler.step(optimizer)
        scaler.update()

        image_preds_all += [torch.argmax(y_pred, 1).detach().cpu().numpy()]
        image_targets_all += [y.detach().cpu().numpy()]

        acc = calculate_accuracy(y_pred, y)

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    image_preds_all = np.concatenate(image_preds_all)
    image_targets_all = np.concatenate(image_targets_all)
    # Exact sample-level accuracy over the whole epoch.
    score = (image_preds_all==image_targets_all).mean()

    train_losss = epoch_loss / len(iterator)

    train_accu.append(score*100)
    training_loss.append(train_losss)

    return epoch_loss / len(iterator), epoch_acc / len(iterator), train_accu, training_loss

In [None]:
# Per-epoch history, appended to by every call to evaluate().
val_accu = []
eval_loss = []

def evaluate(model, iterator, criterion, device):
    """Evaluate `model` on `iterator` without gradient tracking.

    Appends the sample-level accuracy (percent) and the mean batch loss to
    the module-level ``val_accu`` / ``eval_loss`` lists, prints a metric
    report through ``performance_matrix`` and returns
    ``(mean_batch_loss, mean_batch_accuracy, val_accu, eval_loss)``.
    """
    running_loss = 0
    running_acc = 0
    batch_preds = []
    batch_targets = []

    model.eval()

    with torch.no_grad():
        for inputs, targets in tqdm(iterator, desc="Evaluating", leave=False):
            inputs = inputs.to(device).float()
            targets = targets.to(device).long()

            logits = model(inputs)

            batch_preds.append(torch.argmax(logits, 1).detach().cpu().numpy())
            batch_targets.append(targets.detach().cpu().numpy())

            running_loss += criterion(logits, targets).item()
            running_acc += calculate_accuracy(logits, targets).item()

    all_preds = np.concatenate(batch_preds)
    all_targets = np.concatenate(batch_targets)

    n_batches = len(iterator)
    mean_loss = running_loss / n_batches

    # History stores accuracy over all samples, in percent.
    val_accu.append((all_preds == all_targets).mean() * 100)
    eval_loss.append(mean_loss)

    performance_matrix(all_targets, all_preds)

    return mean_loss, running_acc / n_batches, val_accu, eval_loss

In [None]:
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps (seconds) into whole
    minutes and the remaining whole seconds, returned as ``(mins, secs)``."""
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - whole_minutes * 60)
    return whole_minutes, leftover_seconds

In [None]:
def calculate_accuracy(y_pred, y):
    """Return the fraction of rows of `y_pred` whose argmax equals `y`.

    Args:
        y_pred: score tensor; the predicted class is the argmax over dim 1.
        y: label tensor with one entry per row of `y_pred`.

    Returns:
        0-d float tensor: number of correct predictions divided by
        ``y.shape[0]``.
    """
    predicted = y_pred.argmax(1, keepdim=True)
    hits = predicted.eq(y.view_as(predicted)).sum()
    return hits.float() / y.shape[0]

In [None]:
def performance_matrix(true,pred):
    """Print macro precision/recall/F1, accuracy and sklearn's classification
    report for the given true and predicted labels."""
    macro = dict(average='macro')
    prec = precision_score(true, pred, **macro)
    rec = recall_score(true, pred, **macro)
    acc = accuracy_score(true, pred)
    f1_macro = f1_score(true, pred, **macro)
    #print('Confusion Matrix:\n',confusion_matrix(true, pred))
    summary = 'Precision: {:.4f} Recall: {:.4f}, Accuracy: {:.4f}: ,f1_score: {:.4f}'
    print(summary.format(prec, rec, acc, f1_macro))
    print('Classification Report:\n',classification_report(true, pred))

In [None]:
def checkpoint_model(epoch, model, opt, model_path):
    """Save a training checkpoint to `model_path` with ``torch.save``.

    The checkpoint is a dict with the keys ``'epoch'``,
    ``'model_state_dict'``, ``'opt_state_dict'`` and ``'best_val_acc'``; the
    last one is read from the module-level ``best_val_acc`` variable.

    Args:
        epoch: epoch number stored in the checkpoint.
        model: module whose ``state_dict()`` is saved.
        opt: optimizer whose ``state_dict()`` is saved.
        model_path: destination file path.
    """
    # The previous device-dependent conditional returned model.state_dict() in
    # both branches, so it is dropped along with its dependency on the global
    # `device`.
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'opt_state_dict': opt.state_dict(),
        'best_val_acc': best_val_acc
    }, model_path)

In [None]:

EPOCHS = 50

best_valid_loss = float('inf')
best_val_acc = 0.

def count_parameters(model):
    """Return the number of trainable (requires_grad) scalar parameters."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

train_acc_gr = []
train_loss_gr = []
val_acc_gr = []
val_loss_gr = []

# Per-epoch metric rows are collected in a list and turned into a DataFrame,
# instead of growing a DataFrame with pd.concat on every epoch.
result_columns = ['Epoch','Train_Acc', 'Train_loss','Val_Acc', 'Val_loss']
epoch_records = []
dresults = pd.DataFrame(columns=result_columns)

# Make sure the checkpoint directory exists up front; otherwise the first
# torch.save fails only after a full epoch of training has been spent.
checkpoint_dir = '/kaggle/working'
os.makedirs(checkpoint_dir, exist_ok=True)

for epoch in trange(EPOCHS, desc="Epochs"):

    start_time = time.monotonic()

    train_loss, train_acc, train_acc_gr, train_loss_gr = train(model, train_loader, optimizer, criterion, device)
    valid_loss, valid_acc, val_acc_gr, val_loss_gr = evaluate(model, val_loader, criterion, device)

    epoch_records.append({
        'Epoch': epoch+1,
        'Train_Acc': train_acc,
        'Train_loss': train_loss,
        'Val_Acc': valid_acc,
        'Val_loss': valid_loss
    })
    # Rebuilt every epoch so `dresults` is usable even if the run is interrupted.
    dresults = pd.DataFrame(epoch_records, columns=result_columns)

    # Track the best validation metrics; checkpoint_model() stores
    # best_val_acc, which previously stayed at its initial 0 forever.
    best_valid_loss = min(best_valid_loss, valid_loss)
    best_val_acc = max(best_val_acc, valid_acc)

    # Checkpoint on epochs 0, 10, 20, ...
    if epoch % 10 == 0:
        checkpoint_model(epoch, model, optimizer, os.path.join(checkpoint_dir, 'CNN_base_epoch%d.pth' % epoch))

    end_time = time.monotonic()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')
dresults.to_csv('/content/dresults.csv', index=False)

# Accuracy curves (values are the percent histories returned by train/evaluate).
plt.plot(train_acc_gr,'-o')
plt.plot(val_acc_gr,'-o')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['Train','Valid'])
plt.title('Train vs Valid Accuracy')
plt.show()

# Loss curves.
plt.plot(train_loss_gr,'-o')
plt.plot(val_loss_gr,'-o')
plt.xlabel('epoch')
plt.ylabel('losses')
plt.legend(['Train','Valid'])
plt.title('Train vs Valid Losses')
plt.show()

In [None]:
# Final evaluation on the held-out test split. This mirrors the loop inside
# evaluate() but does not touch the validation history lists.
# NOTE: epoch_loss, epoch_acc and score are accumulated/computed below but are
# not printed in this cell; the printed report comes from performance_matrix().
epoch_loss = 0
epoch_acc = 0
image_preds_all = []
image_targets_all = []

model.eval()

with torch.no_grad():

    for (x1, y1) in tqdm(test_loader, desc="Evaluating", leave=False):

        # Move the batch to the model's device with the expected dtypes.
        x1 = x1.to(device).float()
        y = y1.to(device).long()

        y_pred =model(x1)

        # Collect per-batch predicted classes and targets as NumPy arrays.
        image_preds_all += [torch.argmax(y_pred, 1).detach().cpu().numpy()]
        image_targets_all += [y.detach().cpu().numpy()]

        loss = criterion(y_pred, y)

        acc = calculate_accuracy(y_pred, y)

        epoch_loss += loss.item()
        epoch_acc += acc.item()

# Flatten the per-batch arrays and compute sample-level accuracy.
image_preds_all = np.concatenate(image_preds_all)
image_targets_all = np.concatenate(image_targets_all)
score = (image_preds_all==image_targets_all).mean()

# Print precision / recall / accuracy / F1 and the classification report.
performance_matrix(image_targets_all, image_preds_all)

#plot_roc(image_targets_all, image_preds_all, N_classes=3)