# Faces

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from random import randint as ri
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models
from collections import Counter

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Read the zipped CSV; every row stores one face as a space-separated pixel string.
df = pd.read_csv('Data/age_gender.csv.zip')
# Turn each pixel string into a flat float32 vector.
df['pixels'] = df['pixels'].apply(lambda raw: np.array(raw.split(), dtype="float32"))
# Leave out ethnicity class 4 and renumber the remaining rows from zero.
df = df[df["ethnicity"] != 4].reset_index(drop=True)
# Bucket ages into five half-open intervals [low, high), coded 0..4.
bins = [0, 18, 35, 50, 70, 120]
labels = [0, 1, 2, 3, 4]
df['AgeGroup'] = pd.cut(df['age'], bins=bins, labels=labels, right=False).astype(int)

# Set aside 100 random rows for the prediction demo and remove them from the
# frame used for training, validation and testing.
samples = df.sample(100)
df = df.drop(samples.index).reset_index(drop=True)
df.info()

FileNotFoundError: [Errno 2] No such file or directory: 'Data/age_gender.csv.zip'

In [None]:
# Count samples for every gender (rows) x ethnicity (columns) combination.
pd.crosstab(df['gender'], df['ethnicity'])
#df["geneth"] = str(df['gender']) + str(df['ethnicity'])
#df.head()

In [None]:
# Number of samples in each age bucket.
df['AgeGroup'].value_counts()

In [None]:
def plot_rand_image():
    """
    Plot one randomly chosen image from the global ``df``.

    The title shows the row position followed by the name and value of the
    first three columns (the last column is never included). The image is the
    row's ``pixels`` vector reshaped to 48x48 and drawn in grayscale.

    Raises:
        ValueError: if ``df`` has no rows.
    """
    n_rows = df.shape[0]
    if n_rows == 0:
        raise ValueError("cannot plot a random image from an empty DataFrame")
    # randint includes both endpoints, so the largest valid draw is n_rows - 1.
    num = ri(0, n_rows - 1)
    row = df.iloc[num]
    fields = list(zip(df.columns.tolist()[:-1], row.tolist()[:-1]))[:3]
    summary = ", ".join(f"{name}: {value}" for name, value in fields)
    plt.title(f"sample #{num}- {summary}")
    plt.imshow(row["pixels"].reshape(48, 48), cmap='gray')
    plt.show()

In [None]:
# Show one randomly chosen face together with its labels.
plot_rand_image()

## Ethnicity

In [None]:
class TransferImages(Dataset):
    """Face images in a format for transfer learning, labelled by one column.

    Every item is an ``(image, label)`` pair. The flat vector stored in the
    ``pixels`` column is reshaped to 48x48 and repeated over three channels so
    that it fits networks expecting three-channel input.
    """

    def __init__(self, my_df, transform=None, label_col="ethnicity"):
        """
        Args:
            my_df (DataFrame): table with a ``pixels`` column holding flat
                arrays of 48*48 values and a column named ``label_col``.
            transform (callable, optional): Optional transform to be applied
                on a sample.
            label_col (str): name of the column returned as the target.
        """
        self.TransferImages = my_df
        self.transform = transform
        self.label_col = label_col

    def __len__(self):
        return len(self.TransferImages)

    def __getitem__(self, idx):
        # Positional access: samplers hand out positions 0..len-1, which match
        # the index labels only when the frame carries a fresh RangeIndex.
        pixels = self.TransferImages["pixels"].iloc[idx]
        image = np.repeat(pixels.reshape(48, 48)[..., np.newaxis], 3, -1)
        y_label = self.TransferImages[self.label_col].iloc[idx]
        if self.transform:
            image = self.transform(image)

        return image, y_label
# Preprocessing: tensor conversion, random horizontal flip, 48x48 centre crop
# and channel-wise normalisation. The same pipeline is attached to the whole
# dataset, so it is applied to the train, validation and test splits alike.
my_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.CenterCrop(size=48),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

