<a href="https://colab.research.google.com/github/parabola01/Data-Visualization/blob/main/27_07_Kopia_notatnika_car_recognition_(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader

In [2]:
from google.colab import drive
import os
# Mount Google Drive at /content/my_drive; the image directory and the JSON
# files configured in the following cells are addressed through this path.
drive.mount('/content/my_drive')

Mounted at /content/my_drive


In [3]:
# Root directory of the car images. It is handed to the dataset class, which
# joins each item's relative 'image_path' onto it.
# Alternative relative location (e.g. when not running from Drive):
# images_base_dir = 'cars_merged'
images_base_dir = '/content/my_drive/MyDrive/cars_merged'

In [4]:
# Directory that holds the two dataset JSON files loaded later in the notebook.
# Alternative: use the current working directory.
# output_dir = ''
output_dir = '/content/my_drive/MyDrive/'
# Per-image records (image path plus brand/model/type ids).
output_data_json_path = os.path.join(output_dir, 'car_dataset_items_with_ids.json')
# Label mappings ('brand2idx', 'model2idx', 'type2idx').
output_mappings_json_path = os.path.join(output_dir, 'car_dataset_mappings.json')

In [5]:
from torch.utils.data import Dataset
from torchvision.datasets.folder import default_loader
import torch
import os

class StanfordCarsMultiHeadDataset(Dataset):
    """Dataset yielding an image together with its brand, model and type labels.

    Args:
        data_items: Indexable collection of records. Each record must provide
            the keys ``image_path``, ``brand_id``, ``model_id`` and ``type_id``.
        images_dir: Directory that every record's ``image_path`` is joined onto.
        transform: Optional callable applied to the loaded image.
    """

    def __init__(self, data_items, images_dir, transform=None):
        self.data_items, self.images_dir, self.transform = data_items, images_dir, transform

    def __len__(self):
        """Return the number of records."""
        return len(self.data_items)

    def __getitem__(self, idx):
        """Return ``(image, targets)`` for record ``idx``.

        ``targets`` is a dict with the keys ``brand``, ``model`` and ``type``,
        each holding the corresponding id wrapped in a tensor.
        """
        record = self.data_items[idx]

        # Read every id up front so a malformed record fails before any disk I/O.
        label_ids = {
            head: record[f"{head}_id"] for head in ("brand", "model", "type")
        }

        picture = default_loader(os.path.join(self.images_dir, record['image_path']))
        if self.transform:
            picture = self.transform(picture)

        return picture, {head: torch.tensor(value) for head, value in label_ids.items()}

In [6]:
import json
# Load the per-image records and the label mappings produced beforehand.
# JSON is read explicitly as UTF-8 so the result does not depend on the
# locale's default text encoding.
print(f"Wczytywanie danych z {output_data_json_path}")
with open(output_data_json_path, 'r', encoding='utf-8') as f:
    loaded_data_items_with_ids = json.load(f)

print(f"Wczytywanie mapowa≈Ñ z {output_mappings_json_path}")
with open(output_mappings_json_path, 'r', encoding='utf-8') as f:
    loaded_mappings = json.load(f)

Wczytywanie danych z /content/my_drive/MyDrive/car_dataset_items_with_ids.json
Wczytywanie mapowa≈Ñ z /content/my_drive/MyDrive/car_dataset_mappings.json


In [7]:
# Number of classes per head = number of entries in the matching mapping.
num_brands, num_models, num_types = (
    len(loaded_mappings[mapping_key])
    for mapping_key in ('brand2idx', 'model2idx', 'type2idx')
)

In [8]:
# Additive mask of shape (num_brands, num_models): 0 for every (brand, model)
# pair that occurs in the loaded records, -inf for every other pair.
brand_to_model_mask = torch.full((num_brands, num_models), float('-inf'))

observed_brand_model_pairs = {
    (entry['brand_id'], entry['model_id']) for entry in loaded_data_items_with_ids
}
for brand_id, model_id in observed_brand_model_pairs:
    brand_to_model_mask[brand_id, model_id] = 0

In [9]:
# Additive mask of shape (num_models, num_types): 0 for every (model, type)
# pair that occurs in the loaded records, -inf for every other pair.
model_to_type_mask = torch.full((num_models, num_types), float('-inf'))

