<a href="https://colab.research.google.com/github/mralamdari/CV-Object-Detection-Projects/blob/main/Flower_Recognition_Challenges_part4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import copy
import time
import tqdm
import torch
import mlxtend
import warnings
import operator
import matplotlib
import torchvision
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn import model_selection, metrics, preprocessing

In [2]:
# Silence library warnings for the rest of the notebook. Only one global filter
# is installed on purpose: a later filterwarnings('always') call would be
# inserted ahead of this rule and re-enable every warning.
warnings.filterwarnings('ignore')
# Use the first CUDA device when one is available, otherwise run on the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

## 1. Get Data

In [None]:
os.environ['KAGGLE_CONFIG_DIR'] = '/content/drive/MyDrive'
!kaggle datasets download -d alxmamaev/flowers-recognition
!unzip \*.zip && rm *.zip

In [None]:
!pip install -q torch
!pip install -q albumentations
!pip install -q seaborn
!pip install -q tqdm
!pip install -q numpy
!pip install -q addict

In [None]:
from torch.utils.data import Dataset, DataLoader, random_split
import albumentations as A
from albumentations.pytorch import ToTensor
from PIL import Image
import numpy as np
import logging
from addict import Dict
from datetime import datetime, date

# Root logger format: "[LEVEL] [timestamp] - message".
logging.basicConfig(
    format="[%(levelname)s] [%(asctime)s] - %(message)s",
)

## Config


In [None]:
# Candidate dataset roots in priority order: the Kaggle kernel input mount,
# then a local "flowers/" folder.
# NOTE(review): "flowers/" is presumably where the downloaded archive unpacks
# to in the working directory — confirm against the archive layout.
_DATA_ROOTS = ("../input/flowers-recognition/flowers/", "flowers/")

config = Dict({
    # First candidate that exists on disk; falls back to the Kaggle path so a
    # missing dataset is still reported against the original location.
    "path": next((root for root in _DATA_ROOTS if os.path.isdir(root)), _DATA_ROOTS[0]),
    "device": torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    "batch_size": 16,
    # Random augmentation pipeline; each transform fires with its own probability `p`.
    "augmentations": A.Compose([
        A.Downscale(scale_min=0.6, scale_max=0.99, p=0.2),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.1),
        A.RandomFog(fog_coef_lower=0.1, fog_coef_upper=0.5, alpha_coef=0.05, p=0.5),
        A.RandomContrast(limit=0.1, p=0.4),
        A.RandomGamma(gamma_limit=(50, 150), p=0.4),
        A.RandomBrightness(p=0.4),
        A.OpticalDistortion(p=0.2),
        A.Blur(blur_limit=2, p=0.2),
        ToTensor(),
    ]),
    "num_workers": 0,
})