data = TransferImages(df, transform=my_transforms)

# 70% / 15% of the rows for training / validation; the test split takes
# whatever is left so the three sizes always add up to the dataset length.
n_total = len(data)
train_size = int(n_total * 0.70)
valid_size = int(n_total * 0.15)
test_size = int(n_total - valid_size - train_size)

split_sizes = [train_size, valid_size, test_size]
train, valid, test = torch.utils.data.random_split(data, split_sizes)

print(f"train size: {len(train)}\nvalid size: {len(valid)}\ntest size: {len(test)}")

In [None]:
batch_size = 128

my_data = {'train': train, 'valid': valid, 'test': test}
# Loader key -> split key. The validation loader is published as 'val' although
# its split is stored under 'valid'. Every loader shuffles.
dataloaders = {
    loader_key: DataLoader(my_data[split_key], batch_size=batch_size, shuffle=True)
    for loader_key, split_key in (('train', 'train'), ('val', 'valid'), ('test', 'test'))
}

In [None]:
# Backbone for the ethnicity classifier: torchvision's wide_resnet50_2 created with pretrained=True.
# NOTE(review): newer torchvision releases replace `pretrained=` with `weights=` — confirm
# against the installed version before upgrading.
model = models.wide_resnet50_2(pretrained=True)
#model = models.resnet50(pretrained=True)

In [None]:
# Freeze model weights
#for param in model.parameters():
    #param.requires_grad = False

In [None]:
# Swap the network's final layer for a small classifier head that ends in
# log-probabilities over the 4 ethnicity classes.
num_features = model.fc.in_features
head_layers = [
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, 4),
    nn.LogSoftmax(dim=1),
]
model.fc = nn.Sequential(*head_layers)

In [None]:
# Whether to train on a gpu
train_on_gpu = torch.cuda.is_available()
print(f'Train on gpu: {train_on_gpu}')

# Number of gpus. Both names are bound on every machine, so the multi-GPU
# check below cannot hit an undefined variable when no GPU is present.
gpu_count = torch.cuda.device_count() if train_on_gpu else 0
if train_on_gpu:
    print(f'{gpu_count} gpus detected.')
multi_gpu = gpu_count > 1

device = torch.device('cuda' if train_on_gpu else 'cpu')
if train_on_gpu:
    model = model.to(device)

# Replicate the model across all visible GPUs when there is more than one.
if multi_gpu:
    model = nn.DataParallel(model)

In [None]:
# Negative log-likelihood loss; it pairs with the LogSoftmax layer that ends model.fc.
criterion = nn.NLLLoss()
# Adam with its default settings over all of the model's parameters.
optimizer = optim.Adam(model.parameters())

In [None]:
from timeit import default_timer as timer
def _run_batches(model, criterion, loader, device, optimizer=None, on_batch_end=None):
    """Make one pass over ``loader`` and return ``(loss_sum, n_correct)``.

    ``loss_sum`` adds up each batch loss multiplied by the batch size and
    ``n_correct`` counts samples whose arg-max output equals the target.
    With an ``optimizer`` every batch also runs a backward pass and a
    parameter update; without one the pass is forward-only.
    ``on_batch_end(batch_index)`` is called after each batch when given.
    """
    loss_sum = 0.0
    n_correct = 0.0
    for batch_index, (data, target) in enumerate(loader):
        data, target = data.to(device), target.to(device)

        if optimizer is not None:
            optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target.long())
        if optimizer is not None:
            loss.backward()
            optimizer.step()

        loss_sum += loss.item() * data.size(0)
        pred = output.argmax(dim=1)
        n_correct += pred.eq(target.view_as(pred)).sum().item()

        if on_batch_end is not None:
            on_batch_end(batch_index)
    return loss_sum, n_correct


