In [2]:
!pip install torchvision

Collecting torchvision
  Downloading torchvision-0.20.1-cp310-cp310-win_amd64.whl.metadata (6.2 kB)
Collecting torch==2.5.1 (from torchvision)
  Downloading torch-2.5.1-cp310-cp310-win_amd64.whl.metadata (28 kB)
Downloading torchvision-0.20.1-cp310-cp310-win_amd64.whl (1.6 MB)
   ---------------------------------------- 1.6/1.6 MB 5.5 MB/s eta 0:00:00
Downloading torch-2.5.1-cp310-cp310-win_amd64.whl (203.1 MB)
   ---------------------------------------- 203.1/203.1 MB 1.8 MB/s eta 0:00:00
Installing collected packages: torch, torchvision
  Attempting uninstall: torch
    Found existing installation: torch 2.5.0
    Uninstalling torch-2.5.0:
      Successfully uninstalled torch-2.5.0
Successfully installed torch-2.5.1 torchvision-0.20.1



[notice] A new release of pip is available: 24.1.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.datasets import OxfordIIITPet

In [4]:
class DataProcess:
    """Load and preprocess the Oxford-IIIT Pet dataset.

    Images are resized, converted to float tensors and normalized with the
    mean/std listed in ``transform_image``. Segmentation trimaps are resized
    with nearest-neighbour interpolation (so label codes are never blended)
    and converted to a binary foreground mask.

    Args:
        resize: Target ``(height, width)`` applied to both images and masks.
    """

    # Label code of the pet (foreground) in the dataset's trimaps. The dataset
    # README lists 1 = foreground, 2 = background, 3 = not classified.
    TRIMAP_FOREGROUND = 1

    def __init__(self, resize=(128, 128)):
        self.resize = resize
        # Image transformations
        self.transform_image = transforms.Compose([
            transforms.Resize(self.resize),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        # Mask transformations.
        # PILToTensor keeps the raw integer label codes; ToTensor would divide
        # them by 255 and leave every target value close to zero.
        self.transform_mask = transforms.Compose([
            transforms.Resize(self.resize,
                              interpolation=transforms.InterpolationMode.NEAREST),
            transforms.PILToTensor(),
            transforms.Lambda(self.trimap_to_binary),
        ])

    @staticmethod
    def trimap_to_binary(trimap):
        """Convert a trimap tensor of label codes into a float 0/1 mask.

        Args:
            trimap: Integer tensor holding the trimap label codes.

        Returns:
            Float tensor of the same shape, 1.0 where the pixel equals
            ``TRIMAP_FOREGROUND`` and 0.0 everywhere else.
        """
        return (trimap == DataProcess.TRIMAP_FOREGROUND).float()

    def load_oxford_pet(self, root='./data'):
        """Download (if needed) and return the dataset with both target types.

        Args:
            root: Directory the dataset is stored in / downloaded to.

        Returns:
            An ``OxfordIIITPet`` dataset whose targets are
            ``(category, segmentation)`` pairs.
        """
        dataset = OxfordIIITPet(
            root=root,
            download=True,
            target_types=['category', 'segmentation']
        )
        return dataset

    def preprocess_oxford_pet(self, dataset):
        """Transform every sample and split the result 80/20, stratified by label.

        Samples whose transformation raises are reported and skipped.

        Args:
            dataset: Iterable yielding ``(image, (category, segmentation))``.

        Returns:
            Tuple ``(train_images, test_images, train_labels, test_labels,
            train_masks, test_masks)`` of numpy arrays.
        """
        # Initialize lists to store processed data
        processed_images = []
        processed_masks = []
        processed_labels = []

        print("Processing dataset...")
        for idx, (img, targets) in enumerate(dataset):
            try:
                # Apply transformations
                transformed_img = self.transform_image(img)
                transformed_mask = self.transform_mask(targets[1])  # targets[1] is the segmentation mask
                label = targets[0]  # targets[0] is the category label

                # Convert to numpy and append
                processed_images.append(transformed_img.numpy())
                processed_masks.append(transformed_mask.numpy())
                processed_labels.append(label)

            except Exception as e:
                # Best effort: report the bad sample and keep going
                print(f"Error processing sample {idx}: {str(e)}")
                continue

        # Convert lists to numpy arrays
        processed_images = np.array(processed_images)
        processed_masks = np.array(processed_masks)
        processed_labels = np.array(processed_labels)

        # Split on indices so images, masks and labels stay aligned
        train_idx, test_idx = train_test_split(
            np.arange(len(processed_labels)),
            test_size=0.2,
            random_state=42,
            stratify=processed_labels
        )

        # Create train and test sets
        train_images = processed_images[train_idx]
        test_images = processed_images[test_idx]
        train_masks = processed_masks[train_idx]
        test_masks = processed_masks[test_idx]
        train_labels = processed_labels[train_idx]
        test_labels = processed_labels[test_idx]

        return (train_images, test_images,
                train_labels, test_labels,
                train_masks, test_masks)

In [5]:
class PetDataset(Dataset):
    """Dataset over pre-processed arrays that yields one tensor triple per sample.

    Args:
        images (np.ndarray): Image data with shape (N, C, H, W)
        masks (np.ndarray): Mask data with shape (N, 1, H, W)
        labels (np.ndarray): Label data with shape (N,)
    """

    def __init__(self, images, masks, labels):
        self.images, self.masks, self.labels = images, masks, labels

    def __len__(self):
        # The label array defines how many samples there are
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, mask, label)``: two float tensors and a long scalar."""
        image_arr = self.images[idx]
        mask_arr = self.masks[idx]
        return (
            torch.from_numpy(image_arr).float(),
            torch.from_numpy(mask_arr).float(),
            torch.tensor(self.labels[idx], dtype=torch.long),
        )

In [6]:
# Neural Network Classes

# CNN for Classification
class CNN(nn.Module):
    """Three conv/pool stages followed by a two-layer classifier head.

    Args:
        num_classes: Number of output logits.
        input_size: ``(height, width)`` of the images the network will see.
            It is used to size the first fully connected layer; the default
            gives the ``64 * 16 * 16`` features of a 128x128 input.

    Raises:
        ValueError: If ``input_size`` is too small to survive three 2x2 poolings.
    """

    def __init__(self, num_classes=37, input_size=(128, 128)):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)

        # The padded 3x3 convs keep the spatial size; each of the three 2x2
        # poolings floor-halves it.
        height, width = input_size
        for _ in range(3):
            height, width = height // 2, width // 2
        if height == 0 or width == 0:
            raise ValueError(f"input_size {tuple(input_size)} is too small for three 2x2 poolings")
        self.flat_features = 64 * height * width

        self.fc1 = nn.Linear(self.flat_features, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Map a ``(batch, 3, H, W)`` image batch to ``(batch, num_classes)`` logits."""
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))
        # Flatten everything but the batch dimension. Unlike view(-1, K), a
        # mismatched input size now fails in fc1 instead of silently changing
        # the batch dimension.
        x = torch.flatten(x, 1)
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# U-Net for Segmentation
class UNet(nn.Module):
    """Small U-Net with a four-level encoder and three skip connections.

    The output has the same spatial size as the input and holds raw logits
    (no activation is applied). Because the encoder pools three times and the
    decoder doubles three times, input height and width must be divisible by 8
    for the skip connections to line up.

    Args:
        in_channels: Number of channels of the input image.
        out_channels: Number of channels of the output map.
    """

    def __init__(self, in_channels=3, out_channels=1):
        super(UNet, self).__init__()
        # Encoder
        self.enc1 = self.conv_block(in_channels, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)

        # Decoder: each dec block takes the upsampled features concatenated
        # with the matching encoder output, hence the doubled input channels
        self.up1 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec1 = self.conv_block(512, 256)
        self.up2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec2 = self.conv_block(256, 128)
        self.up3 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec3 = self.conv_block(128, 64)

        self.final = nn.Conv2d(64, out_channels, kernel_size=1)

        # Parameter-free, so one shared instance serves all encoder levels
        # instead of building a new module on every forward pass
        self.pool = nn.MaxPool2d(2, 2)

    def conv_block(self, in_channels, out_channels):
        """Return two padded 3x3 convolutions, each followed by ReLU."""
        block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU()
        )
        return block

    def forward(self, x):
        """Map ``(batch, in_channels, H, W)`` to ``(batch, out_channels, H, W)`` logits."""
        enc1 = self.enc1(x)
        enc2 = self.enc2(self.pool(enc1))
        enc3 = self.enc3(self.pool(enc2))
        enc4 = self.enc4(self.pool(enc3))

        dec1 = self.up1(enc4)
        dec1 = torch.cat((dec1, enc3), dim=1)
        dec1 = self.dec1(dec1)

        dec2 = self.up2(dec1)
        dec2 = torch.cat((dec2, enc2), dim=1)
        dec2 = self.dec2(dec2)

        dec3 = self.up3(dec2)
        dec3 = torch.cat((dec3, enc1), dim=1)
        dec3 = self.dec3(dec3)

        out = self.final(dec3)
        return out

