# In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
import torchvision.datasets as datasets
import random
from torch.utils.data import DataLoader, random_split
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from torch.utils.tensorboard import SummaryWriter

def get_device():
    """Pick the compute device for training and report the choice on stdout.

    Returns:
        torch.device: ``cuda`` when CUDA is available, otherwise ``cpu``.
        On the CUDA path the per-process memory fraction is set to 0.8
        before the device is returned.
    """
    # Guard clause: without CUDA there is nothing to configure.
    if not torch.cuda.is_available():
        cpu = torch.device("cpu")
        print("Using CPU.")
        return cpu

    gpu = torch.device("cuda")
    # Limit this process to 80% of the GPU's memory.
    torch.cuda.set_per_process_memory_fraction(0.8, device=gpu.index)
    print("Using GPU with 80% memory usage.")
    return gpu

def load_data(data_dir, batch_size, img_size):
    """Build train/validation/test loaders from an ``ImageFolder`` directory.

    The samples are shuffled with the ``random`` module and split 70% / 15% /
    remainder. Random augmentation is applied to the training split only;
    validation and test images go through a deterministic resize + normalize
    pipeline so that evaluation metrics are repeatable.

    Args:
        data_dir: Root directory laid out as ``ImageFolder`` expects
            (one sub-directory per class).
        batch_size: Batch size used by all three loaders.
        img_size: Images are resized to ``(img_size, img_size)``.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader, num_classes)``.

    Note:
        The split uses ``random.shuffle`` without seeding; call
        ``random.seed(...)`` beforehand if a reproducible split is needed.
    """
    train_transform = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.RandomRotation(20),
        transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5]),
    ])
    # Same preprocessing minus the random augmentations, so held-out
    # samples look identical every time they are evaluated.
    eval_transform = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5]),
    ])

    # Two views over the same files: they differ only in the transform,
    # so one index list can address both.
    train_dataset = datasets.ImageFolder(root=data_dir, transform=train_transform)
    eval_dataset = datasets.ImageFolder(root=data_dir, transform=eval_transform)
    num_classes = len(train_dataset.classes)

    total = len(train_dataset)
    train_size = int(0.7 * total)
    val_size = int(0.15 * total)

    indices = list(range(total))
    random.shuffle(indices)

    train_indices = indices[:train_size]
    val_indices = indices[train_size:train_size + val_size]
    # Whatever is left after the two integer-truncated splits becomes the test set.
    test_indices = indices[train_size + val_size:]

    train_data = torch.utils.data.Subset(train_dataset, train_indices)
    val_data = torch.utils.data.Subset(eval_dataset, val_indices)
    test_data = torch.utils.data.Subset(eval_dataset, test_indices)

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader, num_classes

