In [1]:
import os
import torch

import torch.nn as nn
import torchvision as tv
import torch.nn.functional as F
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import albumentations as A


from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
from albumentations.pytorch import ToTensorV2

# Pick the compute device in order of preference: CUDA GPU, Apple MPS, CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
# Check for Apple MPS - Comment if causing problems.
# PyTorch builds without the MPS backend have no `torch.backends.mps` attribute,
# so probe for it first instead of raising AttributeError.
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    device = torch.device("mps")
# Default to CPU
else:
    device = torch.device("cpu")

In [10]:
class DatasetLoader(Dataset):
    """Image dataset built from a metadata table, capped per class and oversampled.

    Each class is first capped at ``max_per_class`` rows. The dataset then reports
    a length larger than the number of remaining rows; indices wrap around the
    table, so every row is served several times. Samples from classes with fewer
    rows than ``target_samples_per_class`` are passed through the augmentation
    pipeline on every access.
    """

    def __init__(self, metadata, img_dir, transform=None, max_per_class=300, target_samples_per_class=3000):
        """
        Args:
            metadata (DataFrame): DataFrame containing image file names and labels.
                The label is read from the 'dx' column; the image id (file name
                without '.jpg') is read from the column at position 1.
            img_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied on a sample
                (after augmentation, when augmentation is applied).
            max_per_class (int): Maximum images per class for balancing.
            target_samples_per_class (int): Classes with fewer rows than this are
                augmented; the value also scales the length reported by ``len()``.
        """
        self.img_dir = img_dir
        self.transform = transform
        self.class_to_idx = {class_name: idx for idx, class_name in enumerate(metadata['dx'].unique())}
        self.metadata = self.create_balanced_subset(metadata, max_per_class)
        self.augmentation = get_augmentation()
        self.target_samples = target_samples_per_class
        # Rows per class in the balanced table, computed once instead of masking the
        # whole frame on every __getitem__ call. Refresh it if self.metadata is replaced.
        self._class_counts = self.metadata['dx'].value_counts()

    def create_balanced_subset(self, metadata, max_per_class):
        """Return ``metadata`` with every 'dx' class down-sampled to at most ``max_per_class`` rows.

        Sampling uses a fixed random_state, so the subset is the same on every call.
        """
        return pd.concat([
            df.sample(n=max_per_class, random_state=42) if len(df) > max_per_class else df
            for _, df in metadata.groupby('dx')
        ]).reset_index(drop=True)

    def __len__(self):
        """Number of (wrapped) indices served: rows * target_samples / size of the smallest class, rounded up."""
        return int(np.ceil(len(self.metadata) * self.target_samples / self._class_counts.min()))

    def _prepare_image(self, image, label_name):
        """Augment ``image`` when its class is under-represented, then apply ``self.transform`` if set."""
        if self._class_counts[label_name] < self.target_samples:
            image = self.augmentation(image=np.array(image))['image']
        # The transform must run for every sample, not only augmented ones; otherwise
        # non-augmented samples would be returned as raw PIL images.
        if self.transform is not None:
            image = self.transform(image)
        return image

    def __getitem__(self, idx):
        """Return ``(image, label index, file name)`` for ``idx``; indices wrap around the table."""
        actual_idx = idx % len(self.metadata)
        # Column position 1 holds the image id (presumably 'image_id' — confirm against the CSV).
        filename = self.metadata.iloc[actual_idx, 1] + '.jpg'
        image = Image.open(os.path.join(self.img_dir, filename)).convert('RGB')
        # Read the label by name so it always matches the column class_to_idx was built from.
        label_name = self.metadata['dx'].iloc[actual_idx]
        label = self.class_to_idx[label_name]

        image = self._prepare_image(image, label_name)

        return image, label, filename  # Returning filename here