# Recurrent Neural Networks

# Simple RNN
class RNNModel(nn.Module):
    """Stacked tanh RNN that regresses one value from the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each recurrent layer.
        num_layers: Number of stacked recurrent layers.
    """

    def __init__(self, input_size=30, hidden_size=64, num_layers=2):
        super().__init__()
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            nonlinearity='tanh',
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=1)

    def forward(self, x):
        """Map ``(batch, seq_len, input_size)`` to ``(batch, 1)``."""
        sequence_output, _hidden = self.rnn(x)
        # Only the final time step feeds the regression head
        last_step = sequence_output[:, -1, :]
        return self.fc(last_step)

# LSTM
class LSTMModel(nn.Module):
    """Stacked LSTM that regresses one value from the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size=30, hidden_size=64, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=1)

    def forward(self, x):
        """Map ``(batch, seq_len, input_size)`` to ``(batch, 1)``."""
        sequence_output, _state = self.lstm(x)
        # Only the final time step feeds the regression head
        last_step = sequence_output[:, -1, :]
        return self.fc(last_step)

# Transformer
class TransformerModel(nn.Module):
    """Encoder-decoder transformer that regresses one value per sequence.

    ``nn.Transformer`` requires its model width to be divisible by ``nhead``.
    When ``input_size`` already is, the features are fed in unchanged. When it
    is not (which includes the defaults, 30 and 4), a linear layer first
    projects the features to the next multiple of ``nhead``.

    Args:
        input_size: Number of features per time step of ``src`` and ``tgt``.
        nhead: Number of attention heads.
        num_layers: Number of encoder layers and of decoder layers.
        hidden_dim: Width of the feed-forward sublayers.
    """

    def __init__(self, input_size=30, nhead=4, num_layers=2, hidden_dim=128):
        super(TransformerModel, self).__init__()
        if input_size % nhead == 0:
            d_model = input_size
            # Identity has no parameters, so these configurations keep the
            # exact parameter set they had without the projection
            self.input_proj = nn.Identity()
        else:
            d_model = nhead * (input_size // nhead + 1)
            self.input_proj = nn.Linear(input_size, d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=hidden_dim
        )
        self.fc = nn.Linear(d_model, 1)

    def forward(self, src, tgt):
        """Return ``(batch, 1)`` predictions from the last decoder position.

        Args:
            src: Encoder input shaped ``(src_len, batch, input_size)``.
            tgt: Decoder input shaped ``(tgt_len, batch, input_size)``.
        """
        out = self.transformer(self.input_proj(src), self.input_proj(tgt))
        # Index the sequence dimension (first, since batch_first is not set)
        out = self.fc(out[-1, :, :])
        return out


In [7]:
# Task 1.3: Training and Evaluation
def train_model(model, train_loader, criterion, optimizer, device, epochs=5, early_stopping_patience=5):
    """Train a classifier on ``(images, masks, labels)`` batches.

    Only images and labels are used; masks are ignored. Training stops early
    once the mean training loss has failed to improve for
    ``early_stopping_patience`` consecutive epochs.

    Args:
        model: Network mapping an image batch to class logits.
        train_loader: Iterable of ``(images, masks, labels)`` batches with a length.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the model and batches are moved to.
        epochs: Maximum number of epochs.
        early_stopping_patience: Non-improving epochs tolerated before stopping.

    Returns:
        List with the mean training loss of every epoch that was run.
    """
    model.to(device)
    best_loss = float('inf')
    patience_counter = 0
    history = []

    for epoch in range(epochs):
        # Re-enter training mode each epoch in case the model was evaluated in between
        model.train()
        running_loss = 0.0
        for batch_idx, (images, masks, labels) in enumerate(train_loader):
            # Move data to device
            images = images.to(device)
            labels = labels.to(device)  # We only need labels for classification

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            if batch_idx % 10 == 0:
                print(f'Epoch [{epoch+1}/{epochs}], Batch [{batch_idx+1}/{len(train_loader)}], '
                      f'Loss: {loss.item():.4f}')

        epoch_loss = running_loss / len(train_loader)
        history.append(epoch_loss)
        # Report before the early-stopping check so the last epoch is always logged
        print(f'Epoch [{epoch+1}/{epochs}] Loss: {epoch_loss:.4f}')

        # Early stopping check
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= early_stopping_patience:
            print(f'Early stopping triggered after {epoch+1} epochs')
            break

    return history

def evaluate_model(model, test_loader, criterion, device, task_type='classification'):
    """Report mean batch loss and top-1 accuracy of a classifier.

    Args:
        model: Network mapping an image batch to class logits.
        test_loader: Iterable of ``(images, masks, labels)`` batches with a length.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        task_type: Accepted for interface compatibility; not read by this function.

    Returns:
        ``(avg_loss, accuracy)`` where accuracy is a percentage.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch in test_loader:
            images, _masks, labels = batch
            images, labels = images.to(device), labels.to(device)

            logits = model(images)
            loss_sum += criterion(logits, labels).item()

            # Index of the highest logit is the predicted class
            predicted = torch.max(logits.data, 1)[1]
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    avg_loss = loss_sum / len(test_loader)
    accuracy = 100 * n_correct / n_seen
    print(f'Test Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return avg_loss, accuracy

In [8]:
# Additional Training and Evaluation for U-Net and Recurrent Models

def train_unet(model, train_loader, criterion, optimizer, device, epochs=5, early_stopping_patience=5):
    """Train a segmentation model on ``(images, masks, labels)`` batches.

    Only images and masks are used; labels are ignored. Training stops early
    once the mean training loss has failed to improve for
    ``early_stopping_patience`` consecutive epochs.

    Args:
        model: Network mapping an image batch to mask logits.
        train_loader: Iterable of ``(images, masks, labels)`` batches with a length.
        criterion: Loss called as ``criterion(outputs, masks)``.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the model and batches are moved to.
        epochs: Maximum number of epochs.
        early_stopping_patience: Non-improving epochs tolerated before stopping.

    Returns:
        List with the mean training loss of every epoch that was run.
    """
    model.to(device)
    best_loss = float('inf')
    patience_counter = 0
    history = []

    for epoch in range(epochs):
        # Re-enter training mode each epoch in case the model was evaluated in between
        model.train()
        running_loss = 0.0
        for batch_idx, (images, masks, _) in enumerate(train_loader):  # Ignore labels
            # Move data to device
            images = images.to(device)
            masks = masks.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, masks)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            if batch_idx % 10 == 0:
                print(f'Epoch [{epoch+1}/{epochs}], Batch [{batch_idx+1}/{len(train_loader)}], '
                      f'Loss: {loss.item():.4f}')

        epoch_loss = running_loss / len(train_loader)
        history.append(epoch_loss)
        # Report before the early-stopping check so the last epoch is always logged
        print(f'Epoch [{epoch+1}/{epochs}] Loss: {epoch_loss:.4f}')

        # Early stopping check
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= early_stopping_patience:
            print(f'Early stopping triggered after {epoch+1} epochs')
            break

    return history

