In [1]:
import torch
from torchvision import models, transforms
from PIL import Image
import requests
from io import BytesIO
import matplotlib.pyplot as plt
import os
import cv2
import keras
import warnings
import time
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import matplotlib.pyplot as plt
import random

## Load dog breed and dog classification

In [2]:
def loading_training_data(data_dir):
    data = []
    labels_list = []
    
    for label in labels:
        # path = os.path.join(data_dir, label)
        path = data_dir+"/"+label
        class_num = labels.index(label)
        for img in os.listdir(path):
            # print(img)
            # print(path)
            # img_arr = cv2.imread(os.path.join(path, img), cv2.IMREAD_GRAYSCALE)
            img_arr = cv2.imread(os.path.join(path, img))
            
            # print(os.path.join(path,img))
            if img_arr is None:
                # print(f"Warning: Unable to load image {img}")
                print(os.path.join(path,img))
                continue  # Skip the image if it can't be loaded
            resized_arr = cv2.resize(img_arr, (img_size, img_size))
            data.append(resized_arr)
            labels_list.append(class_num)
            
    return np.array(data), np.array(labels_list)


In [3]:
def loading_training_data_subset(data_dir, subset_size=None):
    data = []
    labels_list = []
    
    for label in labels:
        # path = os.path.join(data_dir, label)  # Assuming label directories are in data_dir
        path = data_dir+"/"+label
        class_num = labels.index(label)
        
        # Get list of images in the directory
        all_images = os.listdir(path)
        
        # If subset_size is provided, select a random subset of images
        if subset_size is not None and len(all_images) > subset_size:
            all_images = random.sample(all_images, subset_size)
        
        for img in all_images:
            img_arr = cv2.imread(os.path.join(path, img))
            
            if img_arr is None:
                print(f"Warning: Unable to load image {img}")
                continue  # Skip the image if it can't be loaded
            
            resized_arr = cv2.resize(img_arr, (img_size, img_size))
            data.append(resized_arr)
            labels_list.append(class_num)
    
    return np.array(data), np.array(labels_list)

In [4]:
def transform_input_resnet(data):
    # Convert teh data array to a PIL Image
    
    # Example: Load multiple images (let's say 3 images)
    # image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]  # List of image file paths

    # Define the transformations
    transform = transforms.Compose([
        transforms.Resize(256),           # Resize to 256x256 (larger than 224x224 to allow for cropping)
        transforms.CenterCrop(224),       # Crop to 224x224
        transforms.ToTensor(),            # Convert to tensor (will scale pixel values to [0, 1])
        transforms.Normalize(             # Normalize with ImageNet mean and std
            mean=[0.485, 0.456, 0.406], 
            std=[0.229, 0.224, 0.225]
        ),
    ])
    
    # Prepare the list of images
    input_tensors = []
    for image in data:
        image = Image.fromarray(image)
        image_tensor = transform(image)  # Apply transformations
        input_tensors.append(image_tensor)
    
    # Stack the images into a single tensor (batch of images)
    input_batch = torch.stack(input_tensors)  # Shape will be [batch_size, 3, 224, 224]
    return input_batch


In [5]:
# Labels for image categories
labels = ['Beagle', 'Boxer', 'Bulldog', 'Dachshund', 'German_Shepherd', 'Golden_Retriever', 'Labrador_Retriever', 'Poodle', 'Rottweiler', 'Yorkshire_Terrier']
img_size = 32

In [6]:
dog_breed_train_data, dog_breed_train_label = loading_training_data('./DogBreedImageDataset/dataset')

In [7]:
dog_breed_train_data.shape

(967, 32, 32, 3)

In [8]:
test_data = dog_breed_train_data[0:5]

In [9]:
test_data.shape

(5, 32, 32, 3)

In [10]:
type(test_data)

numpy.ndarray

In [11]:
input_tensor = transform_input_resnet(test_data)

In [12]:
input_tensor.shape

torch.Size([5, 3, 224, 224])

In [13]:
labels = ['angry', 'happy', 'relaxed', 'sad']
# Load data for training
dog_emotion_train_data, dog_emotion_train_label = loading_training_data_subset('./DogEmotionPrediction/images/', 250) #100 images per class

In [14]:
dog_emotion_train_data.shape

(1000, 32, 32, 3)

## Combine the two together

In [22]:
dog_breed_train_data.shape

