# Multi Layer Perceptron

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class MultiLayerPerceptron(nn.Module):
    """Feed-forward network: linear -> ReLU -> linear, with optional softmax."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        """Create the two fully connected layers.

        Args:
            input_dim (int): number of features in each input row
            hidden_dim (int): width of the hidden layer
            output_dim (int): number of values produced per input row
        """
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x_in, apply_softmax=False):
        """Run the network on a batch.

        Args:
            x_in (torch.Tensor): input batch of shape (batch, input_dim)
            apply_softmax (bool): when True, normalise the outputs along
                dim 1 with softmax; leave False when the loss function
                (e.g. cross entropy) expects raw scores
        Returns:
            torch.Tensor of shape (batch, output_dim)
        """
        hidden = F.relu(self.fc1(x_in))
        scores = self.fc2(hidden)
        return F.softmax(scores, dim=1) if apply_softmax else scores

In [None]:
# Small dimensions used only to smoke-test the MultiLayerPerceptron class
batch_size = 2
input_dim = 3
output_dim = 4
hidden_dim = 100

# initialize the model and print its layer summary
mlp = MultiLayerPerceptron(input_dim, hidden_dim, output_dim)
print(mlp)

MultiLayerPerceptron(
  (fc1): Linear(in_features=3, out_features=100, bias=True)
  (fc2): Linear(in_features=100, out_features=4, bias=True)
)


In [None]:
# testing the MLP
import torch

def describe(x):
    """Print the type string, the shape and the values of `x`.

    Args:
        x: object exposing `.type()` and `.shape` (a torch.Tensor in this
            notebook)
    """
    print(f"Type: {x.type()}")
    print(f"Shape/size: {x.shape}")
    print(f"Values: \n{x}")


# build a random input batch of shape (batch_size, input_dim)
input_tensor = torch.rand(batch_size, input_dim)
describe(input_tensor)

# forward pass without softmax: the outputs are raw, unnormalised scores
y_output = mlp(input_tensor, apply_softmax=False)
describe(y_output)

Type: torch.FloatTensor
Shape/size: torch.Size([2, 3])
Values: 
tensor([[0.0813, 0.6619, 0.3093],
        [0.9877, 0.4410, 0.7280]])
Type: torch.FloatTensor
Shape/size: torch.Size([2, 4])
Values: 
tensor([[ 0.0821, -0.2692, -0.2108,  0.1421],
        [-0.0140, -0.1247, -0.2154,  0.2488]], grad_fn=<AddmmBackward>)


The rows of the tensor are the data points in the mini-batch.
The columns are the input features for each data point.

In [None]:
# same forward pass with apply_softmax=True:
# each output row is normalised by softmax along dim 1
y_output = mlp(input_tensor, apply_softmax=True)
describe(y_output)

Type: torch.FloatTensor
Shape/size: torch.Size([2, 4])
Values: 
tensor([[0.2848, 0.2004, 0.2125, 0.3024],
        [0.2492, 0.2231, 0.2037, 0.3241]], grad_fn=<SoftmaxBackward>)


# MLP Surname Classification

In [None]:
from torch.utils.data import Dataset

class SurnameDataset(Dataset):
    """Sketch of the surname dataset showing only item lookup.

    This class body defines neither `_target_df` nor `_vectorizer`; both must
    be set on the instance before items are requested.
    """

    def __getitem__(self, index):
        """Return the vectorized surname and nationality index for one row.

        Args:
            index (int): positional index into the active dataframe
        Returns:
            dict with keys 'x_surname' (result of `vectorize`) and
            'y_nationality' (index from the nationality vocabulary)
        """
        # `_target_df` (no trailing underscore) is the attribute name the full
        # SurnameDataset class later in this notebook assigns in set_split().
        row = self._target_df.iloc[index]

        surname_vector = self._vectorizer.vectorize(row.surname)

        nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {
            'x_surname': surname_vector,
            'y_nationality': nationality_index
        }

# Surname Classification


## Vocabulary


In [1]:
# create vocabulary class
class Vocabulary(object):
    """Bidirectional mapping between tokens and integer indices."""

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        """
        Args:
            token_to_idx (dict): optional pre-existing token -> index mapping
            add_unk (bool): when True, `unk_token` is added to the vocabulary
                and its index is returned for unknown tokens
            unk_token (str): the token used for unknown entries
        """
        # Copy the mapping so that add_token() never mutates the dict the
        # caller passed in (e.g. the contents handed to from_serializeable).
        self._token_to_idx = dict(token_to_idx) if token_to_idx is not None else {}

        self._idx_to_token = {
            idx: token for token, idx in self._token_to_idx.items()
        }

        self._add_unk = add_unk
        self._unk_token = unk_token

        # -1 marks "no unknown token"; lookup_token() checks for it
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)

    def to_serializeable(self):
        """Return a dictionary that can be passed to from_serializeable.

        The token mapping is a copy, so later add_token() calls do not change
        a dictionary that was already handed out.
        """
        return {
            'token_to_idx': dict(self._token_to_idx),
            'add_unk': self._add_unk,
            'unk_token': self._unk_token
        }

    @classmethod
    def from_serializeable(cls, contents):
        """Create a vocabulary from a dictionary made by to_serializeable."""
        return cls(**contents)

    def add_token(self, token):
        """Add a token if it is new and return its index.

        A new token receives the next free index (the current vocabulary
        size); an existing token keeps its index.
        """
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

    def lookup_token(self, token):
        """Return the index of a token.

        Unknown tokens map to `unk_index` when the vocabulary has an unknown
        token; otherwise a missing token raises KeyError.
        """
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """Return the token stored at `index`.

        Raises:
            KeyError: if the index is not in the vocabulary
        """
        if index not in self._idx_to_token:
            raise KeyError("the index %d is not in the vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

## Vectorizer

In [2]:
import numpy as np
from collections import Counter
import string

class SurnameVectorizer(object):
    """Pairs a character vocabulary with a nationality vocabulary and
    encodes surnames as fixed-length vectors."""

    def __init__(self, surname_vocab, nationality_vocab):
        """
        Args:
            surname_vocab: vocabulary over surname characters
            nationality_vocab: vocabulary over nationality labels
        """
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab

    def vectorize(self, surname):
        """Encode a surname as a collapsed one-hot vector.

        Args:
            surname (str): the surname to encode
        Returns:
            numpy float32 array with one slot per vocabulary entry; a slot is
            1 when the corresponding character occurs in the surname
        """
        char_vocab = self.surname_vocab
        encoding = np.zeros(len(char_vocab), dtype=np.float32)

        for character in surname:
            encoding[char_vocab.lookup_token(character)] = 1

        return encoding

    @classmethod
    def from_dataframe(cls, dataframe, cutoff=25):
        """Build a vectorizer from a dataframe of surnames.

        Args:
            dataframe: table with `surname` and `nationality` columns
            cutoff (int): accepted but not used by this implementation
        Returns:
            a vectorizer whose vocabularies cover every character and
            nationality found in the dataframe
        """
        char_vocab = Vocabulary(unk_token="@")
        label_vocab = Vocabulary(add_unk=False)

        for surname, nationality in zip(dataframe.surname, dataframe.nationality):
            for character in surname:
                char_vocab.add_token(character)
            label_vocab.add_token(nationality)

        return cls(char_vocab, label_vocab)

    @classmethod
    def from_serializeable(cls, contents):
        """Rebuild a vectorizer from the dictionary made by to_serializeable."""
        return cls(
            Vocabulary.from_serializeable(contents['surname_vocab']),
            Vocabulary.from_serializeable(contents['nationality_vocab']),
        )

    def to_serializeable(self):
        """Return a dictionary holding both vocabularies in serializable form."""
        serialized = {}
        serialized['surname_vocab'] = self.surname_vocab.to_serializeable()
        serialized['nationality_vocab'] = self.nationality_vocab.to_serializeable()
        return serialized

## Surname Dataset

In [3]:
from torch.utils.data import Dataset
import pandas as pd
import torch

class SurnameDataset(Dataset):
    """Split-aware dataset pairing vectorized surnames with nationality indices."""

    def __init__(self, surname_df, vectorizer):
        """
        Args:
            surname_df (pandas.DataFrame): table providing the `surname`,
                `nationality` and `split` columns read by this class
            vectorizer: object exposing `vectorize(surname)` and a
                `nationality_vocab` with `lookup_token`
        """
        self.surname_df = surname_df
        self._vectorizer = vectorizer

        split_labels = self.surname_df.split
        self.train_df = self.surname_df[split_labels == 'train']
        self.val_df = self.surname_df[split_labels == 'val']
        self.test_df = self.surname_df[split_labels == 'test']

        self.train_size = len(self.train_df)
        self.val_size = len(self.val_df)
        self.test_size = len(self.test_df)

        self._lookup_dict = {
            'train': (self.train_df, self.train_size),
            'val': (self.val_df, self.val_size),
            'test': (self.test_df, self.test_size),
        }
        self.set_split('train')

        # Inverse-frequency class weights for cross entropy, ordered by the
        # index each nationality has in the nationality vocabulary.
        nationality_vocab = self._vectorizer.nationality_vocab
        counts = surname_df.nationality.value_counts().to_dict()
        ordered = sorted(counts.items(),
                         key=lambda pair: nationality_vocab.lookup_token(pair[0]))
        frequencies = [count for _, count in ordered]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)

    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv, cuda=False):
        """Read the CSV and build a dataset with a vectorizer fitted on the
        rows whose split is 'train'.

        Args:
            surname_csv (str): path of the CSV file
            cuda (bool): accepted but not used by this implementation
        Returns:
            a SurnameDataset instance
        """
        full_df = pd.read_csv(surname_csv)
        fitted_vectorizer = SurnameVectorizer.from_dataframe(
            full_df[full_df.split == 'train'])
        return cls(full_df, fitted_vectorizer)

    def get_vectorizer(self):
        """Return the vectorizer used by this dataset."""
        return self._vectorizer

    def set_split(self, split='train'):
        """Select which split ('train', 'val' or 'test') the dataset serves."""
        self._target_split = split
        frame, size = self._lookup_dict[split]
        self._target_df = frame
        self._target_size = size

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        """Primary entry point for PyTorch datasets.

        Args:
            index (int): positional index into the active split
        Returns:
            dict with the vectorized surname under 'x_data' and the
            nationality index under 'y_target'
        """
        record = self._target_df.iloc[index]
        vectorizer = self._vectorizer
        return {
            'x_data' : vectorizer.vectorize(record.surname),
            'y_target' : vectorizer.nationality_vocab.lookup_token(record.nationality),
        }

    def get_num_batches(self, batch_size):
        """Return how many full batches of `batch_size` fit in the active split."""
        return len(self) // batch_size

## Surname Classifier Model

In [17]:
import torch.nn as nn
import torch

class SurnameClassifier(nn.Module):
    """Two-layer perceptron for multiclass classification."""

    def __init__(self, input_dim, hidden_dim, output_dim, dropout_p=0.5):
        """
        Args:
            input_dim (int): size of each input vector
            hidden_dim (int): width of the hidden layer
            output_dim (int): number of classes
            dropout_p (float): dropout probability applied to the hidden
                activations during training (default 0.5)
        """
        super(SurnameClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        # Registered once as a submodule so train()/eval() switch it on and
        # off; it has no parameters, so the state_dict keys are unchanged.
        self.dropout = nn.Dropout(p=dropout_p)

    def forward(self, x_in, apply_softmax=False):
        """Forward pass of the network.

        Args:
            x_in (torch.Tensor): input batch of shape (batch, input_dim)
            apply_softmax (bool): when True, apply softmax along dim 1;
                leave False when the loss expects raw scores
        Returns:
            torch.Tensor of shape (batch, output_dim)
        """
        intermediate_vector = torch.relu(self.fc1(x_in))

        # Dropout is active only in training mode; in eval mode it is the identity
        prediction_vector = self.fc2(self.dropout(intermediate_vector))

        if apply_softmax:
            prediction_vector = torch.softmax(prediction_vector, dim=1)

        return prediction_vector

## Training Routine

In [5]:
from google.colab import drive
# Mount Google Drive at /content/drive; the data and model paths configured
# in this notebook all live under that directory.
drive.mount('/content/drive')

Mounted at /content/drive


In [11]:
from argparse import Namespace

# All tunable settings of the experiment, gathered in one namespace
args = Namespace(
    # --- data locations ---
    frequency_cutoff=25,
    model_state_file='/content/drive/My Drive/Colab Notebooks/Data/surname_dataset/model.pth',
    surname_csv='/content/drive/My Drive/Colab Notebooks/Data/surname_dataset/surnames_with_splits.csv',
    save_dir='/content/drive/My Drive/Colab Notebooks/Data/surname_dataset/',
    vectorizer_file='/content/drive/My Drive/Colab Notebooks/Data/surname_dataset/vectorizer.json',
    # --- model hyperparameters ---
    hidden_dim=500,
    # --- training hyperparameters ---
    batch_size=128,
    early_stopping_criteria=5,
    learning_rate=0.001,
    num_epochs=100,
    seed=1337,
    cuda=True,
)

In [7]:
from torch.utils.data import DataLoader

def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):
    """
    Generator that wraps a PyTorch DataLoader and moves every tensor of
    each batch to the right device.

    Args:
        dataset: dataset whose items are dictionaries
        batch_size (int): number of items per batch
        shuffle (bool): forwarded to the DataLoader
        drop_last (bool): forwarded to the DataLoader
        device: target passed to `.to()` for every tensor in a batch
    Yields:
        dict with the same keys as the collated batch, values on `device`
    """
    loader = DataLoader(dataset=dataset, batch_size=batch_size,
                        shuffle=shuffle, drop_last=drop_last)

    for batch in loader:
        yield {field: values.to(device) for field, values in batch.items()}

In [18]:
import torch.optim as optim
import torch

# create variables to record
# the training process
def make_train_state(args):
    """Return a fresh dictionary for recording the training progress.

    Args:
        args: accepted for interface symmetry; not read by this function
    Returns:
        dict with the epoch counter, empty per-split metric histories and
        -1 placeholders for the test metrics
    """
    state = {'epoch_index': 0}
    for history_key in ('train_loss', 'train_acc', 'val_loss', 'val_acc'):
        state[history_key] = []
    state['test_loss'] = -1
    state['test_acc'] = -1
    return state

def compute_accuracy(y_pred, y_target):
    """Return the percentage of rows whose highest-scoring class is the target.

    Args:
        y_pred (torch.Tensor): scores of shape (batch, n_classes)
        y_target (torch.Tensor): class indices of shape (batch,)
    Returns:
        float in the range [0, 100]
    """
    predicted_classes = y_pred.max(dim=1)[1]
    hits = (predicted_classes == y_target).sum().item()
    return hits / len(predicted_classes) * 100

train_state = make_train_state(args)

# Use CUDA only when it was requested and is actually available
args.cuda = bool(args.cuda and torch.cuda.is_available())
args.device = torch.device("cuda" if args.cuda else "cpu")
print("Device available ", args.device)

# Apply the configured seed before any weights are initialised or any data
# is shuffled, so that a fresh "Restart & Run All" reproduces the same run.
torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed_all(args.seed)

# dataset object
dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
dataset.class_weights = dataset.class_weights.to(args.device)

# vectorizer
vectorizer = dataset.get_vectorizer()

# classifier: input size = character vocabulary size,
# output size = number of nationalities
classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
                               hidden_dim=args.hidden_dim,
                               output_dim=len(vectorizer.nationality_vocab))
classifier = classifier.to(args.device)

# loss function (weighted by the dataset's class weights) and optimizer
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)

print("Input dim ", len(vectorizer.surname_vocab))
print("Output dim ", len(vectorizer.nationality_vocab))

Device available  cuda
Input dim  77
Output dim  18


In [19]:
# Create training loop
from tqdm.notebook import tqdm

epoch_bar = tqdm(desc='training routine',
                 total=args.num_epochs,
                 position=0)

# generate_batches drops the last partial batch by default, so each pass
# yields exactly get_num_batches(batch_size) batches.
dataset.set_split('train')
train_bar = tqdm(desc='split=train',
                 total=dataset.get_num_batches(args.batch_size),
                 position=1,
                 leave=True)

dataset.set_split('val')
val_bar = tqdm(desc='split=val',
               total=dataset.get_num_batches(args.batch_size),
               position=1,
               leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index

        # ---- training pass ----
        # setup: batch generator, reset running metrics, train mode on
        dataset.set_split('train')
        batch_generator = generate_batches(dataset=dataset,
                                           batch_size=args.batch_size,
                                           device=args.device)

        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            # step 1 zero the gradients
            optimizer.zero_grad()

            # step 2 compute the output
            y_pred = classifier(x_in=batch_dict['x_data'])

            # step 3 compute the loss and update its running mean
            loss = loss_func(y_pred, batch_dict['y_target'])
            loss_batch = loss.item()
            running_loss += (loss_batch - running_loss) / (batch_index + 1)

            # step 4 use loss to produce gradients
            loss.backward()

            # step 5 use optimizer to take the gradient step
            optimizer.step()

            # step 6 compute the accuracy and update its running mean
            acc_batch = compute_accuracy(y_pred, batch_dict['y_target'])
            running_acc += (acc_batch - running_acc) / (batch_index + 1)

            # update bar
            train_bar.set_postfix(loss=running_loss,
                                  acc=running_acc,
                                  epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)

        # ---- validation pass ----
        # setup: batch generator, reset running metrics, eval mode on
        dataset.set_split('val')
        batch_generator = generate_batches(dataset,
                                           batch_size=args.batch_size,
                                           device=args.device)

        running_loss = 0.
        running_acc = 0.
        classifier.eval()

        # No gradients are needed for validation
        with torch.no_grad():
            for batch_index, batch_dict in enumerate(batch_generator):
                # step 1. compute the output
                y_pred = classifier(x_in=batch_dict['x_data'])

                # step 2. compute the loss
                loss = loss_func(y_pred, batch_dict['y_target'])
                loss_batch = loss.item()
                running_loss += (loss_batch - running_loss) / (batch_index + 1)

                # step 3. compute the accuracy
                acc_batch = compute_accuracy(y_pred, batch_dict['y_target'])
                running_acc += (acc_batch - running_acc) / (batch_index + 1)

                val_bar.set_postfix(loss=running_loss, acc=running_acc,
                                    epoch=epoch_index)
                val_bar.update()

        # Record exactly one validation entry per epoch, matching the
        # per-epoch entries stored for the training metrics.
        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)

        # rewind the per-epoch bars and advance the epoch bar
        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("Exiting loop")

HBox(children=(FloatProgress(value=0.0, description='training routine', style=ProgressStyle(description_width=…

HBox(children=(FloatProgress(value=0.0, description='split=train', max=59.0, style=ProgressStyle(description_w…

HBox(children=(FloatProgress(value=0.0, description='split=val', max=11.0, style=ProgressStyle(description_wid…

In [20]:
# evaluate the model
dataset.set_split('test')
batch_generator = generate_batches(dataset,
                                   args.batch_size,
                                   device=args.device)

running_loss = 0.
running_acc = 0.
classifier.eval()

# Evaluation only reads the model, so skip building autograd graphs
with torch.no_grad():
    for batch_index, batch_dict in enumerate(batch_generator):
        # compute the output
        y_pred = classifier(x_in=batch_dict['x_data'])

        # compute the loss and update its running mean over batches
        loss = loss_func(y_pred, batch_dict['y_target'])
        loss_batch = loss.item()
        running_loss += (loss_batch - running_loss) / (batch_index + 1)

        # compute the accuracy and update its running mean over batches
        acc_batch = compute_accuracy(y_pred, batch_dict['y_target'])
        running_acc += (acc_batch - running_acc) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc

print("Test loss : {:.3f}".format(train_state['test_loss']))
print("Test acc : {:.3f}".format(train_state['test_acc']))

Test loss : 2.059
Test acc : 57.161


In [21]:
# Inference mode

def predict_nationality(surname, classifier, vectorizer):
    """Predict the nationality for a new surname.

    The classifier is switched to eval mode for the duration of the call (so
    layers such as dropout do not randomise the prediction) and restored to
    its previous mode afterwards. The input is moved to the device of the
    classifier's parameters.

    Args:
        surname (str): the surname to classify
        classifier (SurnameClassifier): an instance of the classifier
        vectorizer (SurnameVectorizer): the corresponding vectorizer
    Returns:
        a dictionary with the most likely nationality and its probability
    """
    vectorized_surname = vectorizer.vectorize(surname)
    # view(1, -1) adds the batch dimension: a batch holding one surname
    vectorized_surname = torch.tensor(vectorized_surname).view(1, -1)

    # Run on whatever device the model lives on (no-op for a CPU model)
    first_param = next(classifier.parameters(), None)
    if first_param is not None:
        vectorized_surname = vectorized_surname.to(first_param.device)

    was_training = classifier.training
    classifier.eval()
    try:
        with torch.no_grad():
            result = classifier(vectorized_surname, apply_softmax=True)
    finally:
        # leave the model in the mode the caller had set
        classifier.train(was_training)

    probability_values, indices = result.max(dim=1)
    index = indices.item()

    predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
    probability_value = probability_values.item()

    return {'nationality': predicted_nationality, 'probability': probability_value}

# Ask for a surname interactively (this cell blocks until text is entered)
new_surname = input("Enter a surname to classify: ")
# Run inference on the CPU
classifier = classifier.to("cpu")
prediction = predict_nationality(new_surname, classifier, vectorizer)
# Report the predicted nationality with its probability to two decimals
print("{} -> {} (p={:0.2f})".format(new_surname,
                                    prediction['nationality'],
                                    prediction['probability']))

Enter a surname to classify: wong
wong -> Chinese (p=0.69)