In [None]:
class FlowersDataset(Dataset):
    """Image-folder dataset in which every sub-directory of ``path`` is one class.

    Args:
        path: Root directory whose sub-directories contain the images of each
            class. A sub-directory literally named ``"flowers"`` is ignored.
        shape: Target image size, unpacked as ``(width, height)``.
        augmentations: Optional albumentations pipeline. It is called with
            ``image=<numpy array>`` and its ``"image"`` output becomes the
            sample image.

    Attributes:
        labels: Class (folder) names in sorted order; the position of a name
            in this list is the integer label returned by ``__getitem__``.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """

    # Lower-case file suffixes accepted as images.
    _IMAGE_SUFFIXES = (".jpeg", ".png", ".jpg")

    def __init__(self, path:str, shape=(256, 256), augmentations=None):
        if not os.path.exists(path):
            message = f"Path '{path}' does not exists!"
            # FileNotFoundError is still an Exception, so existing handlers keep working.
            raise FileNotFoundError(message)

        self.__path = path
        self.__shape = shape
        self.__augmentations = augmentations
        self.__samples = []  # (label index, image path) pairs
        self.labels = []

        # Build the resize pipeline once instead of on every sample. Resize
        # takes (height, width), so the values are passed by keyword.
        width, height = self.__shape
        self.__loader = A.Compose([
            A.Resize(height=height, width=width),
            ToTensor(),
        ])

        # os.listdir() has no guaranteed order; sorting keeps the
        # class-name -> label-index mapping stable between runs and machines.
        for folder in sorted(os.listdir(self.__path)):
            if folder == "flowers":
                continue

            folder_path = os.path.join(self.__path, folder)
            if not os.path.isdir(folder_path):
                print(f"'{folder}' is not folder and it will be skipped!")
                continue

            # A directory registers a class even when it turns out to be empty.
            label_index = len(self.labels)
            self.labels.append(folder)

            images = sorted(os.listdir(folder_path))
            if not images:
                print(f"Images for folder '{folder}' weren't found!")
                continue

            for image in images:
                # Case-insensitive so files such as "x.JPG" are not dropped.
                if image.lower().endswith(self._IMAGE_SUFFIXES):
                    self.__samples.append((label_index, os.path.join(folder_path, image)))

    def __load_image(self, path, channels="RGB"):
        """Read ``path``, convert it to ``channels``, then resize and tensorise it."""
        # The context manager closes the underlying file handle once the
        # pixel data has been copied into the numpy array.
        with Image.open(path) as handle:
            pixels = np.array(handle.convert(channels))
        return self.__loader(image=pixels)["image"]

    def __len__(self):
        """Number of image files discovered under ``path``."""
        return len(self.__samples)

    def __getitem__(self, index):
        """Return ``Dict(label=<int>, image=<tensor>)`` for the sample at ``index``."""
        label, image_path = self.__samples[index]

        image = self.__load_image(image_path, channels="RGB")

        if self.__augmentations is not None:
            # The pipeline is fed a numpy array with axes reordered by
            # permute(1, 2, 0) — presumably CHW -> HWC; confirm against ToTensor.
            image = self.__augmentations(image=image.permute(1, 2, 0).numpy())["image"]

        return Dict({
            "label": label,
            "image": image
        })
    
    
