<a href="https://colab.research.google.com/github/jdchen5/machinelearninglabs/blob/main/W22/requiredActivity22-3-multiTask-JC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
!pip install scikit-learn



In [2]:
!pip install torchinfo

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


# Only required at the beinning to split the traing and set data

In [None]:
import os
import shutil
from sklearn.model_selection import train_test_split

# Path to the dataset
data_dir = '/content/gdrive/My Drive/Pythoncode/W22/faces_4'
# Get all class directories
class_dirs = [d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))]

print(class_dirs)

train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')

# Create train and test directories if they don't exist
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)


# Split each class's files and move them to the corresponding train/test directories
for class_dir in class_dirs:
    # Full path to the class directory
    class_path = os.path.join(data_dir, class_dir)
    # Get all files in the class directory
    files = [f for f in os.listdir(class_path) if os.path.isfile(os.path.join(class_path, f))]

    # Check if there are any files to split
    if not files:
        print(f"No files to split in directory {class_dir}. Skipping...")
        continue

    # Split the files into 80% train and 20% test
    train_files, test_files = train_test_split(files, test_size=0.2, random_state=42)

    # Create corresponding class directories in train and test directories
    train_class_dir = os.path.join(train_dir, class_dir)
    test_class_dir = os.path.join(test_dir, class_dir)
    os.makedirs(train_class_dir, exist_ok=True)
    os.makedirs(test_class_dir, exist_ok=True)

    # Function to copy files to the specified directory
    def copy_files(files, source_dir, destination_dir):
        for file in files:
            shutil.copy(os.path.join(source_dir, file), os.path.join(destination_dir, file))

    # Copy the files to their respective directories
    copy_files(train_files, class_path, train_class_dir)
    copy_files(test_files, class_path, test_class_dir)

print("Data split into train and test directories.")

['choon', 'ch4f', 'karyadi', 'bpm', 'an2i', 'glickman', 'at33', 'danieln', 'boland', 'cheyer', 'night', 'saavik', 'mitchell', 'kk49', 'steffi', 'kawamura', 'megak', 'sz24', 'phoebe', 'tammo']
Data split into train and test directories.


In [1]:
%matplotlib inline

In [3]:
from __future__ import print_function

import argparse
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from PIL import Image
from tqdm import tqdm
from torch.utils.data import DataLoader, random_split, Dataset, Subset
from torchvision.datasets.folder import default_loader
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
from torchinfo import summary


In [4]:
import cv2
img = cv2.imread("some_image.pgm", cv2.IMREAD_COLOR)

In [5]:
def imshow(img):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

In [8]:
# Custom helper function to load images in PGM format using OpenCV
def img_loader(path):
    return cv2.imread(path, cv2.IMREAD_GRAYSCALE)

data = ImageFolder(root='/content/gdrive/My Drive/Pythoncode/W22/faces_4/', loader=img_loader, transform=transforms)
data.classes



['an2i',
 'at33',
 'boland',
 'bpm',
 'ch4f',
 'cheyer',
 'choon',
 'danieln',
 'glickman',
 'karyadi',
 'kawamura',
 'kk49',
 'megak',
 'mitchell',
 'night',
 'phoebe',
 'saavik',
 'steffi',
 'sz24',
 'tammo',
 'test',
 'train']

In [16]:
#custom class for face and expression
class CMUFaceDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.images = []
        self.person_labels = set()
        self.expression_labels = set()

        for subdir, _, files in os.walk(root_dir):
            for file in files:
                if file.lower().endswith('.pgm'):
                    img_path = os.path.join(subdir, file)
                    self.images.append(img_path)

                    # Extract and store person and expression labels
                    person_label, expression_label = self._extract_labels(img_path)
                    self.person_labels.add(person_label)
                    self.expression_labels.add(expression_label)

        # Convert sets to sorted lists for consistent indexing
        self.person_labels = sorted(list(self.person_labels))
        self.expression_labels = sorted(list(self.expression_labels))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        image = Image.open(img_path)

        person_label, expression_label = self._extract_labels(img_path)
        person_idx = self.person_labels.index(person_label)
        expression_idx = self.expression_labels.index(expression_label)

        if self.transform:
            image = self.transform(image)

        return image, (person_idx, expression_idx)

    def _extract_labels(self, img_path):
        """
        Extracts person and expression labels from an image file path.
        """
        basename = os.path.basename(img_path)
        parts = basename.split('_')
        person_label = parts[0]
        expression_label = parts[2]  # Adjust index based on your file naming convention
        return person_label, expression_label


