# Pytorch Grapevine Leaf Classifier

Taken with very little modification (just path and use pyplot) from Maxim Vlah

Dataset and code (c) 2021 Maxim Vlah, Licensed CC-BY-SA-NC 4.0 and used with permission.

Dataset: https://www.kaggle.com/maximvlah/grapevine-leaves

Original notebook: https://www.kaggle.com/maximvlah/pytorch-resnet18-94-f1

**Important notice from Thomas**: I picked this notebook because I liked the dataset and the code was reasonable and protypical of a lot of code I see in my work. Creating, curating and releasing a useful dataset (from your own images, too) is a great accomplishment, and a demonstration of the dataset with simple demo code is cool, too. I do not think of this code as primarily _unoptimized_ but rather as primarily _worth optimizing_. Thank you Maxim!


In [19]:
'''Load librarires'''
import pickle
import time
import random
import ntpath
import glob
import os
from copy import deepcopy
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
from pathlib import Path
from collections import defaultdict

from matplotlib import pyplot

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset,DataLoader
import torchvision
from torchvision import models, transforms, utils

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score

In [20]:
class CFG:

    '''Store all hyperparameters here.''' 

    SEED = 420
    TEST_SIZE = 0.2
    VAL_SIZE = 0.25
    CLASSES = None #Need to update manually
    OUTPUT_FEATURES = None #Need to update manually
    
    #transforms
    TRAIN_TRANSFORMS = transforms.Compose([
        #Rotate the image by given angle.
        transforms.RandomRotation(5),
        #Crop the given PIL Image to random size and aspect ratio.
        transforms.Resize((224,224)),
        #Horizontally flip the given PIL Image randomly with a given probability.
        transforms.RandomHorizontalFlip(p = 0.2),
        #Convert a PIL Image or numpy.ndarray to tensor.
        transforms.ToTensor(),
        #Normalize a tensor image with mean and standard deviation
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])

    VAL_TRANSFORMS = transforms.Compose([
        #Rotate the image by given angle.
        transforms.RandomRotation(5),
        #Crop the given PIL Image to random size and aspect ratio.
        transforms.Resize((224,224)),
        #Horizontally flip the given PIL Image randomly with a given probability.
        transforms.RandomHorizontalFlip(p = 0.2),
        #Convert a PIL Image or numpy.ndarray to tensor.
        transforms.ToTensor(),
        #Normalize a tensor image with mean and standard deviation
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])
    
    TEST_TRANSFORMS = transforms.Compose([
        #Crop the given PIL Image to random size and aspect ratio.
        transforms.Resize((224,224)),
        #Convert a PIL Image or numpy.ndarray to tensor.
        transforms.ToTensor(),
        #Normalize a tensor image with mean and standard deviation
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])
    
    #models
    
    # MODEL1 = {
    #     'name': 'custom_cnn',
    #     'transfer': False,
    #     'architecture': nn.Sequential(
    #       nn.Conv2d(1,20,5),
    #       nn.ReLU(),
    #       nn.Conv2d(20,64,5),
    #       nn.ReLU()
    #     ),
    #     'criterion': nn.CrossEntropyLoss(),
    #     'optimizer': optim.SGD,
    #     'momentum': 0.9,
    #     'lr': 0.003,
    #     'history': None
    # }

    MODEL2 = {
        'name': 'resnet18',
        'transfer': True,
        'architecture': models.resnet18(pretrained=True), # ResNet18
        'criterion': nn.CrossEntropyLoss(),
        'optimizer': optim.SGD,
        'momentum': 0.9,
        'lr': 0.002,
        'history': None
    }

    # MODELS =[MODEL1,MODEL2]

    BATCH_SIZE = 64
    EPOCHS = 120
    
    DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print('You are using ->', DEVICE)    

You are using -> cuda:0


In [21]:
def seed_everything(seed):
    '''Make the results reproducible'''
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True 

seed_everything(CFG.SEED)

In [22]:
'''Store image paths and their labels in pandas dataframe. Will be used to create pytorch datasets. '''

paths = glob.glob('/mnt/data/vision/plants-wine/data/*/*' )
meta = pd.DataFrame([(path, ntpath.basename(ntpath.dirname(path))) for path in paths], columns = ['path','label'])
#get class mappings
classes = dict(enumerate(meta.label.astype('category').cat.categories))
CFG.CLASSES = classes
CFG.OUTPUT_FEATURES = len(CFG.CLASSES)

meta.label = meta.label.astype('category').cat.codes
meta.head()