observed_model_type_pairs = {
    (entry['model_id'], entry['type_id']) for entry in loaded_data_items_with_ids
}
for model_id, type_id in observed_model_type_pairs:
    model_to_type_mask[model_id, type_id] = 0

In [10]:
from torchvision import transforms
from torch.utils.data import DataLoader

# Channel statistics used for normalisation (ImageNet values).
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

augmentation_steps = [
    # Random rotation + translation + scaling.
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.Resize((224, 224)),
    # Random mirror image.
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
]
transform = transforms.Compose(augmentation_steps)

# Dataset over every loaded record, using the augmenting pipeline above.
dataset = StanfordCarsMultiHeadDataset(
    loaded_data_items_with_ids,
    images_base_dir,
    transform=transform,
)

In [11]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Subset
from tqdm import tqdm

# Hyper-parameters.
# NOTE: fine_tune_epochs is assigned again in the training cell further down.
fine_tune_epochs = 20
batch_size = 128
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data split: 20% test, then 12.5% of the remainder as validation
# (70% / 10% / 20% overall). Fixed random_state keeps the split reproducible.
indices = list(range(len(dataset)))
train_val_idx, test_idx = train_test_split(indices, test_size=0.2, random_state=42)
train_idx, val_idx = train_test_split(train_val_idx, test_size=0.125, random_state=42)

# Validation and test must not see random augmentation, otherwise their
# metrics are noisy and change from run to run. They therefore read the same
# records through a deterministic resize + normalise pipeline.
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
eval_dataset = StanfordCarsMultiHeadDataset(
    dataset.data_items, dataset.images_dir, transform=eval_transform
)

# Training keeps the augmenting dataset; validation and test use the
# deterministic one. The indices refer to the same underlying records.
train_subset = Subset(dataset, train_idx)
val_subset = Subset(eval_dataset, val_idx)
test_subset = Subset(eval_dataset, test_idx)

train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=batch_size)
test_loader = DataLoader(test_subset, batch_size=batch_size, shuffle=False)

In [12]:

from datetime import datetime
from torch.utils.tensorboard import SummaryWriter

# Timestamp that identifies this run; later cells reuse it in file names.
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Alternative relative location:
# log_dir = f"logs/experiment_{timestamp}"
# This must be an f-string, otherwise the literal text "{timestamp}" becomes
# part of the path and every run writes into the same directory. The path also
# has to use the mount point passed to drive.mount ('/content/my_drive');
# '/content/drive' is not where Drive is mounted in this notebook.
log_dir = f"/content/my_drive/MyDrive/tensorboard_logs/experiment_{timestamp}"
writer = SummaryWriter(log_dir=log_dir)

In [18]:
import torch
import torch.nn as nn
import torchvision.models as models

