In [3]:
import torch
import torchvision
from torchvision import datasets, transforms, models
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
import wandb
from tqdm.notebook import tqdm
import torch.nn as nn
import timm

  from .autonotebook import tqdm as notebook_tqdm


In [4]:

data_dir = "D:/Semester 6 Materials/DLSIP/PCOS_Clean/archive (4)/data"
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
dataset = datasets.ImageFolder(root=data_dir, transform=transform)

train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, val_size, test_size])

class_to_idx = dataset.class_to_idx
idx_to_class = {value: key for key, value in class_to_idx.items()}


In [5]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_data_dir = "D:/Semester 6 Materials/DLSIP/PCOS_Clean/archive (4)/data/train"
test_data_dir = "D:/Semester 6 Materials/DLSIP/PCOS_Clean/archive (4)/data/test"

train_dataset = datasets.ImageFolder(root=train_data_dir, transform=transform)

test_dataset = datasets.ImageFolder(root=test_data_dir, transform=transform)
test_size = int(0.7 * len(test_dataset))
val_size = int(len(test_dataset)-test_size)
test_dataset,val_dataset = torch.utils.data.random_split(test_dataset,[test_size,val_size])

In [6]:
class_to_idx = train_dataset.class_to_idx
idx_to_class = {value: key for key, value in class_to_idx.items()}

In [7]:
class_to_idx

{'infected': 0, 'notinfected': 1}

In [8]:
def count_samples_per_class(dataset, idx_to_class):
    counts = {}
    for _, label in dataset:
        class_name = idx_to_class[label]
        counts[class_name] = counts.get(class_name, 0) + 1
    return counts


class_to_idx = dataset.class_to_idx
idx_to_class = {value: key for key, value in class_to_idx.items()}

train_counts = count_samples_per_class(train_dataset, idx_to_class)
val_counts = count_samples_per_class(val_dataset, idx_to_class)
test_counts = count_samples_per_class(test_dataset, idx_to_class)

print("Training Dataset:")
for class_name, count in train_counts.items():
    print(f"{class_name}: {count} samples")

print("\nValidation Dataset:")
for class_name, count in val_counts.items():
    print(f"{class_name}: {count} samples")

print("\nTest Dataset:")
for class_name, count in test_counts.items():
    print(f"{class_name}: {count} samples")


Training Dataset:
test: 781 samples
train: 1143 samples

Validation Dataset:
train: 332 samples
test: 245 samples

Test Dataset:
test: 536 samples
train: 809 samples


In [9]:
def modify_last_layer(model):
    
    last_layer = list(model.children())[-1]
    
    
    if isinstance(last_layer, nn.modules.container.Sequential):
        num_ftrs = last_layer[-1].out_features
    else:
        num_ftrs = last_layer.out_features
    
   
    classification_head = nn.Sequential(
        nn.Linear(num_ftrs, 2048),
        nn.ReLU(),
        nn.Linear(2048, 1024),
        nn.ReLU(),
        nn.Linear(1024, 512),
        nn.ReLU(),
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Linear(256,64),
        nn.ReLU(),
        nn.Linear(64,4)
    )

    classification_head = classification_head #create_classification_head(num_ftrs)
    
   
    
    
    model.classification_head = classification_head

    
    original_forward = model.forward

    def new_forward(x):
        x = original_forward(x)
        return model.classification_head(x)

    model.forward = new_forward
    
    return model



def get_vit_model(model_name):
 
    # Load the pre-trained Vision Transformer model
    model = timm.create_model(model_name, pretrained=True)
    num_ftrs = model.head.in_features
    # Replace the classification head
    model.head = nn.Sequential(
        nn.Linear(num_ftrs, 2048),
        nn.ReLU(),
        nn.Linear(2048, 1024),
        nn.ReLU(),
        nn.Linear(1024, 512),
        nn.ReLU(),
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Linear(256,64),
        nn.ReLU(),
        nn.Linear(64,2)
    )
    
    return model


models_and_names = {
    "VGG16": models.vgg16(pretrained=True),
    "VGG19": models.vgg19(pretrained=True),
    "InceptionV3": models.inception_v3(pretrained=True),
    "GoogLeNet": models.googlenet(pretrained=True),
    "ResNet50": models.resnet50(pretrained=True),
    "ResNet152": models.resnet152(pretrained=True),
    "DenseNet121": models.densenet121(pretrained=True),
    "DenseNet169": models.densenet169(pretrained=True),
    "DenseNet201": models.densenet201(pretrained=True),
    "MobileNetV2": models.mobilenet_v2(pretrained=True),
    
}