In [24]:
# Get some random dataset  images

def show_test_images(test_dataset, test_loader):
    # Get some random test images
    dataiter = iter(test_loader)
    images, labels_tuple = next(dataiter)
    person_labels, expression_labels = labels_tuple

    # Show images
    imshow(torchvision.utils.make_grid(images))

    # Get the class names for persons and expressions from the test_dataset
    person_class_names = test_dataset.person_labels
    expression_class_names = test_dataset.expression_labels

    # Prepare labels for printing
    person_labels_text = ' '.join('%5s' % person_class_names[person_labels[j]] for j in range(len(person_labels)))
    expression_labels_text = ' '.join('%5s' % expression_class_names[expression_labels[j]] for j in range(len(expression_labels)))

    # Print labels with class names
    print('GroundTruth Persons: ', person_labels_text)
    print('GroundTruth Expressions: ', expression_labels_text)



In [30]:
# Define a CNN architecture inspired by LeNet5, adjusted for multi-tasks
class MultiTaskLeNet5(nn.Module):
    def __init__(self, num_classes_taskA, num_classes_taskB):
        super(MultiTaskLeNet5, self).__init__()
        # Shared layers
        self.conv1 = nn.Conv2d(1, 6, 5)  # Adjust the input channels as needed
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)

        # Task A and B specific layers (initialized later)
        self.fc1_taskA = None
        self.fc2_taskA = nn.Linear(120, 84)
        self.fc3_taskA = nn.Linear(84, num_classes_taskA)

        self.fc1_taskB = None
        self.fc2_taskB = nn.Linear(120, 50)
        self.fc3_taskB = nn.Linear(50, num_classes_taskB)

        # Flag to check if dynamic layers are initialized
        self.initialized = False

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))

        # Dynamically calculate the number of flat features
        num_flat_features = x.nelement() / x.shape[0]  # This calculates the total number of features per sample

        if not self.initialized:
            # Now that we have the number of flat features, initialize the fc layers properly
            self.fc1_taskA = nn.Linear(int(num_flat_features), 120).to(x.device)
            self.fc1_taskB = nn.Linear(int(num_flat_features), 120).to(x.device)
            self.fc2_taskA = nn.Linear(120, 84).to(x.device)  # Re-initialize to ensure it's on the correct device
            self.fc2_taskB = nn.Linear(120, 50).to(x.device)  # Re-initialize
            self.fc3_taskA = nn.Linear(84, self.fc3_taskA.out_features).to(x.device)  # Assuming out_features is set
            self.fc3_taskB = nn.Linear(50, self.fc3_taskB.out_features).to(x.device)  # Assuming out_features is set
            self.initialized = True

        # Flatten the features for the fully connected layers
        x = x.view(-1, int(num_flat_features))

        # Task A path
        x_a = F.relu(self.fc1_taskA(x))
        x_a = F.relu(self.fc2_taskA(x_a))
        x_a = self.fc3_taskA(x_a)

        # Task B path
        x_b = F.relu(self.fc1_taskB(x))
        x_b = F.relu(self.fc2_taskB(x_b))
        x_b = self.fc3_taskB(x_b)

        return x_a, x_b


In [None]:
# Define a function to reset the model
def reset_model(num_classes, device):
    model = Net(num_classes=num_classes).to(device)
    return model

# Define a function to reset the optimizer
def reset_optimizer(model, lr=0.01, momentum=0.9):
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    return optimizer

In [37]:
# Train and Test function
def train(model, device, train_loader, optimizer, criterion1, criterion2, epoch):
    model.train()
    for batch_idx, (data, (targetsA, targetsB)) in enumerate(train_loader):
        data, targetsA, targetsB = data.to(device), targetsA.to(device), targetsB.to(device)
        optimizer.zero_grad()
        outputsA, outputsB = model(data)
        lossA = criterion1(outputsA, targetsA)
        lossB = criterion2(outputsB, targetsB)
        loss = lossA + lossB  # Combine losses; adjust if you're weighting tasks differently
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:  # Adjust log interval as needed
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')
    avg_loss = loss / len(train_loader)
    print(f"Average Loss: {avg_loss:.4f}")

