In [None]:
pip install torch torchvision tqdm


In [None]:
import timm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, models 
from torchvision.utils import make_grid
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from PIL import Image
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Preprocessing applied to every image: resize to 224x224, convert to a tensor,
# then normalise each channel with the mean/std values commonly used for
# ImageNet-pretrained models.
_preprocess_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]
transform = transforms.Compose(_preprocess_steps)

In [None]:
import json

# Read the category-id -> flower-name table that ships with the dataset.
with open('/kaggle/input/pytorch-challange-flower-dataset/cat_to_name.json', 'r') as f:
    cat2name_mapping = json.load(f)

# Category ids and their names, both in the order they appear in the JSON file.
cat_list = list(cat2name_mapping.keys())
class_names = list(cat2name_mapping.values())
print(class_names)

# Contiguous integer labels 0..len-1 and the lookup tables built from them.
N = list(range(len(class_names)))
cat2N_mapping = {cat: idx for idx, cat in enumerate(cat_list)}
name2N_mapping = {name: idx for idx, name in enumerate(class_names)}
N2name_mapping = dict(enumerate(class_names))

In [None]:
def collect_labeled_images(root, label_map):
    """Collect ``(image_path, label_index)`` pairs from a class-per-folder tree.

    Args:
        root: Directory to walk recursively.
        label_map: Dict mapping a folder name (category id) to its integer label.

    Returns:
        A list of ``(path, label)`` tuples for every file ending in ``.jpg`` whose
        parent directory name is a key of ``label_map``. Directories and files
        are visited in sorted order so the result does not depend on the order
        in which the filesystem lists entries.
    """
    pairs = []
    for dirname, subdirs, filenames in os.walk(root):
        # Sorting in place makes os.walk descend in a deterministic order.
        subdirs.sort()
        category = os.path.basename(dirname)
        if category not in label_map:
            continue
        label = label_map[category]
        for filename in sorted(filenames):
            if filename.endswith('.jpg'):
                pairs.append((os.path.join(dirname, filename), label))
    return pairs


# Training split.
path_label = collect_labeled_images(
    '/kaggle/input/pytorch-challange-flower-dataset/dataset/train', cat2N_mapping)

# Validation split (used as the test set below).
tpath_label = collect_labeled_images(
    '/kaggle/input/pytorch-challange-flower-dataset/dataset/valid', cat2N_mapping)

