In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, 
import os

In [None]:
import torch
import torchvision
from torchvision import datasets
from torchvision import transforms as T # for simplifying the transforms
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, sampler, random_split
from torchvision import models

In [None]:
# remove warnings
import warnings
warnings.filterwarnings("ignore")

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
# Data Visualization
import plotly.express as px

In [None]:
import sys
from tqdm import tqdm
import time
import copy

In [None]:
def get_classes(data_dir):
    all_data = datasets.ImageFolder(data_dir)
    return all_data.classes

In [None]:
from PIL import Image
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import os

# Define an image validation function
def is_valid_image(file_path):
    try:
        with Image.open(file_path) as img:
            img.verify()  # This will raise an exception if the image is invalid
        return True
    except Exception as e:
        return False

def get_data_loaders(data_dir, batch_size, train=False):
    if train:
        # Train
        transform = transforms.Compose([
            # Data augmentation for training, add as needed
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
            transforms.RandomApply([transforms.ColorJitter()], p=0.25),
            transforms.Resize(438),
            transforms.CenterCrop(384),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
        ])
        train_data = datasets.ImageFolder(os.path.join(data_dir, "/kaggle/input/all-dataset2-splitted/ALL_Splitted/train/"), transform=transform)
        train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4)
        return train_loader, len(train_data)
    else:
        # Validation and Test
        transform = transforms.Compose([
            transforms.Resize(438),
            transforms.CenterCrop(384),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
        ])
        val_data = datasets.ImageFolder(os.path.join(data_dir, "/kaggle/input/all-dataset2-splitted/ALL_Splitted/val/"), transform=transform)
        test_data = datasets.ImageFolder(os.path.join(data_dir, "/kaggle/input/all-dataset2-splitted/ALL_Splitted/test/"), transform=transform)

        # Iterate through the dataset and remove corrupted images
        val_data.samples = [(image, label) for image, label in val_data.samples if is_valid_image(image)]
        test_data.samples = [(image, label) for image, label in test_data.samples if is_valid_image(image)]

        val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=4)
        test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=4)

        return val_loader, test_loader, len(val_data), len(test_data)

In [None]:
def get_data_loaders(data_dir, batch_size, train = False):
    if train:
        #train
        transform = T.Compose([
            #T.RandomHorizontalFlip(),
            #T.RandomVerticalFlip(),
            #T.RandomApply(torch.nn.ModuleList([T.ColorJitter()]), p=0.25),
            T.Resize(438),
            T.CenterCrop(384),
            T.ToTensor(),
            T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), # imagenet means
           # T.RandomErasing(p=0.2, value='random')
        ])
        train_data = datasets.ImageFolder(os.path.join(data_dir, "/kaggle/input/all-dataset2-splitted/ALL_Splitted/train/"), transform = transform)
        train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4)
        return train_loader, len(train_data)
    else:
        # val/test
        transform = T.Compose([ # We dont need augmentation for test transforms
            T.Resize(438),
            T.CenterCrop(384),
            T.ToTensor(),
            T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), # imagenet means
        ])
        val_data = datasets.ImageFolder(os.path.join(data_dir, "/kaggle/input/all-dataset2-splitted/ALL_Splitted/val/"), transform=transform)
        test_data = datasets.ImageFolder(os.path.join(data_dir, "/kaggle/input/all-dataset2-splitted/ALL_Splitted/val/"), transform=transform)
        val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=4)
        test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=4)
        return val_loader, test_loader, len(val_data), len(test_data)

In [None]:
dataset_path = "/kaggle/input/all-dataset2-splitted/ALL_Splitted/"

In [None]:
(train_loader, train_data_len) = get_data_loaders(dataset_path, 128, train=True)
(val_loader, test_loader, valid_data_len, test_data_len) = get_data_loaders(dataset_path, 32, train=False)

In [None]:
classes = get_classes("/kaggle/input/all-dataset2-splitted/ALL_Splitted/val/")

In [None]:
dataset_path1= "/kaggle/input/all-dataset2-splitted/ALL_Splitted/val/"

# Get Class Names
class_names = sorted(os.listdir(dataset_path1))
n_classes = len(class_names)

# Show
print(f"Class Names : {class_names}")
print(f"Number of Classes  : {n_classes}")