In [11]:
# Tensor-conversion pipeline handed to DatasetLoader as its `transform`.
# ToTensor turns a PIL image / uint8 HxWxC array into a float CxHxW tensor scaled to [0, 1].
# Further torchvision transforms can be appended to the list as needed, e.g.
#   tv.transforms.Lambda(lambda x: x.float())  # force float32
transform = tv.transforms.Compose(transforms=[tv.transforms.ToTensor()])

def get_augmentation():
    """Build the albumentations pipeline used to augment under-represented classes.

    Random geometric distortions come first, followed by a resize-and-pad stage
    that brings every image to 400x400. The pipeline is called as
    ``pipeline(image=array)['image']``.
    """
    geometric_steps = [
        # Always rotate by a random angle (limit=360).
        A.Rotate(limit=360, p=1.0),
        # Half of the time: random shift and scale, no additional rotation.
        A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.25, rotate_limit=0, p=0.5),
        # Half of the time: random flip. NOTE(review): A.Flip may flip horizontally,
        # vertically or both, not only horizontally — confirm this is intended.
        A.Flip(p=0.5),
        # Half of the time: random shear.
        A.Affine(shear=20, p=0.5),
    ]
    sizing_steps = [
        # Resize so the longest side is 400 pixels ...
        A.LongestMaxSize(max_size=400, p=1.0),
        # ... then pad the shorter side up to 400, giving a square image.
        A.PadIfNeeded(min_height=400, min_width=400, p=1.0),
    ]
    # ToTensorV2() is not appended here: DatasetLoader passes the result on to a
    # torchvision ToTensor transform, which accepts only PIL images or ndarrays.
    return A.Compose(geometric_steps + sizing_steps)


In [12]:
# Locations of the image folder and of the HAM10000 metadata table.
img_path = './data/images/'
metadata_path = './data/HAM10000_metadata.csv'

# Read the metadata and wrap it, together with the image folder, in the balanced dataset.
metadata = pd.read_csv(filepath_or_buffer=metadata_path)
dataset = DatasetLoader(metadata=metadata, img_dir=img_path, transform=transform)

In [13]:
# Split the dataset 60-20-20
# NOTE(review): DatasetLoader maps index i to metadata row i % len(metadata) and reports
# a length larger than the number of rows, so the same image is reachable through several
# indices. A random split over indices can therefore put one image into train, validation
# and test at once. Splitting the metadata rows before oversampling would avoid this —
# TODO confirm and restructure.
train_size = int(0.6 * len(dataset))
validation_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - validation_size

# Seed the split so the three partitions are identical on every Restart & Run All.
split_generator = torch.Generator().manual_seed(42)
train_dataset, validation_dataset, test_dataset = random_split(
    dataset, [train_size, validation_size, test_size], generator=split_generator
)

# Only the training loader shuffles; validation and test keep a fixed order.
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [14]:
# Used ChatGPT to generate this code to check that the images are loaded correctly.

# class_names = metadata['dx'].unique().tolist()

# def imshow(inp, label, filename, title=None):
#     """Imshow for Tensor."""
#     inp = inp.numpy().transpose((1, 2, 0))
#     mean = np.array([0.485, 0.456, 0.406])
#     std = np.array([0.229, 0.224, 0.225])
#     inp = std * inp + mean  # Unnormalize
#     inp = np.clip(inp, 0, 1)  # Clip to the valid range [0,1]
#     plt.imshow(inp)
    
#     plt.title(f"{filename} - {class_names[label]}")
#     plt.show()

# # Get a batch of training data
# inputs, classes, filenames = next(iter(train_loader))

# # Visualize each image with its filename and label
# for i in range(len(inputs)):
#     imshow(inputs[i], classes[i], filenames[i])

