In [2]:
import random
import pandas as pd
import numpy as np
import torch, os
from random import shuffle
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.datasets as dset
import matplotlib.pyplot as plt
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from torchvision.transforms import ToTensor
from torch import optim
from torch.autograd import Variable
from PIL import Image
%matplotlib inline
from pylab import *

In [3]:
from sklearn import preprocessing
label_encoder = preprocessing.LabelEncoder()

In [9]:
df_attr = pd.read_csv('/MSDS/3rd Sem/ML/Assignment/CelebA/CelebA/Anno/list_attr_celeba.txt',skiprows=1,delim_whitespace=True)
df_attr['image_id'] = df_attr.index
df_attr.reset_index(inplace=True,level=0)
df_attr.drop('index', axis=1, inplace=True)

df_identity = pd.read_csv('/MSDS/3rd Sem/ML/Assignment/CelebA/CelebA/Anno/identity_CelebA.txt',skiprows=0,delim_whitespace=True,names=['image_id','identity'])

sample_df = pd.merge(df_attr,df_identity)

identity_labels = df_identity['identity'].values.tolist()
random_list = identity_labels.copy()
random_list = list(set(random_list))

sample_100_labels = random_list[0:3]
sample_df_identity = df_identity[df_identity['identity'].isin(sample_100_labels)]

att_df = sample_df[['image_id','identity','Male','Eyeglasses','Smiling','Wearing_Hat','Young']]
att_df=att_df[['image_id','identity','Male','Eyeglasses','Smiling','Wearing_Hat','Young']].replace(-1,0)
att_df.head()

Unnamed: 0,image_id,identity,Male,Eyeglasses,Smiling,Wearing_Hat,Young
0,000001.jpg,2880,0,0,1,0,1
1,000002.jpg,2937,0,0,1,0,1
2,000003.jpg,8692,1,0,0,0,1
3,000004.jpg,5805,0,0,0,0,1
4,000005.jpg,9295,0,0,0,0,1


In [10]:
sample_images_names = att_df['image_id'].values.tolist()
att_df['identity'] = label_encoder.fit_transform(att_df['identity'])
sample_images_labels = att_df['identity'].values.tolist()
sample_images_features = att_df[['Male','Eyeglasses','Smiling','Wearing_Hat','Young']].values.tolist()

In [11]:
ngpu = 1
device = torch.device('cuda:0' if (
    torch.cuda.is_available() and ngpu > 0) else 'cpu')

In [14]:
## Create a custom Dataset class
class CelebADataset(Dataset):
  def __init__(self, root_dir, transform=None):
    
    # Read names of images in the root directory
    # image_names = os.listdir(root_dir)
    image_names = sample_images_names

    self.root_dir = root_dir
    self.transform = transform 
    self.image_names = image_names
    self.labels = sample_images_features

  def __len__(self): 
    return len(self.image_names)

  def __getitem__(self, idx):
    # Get the path to the image 
    img_path = os.path.join(self.root_dir, self.image_names[idx])

    # Load image and convert it to RGB
    img = Image.open(img_path).convert('RGB')

    # Apply transformations to the image
    if self.transform:
      img = self.transform(img)
    labels = self.labels[idx]
    male = torch.tensor(int(labels[0]), dtype=torch.int64)
    eye_glasses = torch.tensor(int(labels[1]), dtype=torch.int64)
    smiling = torch.tensor(int(labels[2]), dtype=torch.int64)
    wearing_hat = torch.tensor(int(labels[3]), dtype=torch.int64)
    young = torch.tensor(int(labels[4]), dtype=torch.int64)

    return {'img':img,'male':male, 'eye_glasses':eye_glasses, 
            'smiling':smiling, 'wearing_hat':wearing_hat,
            'young':young
            }

def train_val_dataset(dataset, test_ratio=0.20):
    train_idx, test_idx = train_test_split(list(range(len(dataset))), test_size=test_ratio)
    datasets = {}
    datasets['train'] = Subset(dataset, train_idx)
    datasets['test'] = Subset(dataset, test_idx)
    return datasets

## Load the dataset 
# Path to directory with all the images
img_folder ='/MSDS/3rd Sem/ML/Assignment/CelebA/CelebA/Img/img_align_celeba'

