In [None]:
from os import listdir
import cv2
from typing import List
import random
import numpy as np

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
import torch
from torch import optim, nn
from torch.utils.data import DataLoader, random_split
from torchvision.utils import make_grid
from torchvision import models, transforms, datasets

In [None]:
# Pick the compute device: the GPU when CUDA is usable, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

### CONFIG

In [None]:
# Hyperparameters.
learning_rate = 1e-3  # learning rate handed to the Adam optimizer
batch_size = 128  # samples per batch, forwarded to load_split_dataset
num_epochs = 50  # number of passes train() makes over train_loader
train_split = 0.7  # fraction of the dataset assigned to the training split
image_size = (224, 224)  # size forwarded to transforms.Resize via load_split_dataset
num_workers = 2  # DataLoader workers; read as a global inside load_split_dataset


In [None]:
# Registry of the datasets this notebook can use. Each entry carries the
# name used for the saved model file and the folder path under ../../data.
datasets_info = {
    "WLASL": dict(name="WLASL_frames_100", path="WLASL/frames_100"),
    "animals": dict(name="animals_simple", path="animals/simple"),
}

# Dataset selected for this run and the directory it resolves to.
dataset_name = "WLASL"
data_dir = f"../../data/{datasets_info[dataset_name]['path']}"


### Input labels with images

In [None]:
# List the entries of data_dir and preview the first five.
# NOTE(review): presumably every entry is a label folder — os.listdir also
# returns stray files/hidden folders and gives no ordering guarantee; confirm.
input_labels = listdir(data_dir)
input_labels[:5]

In [None]:
def show_image(img):
  """Display an OpenCV image with matplotlib.

  The image is converted from BGR to RGB channel order before plotting.
  """
  plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
  plt.show()
  
def imshow(img, unnormalize=False):
  """Display a channels-first image tensor with matplotlib.

  Args:
    img: image tensor; it is transposed with axes (1, 2, 0) before plotting,
      so it must have exactly three dimensions with channels first.
    unnormalize: when True, apply ``img / 2 + 0.5`` first, i.e. undo a
      Normalize step with mean 0.5 and std 0.5. Defaults to False because
      the transform pipelines in this notebook contain no Normalize step;
      un-normalizing their output squeezes the values into the upper half
      of the range and the images render washed out.
  """
  if unnormalize:
    img = img / 2 + 0.5
  # detach/cpu make the conversion safe for tensors that track gradients or
  # live on the GPU; both are no-ops for plain CPU tensors.
  npimg = img.detach().cpu().numpy()
  plt.imshow(np.transpose(npimg, (1, 2, 0)))
  plt.show()

In [None]:
# Pick one label at random and preview the first image listed in its folder.
label = random.choice(input_labels)
label_dir = data_dir + "/" + label
label_images = listdir(label_dir)

print(label)

image_path = label_dir + "/" + label_images[0]
image = cv2.imread(image_path)
if image is None:
    # cv2.imread reports an unreadable/missing file by returning None rather
    # than raising, which would otherwise surface as an opaque cvtColor error.
    raise FileNotFoundError(f"Could not read image: {image_path}")
show_image(image)


### Create dataloader for pytorch

In [None]:
from torch.utils.data.sampler import SubsetRandomSampler


### Using random_split

In [None]:
def load_split_dataset(data_dir, train_split=0.7, batch_size=32, image_size=224):
    """Build train/test loaders from an image folder using ``random_split``.

    NOTE: a later cell in this notebook defines another function with the
    same name; when the notebook runs top to bottom that later definition
    replaces this one.

    Args:
        data_dir: root folder passed to ``datasets.ImageFolder``.
        train_split: fraction of the samples assigned to the training subset.
        batch_size: batch size used by both loaders.
        image_size: size passed to ``transforms.Resize``; either an int or a
            sequence such as ``(224, 224)``.

    Returns:
        ``(train_loader, test_loader, classes)`` where ``classes`` is
        ``dataset.classes`` of the underlying ``ImageFolder``.

    Reads the notebook-level ``num_workers`` for both loaders.
    """
    # The crop is 10 px smaller than the resize target. The notebook passes
    # image_size as a tuple, for which `image_size - 10` raises TypeError, so
    # subtract per side when a sequence is given.
    if isinstance(image_size, int):
        crop_size = image_size - 10
    else:
        crop_size = tuple(side - 10 for side in image_size)

    transform = transforms.Compose(
        [
            transforms.Resize(image_size),
            transforms.CenterCrop(crop_size),
            transforms.ToTensor(),
        ]
    )
    dataset = datasets.ImageFolder(data_dir, transform=transform)

    classes = dataset.classes
    dataset_len = len(dataset)

    # Lengths must add up to the dataset size, so the test split takes the rest.
    train_len = int(train_split * dataset_len)
    test_len = dataset_len - train_len
    train_set, test_set = random_split(dataset, [train_len, test_len])

    train_loader = DataLoader(
        dataset=train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers
    )
    test_loader = DataLoader(
        dataset=test_set, batch_size=batch_size, shuffle=True, num_workers=num_workers
    )

    return train_loader, test_loader, classes


