In [None]:
import os
import json
import pandas as pd
from pathlib import Path
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torch.nn as nn
import torch.optim as optim

# Parametry
JSON_PATH = "../data/train_data.json"
IMAGE_DIR = "../data/images/"
MODEL_SAVE_DIR = "../models/"
IMG_HEIGHT, IMG_WIDTH = 140, 100
NUM_IMAGES = 12
BATCH_SIZE = 16
NUM_EPOCHS = 10
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset
class CarDataset(Dataset):
    def __init__(self, json_path, image_dir, transform=None, num_images=NUM_IMAGES, debug=False):
        self.image_dir = image_dir
        self.transform = transform
        self.num_images = num_images
        self.debug = debug

        # Wczytaj dane JSON
        self.data = pd.read_json(json_path, lines=True)

        # Unikalne modele samochodów (mapowanie na klasy)
        self.models = {model: idx for idx, model in enumerate(self.data["model"].unique())}

        # Filtruj dane, aby uwzględnić tylko oferty z przynajmniej jednym zdjęciem
        self.samples = []
        for _, row in self.data.iterrows():
            car_id = str(row["id"])
            has_images = any(
                Path(self.image_dir, f"{car_id}_{i}.jpg").exists() for i in range(1, self.num_images + 1)
            )
            if has_images:
                self.samples.append({
                    "id": car_id,
                    "model": row["model"],  # Klasa modelu samochodu
                })

        if self.debug:
            print(f"Debug: Dataset contains {len(self.samples)} samples.")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]
        car_id = sample["id"]
        label = torch.tensor(self.models[sample["model"]], dtype=torch.long)

        # Załaduj zdjęcia
        images = []
        for i in range(1, self.num_images + 1):
            image_path = Path(self.image_dir, f"{car_id}_{i}.jpg")
            if image_path.exists():
                img = Image.open(image_path).convert("RGB")
                if self.transform:
                    img = self.transform(img)
                images.append(img)

        # Jeśli brakuje zdjęć, powielaj istniejące
        while len(images) < self.num_images:
            images.append(images[-1])

        # Przytnij, jeśli jest za dużo zdjęć
        images = images[:self.num_images]
        images = torch.stack(images)  # [NUM_IMAGES, C, H, W]

        return images, label

# Model
class CarModelClassifier(nn.Module):
    def __init__(self, num_images=NUM_IMAGES, num_classes=10):
        super(CarModelClassifier, self).__init__()
        # Model bazowy (ResNet18)
        self.base_model = models.resnet18(pretrained=True)
        self.base_model.fc = nn.Identity()  # Usuń klasyfikator na końcu

        # Przetwarzanie obrazów
        self.embedding_size = 512  # Rozmiar embeddingu z ResNet18
        self.fc = nn.Linear(self.embedding_size, num_classes)

def forward(self, x):
    # x: [BATCH_SIZE, NUM_IMAGES, C, H, W]
    batch_size, num_images, C, H, W = x.size()
    
    # Przetwarzanie każdego obrazu w ramach jednej oferty
    embeddings = []
    for i in range(num_images):
        img_features = self.base_model(x[:, i, :, :, :])  # Przetwarzanie pojedynczego obrazu
        embeddings.append(img_features)
    
    # Łączenie cech obrazów przy użyciu max pooling
    embeddings = torch.stack(embeddings, dim=1)  # [BATCH_SIZE, NUM_IMAGES, EMBEDDING_SIZE]
    combined_embedding, _ = embeddings.max(dim=1)  # [BATCH_SIZE, EMBEDDING_SIZE]
    
    # Klasyfikacja na podstawie połączonych cech
    output = self.fc(combined_embedding)  # [BATCH_SIZE, NUM_CLASSES]
    return output

# Transformacje dla obrazów
transform = transforms.Compose([
    transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Dataset i DataLoader
dataset = CarDataset(JSON_PATH, IMAGE_DIR, transform=transform)
train_loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

# Model, funkcja kosztu, optymalizator
num_classes = len(dataset.models)  # Liczba klas (unikalnych modeli samochodów)
model = CarModelClassifier(num_images=NUM_IMAGES, num_classes=num_classes).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Trenowanie
for epoch in range(NUM_EPOCHS):
    model.train()
    epoch_loss = 0
    correct = 0
    total = 0
    for images, labels in train_loader:
        images, labels = images.to(DEVICE), labels.to(DEVICE)

        # Przewidywania i strata
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Oblicz metryki
        epoch_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    accuracy = 100.0 * correct / total
    print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Loss: {epoch_loss/len(train_loader):.4f}, Accuracy: {accuracy:.2f}%")

# Tworzenie katalogu na zapis modelu
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)




In [None]:
# Zapis modelu
model_path = os.path.join(MODEL_SAVE_DIR, "car_model_classifier.pth")
torch.save(model.state_dict(), model_path)
print(f"Model zapisany w: {model_path}")