<a href="https://colab.research.google.com/github/nobiquad/ESL/blob/main/Traffic_Sign_Recognition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

!pip install torch torchvision matplotlib

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.models import mobilenet_v2
import matplotlib.pyplot as plt
import numpy as np
import time

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f" Urządzenie: {device}")

GTSRB_CLASSES = [
    "Prędkość 20", "Prędkość 30", "Prędkość 50", "Prędkość 60", "Prędkość 70",
    "Prędkość 80", "Koniec 80", "Prędkość 100", "Prędkość 120", "Zakaz wyprzedzania",
    "Zakaz wyprzedzania dla ciężarówek", "Pierwszeństwo na skrzyżowaniu", "Droga z pierwszeństwem",
    "Ustąp pierwszeństwa", "Stop", "Zakaz wjazdu", "Zakaz wjazdu dla ciężarówek",
    "Inny zakaz", "Uwaga (ogólne)", "Niebezpieczny zakręt w lewo", "Niebezpieczny zakręt w prawo",
    "Podwójny zakręt", "Wyboista droga", "Śliska droga", "Droga zwęża się z prawej",
    "Roboty drogowe", "Sygnalizacja świetlna", "Piesi", "Dzieci", "Rowerzyści",
    "Uwaga, lód/śnieg", "Dzikie zwierzęta", "Koniec ograniczeń", "Jedź w prawo",
    "Jedź w lewo", "Tylko prosto", "Prosto lub w prawo", "Prosto lub w lewo",
    "Trzymaj się prawej", "Trzymaj się lewej", "Rondo", "Koniec zakazu wyprzedzania",
    "Koniec zakazu wyprzedzania dla ciężarówek"
]

IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomRotation(15),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])

print(" Pobieranie danych...")
trainset_full = datasets.GTSRB(root='./data', split='train', download=True, transform=train_transform)

train_size = int(0.9 * len(trainset_full))
val_size = len(trainset_full) - train_size
train_ds, val_ds = torch.utils.data.random_split(trainset_full, [train_size, val_size])

testset = datasets.GTSRB(root='./data', split='test', download=True, transform=test_transform)

BATCH_SIZE = 64
trainloader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
valloader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
testloader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

model = mobilenet_v2(weights="IMAGENET1K_V1")
model.classifier[1] = nn.Linear(model.classifier[1].in_features, 43)
model = model.to(device)

criterion = nn.CrossEntropyLoss()

def run_training(model, optimizer, epochs, phase_name):
    print(f"\n {phase_name} ({epochs} epok)...")
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in trainloader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        model.eval()
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for images, labels in valloader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        print(f"Epoch [{epoch+1}/{epochs}] Loss: {running_loss/len(trainloader):.4f} | Train Acc: {100*correct/total:.2f}% | Val Acc: {100*val_correct/val_total:.2f}%")

for param in model.features.parameters():
    param.requires_grad = False

optimizer = optim.Adam(model.classifier[1].parameters(), lr=0.001)
run_training(model, optimizer, epochs=3, phase_name="FAZA 1: Frozen Backbone")

print("\n Odmrażanie wag (Fine-tuning)...")
for param in model.parameters():
    param.requires_grad = True

optimizer = optim.Adam(model.parameters(), lr=0.0001)

run_training(model, optimizer, epochs=2, phase_name="FAZA 2: Unfrozen Fine-tuning")

print("\n Ostateczny Test na zbiorze testowym...")
model.eval()
test_correct = 0
test_total = 0
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

print(f" Ostateczna dokładność (Test Set): {100 * test_correct / test_total:.2f}%")

