In [1]:
import os
import shutil

# Flatten the Kaggle layout <root>/<class>/<class>/<image> into
# <dest>/<class>/<image> so every class folder directly contains its images.
root_dir = "/kaggle/input/kvasir-v2-a-gastrointestinal-tract-dataset"
dest_directory = "/kaggle/working/dataset"
# makedirs(exist_ok=True) creates missing parents and is safe to re-run.
os.makedirs(dest_directory, exist_ok=True)

for directory in os.listdir(root_dir):
    dir_path = os.path.join(root_dir, directory)
    # Skip stray files at the top level; only directories hold class data.
    if not os.path.isdir(dir_path):
        continue
    destination = os.path.join(dest_directory, directory)
    os.makedirs(destination, exist_ok=True)
    for directory2 in os.listdir(dir_path):
        dir_path2 = os.path.join(dir_path, directory2)
        # os.listdir would raise on a plain file, so skip anything that is not a folder.
        if not os.path.isdir(dir_path2):
            continue
        for image in os.listdir(dir_path2):
            image_path = os.path.join(dir_path2, image)
            # shutil.copy cannot copy directories; ignore nested folders.
            if not os.path.isfile(image_path):
                continue
            dest_path = os.path.join(destination, image)
            shutil.copy(image_path, dest_path)


In [2]:
import torch

def list_physical_devices(device_type=None):
    """Return the names of the devices PyTorch can run on.

    Args:
        device_type: ``'cpu'``, ``'cuda'``, or ``None`` to list both kinds.

    Returns:
        A list such as ``['cpu', 'cuda:0']``. CUDA entries appear only when
        CUDA is available; any other ``device_type`` yields an empty list.
    """
    wants_cpu = device_type in (None, 'cpu')
    wants_cuda = device_type in (None, 'cuda')

    found = ['cpu'] if wants_cpu else []
    if wants_cuda and torch.cuda.is_available():
        found.extend(f'cuda:{index}' for index in range(torch.cuda.device_count()))
    return found

print(list_physical_devices())


['cpu']


In [28]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from torchvision.models import resnet18
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix, classification_report, accuracy_score
import warnings

# Suppress every Python warning for the rest of the session.
# NOTE(review): this also hides deprecation notices from the libraries imported above.
warnings.filterwarnings("ignore")

In [29]:
import torch
from torch.utils.data import DataLoader, random_split, Subset
from torchvision import datasets, transforms

# Parameters
dataset_path = "/kaggle/working/dataset"
image_size = (224, 224)
batch_size = 64
validation_split = 0.2  # Fraction held out of training; halved below into validation and test
random_seed = 23
num_workers = 4  # Number of subprocesses for data loading