def train_loop(model,
          criterion,
          optimizer,
          train_loader,
          valid_loader,
          save_file_name,
          max_epochs_stop=3,
          n_epochs=20,
          print_every=1):
    """Train a PyTorch Model
    Params
    --------
        model (PyTorch model): cnn to train
        criterion (PyTorch loss): objective to minimize
        optimizer (PyTorch optimizer): optimizer that updates the model parameters
        train_loader (PyTorch dataloader): training dataloader to iterate through
        valid_loader (PyTorch dataloader): validation dataloader used for early stopping
        save_file_name (str ending in '.pt'): file path to save the model state dict
        max_epochs_stop (int): maximum number of epochs with no improvement in validation loss for early stopping
        n_epochs (int): maximum number of training epochs
        print_every (int): frequency of epochs to print training stats
    Returns
    --------
        model (PyTorch model): trained cnn with best weights; it also carries
            `epochs` (total epochs trained) and `optimizer` attributes
        history (DataFrame): history of train and validation loss and accuracy
    """

    # Early stopping initialization
    epochs_no_improve = 0
    valid_loss_min = float('inf')
    valid_best_acc = 0.0
    best_epoch = 0  # stays 0 until a checkpoint has been written in this call
    history = []

    # Number of epochs already trained (if using loaded in model weights)
    try:
        print(f'Model has been trained for: {model.epochs} epochs.\n')
    except AttributeError:
        model.epochs = 0
        print('Starting Training from Scratch.\n')

    # Send batches to wherever the model's parameters live, so the function
    # does not depend on any notebook-level device flag.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    overall_start = timer()
    epochs_run = 0
    stopped_early = False

    # Main loop
    for epoch in range(n_epochs):
        epochs_run = epoch + 1
        start = timer()

        def show_progress(batch_index, epoch=epoch, start=start):
            print(
                f'Epoch: {epoch+1}\t{100 * (batch_index + 1) / len(train_loader):.2f}% complete. {timer() - start:.2f} seconds elapsed in epoch.',
                end='\r')

        # Training pass
        model.train()
        train_loss, train_acc = _run_batches(
            model, criterion, train_loader, device,
            optimizer=optimizer, on_batch_end=show_progress)
        model.epochs += 1

        # Validation pass: evaluation mode, no gradient tracking
        model.eval()
        with torch.no_grad():
            valid_loss, valid_acc = _run_batches(
                model, criterion, valid_loader, device)

        # Turn the sums into per-sample averages
        train_loss = train_loss / len(train_loader.dataset)
        valid_loss = valid_loss / len(valid_loader.dataset)
        train_acc = train_acc / len(train_loader.dataset)
        valid_acc = valid_acc / len(valid_loader.dataset)

        history.append([train_loss, valid_loss, train_acc, valid_acc])

        # Print training and validation results
        if (epoch + 1) % print_every == 0:
            print(
                f'\nEpoch: {epoch+1} \tTraining Loss: {train_loss:.4f} \tValidation Loss: {valid_loss:.4f}'
            )
            print(
                f'\t\tTraining Accuracy: {100 * train_acc:.2f}%\t Validation Accuracy: {100 * valid_acc:.2f}%'
            )

        if valid_loss < valid_loss_min:
            # New best validation loss: checkpoint and reset the patience counter
            torch.save(model.state_dict(), save_file_name)
            epochs_no_improve = 0
            valid_loss_min = valid_loss
            valid_best_acc = valid_acc
            best_epoch = epoch + 1
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= max_epochs_stop:
                stopped_early = True
                break

    total_time = timer() - overall_start
    # The accuracy reported here is the one of the best epoch, not of the
    # last epoch that happened to run.
    if stopped_early:
        print(
            f'\nEarly Stopping! Total epochs: {epochs_run}. Best epoch: {best_epoch} with loss: {valid_loss_min:.2f} and acc: {100 * valid_best_acc:.2f}%'
        )
    else:
        print(
            f'\nBest epoch: {best_epoch} with loss: {valid_loss_min:.2f} and acc: {100 * valid_best_acc:.2f}%'
        )
    print(
        f'{total_time:.2f} total seconds elapsed. {total_time / max(epochs_run, 1):.2f} seconds per epoch.'
    )

    # Return the best weights on both exit paths. Skipped when this call never
    # wrote a checkpoint (no epochs ran or the loss never improved).
    if best_epoch:
        model.load_state_dict(torch.load(save_file_name, map_location=device))
    # Attach the optimizer
    model.optimizer = optimizer

    # Format history
    history = pd.DataFrame(
        history,
        columns=['train_loss', 'valid_loss', 'train_acc', 'valid_acc'])
    return model, history


