In [None]:
import torch
import torchvision

device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

GTSRB = torchvision.datasets.GTSRB("", download=True)

In [None]:
import os

all_files = []
for parent, dirs, files in os.walk("gtsrb"):
    all_files = all_files + [os.path.join(parent, i) for i in files if i.endswith(".ppm")]

all_files[:10]

In [None]:
from PIL import Image
import random

img = Image.open(all_files[0])


def image_grid(paths, rows, cols):
    imgs = [Image.open(i) for i in random.sample(paths, rows * cols)]
    
    w, h = imgs[0].size
    grid = Image.new('RGB', size=(cols * w, rows * h))
    for i, img in enumerate(imgs):
        grid.paste(img, box=(i % cols * w, i // cols * h))
    return grid


image_grid(all_files, rows=3, cols=5)

In [None]:
sizes = [Image.open(i).size for i in random.sample(all_files, max(500, len(all_files) // 100))]
print(max(sizes))
print(min(sizes))


In [None]:
import polars as pl

df = pl.DataFrame({"path": all_files})
df.head()

In [None]:
def get_label_from_path(the_path: str):
    return int(the_path.split(os.sep)[-2][-2:])


df = df.with_columns(label=pl.col('path').map_elements(get_label_from_path))
df.head()

In [None]:
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from sklearn.model_selection import train_test_split

BATCH_SIZE = 1024
EPOCHS = 15
IMAGE_DIM = 100

transforms_train = T.Compose([
    T.ToTensor(),
    T.Resize((IMAGE_DIM, IMAGE_DIM), antialias=True),
    T.RandomRotation(degrees=15),
    T.RandomHorizontalFlip(),
    T.ColorJitter(),
    T.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])

transforms_test = T.Compose([
    T.ToTensor(),
    T.Resize((IMAGE_DIM, IMAGE_DIM), antialias=True),
    T.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])


class GtsrbDataset(Dataset):
    def __init__(self, df: pl.DataFrame, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        idx = idx.tolist() if torch.is_tensor(idx) else idx

        image, label = self.df[idx, "path"], self.df[idx, "label"]
        image = Image.open(image)

        if self.transform:
            image = self.transform(image)
        return image, label


df_train, df_test = train_test_split(df)

dataset_train = GtsrbDataset(df_train, transforms_train)
dataset_test = GtsrbDataset(df_test, transforms_test)

dataloader_train = DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=BATCH_SIZE)

In [None]:
model = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.DEFAULT)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters())
loss_fn = torch.nn.CrossEntropyLoss()

loss_test_history = []
for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}')

    model.train()
    total_loss = 0.0
    for i, data in enumerate(dataloader_train):
        images, labels = data
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss
    total_loss = total_loss / len(dataloader_train.dataset)
    print(f'Train Loss: {total_loss}')

    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for images, labels in dataloader_test:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = loss_fn(outputs, labels)
            total_loss += loss
    total_loss = total_loss / len(dataloader_test.dataset)
    print(f'Test Loss: {total_loss}')

    if epoch == 0 or (total_loss < min(loss_test_history)):
        torch.save(model.state_dict(), 'best_model_state.bin')
        # if epoch != 0 and (round(float(min(loss_test_history)) * 1000) == round(float(total_loss) * 1000)):
        #     print("Model is no longer improving, early stopping")
        #     break
    else:
        print("Model performed worse, not saving this model")

    loss_test_history.append(total_loss)

model.load_state_dict(torch.load('best_model_state.bin'))


In [None]:
import polars as pl

df = pl.DataFrame(schema={"predicted_label":int, "true_label": int})

model.eval()
with torch.no_grad():
    for images, labels in dataloader_test:
        images = images.to(device)
        outputs = model(images)
        _, preds = torch.max(outputs, dim=1)
        
        df = pl.concat([df, pl.DataFrame({
            "predicted_label": [int(i.cpu()) for i in preds], 
            "true_label": labels.tolist(),
        })])

accuracy = len(df.filter((pl.col("predicted_label") == pl.col("true_label")))) / len(df)
print("Accuracy:", 100 * accuracy, "%")