In [None]:
#cell 1
import os
import shutil
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
from torch.optim.lr_scheduler import ReduceLROnPlateau
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

In [None]:
#cell 2
# ƒê∆∞·ªùng d·∫´n d·ªØ li·ªáu
archive_dir = "/kaggle/input/skin-cancer-mnist-ham10000"
part1_dir = os.path.join(archive_dir, "HAM10000_images_part_1")
part2_dir = os.path.join(archive_dir, "HAM10000_images_part_2")
metadata_file = os.path.join(archive_dir, "HAM10000_metadata.csv")
merged_dir = "/kaggle/working/HAM10000_sorted"
os.makedirs(merged_dir, exist_ok=True)

# ƒê·ªçc metadata v√† s·∫Øp x·∫øp ·∫£nh
df = pd.read_csv(metadata_file)
for _, row in df.iterrows():
    image_id, disease_class = row["image_id"], row["dx"]
    class_dir = os.path.join(merged_dir, disease_class)
    os.makedirs(class_dir, exist_ok=True)
    filename = image_id + ".jpg"
    src_path = os.path.join(part1_dir, filename) if os.path.exists(os.path.join(part1_dir, filename)) else os.path.join(part2_dir, filename)
    if os.path.exists(src_path):
        shutil.copy(src_path, os.path.join(class_dir, filename))

print(f"‚úÖ ƒê√£ g·ªôp v√† s·∫Øp x·∫øp ·∫£nh v√†o th∆∞ m·ª•c: {merged_dir}")


In [None]:
#cell 3
from torch.quantization import QuantStub, DeQuantStub, fuse_modules

data_dir = merged_dir
output_dir = "/kaggle/working/HAM10000_split"
for split in ["train", "valid", "test"]:
    os.makedirs(os.path.join(output_dir, split), exist_ok=True)

train_ratio, valid_ratio, test_ratio = 0.8, 0.1, 0.1
assert train_ratio + valid_ratio + test_ratio == 1.0, "T·ªïng t·ª∑ l·ªá ph·∫£i b·∫±ng 1"

for class_name in os.listdir(data_dir):
    class_path = os.path.join(data_dir, class_name)
    if not os.path.isdir(class_path): continue

    for split in ["train", "valid", "test"]:
        os.makedirs(os.path.join(output_dir, split, class_name), exist_ok=True)

    images = [f for f in os.listdir(class_path) if f.endswith(('.jpg', '.png'))]
    train_images, temp_images = train_test_split(images, train_size=train_ratio, random_state=42)
    valid_images, test_images = train_test_split(temp_images, test_size=0.5, random_state=42)

    def copy_images(img_list, src_dir, dest_dir):
        for img in img_list:
            shutil.copy2(os.path.join(src_dir, img), os.path.join(dest_dir, img))

    copy_images(train_images, class_path, os.path.join(output_dir, "train", class_name))
    copy_images(valid_images, class_path, os.path.join(output_dir, "valid", class_name))
    copy_images(test_images, class_path, os.path.join(output_dir, "test", class_name))

    print(f"‚úÖ {class_name}: {len(train_images)} train, {len(valid_images)} valid, {len(test_images)} test")

print("\nüéØ Ho√†n th√†nh! D·ªØ li·ªáu ƒë√£ ƒë∆∞·ª£c chia v√†o '/kaggle/working/HAM10000_split'.")


In [None]:
#cell 4
import torch.ao.quantization as tq

