In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from PIL import Image
import pickle
from IPython.display import Image 

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from torch.utils.data import Dataset
import numpy as np
import os

In [None]:
class MobileNetV2_with_const1s(nn.Module):
    """MobileNetV2 classifier whose forward pass also returns a constant list of ones.

    The backbone is the torch.hub ``mobilenet_v2`` model (torchvision v0.6.0)
    created without pretrained weights; its last dense layer is replaced so
    that it emits ``out_feature_num`` values.

    Args:
        out_feature_num: Number of classes the model predicts. Defaults to 4,
            which is the value that used to be hard-coded.
    """

    def __init__(self, out_feature_num=4):
        super(MobileNetV2_with_const1s, self).__init__()

        #Number of classes that the model will predict
        self.out_feature_num = out_feature_num

        #load default pytorch mobilenetv2 model without ImageNet pretrained weights
        #the Imagenet weights are not compatible with the 32x32 cifar10 images
        self.mobilenetv2 = torch.hub.load('pytorch/vision:v0.6.0', 'mobilenet_v2', pretrained=False)
        self.mobilenetv2.eval()

        #modify the last dense layer to emit one value for every available class;
        #the input width is read from the layer being replaced instead of being hard-coded
        in_features = self.mobilenetv2.classifier[1].in_features
        self.mobilenetv2.classifier[1] = nn.Linear(
            in_features=in_features,
            out_features=self.out_feature_num,
            bias=True,
        )

    def forward(self, input):
        """Run the backbone on ``input``.

        Returns:
            A tuple ``(mobile_out, dummy1s)`` where ``mobile_out`` is the backbone
            output and ``dummy1s`` is a plain Python list holding
            ``out_feature_num`` ones.
        """
        mobile_out = self.mobilenetv2(input)

        #return the constant 1 array with the same length as the last dense layer's output
        dummy1s = [1] * self.out_feature_num

        return mobile_out, dummy1s

In [None]:
#Build the wrapped network and switch it to evaluation mode
mobilenetv2 = MobileNetV2_with_const1s()
mobilenetv2.eval()

#Replace the very first convolution of the backbone with a
#3 -> 32 channel, 3x3, stride-1, padding-1 convolution without bias
mobilenetv2.mobilenetv2.features[0][0] = nn.Conv2d(
    in_channels=3,
    out_channels=32,
    kernel_size=(3, 3),
    stride=(1, 1),
    padding=(1, 1),
    bias=False,
)

In [None]:
#Smoke-test the model: feed one all-ones 3x224x224 image through it,
#once via __call__ and once via forward(), and print both results
test = torch.ones(1, 3, 224, 224, dtype=torch.float)
print(
    mobilenetv2(test),
    mobilenetv2.forward(test),
)

#Switch back to training mode and show the resulting flag as the cell output
mobilenetv2 = mobilenetv2.train()
mobilenetv2.training

In [None]:
#Data loader class for the cifar 10 dataset
class Ciffar(Dataset):
    """In-memory dataset built from NumPy arrays of samples and labels.

    Args:
        data: NumPy array of samples; stored as a float tensor.
        target: NumPy array of labels; stored as a long tensor.
        transform: Optional callable applied to each sample when it is fetched.
    """

    def __init__(self, data, target, transform=None):
        self.transform = transform
        self.target = torch.from_numpy(target).long()
        self.data = torch.from_numpy(data).float()

    def __len__(self):
        """Number of samples (size of the first dimension of ``data``)."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(sample, label)`` at ``index``, transforming the sample if a transform is set."""
        sample, label = self.data[index], self.target[index]
        if not self.transform:
            return sample, label
        return self.transform(sample), label
    
    