class MultiHeadResNet(nn.Module):
    """Backbone with three chained classification heads: brand -> model -> type.

    The brand head sees only the backbone features. The model head additionally
    receives a brand embedding, and the type head receives both a brand and a
    model embedding. In training mode the embeddings are looked up from the
    ground-truth labels (teacher forcing); in eval mode they are looked up from
    the network's own argmax predictions, and the model/type logits are masked
    with the additive hierarchy masks.

    Args:
        base_model: Module exposing an ``fc`` layer with ``in_features``; its
            ``fc`` is replaced by ``nn.Identity`` so it returns raw features.
        num_brands, num_models, num_types: Number of classes per head.
        brand_to_model_mask: ``(num_brands, num_models)`` additive mask.
        model_to_type_mask: ``(num_models, num_types)`` additive mask.
        brand_embedding_dim: Size of the brand embedding vectors.
        model_embedding_dim: Size of the model embedding vectors.
    """

    def __init__(self, base_model, num_brands, num_models, num_types, brand_to_model_mask, model_to_type_mask,
                 brand_embedding_dim=64, model_embedding_dim=128):
        super().__init__()
        self.backbone = base_model
        in_features = base_model.fc.in_features
        self.backbone.fc = nn.Identity()

        # Register the masks as buffers so that ``.to(device)`` / ``.cuda()``
        # move them together with the weights. As plain attributes they stayed
        # on the CPU and could not be indexed with on-device predictions.
        # ``persistent=False`` keeps them out of ``state_dict`` so the
        # checkpoint layout is the same as before.
        self.register_buffer(
            "brand_to_model_mask", torch.as_tensor(brand_to_model_mask), persistent=False
        )
        self.register_buffer(
            "model_to_type_mask", torch.as_tensor(model_to_type_mask), persistent=False
        )

        self.brand_embedding = nn.Embedding(num_brands, brand_embedding_dim)
        self.model_embedding = nn.Embedding(num_models, model_embedding_dim)

        # Brand head. Input: backbone features only.
        self.brand_head = nn.Sequential(
            nn.Linear(in_features, 1024),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(1024, num_brands)
        )

        # Model head. Input: backbone features + brand embedding.
        self.model_head = nn.Sequential(
            nn.Linear(in_features + brand_embedding_dim, 1024),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(1024, num_models)
        )

        # Type head. Input: backbone features + brand embedding + model embedding.
        self.type_head = nn.Sequential(
            nn.Linear(in_features + brand_embedding_dim + model_embedding_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_types)
        )

    def forward(self, x, targets=None):
        """Compute the logits of all three heads.

        Args:
            x: Batch of images accepted by the backbone.
            targets: Dict with ``'brand'`` and ``'model'`` label tensors.
                Required in training mode, ignored in eval mode.

        Returns:
            Dict with the keys ``'brand'``, ``'model'`` and ``'type'``. In
            training mode all logits are unmasked; in eval mode the model and
            type logits have the hierarchy masks added (disallowed classes
            become ``-inf``).

        Raises:
            ValueError: If called in training mode without ``targets``.
        """
        features = self.backbone(x)  # image feature extraction

        brand_logits = self.brand_head(features)

        if self.training:
            # --- Training path (teacher forcing) ---
            # Ground-truth labels select the embeddings, so the downstream
            # heads are not fed early, unreliable predictions.
            if targets is None:
                raise ValueError("Targets must be provided during training for teacher forcing.")

            brand_emb = self.brand_embedding(targets['brand'])
            model_logits = self.model_head(torch.cat([features, brand_emb], dim=1))

            model_emb = self.model_embedding(targets['model'])
            type_logits = self.type_head(torch.cat([features, brand_emb, model_emb], dim=1))

            return {
                "brand": brand_logits,
                "model": model_logits,
                "type": type_logits
            }

        # --- Inference path ---
        # The network's own predictions select the embeddings and mask rows.

        # 1. Predicted brand and its embedding.
        brand_preds = torch.argmax(brand_logits, dim=1)
        brand_emb = self.brand_embedding(brand_preds)

        # 2. Model logits, restricted to models allowed for the predicted brand.
        model_logits = self.model_head(torch.cat([features, brand_emb], dim=1))
        masked_model_logits = model_logits + self.brand_to_model_mask[brand_preds]

        # 3. Predicted model and its embedding.
        model_preds = torch.argmax(masked_model_logits, dim=1)
        model_emb = self.model_embedding(model_preds)

        # 4. Type logits, restricted to types allowed for the predicted model.
        type_logits = self.type_head(torch.cat([features, brand_emb, model_emb], dim=1))
        masked_type_logits = type_logits + self.model_to_type_mask[model_preds]

        return {
            "brand": brand_logits,  # brand logits are returned unmasked
            "model": masked_model_logits,
            "type": masked_type_logits
        }


In [19]:
def evaluate(model, loader, epoch, criterion, prefix="val"):
    """Evaluate ``model`` on ``loader`` and log the metrics to TensorBoard.

    Relies on the module-level ``device`` and ``writer`` objects.

    Args:
        model: Network called as ``model(images)``; must return a dict of logits.
        loader: Iterable of ``(images, targets)`` batches, ``targets`` being a dict.
        epoch: Step index used for the TensorBoard scalars.
        criterion: Loss applied per head; the per-head losses are summed unweighted.
        prefix: Tag prefix for the logged scalars.

    Returns:
        ``(avg_loss, acc)``: the loss averaged over batches and, per head, the
        fraction of correctly classified samples.
    """
    model.eval()
    loss_sum = 0
    hits = dict.fromkeys(("brand", "model", "type"), 0)
    seen = 0

    with torch.no_grad():
        for batch_images, batch_targets in loader:
            batch_images = batch_images.to(device)
            batch_targets = {name: labels.to(device) for name, labels in batch_targets.items()}

            logits = model(batch_images)

            batch_loss = sum(
                criterion(head_logits, batch_targets[name])
                for name, head_logits in logits.items()
            )
            loss_sum += batch_loss.item()

            for name, head_logits in logits.items():
                predicted = head_logits.argmax(dim=1)
                hits[name] += (predicted == batch_targets[name]).sum().item()
            seen += batch_images.size(0)

    mean_loss = loss_sum / len(loader)
    accuracy = {name: count / seen for name, count in hits.items()}

    # TensorBoard logging
    writer.add_scalar(f"{prefix}/loss", mean_loss, epoch)
    for name, value in accuracy.items():
        writer.add_scalar(f"{prefix}/acc_{name}", value, epoch)

    return mean_loss, accuracy


