In [None]:
### Flower classification, Jan Gruszczynski, 11.05.2022
### CODE FOR DATASET WHICH IS ALREADY USING THE FOLDER STRUCTURE AS BELOW

#- prepared_dataset
#-- training
#--- flower_1
#--- flower_2 ...

# Import libraries

import os
import shutil
import tarfile
from six.moves import urllib
import pandas as pd
import numpy as np
import cv2
import time

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import DataLoader, Dataset
from torch.utils.data import RandomSampler

import torchvision.transforms as T
import torchvision.models as models
from torchvision.utils import make_grid
from torchvision.datasets import ImageFolder

from matplotlib import pyplot as plt

from sklearn import model_selection
from sklearn.metrics import confusion_matrix, classification_report

from PIL import Image

In [None]:
# Mount Google Drive at /content/drive so the dataset paths used below resolve.
# NOTE(review): google.colab is presumably only available inside a Colab runtime —
# this cell is expected to fail when the notebook is run elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Sanity check: show the entries of the prepared dataset folder on the mounted Drive.
# NOTE(review): this is the same literal path that DATASET_PATH is set to further down;
# keep the two in sync if the dataset is moved.
os.listdir("/content/drive/MyDrive/Colab Notebooks/prepared_dataset_1.zip (Unzipped Files)/prepared_dataset_1")

In [None]:
# Script to download dataset

DOWNLOAD_ROOT = "https://www.robots.ox.ac.uk/~vgg/data/flowers/17/17flowers.tgz"
DATA_PATH = os.path.join("datasets", "flowers")
IMAGES_PATH = os.path.join("datasets", "flowers", "jpg")


def fetch_housing_data(data_url=DOWNLOAD_ROOT, data_path=DATA_PATH):
    """Download the flowers archive and unpack it into ``data_path``.

    The name is historical: the function fetches whatever archive ``data_url``
    points to (by default the 17-flowers ``.tgz``), not housing data. The name
    is kept so existing calls keep working.

    Args:
        data_url: URL of the ``.tgz`` archive to download.
        data_path: Directory that receives both the downloaded archive
            (saved as ``17flowers.tgz``) and its extracted contents. It is
            created if it does not exist.

    Raises:
        Whatever ``urlretrieve`` / ``tarfile`` raise on network or archive errors.
    """
    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(data_path, exist_ok=True)
    tgz_path = os.path.join(data_path, "17flowers.tgz")
    urllib.request.urlretrieve(data_url, tgz_path)
    # The context manager closes the archive even if extraction fails.
    # NOTE(security): extractall() trusts the member paths stored in the archive;
    # only use this with archives from a trusted source.
    with tarfile.open(tgz_path) as data_tgz:
        data_tgz.extractall(path=data_path)

In [None]:
### PREPARING TRAINING AND TEST DATA

# Root folder of the prepared dataset on the mounted Google Drive.

DATASET_PATH = "/content/drive/MyDrive/Colab Notebooks/prepared_dataset_1.zip (Unzipped Files)/prepared_dataset_1"

# NOTE(review): this lists the entries of the dataset ROOT, not of a split folder;
# `classes` is reassigned from TRAINING_PATH in the "Exploring Dataset" cell.
classes = os.listdir(DATASET_PATH)
#print(classes)

# Paths of the file list and of the training / validation / test split folders

LIST_FILE_PATH = os.path.join(DATASET_PATH, "files.txt")
#DATASET_PATH = os.path.join(DATA_PATH, "prepared_dataset_1")


TRAINING_PATH = os.path.join(DATASET_PATH, "training")
VALIDATION_PATH = os.path.join(DATASET_PATH, "validation")
TEST_PATH = os.path.join(DATASET_PATH, "test")


# Split folders are not created here (creation is commented out), so they are
# expected to exist already — see the folder structure described at the top.
#os.mkdir(TRAINING_PATH)
#os.mkdir(VALIDATION_PATH)
#os.mkdir(TEST_PATH)

In [None]:
# Load the image names listed in files.txt, one entry per line,
# with trailing whitespace (including the newline) stripped.

with open(LIST_FILE_PATH) as file:
    all_files_list = [line.rstrip() for line in file]

In [None]:
import re
import random

