# iLab Project: Group 7-2

#### Federated Learning for Gastric Cancer Detection - gashissdb

David Bain 91082596

In [None]:
import io
import os.path
import random

import logging

import boto3
from PIL import Image
import pandas as pd
import numpy as np
from typing import List, Tuple
from typing import Optional
from botocore.exceptions import BotoCoreError, ClientError
from matplotlib import pyplot as plt

import torchvision.models as models
import torchvision.transforms as v2
import torch
from torch.optim import lr_scheduler
from torch import Tensor
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn as nn
import torch.optim as optim

Read in the aws keys to access s3

In [None]:
key_df = pd.read_csv('../team_user_accessKeys.csv', sep=',')

Access the s3 bucket with the images and create an s3_client object

In [None]:
# Replace the following values with your access key details
AWS_ACCESS_KEY_ID = key_df.iloc[0,0]
AWS_SECRET_ACCESS_KEY = key_df.iloc[0,1]
region_name = 'ap-southeast-2'

# Create an S3 client using the provided access keys
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=region_name
)

# List all the buckets
buckets = s3_client.list_buckets()
for bucket in buckets['Buckets']:
    print(bucket['Name'])

Read data files from local disk (not AWS)

In [None]:
clinic1 = 'clinic1-80'
clinic2 = 'clinic2-80'
clinic3 = 'clinic3-80'
clinic4 = 'clinic4-80'


[1] Load data from clinic 0

In [None]:
def list_s3_bucket_images(s3_client: 'boto3.client', bucket_n: str, prefix: str = '') -> List[str]:
    
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_n, Prefix=prefix)

    image_keys = []
    for page in pages:
        if 'Contents' in page:
            for obj in page['Contents']:
                if obj['Key'].lower().endswith(('.png', '.jpg', '.jpeg')):
                    image_keys.append(obj['Key'])
    return image_keys

In [None]:
bucket_image_list = list_s3_bucket_images(s3_client, clinic1, '')

[2] Read an image from s3

In [None]:
def read_image_from_s3(s3_client: 'boto3.client', bucket_n: str, image_key: str) -> Optional[Image]:

    try:
        obj = s3_client.get_object(Bucket=bucket_n, Key=image_key)
        img_data = obj['Body'].read()

        # Convert bytes data to a file-like object
        img_bytes = io.BytesIO(img_data)

        # Use PIL to open the image
        image = Image.open(img_bytes).convert('RGB')

        return image

    # Handle S3 related errors
    except (BotoCoreError, ClientError) as e:
        print(f"Error accessing S3 for image {image_key}: {e}")

    # Handle decoding errors
    except (ValueError, IOError) as e:
        print(f"Error decoding image {image_key}: {e}")

    return None


[3] Process the images from disk without transformation.

Resizing depends on the desired neural network architecture which have a min of 32 x 32

In [None]:
# Read in all images from disk and store in an array. Preprocessing is done later

def load_images(s3_client: 'boto3.client', bucket_n: str, prefixes: List[str]) -> List[Tuple[int, Image]]:
    
    images = []
    
    for prefix in prefixes:
        print(f"Processing images for prefix: {prefix}")
        image_keys = list_s3_bucket_images(s3_client, bucket_n, prefix)
        
        for image_key in image_keys:
            try:
                image = read_image_from_s3(s3_client, bucket_n, image_key)
                if image is not None:
                    bin_image_key = image_key.split('/')[1] 
                    images.append((bin_image_key, image))
                     
                else:
                    print(f"Failed to load image {image_key}")
            except Exception as e:
                print(f"Error processing image {image_key}: {e}")
    return images


Define the s3 'prefix' which relates to the directory structure or clinic: 0, 1, 2, 3

Define image preprocessiing

In [None]:
def preprocess_images(s3_client: 'boto3.client', bucket_name: str, prefixes: List[str]) -> List[Tuple[int, Image]]:
    
    # List all image keys
    for prefix in prefixes:
        print(f"Listing images for prefixes: {prefix}")
        image_keys = list_s3_bucket_images(s3_client, bucket_name, prefix)
        print(f"Total number of images: {len(image_keys)}\n")
    
    # Process images in memory
    loaded_images = load_images(s3_client, bucket_name, prefixes)
    print(f"Total number of loaded images: {len(loaded_images)}")
    
    return loaded_images