In [None]:
# Train the ethnicity model: at most 100 epochs, stopping once the validation
# loss has not improved for 10 epochs; stats are printed every 5 epochs and
# the weights are saved to Data/model1.pt whenever the validation loss improves.
model, history = train_loop(
    model,
    criterion,
    optimizer,
    dataloaders['train'],
    dataloaders['val'],
    save_file_name="Data/model1.pt",
    max_epochs_stop=10,
    n_epochs=100,
    print_every=5)

In [None]:
def Accuracy_report(loader = None, model = None, n_classes = None):
    """
    Measure per-class and overall accuracy of a classifier.

    Args:
    >loader - iterable of (inputs, targets) batches used for accuracy testing.
    >model - the neural network.
    >n_classes - the number of classes; targets must lie in 0..n_classes-1.

    Output:
    >>>[my_classes,acc]
    > my_classes - dict mapping class index to accuracy in percent. Classes
      with no targets in the data are set to nan.
    > acc - overall accuracy in percent (nan when the loader yields no samples).
    """
    classes = list(range(n_classes))
    correct_pred = [0] * n_classes
    total_pred = [0] * n_classes

    # Run the batches on the device that holds the model's parameters. Objects
    # without parameters get the tensors exactly as the loader produced them.
    try:
        run_device = next(model.parameters()).device
    except (AttributeError, StopIteration):
        run_device = None

    # Dropout must be inactive while measuring accuracy; the previous mode is
    # restored afterwards so the caller's model is left as it was.
    was_training = getattr(model, "training", False)
    if was_training:
        model.eval()
    try:
        with torch.no_grad():
            for inputs, targets in loader:
                if run_device is not None:
                    inputs = inputs.to(run_device)
                    targets = targets.to(run_device)
                predictions = model(inputs).argmax(dim=1)

                # collect the correct predictions for each class
                for target, prediction in zip(targets.tolist(), predictions.tolist()):
                    target = int(target)
                    if target == prediction:
                        correct_pred[target] += 1
                    total_pred[target] += 1
    finally:
        if was_training:
            model.train()

    my_classes = [
        100 * float(correct) / total if total else np.nan
        for correct, total in zip(correct_pred, total_pred)
    ]

    n_samples = sum(total_pred)
    acc = 100 * float(sum(correct_pred) / n_samples) if n_samples else np.nan

    return [dict(zip(classes, my_classes)), acc]

In [None]:
# Per-class and overall accuracy of the ethnicity model on its test loader (4 classes).
b1_test_acc = Accuracy_report(loader = dataloaders["test"],model = model, n_classes = 4)
b1_test_acc

## Gender