# --- Split configuration ---------------------------------------------------
IMAGES_PER_CLASS = 80     # consecutive image numbers that belong to one flower class
FLOWER_CLASS_COUNT = 17   # classes are numbered flower_1 .. flower_17
HOLDOUT_PER_CLASS = 20    # images per class held out of training (validation + test)
VALID_PER_CLASS = 10      # images of the hold-out that go to validation; the rest is test
SPLIT_SEED = 42           # fixed seed so that the split is reproducible

# create list of files

with open(LIST_FILE_PATH, "r") as file:
    file_list = [line.rstrip() for line in file]

# indices that are multiples of the class length

list_classes = [i for i in range(1, len(file_list)) if i % IMAGES_PER_CLASS == 0]

# create datasets

#- prepared_dataset
#-- training
#--- flower_1
#--- flower_2 ...

# declare location of the resized images

IMAGES_PATH = os.path.join("datasets", "flowers", "jpg1")


def _image_number(file_name):
    """Return the first integer embedded in ``file_name``, or None if it has no digits."""
    match = re.search(r'\d+', file_name)
    return int(match.group()) if match else None


def _copy_images(file_names, destination_dir):
    """Copy the listed images from IMAGES_PATH into ``destination_dir``.

    Names that do not resolve to an existing file are skipped silently.
    """
    for file_name in file_names:
        full_file_name = os.path.join(IMAGES_PATH, file_name)
        if os.path.isfile(full_file_name):
            shutil.copy(full_file_name, destination_dir)


random.seed(SPLIT_SEED)

for i in range(1, FLOWER_CLASS_COUNT + 1):

    # makedirs(exist_ok=True) creates missing parents and makes the cell re-runnable.
    TRAIN_DIR = os.path.join(TRAINING_PATH, "flower_{}".format(i))
    os.makedirs(TRAIN_DIR, exist_ok=True)
    VAL_DIR = os.path.join(VALIDATION_PATH, "flower_{}".format(i))
    os.makedirs(VAL_DIR, exist_ok=True)
    TEST_DIR = os.path.join(TEST_PATH, "flower_{}".format(i))
    os.makedirs(TEST_DIR, exist_ok=True)

    # highest image number that belongs to class i
    a = IMAGES_PER_CLASS * i
    print(a)

    # list of all images in one class: numbers in (a - IMAGES_PER_CLASS, a];
    # entries without any digits (e.g. blank lines) are ignored
    list_total = []
    for j in file_list:
        number = _image_number(j)
        if number is not None and a - IMAGES_PER_CLASS < number <= a:
            list_total.append(j)
    random.shuffle(list_total)

    # first split off the hold-out, then divide the hold-out into test / validation
    training_dataset, test_dataset = model_selection.train_test_split(
        list_total, test_size=HOLDOUT_PER_CLASS, random_state=SPLIT_SEED)
    test_dataset, valid_dataset = model_selection.train_test_split(
        test_dataset, test_size=VALID_PER_CLASS, random_state=SPLIT_SEED)

    # copying files into subsequent datasets
    _copy_images(training_dataset, TRAIN_DIR)
    _copy_images(valid_dataset, VAL_DIR)
    _copy_images(test_dataset, TEST_DIR)

In [None]:
### Exploring Dataset

# Every entry of the training folder is treated as one class
classes = os.listdir(TRAINING_PATH)
print("Total Classes: ",len(classes))

# Tally the images of each split, one class folder at a time

train_count = valid_count = test_count = 0
for _class in classes:
    train_count += len(os.listdir(f"{TRAINING_PATH}/{_class}"))
    valid_count += len(os.listdir(f"{VALIDATION_PATH}/{_class}"))
    test_count += len(os.listdir(f"{TEST_PATH}/{_class}"))

print("Total train images: ",train_count)
print("Total valid images: ",valid_count)
print("Total test images: ",test_count)

In [None]:
train_imgs, valid_imgs, test_imgs = [], [], []

# For every class, gather "<split folder>/<class>/<file>" for each of the three splits
for _class in classes:
    for split_root, split_imgs in (
        (TRAINING_PATH, train_imgs),
        (VALIDATION_PATH, valid_imgs),
        (TEST_PATH, test_imgs),
    ):
        class_dir = split_root + "/" + _class
        for img in os.listdir(class_dir):
            split_imgs.append(class_dir + "/" + img)

# Map each class name to its position in `classes`
class_to_int = {name: position for position, name in enumerate(classes)}

In [None]:
### Loading Classification Dataset - FOR METHOD 2: For multi-class data, by inheriting Dataset class

def get_transform():
    """Return the transform applied to every image (currently only ``ToTensor``)."""
    return T.Compose([T.ToTensor()])