In [None]:
clinic_1_images = preprocess_images(s3_client, clinic1, ['abnormal', 'normal'])
#clinic_2_test_images = preprocess_images(s3_client, bucket_name, clinic_0_test_data)

In [None]:
# Print first and last image and classification
print(clinic_1_images[0])
print(clinic_1_images[-1])


Read from disk

In [None]:
image_dir = os.path.expanduser('~/Development/GasHisSDB/80/clinic1')

# Configure logging
logging.basicConfig(level=logging.INFO)

In [None]:
def read_image_with_label(file_path, label):
    try:
        with Image.open(file_path) as image:  # Ensure proper file handling
            image.load()  # Load image data into memory
            return image, label
    except Exception as e:
        logging.error(f"Error reading {file_path}: {e}")
        return None, None


def process_directory(directory_path, label, batch_size=100):
    image_files = [os.path.join(directory_path, f) for f in os.listdir(directory_path)
                   if f.endswith('.jpg') or f.endswith('.png')]

    images_with_labels = []

    for i, file_path in enumerate(image_files):
        img, lbl = read_image_with_label(file_path, label)
        if img is not None:
            images_with_labels.append((img, lbl))

        # Process in batches to avoid memory issues
        if (i + 1) % batch_size == 0:
            #logging.info(f"Processed {i + 1} / {len(image_files)} images from {directory_path}")
            yield images_with_labels
            images_with_labels = []

    # Yield remaining images
    if images_with_labels:
        yield images_with_labels


def read_images_for_classification(base_directory, batch_size=100):
    abnormal_dir = os.path.join(base_directory, 'abnormal')
    normal_dir = os.path.join(base_directory, 'normal')

    abnormal_images_batches = process_directory(abnormal_dir, label=1, batch_size=batch_size)
    normal_images_batches = process_directory(normal_dir, label=0, batch_size=batch_size)

    for abnormal_batch in abnormal_images_batches:
        for img, lbl in abnormal_batch:
            yield img, lbl

    for normal_batch in normal_images_batches:
        for img, lbl in normal_batch:
            yield img, lbl


In [None]:
images = []

for img, lbl in read_images_for_classification(image_dir):
    # Process each image and label here
    images.append((lbl, img))


Define function to print a handful of original images

In [None]:
def plot_images(images: List[Image], titles: List[str], rows: int, cols: int) -> None:
    """
    Plot a list of images with their titles using matplotlib.

    :param images: List of images to plot
    :param titles: List of titles corresponding to the images
    :param rows: Number of rows in the plot
    :param cols: Number of columns in the plot
    """
    fig, axes = plt.subplots(rows, cols, figsize=(12, 8))
    axes = axes.flatten()

    for img, ax, title in zip(images, axes, titles):
        ax.imshow(img)
        ax.set_title(title)
        ax.axis('off')

    plt.tight_layout()
    plt.show()

Read images and plot

In [None]:
# Access a random sample of 5 images
clinic_1_images = images

if clinic_1_images:
    random_sample = random.sample(clinic_1_images, min(10, len(clinic_1_images)))
    sampled_images = [img_data for label, img_data in random_sample]
    sampled_titles = [label for label, img_data in random_sample]

    # Display images using matplotlib
    num_images = len(sampled_images)
    plot_images(sampled_images, sampled_titles, 2, int(np.ceil(num_images / 2)))


In [None]:
sampled_images

Resolution of images are small in size and vary. This will limit the transfer learning models used 

Image Transformation
1) Transform to tensor
2) Normalise
3) Augmentation