In [20]:
import torch.nn.utils as torch_utils

def train_one_epoch(model, loader, optimizer, scheduler, criterion, device, brand_to_model_mask, model_to_type_mask, loss_weights={'brand': 0.6, 'model': 1.0, 'type': 1.0}, grad_clip_value=1.0):
    """Run one training epoch with hierarchy-masked losses.

    Args:
        model: Network called as ``model(images, targets)``; must return a dict
            with ``'brand'``, ``'model'`` and ``'type'`` logits.
        loader: Iterable of ``(images, targets)`` batches, ``targets`` being a
            dict with ``'brand'``, ``'model'`` and ``'type'`` label tensors.
        optimizer: Optimizer stepped once per batch.
        scheduler: LR scheduler stepped once per batch.
        criterion: Per-head classification loss.
        device: Device the batches and masks are moved to.
        brand_to_model_mask: ``(num_brands, num_models)`` additive mask, indexed
            by the ground-truth brand.
        model_to_type_mask: ``(num_models, num_types)`` additive mask, indexed
            by the ground-truth model.
        loss_weights: Weight of each head's loss in the total (read-only).
        grad_clip_value: Max norm passed to ``clip_grad_norm_``.

    Returns:
        ``(avg_loss, accuracy, avg_grad_norm)``: loss averaged over batches,
        per-head accuracy in percent, and the batch-averaged value returned by
        ``clip_grad_norm_``. An empty loader yields zeros instead of raising
        ``ZeroDivisionError``.
    """
    model.train()

    # The masks are built on the CPU but are indexed below with label tensors
    # that live on `device`. Move them once so the indexing and the addition to
    # the logits do not fail with a device mismatch on GPU.
    brand_to_model_mask = brand_to_model_mask.to(device)
    model_to_type_mask = model_to_type_mask.to(device)

    running_loss = 0
    total_grad_norm = 0
    correct = {"brand": 0, "model": 0, "type": 0}
    total = 0

    for images, targets in tqdm(loader, desc="Training"):
        images = images.to(device)
        targets = {k: v.to(device) for k, v in targets.items()}
        brand_labels = targets['brand']
        model_labels = targets['model']
        type_labels = targets['type']

        optimizer.zero_grad()

        outputs = model(images, targets)

        brand_logits = outputs['brand']
        model_logits = outputs['model']
        type_logits = outputs['type']

        # --- Masked loss calculation ---
        # 1. Brand loss: plain classification loss.
        loss_brand = criterion(brand_logits, brand_labels)

        # 2. Model loss: only models allowed for the true brand compete.
        masked_model_logits = model_logits + brand_to_model_mask[brand_labels]
        loss_model = criterion(masked_model_logits, model_labels)

        # 3. Type loss: only types allowed for the true model compete.
        masked_type_logits = type_logits + model_to_type_mask[model_labels]
        loss_type = criterion(masked_type_logits, type_labels)

        # 4. Weighted sum of the three losses.
        loss = (loss_weights['brand'] * loss_brand +
                loss_weights['model'] * loss_model +
                loss_weights['type'] * loss_type)

        loss.backward()

        grad_norm = torch_utils.clip_grad_norm_(model.parameters(), grad_clip_value)
        total_grad_norm += grad_norm.item()

        optimizer.step()
        scheduler.step()  # per-batch scheduler step
        running_loss += loss.item()
        total += images.size(0)
        correct['brand'] += (brand_logits.argmax(dim=1) == brand_labels).sum().item()
        correct['model'] += (masked_model_logits.argmax(dim=1) == model_labels).sum().item()
        correct['type'] += (masked_type_logits.argmax(dim=1) == type_labels).sum().item()

    # Guard the averages so an empty loader reports zeros instead of crashing.
    num_batches = max(len(loader), 1)
    num_samples = max(total, 1)
    avg_loss = running_loss / num_batches
    avg_grad_norm = total_grad_norm / num_batches
    accuracy = {k: 100 * correct[k] / num_samples for k in correct}  # percent
    print(f"Accuracy -> Brand: {accuracy['brand']:.2f}% | Model: {accuracy['model']:.2f}% | Type: {accuracy['type']:.2f}%")

    return avg_loss, accuracy, avg_grad_norm

