# CNN from scratch

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms

In [None]:
trainset = torchvision.datasets.CIFAR10(root='./datasets',train=True,download=True,transform=transforms.ToTensor())
testset = torchvision.datasets.CIFAR10(root='./datasets',train=False,download=True,transform=transforms.ToTensor())

In [None]:
# batches
trainloader = torch.utils.daa.DataLoader(trainset,
                                        batch_size = 8,
                                        shuffle = True,
                                        num_workers=2)
testloader = torch.utils.daa.DataLoader(testset,
                                        batch_size = 8,
                                        shuffle = False,
                                        num_workers=2)

In [None]:
labels = ('plane','car','bird','cat','deer','dog','frog','horse','ship','truck')

In [None]:
# explore data

# loading batches
images_batch, labels_batch = iter(trainloader).next()

# to use matplotlib
img = torchvision.utils.make_grid(images_batch)
plt.imgshow(np.transpose(img, (1,2,0)))
plt.show()

In [None]:
import torch.nn as nn

In [None]:
inp = 3
hid = 16
hid2 = 32
out = len(labels)
k_conv = 5

In [None]:
class ConvNet(nn.Model):
    
    def __init__(self):
        super(ConvNet,self).__init__()
        
        self.layer1 = nn.Sequential(
            nn.Conv2d(inp, hid, k_conv),
            nn.BatchNorm2(hid),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(hid, hid2, k_conv),
            nn.BatchNorm2(hid2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        # pooling layers do not change number of input features
        self.fc = nn.Linear(hid2 * k_conv * k_conv, out)
        
    def forward(self,x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0),-1)
        out = self.fc(out)
        
        return out

In [None]:
model = ConvNet()
lr = 0.001
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr=lr)

total_step = len(trainloader)
num_epochs = 5

In [None]:
# running model
for epoch in range(num_epochs):
    for i, (image,labels) in enumerate(trainloader):
        # forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i+1) % 2000 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss:  {:.4f}'\
                 .format(epoch+1, num_epochs, i+1, total_step, loss.item()))

In [None]:
# predictions
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in testloader:
        outputs = model(images)
        _,predicted = torch.max(outputs.data,1)
        total += labels.size(0)
        correct += (predicted==labels).sum().item()
    print('Accuracy of the model on 10k test images: {}%'\
         .format(100*correct/total))

### transfer learning

In [None]:
# reusing train neurla network - saves time and effort of re training from scratch
# only makes sense for similar problems

# freeze lower layers (lower layers focus on feature extraction) and retrain only using higher layers

In [None]:
# resnet-18 pretrained model

import torch
from torchvision import datasets, models, transforms

# from docu: images fed to pre-trained model have to be normalized using some sort of pre determined parameters
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean,std)
])

test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean,std)
])

In [None]:
import zipfile
zip = zipfile.Zipfile('datasets/flowers_.zip')
zip.extractall('datasets')

In [None]:
data_dir = 'datasets/flowers_'

images_datasets = {}
images_datasets['train'] = datasets.ImageFolder(data_dif+'/train',train_transform)
images_datasets['test'] = datasets.ImageFolder(data_dif+'/test',train_transform)

In [None]:
class_names = images_datasets['train'].classes

In [None]:
dataloaders = {}

dataloaders['train'] = torch.utils.daa.DataLoader(image_dataset['train'],
                                        batch_size = 8,
                                        shuffle = True,
                                        num_workers=2)
dataloaders['test'] = torch.utils.daa.DataLoader(image_dataset['test'],
                                        batch_size = 8,
                                        shuffle = False,
                                        num_workers=2)

In [None]:
inputs, labels = next(iter(dataloaders['train']))

In [None]:
import torchvision
import numpy as np
import matplotlib.pyplot as plt

In [None]:
inp = torchvision.utils.make_grid(inputs)

np.clip(inp,0,1).max()
inp.numpy().transpose((1,2,0)).shape
plt.ion()

In [None]:
def img_show(inp,title=None):
    inp = inp.numpy().transpose((1,2,0))
    inp= std*inp+mean
    inp = np.clip(inp,0,1)
    
    plt.figure(figsize = (16,4))
    plt.axis('off')
    plt.imshow(inp)
    
    if title is not None:
        plt.title(title)
        
        img_show(inp, title = [class_names[x] for x in labels])