# Transformations to be applied to each individual image sample
transform=transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor()
])
# Load the dataset from file and apply transformations
celeba_dataset = CelebADataset(img_folder, transform)
datasets = train_val_dataset(celeba_dataset)
 
# Batch size during training
batch_size = 128
# # Number of workers for the dataloader
num_workers = 0 if device.type == 'cuda' else 2
# # Whether to put fetched data tensors to pinned memory
pin_memory = True if device.type == 'cuda' else False

In [15]:
train_set = datasets['train']
val_set = datasets['test']

train_dataloader = torch.utils.data.DataLoader(datasets['train'],
                                                batch_size=batch_size,
                                                num_workers=num_workers,
                                                pin_memory=pin_memory,
                                                shuffle=True)
val_dataloader = torch.utils.data.DataLoader(datasets['test'],
                                                batch_size=batch_size,
                                                num_workers=num_workers,
                                                pin_memory=pin_memory,
                                                shuffle=False)

loaders = {
    'train' : train_dataloader,
    
    'test'  : val_dataloader ,
}
loaders

{'train': <torch.utils.data.dataloader.DataLoader at 0x28863a55c70>,
 'test': <torch.utils.data.dataloader.DataLoader at 0x28863a55370>}

In [16]:
class MultiTaskLearning(nn.Module):
    def __init__(self):
        super(MultiTaskLearning, self).__init__()
        self.conv1 = nn.Sequential(         
            nn.Conv2d(
                in_channels=3, # gray-scale images           
                # out_channels=256,
                out_channels=64,            
                kernel_size=5, # 5x5 convolutional kernel               
                stride=1,  #no. of pixels pass at a time                 
                padding=2, # to preserve size of input image                 
            ),                              
            nn.ReLU(),                      
            nn.MaxPool2d(kernel_size=2),    
        )
        self.conv2 = nn.Sequential(         
            nn.Conv2d(64, 32, 5, 1, 2),    
            nn.ReLU(),                      
            nn.MaxPool2d(2),                
        )
        self.conv3 = nn.Sequential(         
            nn.Conv2d(32, 16, 3, 1, 2),     
            nn.ReLU(),                      
            nn.MaxPool2d(2),                
        )
        # fully connected layers
        self.fc1 = nn.Linear(32*32*32, 1000)
        self.fc2 = nn.Linear(1000, 2)


    def forward(self, x1):
        x1 = self.conv1(x1)
        x1 = self.conv2(x1)
        x1 = x1.view(x1.size(0), -1)    
        x1 = self.fc1(x1)

        identity =  x1 
        gender = self.fc2(x1)
        eye_glasses = self.fc2(x1)
        smiling = self.fc2(x1)
        wearing_hat = self.fc2(x1)
        young = self.fc2(x1)   

        return gender,eye_glasses,smiling,wearing_hat,young

multi=MultiTaskLearning()
multi