class CustomEfficientNet(nn.Module):
    def __init__(self, num_classes):
        super(CustomEfficientNet, self).__init__()
        self.model = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)

        # Th√™m l∆∞·ª£ng t·ª≠ h√≥a
        self.quant = tq.QuantStub()
        self.dequant = tq.DeQuantStub()

        self.model.classifier = nn.Sequential(
            nn.Dropout(0.6),
            nn.Linear(1280, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.quant(x)  # L∆∞·ª£ng t·ª≠ h√≥a ƒë·∫ßu v√†o
        x = self.model(x)
        x = self.dequant(x)  # Gi·∫£i l∆∞·ª£ng t·ª≠ h√≥a ƒë·∫ßu ra
        return x

    def fuse_model(self):
        for module_name, module in self.model.named_children():
            if isinstance(module, nn.Sequential):
                for block_name, block in module.named_children():
                    if isinstance(block, nn.Sequential):
                        submodules = list(block.children())

                        # Ki·ªÉm tra v√† fuse Conv2d + BatchNorm2d + ReLU n·∫øu c√≥
                        for i in range(len(submodules) - 2):
                            if (
                                isinstance(submodules[i], nn.Conv2d)
                                and isinstance(submodules[i + 1], nn.BatchNorm2d)
                                and isinstance(submodules[i + 2], nn.ReLU)
                            ):
                                tq.fuse_modules(block, [str(i), str(i + 1), str(i + 2)], inplace=True)

In [None]:
#cell 5
import torch.ao.quantization as tq

def fuse_model(model):
    """ H·ª£p nh·∫•t (fuse) c√°c l·ªõp Conv + BatchNorm + ReLU tr∆∞·ªõc khi QAT """
    for module_name, module in model.named_children():
        if isinstance(module, torch.nn.Sequential):
            submodules = list(module.children())
            if (
                len(submodules) >= 3
                and isinstance(submodules[0], torch.nn.Conv2d)
                and isinstance(submodules[1], torch.nn.BatchNorm2d)
                and isinstance(submodules[2], torch.nn.ReLU)
            ):
                tq.fuse_modules(module, ['0', '1', '2'], inplace=True)
        fuse_model(module)

def evaluate(model, device, loader, loss_fn, mode="Validation"):
    model.eval()
    total_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            total_loss += loss_fn(output, target).item()
            correct += output.argmax(dim=1).eq(target).sum().item()
            total += target.size(0)

    avg_loss = total_loss / len(loader)
    accuracy = 100. * correct / total
    print(f"{mode}: Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")
    return avg_loss, accuracy

def train(args, model, device, train_loader, valid_loader, optimizer, scheduler, early_stop):
    loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1)
    train_losses, valid_losses = [], []
    best_valid_loss = float("inf")

    # üîπ B·∫≠t ch·∫ø ƒë·ªô QAT tr∆∞·ªõc khi train (c·∫ßn ki·ªÉm tra n·∫øu ch∆∞a chu·∫©n b·ªã)
    if not hasattr(model, 'qconfig') or model.qconfig is None:
        model.qconfig = tq.get_default_qat_qconfig('qnnpack')  # Ho·∫∑c 'fbgemm' n·∫øu b·∫°n s·ª≠ d·ª•ng GPU
        fuse_model(model)  # H·ª£p nh·∫•t c√°c l·ªõp tr∆∞·ªõc khi QAT
        tq.prepare_qat(model, inplace=True)
        print("‚úÖ QAT Enabled: Model prepared for Quantization Aware Training")

    for epoch in range(1, args.epochs + 1):
        print(f"\nEpoch {epoch}/{args.epochs}")
        model.train()
        total_loss, correct, total = 0, 0, 0

        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = loss_fn(output, target)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            correct += output.argmax(dim=1).eq(target).sum().item()
            total += target.size(0)

        avg_train_loss = total_loss / len(train_loader)
        train_accuracy = 100. * correct / total  # T√≠nh accuracy tr√™n t·∫≠p train
        train_losses.append(avg_train_loss)

        valid_loss, valid_accuracy = evaluate(model, device, valid_loader, loss_fn)
        valid_losses.append(valid_loss)

        # ‚úÖ In ƒë·∫ßy ƒë·ªß th√¥ng tin
        print(f"Train: Loss: {avg_train_loss:.4f}, Accuracy: {train_accuracy:.2f}%")

        scheduler.step(valid_loss)

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), "best_model.pt")
            print("Model improved and saved!")
            early_stop.counter = 0
        else:
            early_stop.counter += 1
            if early_stop.counter >= early_stop.patience:
                print("Early stopping triggered!")
                break

    return train_losses, valid_losses


def test(model, device, test_loader):
    loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1)
    print("\nFinal Evaluation on Test Set:")
    evaluate(model, device, test_loader, loss_fn, mode="Test")

class EarlyStopping:
    def __init__(self, patience=5):
        self.patience = patience
        self.counter = 0


In [None]:
class Args:
    batch_size = 32
    test_batch_size = 32
    epochs = 40
    lr = 0.0001
    patience = 7

args = Args()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_dir, valid_dir, test_dir = [os.path.join(output_dir, d) for d in ["train", "valid", "test"]]

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
valid_dataset = datasets.ImageFolder(root=valid_dir, transform=transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=args.test_batch_size, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size, shuffle=False)

num_classes = len(train_dataset.classes)
model = CustomEfficientNet(num_classes).to(device)
optimizer = optim.Adam(model.parameters(), lr=args.lr, weight_decay=5e-4)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, min_lr=1e-6)
early_stop = EarlyStopping(patience=args.patience)

# üî• Train m√¥ h√¨nh
train(args, model, device, train_loader, valid_loader, optimizer, scheduler, early_stop)

# üîÑ Load l·∫°i m√¥ h√¨nh t·ªët nh·∫•t
model.load_state_dict(torch.load("best_model.pt", weights_only=True))
model.to("cpu")  # Chuy·ªÉn v·ªÅ CPU tr∆∞·ªõc khi l∆∞·ª£ng t·ª≠ h√≥a
model.eval()

# üõ†Ô∏è Fuse model tr∆∞·ªõc khi l∆∞·ª£ng t·ª≠ h√≥a ƒë·ªÉ tr√°nh l·ªói fuser method
model.fuse_model()  # ‚úÖ G·ªçi ph∆∞∆°ng th·ª©c fuse_model() ƒë√∫ng c√°ch

# üî• Ch·ªâ l∆∞·ª£ng t·ª≠ h√≥a n·∫øu m√¥ h√¨nh ƒë√£ ƒë∆∞·ª£c chu·∫©n b·ªã cho QAT tr∆∞·ªõc ƒë√≥
if hasattr(model, 'qconfig') and model.qconfig is not None:
    torch.ao.quantization.convert(model, inplace=True)
    print("‚úÖ Model has been quantized successfully!")
    torch.save(model.state_dict(), "best_model_quantized.pt")  # L∆∞u m√¥ h√¨nh l∆∞·ª£ng t·ª≠ h√≥a
else:
    print("‚ö†Ô∏è Warning: Model was not prepared for QAT, skipping quantization!")