(967, 32, 32, 3)

In [23]:
dog_emotion_train_data.shape

(1000, 32, 32, 3)

### Modify label to have two label, first label is emotion, second is dog breed

In [24]:
dog_breed_train_label = dog_breed_train_label.reshape(-1, 1)

# Create a column of zeros (same number of rows as arr_reshaped)
emotion_col = np.full((dog_breed_train_label.shape[0], 1), 10)

# Add the column of zeros to the right using np.hstack
dog_breed_train_label1 = np.hstack((emotion_col, dog_breed_train_label))

In [28]:
dog_emotion_train_label = dog_emotion_train_label.reshape(-1, 1)

# Create a column of zeros (same number of rows as arr_reshaped)
breed_col = np.full((dog_emotion_train_label.shape[0], 1), 10)

# Add the column of zeros to the right using np.hstack
dog_emotion_train_label1 = np.hstack((dog_emotion_train_label, breed_col))

In [32]:
dog_breed_train_label1.shape

(967, 2)

In [33]:
dog_breed_train_label1

array([[10,  0],
       [10,  0],
       [10,  0],
       ...,
       [10,  9],
       [10,  9],
       [10,  9]])

In [34]:
dog_emotion_train_label1.shape

(1000, 2)

In [36]:
dog_emotion_train_label1

array([[ 0, 10],
       [ 0, 10],
       [ 0, 10],
       ...,
       [ 3, 10],
       [ 3, 10],
       [ 3, 10]])

In [38]:
combined_dataset = np.concatenate((dog_breed_train_data, dog_emotion_train_data), axis=0)

In [39]:
combined_label = np.concatenate((dog_breed_train_label1, dog_emotion_train_label1), axis = 0)

### Train test split

In [40]:
from sklearn.model_selection import train_test_split

In [41]:
X_train, X_test, y_train, y_test = train_test_split(combined_dataset, combined_label, test_size=0.4, random_state=42)

In [42]:
X_train.shape

(1180, 32, 32, 3)

In [43]:
type(X_train)

numpy.ndarray

### Take part of data only

In [45]:
random_indices = np.random.choice(X_train.shape[0], 50, replace=False)

# Use these indices to extract 50 random samples
X_train1 = X_train[random_indices]
y_train1 = y_train[random_indices]

In [46]:
X_train1.shape

(50, 32, 32, 3)

In [48]:
np.unique(y_train)

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10])

In [50]:
y_train1.shape

(50, 2)

## Define model

In [143]:
import torch
import torch.nn as nn
from torchvision import models

class MultiTaskResNet50(nn.Module):
    def __init__(self, num_classes_task1, num_classes_task2):
        super(MultiTaskResNet50, self).__init__()
        
        # Load the pre-trained ResNet-50 model
        self.resnet = models.resnet50()
        
        # Remove the final fully connected layer (classifier) for both tasks
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])  # Exclude final FC layer
        
        # Task 1: A new fully connected layer for the first task
        self.fc_task1 = nn.Linear(2048, num_classes_task1)
        
        # Task 2: A new fully connected layer for the second task
        self.fc_task2 = nn.Linear(2048, num_classes_task2)
        
    def forward(self, x):
        # Pass through the ResNet backbone (everything before the final fully connected layer)
        features = self.resnet(x)  # Output size: [batch_size, 2048, 1, 1]
        
        # Flatten the output to [batch_size, 2048]
        features = features.view(features.size(0), -1)
        
        # Task 1: Pass through the task-specific classifier
        task1_output = self.fc_task1(features)
        
        # Task 2: Pass through the task-specific classifier
        task2_output = self.fc_task2(features)
        
        # Get predicted class by taking argmax (index of the maximum logit)
        task1_pred = torch.argmax(task1_output,1 )  # Predicted class for task 1
        task2_pred = torch.argmax(task2_output,1)  # Predicted class for task 2
        
        return task1_pred, task2_pred
        # return task1_output, task2_output


In [169]:
# Example usage:
num_classes_task1 = 10  # Number of classes for task 1
num_classes_task2 = 4   # Number of classes for task 2

# Instantiate the model
model = MultiTaskResNet50(num_classes_task1, num_classes_task2)

# Example input tensor with shape [batch_size, 3, 224, 224]
input_tensor = np.random.rand(10, 3, 224, 224)  # Batch size of 2