#Per-sample pipeline: tensor -> PIL image -> tensor (brings it to [0,1]),
#followed by a per-channel normalization with the fixed mean/std below
preprocess = transforms.Compose(
    [
        transforms.ToPILImage(),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
#Read the dataset and check that it is in the right shape
cifar_classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']


def _load_split(path):
    """Load one pickled split and return ``(images, labels)`` as NumPy arrays.

    The pickle is expected to be a dict with the byte keys ``b"data"`` and
    ``b"labels"``. Pixel values are divided by 255 and the images are reshaped
    to ``(N, 3, 32, 32)``, with ``N`` inferred from the file instead of being
    hard-coded.
    """
    #NOTE(security): pickle.load can execute arbitrary code - only open trusted files
    with open(path, "rb") as f:
        d = pickle.load(f, encoding="bytes")

    #np.asarray makes this work whether the fields are stored as arrays or plain lists
    images = np.asarray(d[b"data"]).astype(float) / 255
    labels = np.asarray(d[b"labels"]).astype(int)
    return images.reshape(-1, 3, 32, 32), labels


X, Y = _load_split("train_set")
print(X.shape)
train_data = Ciffar(X, Y, preprocess)

train_loader = torch.utils.data.DataLoader(train_data, batch_size=16,
                                           shuffle=True, num_workers=2)

X_test, Y_test = _load_split("test_set")
test_data = Ciffar(X_test, Y_test, preprocess)

test_loader = torch.utils.data.DataLoader(test_data, batch_size=16,
                                          shuffle=True, num_workers=2)

#Fetch a single batch from each loader to verify the per-sample shape
for b, l in test_loader:
    print("Test data shape", b[0].shape)
    break

#This check previously iterated test_loader and printed a literal string
for b, l in train_loader:
    print("Train data shape", b[0].shape)
    break

In [None]:
#Sanity check: display one test image (channels moved last for imshow)
#together with its class name and its stored shape
data_id = 0
plt.imshow(np.transpose(X_test[data_id], (1, 2, 0)))

print(cifar_classes[Y_test[data_id]])
print(X_test[data_id].shape)

In [None]:
#Sanity check: display one training image (channels moved last for imshow)
#and print its class name
data_id = 0
plt.imshow(np.transpose(X[data_id], (1, 2, 0)))
print(cifar_classes[Y[data_id]])

In [None]:
#Training configuration
max_epoch = 30
log_interval = 10  #number of training iterations averaged into one logged train metric

use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")

#Move the model to the target device before the optimizer is built
#(a no-op when running on the CPU)
mobilenetv2.to(device)

#Define loss function (the default reduction already averages over the batch)
criterion = nn.CrossEntropyLoss()

#Using a simple SGD to optimize
optimizer = torch.optim.SGD(mobilenetv2.parameters(), lr=0.01, momentum=0.9, weight_decay=0.001)

#Learning rate decay is exponential
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.96)

#NOTE: the former `y_onehot = torch.cuda.FloatTensor(16,4)` buffer was removed:
#it was never read and it raised on machines without CUDA.

#Lists for storing metrics during the train session.
#Layout: one inner list per epoch, each holding (loss, accuracy) tuples of Python floats.
test_metrics = []
train_metrics = []

#Checkpoints are saved under the models dir
os.makedirs("models", exist_ok=True)

for epoch in range(max_epoch):

    print("Epoch:", epoch)

    iter_count = 0
    loss_acc = 0.0
    accuracy_acc = 0.0

    test_metrics.append([])
    train_metrics.append([])

    #Train for 1 epoch
    mobilenetv2.train()
    for local_batch, local_labels in train_loader:
        local_batch, local_labels = local_batch.to(device), local_labels.to(device)
        optimizer.zero_grad()

        output = mobilenetv2(local_batch)[0]
        loss = criterion(output, local_labels)

        loss.backward()
        optimizer.step()

        #The loss is already a per-sample mean, so it is not divided by the batch size again
        loss_acc += loss.item()
        #argmax over the logits equals argmax over their softmax
        accuracy_acc += (output.argmax(dim=1) == local_labels).float().mean().item()

        iter_count += 1

        #Counting before the check makes every window cover exactly `log_interval` batches
        if iter_count % log_interval == 0:
            window_loss = loss_acc / log_interval
            window_accuracy = accuracy_acc / log_interval

            print("Iter", iter_count, " train loss:", window_loss)
            print("Accuracy: ", window_accuracy)

            train_metrics[-1].append((window_loss, window_accuracy))

            accuracy_acc = 0.0
            loss_acc = 0.0

    #Evaluate on the test set; sums are weighted by batch size so that a
    #smaller last batch does not skew the averages
    mobilenetv2.eval()
    test_loss_sum = 0.0
    test_correct = 0
    test_samples = 0

    with torch.no_grad():
        for local_batch, local_labels in test_loader:
            local_batch, local_labels = local_batch.to(device), local_labels.to(device)

            output = mobilenetv2(local_batch)[0]
            batch_size = output.size(0)

            test_loss_sum += criterion(output, local_labels).item() * batch_size
            test_correct += (output.argmax(dim=1) == local_labels).sum().item()
            test_samples += batch_size

    #Guard against an empty test loader
    test_loss = test_loss_sum / max(test_samples, 1)
    test_accuracy = test_correct / max(test_samples, 1)

    print("Test loss:", test_loss)
    print("Test accuracy", test_accuracy)
    test_metrics[-1].append((test_loss, test_accuracy))

    #Save the model of this epoch
    torch.save(mobilenetv2.state_dict(), "models/mobilenetv2_" + str(epoch) + ".pt")

    scheduler.step()

    for param_group in optimizer.param_groups:
        print("lr", param_group['lr'])
            
                    

Test loss and test accuracy. As the plot shows, the test metrics do not converge steadily towards an optimum: they drift away from it because the model overfits the small training dataset.

In [None]:
#Each epoch stores a single (loss, accuracy) tuple in test_metrics.
#float() also accepts one-element tensors, so this works for either representation.
test_loss_curve = [float(epoch_metrics[0][0]) for epoch_metrics in test_metrics if epoch_metrics]
test_accuracy_curve = [float(epoch_metrics[0][1]) for epoch_metrics in test_metrics if epoch_metrics]

#Loss and accuracy get their own labeled axes instead of sharing one unlabeled plot
fig, (ax_loss, ax_accuracy) = plt.subplots(1, 2, figsize=(10, 4))
ax_loss.plot(range(len(test_loss_curve)), test_loss_curve)
ax_loss.set(title="Test loss", xlabel="Epoch", ylabel="Loss")
ax_accuracy.plot(range(len(test_accuracy_curve)), test_accuracy_curve)
ax_accuracy.set(title="Test accuracy", xlabel="Epoch", ylabel="Accuracy")
fig.tight_layout()
plt.show()

Train loss and accuracy.

In [None]:
#train_metrics holds one list per epoch with a (loss, accuracy) tuple per logging window.
#Flatten all windows in order so that every logged point is plotted, not only the
#first window of each epoch; epochs without any logged window simply contribute nothing.
train_points = [point for epoch_metrics in train_metrics for point in epoch_metrics]
train_loss_curve = [float(point[0]) for point in train_points]
train_accuracy_curve = [float(point[1]) for point in train_points]

fig, (ax_loss, ax_accuracy) = plt.subplots(1, 2, figsize=(10, 4))
ax_loss.plot(range(len(train_loss_curve)), train_loss_curve)
ax_loss.set(title="Train loss", xlabel="Logging window", ylabel="Loss")
ax_accuracy.plot(range(len(train_accuracy_curve)), train_accuracy_curve)
ax_accuracy.set(title="Train accuracy", xlabel="Logging window", ylabel="Accuracy")
fig.tight_layout()
plt.show()

In [None]:
# Display the raw training metrics: one inner list per epoch, filled by the
# training loop with (loss, accuracy) tuples.
train_metrics