def test(model, device, test_loader, criterion1, criterion2):
    model.eval()
    test_loss = 0
    correct1 = 0
    correct2 = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target1, target2 = data.to(device), target[0].to(device), target[1].to(device)
            output1, output2 = model(data)
            test_loss += (criterion1(output1, target1) + criterion2(output2, target2)).item()  # Sum up batch loss
            pred1 = output1.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct1 += pred1.eq(target1.view_as(pred1)).sum().item()
            # Assuming task2's accuracy can be calculated similarly; adjust if not
            pred2 = output2.argmax(dim=1, keepdim=True)
            correct2 += pred2.eq(target2.view_as(pred2)).sum().item()

    test_loss /= len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, Task1 Accuracy: {correct1}/{len(test_loader.dataset)} ({100. * correct1 / len(test_loader.dataset):.0f}%), Task2 Accuracy: {correct2}/{len(test_loader.dataset)} ({100. * correct2 / len(test_loader.dataset):.0f}%)\n')


In [49]:
def save_checkpoint(state, filename="/content/gdrive/My Drive/Pythoncode/W22/modelMultitask_checkpoint.pth"):
    """Save the current state of the model, optimizer, and training parameters."""
    torch.save(state, filename)

def load_checkpoint(model, optimizer, filename="/content/gdrive/My Drive/Pythoncode/W22/modelMultitask_checkpoint.pth"):
    """Load a saved model checkpoint if it exists."""
    if os.path.isfile(filename):
        print(f"Loading checkpoint '{filename}'")
        checkpoint = torch.load(filename)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        epoch = checkpoint.get('epoch', 0)
        print(f"Checkpoint loaded successfully from '{filename}' at epoch {epoch}")
        return epoch
    else:
        print(f"No checkpoint found at '{filename}'. Starting from scratch.")
        return 0  # Start from the beginning if no checkpoint exists.

In [53]:
def load_pgm_to_pil(image_path):
    """Load a PGM image file as a PIL image. Implement this based on your needs."""
    return Image.open(image_path)

In [59]:
def predict_person_expression(model, dataset, device, transform, image_path):
    # Assuming load_pgm_to_pil and transform are defined elsewhere
    image = load_pgm_to_pil(image_path)
    image_tensor = transform(image).unsqueeze(0).to(device)  # Add batch dimension and send to device

    model.eval()  # Set the model to evaluation mode

    with torch.no_grad():
        output_person, output_expression = model(image_tensor)
        _, pred_person_idx = torch.max(output_person, 1)
        _, pred_expression_idx = torch.max(output_expression, 1)

    predicted_person_label = dataset.person_labels[pred_person_idx.item()]
    predicted_expression_label = dataset.expression_labels[pred_expression_idx.item()]

    print("Predicted Person Index:", pred_person_idx.item())
    print("Predicted Expression Index:", pred_expression_idx.item())
    print("Predicted Person Label:", predicted_person_label)
    print("Predicted Expression Label:", predicted_expression_label)

    if "mitchell" in predicted_person_label.lower():
        print("This image is a picture of Mitchell.")
    else:
        print("This image is not a picture of Mitchell.")



In [63]:
import sys
import time


