In [90]:
import torch
from torch import embedding, nn
import torch.nn.functional as F
import torch.optim as optim
import random
import time
from tqdm import tqdm
from preprocessing import *
# train_data: tuple of length 320, each is a matrix represents a picture
# train_target: tuple of length 320, each is a label
# For training, there are 40 classes, each has 8 pictures
# For testing, there are 40 classes, each has 2 pictures

In [91]:
train_data = torch.tensor(train_data)
train_target = torch.tensor(train_target)
test_data = torch.tensor(test_data)
test_target = torch.tensor(test_target)

In [92]:
# Since the dataset is small, we assumed batch size = 1
train_data = train_data.reshape(280,1,1,64,64)
test_data = test_data.reshape(120,1,1,64,64)

train_target = train_target.reshape(len(train_target), 1)
test_target = test_target.reshape(len(test_target), 1)

In [93]:

class FaceRecognizer(nn.Module):
    def __init__(self, input_dim=64, output_dim=40, batch_size = 1):
        super().__init__()
        self.batch_size = batch_size
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=4, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=4, out_channels=8, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)    
        self.fc1 = nn.Linear(8*16*16, 512)
        self.fc2 = nn.Linear(512, output_dim)

    def forward(self, input_batch):
        step1 = F.relu(self.conv1(input_batch)) 
        step2 = self.pool(step1)
        step3 = F.relu(self.conv2(step2)) 
        step4 = self.pool(step3).reshape(self.batch_size, -1)
        step5 = F.relu(self.fc1(step4))
        step6 = self.fc2(step5)
        ret = torch.softmax(step6, -1)
        
        return ret



In [94]:
fr = FaceRecognizer()
fr.forward(train_data[1])

tensor([[0.0269, 0.0236, 0.0252, 0.0247, 0.0254, 0.0256, 0.0247, 0.0257, 0.0258,
         0.0231, 0.0250, 0.0236, 0.0258, 0.0259, 0.0253, 0.0234, 0.0247, 0.0252,
         0.0247, 0.0232, 0.0246, 0.0253, 0.0252, 0.0256, 0.0245, 0.0246, 0.0255,
         0.0263, 0.0263, 0.0240, 0.0250, 0.0253, 0.0237, 0.0269, 0.0244, 0.0253,
         0.0245, 0.0251, 0.0250, 0.0254]], grad_fn=<SoftmaxBackward>)

In [95]:
optimizer = optim.Adam(fr.parameters())
criterion = nn.CrossEntropyLoss(reduction = 'sum')
training_losses = []
test_losses = []

In [113]:


num_epochs_train = 0

def train(model, data, targets, optimizer, criterion, clip, num_epochs=0):  
    global num_epochs_train 
#     if num_epochs_train == 1:
#         tmp = optimizer.state_dict()
#         tmp["param_groups"][0]["lr"] = 0.0005
    model.train()
    epoch_loss = 0
    sampling = list(range(train_data.shape[0]))
    random.shuffle(sampling)
    print("training ...")
    for i, selected_batch_index in tqdm(enumerate(sampling)):
        optimizer.zero_grad()
        z = fr.forward(data[selected_batch_index])
        loss = 0       
        loss=criterion(z,targets[selected_batch_index].long())
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    num_epochs_train += 1
    return epoch_loss

confusion_matrix = []
num_epochs = 0
def evaluate(model, data, targets, criterion, num_targets): 
    model.eval()
    epoch_loss = 0
    sampling = list(range(data.shape[0]))
    random.shuffle(sampling)
    confusion_matrix.append(torch.zeros(num_targets,num_targets))
    global num_epochs
    for i, selected_batch_index in tqdm(enumerate(sampling)):
        z = fr.forward(data[selected_batch_index])
        loss = 0
        loss=criterion(z,targets[selected_batch_index].long())
        epoch_loss += loss.item()
        # Load in confusion_matrix
        for i in range(data[selected_batch_index].shape[0]):
            row = targets[selected_batch_index].long()
            col = torch.argmax(z[i])
            
#             print(num_epochs,row.item(),col.item())
#             print(confusion_matrix)
            confusion_matrix[num_epochs][row.item()][col.item()] += 1
     
    num_epochs += 1
        
    return epoch_loss
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [141]:

N_EPOCHS = 5
CLIP = 1
best_test_loss = 999999
for epoch in range(N_EPOCHS):  
    print("epoch start: ", epoch)  
    start_time = time.time()
    training_loss = train(fr, train_data, train_target, optimizer, criterion, CLIP)
    training_losses.append(training_loss)
    test_loss = evaluate(fr, test_data, test_target, criterion, 40)
    test_losses.append(test_loss)  
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    if test_loss < best_test_loss:
        best_test_loss = test_loss 
        torch.save(fr.state_dict(), 'best_model.pt')
        

    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s', end='')
    print(f'\tTrain Loss: {training_loss:.3f} | Test Loss: {test_loss:.3f}')

epoch start:  0
training ...


280it [00:01, 146.89it/s]
120it [00:00, 807.06it/s]


Epoch: 01 | Time: 0m 2s	Train Loss: 765.664 | Test Loss: 329.758
epoch start:  1
training ...


280it [00:01, 147.13it/s]
120it [00:00, 1122.19it/s]


Epoch: 02 | Time: 0m 2s	Train Loss: 765.664 | Test Loss: 329.743
epoch start:  2
training ...


280it [00:01, 153.53it/s]
120it [00:00, 1081.98it/s]


Epoch: 03 | Time: 0m 1s	Train Loss: 765.664 | Test Loss: 329.744
epoch start:  3
training ...


280it [00:01, 149.48it/s]
120it [00:00, 1154.35it/s]


Epoch: 04 | Time: 0m 1s	Train Loss: 765.663 | Test Loss: 329.755
epoch start:  4
training ...


280it [00:01, 149.85it/s]
120it [00:00, 1140.79it/s]


Epoch: 05 | Time: 0m 1s	Train Loss: 765.663 | Test Loss: 329.692


In [142]:
confusion_matrix[0]

tensor([[3., 0., 0.,  ..., 0., 0., 0.],
        [0., 3., 0.,  ..., 0., 0., 0.],
        [0., 0., 2.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 3., 0., 0.],
        [0., 0., 0.,  ..., 0., 3., 0.],
        [0., 0., 0.,  ..., 0., 0., 2.]])

In [143]:
confusion_matrix[-1]

tensor([[3., 0., 0.,  ..., 0., 0., 0.],
        [0., 3., 0.,  ..., 0., 0., 0.],
        [0., 0., 3.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 3., 0., 0.],
        [0., 0., 0.,  ..., 0., 3., 0.],
        [0., 0., 0.,  ..., 0., 0., 3.]])

In [144]:
sum = 0
matrix = -1
for i in range(confusion_matrix[matrix].shape[0]):
    sum = sum + confusion_matrix[matrix][i][i]
sum

tensor(119.)