# Transformations: fixed-size resize, tensor conversion, per-channel normalization.
# NOTE(review): the mean/std values look like the usual ImageNet statistics — confirm.
transform = transforms.Compose([
    transforms.Resize(image_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the dataset (one sub-folder per class)
full_dataset = datasets.ImageFolder(root=dataset_path, transform=transform)

# Get class names
class_names = full_dataset.classes
print("Class names:", class_names)

# Split the dataset into training and held-out sets
dataset_size = len(full_dataset)
val_size = int(validation_split * dataset_size)
train_size = dataset_size - val_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size], generator=torch.Generator().manual_seed(random_seed))

# Further split the held-out set into validation and test sets
val_size = len(val_dataset)
test_size = val_size // 2
val_size = val_size - test_size
val_dataset, test_dataset = random_split(val_dataset, [val_size, test_size], generator=torch.Generator().manual_seed(random_seed))

# Options shared by all three loaders. Pinned host memory only helps host-to-GPU
# copies, so enable it only when CUDA is actually available.
loader_options = dict(
    batch_size=batch_size,
    num_workers=num_workers,
    pin_memory=torch.cuda.is_available(),
)

# Create DataLoader for training, validation, and test sets (only training is shuffled)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)


Class names: ['dyed-lifted-polyps', 'dyed-resection-margins', 'esophagitis', 'normal-cecum', 'normal-pylorus', 'normal-z-line', 'polyps', 'ulcerative-colitis']


In [30]:
# Re-read and display the class names from the loaded dataset.
# NOTE(review): repeats the lookup and print already done in the data-loading cell.
class_names = full_dataset.classes
print("Class names:", class_names)

Class names: ['dyed-lifted-polyps', 'dyed-resection-margins', 'esophagitis', 'normal-cecum', 'normal-pylorus', 'normal-z-line', 'polyps', 'ulcerative-colitis']


In [38]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

class CustomResNet50(nn.Module):
    """ResNet-50 backbone followed by global average pooling and a linear classifier.

    Args:
        num_classes: Number of output logits produced by the classifier.
        pretrained: When True (the default) the backbone is built with
            torchvision's pre-trained weights, which must be cached or
            downloadable. Pass False when the weights will be restored with
            ``load_state_dict`` so that no download is attempted.
    """

    def __init__(self, num_classes=8, pretrained=True):
        super().__init__()
        resnet = models.resnet50(pretrained=pretrained)

        # Take the classifier input width from the stock head before it is
        # discarded, rather than from an inner convolution of the last block.
        in_features = resnet.fc.in_features

        # Drop the last two children (average pool and fully connected head),
        # keeping only the convolutional feature extractor.
        self.base_model = nn.Sequential(*list(resnet.children())[:-2])

        # Global average pooling down to a 1x1 spatial map
        self.pool = nn.AdaptiveAvgPool2d(1)

        # New fully connected layer for classification
        self.fc = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return raw class logits (no softmax applied) for a batch of images."""
        x = self.base_model(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)  # Collapse everything after the batch dimension
        x = self.fc(x)  # Final classification layer
        return x

# Build the 8-class network
model = CustomResNet50(num_classes=8)

# Use the GPU when one is available, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)

# AdamW hyper-parameters, collected in one place
adamw_settings = dict(
    lr=0.001,
    weight_decay=0.004,
    betas=(0.9, 0.999),
    eps=1e-8,
)
optimizer = optim.AdamW(model.parameters(), **adamw_settings)

# Cross-entropy loss over the raw logits returned by the model
criterion = nn.CrossEntropyLoss()

# Show the layer structure
print(model)

# Example usage
# inputs = torch.randn(1, 3, 224, 224)  # Example input tensor
# outputs = model(inputs)
# print(outputs)


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


URLError: <urlopen error [Errno -3] Temporary failure in name resolution>

In [32]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader, random_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
import os
import numpy as np

# Plateau-based learning-rate schedule: in 'min' mode with patience 5, the rate is
# multiplied by 0.2 when the value passed to scheduler.step() stops decreasing,
# and it is never lowered below 1e-5.
# Requires `optimizer` to already exist, so the model/optimizer cell must have run successfully first.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=5, min_lr=1e-5)

class EarlyStopping:
    """Signal that training should stop once the validation loss stops improving.

    Call the instance once per epoch with the validation loss and the model.
    A loss counts as an improvement when it is at most ``best_loss - delta``;
    on every improvement the model's ``state_dict`` is written to ``path`` and
    the patience counter is reset. After ``patience`` consecutive calls without
    improvement, ``early_stop`` becomes True.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: When True, print counter updates and checkpoint messages.
        delta: Minimum decrease in validation loss that counts as an improvement.
        path: File the best weights are saved to.
    """

    def __init__(self, patience=20, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # np.inf instead of np.Inf: the capitalised alias was removed in NumPy 2.0.
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        """Record one epoch's validation loss and update the stopping state."""
        # Work with the negated loss so that "higher is better".
        score = -val_loss
        if self.best_score is None:
            # First call: whatever we see is the best so far.
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Write the model's ``state_dict`` to ``self.path`` and remember ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

# Destination of the weights with the highest validation accuracy; create its folder up front
best_model_path = "training_weights/best/model_best_val_accuracy.ckpt"
os.makedirs(os.path.dirname(best_model_path), exist_ok=True)

# Highest validation accuracy observed so far
best_val_accuracy = 0.0

# Stop after 20 non-improving validation-loss checks, printing progress messages
early_stopping = EarlyStopping(patience=20, verbose=True)

NameError: name 'optimizer' is not defined

In [None]:
# Training loop
num_epochs = 30
for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    train_loss_sum = 0.0
    train_samples = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # Weight each batch by its size so a short last batch is not over-counted.
        train_loss_sum += loss.item() * labels.size(0)
        train_samples += labels.size(0)
    train_loss = train_loss_sum / max(train_samples, 1)

    # ---- Validation pass ----
    model.eval()
    val_loss_sum = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            val_loss_sum += criterion(outputs, labels).item() * labels.size(0)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss = val_loss_sum / total
    val_accuracy = correct / total

    # Save the best-accuracy weights before the early-stopping check, so an
    # improvement on the final epoch is not lost when the loop breaks.
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), best_model_path)
        print(f"Saved best model with accuracy: {best_val_accuracy:.4f}")

    # "Loss" is the mean training loss of this epoch; previously the last
    # validation batch's loss was printed here.
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    scheduler.step(val_loss)
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# Load the best model for evaluation
model.load_state_dict(torch.load(best_model_path, map_location=device))

In [None]:
import torch

# Define the path to save the model weights.
# NOTE: this is the same file the training loop writes its best-accuracy
# checkpoint to, so it is overwritten with the model's current weights.
weights_path = "training_weights/best/model_best_val_accuracy.ckpt"

# torch.save does not create missing directories, so make sure the folder exists.
os.makedirs(os.path.dirname(weights_path), exist_ok=True)

# Save the model's state_dict (weights)
torch.save(model.state_dict(), weights_path)

print(f"Model weights saved to {weights_path}")


In [None]:
from IPython.display import FileLink