def evaluate_unet(model, test_loader, criterion, device):
    """Return the mean per-batch loss of a segmentation model.

    Args:
        model: Network mapping an image batch to mask logits.
        test_loader: Iterable of ``(images, masks, labels)`` batches with a length.
        criterion: Loss called as ``criterion(outputs, masks)``.
        device: Device the batches are moved to.

    Returns:
        Sum of the batch losses divided by the number of batches.
    """
    model.eval()
    loss_sum = 0

    with torch.no_grad():
        for batch in test_loader:
            images, masks, _labels = batch  # labels are not needed here
            images, masks = images.to(device), masks.to(device)
            loss_sum += criterion(model(images), masks).item()

    avg_loss = loss_sum / len(test_loader)
    print(f'Test Loss: {avg_loss:.4f}')
    return avg_loss

def dice_coef(pred, mask):
    """Dice coefficient between two arrays interpreted as boolean masks.

    Args:
        pred: Predicted mask; any non-zero element counts as foreground.
        mask: Reference mask; any non-zero element counts as foreground.

    Returns:
        ``2 * |pred AND mask| / (|pred| + |mask|)``, or 1.0 when both masks
        are empty.
    """
    pred_bool = pred.astype(bool)
    mask_bool = mask.astype(bool)

    # Denominator of the Dice formula: the two foreground sizes added together
    size_sum = pred_bool.sum() + mask_bool.sum()
    if size_sum == 0:
        return 1.0

    overlap = np.logical_and(pred_bool, mask_bool).sum()
    return 2. * overlap / size_sum

