<a href="https://colab.research.google.com/github/meisisoiisme/FH-ML_PyTorch/blob/main/Full_Pipeline_workfile.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [131]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, Dataset
from torchvision import models


import numpy as np
import random
from PIL import Image, ImageDraw
import os
import shutil

import csv
import cv2
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

ImageGenerator - Class
* generate_noisy_image()
* generate_images()

LandmarksImageGenerator(ImageGenerator) - Class
* generate_images_with_landmarks
* generate_random_landmarks





In [94]:
class ImageGenerator:
    def __init__(self, output_dir, num_classes=5, images_per_class=100, image_size=(224, 224), noise_factor=20):
        self.output_dir = output_dir
        self.num_classes = num_classes
        self.images_per_class = images_per_class
        self.image_size = image_size
        self.noise_factor = noise_factor

    def generate_noisy_image(self):
        # Generate a base image with random pixel values
        base_image = np.random.randint(0, 256, size=(self.image_size[1], self.image_size[0], 3), dtype=np.uint8)

        # Add random noise to the base image
        noisy_image = base_image + np.random.randint(-self.noise_factor, self.noise_factor + 1, size=base_image.shape)

        # Clip pixel values to the valid range [0, 255]
        noisy_image = np.clip(noisy_image, 0, 255).astype(np.uint8)

        # Convert the noisy image to a PIL Image
        return Image.fromarray(noisy_image)

    def generate_images(self):
        # Set a random seed for reproducibility
        np.random.seed(42)

        # Remove the output directory and its contents if they exist
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

        # Create the output directory
        os.makedirs(self.output_dir)

        # Loop through each class
        for class_idx in range(self.num_classes):
            class_dir = os.path.join(self.output_dir, f"class_{class_idx}")
            os.makedirs(class_dir)

            # Loop through each image in the class
            for i in range(self.images_per_class):
                # Generate a base image with random pixel values
                base_image = np.random.randint(0, 256, size=(self.image_size[1], self.image_size[0], 3), dtype=np.uint8)

                # Add random noise to the base image
                noisy_image = base_image + np.random.randint(-self.noise_factor, self.noise_factor + 1, size=base_image.shape)

                # Clip pixel values to the valid range [0, 255]
                noisy_image = np.clip(noisy_image, 0, 255).astype(np.uint8)

                # Convert the noisy image to a PIL Image
                pil_image = Image.fromarray(noisy_image)

                # Save the image
                image_path = os.path.join(class_dir, f"image_{i}.png")
                pil_image.save(image_path)

        # Print information about the generated images
        print(f"{self.num_classes} classes with {self.images_per_class} images each generated and saved in {self.output_dir}")