# Forward pass
task1_output, task2_output = model(transform_input_resnet(X_train[:10]))

print("Task 1 Output:", task1_output.shape)  # Should be [batch_size, num_classes_task1]
print("Task 2 Output:", task2_output.shape)  # Should be [batch_size, num_classes_task2]


Task 1 Output: torch.Size([10])
Task 2 Output: torch.Size([10])


In [170]:
X_train1.shape

(50, 32, 32, 3)

In [171]:
task1_output

tensor([4, 5, 5, 9, 4, 4, 4, 4, 5, 5])

In [172]:
task2_output

tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])

## Define code for actual training

In [110]:
X_train1.shape

(50, 32, 32, 3)

In [111]:
y_train1.shape

(50, 2)

In [112]:
X_train1.shape[0]

50

In [114]:
batch_size = 4

In [120]:
test_input = transform_input_resnet(X_train1[0:4])

In [121]:
pred = model(test_input)

In [126]:
pred[0]==38

tensor(True)

In [174]:
for i in range(0, X_train1.shape[0], batch_size):
    batch_input = X_train1[i:i+batch_size]
    batch_label = y_train1[i:i+batch_size]
    batch_input = transform_input_resnet(batch_input)
    prediction = model(batch_input)
    print(prediction)
    
    

(tensor([5, 4, 4, 4]), tensor([2, 2, 2, 2]))
(tensor([4, 5, 4, 2]), tensor([2, 2, 2, 2]))
(tensor([4, 4, 2, 4]), tensor([2, 2, 2, 2]))
(tensor([4, 9, 2, 4]), tensor([2, 2, 2, 2]))
(tensor([4, 4, 4, 2]), tensor([2, 2, 2, 2]))
(tensor([5, 2, 5, 4]), tensor([2, 2, 2, 2]))
(tensor([9, 4, 5, 4]), tensor([2, 2, 2, 2]))
(tensor([2, 5, 4, 4]), tensor([2, 2, 2, 2]))
(tensor([4, 2, 4, 2]), tensor([2, 2, 2, 2]))
(tensor([4, 4, 5, 5]), tensor([2, 2, 2, 2]))
(tensor([5, 4, 4, 5]), tensor([2, 2, 2, 2]))
(tensor([4, 5, 4, 4]), tensor([2, 2, 2, 2]))
(tensor([4, 4]), tensor([2, 2]))


In [63]:
model(transform_input_resnet(X_train1[0:3]))

(tensor([1, 5, 1]), tensor([0, 0, 2]))

In [65]:
# Step 3: Define the Loss Functions
emotion_loss_fn = nn.CrossEntropyLoss()  # For classification task
breed_loss_fn = nn.CrossEntropyLoss()  # For regression task

# Step 4: Define the optimizer
model = MultiTaskResNet50(5,11)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Step 6: Training Loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_emotion_loss = 0.0
    running_breed_loss = 0.0
    running_total_loss = 0.0

    for input, emotion_target, breed_target in dataloader:
        input1 = input.long()
        print("haha")
        print(emotion_target)
        optimizer.zero_grad()  # Zero the gradients

        # Forward pass
        emotion_output, breed_output = model(input)

        # Compute the classification and regression losses
        emotion_loss = emotion_loss_fn(emotion_output, emotion_target)
        breed_loss = breed_loss_fn(breed_output, breed_target)

        # Combine the losses (can use weighted sum if needed)
        total_loss = emotion_loss + breed_loss

        # Backpropagate the loss and update weights
        total_loss.backward()
        optimizer.step()

        # Accumulate the losses for reporting
        running_emotion_loss += emotion_loss.item()
        running_breed_loss += breed_loss.item()
        running_total_loss += total_loss.item()
    # Print the average losses for this epoch
    avg_emotion_loss = running_emotion_loss/len(dataloader)
    avg_breed_loss = running_breed_loss/len(dataloader)
    avg_total_loss = running_total_loss / len(dataloader)

    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Emotion loss: {avg_emotion_loss:.4f}, "
          f"Breed Loss: {avg_breed_loss:.4f}, "
          f"Total Loss: {avg_total_loss:.4f}")

# Step 7: Save the trained model (optional)
torch.save(model.state_dict(), 'multi_task_resnet.pth')




haha
tensor([10,  1], dtype=torch.int32)


RuntimeError: Expected floating point type for target with class probabilities, got Int