In [None]:
from torch.optim.lr_scheduler import OneCycleLR

# Pretrained ResNet-50 backbone with every weight frozen, so that only the
# layers added by MultiHeadResNet are trainable at first.
# NOTE(review): `pretrained=` is reported as deprecated by recent torchvision
# releases in favour of `weights=` — confirm against the installed version.
base_model = models.resnet50(pretrained=True)
base_model.requires_grad_(False)

model = MultiHeadResNet(
    base_model=base_model,
    num_brands=num_brands,
    num_models=num_models,
    num_types=num_types,
    brand_to_model_mask=brand_to_model_mask,
    model_to_type_mask=model_to_type_mask,
)
model = model.to(device)

# The same loss is used for all three heads.
criterion = nn.CrossEntropyLoss()

# ===================================================================
# === STAGE 1: TRAIN THE HEADS (FROZEN BACKBONE) ===
# ===================================================================
print("üöÄ ETAP 1: Rozpoczynam trening g≈Çowic...")

# Configuration for the frozen-backbone stage only.
frozen_epochs = 70

# Hand the optimizer nothing but the parameters that are still trainable.
optimizer_frozen = torch.optim.AdamW(
    [p for p in model.parameters() if p.requires_grad],
    lr=0.0001,
)
# The scheduler is stepped once per batch inside train_one_epoch, hence
# total_steps = epochs * batches per epoch. max_lr is the peak learning rate.
scheduler_frozen = OneCycleLR(
    optimizer_frozen,
    max_lr=1e-2,
    total_steps=frozen_epochs * len(train_loader),
)

for epoch in range(frozen_epochs):
    # One pass over the training data.
    train_loss, train_acc, train_grad_norm = train_one_epoch(
        model,
        train_loader,
        optimizer_frozen,
        scheduler_frozen,
        criterion,
        device,
        brand_to_model_mask,
        model_to_type_mask,
    )

    # Validation (also logs the val/* scalars).
    val_loss, val_acc = evaluate(model, val_loader, epoch, criterion, prefix="val")

    # Training-side TensorBoard scalars, written in a fixed order.
    current_lr = optimizer_frozen.param_groups[0]['lr']
    train_scalars = {
        "train/loss": train_loss,
        "train/grad_norm": train_grad_norm,
        "train/learning_rate": current_lr,
    }
    train_scalars.update({f"train/acc_{k}": train_acc[k] for k in train_acc})
    for scalar_tag, scalar_value in train_scalars.items():
        writer.add_scalar(scalar_tag, scalar_value, epoch)

    print(f"ETAP 1 - Epoka {epoch+1}/{frozen_epochs} | Val Loss: {val_loss:.4f} | LR: {current_lr:.6f}")
    print(f"Val Acc: {val_acc}")


# ===================================================================
# === STAGE 2: FINE-TUNING (UNFROZEN BACKBONE) ===
# ===================================================================
print("\nüöÄ ETAP 2: Rozpoczynam fine-tuning...")

# STEP 1: unfreeze the last backbone stage (layer4).
for param in model.backbone.layer4.parameters():
    param.requires_grad = True