In [None]:
class ImageDataset(Dataset):
    """Dataset backed by a list of ``(image_path, label)`` pairs.

    Each item is loaded from disk on access, converted to RGB and, if a
    transform was supplied, passed through it.
    """

    def __init__(self, path_label, transform=None):
        """Store the sample list and the optional per-image transform.

        Args:
            path_label: Sequence of ``(path, label)`` tuples.
            transform: Optional callable applied to the RGB image.
        """
        self.path_label = path_label
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.path_label)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``."""
        path, label = self.path_label[idx]
        # Open inside a context manager so the underlying file handle is closed
        # deterministically; convert() returns a separate, fully loaded image
        # that remains valid after the file is closed.
        with Image.open(path) as raw:
            img = raw.convert('RGB')

        if self.transform is not None:
            img = self.transform(img)

        return img, label

In [None]:
# Wrap both splits in ImageDataset, sharing the same preprocessing pipeline.
train_data = ImageDataset(path_label=path_label, transform=transform)
test_data = ImageDataset(path_label=tpath_label, transform=transform)

In [None]:
# Integer labels of the training samples, in dataset order.
labels = [sample[1] for sample in train_data.path_label]

In [None]:
# Batches of 32; only the training loader reshuffles its samples.
train_loader = DataLoader(dataset=train_data, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_data, batch_size=32, shuffle=False)

In [None]:
# Pull a single batch from the training loader and tile it into one image grid
# with 16 images per row.
_batch_iter = iter(train_loader)
images, labels = next(_batch_iter)
im = make_grid(images, nrow=16)

In [None]:
# Show the grid as-is; channels are moved last because imshow expects H x W x C.
plt.figure(figsize=(12, 12))
_grid_hwc = im.numpy().transpose(1, 2, 0)
plt.imshow(_grid_hwc)

In [None]:
# Inverse of the Normalize step used in `transform`: with mean' = -mean/std and
# std' = 1/std, normalising again maps the values back to their original range.
# NOTE: this rebinds `im`, so running the cell twice applies the inverse twice.
_norm_mean = [0.485, 0.456, 0.406]
_norm_std = [0.229, 0.224, 0.225]
inv_normalize = transforms.Normalize(
    mean=[-m / s for m, s in zip(_norm_mean, _norm_std)],
    std=[1 / s for s in _norm_std],
)
im = inv_normalize(im)

In [None]:
# Show the grid again; channels are moved last because imshow expects H x W x C.
plt.figure(figsize=(12, 12))
_grid_hwc = im.numpy().transpose(1, 2, 0)
plt.imshow(_grid_hwc)

In [None]:
# Candidate backbones, each initialised with ImageNet weights.
# `pretrained=True` is deprecated in torchvision's multi-weight API; the
# explicit "IMAGENET1K_V1" weights are what `pretrained=True` resolved to, so
# the loaded parameters are unchanged while the deprecation path is avoided.
models_sota = {
    "ResNet50": models.resnet50(weights="IMAGENET1K_V1"),
    "EfficientNet_B0": models.efficientnet_b0(weights="IMAGENET1K_V1"),
    "DenseNet121": models.densenet121(weights="IMAGENET1K_V1"),
    "ViT_B_16": models.vit_b_16(weights="IMAGENET1K_V1"),
    "Swin_T": models.swin_t(weights="IMAGENET1K_V1"),
    "ConvNeXt_T": models.convnext_tiny(weights="IMAGENET1K_V1"),
}

In [None]:
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Classification loss shared by the notebook.
criterion = nn.CrossEntropyLoss()

In [None]:
import torch
import torch.nn as nn
from tqdm import tqdm

def _replace_classifier_head(model, num_classes):
    """Replace the final linear classification layer of ``model`` in place.

    Handles the attribute layouts used by the models in this notebook:
    ``fc`` (ResNet), ``classifier`` as a single ``nn.Linear`` (DenseNet) or as
    an ``nn.Sequential`` whose last linear layer is the classifier
    (EfficientNet, ConvNeXt), ``heads.head`` (ViT) and ``head`` (Swin).

    Returns:
        True if a layer was replaced, False if no known head was found.
    """
    if isinstance(getattr(model, 'fc', None), nn.Linear):
        model.fc = nn.Linear(model.fc.in_features, num_classes)
        return True

    classifier = getattr(model, 'classifier', None)
    if isinstance(classifier, nn.Linear):
        model.classifier = nn.Linear(classifier.in_features, num_classes)
        return True
    if isinstance(classifier, nn.Sequential):
        # Replace the last linear layer and keep whatever precedes it.
        for idx in reversed(range(len(classifier))):
            if isinstance(classifier[idx], nn.Linear):
                classifier[idx] = nn.Linear(classifier[idx].in_features, num_classes)
                return True

    heads = getattr(model, 'heads', None)
    if heads is not None and isinstance(getattr(heads, 'head', None), nn.Linear):
        heads.head = nn.Linear(heads.head.in_features, num_classes)
        return True

    if isinstance(getattr(model, 'head', None), nn.Linear):
        model.head = nn.Linear(model.head.in_features, num_classes)
        return True

    return False


def train_and_evaluate(model_name, model, train_loader, test_loader, num_classes=102, epochs=5, lr=1e-3):
    """Fine-tune ``model`` on ``train_loader``, evaluate it and save a checkpoint.

    The model's classification head is replaced by a fresh linear layer with
    ``num_classes`` outputs, the model is moved to the notebook-level ``device``
    (and wrapped in ``nn.DataParallel`` when several GPUs are visible), trained
    with Adam and cross-entropy for ``epochs`` epochs, evaluated on
    ``test_loader`` and finally saved to ``"{model_name}_checkpoint.pth"``.

    Args:
        model_name: Label used in log messages and in the checkpoint file name.
        model: The network to fine-tune; it is modified in place.
        train_loader: DataLoader yielding ``(images, labels)`` training batches.
        test_loader: DataLoader yielding ``(images, labels)`` evaluation batches.
        num_classes: Number of output classes for the new head.
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate passed to Adam.

    Returns:
        Accuracy on ``test_loader`` as a float in ``[0, 1]``.
    """
    print(f"\nTraining {model_name}...")

    # Replace the output layer BEFORE any DataParallel wrapping: the wrapper
    # does not expose the inner model's attributes, so doing this afterwards
    # would silently leave the original head in place.
    if not _replace_classifier_head(model, num_classes):
        print(f"Warning: no known classification head found on {model_name}; "
              f"output layer left unchanged.")

    # Move the whole model (including the new head) to the target device.
    model = model.to(device)

    # Use DataParallel when more than one GPU is available.
    if torch.cuda.device_count() > 1:
        model = nn.DataParallel(model)

    # Keep the loss on the same device as the model.
    criterion = nn.CrossEntropyLoss().to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Guard the averages against empty loaders/datasets.
    num_batches = max(len(train_loader), 1)
    num_train = max(len(train_loader.dataset), 1)

    # Defined up front so the checkpoint can still be written when epochs == 0.
    train_loss, accuracy = 0.0, 0.0

    for epoch in range(epochs):
        model.train()
        train_loss, correct, seen = 0.0, 0, 0
        # tqdm renders one progress bar per epoch.
        with tqdm(total=len(train_loader), desc=f'Epoch {epoch+1}/{epochs}', unit='batch') as pbar:
            for images, labels in train_loader:
                images, labels = images.to(device), labels.to(device)

                # Forward pass, backward pass, parameter update.
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # Running statistics for this epoch.
                train_loss += loss.item()
                correct += (outputs.argmax(1) == labels).sum().item()
                seen += labels.size(0)

                # Running accuracy is measured over the samples seen so far,
                # not over the whole dataset.
                pbar.set_postfix(loss=train_loss / (pbar.n + 1), accuracy=correct / seen)
                pbar.update(1)

        # Report the mean per-batch loss, the same quantity stored in the checkpoint.
        accuracy = correct / num_train
        print(f"Epoch {epoch+1}/{epochs} - Loss: {train_loss / num_batches:.4f}, Accuracy: {accuracy:.4f}")

    # Evaluate on the test set.
    model.eval()
    correct = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            correct += (outputs.argmax(1) == labels).sum().item()

    test_accuracy = correct / max(len(test_loader.dataset), 1)
    print(f"Test Accuracy for {model_name}: {test_accuracy:.4f}")

    # Save a checkpoint. The DataParallel wrapper is stripped so the stored keys
    # carry no "module." prefix and load directly into a plain model.
    unwrapped = model.module if isinstance(model, nn.DataParallel) else model
    checkpoint_path = f"{model_name}_checkpoint.pth"
    checkpoint = {
        'model_state_dict': unwrapped.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epochs,
        'loss': train_loss / num_batches,
        'accuracy': accuracy,
    }
    torch.save(checkpoint, checkpoint_path)
    print(f"Checkpoint saved to {checkpoint_path}")

    return test_accuracy


In [None]:
# Fine-tune and evaluate every candidate backbone in turn, using the default
# number of classes, epochs and learning rate.
for model_name in models_sota:
    model = models_sota[model_name]
    train_and_evaluate(
        model_name=model_name,
        model=model,
        train_loader=train_loader,
        test_loader=test_loader,
    )