### Solution to unbalanced labels 1: random index split with `SubsetRandomSampler`

In [None]:
def load_split_dataset(data_dir, train_split=0.7, batch_size=32, image_size=224):
    """Build train/test loaders that sample disjoint random index subsets.

    One ``ImageFolder`` dataset is shared by both loaders; each loader draws
    from its own slice of a single random permutation of the sample indices.

    Args:
        data_dir: root folder passed to ``datasets.ImageFolder``.
        train_split: fraction of the indices given to the training sampler.
        batch_size: batch size used by both loaders.
        image_size: size passed to ``transforms.Resize``.

    Returns:
        ``(train_loader, test_loader, classes)`` where ``classes`` is
        ``dataset.classes``.

    Reads the notebook-level ``num_workers`` for both loaders.
    """
    pipeline = transforms.Compose(
        [transforms.Resize(image_size), transforms.ToTensor()]
    )
    dataset = datasets.ImageFolder(data_dir, transform=pipeline)
    total = len(dataset)

    # Shuffle every sample index once, then cut the permutation at the
    # training fraction: the head feeds training, the tail feeds testing.
    shuffled = torch.randperm(total)
    cut = int(np.floor(train_split * total))

    def make_loader(subset_indices):
        # Both loaders share the dataset and differ only in their sampler.
        return DataLoader(
            dataset,
            sampler=SubsetRandomSampler(subset_indices),
            batch_size=batch_size,
            num_workers=num_workers,
        )

    train_loader = make_loader(shuffled[:cut])
    test_loader = make_loader(shuffled[cut:])

    return train_loader, test_loader, dataset.classes


### Using the dataset

In [None]:
# Build both loaders from the configured dataset directory and settings.
train_loader, test_loader, classes = load_split_dataset(
    data_dir, train_split=train_split, batch_size=batch_size, image_size=image_size
)
num_classes = len(classes)

# One value per line: batches in the train loader, batches in the test
# loader, then the batch size the train loader was built with.
print(len(train_loader), len(test_loader), train_loader.batch_size, sep="\n")

### Check how many images are in each loader for every label

In [None]:
def check_balance_status(loader, class_names=None):
    """Count how many samples of each class a loader yields.

    Args:
        loader: iterable of ``(inputs, targets)`` batches; only ``targets``
            is read, and each target must be convertible with ``int()``.
        class_names: sequence mapping a class index to its label. Defaults
            to the notebook-level ``classes`` so existing calls keep working
            without depending on that global implicitly.

    Returns:
        A list of ``(label, count)`` tuples sorted by label. Classes that
        never appear in the loader are absent from the list.
    """
    if class_names is None:
        class_names = classes

    class_count = {}

    for _, targets in loader:
        for target in targets:
            # int() turns a 0-d tensor into a plain index.
            label = class_names[int(target)]
            class_count[label] = class_count.get(label, 0) + 1

    return sorted(class_count.items())


train_class_count = check_balance_status(train_loader)
test_class_count = check_balance_status(test_loader)

# Index the counts by label. Zipping the two sorted lists pairs rows by
# position, which misaligns every following row as soon as one class is
# missing from either loader.
train_counts = dict(train_class_count)
test_counts = dict(test_class_count)
all_labels = sorted(set(train_counts) | set(test_counts))

pc_mean = 0