def train_regression_model(model, dataloader, criterion, optimizer, device, epochs=5, early_stopping_patience=5):
    """Train a sequence regressor on ``(sequences, targets)`` batches.

    A trailing dimension is appended to both tensors before they reach the
    model and the loss. Training stops early once the mean training loss has
    failed to improve for ``early_stopping_patience`` consecutive epochs.

    Args:
        model: Network called as ``model(sequences)``.
        dataloader: Iterable of ``(sequences, targets)`` batches with a length.
        criterion: Loss called as ``criterion(outputs, targets)``.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the model and batches are moved to.
        epochs: Maximum number of epochs.
        early_stopping_patience: Non-improving epochs tolerated before stopping.

    Returns:
        List with the mean training loss of every epoch that was run.
    """
    model.to(device)
    best_loss = float('inf')
    patience = early_stopping_patience
    history = []
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for sequences, targets in dataloader:
            sequences = sequences.to(device).unsqueeze(-1)  # Add feature dimension
            targets = targets.to(device).unsqueeze(-1)

            optimizer.zero_grad()
            outputs = model(sequences)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        epoch_loss = running_loss / len(dataloader)
        history.append(epoch_loss)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}')

        if epoch_loss < best_loss:
            best_loss = epoch_loss
            patience = early_stopping_patience
            # Save the best model if needed
        else:
            patience -= 1
            # "<=" rather than "==": a patience of zero or less would otherwise
            # skip past zero and never stop
            if patience <= 0:
                print("Early stopping")
                break

    return history