In [None]:
def preprocess_images(images: List[Tuple[int, Image]], size: List[int]) -> List[Tuple[int, Image]]:
    
    transformed_images = []
    transform = v2.Compose([
        v2.Resize(size, interpolation=Image.BICUBIC),
        #v2.RandomRotation(40),
        #v2.RandomHorizontalFlip(),
        #v2.RandomVerticalFlip(),
        v2.ToTensor(),
        v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    for label, image in images:
        transformed_images.append((label, transform(image)))
    
    return transformed_images
    


In [None]:
#target_size = [32, 32]
target_size = [224, 224]

clinic_1_ds = preprocess_images(clinic_1_images, target_size)

In [None]:
clinic_1_ds_trunc = []
for label, image in clinic_1_ds:
    trunc_label = label.split('-')[0]
    if trunc_label == 'Abnormal':
        trunc_label = 1
    else: 
        trunc_label = 0
    clinic_1_ds_trunc.append((trunc_label, image))

In [None]:
print(clinic_1_ds[0], clinic_1_ds[-1])

In [None]:
train_ds_c1, test_ds_c1 = random_split(clinic_1_ds, [int(len(clinic_1_ds) * 0.8), len(clinic_1_ds) - int(len(clinic_1_ds) * 0.8)])

Create a custom Dataset for the images

In [None]:
# Create a custom Dataset for the images
class CustomDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Retrieve the item (label, image) from the list
        label, image = self.data[idx]
        # Return the image and label
        return image, label

Create an instance of the custom dataset

In [None]:
# Create custom datasets

train_dataset = CustomDataset(train_ds_c1)
test_dataset = CustomDataset(test_ds_c1)

Create a DataLoader

In [None]:
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Make sure this runs on Mac GPU

In [None]:
# Check if MPS is available
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

Define the first Torchvision model: VGG16

In [None]:
# Load pre-trained VGG16 model
model = models.vgg16(weights='IMAGENET1K_V1')

# Freeze the parameters of the base model
for param in model.features.parameters():
    param.requires_grad = False
    
# Modify the classifier part for binary classification
model.classifier[6] = nn.Sequential(
    nn.Linear(model.classifier[6].in_features, 512),
    nn.ReLU(),
    nn.Dropout(p=0.5),
    nn.Linear(512, 1),
    nn.Sigmoid()
)

model.to(device)



Training Loop

In [None]:
# Loss function
criterion = nn.BCELoss()

# Optimiser
optimiser = optim.SGD(model.classifier.parameters(), lr=0.001, momentum=0.9)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.type(torch.FloatTensor).to(device)

        optimiser.zero_grad()  # Zero the parameter gradients

        # Removed the torch.no_grad() part from the training loop
        features = model.features(inputs)
        features_cpu = features.to('cpu')  # Move to CPU for AdaptiveAvgPool2d
        pooled_cpu = torch.nn.functional.adaptive_avg_pool2d(features_cpu, model.avgpool.output_size)
        pooled = pooled_cpu.to(device)  # Move back to MPS device
        outputs = model.classifier(torch.flatten(pooled, 1))

        outputs = outputs.view(-1)  # Flatten output to match labels shape        
        loss = criterion(outputs, labels)
        loss.backward()
        optimiser.step()

        train_loss += loss.item() * inputs.size(0)

    model.eval()
    test_loss = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.type(torch.FloatTensor).to(device)

            features = model.features(inputs)
            features_cpu = features.to('cpu')  # Move to CPU for AdaptiveAvgPool2d
            pooled_cpu = torch.nn.functional.adaptive_avg_pool2d(features_cpu, model.avgpool.output_size)
            pooled = pooled_cpu.to(device)  # Move back to MPS device
            outputs = model.classifier(torch.flatten(pooled, 1))

            outputs = outputs.view(-1)  # Flatten output to match labels shape
            loss = criterion(outputs, labels)

            test_loss += loss.item() * inputs.size(0)

    train_loss /= len(train_loader.dataset)
    test_loss /= len(test_loader.dataset)

    print(f'Epoch [{epoch + 1}/{num_epochs}], Training Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

# Save the trained model
#torch.save(model.state_dict(), 'vgg16_cancer_detection.pth')



Test Classification results

In [None]:
def evaluate(model, test_loader):
    model.eval()
    total_loss = 0.0
    correct_predictions = 0
    incorrect_predictions = 0
    total = 0

    for images, labels in test_loader:
        images, labels = images.to(device).float(), labels.to(device).float().unsqueeze(1)

        with torch.no_grad():
            features = model.features(images)
            # Move tensor to CPU for the pooling operation
            features_cpu = features.to('cpu')
            pooled_cpu = torch.nn.functional.adaptive_avg_pool2d(features_cpu, model.avgpool.output_size)
            pooled = pooled_cpu.to(device)  # Move back to device
            outputs = model.classifier(torch.flatten(pooled, 1))

            loss = criterion(outputs, labels)
            total_loss += loss.item()

            predicted = torch.sigmoid(outputs).round()
            total += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()
            incorrect_predictions += (predicted != labels).sum().item()

    accuracy = correct_predictions / total
    print(f"Correct predictions: {correct_predictions}")
    print(f"Incorrect predictions: {incorrect_predictions}")
    return total_loss / len(test_loader), accuracy




In [None]:
# Test the model for accuracy and loss
test_loss, test_accuracy = evaluate(model, test_loader)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")