# Machine Learning - Image Classification

In [1]:
#importing
import os

import cv2
import torch
import torch.nn as nn
import torch.nn.functional as fun


from dotenv import load_dotenv
from sklearn.model_selection import train_test_split
from torch import optim
from tqdm import tqdm

from utils import *

## Database importing

In [2]:
load_dotenv()
DAT_PATH = os.getenv("TRAIN_DATASET_PATH")

ANIMALS_DATAFRAME = load_dataset_info("../data/archive/raw-img")

In [3]:
train_df, test_df = train_test_split(ANIMALS_DATAFRAME, test_size=0.01, random_state=37)
train_df, val_df = train_test_split(train_df, test_size=0.0526, random_state=37)

In [4]:
TARGET_SIZE = (256, 256)
MAX_SIZE = 500
BATCH_SIZE = 64
NUM_CLASSES = ANIMALS_DATAFRAME.label.nunique()
CLASS_LABELS = {name: idx for idx, name in enumerate(np.sort(ANIMALS_DATAFRAME.label.unique()))}

# vypocet frekvencie augmentacie -> aby nebol model biasnuty iba na majoritne categorie obrazkov
counts = train_df.label.value_counts()
max_count = counts.max()
aug_strength = (max_count / counts).to_dict()

print(aug_strength)


{'cane': 1.0, 'ragno': 1.0813374436803416, 'gallina': 1.5670103092783505, 'cavallo': 1.8431689571544059, 'mucca': 2.608695652173913, 'scoiattolo': 2.632794457274827, 'farfalla': 2.9552819183408943, 'pecora': 3.375277572168764, 'gatto': 3.965217391304348, 'elefante': 4.470588235294118}


In [5]:
#setting up dataloaders#
#   POMALE
#
# train_gen = AnimalImageGenerator(
#     df=train_df,
#     batch_size=BATCH_SIZE,
#     target_size=TARGET_SIZE,
#     num_classes=NUM_CLASSES,
#     augment=True,
#     shuffle=True,
#     aug_strength=aug_strength,
#     max_size=MAX_SIZE,
#     class_mapping=CLASS_LABELS,
# )
#
# test_gen = AnimalImageGenerator(
#     df=test_df,
#     batch_size=BATCH_SIZE,
#     target_size=TARGET_SIZE,
#     num_classes=NUM_CLASSES,
#     augment=False,
#     shuffle=False,
#     max_size=MAX_SIZE,
#     class_mapping=CLASS_LABELS,
# )
#
# val_gen = AnimalImageGenerator(
#     df=val_df,
#     batch_size=BATCH_SIZE,
#     target_size=TARGET_SIZE,
#     num_classes=NUM_CLASSES,
#     augment=False,
#     shuffle=False,
#     max_size=MAX_SIZE,
#     class_mapping=CLASS_LABELS,
# )


In [7]:
from torch.utils.data import WeightedRandomSampler


#CHATOVINA ale ze vraj rychla

def create_sampler(df, class_mapping):
    labels = df["label"].map(class_mapping).values
    class_counts = np.bincount(labels)
    class_weights = 1.0 / class_counts

    sample_weights = class_weights[labels]
    sampler = WeightedRandomSampler(
        torch.from_numpy(sample_weights).float(),
        num_samples=len(sample_weights),
        replacement=True
    )
    return sampler

train_dataset = AnimalDataset(
    df=train_df,
    class_mapping=CLASS_LABELS,
    augment=True,
    target_size=TARGET_SIZE,
)

val_dataset = AnimalDataset(
    df=val_df,
    class_mapping=CLASS_LABELS,
    augment=False,
    target_size=TARGET_SIZE,
)

sampler = create_sampler(train_df, CLASS_LABELS)

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    sampler=sampler,
    num_workers=1,
    pin_memory=True,
    persistent_workers=True
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=1,
    pin_memory=True,
    persistent_workers=True
)


## Model implemetation

In [8]:
#PRE TUTO FUNKCIU NEGENERUJ ZIADNE KOMENTARE
class ImageClassifier(nn.Module):
    def __init__(self, classes: int):
        super(ImageClassifier, self).__init__()
        self.numberOfClasses = classes

        # Convolution layres ONLY WORKS with RGB because of in_channels, kernel_size for filtering is 3 stride 1 padding 1 for size preservation
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1,padding=1)
        self.bn1 = nn.BatchNorm2d(16)

        # Significant for grad-CAM
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1,padding=1)
        self.bn2 = nn.BatchNorm2d(32)

        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1,padding=1)
        self.bn3 = nn.BatchNorm2d(64)

        # Adaptive pooling to make model input-size agnostic / dont want to use it for now
        # self.adaptive_pool = nn.AdaptiveAvgPool2d((4, 4))

        #size is determined by conv channels and the reduction in size by conv channels
        #channels * width * height because 256 /2 /2 /2 is 8
        self.fc1 = nn.Linear(in_features=64*32*32, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=self.numberOfClasses)

    def forward(self, x):
        #Block 1
        x = fun.relu(self.bn1(self.conv1(x)))
        x = fun.max_pool2d(x, kernel_size=2) # zmensovanie velkosti

        x = fun.relu(self.bn2(self.conv2(x)))
        x = fun.max_pool2d(x, kernel_size=2)

        x = fun.relu(self.bn3(self.conv3(x)))
        x = fun.max_pool2d(x, kernel_size=2)

        x = x.reshape(x.size(0), -1)

        x = fun.relu(self.fc1(x))
        x = self.fc2(x)
        return x


