In [1]:
#import packages
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from pytorch_lightning import seed_everything, LightningModule, Trainer
from sklearn.utils import class_weight
import torch.nn as nn
import torch
from pytorch_lightning.callbacks import EarlyStopping,ModelCheckpoint,LearningRateMonitor
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
import torchvision
from sklearn.metrics import classification_report
from PIL import Image
from torch.utils.data import DataLoader, Dataset,random_split
import timm
import torchmetrics
import torchvision.models as models
import albumentations as A
from albumentations.pytorch import ToTensorV2
import pytorch_lightning as pl
import sklearn

In [2]:
print('torch version',torch.__version__)
print('pytorch lightnging version',pl.__version__)
print('sklearn version',sklearn.__version__)
print('torchvision version',torchvision.__version__)
print('albumentations version',A.__version__)
print('torchmetrics version',torchmetrics.__version__)


torch version 1.7.1+cu110
pytorch lightnging version 1.5.4
sklearn version 1.0.1
torchvision version 0.8.2+cu110
albumentations version 1.1.0
torchmetrics version 0.6.1


In [3]:
#create data augmentation
img_size=224
aug= A.Compose([
            A.Resize(img_size,img_size),
            A.HorizontalFlip(0.5),
            A.VerticalFlip(),
            A.RandomRotate90(),
            A.Normalize(),
            ToTensorV2(p=1.0),
        ], p=1.0)

In [4]:
#create a class to read data from folders and apply augmentation from albumentation
class DataReader(Dataset):
    def __init__(self, dataset, transform=None):
        self.dataset = dataset
        self.transform = transform

    def __getitem__(self, index):
        x=self.dataset[index][0]#read image
        y=self.dataset[index][1] #read label
        if self.transform:#apply augmentations
            x=np.array(x)
            x=self.transform(image=x)['image']
        return x, y
    
    def __len__(self):
        return len(self.dataset)

In [5]:
class OurModel(LightningModule):
    def __init__(self):
        super(OurModel,self).__init__()
        # model architecute
        '''refernce:https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/efficientnet.py'''
        self.model =  timm.create_model(model_name,pretrained=True)
        self.fc1=nn.Linear(1000,500)
        self.relu=nn.ReLU()
        self.fc2= nn.Linear(500,5)
 
        #parameters
        self.lr=1e-3
        self.batch_size=64
        self.numworker=12
        self.acc = torchmetrics.Accuracy() #metric
        self.criterion=nn.CrossEntropyLoss() #loss function
        #list to store loss and accuracy
        self.trainacc,self.valacc=[],[]
        self.trainloss,self.valloss=[],[]
        #load data        
        self.dataset=torchvision.datasets.ImageFolder('dataset')
        #split data
        self.train_set, self.val_set =random_split(self.dataset,
                            [int(len(self.dataset)*0.7), int(len(self.dataset)*0.3)],
                                                  generator=torch.Generator().manual_seed(42))
    def forward(self,x):
        x= self.model(x)
        x=self.fc1(x)
        x=self.relu(x)
        x=self.fc2(x)
        return x

    def mixup_data(self,x, y, alpha=1.0):
        '''
        Returns mixed inputs, pairs of targets, and lambda
        reference: mixup: Beyond Empirical Risk Minimization
        '''
        if alpha > 0:
            lam = np.random.beta(alpha, alpha)
        else:
            lam = 1

        batch_size = x.size()[0]
        index = torch.randperm(batch_size)
        mixed_x = lam * x + (1 - lam) * x[index, :]
        y_a, y_b = y, y[index]
        return mixed_x, y_a, y_b, lam


    def mixup_criterion(self, pred, y_a, y_b, lam):
        return lam * self.criterion(pred, y_a) + (1 - lam) * self.criterion(pred, y_b)
    
    
    def configure_optimizers(self):
        #optimizer and scheduler
        opt=torch.optim.AdamW(params=self.parameters(),lr=self.lr )
        scheduler=CosineAnnealingWarmRestarts(opt,T_0=10, T_mult=1, eta_min=1e-6, last_epoch=-1)
        return {'optimizer': opt,'lr_scheduler':scheduler}

    def train_dataloader(self):#load train 
        return DataLoader(DataReader(self.train_set,aug), batch_size = self.batch_size, 
                          num_workers=self.numworker,
                          pin_memory=True,shuffle=True)

    def training_step(self,batch,batch_idx):
        image,label=batch
        mixed_x, y_a, y_b, lam=self.mixup_data(image,label)#apply mixup
        out = self(mixed_x)#pass images to model
        loss=self.mixup_criterion(out,y_a, y_b, lam) #calculate loss
        acc=self.acc(out,label)#calculate accuracy
        return {'loss':loss,'acc':acc}

    def training_epoch_end(self, outputs):
        #average loss and accuracy in all batches of train data
        loss=torch.stack([x["loss"] for x in outputs]).mean().detach().cpu().numpy().round(2)
        acc=torch.stack([x["acc"] for x in outputs]).mean().detach().cpu().numpy().round(2)
        self.trainacc.append(acc)
        self.trainloss.append(loss)
        self.log('train_loss', loss)
        self.log('train_acc', acc)
        
    def val_dataloader(self):
        ds=DataLoader(DataReader(self.val_set,aug), batch_size = self.batch_size,
                      num_workers=self.numworker,pin_memory=True, shuffle=False)
        return ds

    def validation_step(self,batch,batch_idx):
        image,label=batch
        out=self(image)
        loss=self.criterion(out,label)
        acc=self.acc(out,label)
        return {'loss':loss,'acc':acc}

    def validation_epoch_end(self, outputs):
        loss=torch.stack([x["loss"] for x in outputs]).mean().detach().cpu().numpy().round(2)
        acc=torch.stack([x["acc"] for x in outputs]).mean().detach().cpu().numpy().round(2)
        self.valacc.append(acc)
        self.valloss.append(loss)
        print('validation loss accuracy ',self.current_epoch,loss, acc)
        self.log('val_loss', loss)
        self.log('val_acc', acc)