def visualize_grid(loader, model, num_images=12):
    model.eval()
    dataiter = iter(loader)
    images, labels = next(dataiter)
    num_images = min(num_images, len(images))

    cols = 4
    rows = (num_images + cols - 1) // cols
    fig, axes = plt.subplots(rows, cols, figsize=(16, 4 * rows))
    axes = axes.flatten()

    mean = np.array(IMAGENET_MEAN)
    std = np.array(IMAGENET_STD)

    with torch.no_grad():
        for i in range(num_images):
            ax = axes[i]
            img = images[i]
            true_label = labels[i].item()

            input_img = img.unsqueeze(0).to(device)
            output = model(input_img)
            probs = torch.softmax(output, 1)
            conf, pred = torch.max(probs, 1)

            pred_idx = pred.item()
            confidence = conf.item()

            img_display = img.cpu().numpy().transpose((1, 2, 0))
            img_display = std * img_display + mean
            img_display = np.clip(img_display, 0, 1)

            ax.imshow(img_display)
            ax.axis('off')

            is_correct = (pred_idx == true_label)
            color = 'green' if is_correct else 'red'
            title = f"{'OK' if is_correct else 'BŁĄD'}: {GTSRB_CLASSES[pred_idx]}\n({confidence*100:.1f}%)"
            if not is_correct:
                title += f"\n(Prawda: {GTSRB_CLASSES[true_label]})"

            ax.set_title(title, color=color, fontsize=9, fontweight='bold')

    for i in range(num_images, len(axes)): axes[i].axis('off')
    plt.tight_layout()
    plt.show()

visualize_grid(testloader, model)

### Załadowanie modelu z pliku `mobilenetv2_gtsrb_multithreaded.pth`

In [None]:

import torch
import torchao
from torchao.quantization import quantize_, Int8WeightOnlyConfig
import copy
import os
import time
import warnings

warnings.filterwarnings("ignore")

print("\n--- KWANTYZACJA ---")

device_cpu = torch.device("cpu")
model_fp32 = copy.deepcopy(model).to(device_cpu)
model_fp32.eval()

model_int8 = copy.deepcopy(model_fp32)

quantize_(model_int8,Int8WeightOnlyConfig())

print(" Model został skwantyzowany.")

def get_size_mb(model):
    torch.save(model.state_dict(), "temp_model.pth")
    size = os.path.getsize("temp_model.pth") / 1e6
    os.remove("temp_model.pth")
    return size

def measure_time(model, loader, device, runs=50):
    model.eval()
    model.to(device)

    try:
        input_data, _ = next(iter(loader))
    except StopIteration:
        return 0.0
    input_data = input_data.to(device)

    for _ in range(5):
        with torch.no_grad():
            _ = model(input_data)

    start = time.time()
    with torch.no_grad():
        for _ in range(runs):
            _ = model(input_data)
    end = time.time()

    return (end - start) / runs * 1000

def eval_acc(model, loader, device):
    model.eval()
    model.to(device)
    correct = 0
    total = 0
    limit = 50

    with torch.no_grad():
        for i, (images, labels) in enumerate(loader):
            if i >= limit: break
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

print("\n --- WYNIKI PORÓWNANIA (CPU) ---")

size_fp32 = get_size_mb(model_fp32)
size_int8 = get_size_mb(model_int8)

time_fp32 = measure_time(model_fp32, testloader, device_cpu)
time_int8 = measure_time(model_int8, testloader, device_cpu)

acc_fp32 = eval_acc(model_fp32, testloader, device_cpu)
acc_int8 = eval_acc(model_int8, testloader, device_cpu)

print(f"\n{'METRYKA':<20} | {'FP32 (Oryginał)':<18} | {'INT8 (Kwantyzacja)':<18} | {'ZYSK/STRATA'}")
print("-" * 85)
print(f"{'Rozmiar (MB)':<20} | {size_fp32:.2f} MB           | {size_int8:.2f} MB           | {size_fp32/size_int8:.1f}x mniejszy")
print(f"{'Czas (ms/batch)':<20} | {time_fp32:.1f} ms           | {time_int8:.1f} ms           | {time_fp32/time_int8:.1f}x szybszy")
print(f"{'Dokładność (%)':<20} | {acc_fp32:.2f}%            | {acc_int8:.2f}%            | {acc_fp32-acc_int8:.2f} p.p. różnicy")

torch.save(model_int8.state_dict(), "mobilenet_v2_quantized.pth")