## Model Training

### Training function

In [10]:
def train_model(model: nn.Module,
                train_loader: AnimalImageGenerator,
                val_loader: AnimalImageGenerator,
                criterion: nn.Module,
                optimizer: torch.optim.Optimizer,
                device: torch.device,
                epochs: int = 10,
                scheduler=None):

    model.to(device)

    # for epoch in tqdm(range(1, epochs+1), desc="Training model"):
    for epoch in range(1, epochs+1):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        # for images, labels in train_loader:
        for images, labels in tqdm(train_loader):
            images = images.float().to(device)
            labels = labels.long().to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

        train_loss = running_loss / total
        train_acc = correct / total

        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for images, labels in val_loader:
                images = images.float().to(device)
                labels = labels.long().to(device)

                outputs = model(images)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * images.size(0)
                _, preds = torch.max(outputs, 1)
                val_correct += (preds == labels).sum().item()
                val_total += labels.size(0)

        val_loss /= val_total
        val_acc = val_correct / val_total

        if scheduler:
            scheduler.step(val_loss)

        print(f"Epoch {epoch}/{epochs} | "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        ## TODO: add f1 score

## Training

### Set-up

In [11]:
img_class_model = ImageClassifier(NUM_CLASSES)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(img_class_model.parameters(), lr=0.001, momentum=0.9)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"device: {device}")


device: cuda


In [12]:
# TRAINING
train_model(model=img_class_model,
            train_loader=train_loader,
            val_loader=val_loader,
            criterion=criterion,
            optimizer=optimizer,
            device=device,
            epochs=5,
            scheduler=None)

100%|██████████| 355/355 [00:47<00:00,  7.43it/s]


Epoch 1/5 | Train Loss: 1.9299, Train Acc: 0.3129 | Val Loss: 1.8250, Val Acc: 0.3751


100%|██████████| 355/355 [00:42<00:00,  8.44it/s]


Epoch 2/5 | Train Loss: 1.7721, Train Acc: 0.3726 | Val Loss: 1.6132, Val Acc: 0.4330


100%|██████████| 355/355 [00:41<00:00,  8.49it/s]


Epoch 3/5 | Train Loss: 1.7043, Train Acc: 0.3996 | Val Loss: 1.5206, Val Acc: 0.4655


100%|██████████| 355/355 [00:42<00:00,  8.43it/s]


Epoch 4/5 | Train Loss: 1.6668, Train Acc: 0.4100 | Val Loss: 1.4757, Val Acc: 0.4877


100%|██████████| 355/355 [00:42<00:00,  8.43it/s]


Epoch 5/5 | Train Loss: 1.6232, Train Acc: 0.4290 | Val Loss: 1.4039, Val Acc: 0.5083


### Grad CAM Implementation
 -- On hold
 https://medium.com/@codetrade/grad-cam-in-pytorch-a-powerful-tool-for-visualize-explanations-from-deep-networks-bdc7caf0b282

## Single image predictions

In [32]:
class SingleImageInput():
    def __init__(self, target_size=(256, 256)):
        self.target_size = target_size
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize(target_size),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            ),
        ])
    def read_image(self, path) -> torch.Tensor:
        print(path)
        img = cv2.imread(path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = self.transform(img)
        return img

In [42]:
ran_img_sample = pick_random(ANIMALS_DATAFRAME, ANIMALS_DATAFRAME.label == "cane")
print(ran_img_sample)
ImageReader = SingleImageInput(target_size=(256, 256))
img = ImageReader.read_image(ran_img_sample).to(device)
img.unsqueeze_(0)

img_class_model.eval()
out = img_class_model(img)
predicted_l = out.argmax(dim=1).item()

..\data\archive\raw-img\cane\OIP--MffROkglTy0W_sVV_zv3AHaFY.jpeg
..\data\archive\raw-img\cane\OIP--MffROkglTy0W_sVV_zv3AHaFY.jpeg


In [43]:
target_layer = img_class_model.conv2 ## last layer
activations = []
gradients = []

def forward_hook(module, input, output):
    activations.append(output)
def backward_hook(module, grad_input, grad_output):
    gradients.append(grad_output[0])

f_handle = target_layer.register_forward_hook(forward_hook)
b_handle = target_layer.register_backward_hook(backward_hook)

try:
    img_class_model.zero_grad()
    score = out[0, predicted_l]
    score.backward()
finally:
    f_handle.remove()
    b_handle.remove()

print(len(gradients))

weights = torch.mean(gradients[0], dim=[2, 3])

heatmap = torch.sum(weights * activations[0], dim=1).squeeze()
heatmap = np.maximum(heatmap.cpu().detach().numpy(), 0)
heatmap /= np.max(heatmap)  # normalization

weights = torch.mean(gradients[0], dim=[2, 3])

heatmap = torch.sum(weights * activations[0], dim=1).squeeze()
heatmap = np.maximum(heatmap.cpu().detach().numpy(), 0)
heatmap /= np.max(heatmap)  # normalization

heatmap = cv2.resize(heatmap, (ran_img_sample.size[1], ran_img_sample.size[0]))
heatmap = cv2.applyColorMap(np.uint8(255 * heatmap), cv2.COLORMAP_JET)
superimposed_img = cv2.addWeighted(ran_img_sample, 0.6, heatmap, 0.4, 0)

cv2.imshow('Grad-CAM', superimposed_img)

0


IndexError: list index out of range