class LandmarksImageGenerator(ImageGenerator):
    def __init__(self, output_dir, num_classes=5, images_per_class=100, image_size=(224, 224), noise_factor=20, num_landmarks=5):
        super().__init__(output_dir, num_classes, images_per_class, image_size, noise_factor)
        self.num_landmarks = num_landmarks

    def generate_images_with_landmarks(self):
        # Remove the output directory and its contents if they exist
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

        # Create the output directory and landmarks CSV file
        os.makedirs(self.output_dir)
        csv_file_path = os.path.join(self.output_dir, "landmarks.csv")

        # Open the CSV file for writing
        with open(csv_file_path, mode='w', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(['Image Path', 'Class', 'Landmark 1 X', 'Landmark 1 Y', 'Landmark 2 X', 'Landmark 2 Y', '...'])

            # Loop through each class
            for class_idx in range(self.num_classes):
                class_dir = os.path.join(self.output_dir, f"class_{class_idx}")
                os.makedirs(class_dir)

                # Loop through each image in the class
                for i in range(self.images_per_class):
                    # Generate a noisy image
                    noisy_image = self.generate_noisy_image()

                    # Generate random landmarks
                    landmarks = self.generate_random_landmarks()

                    # Convert the noisy image to a PIL Image
                    pil_image = Image.fromarray(np.array(noisy_image))

                    # Save the image
                    image_path = os.path.join(class_dir, f"image_{i}.png")
                    pil_image.save(image_path)

                    # Write landmark information to the CSV file
                    csv_writer.writerow([image_path, class_idx] + landmarks)

        # Print information about the generated images and landmarks
        print(f"{self.num_classes} classes with {self.images_per_class} images each generated and saved in {self.output_dir}")
        print(f"Landmark information saved in {csv_file_path}")

    def generate_random_landmarks(self):
        landmarks = []
        for landmark_idx in range(self.num_landmarks):
            # Generate random coordinates for each landmark
            landmark_x = random.randint(0, self.image_size[0])
            landmark_y = random.randint(0, self.image_size[1])
            landmarks.extend([landmark_x, landmark_y])

        return landmarks



CustomRandomDataset(Dataset) - Class  
* load_data()
* load_image()
* load_landmarks()
* _len_()
* getitem()

In [149]:
class CustomRandomDataset(Dataset):
    def __init__(self, num_samples, num_classes, image_size, data_path=None, landmarks_csv_path=None):
        self.num_samples = num_samples
        self.num_classes = num_classes
        self.image_size = image_size
        self.data = []
        self.labels = []
        self.data_path = data_path
        self.landmarks_csv_path = landmarks_csv_path
        self.num_landmarks = 5  # Assuming the default number of landmarks
        self.landmarks = self.load_landmarks()

        # Check if data_path and landmarks_csv_path are provided
        if data_path is None or landmarks_csv_path is None:
            raise ValueError("Both data_path and landmarks_csv_path must be provided.")

        # Check if the provided paths exist
        if not os.path.exists(data_path):
            raise FileNotFoundError(f"Data path not found: {data_path}")

        if not os.path.exists(landmarks_csv_path):
            raise FileNotFoundError(f"Landmarks CSV path not found: {landmarks_csv_path}")

        # Load data
        self.load_data()

    def load_data(self):
        if self.data_path is not None:
            for label in range(self.num_classes):
                label_path = os.path.join(self.data_path, f"class_{label}")
                if os.path.exists(label_path):
                    class_images = [os.path.join(label_path, image_file) for image_file in os.listdir(label_path)]
                    if len(class_images) == 0:
                        print(f"Warning: Class {label} has no images.")
                        continue

                    self.data.extend(class_images)
                    self.labels.extend([label] * len(class_images))

        # Debugging print statements
        print(f"Available image paths: {self.data}")

        # Shuffle the data after loading
        random.shuffle(self.data)
        random.shuffle(self.labels)

    def load_image(self, image_path):
        try:
            image = Image.open(image_path).convert("RGB")
            image = image.resize((self.image_size, self.image_size))
            return transforms.ToTensor()(image)
        except Exception as e:
            print(f"Error loading image {image_path}: {e}")
            return None

    def load_landmarks(self):
        landmarks = np.zeros((len(self.data), 2 * self.num_landmarks), dtype=np.float32)

        try:
            with open(self.landmarks_csv_path, mode='r') as csv_file:
                csv_reader = csv.reader(csv_file)
                header = next(csv_reader)  # Skip header row

                # Debugging print statement
                print(f"CSV Header: {header}")

                for i, row in enumerate(csv_reader):
                    if len(row) < 2 * self.num_landmarks + 2:
                        print(f"Error: Incomplete row in CSV at index {i + 1}. Expected {2 * self.num_landmarks + 2} columns, found {len(row)}.")
                        continue

                    image_path, _, *landmark_values = row

                    # Construct the full image path by joining data_path and image_path
                    full_image_path = os.path.join(self.data_path, image_path)

                    # Debugging print statements
                    print(f"Checking image path: {full_image_path}")
                    print(f"Available image paths: {self.data}")

                    # Check if the full_image_path is in the list of available image paths
                    if full_image_path not in self.data:
                        print(f"Warning: Image path {full_image_path} not in list of available image paths.")
                        continue

                    image_index = self.data.index(full_image_path)

                    # Debugging print statements
                    print(f"Landmark values: {landmark_values}")

                    landmarks[image_index] = list(map(float, landmark_values))

        except Exception as e:
            print(f"Error while reading landmarks CSV: {e}")

        return torch.tensor(landmarks)



    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        try:
            image_path = self.data[index]
            label = self.labels[index]
            image = self.load_image(image_path)
            landmarks = self.landmarks[index]
            return image, label, landmarks
        except Exception as e:
            print(f"Error in __getitem__ at index {index}: {e}")
            return torch.zeros((3, self.image_size, self.image_size)), -1, torch.zeros((2 * self.num_landmarks))


* CNN - Custom
* AlexNet
* LenNet5
* ResNet - Pretrained Model: ResNet50

In [135]:
class CNN(nn.Module):
    def __init__(self, num_classes, image_size):
        super(CNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.adaptive_pool = nn.AdaptiveAvgPool2d(1)  # Adaptive pooling to handle variable input sizes
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),  # Added dropout for regularization
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.adaptive_pool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

class AlexNet(nn.Module):
    def __init__(self, num_classes=6):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

class LeNet5(nn.Module):
    def __init__(self, num_classes=6):
        super(LeNet5, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(6, 16, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 120, kernel_size=5),
            nn.ReLU(),
        )
        self.classifier = nn.Sequential(
            nn.Linear(120 * 53 * 53, 84),
            nn.ReLU(),
            nn.Linear(84, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

class ResNet50(nn.Module):
    def __init__(self, num_classes=6):
        super(ResNet50, self).__init__()
        self.model = models.resnet50(pretrained=True)
        in_features = self.model.fc.in_features
        self.model.fc = nn.Sequential(
            nn.Linear(in_features, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, num_classes)
        )

    def forward(self, x):
        return self.model(x)


Model Trainer - class

*   train()
*   grad_cam()
*   overlay_heatmap()
*   evaluate_and_save_results()





In [150]:
class ModelTrainer:
    def __init__(self, model, train_loader, val_loader, test_loader, criterion, optimizer, landmarks_csv_path=None, num_epochs=30):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.landmarks_csv_path = landmarks_csv_path
        self.num_epochs = num_epochs

    @property
    def device(self):
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def train(self):
        self.model.to(self.device)

        for epoch in range(self.num_epochs):
            self.model.train()
            for inputs, labels, landmarks in self.train_loader:
                inputs, labels, landmarks = inputs.to(self.device), labels.to(self.device), landmarks.to(self.device)

                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()

            self.model.eval()
            val_loss = 0.0
            correct = 0
            total = 0

            with torch.no_grad():
                for inputs, labels, landmarks in self.val_loader:
                    inputs, labels, landmarks = inputs.to(self.device), labels.to(self.device), landmarks.to(self.device)

                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels)
                    val_loss += loss.item()

                    _, predicted = torch.max(outputs.data, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            print(f'Epoch {epoch+1}/{self.num_epochs}, Loss: {val_loss/len(self.val_loader)}, Validation Accuracy: {correct/total}')

    def grad_cam(self, input_tensor, target_class=None):
        self.model.eval()
        device = next(self.model.parameters()).device

        # Get the output from the final convolutional layer
        final_conv_layer = None
        for layer in self.model.children():
            if isinstance(layer, nn.Conv2d):
                final_conv_layer = layer

        # Get the gradients with respect to the output of the final convolutional layer
        self.model.zero_grad()
        output = self.model(input_tensor.to(device))
        if target_class is None:
            target_class = torch.argmax(output)
        output[:, target_class].backward()

        # Get the feature map from the final convolutional layer
        feature_maps = final_conv_layer.weight.grad
        alpha = torch.mean(feature_maps, dim=(2, 3), keepdim=True)

        # Perform weighted combination to get the heatmap
        heatmap = torch.sum(alpha * feature_maps, dim=1, keepdim=True)
        heatmap = nn.functional.relu(heatmap)

        # Normalize the heatmap
        heatmap /= torch.max(heatmap)

        return heatmap

    def overlay_heatmap(self, image, heatmap):
        heatmap = heatmap.squeeze().cpu().numpy()
        heatmap = np.uint8(255 * heatmap)
        heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
        overlayed_image = cv2.addWeighted(image, 0.5, heatmap, 0.5, 0)
        return overlayed_image

    def evaluate_and_save_results(self):
        self.model.eval()
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for inputs, labels, landmarks in self.test_loader:
                inputs, labels, landmarks = inputs.to(self.device), labels.to(self.device), landmarks.to(self.device)
                outputs = self.model(inputs)
                _, preds = torch.max(outputs, 1)

                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        # Calculate confusion matrix
        cm = confusion_matrix(all_labels, all_preds)

        # Plot confusion matrix
        plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        plt.title('Confusion Matrix')
        plt.colorbar()
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.show()


Generate images and Create training images with landmarks

In [151]:
output_directory = "./training_images"
image_gen = ImageGenerator(output_directory, num_classes=5, images_per_class=100, image_size=(224, 224), noise_factor=20)
image_gen.generate_images()

output_directory_landmarks = "./training_images_with_landmarks"
landmarks_gen = LandmarksImageGenerator(output_directory_landmarks, num_classes=5, images_per_class=100, image_size=(224, 224), noise_factor=20, num_landmarks=5)
landmarks_gen.generate_images_with_landmarks()

5 classes with 100 images each generated and saved in ./training_images
5 classes with 100 images each generated and saved in ./training_images_with_landmarks
Landmark information saved in ./training_images_with_landmarks/landmarks.csv


Loader gonna load

In [155]:
# Parameters
num_samples = 100  # Make sure this is a positive value
num_classes = 5
image_size = (224, 224)
batch_size = 32
num_epochs = 10
data_path = "./training_images_with_landmarks"
landmarks_csv_path = "./training_images_with_landmarks/landmarks.csv"

# Create the custom dataset
custom_dataset = CustomRandomDataset(num_samples, num_classes, image_size, data_path, landmarks_csv_path)
custom_val_dataset = CustomRandomDataset(num_samples // 8, num_classes, image_size, data_path, landmarks_csv_path)
custom_test_dataset = CustomRandomDataset(num_samples // 5, num_classes, image_size, data_path, landmarks_csv_path)

# Create Training/Test/Validation Set
train_loader = DataLoader(custom_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(custom_val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(custom_test_dataset, batch_size=batch_size, shuffle=False)

# Instantiate CNN model
ResNet50_model = ResNet50(num_classes=num_classes)
LeNet5_model = LeNet5(num_classes=num_classes )
AlexNet_model = AlexNet(num_classes=num_classes)
CNN_model = CNN(num_classes=num_classes,image_size=image_size)

# Specify loss function and optimizer
criterion = nn.CrossEntropyLoss()
ResNet50_optimizer = optim.Adam(ResNet50_model.parameters(), lr=0.001)
LeNet5_optimizer = optim.Adam(LeNet5_model.parameters(), lr=0.001)
AlexNet_optimizer = optim.Adam(AlexNet_model.parameters(), lr=0.001)
CNN_optimizer = optim.Adam(CNN_model.parameters(), lr=0.001)

# Instantiate ModelTrainer
ResNet50_model_trainer = ModelTrainer(ResNet50_model, train_loader, val_loader, test_loader, criterion, ResNet50_optimizer, landmarks_csv_path, num_epochs=num_epochs)
LeNet5_model_trainer = ModelTrainer(LeNet5_model, train_loader, val_loader, test_loader, criterion, LeNet5_optimizer, landmarks_csv_path, num_epochs=num_epochs)
AlexNet_model_trainer = ModelTrainer(AlexNet_model, train_loader, val_loader, test_loader, criterion, AlexNet_optimizer, landmarks_csv_path, num_epochs=num_epochs)
CNN_model_trainer = ModelTrainer(CNN_model, train_loader, val_loader, test_loader, criterion, CNN_optimizer, landmarks_csv_path, num_epochs=num_epochs)


CSV Header: ['Image Path', 'Class', 'Landmark 1 X', 'Landmark 1 Y', 'Landmark 2 X', 'Landmark 2 Y', '...']
Checking image path: ./training_images_with_landmarks/./training_images_with_landmarks/class_0/image_0.png
Available image paths: []
Checking image path: ./training_images_with_landmarks/./training_images_with_landmarks/class_0/image_1.png
Available image paths: []
Checking image path: ./training_images_with_landmarks/./training_images_with_landmarks/class_0/image_2.png
Available image paths: []
Checking image path: ./training_images_with_landmarks/./training_images_with_landmarks/class_0/image_3.png
Available image paths: []
Checking image path: ./training_images_with_landmarks/./training_images_with_landmarks/class_0/image_4.png
Available image paths: []
Checking image path: ./training_images_with_landmarks/./training_images_with_landmarks/class_0/image_5.png
Available image paths: []
Checking image path: ./training_images_with_landmarks/./training_images_with_landmarks/class_0/

In [154]:
# Train the model
CNN_model_trainer.train()

# Evaluate and save results
CNN_model_trainer.evaluate_and_save_results()

Error loading image ./training_images/class_0/image_55.png: 'tuple' object cannot be interpreted as an integer
Error in __getitem__ at index 109: index 109 is out of bounds for dimension 0 with size 0


TypeError: zeros(): argument 'size' failed to unpack the object at pos 2 with error "type must be tuple of ints,but got tuple"