results = {}
for model_name, model in models_and_names.items():
    print(f"modifying {model_name}...")
    
    
    for param in model.parameters():
        param.requires_grad = True
    
    model = modify_last_layer(model)
    
print("Adding ViT from timm.....")

models_and_names["ViT-B-16"] = get_vit_model("vit_base_patch16_224")
models_and_names["ViT-L-16"] = get_vit_model("vit_large_patch16_224")
#models_and_names["ViT-L-32"] = get_vit_model("vit_large_patch32_224")



modifying VGG16...
modifying VGG19...
modifying InceptionV3...
modifying GoogLeNet...
modifying ResNet50...
modifying ResNet152...
modifying DenseNet121...
modifying DenseNet169...
modifying DenseNet201...
modifying MobileNetV2...
Adding ViT from timm.....


In [10]:
import random
import string

def generate_random_character():
    return random.choice(string.ascii_letters)


def generate_random_string(length=1):
    return ''.join(random.choice(string.ascii_letters) for _ in range(length))


In [11]:
import numpy as np

def test(model,testloader,model_name):
    # Test
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in tqdm(testloader):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Plot confusion matrix
    confusion_mtx = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(10, 8))
    sns.heatmap(confusion_mtx, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()
    plt.savefig(f"figures/confusionmatrix/{model_name}.png")
    
    return confusion_mtx


def calculate_metrics_from_confusion_matrix(confusion_matrix,model_name):
  
    num_classes = confusion_matrix.shape[0]
    overall_precision = 0
    overall_recall = 0
    overall_f1 = 0
    correct = np.trace(confusion_matrix)  # Sum of diagonal elements
    total = np.sum(confusion_matrix)
    
    for c in range(num_classes):
        TP = confusion_matrix[c, c]
        FP = np.sum(confusion_matrix[:, c]) - TP
        FN = np.sum(confusion_matrix[c, :]) - TP
        TN = total - TP - FP - FN
        
        precision = TP / (TP + FP) if TP + FP != 0 else 0
        recall = TP / (TP + FN) if TP + FN != 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if precision + recall != 0 else 0
        
        overall_precision += precision
        overall_recall += recall
        overall_f1 += f1
    
    # Average the metrics across all classes
    overall_precision /= num_classes
    overall_recall /= num_classes
    overall_f1 /= num_classes
    accuracy = correct / total
    
    # Print the metrics
    print(f"Model: {model_name}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Average Precision: {overall_precision:.4f}")
    print(f"Average Recall: {overall_recall:.4f}")
    print(f"Average F1 Score: {overall_f1:.4f}")
    
    return {
        'accuracy': accuracy,
        'average_precision': overall_precision,
        'average_recall': overall_recall,
        'average_f1': overall_f1
    }

import glob
import os
import copy

# Function to train and evaluate a given model
def train_and_evaluate(model, model_name, train_dataset, val_dataset, test_dataset,batch_size=32, num_epochs=50, lr=0.001,verbose_freq=100,other_logs=None):
    trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
    testloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    run_name = f"{model_name}_{generate_random_string(10)}"
    wandb.init(project='transfer-learning-PDMSAPSP-ALL-2', name = run_name)
   
    hyperparameters = {
        "learning_rate": lr,
        "batch_size": 32,
        "epochs": num_epochs
    }
    
    wandb.config.update(hyperparameters)
    wandb.log({"model_name": model_name})
    if other_logs is not None:
        wandb.log(other_logs)

    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Training
    num_epochs = num_epochs
    steps = 0
    best_model=0
    best_acc=0
    
    for epoch in tqdm(range(num_epochs)):
        model.train()
        for inputs, labels in tqdm(trainloader):
            steps += 1
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Log metrics and details every 250 steps
            if steps % verbose_freq == 0:
                wandb.log({"loss": loss.item()})
                # Checkpoint model
                
                

                # Validation
                model.eval()
                val_loss = 0.0
                correct = 0
                total = 0
                with torch.no_grad():
                    for inputs, labels in tqdm(valloader):
                        inputs, labels = inputs.to(device), labels.to(device)
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)
                        val_loss += loss.item()
                        _, predicted = outputs.max(1)
                        total += labels.size(0)
                        correct += predicted.eq(labels).sum().item()
                val_acc = 100* correct / total
                wandb.log({"val acc":  100* correct / total})
                print(correct,total)
                
                if best_acc<val_acc:
                    if os.path.exists(f'{run_name}_{best_model}.pth'):
                        os.remove(f'{run_name}_{best_model}.pth')
                    best_acc = val_acc
                    best_model = steps
                    torch.save(model.state_dict(), f'{run_name}_{best_model}.pth')
