### Datasets and Dataloaders

In [8]:
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, Subset, DataLoader
import os
from PIL import Image
import os
import random

class CustomImageDataset(Dataset):
    def __init__(self, image_dir, transform=None, num_images_per_country=200):
        self.image_dir = image_dir
        self.transform = transform
        self.num_images_per_country = num_images_per_country
        
        # Liste des sous-dossiers (chaque sous-dossier correspond à un pays)
        self.country_dirs = [d for d in os.listdir(image_dir) if os.path.isdir(os.path.join(image_dir, d))]

        self.image_files = []
        self.labels = []
        
        # Récupérer les images et labels
        for idx, country in enumerate(self.country_dirs):
            country_path = os.path.join(image_dir, country)
            
            # Liste des fichiers d'images pour ce pays
            country_images = [f for f in os.listdir(country_path) if f.endswith('.png')]

            # Limiter le nombre d'images récupérées par pays, si spécifié
            if self.num_images_per_country:
                country_images = random.sample(country_images, min(self.num_images_per_country, len(country_images)))
            
            for img_name in country_images:
                self.image_files.append(os.path.join(country_path, img_name))
                self.labels.append(country)  # Le label est le nom du pays (le sous-dossier)

        # Encodage des labels (pays -> indices)
        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.labels)
        
        # Stocker les noms de classes pour référence future
        self.class_names = self.label_encoder.classes_
        
    def __len__(self):
        return len(self.image_files)
    
    def __getitem__(self, idx):
        # Charger l'image
        img_name = self.image_files[idx]
        image = Image.open(img_name).convert('RGB')
        
        # Appliquer les transformations (si fournies)
        if self.transform:
            image = self.transform(image)
        
        # Récupérer le label associé
        label = self.labels[idx]
        
        return image, label

In [9]:
from torchvision import transforms
from torch.utils.data import random_split

# Transformations pour les images (redimensionnement et normalisation)
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ResNet attend des images 224x224
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalisation standard
])

# Instantiate the dataset
image_dir = '../script/streetview_scrapping/images'  # Dossier contenant les sous-dossiers des pays
dataset = CustomImageDataset(image_dir=image_dir, transform=transform)

# Split dataset into training, validation and test sets
train_size = int(0.8 * len(dataset))
validation_size = (len(dataset) - train_size) // 2
test_size = len(dataset) - train_size - validation_size

trainset, valset, testset = random_split(dataset, [train_size, validation_size, test_size])

# Create DataLoaders
trainloader = DataLoader(trainset, batch_size=4, shuffle=True)
valloader = DataLoader(valset, batch_size=4, shuffle=True)
testloader = DataLoader(testset, batch_size=4, shuffle=True)

# Check class distribution
print(f"Train set size: {len(trainset)}")
print(f"Validation set size: {len(valset)}")
print(f"Test set size: {len(testset)}")

Train set size: 320
Validation set size: 40
Test set size: 40


### Transfer Learning (Feature Extraction)

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet18, ResNet18_Weights
from torchvision import models

# Load a pre-trained ResNet-18 model
model = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)

# Modify the final fully connected layer to match the number of classes
num_classes = len(dataset.class_names)
model.fc = nn.Linear(model.fc.in_features, num_classes)

# Move the model to the GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train_model(model, trainloader, valloader, criterion, optimizer, num_epochs=25):
    best_model_wts = model.state_dict()
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Chaque époque a une phase d'entraînement et de validation
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Met le modèle en mode entraînement
                dataloader = trainloader
            else:
                model.eval()   # Met le modèle en mode évaluation
                dataloader = valloader

            running_loss = 0.0
            running_corrects = 0

            # Boucle sur les données
            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Remise à zéro des gradients
                optimizer.zero_grad()

                # Passage avant et arrière
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Propagation arrière + optimisation si on est en phase d'entraînement
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloader.dataset)
            epoch_acc = running_corrects.double() / len(dataloader.dataset)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict()

        print()

    print('Best val Acc: {:.4f}'.format(best_acc))

    # Charger les poids du meilleur modèle
    model.load_state_dict(best_model_wts)
    return model

In [6]:
model = train_model(model, trainloader, valloader, criterion, optimizer, num_epochs=1)

Epoch 0/0
----------
train Loss: 0.5897 Acc: 0.7643
val Loss: 0.2939 Acc: 0.8611

Best val Acc: 0.8611


In [7]:
def evaluate_model(model, testloader):
    model.eval()  # Mise en mode évaluation
    running_corrects = 0

    with torch.no_grad():
        for inputs, labels in testloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data)

    accuracy = running_corrects.double() / len(testloader.dataset)
    print(f'Test Accuracy: {accuracy:.4f}')

# Évaluer le modèle sur le test set
evaluate_model(model, testloader)

Test Accuracy: 0.8630


### Transfer Learning (Fine Tuning)