In [None]:
model = models.resnet18(pretrained=True)

In [None]:
# make our own linear layer -- get final number of input features that we need to pass into our last linear layer
num_ftrs = model.fc.in_features
num_ftrs

In [None]:
from torch.optim import lr_scheduler
# decays learning rate as we get close to convergence

In [None]:
model.fc = nn.Lineaer(num_ftrs,5)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(),
                     lr = 0.001,
                     momentum=0.9)

sched = lr_scheduler.StepLR(optimizer,
                           step_size = 7,
                           gamma = 0.1) # reduce learning rate by 0.1 every 7 epochs

In [None]:
def calculate_accuracy(phase, running_loss, running_corrects):
    epoch_loss = running_loss/len(image_datasets[phase])
    epoch_acc = running_corrects.double()/len(image-Datasets[phase])
    
    print('{} Loss: {:.4f} acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
    return epoch_loss, epoch_acc

def phase_train(model, criterion, optimizer, scheduler):
    scheduler.step()
    model.train()
    running_loss = 0.0
    running_corrects = 0
    
    for inputs, labels in dataloaders['train']:
        optimizer.zero_grad()
        
        with torch.set_grad_enabled(True):
            outputs = model(inputs)
            _,preds = torch.max(outputs,1)
            loss = criterion(outputs, labels)
            
            loss.backward()
            optimizer.step()
            
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        
    calculate_accuracy('train', running_loss, running_corrects)

In [None]:
import copy

best_acc = 0.0

In [None]:
def phase_test(model, criterion, optimizer):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    global best_acc
    
    for inputs, labels in dataloaders['test']:
        
        optimizer.zero_grad()
        
        with torch.no_grad():
            outputs = model(inputs)
            _, preds = torch.max(outputs,1)
            loss = criterion(outputs,labels)
            
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        
    epochLoss, epoc_acc = cauclate_accuracy('test',running_loss, running_corrects)
    
    if epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model_wts = copy.deepcopy(model.state_dict())
    
    return best_model_wts

In [None]:
# training the model

def build_model(model, criterion, optimizer, scheduler, num_epochs=10):
    
    best_model_wts = copy.deepcopy(model.state_dict())
    
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-'*10)
        
        phase_train(model, criterion, optimizer, scheduler)
        best_model_wts = phase_test(model,criterion,optimizer)
        print()
    print('Best test Acc: {:4f}'.format(best_acc))
    
    model.load_state_dict(best_model_wts)
    return model

In [None]:
model = build_model(model,
                   criterion,
                   optimizer,
                   sched,
                   num_epochs = 1)

In [None]:
with torch.no_grad():
    inputs,labels = iter(dataloaders['test']).next()
    inp = torchvision.utils.make_grid(inputs)
    
    outputs = model(inputs)
    _,preds = torch.max(outputs, 1)
    
    for j in range(len(inputs)):
        inp = inputs.data[j]
        img_show(inp, 'predicted:' + class_names[preds[j]])

### transfer learning using frozen layers

In [None]:
frozen_model = models.resnet18(pretrained=True)

In [None]:
for param in frozen_model.parameters():
    param_requres_grad = False

In [None]:
frozen_model.fc = nn.Linear(num_ftrs, 5)

optimizer = optim.SGD(frozen_model.fc.parameters(),
                     lr = 0.001,
                     momentum=0.9)

sched = lr_scheduler.StepLR(optimizer,
                           step_size = 7
                           gamma = 0.1)

criterion = nn.CrossEntropyLoss()

best_acc = 0.0

In [None]:
frozen_model = build_model(frozen_model,
                          criterion,
                          optimizer
                          sched
                          num_epochs = 1)

In [None]:
with torch.no_grad():
    inputs,labels = iter(dataloaders['test']).next()
    inp = torchvision.utils.make_grid(inputs)
    
    outputs = frozen_model(inputs)
    _,preds = torch.max(outputs, 1)
    
    for j in range(len(inputs)):
        inp = inputs.data[j]
        img_show(inp, 'predicted:' + class_names[preds[j]])