In [6]:
model_name='efficientnetv2_rw_s'
model=OurModel()

In [7]:
lr_monitor = LearningRateMonitor(logging_interval='epoch')
checkpoint=ModelCheckpoint(dirpath='checkpoints',filename='file', monitor='val_acc', verbose=False, save_last=True, mode='max')
trainer = Trainer(max_epochs=50, auto_lr_find=False, auto_scale_batch_size=False,
                #deterministic=True,
                gpus=-1,precision=16,
                accumulate_grad_batches=1,
                stochastic_weight_avg=False,
                enable_progress_bar = False,
                num_sanity_val_steps=0,
                callbacks=[lr_monitor,checkpoint]
                 )

Using 16bit native Automatic Mixed Precision (AMP)
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


In [8]:
model.load_state_dict(torch.load('eff_model.pt'))

<All keys matched successfully>

In [9]:
# trainer.fit(model)

In [10]:
# torch.save(model.state_dict(), 'model.pt')

In [11]:
loader=model.val_dataloader()
model.cuda().eval()
labels,preds=[],[]
with torch.no_grad():
    for batch in loader:
        image,label=batch
        pred=model(image.cuda())
        pred=torch.argmax(pred,dim=1).detach().cpu().numpy()
        labels.append(label.cpu().numpy())
        preds.append(pred)

In [12]:
from sklearn.metrics import classification_report
print(classification_report(np.hstack(labels),np.hstack(preds)))

              precision    recall  f1-score   support

           0       0.72      0.91      0.80      1011
           1       0.72      0.75      0.73       885
           2       0.71      0.68      0.70       894
           3       0.75      0.63      0.68       951
           4       0.78      0.70      0.74       909

    accuracy                           0.74      4650
   macro avg       0.74      0.73      0.73      4650
weighted avg       0.74      0.74      0.73      4650



In [13]:
#test time augmentation
import ttach as tta
transforms = tta.Compose(
    [
        tta.HorizontalFlip(),
        tta.VerticalFlip(),
        tta.Rotate90(angles=[0, 90,180,270]),
    ]
)

tta_model = tta.ClassificationTTAWrapper(model, transforms)

In [14]:
loader=model.val_dataloader()
model.cuda().eval()
labels,preds=[],[]
with torch.no_grad():
    for batch in loader:
        image,label=batch
        pred=tta_model(image.cuda())
        pred=torch.argmax(pred,dim=1).detach().cpu().numpy()
        labels.append(label.cpu().numpy())
        preds.append(pred)

In [15]:
from sklearn.metrics import classification_report
print(classification_report(np.hstack(labels),np.hstack(preds)))

              precision    recall  f1-score   support

           0       0.72      0.94      0.82      1011
           1       0.74      0.76      0.75       885
           2       0.75      0.69      0.72       894
           3       0.77      0.65      0.70       951
           4       0.81      0.71      0.76       909

    accuracy                           0.75      4650
   macro avg       0.76      0.75      0.75      4650
weighted avg       0.76      0.75      0.75      4650



# test 

In [26]:
testset=torchvision.datasets.ImageFolder('test')
testloader=DataLoader(DataReader(testset,aug), batch_size = model.batch_size,
                      num_workers=model.numworker,pin_memory=True, shuffle=False)
classes=model.dataset.class_to_idx
classes={v:k for k,v in classes.items()}
print(classes)

{0: '1', 1: '2', 2: '3', 3: '4', 4: '5'}


In [27]:
model.cuda().eval()
labels,preds=[],[]#ignore labels
with torch.no_grad():
    for batch in testloader:
        image,label=batch#ignore labels
        pred=tta_model(image.cuda())
        pred=torch.argmax(pred,dim=1).detach().cpu().numpy()
        labels.append(label.cpu().numpy())#ignore labels
        preds.append(pred)

In [28]:
image_path=[i[0] for i in testset.imgs]
predicted_label=[classes[i] for i in np.hstack(preds)]

In [29]:
dict(zip(image_path,predicted_label))

{'test/0/0a0ce6bd-8748-49ee-8129-30dae17e11e5.png': '2',
 'test/0/0a6db652-8807-40c9-b074-22d184377800.png': '1'}