def evaluate_regression_model(model, dataloader, criterion, device):
    """Evaluate a sequence regressor and report loss, MAE and RMSE.

    A trailing dimension is appended to both tensors before they reach the
    model and the loss, mirroring ``train_regression_model``.

    Args:
        model: Network called as ``model(sequences)``.
        dataloader: Iterable of ``(sequences, targets)`` batches with a length.
        criterion: Loss called as ``criterion(outputs, targets)``.
        device: Device the model and batches are moved to.

    Returns:
        ``(loss, mae, rmse)`` as floats: the mean per-batch loss, and MAE/RMSE
        computed over all predictions.
    """
    model.eval()
    model.to(device)
    running_loss = 0.0
    pred_batches = []
    target_batches = []
    with torch.no_grad():
        for sequences, targets in dataloader:
            sequences = sequences.to(device).unsqueeze(-1)
            targets = targets.to(device).unsqueeze(-1)
            outputs = model(sequences)
            loss = criterion(outputs, targets)
            running_loss += loss.item()
            pred_batches.append(outputs.cpu().numpy())
            target_batches.append(targets.cpu().numpy())

    epoch_loss = running_loss / len(dataloader)
    # Compute the residuals once and reuse them for both metrics
    errors = np.concatenate(pred_batches, axis=0) - np.concatenate(target_batches, axis=0)
    mae = float(np.mean(np.abs(errors)))
    rmse = float(np.sqrt(np.mean(errors ** 2)))
    print(f'Loss: {epoch_loss:.4f}, MAE: {mae:.4f}, RMSE: {rmse:.4f}')
    return epoch_loss, mae, rmse