In [None]:
class TransferImages2(Dataset):
    """Face images in a format for transfer learning, labelled by gender.

    Every item is an ``(image, label)`` pair. The flat vector stored in the
    ``pixels`` column is reshaped to 48x48 and repeated over three channels so
    that it fits networks expecting three-channel input.
    """

    def __init__(self, my_df, transform=None, label_col="gender"):
        """
        Args:
            my_df (DataFrame): table with a ``pixels`` column holding flat
                arrays of 48*48 values and a column named ``label_col``.
            transform (callable, optional): Optional transform to be applied
                on a sample.
            label_col (str): name of the column returned as the target.
        """
        self.TransferImages2 = my_df
        self.transform = transform
        self.label_col = label_col

    def __len__(self):
        return len(self.TransferImages2)

    def __getitem__(self, idx):
        # Positional access: samplers hand out positions 0..len-1, which match
        # the index labels only when the frame carries a fresh RangeIndex.
        pixels = self.TransferImages2["pixels"].iloc[idx]
        image = np.repeat(pixels.reshape(48, 48)[..., np.newaxis], 3, -1)
        y_label = self.TransferImages2[self.label_col].iloc[idx]
        if self.transform:
            image = self.transform(image)

        return image, y_label
    
# Preprocessing for the gender task: tensor conversion, random horizontal
# flip, 48x48 centre crop and channel-wise normalisation.
my_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.CenterCrop(size=48),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

data2 = TransferImages2(df, transform=my_transforms)

# A new random split that reuses the split sizes computed earlier.
split_sizes = [train_size, valid_size, test_size]
train2, valid2, test2 = torch.utils.data.random_split(data2, split_sizes)

# Both dictionaries replace the earlier ones of the same name.
my_data = {'train2': train2, 'valid2': valid2, 'test2': test2}
dataloaders = {
    loader_key: DataLoader(my_data[split_key], batch_size=batch_size, shuffle=True)
    for loader_key, split_key in (('train2', 'train2'), ('val2', 'valid2'), ('test2', 'test2'))
}

# Pretrained backbone with a two-class head that outputs log-probabilities.
model2 = models.wide_resnet50_2(pretrained=True)
num_features = model2.fc.in_features
head_layers = [
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, 2),
    nn.LogSoftmax(dim=1),
]
model2.fc = nn.Sequential(*head_layers)

model2.to(device)

criterion2 = nn.NLLLoss()
optimizer2 = optim.Adam(model2.parameters())

In [None]:
# Train the gender model: at most 100 epochs, stopping once the validation
# loss has not improved for 10 epochs; stats are printed every 5 epochs and
# the weights are saved to Data/model2.pt whenever the validation loss improves.
model2, history2 = train_loop(
    model2,
    criterion2,
    optimizer2,
    dataloaders['train2'],
    dataloaders['val2'],
    save_file_name="Data/model2.pt",
    max_epochs_stop=10,
    n_epochs=100,
    print_every=5)

In [None]:
# Per-class and overall accuracy of the gender model on its test loader (2 classes).
b2_test_acc = Accuracy_report(loader = dataloaders["test2"],model = model2, n_classes = 2)
b2_test_acc

## Age Group

In [None]:
class TransferImages3(Dataset):
    """Face images in a format for transfer learning, labelled by age group.

    Every item is an ``(image, label)`` pair. The flat vector stored in the
    ``pixels`` column is reshaped to 48x48 and repeated over three channels so
    that it fits networks expecting three-channel input.
    """

    def __init__(self, my_df, transform=None, label_col="AgeGroup"):
        """
        Args:
            my_df (DataFrame): table with a ``pixels`` column holding flat
                arrays of 48*48 values and a column named ``label_col``.
            transform (callable, optional): Optional transform to be applied
                on a sample.
            label_col (str): name of the column returned as the target.
        """
        self.TransferImages3 = my_df
        self.transform = transform
        self.label_col = label_col

    def __len__(self):
        return len(self.TransferImages3)

    def __getitem__(self, idx):
        # Positional access: samplers hand out positions 0..len-1, which match
        # the index labels only when the frame carries a fresh RangeIndex.
        pixels = self.TransferImages3["pixels"].iloc[idx]
        image = np.repeat(pixels.reshape(48, 48)[..., np.newaxis], 3, -1)
        y_label = self.TransferImages3[self.label_col].iloc[idx]
        if self.transform:
            image = self.transform(image)

        return image, y_label
    