In [None]:
pred_dataset_path ="/kaggle/input/all-dataset2-splitted/ALL_Splitted/val/"
# Calculate class distribution
class_dis = [len(os.listdir(pred_dataset_path + name)) for name in class_names]

# Visualization
fig = px.pie(names=class_names, values=class_dis, title="Prediction Class Distribution")
fig.update_layout({'title':{'x':0.45}})
fig.show()

In [None]:
dataloaders = {
    "train": train_loader,
    "val": val_loader
}
dataset_sizes = {
    "train": train_data_len,
    "val": valid_data_len
}

In [None]:
print(len(train_loader), len(val_loader), len(test_loader))

In [None]:
print(train_data_len, valid_data_len, test_data_len)

In [None]:
# now, for the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
## Now, we import timm, torchvision image models
!pip install timm # kaggle doesnt have it installed by default
import timm
model = torch.hub.load('facebookresearch/deit:main', 'deit_base_patch16_384', pretrained=True)

In [None]:
dataloaders = {
    "train": train_loader,
    "val": val_loader
}
dataset_sizes = {
    "train": train_data_len,
    "val": valid_data_len
}

In [None]:
print(model)

In [None]:
import torch.nn as nn

for param in model.parameters():
    param.requires_grad = False

n_inputs = model.head.in_features
model.head = nn.Sequential(
    nn.Linear(n_inputs, 2048),
    nn.BatchNorm1d(2048),  # Adding batch normalization for regularization
    nn.ReLU(inplace=True),
    nn.Dropout(0.5),  # Increasing dropout rate for stronger regularization
    nn.Linear(2048, 1024),
    nn.BatchNorm1d(1024),
    nn.ReLU(inplace=True),
    nn.Dropout(0.5),
    nn.Linear(1024, 512),
    nn.BatchNorm1d(512),
    nn.ReLU(inplace=True),
    nn.Dropout(0.5),
    nn.Linear(512, len(classes))
)
model = model.to(device)
print(model.head)

In [None]:
criterion =  nn.CrossEntropyLoss()
criterion = criterion.to(device)
optimizer = optim.Adam(model.head.parameters(), lr=0.001)

# lr scheduler
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.97)

In [None]:
from sklearn.metrics import confusion_matrix, f1_score, classification_report, cohen_kappa_score, roc_auc_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
import time
import copy
import torch
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix, f1_score, classification_report, cohen_kappa_score, roc_auc_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
import seaborn as sns