Unnamed: 0,path,label
0,/mnt/data/vision/plants-wine/data/Merlot/IMG_1...,4
1,/mnt/data/vision/plants-wine/data/Merlot/IMG_1...,4
2,/mnt/data/vision/plants-wine/data/Merlot/IMG_0...,4
3,/mnt/data/vision/plants-wine/data/Merlot/IMG_1...,4
4,/mnt/data/vision/plants-wine/data/Merlot/IMG_1...,4


In [23]:
classes

{0: 'Auxerrois',
 1: 'Cabernet Franc',
 2: 'Cabernet Sauvignon',
 3: 'Chardonnay',
 4: 'Merlot',
 5: 'Muller Thurgau',
 6: 'Pinot Noir',
 7: 'Riesling',
 8: 'Sauvignon Blanc',
 9: 'Syrah',
 10: 'Tempranillo'}

In [24]:
'''Split data into train, validation and test sets'''

X = list(meta.path)
y = list(meta.label)

X_train, X_test, y_train, y_test = train_test_split(X, 
                                                    y, 
                                                    test_size=CFG.TEST_SIZE, 
                                                    random_state=CFG.SEED, 
                                                    stratify=y) #stratified split

X_train, X_val, y_train, y_val = train_test_split(X_train,
                                                  y_train,
                                                  test_size=CFG.VAL_SIZE,
                                                  random_state=CFG.SEED,
                                                  stratify=y_train) #stratified split

print(f'Train length -> {len(X_train)}')
print(f'Val length -> {len(X_val)}')
print(f'Test length -> {len(X_test)}')

Train length -> 605
Val length -> 202
Test length -> 202


In [25]:
'''Custom pytorch dataset implementation.'''
class LeafDataset(Dataset):
    def __init__(self,X,y, transform=None):
        self.X = X
        self.y = torch.tensor(y, dtype=torch.long)
        self.transform = transform

        assert len(self.X) == len(self.y), f'X and y have different lengths -> {len(self.X)} != {len(self.y)} '

    def __len__(self):
        return len(self.X)

    def __getitem__(self,idx):
        img_path = self.X[idx]
        img = Image.open(img_path)
        if self.transform is not None:
            img = self.transform(img)
        label = self.y[idx]
        return (img,label) 

    def show_img(self,idx):
        '''Plot image'''
        img,label = self.__getitem__(idx)
        img = img.numpy().transpose((1, 2, 0))
        pyplot.figure(figsize=(16, 8))
        pyplot.axis('off')
        pyplot.imshow(img)
        pyplot.title(CFG.CLASSES[int(label)]) #using CFG.CLASSES dict
        pyplot.pause(0.001)

In [26]:
'''Instantiate pytorch train, validation and test sets'''
TRAIN = LeafDataset(X_train,y_train, CFG.TRAIN_TRANSFORMS)
VAL = LeafDataset(X_val,y_val, CFG.VAL_TRANSFORMS)
TEST = LeafDataset(X_test,y_test, CFG.TEST_TRANSFORMS)

'''Instantiate Dataloaders'''
TRAIN_LOADER = DataLoader(TRAIN,CFG.BATCH_SIZE)
VAL_LOADER = DataLoader(VAL,CFG.BATCH_SIZE)
TEST_LOADER = DataLoader(TEST,CFG.BATCH_SIZE)