# Display a link to the saved weights file as this cell's output so it can be downloaded
FileLink(weights_path)


In [None]:
# Resolve the device first so the checkpoint can be mapped onto it while loading
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = CustomResNet50(num_classes=8)
# map_location lets a checkpoint written on a GPU machine load on a CPU-only one
model.load_state_dict(
    torch.load("training_weights/best/model_best_val_accuracy.ckpt", map_location=device)
)
model.to(device)
model.eval()

# Initialize storage for labels and predictions
all_labels = []
all_probs = []

# Get per-class probabilities for every validation sample
with torch.no_grad():
    for images, labels in val_loader:
        images = images.to(device)
        outputs = model(images)
        # Softmax over the class dimension turns logits into probabilities
        probs = torch.softmax(outputs, dim=1)

        all_labels.extend(labels.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())

# One label per sample, and one row of class probabilities per sample
all_labels = np.array(all_labels)
all_probs = np.array(all_probs)

In [9]:
from sklearn.metrics import roc_curve, precision_recall_curve
# Curve data per class, and the decision threshold chosen for each class
classes = class_names
thresholds = {}
optimal_thresholds = {}

# Treat every class as its own one-vs-rest binary problem
for i, _ in enumerate(classes):
    is_class = all_labels == i
    class_scores = all_probs[:, i]

    roc = roc_curve(is_class, class_scores)
    pr = precision_recall_curve(is_class, class_scores)
    fpr, tpr, roc_thresholds = roc

    # Youden's J statistic (TPR - FPR): keep the ROC threshold that maximises it
    optimal_idx = np.argmax(tpr - fpr)
    optimal_thresholds[i] = roc_thresholds[optimal_idx]

    thresholds[i] = {
        'roc': roc,
        'pr': pr,
        'optimal': optimal_thresholds[i],
    }

# Report the chosen threshold for each class
for i, thresh in optimal_thresholds.items():
    print(f"Optimal threshold for class {classes[i]}: {thresh}")


NameError: name 'all_labels' is not defined

In [None]:
import torch

# Define the model architecture again (if not already defined)
class CustomResNet50(nn.Module):
    """ResNet-50 backbone followed by global average pooling and a linear classifier.

    Args:
        num_classes: Number of output logits produced by the classifier.
        pretrained: When True (the default) the backbone is built with
            torchvision's pre-trained weights, which must be cached or
            downloadable. Pass False when the weights will be restored with
            ``load_state_dict`` so that no download is attempted.
    """

    def __init__(self, num_classes=8, pretrained=True):
        super().__init__()
        resnet = models.resnet50(pretrained=pretrained)

        # Take the classifier input width from the stock head before it is
        # discarded, rather than from an inner convolution of the last block.
        in_features = resnet.fc.in_features

        # Drop the last two children (average pool and fully connected head),
        # keeping only the convolutional feature extractor.
        self.base_model = nn.Sequential(*list(resnet.children())[:-2])

        # Global average pooling down to a 1x1 spatial map
        self.pool = nn.AdaptiveAvgPool2d(1)

        # New fully connected layer for classification
        self.fc = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return raw class logits (no softmax applied) for a batch of images."""
        x = self.base_model(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)  # Collapse everything after the batch dimension
        x = self.fc(x)  # Final classification layer
        return x

# Build a fresh 8-class network
model = CustomResNet50(num_classes=8)

# Choose the GPU if present, else the CPU, and place the network there
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)

# Read the saved state_dict, mapping its tensors onto the selected device
checkpoint_path = '/kaggle/input/kvasir/pytorch/default/1/model_best_val_accuracy.ckpt'
checkpoint = torch.load(checkpoint_path, map_location=device)

# Copy the saved weights into the network
model.load_state_dict(checkpoint)

# Switch to evaluation mode for inference
model.eval()

print("Model weights loaded successfully.")


In [None]:
from torchvision import transforms
from PIL import Image

# Define the image transformation
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize the image to 224x224
    transforms.ToTensor(),  # Convert the image to a tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize the image
])

# Load and preprocess the image
image_path = '/kaggle/input/kvasir-v2-a-gastrointestinal-tract-dataset/dyed-lifted-polyps/dyed-lifted-polyps/0053d7cd-549c-48cd-b370-b4ad64a8098a.jpg'  # Replace with your image path
# Force three channels: Normalize above is configured for 3-channel input, so a
# grayscale, RGBA or CMYK file would otherwise fail. The context manager closes
# the file handle once the pixels have been read.
with Image.open(image_path) as raw_image:
    image = transform(raw_image.convert("RGB"))
image = image.unsqueeze(0)  # Add batch dimension

# Move the image to the same device as the model
image = image.to(device)

# Perform the classification
with torch.no_grad():  # Disable gradient calculation for inference
    output = model(image)
    # Index of the largest logit is the predicted class
    _, predicted = torch.max(output, 1)
    print('Predicted class:', predicted.item())