def build_model(model_type, num_classes, img_size=224):
    """Create a classifier of the requested architecture.

    Args:
        model_type: ``"resnet50"``, ``"mobilenetv2"`` or ``"custom_cnn"``.
        num_classes: Number of output logits.
        img_size: Side length of the square input images. Only used by
            ``"custom_cnn"`` to size its first fully connected layer; the
            default of 224 reproduces the previously hard-coded
            ``128 * 28 * 28`` feature size.

    Returns:
        nn.Module: The model with a fresh 512-unit hidden layer and a
        ``num_classes``-way output layer as its classification head.

    Raises:
        ValueError: If ``model_type`` is not supported, or if ``img_size``
            is too small to survive the three 2x poolings of ``custom_cnn``.
    """
    if model_type == "resnet50":
        # NOTE(review): newer torchvision releases deprecate `pretrained=`
        # in favor of `weights=`; kept as-is for compatibility with the
        # version this file was written against.
        model = models.resnet50(pretrained=True)
        model.fc = nn.Sequential(
            nn.Linear(model.fc.in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
    elif model_type == "mobilenetv2":
        model = models.mobilenet_v2(pretrained=True)
        model.classifier = nn.Sequential(
            nn.Linear(model.last_channel, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
    elif model_type == "custom_cnn":
        # The padded 3x3 convolutions keep the spatial size; each of the
        # three MaxPool2d(2, 2) layers halves it (rounding down), so the
        # final feature map is (img_size // 8) on each side.
        feature_side = img_size // 8
        if feature_side < 1:
            raise ValueError("img_size must be at least 8 for 'custom_cnn'.")
        # Layers stay in one flat Sequential so state_dict keys are unchanged.
        model = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
            nn.Linear(128 * feature_side * feature_side, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
    else:
        raise ValueError("Unsupported model type. Choose 'resnet50', 'mobilenetv2', or 'custom_cnn'.")
    return model

def _validation_metrics(model, loader, device, criterion):
    """Return ``(mean_batch_loss, accuracy)`` of *model* over *loader* without updating weights."""
    model.eval()
    loss_sum, hits, seen = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss_sum += criterion(outputs, labels).item()
            hits += (outputs.argmax(dim=1) == labels).sum().item()
            seen += labels.size(0)

    # Averaged per batch, the same way the training loss is, so the two
    # curves are directly comparable.
    num_batches = len(loader)
    mean_loss = loss_sum / num_batches if num_batches else 0.0
    accuracy = hits / seen if seen else 0.0
    return mean_loss, accuracy


def train_and_evaluate(model, train_loader, val_loader, device, epochs, model_path, optimizer_type, learning_rate, use_tensorboard):
    """Train *model*, validate after every epoch and checkpoint the best weights.

    Args:
        model: Network to train; it is moved to ``device`` in place.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        device: Device on which to run training and validation.
        epochs: Number of passes over ``train_loader``.
        model_path: File that receives ``model.state_dict()`` each time the
            validation accuracy improves.
        optimizer_type: ``"adam"`` or ``"sgd"`` (case-insensitive).
        learning_rate: Learning rate passed to the optimizer.
        use_tensorboard: When true, per-epoch loss and accuracy are logged
            through a ``SummaryWriter``.

    Returns:
        float: The best validation accuracy seen (0.0 if it never improved).

    Raises:
        ValueError: If ``optimizer_type`` is not supported.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss()

    optimizer_name = optimizer_type.lower()
    if optimizer_name == "adam":
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    elif optimizer_name == "sgd":
        optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
    else:
        raise ValueError("Unsupported optimizer type. Choose 'Adam' or 'SGD'.")

    best_val_acc = 0.0  # Best validation accuracy so far; gates checkpointing.
    writer = SummaryWriter() if use_tensorboard else None  # TensorBoard logger

    try:
        for epoch in range(epochs):
            # Validation leaves the model in eval mode, so switch back each epoch.
            model.train()
            running_loss, correct, total = 0.0, 0, 0

            for images, labels in train_loader:
                images, labels = images.to(device), labels.to(device)
                optimizer.zero_grad()

                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

            # Guard against an empty loader instead of dividing by zero.
            num_batches = len(train_loader)
            train_loss = running_loss / num_batches if num_batches else 0.0
            train_acc = correct / total if total else 0.0

            # Evaluate on the validation set.
            val_loss, val_acc = _validation_metrics(model, val_loader, device, criterion)

            print(f"Epoch [{epoch+1}/{epochs}] - Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

            if writer:
                writer.add_scalar("Loss/Train", train_loss, epoch)
                writer.add_scalar("Loss/Validation", val_loss, epoch)
                writer.add_scalar("Accuracy/Train", train_acc, epoch)
                writer.add_scalar("Accuracy/Validation", val_acc, epoch)

            # Save the model whenever validation accuracy reaches a new high.
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(model.state_dict(), model_path)
                print(f"✅ Saved best model at epoch {epoch+1} with val_acc: {val_acc:.4f}")
    finally:
        # Close the writer even if training is interrupted.
        if writer:
            writer.close()

    return best_val_acc

def evaluate_model(model, test_loader, device, criterion=None, show_confusion_matrix=True):
    """Evaluate *model* on *test_loader* and optionally plot a confusion matrix.

    Args:
        model: Network to evaluate; it is moved to ``device`` and put in
            eval mode.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device on which to run inference.
        criterion: Optional loss function. When given, the average loss over
            all samples is also computed.
        show_confusion_matrix: When true (the default), display the confusion
            matrix of the predictions with matplotlib.

    Returns:
        tuple: ``(avg_loss, accuracy)``. ``avg_loss`` is ``None`` when no
        ``criterion`` was supplied or the loader produced no samples;
        ``accuracy`` is 0.0 for an empty loader.
    """
    model.to(device)
    model.eval()
    all_preds, all_labels = [], []
    weighted_loss = 0.0

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            if criterion is not None:
                # Weight by batch size so a short final batch does not skew the
                # average. Assumes the criterion returns a per-batch mean —
                # TODO confirm if a non-default reduction is ever passed.
                weighted_loss += criterion(outputs, labels).item() * labels.size(0)
            predicted = outputs.argmax(dim=1)
            all_preds.extend(predicted.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    total = len(all_labels)
    correct = sum(1 for pred, truth in zip(all_preds, all_labels) if pred == truth)
    accuracy = correct / total if total else 0.0
    avg_loss = weighted_loss / total if (criterion is not None and total) else None

    # Skip plotting when there is nothing to plot.
    if show_confusion_matrix and total:
        cm = confusion_matrix(all_labels, all_preds)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm)
        disp.plot(cmap='Blues', values_format='d')
        plt.title("Confusion Matrix - Test Set")
        plt.show()

    return avg_loss, accuracy


def main():
    """Run the whole pipeline: load data, train, reload the best checkpoint, test.

    All settings (dataset path, batch size, image size, epochs, optimizer,
    learning rate, architecture) are hard-coded below.
    """
    data_dir = "../hand_gesture_recognition_project/leapGestRecog"
    batch_size = 60
    img_size = 224
    epochs = 50
    model_path = "best_model.pth"
    optimizer_type = 'Adam'
    learning_rate = 1e-3
    use_tensorboard = True
    model_type = "resnet50"  # or 'mobilenetv2', 'custom_cnn'

    device = get_device()
    train_loader, val_loader, test_loader, num_classes = load_data(data_dir, batch_size, img_size)
    model = build_model(model_type, num_classes)

    train_and_evaluate(model, train_loader, val_loader, device, epochs, model_path, optimizer_type, learning_rate, use_tensorboard)

    # Reload the best checkpoint; map_location lets a checkpoint saved on
    # one device be opened on another (e.g. GPU-trained, CPU-evaluated).
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)

    # Call with only the arguments the evaluation function's signature
    # requires; the confusion matrix is shown by default.
    evaluate_model(model, test_loader, device)

# Run the training/evaluation pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()