class FlowerDataset(Dataset):
    """Dataset over a flat list of image paths laid out as ``.../<class name>/<file>``.

    Args:
        imgs_list: Sequence of image file paths.
        class_to_int: Mapping from class (folder) name to integer label.
        transforms: Optional callable applied to the decoded image.
    """

    def __init__(self, imgs_list, class_to_int, transforms = None):

        super().__init__()
        self.imgs_list = imgs_list
        self.class_to_int = class_to_int
        self.transforms = transforms

    @staticmethod
    def _class_name_from_path(image_path):
        """Return the name of the folder that directly contains ``image_path``."""
        # The class is the parent folder. Splitting on whitespace (as before) returned
        # an arbitrary chunk of the path, or failed, for paths containing spaces.
        return os.path.basename(os.path.dirname(image_path))

    def __getitem__(self, index):
        """Return ``(image, label)`` for the path at ``index``.

        The image is read with OpenCV, converted from BGR to RGB, cast to float32
        and scaled by 1/255 before the optional transforms are applied.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
            KeyError: If the parent folder name is not a key of ``class_to_int``.
        """
        image_path = self.imgs_list[index]

        #Reading image
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising
            raise FileNotFoundError("Could not read image: {}".format(image_path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0

        #Retrieving class label
        label = self.class_to_int[self._class_name_from_path(image_path)]

        #Applying transforms on image
        if self.transforms:
            image = self.transforms(image)

        return image, label

    def __len__(self):
        """Return the number of image paths in the dataset."""
        return len(self.imgs_list)

In [None]:
### Loading Classification Dataset


# Method 1: one ImageFolder per split; labels are derived from the sub-folder names
train_dataset, valid_dataset, test_dataset = (
    ImageFolder(root = split_root, transform = T.ToTensor())
    for split_root in (TRAINING_PATH, VALIDATION_PATH, TEST_PATH)
)

"""
# Method 2: Using Dataset Class
train_dataset = FlowerDataset(train_imgs, class_to_int, get_transform())
valid_dataset = FlowerDataset(valid_imgs, class_to_int, get_transform())
test_dataset = FlowerDataset(test_imgs, class_to_int, get_transform())
"""
# One random sampler per split (a sampler cannot be combined with shuffle=True)
train_random_sampler, valid_random_sampler, test_random_sampler = map(
    RandomSampler, (train_dataset, valid_dataset, test_dataset)
)


def _build_loader(dataset, sampler):
    """Wrap ``dataset`` in a DataLoader: batches of 16, given sampler, loading in the main process."""
    return DataLoader(dataset = dataset, batch_size = 16, sampler = sampler, num_workers = 0)


train_data_loader = _build_loader(train_dataset, train_random_sampler)
valid_data_loader = _build_loader(valid_dataset, valid_random_sampler)
test_data_loader = _build_loader(test_dataset, test_random_sampler)

In [None]:
# Display the training dataset's repr as the cell output (quick sanity check).
train_dataset

In [None]:
# Visualize one training batch: tile the images 4 per row and show them without axis ticks.
for images, labels in train_data_loader:
    fig, ax = plt.subplots(figsize = (10, 10))
    ax.set_xticks([])
    ax.set_yticks([])
    # make_grid output is permuted so the first axis (channels, presumably) comes last for imshow
    ax.imshow(make_grid(images, 4).permute(1,2,0))
    # only the first batch is shown
    break

In [None]:
### Define model
model = models.vgg16(pretrained = True)

# One output logit per flower class. The head previously had a fixed 200 outputs,
# far more than the number of classes in this dataset.
num_classes = len(class_to_int)

### Modifying last few layers and no of classes
# NOTE: cross_entropy loss takes unnormalized op (logits), then function itself applies softmax and calculates loss, so no need to include softmax here
model.classifier = nn.Sequential(
    # 25088 inputs: presumably the flattened VGG16 feature map (512 * 7 * 7) — verify if the backbone changes
    nn.Linear(25088, 4096, bias = True),
    nn.ReLU(inplace = True),
    nn.Dropout(0.4),
    nn.Linear(4096, 2048, bias = True),
    nn.ReLU(inplace = True),
    nn.Dropout(0.4),
    nn.Linear(2048, num_classes)
)

In [None]:
### Get device

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch.cuda.empty_cache()

model.to(device)

### Training Details

# Adam with a small learning rate; the scheduler multiplies the rate by 0.75 every 5 steps
optimizer = torch.optim.Adam(model.parameters(), lr = 0.0001)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 5, gamma = 0.75)
criterion = nn.CrossEntropyLoss()

# Per-epoch history, filled in by the training loop
train_loss, train_accuracy = [], []
val_loss, val_accuracy = [], []

epochs = 20

In [None]:
def calc_accuracy(true,pred):
    """Return the top-1 accuracy of a batch as a percentage rounded to 4 decimals.

    Args:
        true: Integer class labels, one per sample (shape ``(N,)``).
        pred: Model scores per class (shape ``(N, num_classes)``). Raw logits are
            fine: softmax does not change which class has the highest score.

    Returns:
        Percentage (0-100) of samples whose highest-scoring class equals the label;
        ``0.0`` for an empty batch.
    """
    predicted = pred.detach().argmax(dim = -1)
    # Compare on the predictions' device so CPU and GPU tensors both work.
    target = true.detach().reshape(-1).to(predicted.device)
    total = predicted.shape[0]
    if total == 0:
        # Avoid a division by zero on an empty batch.
        return 0.0
    correct = (predicted == target).sum().item()
    return round(100.0 * correct / total, 4)

In [None]:
### Training Code

# For every epoch: one optimisation pass over the training loader, one evaluation
# pass over the validation loader, then the epoch means are appended to the
# train_/val_ loss and accuracy histories and printed.
for epoch in range(epochs):

    start = time.time()

    #Epoch Loss & Accuracy
    train_epoch_loss = []
    train_epoch_accuracy = []
    _iter = 1

    #Val Loss & Accuracy
    val_epoch_loss = []
    val_epoch_accuracy = []

    # Training
    # Enable dropout; a later cell switches the model to eval mode, so set it on every epoch.
    model.train()
    for images, labels in train_data_loader:

        images = images.to(device)
        labels = labels.to(device)

        #Reset Grads
        optimizer.zero_grad()

        #Forward ->
        preds = model(images)

        #Calculate Accuracy
        acc = calc_accuracy(labels.cpu(), preds.cpu())

        #Calculate Loss & Backward, Update Weights (Step)
        loss = criterion(preds, labels)
        loss.backward()
        optimizer.step()

        #Append loss & acc
        loss_value = loss.item()
        train_epoch_loss.append(loss_value)
        train_epoch_accuracy.append(acc)

        if _iter % 500 == 0:
            print("> Iteration {} < ".format(_iter))
            print("Iter Loss = {}".format(round(loss_value, 4)))
            print("Iter Accuracy = {} % \n".format(acc))

        _iter += 1

    #Validation
    # eval() disables dropout so the metrics are deterministic for given weights;
    # no_grad() avoids building autograd graphs that are never used.
    model.eval()
    with torch.no_grad():
        for images, labels in valid_data_loader:

            images = images.to(device)
            labels = labels.to(device)

            #Forward ->
            preds = model(images)

            #Calculate Accuracy
            acc = calc_accuracy(labels.cpu(), preds.cpu())

            #Calculate Loss
            loss = criterion(preds, labels)

            #Append loss & acc
            loss_value = loss.item()
            val_epoch_loss.append(loss_value)
            val_epoch_accuracy.append(acc)

    # Advance the learning-rate schedule once per epoch; without this call the
    # configured StepLR decay never takes effect.
    lr_scheduler.step()

    train_epoch_loss = np.mean(train_epoch_loss)
    train_epoch_accuracy = np.mean(train_epoch_accuracy)

    val_epoch_loss = np.mean(val_epoch_loss)
    val_epoch_accuracy = np.mean(val_epoch_accuracy)

    end = time.time()

    train_loss.append(train_epoch_loss)
    train_accuracy.append(train_epoch_accuracy)

    val_loss.append(val_epoch_loss)
    val_accuracy.append(val_epoch_accuracy)

    #Print Epoch Statistics
    print("** Epoch {} ** - Epoch Time {}".format(epoch, int(end-start)))
    print("Train Loss = {}".format(round(train_epoch_loss, 4)))
    print("Train Accuracy = {} % \n".format(train_epoch_accuracy))
    print("Val Loss = {}".format(round(val_epoch_loss, 4)))
    print("Val Accuracy = {} % \n".format(val_epoch_accuracy))

In [None]:
# Function to save the model (not working on my drive)
def saveModel(path = None):
    """Save the weights (``state_dict``) of the global ``model`` to ``path``.

    Args:
        path: Destination file. When omitted, the weights are written next to
            the prepared dataset on the mounted Google Drive.
    """
    if path is None:
        path = "/content/drive/MyDrive/Colab Notebooks//prepared_dataset_1.zip (Unzipped Files)/prepared_dataset_1/NetModel_appsilon_flowers_vgg16.pth"
    torch.save(model.state_dict(), path)

saveModel()

In [None]:
# define test function

def test_funct(test_data_loader):
    """Evaluate the global ``model`` on every batch of ``test_data_loader``.

    Args:
        test_data_loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        Tuple ``(mean batch loss, mean batch accuracy in percent)``.
    """
    # The accumulators live outside the loop: resetting them per batch (as before)
    # meant that only the last batch contributed to the result.
    test_batch_loss = []
    test_batch_accuracy = []

    # Evaluate with dropout disabled and without autograd bookkeeping,
    # then restore whichever mode the model was in.
    was_training = model.training
    model.eval()
    with torch.no_grad():
        for images, labels in test_data_loader:

            images = images.to(device)
            labels = labels.to(device)

            #Forward ->
            preds = model(images)

            #Calculate Accuracy
            acc = calc_accuracy(labels.cpu(), preds.cpu())

            #Calculate Loss
            loss = criterion(preds, labels)

            #Append loss & acc
            test_batch_loss.append(loss.item())
            test_batch_accuracy.append(acc)
    model.train(was_training)

    return (np.mean(test_batch_loss), np.mean(test_batch_accuracy))


# Print the values returned for the test set; the previous print showed the
# validation globals left over from the last training epoch.
test_loss, test_accuracy = test_funct(test_data_loader)
print("avg_loss", test_loss, "avg_accuracy", test_accuracy)

In [None]:
# one way to generate predictions list

# y_pred_list / y_true_list hold one list per batch. Both are filled in the SAME pass:
# the test loader samples in random order, so labels collected in a separate loop
# would not line up with these predictions.
y_pred_list = []
y_true_list = []
with torch.no_grad():
    model.eval()
    for X_batch, y_batch in test_data_loader:
        X_batch = X_batch.to(device)
        y_test_pred = model(X_batch)
        # index of the highest score per sample = predicted class
        _, y_pred_tags = torch.max(y_test_pred, dim = 1)
        # tolist() without squeeze(): a batch holding a single sample stays a one-element list
        y_pred_list.append(y_pred_tags.cpu().numpy().tolist())
        y_true_list.append(y_batch.tolist())

In [None]:
# Display image and label
test_features, test_labels = next(iter(test_data_loader))
print(f"Feature batch shape: {test_features.size()}")
print(f"Labels batch shape: {test_labels.size()}")
# imshow expects (H, W, C) or (H, W); the batch stores images channel-first, so move
# the channel axis last. squeeze() then drops it again for single-channel images.
img = test_features[0].permute(1, 2, 0).squeeze()
label = test_labels[0]
plt.imshow(img)
plt.show()
print(f"Label: {label}")

In [None]:
# Show every test batch as an image grid (4 per row) and collect its labels

original_test = []
for images, labels in test_data_loader:
    grid = make_grid(images, 4).permute(1,2,0)
    fig, ax = plt.subplots(figsize = (10, 10))
    ax.imshow(grid)
    # hide the axis ticks
    ax.set_xticks([])
    ax.set_yticks([])
    # keep the labels of this batch as a plain Python list
    list_temp = labels.tolist()
    original_test.append(list_temp)
print(original_test)

In [None]:
# Show each image of the first test batch (the labels in `targets` are not printed)

for batch in test_data_loader:
    inputs, targets = batch
    for img in inputs:
        image = img.cpu().numpy()
        # Move the channel axis last: (C, H, W) -> (H, W, C). A plain `.T` reverses
        # all axes and therefore also swaps height and width.
        image = image.transpose(1, 2, 0)
        # Min-max scale over the WHOLE image so the colours keep their relative
        # intensities (the earlier per-axis min/max scaled pixel columns independently).
        data_min = image.min()
        value_range = image.max() - data_min
        if value_range > 0:
            scaled_data = (image - data_min) / value_range
        else:
            # constant image: avoid dividing by zero
            scaled_data = np.zeros_like(image)
        # show image
        plt.imshow(scaled_data)
        plt.show()
    # only the first batch is displayed
    break