In [9]:
# Transformer Training and Evaluation
class TransformerDataset(Dataset):
    """Pairs each sequence with its scalar target for the transformer loops.

    Args:
        sequences: Indexable collection of 1-D numeric sequences.
        targets: Indexable collection of scalar targets, aligned with ``sequences``.
    """

    def __init__(self, sequences, targets):
        self.sequences, self.targets = sequences, targets

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return ``(src, tgt)`` float tensors shaped ``(seq_len, 1)`` and ``(1,)``."""
        sequence = torch.tensor(self.sequences[idx], dtype=torch.float32)
        src = sequence.unsqueeze(1)  # (seq_len, feature)
        tgt = torch.tensor([self.targets[idx]], dtype=torch.float32)  # (1)
        return src, tgt

def train_transformer(model, dataloader, criterion, optimizer, device, epochs=5, early_stopping_patience=5):
    """Train a ``model(src, tgt)`` transformer on ``(src, tgt)`` batches.

    The source batch is permuted to sequence-first layout and is also used as
    the decoder input. Training stops early once the mean training loss has
    failed to improve for ``early_stopping_patience`` consecutive epochs.

    Args:
        model: Network called as ``model(src, tgt_input)``.
        dataloader: Iterable of ``(src, tgt)`` batches with a length; ``src``
            is ``(batch, seq_len, feature)``.
        criterion: Loss called as ``criterion(outputs, targets)``.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the model and batches are moved to.
        epochs: Maximum number of epochs.
        early_stopping_patience: Non-improving epochs tolerated before stopping.

    Returns:
        List with the mean training loss of every epoch that was run.
    """
    model.to(device)
    best_loss = float('inf')
    patience = early_stopping_patience
    history = []
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for src, tgt in dataloader:
            src = src.to(device).permute(1, 0, 2)  # Transformer expects (seq_len, batch, feature)
            tgt_input = src  # For simplicity, using src as tgt_input
            tgt = tgt.to(device)

            optimizer.zero_grad()
            outputs = model(src, tgt_input)
            # Give the target the prediction's shape so the loss compares
            # element-wise instead of relying on broadcasting
            loss = criterion(outputs, tgt.reshape(outputs.shape))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        epoch_loss = running_loss / len(dataloader)
        history.append(epoch_loss)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}')

        if epoch_loss < best_loss:
            best_loss = epoch_loss
            patience = early_stopping_patience
            # Save the best model if needed
        else:
            patience -= 1
            # "<=" rather than "==": a patience of zero or less would otherwise
            # skip past zero and never stop
            if patience <= 0:
                print("Early stopping")
                break

    return history

def evaluate_transformer(model, dataloader, criterion, device):
    """Evaluate a ``model(src, tgt)`` transformer and report loss, MAE and RMSE.

    The source batch is permuted to sequence-first layout and is also used as
    the decoder input, mirroring ``train_transformer``.

    Args:
        model: Network called as ``model(src, tgt_input)``.
        dataloader: Iterable of ``(src, tgt)`` batches with a length; ``src``
            is ``(batch, seq_len, feature)``.
        criterion: Loss called as ``criterion(outputs, targets)``.
        device: Device the model and batches are moved to.

    Returns:
        ``(loss, mae, rmse)`` as floats: the mean per-batch loss, and MAE/RMSE
        computed over all predictions.
    """
    model.eval()
    model.to(device)
    running_loss = 0.0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for src, tgt in dataloader:
            src = src.to(device).permute(1, 0, 2)
            tgt_input = src
            outputs = model(src, tgt_input)
            # Give the target the prediction's shape so the loss compares
            # element-wise instead of relying on broadcasting
            tgt = tgt.to(device).reshape(outputs.shape)
            loss = criterion(outputs, tgt)
            running_loss += loss.item()
            all_preds.extend(outputs.cpu().numpy().flatten())
            all_targets.extend(tgt.cpu().numpy().flatten())

    epoch_loss = running_loss / len(dataloader)
    # Compute the residuals once and reuse them for both metrics
    errors = np.array(all_preds) - np.array(all_targets)
    mae = float(np.mean(np.abs(errors)))
    rmse = float(np.sqrt(np.mean(errors ** 2)))
    print(f'Loss: {epoch_loss:.4f}, MAE: {mae:.4f}, RMSE: {rmse:.4f}')
    return epoch_loss, mae, rmse

# Visualization Functions
def visualize_training_history(history, metric='loss'):
    """Plot the training and validation curves of one metric.

    Args:
        history: Mapping with a ``'train'`` and a ``'val'`` series, one value
            per epoch.
        metric: Metric name, shown capitalized in the title and on the y axis.
    """
    metric_label = metric.capitalize()

    _fig, ax = plt.subplots(figsize=(10,6))
    for key, label in (('train', 'Train'), ('val', 'Validation')):
        ax.plot(history[key], label=label)

    ax.set_title(f'Training and Validation {metric_label}')
    ax.set_xlabel('Epochs')
    ax.set_ylabel(metric_label)
    ax.legend()
    plt.show()

def visualize_segmentation_results(model, dataloader, device, num_samples=5,
                                   mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Show input image, ground-truth mask and predicted mask side by side.

    Predictions are the model's logits passed through a sigmoid and
    thresholded at 0.5.

    Args:
        model: Segmentation network mapping an image batch to mask logits.
        dataloader: Iterable of ``(images, masks, labels)`` batches.
        device: Device the model and images are moved to.
        num_samples: Maximum number of samples to display.
        mean: Per-channel mean the images were normalized with. The default
            matches the values used by ``DataProcess.transform_image``.
        std: Per-channel std the images were normalized with; same default source.
    """
    model.eval()
    model.to(device)
    mean_arr = np.asarray(mean, dtype=np.float32)
    std_arr = np.asarray(std, dtype=np.float32)
    samples = 0
    with torch.no_grad():
        for inputs, masks, _ in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            preds = torch.sigmoid(outputs) > 0.5
            for i in range(inputs.size(0)):
                if samples >= num_samples:
                    return
                img = inputs[i].cpu().permute(1, 2, 0).numpy()  # (H, W, C) for imshow
                # Undo the normalization actually applied, then clip to the
                # [0, 1] range imshow expects for float images
                img = np.clip(img * std_arr + mean_arr, 0.0, 1.0)
                mask = masks[i].cpu().squeeze().numpy()
                pred = preds[i].cpu().squeeze().numpy()

                fig, axs = plt.subplots(1,3, figsize=(15,5))
                axs[0].imshow(img)
                axs[0].set_title('Input Image')
                axs[1].imshow(mask, cmap='gray')
                axs[1].set_title('Ground Truth Mask')
                axs[2].imshow(pred, cmap='gray')
                axs[2].set_title('Predicted Mask')
                plt.show()
                # Release the figure so repeated calls do not accumulate open figures
                plt.close(fig)
                samples += 1


