# SqueezeNet implementation using the flower102 dataset
---

author: [@linjiw](mailto:linjiw@andrew.cmu.edu) 

date: Nov, 18, 2021
---

In [126]:
# import
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets
import torchvision.transforms as transforms
from torchvision.transforms import ToTensor, Lambda, Compose
import matplotlib.pyplot as plt
import scipy.io
import numpy as np
import torch.nn.functional as F
import torch.nn.init as init
from os.path import join
import os
from skimage import io
from torch.utils.tensorboard import SummaryWriter # try to use tensorboard to visualize your results~
# Module-level TensorBoard writer, created with default arguments; the training loop
# further down logs the per-epoch losses through it.
writer = SummaryWriter() 


## Take a look at the dataset.
First, let's check which dataset we're dealing with: [flower102](https://www.robots.ox.ac.uk/~vgg/data/flowers/102/).

On the flower102 website, the data we need are the [images](https://www.robots.ox.ac.uk/~vgg/data/flowers/102/102flowers.tgz) and the [labels](https://www.robots.ox.ac.uk/~vgg/data/flowers/102/imagelabels.mat). Download them and store them in your data folder (create a folder called `data` in your current directory, and extract the image archive there as `data/102flowers`).

In the 102flowers directory, there is a jpg folder which contains all the picture files.

The `'labels'` entry of imagelabels.mat contains the labels, numbered from 1 to 102.

However, this is not quite the format we want. In general we use a csv file whose first column contains the file path and whose second column contains the label index, starting from 0.

So the problem becomes: we need to get the image paths in sorted order and shift the labels so that they start at index 0.

In [53]:
# This is a custom Dataset class; I named it Catset.
class Catset(Dataset):
    """Dataset that pairs the sorted files of a folder with labels from a .mat file.

    Args:
        pth_file: Directory whose entries are listed and sorted to obtain the
            image file names.
        label_file: Path to a MATLAB ``.mat`` file containing a ``'labels'``
            array. The values are shifted down by one so they start at 0.
        root_dir: Directory joined with each file name when an image is read.
        transform: Optional callable applied to each loaded image.
    """

    def __init__(self, pth_file, label_file, root_dir, transform=None):
        # Sorted file names; assumes the i-th sorted file belongs to the i-th label.
        file_names = os.listdir(pth_file)
        file_names.sort()
        self.files = file_names

        # The .mat file stores 1-based labels; flatten and shift them to 0-based.
        mat_contents = scipy.io.loadmat(label_file)
        self.labels = mat_contents['labels'].reshape(-1) - 1

        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of image files found in the listed directory."""
        return len(self.files)

    def __getitem__(self, index):
        """Load the image at ``index`` and return ``(image, label_tensor)``."""
        image = io.imread(os.path.join(self.root_dir, self.files[index]))
        target = torch.tensor(int(self.labels[index]))
        if self.transform:
            image = self.transform(image)
        return image, target

## We also need some basic transform functions.
We need to apply some transformations to our data: the images must be converted to tensors for our device, and in most cases we also want to resize and normalize them for better performance.

In [54]:
# Preprocessing pipeline applied to every image, built from a named list of steps.
preprocessing_steps = [
    # Convert to a PIL image first so that the resize step can operate on it.
    transforms.ToPILImage(),
    # 224x224 is the input size our model is looking for.
    transforms.Resize((224, 224)),
    # Convert the image to a tensor.
    transforms.ToTensor(),
    # Normalize with the per-channel mean and standard deviation given here.
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
]
alltransform = transforms.Compose(preprocessing_steps)

## With these functions and parameters defined, we're ready to make our dataset.

In [55]:
# The same jpg folder is used both to list the file names and to load the images.
flower_jpg_dir = "data/102flowers/jpg"
all_set = Catset(
    pth_file=flower_jpg_dir,
    label_file="data/imagelabels.mat",
    root_dir=flower_jpg_dir,
    transform=alltransform,
)

## Split data to train and validation
We need a training dataset to train our model and a validation dataset to check the loss and accuracy. Thus we need to split the dataset we have now.

In [56]:
# Hold out a fixed number of samples for validation and train on the rest;
# random_split assigns the samples to the two subsets at random.
val_nums = 100  # validation set size; must be smaller than the whole dataset
train_nums = len(all_set) - val_nums
train_set, val_set = torch.utils.data.random_split(all_set, [train_nums, val_nums])

## Use DataLoader to load data easily.
PyTorch has a built-in DataLoader class, which makes it easy to batch, shuffle and load the data.

In [57]:
# One DataLoader per split, collected in a dict keyed by split name.
batch_size = 32
# Training batches are shuffled so that every epoch sees the data in a random order.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
# Validation batches keep their order; shuffling is not needed for evaluation.
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
dataloader = {'train': train_loader, 'val': val_loader}

## Build our SqueezeNet model.
Here we follow the [SqueezeNet](https://arxiv.org/pdf/1602.07360.pdf) [v1.1](https://github.com/forresti/SqueezeNet/tree/master/SqueezeNet_v1.1) architecture.
If you have previous knowledge or just want to implement SqueezeNet quickly, you can go directly to the following code. Otherwise, take a look at the paper; it will give you some intuition.

In [58]:
# Fire Module
# The Fire module takes an input with `inplanes` channels and squeezes it with a conv2d (kernel_size=1); after the squeeze, it expands to expand1x1_planes and expand3x3_planes channels and then concatenates the results.
class Fire(nn.Module):
    """Fire module: a 1x1 squeeze convolution feeding two parallel expand branches.

    Args:
        inplanes: Number of input channels.
        squeeze_planes: Number of output channels of the 1x1 squeeze convolution.
        expand1x1_planes: Number of output channels of the 1x1 expand branch.
        expand3x3_planes: Number of output channels of the 3x3 expand branch.

    The output has ``expand1x1_planes + expand3x3_planes`` channels, because the
    two branches are concatenated along dimension 1.
    """

    def __init__(self, inplanes, squeeze_planes,
                 expand1x1_planes, expand3x3_planes):
        super().__init__()
        self.inplanes = inplanes

        # Squeeze stage: 1x1 convolution, followed by a ReLU.
        self.squeeze = nn.Conv2d(inplanes, squeeze_planes, kernel_size=1)
        self.squeeze_activation = nn.ReLU(inplace=True)

        # Expand stage, 1x1 branch.
        self.expand1x1 = nn.Conv2d(squeeze_planes, expand1x1_planes, kernel_size=1)
        self.expand1x1_activation = nn.ReLU(inplace=True)

        # Expand stage, 3x3 branch; padding=1 keeps the spatial size unchanged
        # so both branches can be concatenated.
        self.expand3x3 = nn.Conv2d(
            squeeze_planes, expand3x3_planes, kernel_size=3, padding=1
        )
        self.expand3x3_activation = nn.ReLU(inplace=True)

    def forward(self, x):
        """Squeeze ``x``, run both expand branches and concatenate their outputs."""
        squeezed = self.squeeze_activation(self.squeeze(x))
        branch_1x1 = self.expand1x1_activation(self.expand1x1(squeezed))
        branch_3x3 = self.expand3x3_activation(self.expand3x3(squeezed))
        return torch.cat((branch_1x1, branch_3x3), dim=1)


In [59]:
# conv1: 64 filters of resolution 3x3
# pooling layers: pool_{1,3,5} (which means we do the pooling at these layers)
class SqueezeNet(nn.Module):
    """SqueezeNet-style classifier built from a convolutional stem and eight Fire modules.

    Args:
        num_classes: Number of output classes, i.e. the number of channels of
            the final 1x1 convolution and the width of the returned logits.

    The classifier ends with a global average pool, so ``forward`` returns a
    tensor of shape ``(batch, num_classes)``.
    """

    def __init__(self, num_classes=36):
        super(SqueezeNet, self).__init__()

        self.num_classes = num_classes

        self.features = nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, stride=2), # 1
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(64, 16, 64, 64),# 2
                Fire(128, 16, 64, 64),# 3
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(128, 32, 128, 128),# 4
                Fire(256, 32, 128, 128),# 5
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(256, 48, 192, 192),# 6
                Fire(384, 48, 192, 192),# 7
                Fire(384, 64, 256, 256),# 8
                Fire(512, 64, 256, 256),# 9
            )
        # Final convolution is initialized differently from the rest
        final_conv = nn.Conv2d(512, self.num_classes, kernel_size=1) #10
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            final_conv,
            nn.ReLU(inplace=True),
            # Average over whatever spatial size is left. For a 224x224 input the
            # feature map is 13x13, so this matches a fixed 13x13 average pool,
            # but other input resolutions no longer break the reshape in forward().
            nn.AdaptiveAvgPool2d((1, 1))
        )

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                if m is final_conv:
                    init.normal_(m.weight.data, mean=0.0, std=0.01)
                else:
                    init.kaiming_uniform_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()

    def forward(self, x):
        """Return the class scores for a batch of images as ``(batch, num_classes)``."""
        x = self.features(x)
        x = self.classifier(x)
        # The pooled map is (batch, num_classes, 1, 1); drop the two unit dimensions.
        return torch.flatten(x, 1)

## Let's train our model.
Before we train our model, it helps to have some helper functions that keep the code clear, e.g. `train()` and `test()`.

In [128]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one training epoch and return the loss of the last batch.

    Args:
        dataloader: Iterable of ``(X, y)`` batches that also exposes ``.dataset``
            (its length is used for the progress message).
        model: Module to train; it is switched to training mode.
        loss_fn: Callable taking ``(pred, y)`` and returning a scalar loss tensor.
        optimizer: Optimizer that updates the model parameters.

    Returns:
        The loss of the final batch as a Python float, or NaN when the
        dataloader yields no batches.

    Batches are moved to the module-level ``device`` before the forward pass.
    """
    size = len(dataloader.dataset)
    model.train()
    last_loss = None
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Keep only the detached value so the autograd graph is not kept alive
        # and the return type does not depend on which batch was logged last.
        last_loss = loss.detach()
        if batch % 100 == 0:
            current = batch * len(X)
            print(f"loss: {last_loss.item():>7f}  [{current:>5d}/{size:>5d}]")
    if last_loss is None:
        return float("nan")
    return last_loss.item()

In [129]:
def test(dataloader, model, loss_fn):
    """Evaluate the model on a dataloader and return the average batch loss.

    Args:
        dataloader: Iterable of ``(X, y)`` batches that also exposes ``.dataset``.
        model: Module to evaluate; it is switched to evaluation mode.
        loss_fn: Callable taking ``(pred, y)`` and returning a scalar loss tensor.

    Returns:
        The sum of the per-batch losses divided by the number of batches.

    Accuracy and average loss are also printed. Batches are moved to the
    module-level ``device`` before the forward pass.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()

    loss_total = 0
    hit_total = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss_total += loss_fn(pred, y).item()
            # A prediction counts as correct when the arg-max class equals the label.
            is_hit = pred.argmax(1) == y
            hit_total += is_hit.type(torch.float).sum().item()

    test_loss = loss_total / num_batches
    correct = hit_total / size

    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return test_loss

Then let's set some hyperparameters, the loss function and the optimizer.

In [130]:
# Before setting the hyperparameters, pick the device: use the GPU when one is
# available and fall back to the CPU otherwise.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")
if device == "cuda":
    # These calls require a CUDA device, so they are skipped on CPU-only machines
    # (calling them unconditionally would crash right after choosing the CPU fallback).
    torch.cuda.set_device(0)
    print(torch.cuda.get_device_name(0))

Using cuda device
NVIDIA GeForce RTX 2080


In [131]:
# Build the network with one output per flower class, then move it to the chosen device.
num_class = 102
model = SqueezeNet(num_classes=num_class)
model = model.to(device)
print(model)

SqueezeNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=True)
    (3): Fire(
      (squeeze): Conv2d(64, 16, kernel_size=(1, 1), stride=(1, 1))
      (squeeze_activation): ReLU(inplace=True)
      (expand1x1): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1))
      (expand1x1_activation): ReLU(inplace=True)
      (expand3x3): Conv2d(16, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (expand3x3_activation): ReLU(inplace=True)
    )
    (4): Fire(
      (squeeze): Conv2d(128, 16, kernel_size=(1, 1), stride=(1, 1))
      (squeeze_activation): ReLU(inplace=True)
      (expand1x1): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1))
      (expand1x1_activation): ReLU(inplace=True)
      (expand3x3): Conv2d(16, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (expand3x3_activation): ReLU(inplace=True)
    )
    (5): MaxPool2d

Then let's set some hyperparameters, the loss function and the optimizer.

In [132]:
# Training configuration: number of epochs, loss function and optimizer.
epochs = 200
loss_fn = nn.CrossEntropyLoss()
# SGD with momentum over all model parameters.
optimizer = torch.optim.SGD(params=model.parameters(), lr=1e-3, momentum=0.9)

In [None]:
# Train and evaluate once per epoch, logging both losses to TensorBoard
# with the epoch index as the step.
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loss = train(dataloader['train'], model, loss_fn, optimizer)
    test_loss = test(dataloader['val'], model, loss_fn)
    for tag, value in (("Loss/train", train_loss), ("Loss/test", test_loss)):
        writer.add_scalar(tag, value, t)
# Flush once after the last epoch.
writer.flush()

print("Done!")

## Check your results in tensorboard
You are free to check your results in TensorBoard.

You should see two graphs: one shows the train loss and the other shows the test loss.

If you train for about 100+ epochs, you will likely see the test loss start to rise at some point and stop improving. That's what we call overfitting.

In your later research, if you see this happen, you could consider stopping your training.

### Train loss
![train loss](../results/Loss_train_flower102.svg?raw=true)
### Test loss
![test loss](../results/Loss_test_flower102.svg?raw=true)