In [27]:
class Net(nn.Module):
    '''
    ========================
          NEURAL NET
    ========================
    
    Args:
        model_dict(dict): configuration dict containing the model architecture
        output_features(int): length of output tensor; for classification equals to number of classes
    '''
    def __init__(self, model_dict, output_features):
        super().__init__()
        self.__dict__.update(model_dict) #unpack model dict from CFG into this class
        
        if self.transfer:
            model = self.architecture
            num_ftrs = model.fc.in_features
            model.fc = nn.Linear(num_ftrs, output_features)
            self.model = model
        else:
            self.model = self.architecture
        
        self.output_features = output_features
        #optimizer
        self.optimizer = self.optimizer(self.model.parameters(),self.lr)
        #path where to save model
        self.save_path = 'models'
        
    def forward(self, x):
        return self.model(x)

    def fit(self,
            train_loader,
            val_loader,
            epochs = 5,
            batch_size = 32,
            device = 'cpu'):
        '''
        =============================
            OPTIMIZATION LOOP
        =============================

        Args:
            train_loader(torch dataloader)
            val_loader(torch dataloader)
            epochs(int)
            batch_size(int)
            device(str)

            
        Output style inspired by the skorch fit() method

        '''
        #may be changed if lrscheduler is used???
        lr = deepcopy(self.lr)

        #get model training history
        history = self.history
        if history == None:
            history = defaultdict(list)
        else:
            pass
        #get train and val sizes
        train_size = len(train_loader.dataset)
        val_size = len(val_loader.dataset)
        #stuff for printing epoch metrics as a beautiful table
        headers = ['epoch','train_loss','val_loss','val_acc','cp','lr','dur']
        template = '{:<10} {:<10} {:<10} {:<10} {:<10} {:<10} {:<10}'
        print(template.format(*headers))
        print(template.replace(':', ':-').format('','','','','','',''))
        cyan = "\033[96m{:<10}\033[00m" #cyan
        purple = "\033[95m{:<10}\033[00m" #purple
        green = "\033[92m{:<10}\033[00m" #green
        white = "\033[0m{:<10}\033[0m" #white
        #set model into train mode
        self.model.train()
        #send model to device
        self.model.to(device)
        #training loop
        for epoch in range(epochs):
            start_time = time.time()
            train_loss = 0
            val_loss, val_acc = 0, 0
            #optimization  loop
            for (X,y) in train_loader:
                #Send training data to device
                X,y = X.to(device), y.to(device)
                #Forward propagation
                pred = self.model(X)
                loss = self.criterion(pred,y)
                #Backpropagation
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                #update loss
                train_loss = loss.item()
            #validation loop
            with torch.no_grad():
                for X, y in val_loader:
                    X,y = X.to(device),y.to(device)
                    pred = self.model(X)
                    val_loss = self.criterion(pred,y).item()
                    val_acc += (pred.argmax(1) == y).type(torch.float).sum().item()
            #calculate validation accuracy after the epoch
            val_acc /= val_size
            #append epoch results
            history['epoch'].append(epoch+1)
            history['train_loss'].append(train_loss)
            history['val_loss'].append(val_loss)
            history['val_acc'].append(val_acc)
            
            #colorize epoch's output if it improves
            colortemp = template.split(' ')
            # colorize train loss if it decreases
            if history['train_loss'][-1] == min(history['train_loss']):
                colortemp[1] = cyan
            else:
                colortemp[1] = white
            #colorize validation loss if it decreases
            if history['val_loss'][-1] == min(history['val_loss']):
                colortemp[2] = purple
            else:
                colortemp[2] = white
            # colorize validation accuracy & save best weights if it increases
            if history['val_acc'][-1] == max(history['val_acc']):
                #colorize       
                colortemp[3] = green
                #checkpoint
                cp = '+'
                if not os.path.exists(self.save_path):
                    os.mkdir(self.save_path)
                torch.save(self.model.state_dict(), Path(self.save_path,f'best_{self.name}.pth'))
            else:
                colortemp[3] = white
                cp = '-'
            colortemp = ' '.join(colortemp)

            #calculate epoch duration (in seconds)
            end_time = time.time()
            dur = end_time - start_time
            #append the rest of epoch results
            history['cp'].append(cp)
            history['lr'].append(lr)
            history['dur'].append(dur)
            #display the epoch results
            print(colortemp.format(*f'{epoch+1}/{epochs} {train_loss:.4f} {val_loss:.4f} {val_acc:.2f} {cp} {lr} {dur:.2f}'.split(' ')))
        #update epoch number of the entire training history
        history['epoch'] = [e+1 for e in range(len(history['epoch']))]
        #update model's training history
        self.history = history
        #save training history as csv
        self.save_history()

    def eval_model(self,dataloader,avg=None,device ='cpu'):
        '''
        ==================================
           ACCURACY PRECISION RECALL F1
        ==================================
        '''
        labels = [l for l in range(self.output_features)]
        loader_size = len(dataloader)
        dataset_size = len(dataloader.dataset)

        acc =0
        precision = 0
        recall = 0
        f1 = 0

        #set model to evaluation mode
        self.model.eval()
        #model to device, default cpu
        self.model.to(device)

        with torch.no_grad():
            for X, y in dataloader:
                pred = self.model(X)
                #accuracy
                acc += (pred.argmax(1) == y).type(torch.float).sum().item()

                pred = pred.argmax(1)
                pred,y = list(pred), list(y)
                #precision
                p = precision_score(y, pred, labels = labels, zero_division = 1, average = avg)
                precision+=p
                #recall
                r = recall_score(y, pred, labels = labels, zero_division = 1,  average = avg)
                recall+=r
                #f1 score
                f = f1_score(y, pred, labels = labels, zero_division = 1,  average = avg)
                f1 += f

        acc /= dataset_size
        precision /= loader_size
        recall /= loader_size
        f1 /= loader_size

        print(f" Accuracy: {(100*acc):>0.1f}%")
        print(f"Precision: {(100*np.mean(precision)):>0.1f}%")
        print(f"   Recall: {(100*np.mean(recall)):>0.1f}%")
        print(f" F1 Score: {(100*np.mean(f1)):>0.1f}%")

        
    def plot_loss_history(self):
        '''
        Plot loss history
        '''
        assert self.history != None, 'No history to plot -> the model has not been trained yet!'
        
        df = pd.DataFrame(self.history)
        fig = pyplot.figure()
        pyplot.plot(df.epoch, df.train_loss, label='train loss')
        pyplot.plot(df.epoch, df.val_loss, label='val loss')
        pyplot.title('Loss History')
        pyplot.legend()
        pyplot.show()

    def save_history(self):
        '''Save model's training history'''
        assert self.history != None, 'No history to save -> the model has not been trained yet!'
        #save as csv
        pd.DataFrame(self.history).to_csv(Path(f'models/{self.name}_history.csv')) 
        #save as pickle file
        with open(Path(f'models/{self.name}_history.pkl'), 'wb') as f:
            pickle.dump(self.history, f, protocol=pickle.HIGHEST_PROTOCOL)

    def save_model(self):
        torch.save(self.model.state_dict(), Path(self.save_path,f'latest_{self.name}.pth'))

    def load_model(self,path = 'models'):
        '''Load model'''
        try:
            #load model weights
            p = Path(path,f'best_{self.name}.pth')
            self.model.load_state_dict(torch.load(p))
            #load model training history
            with open(Path(path,f'{self.name}_history.pkl'), 'rb') as h: 
                self.history = pickle.load(h)
        except:
            print('No model to load!')