In [10]:
if __name__ == "__main__":
    # Seed torch so weight initialization and DataLoader shuffling repeat
    # across runs (the train/test split already uses a fixed random_state)
    torch.manual_seed(42)

    # Initialize data processor
    data_processor = DataProcess(resize=(128, 128))

    print("Loading Oxford-IIIT Pet Dataset...")
    pet_dataset = data_processor.load_oxford_pet()

    print("Preprocessing dataset...")
    try:
        (train_images, test_images,
         train_labels, test_labels,
         train_masks, test_masks) = data_processor.preprocess_oxford_pet(pet_dataset)

        # Create dataset objects
        train_dataset = PetDataset(train_images, train_masks, train_labels)
        test_dataset = PetDataset(test_images, test_masks, test_labels)

        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

        print(f"Training samples: {len(train_dataset)}")
        print(f"Testing samples: {len(test_dataset)}")

        # Initialize device
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {device}")

        # Initialize models
        cnn_model = CNN(num_classes=37).to(device)  # 37 classes for Oxford Pet dataset
        unet_model = UNet().to(device)

        # Define loss functions and optimizers
        criterion_cls = nn.CrossEntropyLoss()
        # The U-Net returns raw logits, so the sigmoid lives inside the loss
        criterion_seg = nn.BCEWithLogitsLoss()

        optimizer_cnn = optim.Adam(cnn_model.parameters(), lr=0.001)
        optimizer_unet = optim.Adam(unet_model.parameters(), lr=0.001)

        # Train and evaluate CNN
        print("\nTraining CNN...")
        train_model(cnn_model, train_loader, criterion_cls, optimizer_cnn, device)
        print("\nEvaluating CNN...")
        evaluate_model(cnn_model, test_loader, criterion_cls, device)

        # Train and evaluate U-Net
        print("\nTraining U-Net...")
        train_unet(unet_model, train_loader, criterion_seg, optimizer_unet, device)
        print("\nEvaluating U-Net...")
        evaluate_unet(unet_model, test_loader, criterion_seg, device)

    except Exception as e:
        # Log a short message, then re-raise so the full traceback is still shown
        print(f"An error occurred during processing: {str(e)}")
        raise

  transforms.Resize(self.resize, interpolation=Image.NEAREST),