class Trainer:
    """Small training/evaluation loop around a model, a loss and an optimizer.

    Args:
        model: Network to train; it is moved to ``device`` by ``fit``.
        criterion: Loss callable invoked as ``criterion(output, labels)``.
        optimizer: Optimizer that updates the model parameters.
        scheduler: Optional learning-rate scheduler, stepped once per epoch.
        metric: Stored but not used by any method of this class.
        device: Device the input batches are moved to.

    Attributes:
        logs: Mapping from a log name (``"epochs"``, ``"train_loss"``,
            ``"validation_loss"``, ``"lr"``) to the list of logged values.
    """

    def __init__(self, model, criterion, optimizer, scheduler=None, metric=None, device="cpu"):
        self.__model = model
        self.__criterion = criterion
        self.__optimizer = optimizer
        self.__scheduler = scheduler
        self.__metric = metric
        self.__device = device
        self.logs = Dict({})

    def __log(self, logs):
        """Append every value of ``logs`` to the history stored under its key."""
        for key, value in logs.items():
            if key not in self.logs.keys():
                self.logs[key] = []

            self.logs[key].append(value)

    def __make_checkpoint(self, info, path="checkpoints/checkpoint.pt"):
        """Save ``info`` together with the optimizer and model state dicts to ``path``."""
        checkpoint_info = {**info,
            "optimizer_state": self.__optimizer.state_dict(),
            "model_state": self.__model.state_dict()}

        # torch.save does not create missing directories (e.g. the default "checkpoints/").
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        torch.save(checkpoint_info, path)

    def __step_scheduler(self, metric):
        """Advance the scheduler by one epoch, if one was supplied.

        Only ``ReduceLROnPlateau`` takes a monitored value; every other
        scheduler is stepped without arguments.
        """
        if self.__scheduler is None:
            return

        if isinstance(self.__scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
            self.__scheduler.step(metric)
        else:
            self.__scheduler.step()

    def evaluate(self, loader):
        """Return the mean per-batch loss of the model over ``loader``.

        Each batch must be a mapping with ``"images"`` and ``"labels"`` tensors.
        The model is switched to eval mode for the pass and its previous
        train/eval mode is restored afterwards.
        """
        # Imported here because the notebook binds the name `tqdm` to the
        # module (`import tqdm`), which is not callable.
        from tqdm.auto import tqdm as progress_bar

        loss = 0
        length = len(loader)

        was_training = self.__model.training
        self.__model.eval()
        try:
            with torch.no_grad():
                loop = progress_bar(loader)
                loop.set_description("Evaluating")
                for batch in loop:
                    torch.cuda.empty_cache()
                    images = batch["images"].to(self.__device)
                    labels = batch["labels"].to("cpu")

                    # The loss is computed on the CPU copy of the output.
                    output = self.__model(images).to("cpu")

                    batch_loss = self.__criterion(output, labels)
                    loss += batch_loss.item()
        finally:
            self.__model.train(was_training)

        loss /= length

        return loss

    def save(self, path="model.pt"):
        """Write the model's ``state_dict`` to ``path``."""
        torch.save(self.__model.state_dict(), path)

    def fit(self, loader, epochs=10, validation_loader=None):
        """Train the model for ``epochs`` passes over ``loader``.

        After every epoch the mean training loss and the current learning rate
        are logged. When ``validation_loader`` is given, the validation loss is
        logged as well and a checkpoint named ``"<rounded loss>_<timestamp>.pt"``
        is written each time the rounded validation loss improves.

        Args:
            loader: Iterable of batches (mappings with ``"images"`` and
                ``"labels"`` tensors) that also supports ``len()``.
            epochs: Number of passes over ``loader``.
            validation_loader: Optional loader passed to ``evaluate`` after
                every epoch.
        """
        # See evaluate(): the module-level name `tqdm` is the module itself.
        from tqdm.auto import tqdm as progress_bar

        self.__model.to(self.__device)
        self.__model.train()
        train_length = len(loader)

        # Lower is better, so start from +inf and checkpoint on improvement.
        best_validation_loss = float("inf")
        for epoch in range(epochs):
            epoch_loss = 0

            loop = progress_bar(loader, position=0, leave=True)
            loop.set_description(f"Epoch [{epoch+1}/{epochs}]")
            for batch in loop:
                torch.cuda.empty_cache()
                self.__optimizer.zero_grad()

                images = batch["images"].to(self.__device)
                labels = batch["labels"].to("cpu")

                # The loss is computed on the CPU copy of the output.
                output = self.__model(images).to("cpu")
                loss = self.__criterion(output, labels)

                epoch_loss += loss.item()

                loop.set_postfix(loss=loss.item())

                loss.backward()
                self.__optimizer.step()

            epoch_loss /= train_length

            self.__log({"epochs": epoch+1, "train_loss": epoch_loss})

            if validation_loader is not None:
                validation_loss = self.evaluate(validation_loader)
                self.__log({"validation_loss": validation_loss})

                rounded_loss = np.round(validation_loss, 3)
                if rounded_loss < best_validation_loss:
                    now = datetime.now().strftime("%H:%M:%S %d.%m.%Y")
                    checkpoint_path = f"{rounded_loss}_{now}.pt"

                    checkpoint_info =  {
                        "epoch": epoch+1,
                        "loss": validation_loss
                    }

                    self.__make_checkpoint(info=checkpoint_info, path=checkpoint_path)

                    best_validation_loss = rounded_loss

                self.__step_scheduler(validation_loss)
            else:
                # Without validation data a plateau scheduler monitors the training loss.
                self.__step_scheduler(epoch_loss)

            # param_groups holds the live learning rate; `defaults` only keeps
            # the value the optimizer was constructed with.
            lr = self.__optimizer.param_groups[0]["lr"]
            self.__log({"lr": lr})
            
    
def collate_fn(batch):
    """Merge dataset samples into one batch dictionary.

    Args:
        batch: Sequence of samples exposing ``.label`` and ``.image`` attributes.

    Returns:
        dict: ``{"images": <stacked image tensor>, "labels": <label tensor>}``,
        with the samples stacked along a new leading dimension.
    """
    if not batch:
        # Nothing to stack; hand back empty tensors.
        return {
            "images": torch.tensor([]),
            "labels": torch.tensor([])
        }

    # Stack the images directly rather than converting each one to nested
    # Python lists and rebuilding a tensor from them.
    images = torch.stack([torch.as_tensor(item.image) for item in batch])
    if images.is_floating_point():
        # Match what torch.tensor(<list of floats>) yields: the default float dtype.
        images = images.to(torch.get_default_dtype())

    labels = torch.tensor([item.label for item in batch])

    return {
        "images": images,
        "labels": labels
    }


def train_test_split(dataset, test_size=0.2, seed=None):
    """Randomly split ``dataset`` into a train subset and a test subset.

    Args:
        dataset: Any object supporting ``len()`` and integer indexing.
        test_size: Fraction of the samples assigned to the test subset,
            between 0 and 1 inclusive.
        seed: Optional integer seed. When given, the split is reproducible;
            when ``None`` the global torch random state is used.

    Returns:
        tuple: ``(train_dataset, test_dataset)`` as returned by ``random_split``.

    Raises:
        ValueError: If ``test_size`` is outside ``[0, 1]``.
    """
    if not 0 <= test_size <= 1:
        raise ValueError(f"test_size must be between 0 and 1, got {test_size}")

    length = len(dataset)
    train_length = round(length * (1 - test_size))
    test_length = length - train_length
    lengths = [train_length, test_length]

    if seed is None:
        train_dataset, test_dataset = random_split(dataset, lengths)
    else:
        # A dedicated generator keeps the split independent of the global RNG state.
        generator = torch.Generator().manual_seed(seed)
        train_dataset, test_dataset = random_split(dataset, lengths, generator=generator)

    return train_dataset, test_dataset

In [None]:
dataset = FlowersDataset(path=config.path, augmentations=config.augmentations)
rows, cols = 10, 10
# Preview every 40th sample, capped at one image per grid cell and never
# indexing past the end of the dataset.
sample_indices = range(0, len(dataset), 40)[:rows * cols]
fig = plt.figure(figsize=(cols*3, rows*3))
for plot_position, sample_index in enumerate(sample_indices, start=1):
    item = dataset[sample_index]
    label = item.label
    class_ = dataset.labels[label]
    # Reorder the axes with permute(1, 2, 0) before handing the image to imshow.
    image = item.image.permute(1, 2, 0).numpy()
    ax = fig.add_subplot(rows, cols, plot_position)
    ax.imshow(image)
    ax.set_title(class_, fontsize=15, fontfamily="serif", y=1.02)
    ax.xaxis.set_visible(False)
    ax.yaxis.set_visible(False)

fig.tight_layout()
# plt.show() renders with the notebook's inline backend; Figure.show() is
# meant for GUI backends.
plt.show()

#1 1

In [None]:
# Keras augmentation generator: small random rotations, zooms, shifts and
# horizontal flips; every normalisation / whitening option is switched off.
datagen = tf.keras.preprocessing.image.ImageDataGenerator(featurewise_center= False,
                              samplewise_center= False,
                              featurewise_std_normalization= False,
                              samplewise_std_normalization=False,
                              rotation_range= 10,        # 0- 180
                              zca_whitening=False,
                              zoom_range=0.1,            # Randomly zoom image
                              width_shift_range=0.2,     # randomly shift images horizontally (fraction of total width)
                              height_shift_range=0.2,    # randomly shift images vertically (fraction of total height)
                              horizontal_flip=True,      # randomly flip images horizontally
                              vertical_flip=False)       # vertical flips are disabled

# No datagen.fit(...) call: fitting is only required when featurewise_center,
# featurewise_std_normalization or zca_whitening is enabled, and all three are
# False above. It also avoids depending on an `X_train` array that no earlier
# cell of this notebook defines.