In [28]:
#instantiate model and send to device
Resnet18 = Net(CFG.MODEL2,CFG.OUTPUT_FEATURES)


In [32]:
t0 = time.perf_counter()
with torch.profiler.profile() as prof:
    Resnet18.fit(TRAIN_LOADER,
             VAL_LOADER,
             3, # CFG.EPOCHS,
             CFG.BATCH_SIZE,
             CFG.DEVICE)
t1 = time.perf_counter()
print(t1 - t0)

epoch      train_loss val_loss   val_acc    cp         lr         dur       
---------- ---------- ---------- ---------- ---------- ---------- ----------
1/3        [96m1.8333    [00m [95m2.0274    [00m [92m0.47      [00m +          0.002      20.56     
2/3        [96m1.7484    [00m [95m1.9565    [00m [92m0.51      [00m +          0.002      20.47     
3/3        [96m1.6762    [00m [95m1.9262    [00m [92m0.54      [00m +          0.002      20.69     
67.30887206504121


In [33]:
ka = prof.key_averages()

In [40]:
print(ka.table(sort_by="self_cpu_time_total", row_limit=5))

-------------------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
                                                   Name    Self CPU %      Self CPU   CPU total %     CPU total  CPU time avg     Self CUDA   Self CUDA %    CUDA total  CUDA time avg    # of Calls  
-------------------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
enumerate(DataLoader)#_SingleProcessDataLoaderIter._...        96.27%       59.201s        97.59%       60.014s        1.250s       0.000us         0.00%       0.000us       0.000us            48  
                                        cudaMemcpyAsync         1.78%        1.092s         1.78%        1.092s       2.167ms       0.000us         0.00%       0.000us       0.000us           504  
         

In [13]:
#evaluate best model's performance on the test set
print('Best Model:')
print('-'*20)
best_model = deepcopy(Resnet18)
best_model.load_model()
best_model.eval_model(TEST_LOADER)
#evaluate current model's performance on the test set
print('Current Model:')
print('-'*20)
Resnet18.eval_model(TEST_LOADER)

Best Model:
--------------------
 Accuracy: 92.6%
Precision: 94.3%
   Recall: 94.5%
 F1 Score: 93.8%
Current Model:
--------------------
 Accuracy: 92.6%
Precision: 94.3%
   Recall: 94.5%
 F1 Score: 93.8%


# What's next?

- Train function as model class method
- LR Scheduler
- Train Custom Model
- Confusion Matrix
- Random & Grid Search
- Cross-Validation
- Ensemble Learning

In [14]:
def cross_val():
    '''
    ======================
       CROSS VALIDATION
    ======================
    '''
    pass