Loading Oxford-IIIT Pet Dataset...
Downloading https://thor.robots.ox.ac.uk/pets/images.tar.gz to data\oxford-iiit-pet\images.tar.gz


100%|███████████████████████████████████████████████████████████████████████████████| 792M/792M [02:43<00:00, 4.84MB/s]


Extracting data\oxford-iiit-pet\images.tar.gz to data\oxford-iiit-pet
Downloading https://thor.robots.ox.ac.uk/pets/annotations.tar.gz to data\oxford-iiit-pet\annotations.tar.gz


100%|█████████████████████████████████████████████████████████████████████████████| 19.2M/19.2M [00:10<00:00, 1.80MB/s]


Extracting data\oxford-iiit-pet\annotations.tar.gz to data\oxford-iiit-pet
Preprocessing dataset...
Processing dataset...
Training samples: 2944
Testing samples: 736
Using device: cpu

Training CNN...
Epoch [1/5], Batch [1/92], Loss: 3.6239
Epoch [1/5], Batch [11/92], Loss: 3.5608
Epoch [1/5], Batch [21/92], Loss: 3.6179
Epoch [1/5], Batch [31/92], Loss: 3.6096
Epoch [1/5], Batch [41/92], Loss: 3.6215
Epoch [1/5], Batch [51/92], Loss: 3.5956
Epoch [1/5], Batch [61/92], Loss: 3.5698
Epoch [1/5], Batch [71/92], Loss: 3.5727
Epoch [1/5], Batch [81/92], Loss: 3.5448
Epoch [1/5], Batch [91/92], Loss: 3.4910
Epoch [1/5] Loss: 3.5913
Epoch [2/5], Batch [1/92], Loss: 3.4545
Epoch [2/5], Batch [11/92], Loss: 3.3515
Epoch [2/5], Batch [21/92], Loss: 3.5078
Epoch [2/5], Batch [31/92], Loss: 3.3839
Epoch [2/5], Batch [41/92], Loss: 3.1974
Epoch [2/5], Batch [51/92], Loss: 3.2984
Epoch [2/5], Batch [61/92], Loss: 3.2404
Epoch [2/5], Batch [71/92], Loss: 3.2992
Epoch [2/5], Batch [81/92], Loss: 3.35