MultiTaskLearning(
  (conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(64, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(32, 16, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc1): Linear(in_features=32768, out_features=1000, bias=True)
  (fc2): Linear(in_features=1000, out_features=2, bias=True)
)

In [17]:
optimizer = optim.Adam(multi.parameters(), lr = 0.001)   
loss_func = nn.BCELoss()

In [18]:
def train(num_epochs, multi, loaders):
    multi.to(device).train()
   
    # Train the model
    total_step = len(loaders['train'])
    loss_total=[]
    for epoch in range(num_epochs):
        for i, data in enumerate(loaders['train']):
            # gives batch data, normalize x when iterate train_loader
            images = data['img'].to(device)
            male = data['male'].to(device)
            eye_glasses = data['eye_glasses'].to(device)
            smiling = data['smiling'].to(device)
            wearing_hat = data['wearing_hat'].to(device)
            young = data['young'].to(device)

            b_x = Variable(images) # batch x
            male_y = Variable(male)
            eye_glasses_y = Variable(eye_glasses)
            smiling_y = Variable(smiling)
            wearing_hat_y = Variable(wearing_hat)
            young_y = Variable(young)
            
            male_y = male_y.unsqueeze(1)
            eye_glasses_y = eye_glasses_y.unsqueeze(1)
            smiling_y = smiling_y.unsqueeze(1)
            wearing_hat_y = wearing_hat_y.unsqueeze(1)
            young_y = young_y.unsqueeze(1)

            output = multi(b_x)
            # print(output)
            (prd_male, prd_eye_glasses, prd_smiling, prd_wearing_hat, prd_young) = output
            # print(prd_male, prd_eye_glasses, prd_smiling, prd_wearing_hat, prd_young)
            prd_male = torch.max(prd_male, 1)[1]
            prd_eye_glasses = torch.max(prd_eye_glasses, 1)[1]
            prd_smiling = torch.max(prd_smiling, 1)[1]
            prd_wearing_hat = torch.max(prd_wearing_hat, 1)[1]
            prd_young = torch.max(prd_young, 1)[1]
           
            # Need to threshold them
            
            male_y = torch.flatten(male_y, start_dim=0, end_dim=1)
            eye_glasses_y = torch.flatten(eye_glasses_y, start_dim=0, end_dim=1)
            smiling_y = torch.flatten(smiling_y, start_dim=0, end_dim=1)
            wearing_hat_y = torch.flatten(wearing_hat_y, start_dim=0, end_dim=1)
            young_y = torch.flatten(young_y, start_dim=0, end_dim=1)
           
            loss_male = loss_func(male_y.float(),prd_male.float())
            loss_eye_glasses = loss_func(eye_glasses_y.float(),prd_eye_glasses.float())
            loss_smiling = loss_func(smiling_y.float(),prd_smiling.float())
            loss_wearing_hat = loss_func(wearing_hat_y.float(),prd_wearing_hat.float())
            loss_young = loss_func(young_y.float(),prd_young.float())


            t_loss = loss_male+loss_eye_glasses+loss_smiling+loss_wearing_hat+loss_young
                  
            optimizer.step()                
            loss_total.append(t_loss.item())
            if (i+1) % 100 == 1:
              print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, num_epochs, i + 1, total_step, t_loss.item()))
       

In [None]:
num_epochs = 10
train(num_epochs, multi, loaders)

In [None]:
multi

MultiTaskLearning(
  (conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(64, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(32, 16, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc1): Linear(in_features=32768, out_features=1000, bias=True)
  (fc2): Linear(in_features=1000, out_features=2, bias=True)
)

In [None]:
def test():
    # Test the model
    multi.cuda().eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for data in loaders['test']:
            test_output =  multi(data['img'].to(device))
            (prd_male, prd_eye_glasses, prd_smiling, prd_wearing_hat, prd_young) = test_output
            prd_male = torch.max(prd_male, 1)[1]
            prd_eye_glasses = torch.max(prd_eye_glasses, 1)[1]
            prd_smiling = torch.max(prd_smiling, 1)[1]
            prd_wearing_hat = torch.max(prd_wearing_hat, 1)[1]
            prd_young = torch.max(prd_young, 1)[1]

            accuracy1 = (prd_male == data['male'].cuda()).sum().item() / float(data['male'].cuda().shape[0])
            accuracy2 = (prd_eye_glasses == data['eye_glasses'].cuda()).sum().item() / float(data['eye_glasses'].cuda().shape[0])
            accuracy3 = (prd_smiling == data['smiling'].cuda()).sum().item() / float(data['smiling'].cuda().shape[0])
            accuracy4 = (prd_wearing_hat == data['wearing_hat'].cuda()).sum().item() / float(data['wearing_hat'].cuda().shape[0])
            accuracy5 = (prd_young == data['young'].cuda()).sum().item() / float(data['young'].cuda().shape[0])
        
        print("Test Accuracy of the model on test images for Male: ", accuracy1*100)
        print("Test Accuracy of the model on test images for Sun Glasses: ", accuracy2*100)
        print("Test Accuracy of the model on test images for Similing: ", accuracy3*100)
        print("Test Accuracy of the model on test images for Wearing Hat: ", accuracy4*100)
        print("Test Accuracy of the model on test images for Young: ", accuracy5*100)

In [None]:
test()

Test Accuracy of the model on test images for Male:  61.111111111111114
Test Accuracy of the model on test images for Sun Glasses:  91.66666666666666
Test Accuracy of the model on test images for Similing:  50.0
Test Accuracy of the model on test images for Wearing Hat:  94.44444444444444
Test Accuracy of the model on test images for Young:  26.38888888888889