# Preprocessing for the age-group task: tensor conversion, random horizontal
# flip, 48x48 centre crop and channel-wise normalisation.
my_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.CenterCrop(size=48),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

data3 = TransferImages3(df, transform=my_transforms)

# A new random split that reuses the split sizes computed earlier.
split_sizes = [train_size, valid_size, test_size]
train3, valid3, test3 = torch.utils.data.random_split(data3, split_sizes)

# Both dictionaries replace the earlier ones of the same name.
my_data = {'train3': train3, 'valid3': valid3, 'test3': test3}
dataloaders = {
    loader_key: DataLoader(my_data[split_key], batch_size=batch_size, shuffle=True)
    for loader_key, split_key in (('train3', 'train3'), ('val3', 'valid3'), ('test3', 'test3'))
}

# Pretrained backbone with a five-class head that outputs log-probabilities.
model3 = models.wide_resnet50_2(pretrained=True)
num_features = model3.fc.in_features
head_layers = [
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, 5),
    nn.LogSoftmax(dim=1),
]
model3.fc = nn.Sequential(*head_layers)

model3.to(device)

# Unweighted loss; an inverse-class-frequency `weight` argument is left disabled.
criterion3 = nn.NLLLoss()
optimizer3 = optim.Adam(model3.parameters())

In [None]:
# Train the age-group model: at most 100 epochs, stopping once the validation
# loss has not improved for 10 epochs; stats are printed every 5 epochs and
# the weights are saved to Data/model3.pt whenever the validation loss improves.
model3, history3 = train_loop(
    model3,
    criterion3,
    optimizer3,
    dataloaders['train3'],
    dataloaders['val3'],
    save_file_name="Data/model3.pt",
    max_epochs_stop=10,
    n_epochs=100,
    print_every=5)

In [None]:
# Per-class and overall accuracy of the age-group model on its test loader (5 classes).
b3_test_acc = Accuracy_report(loader = dataloaders["test3"],model = model3, n_classes = 5)
b3_test_acc

## Prediction

In [None]:
# Keep only the three label columns and the pixels, in this fixed order: the
# cells below address the labels by column position.
kept_columns = ["ethnicity", "gender", "AgeGroup", "pixels"]
df = df[kept_columns]
samples = samples[kept_columns]
# Renumber the 100 held-out rows 0..99.
samples.index = np.arange(100)
samples.info()

In [None]:
def model_loader(model_path = None, label = None):
    """
    Rebuild the classifier architecture and load trained weights into it.

    Args:
    >model_path - the path to the saved state dict of the model.
    >label - the column of the global ``df`` to predict; the number of
     distinct values in that column sets the size of the output layer.

    Output:
    > the model with the loaded weights, on the CPU and in evaluation mode.
    """
    # Imported locally so the function keeps working after the notebook
    # rebinds the global name `models` to the dictionary of loaded models.
    from torchvision import models as tv_models

    n_outputs = len(df[label].unique())

    model = tv_models.wide_resnet50_2()
    num_features = model.fc.in_features
    # LogSoftmax is a layer of its own. Passed inside nn.Linear(...) it would
    # be taken as that layer's `bias` argument and the head would lose it.
    model.fc = nn.Sequential(
                          nn.Linear(num_features, 256),
                          nn.ReLU(),
                          nn.Dropout(0.4),
                          nn.Linear(256, n_outputs),
                          nn.LogSoftmax(dim=1))

    # map_location lets weights saved on a GPU load on a CPU-only machine.
    state_dict = torch.load(model_path, map_location="cpu")
    # A state dict saved from an nn.DataParallel wrapper prefixes every key
    # with "module."; strip it so the keys match the plain model.
    prefix = "module."
    state_dict = {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in state_dict.items()
    }
    model.load_state_dict(state_dict)
    return model.eval()