# STEP 2: a NEW optimizer with one parameter group for the heads and one for
# the unfrozen backbone stage, so each can follow its own learning rate.
# NOTE(review): OneCycleLR below is given its own per-group max_lr; confirm
# whether the 'lr' values set here still have any effect once it is attached.
fine_tune_epochs = 35
head_params = [
    p for name, p in model.named_parameters()
    if 'backbone' not in name and p.requires_grad
]
optimizer_finetune = torch.optim.AdamW([
    {'params': head_params, 'lr': 1e-5},
    {'params': model.backbone.layer4.parameters(), 'lr': 1e-6},
])
# A NEW scheduler for the new optimizer; max_lr is listed per group
# (heads first, backbone second).
scheduler_finetune = OneCycleLR(
    optimizer_finetune,
    max_lr=[1e-2, 1e-3],
    total_steps=fine_tune_epochs * len(train_loader),
)

for epoch in range(fine_tune_epochs):
    # The logging step continues where stage 1 stopped.
    epoch_idx = frozen_epochs + epoch

    train_loss, train_acc, train_grad_norm = train_one_epoch(
        model,
        train_loader,
        optimizer_finetune,
        scheduler_finetune,
        criterion,
        device,
        brand_to_model_mask,
        model_to_type_mask,
    )

    val_loss, val_acc = evaluate(model, val_loader, epoch_idx, criterion, prefix="val")

    # Training-side TensorBoard scalars; the two LR groups are logged separately.
    lr_heads = optimizer_finetune.param_groups[0]['lr']
    lr_backbone = optimizer_finetune.param_groups[1]['lr']
    train_scalars = {
        "train/loss": train_loss,
        "train/grad_norm": train_grad_norm,
        "train/learning_rate_heads": lr_heads,
        "train/learning_rate_backbone": lr_backbone,
    }
    train_scalars.update({f"train/acc_{k}": train_acc[k] for k in train_acc})
    for scalar_tag, scalar_value in train_scalars.items():
        writer.add_scalar(scalar_tag, scalar_value, epoch_idx)

    print(f"ETAP 2 - Epoka {epoch+1}/{fine_tune_epochs} | Val Loss: {val_loss:.4f} | LR G≈Çowic: {lr_heads:.6f} | LR Backbone: {lr_backbone:.7f}")
    print(f"Val Acc: {val_acc}")

In [None]:
# Final evaluation on the held-out test split. The scalars are logged at the
# global epoch count (both stages), matching the continuous epoch index used
# for the train/val curves instead of the stage-2 epoch count alone.
test_loss, test_acc = evaluate(model, test_loader, frozen_epochs + fine_tune_epochs, criterion, prefix="test")
print(f"Test loss: {test_loss:.4f} | Test acc: {test_acc}")

In [None]:
# Save the model weights (state_dict only). The file name reuses the run
# timestamp and is relative, so the file is written to the current working
# directory.
# NOTE(review): the other paths in this notebook point to Google Drive —
# confirm that a local path is intended for the checkpoint.
torch.save(model.state_dict(), f"car_model_{timestamp}.pth")

In [None]:
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
import matplotlib.pyplot as plt
import os

# Alternative relative location:
# chart_dir = f"logs/tensorboard_charts/experiment_{timestamp}"
# This must be an f-string so that {timestamp} is substituted, and it must use
# the mount point passed to drive.mount ('/content/my_drive').
chart_dir = f"/content/my_drive/MyDrive/tensorboard_charts/experiment_{timestamp}"
os.makedirs(chart_dir, exist_ok=True)

# Push any buffered scalars to disk before reading the event files back,
# otherwise the most recent points can be missing from the charts.
writer.flush()

ea = EventAccumulator(log_dir)
ea.Reload()

# One chart per logged scalar tag, saved as both PNG and PDF.
for tag in ea.Tags()['scalars']:
    events = ea.Scalars(tag)
    steps = [e.step for e in events]
    values = [e.value for e in events]

    fig, ax = plt.subplots()
    ax.plot(steps, values)
    ax.set_title(tag)
    ax.set_xlabel("Epoch")
    ax.set_ylabel(tag.split('/')[-1])
    ax.grid(True)

    # '/' in the tag would otherwise be interpreted as a sub-directory.
    fname_base = tag.replace("/", "_")
    fig.savefig(os.path.join(chart_dir, f"{fname_base}.png"))
    fig.savefig(os.path.join(chart_dir, f"{fname_base}.pdf"))
    plt.close(fig)  # release the figure; many are created in this loop