def main(args=None):
    if args is None:
        # When the script is run in a Jupyter notebook, ignore the command-line arguments
        args = sys.argv[1:]
        args = [arg for arg in args if not arg.startswith('-f')]

    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch Faces MultiTask Classifier Training')
    parser.add_argument('--data', type=str, default='/content/gdrive/My Drive/Pythoncode/W22/faces_4', metavar='N',
                        help='Path to directory containing faces dataset.')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.9, metavar='M',
                        help='SGD momentum (default: 0.9)')


    # Parse only the known arguments and ignore the rest
    args, unknown = parser.parse_known_args(args)

    print(f"args.data= {args.data}")
    model_path = '/content/gdrive/My Drive/Pythoncode/W22/modelMultitask_checkpoint.pth'

    # Setup device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Define transformations
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])

    # Load datasets
    train_dataset = CMUFaceDataset(root_dir=os.path.join(args.data, 'train'), transform=transform)
    test_dataset = CMUFaceDataset(root_dir=os.path.join(args.data, 'test'), transform=transform)

    # Setup DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)


    # Ensure these numbers are correctly determined based on your dataset
    num_classes_taskA = len(train_dataset.person_labels)  # Number of unique persons
    num_classes_taskB = len(train_dataset.expression_labels)  # Number of unique expressions

    print(f"train_dataset.person_labels= {train_dataset.person_labels}\ntrain_dataset.expression_labels= {train_dataset.expression_labels}")


    model = MultiTaskLeNet5(num_classes_taskA=num_classes_taskA, num_classes_taskB=num_classes_taskB).to(device)

    # Setup optimizer
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    # Use torchinfo to summarize the model
    # You need to specify the input size (including the batch size)
    # an2i_left_angry_open_4.pgm: indicates a quarter-resolution image (32 by 30) the dimensions for each image are: Channels=1, height='32' pixels, width='30' pixels
    # Hence my input image is 1 channel, 32x30 pixels, and you're using a batch size of 64:
    input_size = (64, 1, 32, 30)  # Format: (batch_size, channels-colour, RGB, height, width)
    summary(model, input_size=input_size)


    start_epoch = load_checkpoint(model, optimizer, filename=model_path)  # Load checkpoint if exists



# Specify the loss functions for each task
    criterion1 = nn.CrossEntropyLoss()  # For the first task, assuming it's classification
    criterion2 = nn.CrossEntropyLoss()  # For the second task, assuming it's classification


    #show_test_images(train_dataset, train_loader)



    num_training_sessions = 4  # Define the number of training sessions

    start_time = time.time()

    for session in range(num_training_sessions):
        print(f"Starting training session {session}/{num_training_sessions}")

        for epoch in range(start_epoch, args.epochs + start_epoch):
            train(model, device, train_loader, optimizer, criterion1, criterion2, epoch)


        test(model, device, test_loader, criterion1, criterion2)
        # Save the model
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
        }
        save_checkpoint(checkpoint, filename=model_path)


    print('Finished Training')
    end_time = time.time()
    print('Total training time: {:.2f} seconds'.format(end_time - start_time))

    image_path = '/content/gdrive/My Drive/Pythoncode/W22/faces_4/mitchell/mitchell_up_sad_sunglasses_4.pgm'
    # Now call the prediction function with the required arguments
    print(f"working on file, {image_path}")
    predict_person_expression(model, train_dataset, device, transform, image_path)

    image_path = '/content/gdrive/My Drive/Pythoncode/W22/faces_4/bpm/bpm_up_sad_sunglasses_4.pgm'
    # Now call the prediction function with the required arguments
    print(f"working on file, {image_path}")
    predict_person_expression(model, train_dataset, device, transform, image_path)



if __name__ == '__main__':
    main()


args.data= /content/gdrive/My Drive/Pythoncode/W22/faces_4
train_dataset.person_labels= ['an2i', 'at33', 'boland', 'bpm', 'ch4f', 'cheyer', 'choon', 'danieln', 'glickman', 'karyadi', 'kawamura', 'kk49', 'megak', 'mitchell', 'night', 'phoebe', 'saavik', 'steffi', 'sz24', 'tammo']
train_dataset.expression_labels= ['angry', 'happy', 'neutral', 'sad']
Loading checkpoint '/content/gdrive/My Drive/Pythoncode/W22/modelMultitask_checkpoint.pth'
Checkpoint loaded successfully from '/content/gdrive/My Drive/Pythoncode/W22/modelMultitask_checkpoint.pth' at epoch 81
Starting training session 0/100
Average Loss: 0.4440
Average Loss: 0.4186
Average Loss: 0.4306
Average Loss: 0.4678
Average Loss: 0.4234
Average Loss: 0.4245
Average Loss: 0.4439
Average Loss: 0.4284
Average Loss: 0.4839
Average Loss: 0.3914

Test set: Average loss: 0.0752, Task1 Accuracy: 54/136 (40%), Task2 Accuracy: 26/136 (19%)

Starting training session 1/100
Average Loss: 0.4330
Average Loss: 0.4200
Average Loss: 0.4092
Average L