# Load the three saved classifiers: "model0" from Data/model1.pt for df.columns[0],
# "model1" from Data/model2.pt for df.columns[1], "model2" from Data/model3.pt for df.columns[2].
# NOTE(review): this rebinds `models`, which until now referred to the torchvision
# `models` module imported at the top of the file; any code that still expects the
# module under this name will break once this line has run.
models = {f"model{i}" : model_loader(f"Data/model{i+1}.pt",df.columns[i]) for i in range(3)}

In [None]:

# Integer class code -> readable name, one map per task. Each map holds as many
# entries as the matching df column has distinct values, capped by the name list.
ethnicities = dict(zip(range(len(df.ethnicity.unique())), ["white", "black", "asian", "indian"]))
genders = dict(zip(range(len(df.gender.unique())), ["male", "female"]))
AgeGroups = dict(zip(range(len(df.AgeGroup.unique())), ["child", "young adult", "adult", "middle aged", "elder"]))
    


def plot_image(num):
    """
    Plot held-out sample `num` from the global `samples` frame.

    The title shows the 1-based sample number and the readable names of the
    sample's ethnicity, gender and age group; the image is drawn in grayscale
    without axes.
    """
    column_names = df.columns.tolist()[:-1]
    row_values = samples.loc[num].tolist()[:-1]
    labels = dict(zip(column_names, row_values))

    pairs = list(labels.items())
    eth_name, eth_code = pairs[0]
    gen_name, gen_code = pairs[1]
    age_name, age_code = pairs[2]

    title = (
        f"sample #{num+1}- {eth_name}: {ethnicities[eth_code]}, "
        f"{gen_name}: {genders[gen_code]}, "
        f"{age_name}: {AgeGroups[age_code]}"
    )
    plt.title(title)
    plt.imshow(samples.loc[num, "pixels"].reshape(48, 48), cmap='gray')
    plt.axis('off')
    plt.show()

In [None]:
def predictor(model = None, my_transforms = None, num = None):
    """
    Return the class index `model` predicts for held-out sample `num`.

    Args:
    >model - the network to query.
    >my_transforms - callable that turns the 48x48x3 array into the input tensor.
    >num - position of the sample in the global `samples` frame.
    """
    gray = samples.pixels.iloc[num].reshape(48, 48)
    three_channel = np.repeat(gray[..., np.newaxis], 3, -1)
    # unsqueeze(0) adds the batch dimension the network expects.
    input_image = my_transforms(three_channel).unsqueeze(0)
    # Pure inference: no gradients are needed, so skip building the graph.
    with torch.no_grad():
        output = model(input_image)
    _, pred = torch.max(output, dim=1)
    return int(pred)

def profile(num):
    """
    Plot held-out sample `num` and print the predicted ethnicity, gender and
    age group for it, one prediction per loaded model.
    """
    plot_image(num)
    # The training pipelines normalise their inputs with these statistics, so
    # inference has to apply the same normalisation; ToTensor alone would feed
    # the networks differently scaled data. The random flip used in training
    # is augmentation only and is left out here.
    inference_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])])
    ethnicity = ethnicities[predictor(models["model0"], inference_transform, num = num)]
    gender = genders[predictor(models["model1"], inference_transform, num = num)]
    AgeGroup = AgeGroups[predictor(models["model2"], inference_transform, num = num)]
    print(f"Profile Prediction number #{num+1}:\n______________________________\n|\tethnicity: {ethnicity}\n|\tgender: {gender}\n|\tAgeGroup: {AgeGroup}\n______________________________")

In [None]:
# Write the held-out samples to disk.
# NOTE(review): the 'pixels' column holds NumPy arrays, which end up in the CSV as
# their string form; NumPy abbreviates long arrays by default, so confirm the file
# can be read back losslessly if that is required.
samples.to_csv("Data/my_samples.csv")

In [None]:
# Pick one held-out sample at random (randint includes both bounds, hence the -1)
# and show its image next to the three predictions.
num = ri(0,samples.shape[0]-1)
profile(num)