for label in all_labels:
    tr_n = train_counts.get(label, 0)
    te_n = test_counts.get(label, 0)
    # Every label in the union has at least one sample, so the sum is > 0.
    tr_pc = tr_n / (tr_n + te_n) * 100
    pc_mean += tr_pc
    print(f"{label} - TR: {tr_n} TS: {te_n} - PC-TR: {tr_pc:.2f}%")

# Guard the mean against a dataset that produced no samples at all.
if all_labels:
    pc_mean /= len(all_labels)
print(f"Mean PC-TR: {pc_mean:.2f}%")


### Load pretrained resnet model

In [None]:
# Load the pretrained ResNet-50; its final layer is replaced in the next cell.
# NOTE(review): newer torchvision releases favor a `weights=` argument over
# `pretrained=True` — confirm against the installed version.
model = models.resnet50(pretrained=True)

In [None]:
# Freeze every existing weight; the head created below is built afterwards,
# so its parameters keep requires_grad=True.
for frozen in model.parameters():
    frozen.requires_grad_(False)

# Replacement classifier head: 2048 -> 512 -> num_classes, ending in
# log-probabilities over the classes.
head_layers = [
    nn.Linear(2048, 512),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(512, num_classes),
    nn.LogSoftmax(dim=1),
]
model.fc = nn.Sequential(*head_layers)


In [None]:
# Loss and optimizer.
# The head assigned to model.fc ends in nn.LogSoftmax, so the model emits
# log-probabilities. NLLLoss is the loss defined on log-probabilities;
# CrossEntropyLoss expects raw logits and applies log-softmax itself, which
# would apply it a second time here.
criterion = nn.NLLLoss()
# Only the new head is optimized; the backbone parameters are frozen.
optimizer = optim.Adam(model.fc.parameters(), lr=learning_rate)

model.to(device)


### Freeze model and train only last layer

In [None]:
def train():
    """Train the notebook-level ``model`` and return the mean loss per epoch.

    Uses the notebook-level ``model``, ``train_loader``, ``criterion``,
    ``optimizer``, ``device`` and ``num_epochs``.

    Returns:
        A list with one float per epoch: the average batch loss of that epoch.

    Raises:
        ValueError: if ``train_loader`` yields no batches.
    """
    costs = []

    # check_accuracy/predict-style cells put the model in eval mode; without
    # this call a re-run of training would silently run with Dropout disabled.
    model.train()

    for epoch in range(num_epochs):
        losses = []

        for data, targets in train_loader:
            # Get data to cuda if possible.
            data = data.to(device)
            targets = targets.to(device)

            # forward.
            scores = model(data)
            loss = criterion(scores, targets)
            losses.append(loss.item())

            # backward.
            optimizer.zero_grad()
            loss.backward()

            # gradient descent or adam step.
            optimizer.step()

        if not losses:
            # Fail with a clear message instead of a ZeroDivisionError below.
            raise ValueError("train_loader produced no batches; cannot compute the epoch cost")

        cost = sum(losses) / len(losses)
        costs.append(cost)
        print(f"Cost at epoch {epoch + 1} is {cost:.5f}")

    return costs


# Run the full training loop; `costs` holds the mean loss of each epoch.
costs = train()


In [None]:
def plot_costs(costs):
    """Plot the cost curve, one point per epoch, with labelled axes."""
    plt.plot(costs)
    plt.xlabel("Epoch")
    plt.ylabel("Cost")
    plt.show()


# Visualize the per-epoch costs returned by train().
plot_costs(costs)


### Save model

In [None]:
# The checkpoint file name is derived from the selected dataset's "name" entry.
model_name = datasets_info[dataset_name]["name"]
model_path = f"../../models/resnet_{model_name}.pth"

In [None]:
# Export model.
# NOTE(review): this serializes the whole model object rather than a
# state_dict, so loading it later needs the same class definitions to be
# importable. The target directory is not created here — confirm it exists.
torch.save(model, model_path)


### Load saved model

In [None]:
# Import model for acc test.
# SECURITY: torch.load unpickles the whole model object, which can execute
# arbitrary code — only load checkpoint files you trust.
# NOTE(review): recent PyTorch releases default torch.load to
# weights_only=True, which rejects fully pickled models — confirm against the
# installed version and pass weights_only=False if that applies.
# map_location remaps stored tensors onto `device`, so a checkpoint saved on
# a GPU machine still loads on a CPU-only one.
model = torch.load(model_path, map_location=device)
model.to(device)

