In [1]:
import copy
import datetime
import os
import time
import typing

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn import preprocessing
from torch.optim import lr_scheduler
from torch.utils.tensorboard import SummaryWriter
from torchvision import models, transforms

In [2]:
class ProdDataset(torch.utils.data.Dataset):
    """A map-style dataset pairing each input sample with its label.

    Assumes the data has been preprocessed with prepare_image_data.py.
    Expects image size (224, 224), zero padding, and RGB mode.

    Args:
        input: indexable collection of samples.
        label: indexable collection of labels, aligned with ``input``.
        transform (callable, optional): applied to every sample on access.
            When omitted, samples are returned unchanged.
    """
    def __init__(self, input, label, transform=None):
        self.input = input
        self.label = label
        # Always define the attribute: previously it was only set when a
        # transform was supplied, so __getitem__ raised AttributeError for
        # datasets created without one.
        self.transform = transform if transform else None

    def __getitem__(self, index):
        """Return the ``(input, label)`` pair at ``index``, transformed if configured."""
        input = self.input[index]
        label = self.label[index]
        if self.transform is not None:
            input = self.transform(input)
        return input, label

    def __len__(self):
        """Number of samples, taken from the label collection."""
        return len(self.label)


def imshow(inp, title=None):
    """Display a normalised channel-first image tensor with matplotlib.

    Args:
        inp: tensor laid out as (channels, height, width); it is converted
            with ``.numpy()``, so it must already live on the CPU.
        title (str, optional): figure title; omitted when None.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # Move channels last, which is the layout plt.imshow expects.
    image = inp.numpy().transpose((1, 2, 0))
    # Undo the per-channel normalisation and clamp into the displayable range.
    image = np.clip(channel_std * image + channel_mean, 0, 1)

    plt.imshow(image)
    if title is not None:
        plt.title(title)
    # Short pause so the figure gets a chance to refresh.
    plt.pause(0.001)


In [6]:
# Common prefix of the pickles produced by prepare_image_data.py.
# data_path = os.getcwd() + '/data/images/' + 'img_prepared'
data_path = os.path.join('/notebooks/data/', 'img_prepared')

# Load the prepared train / validation splits.
# NOTE: joblib.load unpickles its input, so only point it at trusted files.
train_pklname = f'{data_path}_train.pkl'
val_pklname = f'{data_path}_val.pkl'
train_data, val_data = joblib.load(train_pklname), joblib.load(val_pklname)

# Input samples of each split.
input_train = np.array(train_data['data'])
input_val = np.array(val_data['data'])

# Raw labels as single-column arrays, the 2-D layout the encoder is fed with.
label_train = np.array(train_data['label']).reshape(-1, 1)
label_val = np.array(val_data['label']).reshape(-1, 1)

# Fit the ordinal encoder on the training labels only; the validation labels
# reuse that mapping. Both targets are flattened back to 1-D afterwards.
target_enc = preprocessing.OrdinalEncoder()
target_train = target_enc.fit_transform(label_train).reshape(-1)
target_val = target_enc.transform(label_val).reshape(-1)

# Per-channel statistics used to normalise every image (the same constants
# imshow() uses to undo the normalisation for display).
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]

# a transform composition for training
# The pipeline used to chain RandomRotation((90, 90)) and
# RandomRotation((-90, -90)). Each has a single-valued degree range, so every
# image was rotated by +90 and then by -90 degrees, which cancels out and
# leaves the training data un-augmented. RandomChoice applies exactly one of
# the options per sample instead.
# NOTE(review): keeping the upright orientation as one option is an
# assumption (validation images are not rotated) - confirm the intended mix.
train_transform = transforms.Compose([
    transforms.RandomChoice([
        transforms.RandomRotation((0, 0)),      # leave the image upright
        transforms.RandomRotation((90, 90)),    # quarter turn one way
        transforms.RandomRotation((-90, -90)),  # quarter turn the other way
    ]),
    #transforms.RandomAutocontrast(p=0.3),
    #transforms.RandomPerspective(p=0.3),
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD)
])

# a transform composition for validation (no augmentation)
val_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD)
])

# Wrap each split in a ProdDataset with its own transform pipeline.
dataset_train = ProdDataset(input_train, target_train, train_transform)
dataset_val = ProdDataset(input_val, target_val, val_transform)
image_datasets = {'train': dataset_train, 'val': dataset_val}

BATCH_SIZE = 8

# Build one shuffled loader per split and record how many samples each holds.
dataloaders = {}
dataset_sizes = {}
for split in ['train', 'val']:
    split_dataset = image_datasets[split]
    dataloaders[split] = torch.utils.data.DataLoader(
        split_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=4,
    )
    dataset_sizes[split] = len(split_dataset)

# Use the first GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

  input_train = np.array(train_data['data'])
  input_train = np.array(train_data['data'])
  input_val = np.array(val_data['data'])
  input_val = np.array(val_data['data'])


In [23]:
def train_model(model, criterion, optimizer, scheduler, writer, num_epochs=50, save_path=None):
    """Train ``model`` and return it loaded with its best-validation weights.

    Each epoch runs a training pass followed by a validation pass over the
    module-level ``dataloaders`` dict (keys ``'train'`` and ``'val'``), with
    batches moved to the module-level ``device``. Per-phase loss and accuracy
    are printed and logged to ``writer``.

    Args:
        model: the image classification model
        criterion: loss function, called as ``criterion(outputs, labels)``
        optimizer: optimizer stepped after every training batch
        scheduler: learning-rate scheduler stepped once per epoch, after the
            training phase
        writer: for tensorboard; must provide ``add_scalar`` and ``flush``
        num_epochs (int, optional): number of training epochs. Defaults to 50.
        save_path (path, optional): directory in which
            ``best_image_cnn_model.pt`` is written every time the validation
            accuracy improves. When None (the default) nothing is written to
            disk.

    Returns:
        the model, loaded with the weights of the epoch that reached the
        highest validation accuracy
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    # Resolve the checkpoint location once. With save_path=None the original
    # code crashed in os.path.join(None, ...) on the first improvement.
    checkpoint_path = None
    if save_path is not None:
        os.makedirs(save_path, exist_ok=True)
        checkpoint_path = os.path.join(save_path, 'best_image_cnn_model.pt')

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0
            num_samples = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                # Labels are cast to integer class indices before the loss.
                labels = labels.long().to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; track history only in the training phase
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # statistics, accumulated as plain Python numbers
                # (assumes the criterion returns a per-batch mean, hence the
                # re-weighting by batch size - TODO confirm for custom losses)
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += torch.sum(preds == labels).item()
                num_samples += batch_size

            if is_train:
                scheduler.step()

            # Average over the samples actually seen; max() guards against an
            # empty loader.
            denominator = max(num_samples, 1)
            epoch_loss = running_loss / denominator
            epoch_acc = running_corrects / denominator

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            writer.add_scalar(f'Loss/{phase}', epoch_loss, epoch)
            writer.add_scalar(f'Accuracy/{phase}', epoch_acc, epoch)

            # deep copy the model whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                if checkpoint_path is not None:
                    torch.save(best_model_wts, checkpoint_path)
                    print(f'Saved best model to {checkpoint_path}')

        print()
        writer.flush()

    time_elapsed = time.time() - since
    print(
        f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

def visualize_model(model, num_images=6):
    """A function to visualize the classification result of some images.

    Draws images from the module-level ``dataloaders['val']`` loader, runs
    them through ``model`` on the module-level ``device`` and plots each one
    in a two-column grid titled with its predicted category (looked up in
    ``target_enc``). The model's training/eval mode is restored on exit, even
    if plotting fails.

    Args:
        model: the learned model
        num_images (int, optional): number of images to visualize.
                                    Defaults to 6. Nothing is plotted when
                                    it is zero or negative.
    """
    if num_images <= 0:
        return

    was_training = model.training
    model.eval()
    images_so_far = 0
    # Round up so an odd num_images still fits the two-column grid
    # (num_images // 2 rows left no cell for the last image).
    num_rows = (num_images + 1) // 2
    plt.figure()

    try:
        with torch.no_grad():
            for inputs, _ in dataloaders['val']:
                inputs = inputs.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                for j in range(inputs.size(0)):
                    images_so_far += 1
                    ax = plt.subplot(num_rows, 2, images_so_far)
                    ax.axis('off')
                    # .item() turns the prediction into a plain int index.
                    ax.set_title(
                        f'predicted: {target_enc.categories_[0][preds[j].item()]}')
                    imshow(inputs.cpu().data[j])

                    if images_so_far == num_images:
                        return
    finally:
        # Single restore point for every exit path.
        model.train(mode=was_training)

In [24]:
# Directories for the TensorBoard logs and the saved model checkpoints.
log_dir_path = os.path.join('/notebooks', 'runs')
save_dir_path = os.path.join('/notebooks', 'models')

# exist_ok=True makes re-running the cell harmless. Genuine failures (e.g.
# missing permissions) are reported instead of being silently swallowed by a
# bare `except: pass`, while the cell itself still completes.
for dir_path in (log_dir_path, save_dir_path):
    try:
        os.makedirs(dir_path, exist_ok=True)
    except OSError as err:
        print(f'Could not create directory {dir_path}: {err}')
# %load_ext tensorboard
# %tensorboard --logdir {log_dir_path}


In [None]:
# Print every top-level (name, module) pair of the pretrained RESNET-50
# to get an overview of the model structure.
model_ft = models.resnet50(pretrained=True)
for child_name, child_module in model_ft.named_children():
    print((child_name, child_module))

In [14]:
# visualize the children of layer 4 of RESNET-50
children = list(model_ft.layer4.children())
# number of blocks inside layer 4
print(len(children))
# `children` is a plain list, so the former `children.children()` raised
# AttributeError. Show the last three sub-modules of the final block instead,
# which are the ones the training cell unfreezes.
list(children[-1].children())[-3:]


3

In [25]:
# log dir for storing tensorboard files (one timestamped sub-directory per run)
log_dir = os.path.join(log_dir_path, datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
writer = SummaryWriter(log_dir=log_dir)

# load the RESNET-50 model
model_ft = models.resnet50(pretrained=True)

# freeze all params
for param in model_ft.parameters():
    param.requires_grad = False

# unfreeze layer 4 params
#for param in model_ft.layer4.parameters():
#    param.requires_grad = True

# NOTE(review): torchvision's ResNet uses an AdaptiveAvgPool2d here, which has
# no learnable parameters, so this loop presumably has no effect - confirm.
for param in model_ft.avgpool.parameters():
    param.requires_grad = True

# get the last bottleneck of l4 (RESNET-50)
l4_last_bnk = list(model_ft.layer4.children())[-1:]
for child in l4_last_bnk:
    # unfreeze the last 3 sub-modules of that bottleneck
    for c in list(child.children())[-3:]:
        for param in c.parameters():
            param.requires_grad = True

# some experiments
# 2022-06-14_09-32-58 only one FC layer output, no relu, dropout
# 2022-06-14_10-12-36 256, relu, dropout 0.3, output layers
# 2022-06-14_10-58-38 256, relu, output layers
# 2022-06-14_20-59-14 l4 last conv block, 256, relu, output layers, step_size=3

# build a FC head for classification. The CNN is for feature extraction
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Sequential(
              nn.Linear(num_ftrs, 256),
              nn.ReLU(),
              nn.Linear(256, len(target_enc.categories_[0])),
              )
# make sure the FC layer is learnable
for param in model_ft.fc.parameters():
    param.requires_grad = True

model_ft = model_ft.to(device)

criterion = nn.CrossEntropyLoss()

# Only the unfrozen parameters (end of layer 4 + the new FC head) are handed
# to the optimizer; the frozen backbone never receives gradients, so passing
# it along only obscured what is actually being trained.
trainable_params = [param for param in model_ft.parameters() if param.requires_grad]
optimizer_ft = optim.SGD(trainable_params, lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 3 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=3, gamma=0.1)

# train the model
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, writer,
                       num_epochs=50, save_path=save_dir_path)

# visualize the model
visualize_model(model_ft)

Epoch 0/49
----------
train Loss: 2.1521 Acc: 0.2857
val Loss: 1.7738 Acc: 0.4236
/notebooks/models

Epoch 1/49
----------


KeyboardInterrupt: 

In [None]:
# visualize the trained model using val dataset
# Rebuild the same architecture that was trained: the checkpoint stores the
# Sequential head (Linear -> ReLU -> Linear), so a single nn.Linear head
# cannot load it. Pretrained weights are not requested because
# load_state_dict overwrites the model's weights anyway.
model_ft = models.resnet50()
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Sequential(
    nn.Linear(num_ftrs, 256),
    nn.ReLU(),
    nn.Linear(256, len(target_enc.categories_[0])),
)
# os.path.join supplies the separator that plain string concatenation dropped;
# map_location lets a checkpoint saved on GPU load on a CPU-only machine.
checkpoint_path = os.path.join(save_dir_path, 'best_image_cnn_model.pt')
model_ft.load_state_dict(torch.load(checkpoint_path, map_location=device))
model_ft = model_ft.to(device)
model_ft.eval()

categories = target_enc.categories_[0]
num_batches = 1  # how many validation batches to plot

# Inference only, so no gradients need to be tracked.
with torch.no_grad():
    for i, (inputs, labels) in enumerate(dataloaders['val'], start=1):
        inputs = inputs.to(device)
        labels = labels.type(torch.LongTensor)
        outputs = model_ft(inputs)
        _, preds = torch.max(outputs, 1)

        # Two-column grid with enough rows for the whole batch; subplot
        # indices start at 1.
        num_in_batch = inputs.size(0)
        num_rows = (num_in_batch + 1) // 2
        fig = plt.figure()
        for j in range(num_in_batch):
            ax = plt.subplot(num_rows, 2, j + 1)
            ax.axis('off')
            ax.set_title(
                f'predicted: {categories[preds[j].item()]}\n label: {categories[labels[j].item()]}')
            imshow(inputs.cpu().data[j])
        if i == num_batches:
            break