def plot_roc_curve(true_labels, pred_probs, classes):
    # Convert true labels to one-hot encoded form
    true_labels_onehot = label_binarize(true_labels, classes=np.unique(true_labels))

    # Compute ROC curve and ROC area for each class
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in range(len(classes)):
        fpr[i], tpr[i], _ = roc_curve(true_labels_onehot[:, i], pred_probs[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Compute micro-average ROC curve and ROC area
    fpr["micro"], tpr["micro"], _ = roc_curve(true_labels_onehot.ravel(), pred_probs.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # Compute macro-average ROC curve and ROC area
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(len(classes))]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(len(classes)):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= len(classes)
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    # Plot ROC curve
    plt.figure(figsize=(10, 8))
    plt.plot(fpr["micro"], tpr["micro"], label='Micro-average (area = {0:0.2f})'.format(roc_auc["micro"]), linestyle=':', linewidth=4)
    plt.plot(fpr["macro"], tpr["macro"], label='Macro-average (area = {0:0.2f})'.format(roc_auc["macro"]), linestyle=':', linewidth=4)
    for i in range(len(classes)):
        plt.plot(fpr[i], tpr[i], label='{0} (area = {1:0.2f})'.format(classes[i], roc_auc[i]))
    plt.plot([0, 1], [0, 1], 'k--', linewidth=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC)')
    plt.legend(loc="lower right")
    plt.show()

def train_model(model, criterion, optimizer, scheduler, num_epochs=100, patience=300):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    no_improvement_count = 0

    val_true_labels = []
    val_pred_labels = []
    val_pred_probs = []
    train_acc_history = []
    val_acc_history = []
    train_loss_history = []
    val_loss_history = []

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print("-" * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0.0

            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    try:
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                        running_loss += loss.item() * inputs.size(0)
                        running_corrects += torch.sum(preds == labels.data)

                        if phase == 'val':
                            val_true_labels += labels.tolist()
                            val_pred_labels += preds.tolist()
                            val_pred_probs += torch.softmax(outputs, dim=1).tolist()
                    except OSError as e:
                        print(f"Error processing an image: {str(e)}")
                        continue

            if phase == 'train':
                train_loss_history.append(running_loss / dataset_sizes[phase])
            else:
                val_loss_history.append(running_loss / dataset_sizes[phase])

            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print("{} Loss: {:.4f} Acc: {:.4f}".format(phase, running_loss / dataset_sizes[phase], epoch_acc))

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict()) # keep the best validation accuracy model
                no_improvement_count = 0
            elif phase == 'val':
                no_improvement_count += 1

            if phase == 'train':
                train_acc_history.append(epoch_acc.item())
            else:
                val_acc_history.append(epoch_acc.item())

        print()

        if no_improvement_count >= patience:
            print(f"No improvement in validation accuracy for {no_improvement_count} epochs. Early stopping...")
            break

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print("Best Val Acc: {:.4f}".format(best_acc))

    model.load_state_dict(best_model_wts)

    # Calculate confusion matrix, F1 score, sensitivity, specificity, accuracy, and Cohen's kappa
    confusion_mat = confusion_matrix(val_true_labels, val_pred_labels)
    f1 = f1_score(val_true_labels, val_pred_labels, average='weighted')
    sensitivity = confusion_mat.diagonal() / confusion_mat.sum(axis=1)
    specificity = np.diag(confusion_mat) / np.sum(confusion_mat, axis=1)
    accuracy = np.sum(np.diag(confusion_mat)) / np.sum(confusion_mat)
    true_positives = np.diag(confusion_mat)
    true_negatives = np.sum(confusion_mat) - (np.sum(true_positives) + np.sum(confusion_mat.sum(axis=0)) - np.sum(true_positives))
    false_positives = confusion_mat.sum(axis=0) - true_positives
    false_negatives = confusion_mat.sum(axis=1) - true_positives
    kappa = cohen_kappa_score(val_true_labels, val_pred_labels)

    # Print confusion matrix, F1 score, sensitivity, specificity, accuracy, and Cohen's kappa
    print("Confusion Matrix:")
    print(confusion_mat)
    print("F1 Score: {:.4f}".format(f1))
    print("Sensitivity (Recall):", sensitivity)
    print("Specificity:", specificity)
    print("Accuracy: {:.4f}".format(accuracy))
    print("True Positives:", true_positives)
    print("True Negatives:", true_negatives)
    print("False Positives:", false_positives)
    print("False Negatives:", false_negatives)
    print("Cohen's Kappa:", kappa)

    # Convert true labels and predicted labels to one-hot encoded form for multiclass AUC calculation
    val_true_labels_onehot = label_binarize(val_true_labels, classes=np.unique(val_true_labels))
    val_pred_probs = np.array(val_pred_probs)

    # Calculate AUC for each class
    auc_scores = roc_auc_score(val_true_labels_onehot, val_pred_probs, average=None)
    # Calculate macro-average AUC
    macro_auc = roc_auc_score(val_true_labels_onehot, val_pred_probs, average='macro')

    print("AUC Scores (per class):", auc_scores)
    print("Macro-average AUC:", macro_auc)

    # Print classification report
    target_names = [str(i) for i in range(len(classes))]
    print(classification_report(val_true_labels, val_pred_labels, target_names=target_names))

    # Plot accuracy curves
    plt.plot(range(1, len(train_acc_history) + 1), train_acc_history, label='Train')
    plt.plot(range(1, len(val_acc_history) + 1), val_acc_history, label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()
    plt.show()

    # Plot loss curves
    plt.plot(range(1, len(train_loss_history) + 1), train_loss_history, label='Train')
    plt.plot(range(1, len(val_loss_history) + 1), val_loss_history, label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.show()

    # Plot ROC curve
    plot_roc_curve(val_true_labels, val_pred_probs, classes)
    # Calculate confusion matrix
    confusion_mat = confusion_matrix(val_true_labels, val_pred_labels, labels=np.arange(len(classes)))
    # Plot colored confusion matrix
    plt.figure(figsize=(15, 15))
    sns.heatmap(confusion_mat, annot=True, fmt=".0f", cmap="Blues")
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    # Set ticks and tick labels
    tick_labels = classes
    plt.xticks(np.arange(len(classes)) + 0.5, tick_labels, rotation=90, ha='right')
    plt.yticks(np.arange(len(classes)) + 0.5, tick_labels,rotation=360, ha='right')
    plt.show()


    return model

In [None]:
model_ft = train_model(model, criterion, optimizer, exp_lr_scheduler)

example = torch.rand(1, 3, 384, 384)
traced_script_module = torch.jit.trace(model.cpu(), example)
traced_script_module.save("/kaggle/working/deit384_100epochs.pt")

In [None]:
import torch

model_path = "/kaggle/working/deit384_100epochs.pt"
model = torch.jit.load(model_path)
model = model.to(device)

test_loss = 0.0
class_correct = list(0 for i in range(len(classes)))
class_total = list(0 for i in range(len(classes)))
model.eval()

for data, target in tqdm(test_loader):
    data, target = data.to(device), target.to(device)
    with torch.no_grad(): # turn off autograd for faster testing
        output = model(data)
        loss = criterion(output, target)
    test_loss = loss.item() * data.size(0)
    _, pred = torch.max(output, 1)
    correct_tensor = pred.eq(target.data.view_as(pred))
    correct = np.squeeze(correct_tensor.cpu().numpy())
    if len(target) == 32:
        for i in range(32):
            label = target.data[i]
            class_correct[label] += correct[i].item()
            class_total[label] += 1

test_loss = test_loss / test_data_len

print('Test Loss: {:.{}f}'.format(test_loss, max(15, -int(np.log10(test_loss)) + 3)))

for i in range(len(classes)):
    if class_total[i] > 0:
        print("Test Accuracy of %5s: %2d%% (%2d/%2d)" % (
            classes[i], 100*class_correct[i]/class_total[i], np.sum(class_correct[i]), np.sum(class_total[i])
        ))
    else:
        print("Test accuracy of %5s: NA" % (classes[i]))
print('\nTest Accuracy : {:.4f} ({}/{})'.format(
      100. * np.sum(class_correct) / np.sum(class_total),
      np.sum(class_correct), np.sum(class_total)))

In [None]:
from sklearn.metrics import precision_recall_curve, average_precision_score
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
import numpy as np

# Collect the true labels and predicted probabilities from the test set
test_true_labels = []
test_pred_probs = []

model.eval()  # Set model to evaluation mode
for data, target in tqdm(test_loader):
    data, target = data.to(device), target.to(device)
    
    with torch.no_grad():
        output = model(data)
        probs = torch.softmax(output, dim=1)  # Get the predicted probabilities
        test_pred_probs.extend(probs.cpu().numpy())  # Store the predicted probabilities
        test_true_labels.extend(target.cpu().numpy())  # Store the true labels

# Convert the true labels and predicted probabilities to numpy arrays
test_true_labels = np.array(test_true_labels)
test_pred_probs = np.array(test_pred_probs)

# Convert true labels to one-hot encoding format (needed for precision-recall calculation)
test_true_labels_onehot = label_binarize(test_true_labels, classes=np.arange(len(classes)))

# Function to plot PR curve
def plot_pr_curve(true_labels, pred_probs, classes):
    precision = dict()
    recall = dict()
    average_precision = dict()
    
    # Calculate Precision-Recall and plot curve for each class
    for i in range(len(classes)):
        precision[i], recall[i], _ = precision_recall_curve(true_labels[:, i], pred_probs[:, i])
        average_precision[i] = average_precision_score(true_labels[:, i], pred_probs[:, i])

    # Compute micro-average PR curve and average precision
    precision["micro"], recall["micro"], _ = precision_recall_curve(true_labels.ravel(), pred_probs.ravel())
    average_precision["micro"] = average_precision_score(true_labels, pred_probs, average="micro")

    # Plot PR curve
    plt.figure(figsize=(10, 8))
    plt.plot(recall["micro"], precision["micro"], label='Micro-average (AP = {0:0.2f})'.format(average_precision["micro"]), linestyle=':', linewidth=4)

    for i in range(len(classes)):
        plt.plot(recall[i], precision[i], label='{0} (AP = {1:0.2f})'.format(classes[i], average_precision[i]))

    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.legend(loc="lower left")
    plt.show()

# Call the function to plot PR curve
plot_pr_curve(test_true_labels_onehot, test_pred_probs, classes)