#                     wandb.save(f'{run_name}_{best_model}.pth')
                    wandb.log({"best val acc": best_acc})


                print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {val_loss / len(valloader):.4f}, Accuracy: {100 * correct / total:.2f}%")
                
    
    #Delete all models except best model
    saved_models = glob.glob("./*pth")
    best_model_name = f"{run_name}_{best_model}.pth"
    for model_ in saved_models:
        if  best_model_name != model_.split("/")[-1]:
            os.remove(model_)
            
    
    #Test
    testloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    best_model = copy.deepcopy(model)
    best_model.load_state_dict(torch.load(best_model_name))
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    best_model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in tqdm(testloader):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = best_model(inputs)
            _, predicted = outputs.max(1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Plot confusion matrix
    confusion_mtx = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(10, 8))
    sns.heatmap(confusion_mtx, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.title(f"{model_name} Best Model Confusion Matrix")
    plt.ylabel('True')
    
    plt.show()
    plt.savefig(f"{model_name} CM.png")
    wandb.save(f"{model_name} CM.png")
    
    metrics  = calculate_metrics_from_confusion_matrix(confusion_mtx,"ResNet50")
    wandb.log(metrics)  
    wandb.finish()
    
    return metrics,best_model


In [12]:
models_and_names.keys()

dict_keys(['VGG16', 'VGG19', 'InceptionV3', 'GoogLeNet', 'ResNet50', 'ResNet152', 'DenseNet121', 'DenseNet169', 'DenseNet201', 'MobileNetV2', 'ViT-B-16', 'ViT-L-16'])

In [1]:
from getpass import getpass
key = getpass("Enter key")
wandb.login(key=key)


In [None]:

metrics,best_model = train_and_evaluate(models_and_names["ViT-L-16"],"ViT-L-16",train_dataset, val_dataset,test_dataset,lr=0.0001, num_epochs=10,other_logs={"finetune_all": "True"})



Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.


[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
[34m[1mwandb[0m: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

In [None]:
idx_to_class

In [None]:
def extract_embeddings(model, dataloader):
    embeddings = []
    labels_list = []

    def hook(module, input, output):
        embeddings.append(output.cpu().numpy())

    handle = model.head[8].register_forward_hook(hook)

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            _ = model(inputs)
            labels_list.extend(labels.cpu().numpy())
    
    handle.remove()

    embeddings = np.concatenate(embeddings, axis=0)
    return embeddings, labels_list

testloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = models_and_names["ViT L16"]
model_path = "/kaggle/working/ResNet152_oKCzhfWSKa_6900.pth"
model.load_state_dict(torch.load(model_path))
model = model.to(device)
model.eval()

from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

def visualize_tsne(embeddings, labels):
    tsne = TSNE(n_components=2, random_state=42)
    reduced = tsne.fit_transform(embeddings)

    plt.figure(figsize=(12,10))
    for i, label in enumerate(np.unique(labels)):
        plt.scatter(reduced[labels == label, 0], reduced[labels == label, 1], label=idx_to_class[label])
        
    
    plt.grid(True)
    plt.xlabel("t-SNE component 1", fontsize=14)
    plt.ylabel("t-SNE component 2", fontsize=14)
    plt.title("t-SNE visualization of embeddings from Best ResNET152 Model", fontsize=16)
    plt.legend(loc='upper right', fontsize=12)
    plt.tight_layout()
    
    plt.savefig("tsne_visualization_resnet152.png", format="png")
    plt.show()

   

# Extract embeddings and visualize
embeddings, labels = extract_embeddings(model, testloader)
visualize_tsne(embeddings, labels)