In [15]:
class RegNetYBlock(nn.Module):
    """Bottleneck residual block.

    Main path: 1x1 conv down to ``out_channels // 2`` channels, a 3x3 grouped
    (optionally strided) conv at that width, then a 1x1 conv up to
    ``out_channels``; each conv is followed by batch norm. The shortcut is a
    strided 1x1 conv + batch norm when the stride or channel count changes,
    and a pass-through otherwise.
    """

    def __init__(self, in_channels, out_channels, stride=1, groups=1):
        super().__init__()
        # Width of the bottleneck: half of the block's output width.
        bottleneck = out_channels // 2

        self.conv1 = nn.Conv2d(in_channels, bottleneck, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(bottleneck)
        self.conv2 = nn.Conv2d(
            bottleneck, bottleneck,
            kernel_size=3, stride=stride, padding=1, groups=groups, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(bottleneck)
        self.conv3 = nn.Conv2d(bottleneck, out_channels, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)

        # The shortcut needs a projection only when the main path changes the tensor shape.
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            # An empty Sequential returns its input unchanged.
            self.downsample = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = self.downsample(x)

        y = F.relu(self.bn1(self.conv1(x)))
        y = F.relu(self.bn2(self.conv2(y)))
        y = self.bn3(self.conv3(y))

        y += shortcut
        return F.relu(y)

class RegNetY320(nn.Module):
    """Four-stage convolutional classifier assembled from a pluggable residual block.

    Layout: strided 3x3 stem conv + batch norm + ReLU + max-pool, four stages of
    ``block`` modules (widths 64/128/256/512, groups 1/2/4/8, each stage
    down-sampling by 2), global average pooling and a linear classifier.
    """

    def __init__(self, block, layers, num_classes=1000):
        """
        Args:
            block: callable building one block; called as
                ``block(in_channels, out_channels, stride, groups)`` for the first
                block of a stage and ``block(in_channels, out_channels, groups=groups)``
                for the rest.
            layers: number of blocks in each of the four stages.
            num_classes (int): size of the classifier output.
        """
        super().__init__()
        self.in_channels = 32  # Stem width; updated as stages are built
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # (output width, conv groups) per stage. Example configuration; actual
        # values should be based on the RegNetY-320 config.
        stage_specs = ((64, 1), (128, 2), (256, 4), (512, 8))
        for position, (width, n_groups) in enumerate(stage_specs):
            stage = self._make_layer(block, width, layers[position], stride=2, groups=n_groups)
            # Registered as layer1 .. layer4.
            setattr(self, f"layer{position + 1}", stage)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)  # Input size equals the last stage's width

    def _make_layer(self, block, out_channels, blocks, stride, groups):
        """Build one stage: a strided first block followed by ``blocks - 1`` stride-1 blocks."""
        first = block(self.in_channels, out_channels, stride, groups)
        # Every later block (and the next stage) starts from this stage's output width.
        self.in_channels = out_channels
        rest = [block(self.in_channels, out_channels, groups=groups) for _ in range(1, blocks)]
        return nn.Sequential(first, *rest)

    def forward(self, x):
        """Map an image batch to class logits."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)


# "layers" is a list that contains the number of blocks in each of the four stages.
# The number of output logits is taken from the dataset's label map (one entry per
# distinct 'dx' value) instead of a hard-coded 7, so it always matches the label
# indices the dataset produces.
model = RegNetY320(RegNetYBlock, layers=[3, 4, 6, 3], num_classes=len(dataset.class_to_idx))


In [16]:
# model.to(device)
# print(f"Using device: {device}")

# criterion = nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# num_epochs = 20

# for epoch in range(num_epochs):
#     model.train()  # Set the model to training mode
#     running_loss = 0.0
#     for images, labels, _ in train_loader:
#         images, labels = images.to(device), labels.to(device)
        
#         # Zero the parameter gradients
#         optimizer.zero_grad()

#         # Forward pass
#         outputs = model(images)
#         loss = criterion(outputs, labels)

#         # Backward pass and optimize
#         loss.backward()
#         optimizer.step()

#         running_loss += loss.item() * images.size(0)
    
#     epoch_loss = running_loss / len(train_loader.dataset)
#     print(f'Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f}')

#     # Validation loop (optional, but recommended)
#     model.eval()  # Set the model to evaluation mode
#     validation_loss = 0.0
#     correct = 0
#     total = 0
#     with torch.no_grad():
#         for images, labels, _ in validation_loader:
#             images, labels = images.to(device), labels.to(device)
#             outputs = model(images)
#             loss = criterion(outputs, labels)
#             validation_loss += loss.item() * images.size(0)
#             _, predicted = torch.max(outputs.data, 1)
#             total += labels.size(0)
#             correct += (predicted == labels).sum().item()

#     epoch_val_loss = validation_loss / len(validation_loader.dataset)
#     val_accuracy = correct / total
#     print(f'Validation Loss: {epoch_val_loss:.4f}, Accuracy: {val_accuracy:.4f}')

In [17]:
# Check for available device: CUDA GPU first, then Apple MPS, then CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
# PyTorch builds without the MPS backend have no `torch.backends.mps` attribute,
# so probe for it first instead of raising AttributeError.
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

model.to(device)
print(f"Using device: {device}")

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
num_epochs = 20

# Per-epoch metrics; each list gets one entry per epoch.
training_stats = {'train_loss': [], 'train_accuracy': [], 'val_loss': [], 'val_accuracy': []}

def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one full pass over ``loader``.

    The model is trained when ``optimizer`` is given; otherwise it is put in
    eval mode and run without gradient tracking.

    Args:
        model: network mapping an image batch to class logits.
        loader: iterable yielding ``(images, labels, filenames)`` batches.
        criterion: loss taking ``(logits, labels)`` and returning the batch-mean loss.
        device: device the batches are moved to.
        optimizer: optimizer to step after each batch, or None for evaluation.

    Returns:
        (mean loss per sample, accuracy in percent)

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    is_training = optimizer is not None
    model.train(is_training)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.set_grad_enabled(is_training):
        for images, labels, _ in loader:
            images, labels = images.to(device), labels.to(device)

            if is_training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

            # The criterion returns a batch mean; weight it by batch size so the
            # epoch figure is a true per-sample average.
            loss_sum += loss.item() * images.size(0)
            n_correct += (outputs.argmax(dim=1) == labels).sum().item()
            n_seen += labels.size(0)

    if n_seen == 0:
        raise ValueError("loader yielded no samples")
    return loss_sum / n_seen, 100 * n_correct / n_seen


# Training and validation loop
for epoch in range(num_epochs):
    # Training phase
    train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
    training_stats['train_loss'].append(train_loss)
    training_stats['train_accuracy'].append(train_accuracy)

    print(f'Epoch {epoch+1}/{num_epochs} - Train Loss: {training_stats["train_loss"][-1]:.4f}, Train Accuracy: {train_accuracy:.2f}%')

    # Validation phase (no optimizer: eval mode, no gradients)
    val_loss, val_accuracy = _run_epoch(model, validation_loader, criterion, device)
    training_stats['val_loss'].append(val_loss)
    training_stats['val_accuracy'].append(val_accuracy)

    print(f'Validation Loss: {training_stats["val_loss"][-1]:.4f}, Validation Accuracy: {val_accuracy:.2f}%')


Using device: mps


TypeError: pic should be PIL Image or ndarray. Got <class 'torch.Tensor'>

In [None]:
def test_model(model, test_loader, criterion, device):
    model.eval()  # Set the model to evaluation mode
    test_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels, _ in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    average_test_loss = test_loss / len(test_loader.dataset)
    test_accuracy = correct / total
    print(f'Test Loss: {average_test_loss:.4f}, Accuracy: {test_accuracy:.4f}')
    return average_test_loss, test_accuracy

# Evaluate the trained model on the held-out test split; the cell displays (loss, accuracy).
test_model(model=model, test_loader=test_loader, criterion=criterion, device=device)

In [None]:
# True only when this PyTorch build exposes the MPS backend and reports it as available.
mps_backend = getattr(torch.backends, "mps", None)
mps_available = mps_backend is not None and mps_backend.is_available()

print(mps_available)