### Test net

In [None]:
# Check accuracy on training & test to see how good our model predicts.
def check_accuracy(loader, model, show_batches=1):
    """Print the accuracy of ``model`` over every batch of ``loader``.

    The first ``show_batches`` batches are also displayed (image grid,
    predicted labels, ground-truth labels). Earlier the loop stopped after
    the first batch, so the reported accuracy covered one batch only.

    Args:
        loader: iterable of ``(inputs, targets)`` batches.
        model: module to evaluate; it is switched to eval mode.
        show_batches: number of leading batches to visualize (default 1).

    Returns:
        None. Results are printed.

    Uses the notebook-level ``device``, ``classes``, ``imshow`` and
    ``make_grid``.
    """
    num_correct = 0
    num_samples = 0
    model.eval()

    with torch.no_grad():
        for i, (x, y) in enumerate(loader):
            x, y = x.to(device), y.to(device)

            scores = model(x)
            _, predictions = scores.max(1)
            # .item() keeps the counter a plain int so the summary line does
            # not print a tensor repr.
            num_correct += (predictions == y).sum().item()
            num_samples += predictions.size(0)

            if i >= show_batches:
                # Keep counting, but stop producing visual output.
                continue

            # Output both images to compare.
            print(f"Images for {i+1}")
            imshow(make_grid(x.cpu()))

            print(f"Predictions for batch {i+1} ")
            print([classes[int(p)] for p in predictions])

            print(f"Ground truth for batch {i+1}")
            print([classes[int(t)] for t in y])

            print("---------------------------------\n\n")

    if num_samples == 0:
        # An empty loader would otherwise end in a ZeroDivisionError.
        print("Got 0 / 0 samples; accuracy is undefined for an empty loader")
        return

    print(
        f"Got {num_correct} / {num_samples} with accuracy {num_correct / num_samples * 100:.2f}"
    )


In [None]:
# Evaluate the reloaded model on the training loader.
print("Checking accuracy on Training Set")
check_accuracy(train_loader, model)


In [None]:
# Evaluate the reloaded model on the test loader.
print("Checking accuracy on Test Set")
check_accuracy(test_loader, model)

### Check train and test on same class.

In [None]:
def predict_class(loader, model, debug_label):
    """Show the first sample labelled ``debug_label`` and the model's prediction.

    Batches are scanned in loader order; for the first sample whose
    ground-truth label equals ``debug_label`` the image is displayed and the
    predicted and true labels are printed, then the function returns.

    Args:
        loader: iterable of ``(images, targets)`` batches.
        model: module used for prediction; it is switched to eval mode.
        debug_label: class label (an entry of ``classes``) to look for.

    Returns:
        None. If no sample carries ``debug_label`` a message is printed.
        (The function used to end with a reference to an undefined name,
        raising NameError in that case.)

    Uses the notebook-level ``device``, ``classes`` and ``imshow``.
    """
    model.eval()

    # Inference only: no gradients are needed.
    with torch.no_grad():
        for images, targets in loader:
            images, targets = images.to(device), targets.to(device)

            scores = model(images)
            _, predictions = scores.max(1)

            for i, (image, target) in enumerate(zip(images, targets)):
                label = classes[int(target)]
                if label != debug_label:
                    continue

                # Predict label for image.
                prediction = classes[int(predictions[i])]

                # Show image.
                imshow(image.cpu())

                print(f"Prediction: {prediction}. Ground truth: {label}")

                return None

    print(f"No sample labelled {debug_label!r} was found in this loader")
    return None


In [None]:
# Label inspected by the predict_class calls below.
# NOTE(review): presumably one of the folder names of the selected dataset — confirm.
debug_label = "apple"


In [None]:
# Show one sample of debug_label from the training loader with its prediction.
print(f"Prediction for {debug_label} and train set")
predict_class(train_loader, model, debug_label)


In [None]:
# Show one sample of debug_label from the test loader with its prediction.
print(f"Prediction for {debug_label} and test set")
predict_class(test_loader, model, debug_label)
