In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# # Mount Google Drive
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
!pip install pillow_heif

In [None]:
import os
import glob
from PIL import Image
import pillow_heif
pillow_heif.register_heif_opener()

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models
import torchvision.transforms.v2 as T
from torchvision.transforms.v2.functional import to_image
import pandas as pd
from tqdm import tqdm
import time
import copy
import matplotlib.pyplot as plt
import random


In [None]:
# Root folder of the labelled dataset
source_path = r'/kaggle/input/do-an-ml-an/ML-Do-An'
image_paths = []

# Expected layout, as encoded by the glob patterns below:
#   <root>/<any folder>/hand_written_digit/<8-character folder, "52" at positions 3-4>/<digit>_<anything>.<ext>
for category_dir in glob.glob(os.path.join(source_path, '*', 'hand_written_digit', '??52????')):
    for digit in range(10):
        search_pattern = os.path.join(category_dir, f'{digit}_*.*')
        image_paths.extend(glob.glob(search_pattern))

# glob yields matches in arbitrary, filesystem-dependent order. The train/val
# split later slices this list by position, so sort it to make the split
# reproducible from one run (or machine) to the next.
image_paths.sort()

print(f"Total images collected: {len(image_paths)}")

In [None]:
# Dataset of labelled digit images
class DigitDataset(Dataset):
    """Labelled digit images whose label is the first character of the file name.

    Indexing yields ``(image, label)``. When a file cannot be opened, its name
    does not start with a digit, or the transform fails, the error is printed
    and ``None`` is returned instead of raising.
    """

    def __init__(self, paths, aug=None):
        # paths: sequence of image file paths; aug: optional callable applied to the RGB image
        self.paths = paths
        self.aug = aug

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, index):
        return self._read_sample(self.paths[index])

    def _read_sample(self, file_path):
        """Build the sample for one file; ``None`` signals a failed read."""
        try:
            picture = Image.open(file_path).convert("RGB")
            # The label is the leading character of the base name, e.g. "7_xyz.jpg" -> 7
            digit = int(os.path.basename(file_path)[0])
            sample = (self.aug(picture) if self.aug else picture, digit)
        except Exception as ex:
            print(f"Cannot read {file_path}: {ex}")
            sample = None
        return sample

In [None]:
# Preprocessing / augmentation pipelines for digit images (0-9)

# Shared settings so that every pipeline produces tensors on the same scale.
IMG_SIZE = (288, 288)
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]


def _tensor_tail():
    """Return fresh closing steps: PIL -> image tensor -> float in [0, 1] -> normalised."""
    return [
        T.ToImage(),
        T.ToDtype(torch.float32, scale=True),
        T.Normalize(IMAGENET_MEAN, IMAGENET_STD),
    ]


# Resize only (no augmentation).
# Normalisation is applied here as well: the validation/inference pipeline
# (`test_aug`) normalises its inputs, so training without it would feed the
# network a different input distribution than the one it is evaluated on.
train_no_aug = T.Compose([
    T.Resize(IMG_SIZE),
    *_tensor_tail(),
])

# Basic augmentation
train_aug_basic = T.Compose([
    T.Resize(IMG_SIZE),                              # resize to a fixed size
    T.RandomRotation(degrees=15),                    # random rotation within +/-15 degrees
    *_tensor_tail(),
])

# Advanced augmentation
# No horizontal flip on purpose: a mirrored digit is not a valid sample of its
# class, so flipping would inject wrongly-labelled training images.
train_aug_advanced = T.Compose([
    T.Resize(IMG_SIZE),                               # resize to a fixed size
    T.RandomRotation(degrees=15),                     # random rotation within +/-15 degrees
    T.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # shift along x/y by up to 10%
    T.ColorJitter(brightness=0.2, contrast=0.2),      # brightness & contrast jitter
    T.RandomPerspective(distortion_scale=0.2, p=0.3), # mild perspective distortion
    *_tensor_tail(),
])

# Deterministic pipeline for validation and inference
test_aug = T.Compose([
    T.Resize(IMG_SIZE),
    *_tensor_tail(),
])

In [None]:
# 90/10 positional split of the collected paths (first 90% -> train, rest -> validation).
# The training subset is transformed with `train_no_aug`, the held-out subset with `test_aug`.
train_size = int(0.9 * len(image_paths))
train_paths, val_paths = image_paths[:train_size], image_paths[train_size:]

train_set = DigitDataset(train_paths, aug=train_no_aug)
val_set = DigitDataset(val_paths, aug=test_aug)

In [None]:
import matplotlib.pyplot as plt
import torch

# Preview a few consecutive training samples with their labels.

# Statistics used to undo normalisation for RGB images
mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

start_idx = 1000
end_idx = 1010

# Only undo normalisation when the training pipeline really contains a
# Normalize step; otherwise the images are already in [0, 1].
pipeline_steps = getattr(train_set.aug, "transforms", [])
is_normalized = any(isinstance(step, T.Normalize) for step in pipeline_steps)

# Clip the requested range to the dataset so small datasets do not raise IndexError
last_idx = min(end_idx, len(train_set) - 1)
preview_indices = list(range(start_idx, last_idx + 1))

if not preview_indices:
    print(f"Nothing to preview: train_set has {len(train_set)} samples, requested start index {start_idx}")
else:
    fig, axes = plt.subplots(1, len(preview_indices), figsize=(20, 4), squeeze=False)

    for ax, idx in zip(axes[0], preview_indices):
        ax.axis('off')

        sample = train_set[idx]
        if sample is None:
            # The dataset returns None for files it could not read
            ax.set_title("unreadable")
            continue

        img, label = sample
        if is_normalized:
            img = img * std + mean

        # imshow expects float RGB data in [0, 1]; clamp away rounding overshoot
        img_np = img.clamp(0, 1).permute(1, 2, 0).numpy()
        ax.imshow(img_np)
        ax.set_title(f"Label: {label}")

    fig.tight_layout()
    plt.show()


In [None]:
# Number of samples in the training and validation subsets, one per line
for subset in (train_set, val_set):
    print(len(subset))

In [None]:
def remove_none_samples(batch):
    """Collate a batch with PyTorch's default collate after dropping ``None`` entries.

    ``batch`` is the list of samples gathered by a DataLoader. Entries equal to
    ``None`` are filtered out, and whatever remains (possibly an empty list) is
    handed to ``default_collate`` unchanged.
    """
    usable = []
    for sample in batch:
        if sample is not None:
            usable.append(sample)
    return torch.utils.data.dataloader.default_collate(usable)

In [None]:
# DataLoaders
# Training batches are reshuffled every epoch; samples that failed to load
# (returned as None by the dataset) are dropped by the collate function.
train_loader = DataLoader(train_set, batch_size=64, shuffle=True, num_workers=2, collate_fn=remove_none_samples)
# Validation only aggregates loss/accuracy over the whole subset, so shuffling
# adds nothing; keep the order deterministic.
val_loader = DataLoader(val_set, batch_size=64, shuffle=False, num_workers=2, collate_fn=remove_none_samples)

In [None]:
# Model setup
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Computation will run on: {device}")

# Alternative backbone kept for reference (disabled):
# #model = models.efficientnet_b1(weights=models.EfficientNet_B1_Weights.IMAGENET1K_V2)
# model = models.efficientnet_b2(weights=models.EfficientNet_B2_Weights.IMAGENET1K_V1)
# in_features = model.classifier[1].in_features
# model.classifier = nn.Sequential(
#     nn.Dropout(p=0.3, inplace=True),
#     nn.Linear(in_features, 10)
# )
# model = model.to(device)

# RegNetY-1.6GF initialised with IMAGENET1K_V1 weights; its final `fc` layer is
# swapped for dropout followed by a 10-way linear layer (one output per digit).
model = models.regnet_y_1_6gf(weights=models.RegNet_Y_1_6GF_Weights.IMAGENET1K_V1)
in_features = model.fc.in_features
classifier_head = nn.Sequential(
    nn.Dropout(p=0.3, inplace=True),
    nn.Linear(in_features, 10),
)
model.fc = classifier_head

model = model.to(device)

In [None]:
# Loss function and optimiser: cross-entropy over the class logits,
# Adam over all model parameters with weight decay.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=1e-4,
    weight_decay=1e-4,
)

In [None]:
def training(model, loss_fn, optimizer,
             train_loader, val_loader,
             num_epochs=15, patience=2, model_path='best_model.pth'):
    """Train ``model`` with early stopping on validation accuracy.

    Each epoch runs a training pass followed by a validation pass. Whenever the
    validation accuracy improves, the weights are snapshotted in memory and
    written to ``model_path``. Training stops once ``patience`` consecutive
    epochs bring no improvement, or after ``num_epochs`` epochs.

    Args:
        model: network to optimise; batches are moved to the device of its parameters.
        loss_fn: callable ``(logits, labels) -> scalar loss`` (mean over the batch).
        optimizer: optimiser already bound to ``model``'s parameters.
        train_loader: iterable of ``(images, labels)`` batches used for training.
        val_loader: iterable of ``(images, labels)`` batches used for validation.
        num_epochs: maximum number of epochs.
        patience: number of non-improving epochs tolerated before stopping.
        model_path: file the best ``state_dict`` is saved to.

    Returns:
        ``model`` with the best-scoring weights loaded.
    """
    # Follow the model's own placement instead of relying on a notebook-level global
    run_device = next(model.parameters()).device

    best_score = 0.0
    best_weights = copy.deepcopy(model.state_dict())
    no_gain = 0
    start_time = time.time()

    for ep in range(num_epochs):
        print(f"\nEpoch {ep}/{num_epochs-1}")
        print('-'*30)

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train(False) is equivalent to eval()
            loader = train_loader if is_train else val_loader

            epoch_loss = 0.0
            correct = 0
            seen = 0  # samples that actually reached the model in this phase

            for imgs, lbls in tqdm(loader):
                imgs, lbls = imgs.to(run_device), lbls.to(run_device)

                with torch.set_grad_enabled(is_train):
                    preds = model(imgs)
                    loss = loss_fn(preds, lbls)
                    predicted = preds.argmax(dim=1)

                    if is_train:
                        # Gradients are only produced in the training phase
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                batch_count = imgs.size(0)
                epoch_loss += loss.item() * batch_count
                correct += (predicted == lbls).sum().item()
                seen += batch_count

            # Average over the samples that were really processed. The collate
            # function may drop unreadable files, so len(loader.dataset) can
            # overstate the count and bias loss/accuracy downwards.
            denom = max(seen, 1)
            acc = correct / denom
            avg_loss = epoch_loss / denom
            print(f"{phase.capitalize()} Loss: {avg_loss:.4f}, Acc: {acc:.4f}")

            if phase == 'val':
                if acc > best_score:
                    best_score = acc
                    best_weights = copy.deepcopy(model.state_dict())
                    no_gain = 0
                    torch.save(model.state_dict(), model_path)
                    print(f"Saved best model at acc: {best_score:.4f}")
                else:
                    no_gain += 1
                    print(f"No improvement after {no_gain} epoch(s)")

        if no_gain >= patience:
            print("Early stopping.")
            break

    duration = time.time() - start_time
    print(f"\nTraining completed in {duration//60:.0f}m {duration%60:.0f}s")
    print(f"Best validation accuracy: {best_score:.4f}")

    # Hand back the best-performing weights, not the last epoch's
    model.load_state_dict(best_weights)
    return model


In [None]:
# Train the model; the best checkpoint is written to the Kaggle working directory
model_save_path = '/kaggle/working/best_model.pth'
model = training(
    model, loss_fn, optimizer,
    train_loader, val_loader,
    num_epochs=15,
    patience=2,
    model_path=model_save_path,
)

In [None]:
# Unlabelled images to run inference on
eval_dir = r'/kaggle/input/data-2025'
eval_paths = []

# Walk the whole tree; keep files with a known image extension (case-insensitive)
# and report every other file by name.
for dirpath, _, files in os.walk(eval_dir):
    for fname in files:
        is_image = fname.lower().endswith(('.jpg', '.jpeg', '.png', '.heic', '.jfif'))
        if not is_image:
            print(f"Skipping: {fname}")
            continue
        eval_paths.append(os.path.join(dirpath, fname))

print(f"Total validate images: {len(eval_paths)}")

In [None]:
class EvalDataset(Dataset):
    """Unlabelled images for inference; every item is ``(image_tensor, file_name)``.

    A file that is nearly empty or cannot be decoded still produces an item: an
    all-zero float32 placeholder of shape ``(3, *image_size)``, so every input
    path yields exactly one output row.

    Args:
        img_list: sequence of image file paths.
        transform: optional callable applied to the decoded image tensor.
        image_size: ``(height, width)`` of the placeholder tensor; should match
            the spatial size produced by ``transform``. Defaults to ``(288, 288)``.
    """

    # Files smaller than this many bytes are skipped without trying to decode them
    MIN_FILE_BYTES = 10

    def __init__(self, img_list, transform=None, image_size=(288, 288)):
        self.img_list = img_list
        self.transform = transform
        self.image_size = tuple(image_size)

    def __len__(self):
        return len(self.img_list)

    def _placeholder(self):
        """All-zero stand-in image used for unreadable files."""
        return torch.zeros((3, *self.image_size), dtype=torch.float32)

    def __getitem__(self, idx):
        path = self.img_list[idx]
        name = os.path.basename(path)
        try:
            # Skip files that are too small to be an image
            if os.path.getsize(path) < self.MIN_FILE_BYTES:
                return self._placeholder(), name

            # The context manager closes the file handle even if decoding fails
            with Image.open(path) as handle:
                img = to_image(handle.convert("RGB"))
            if self.transform:
                img = self.transform(img)
            return img, name
        except Exception as err:
            print(f"Failed: {path} - {err}")
            return self._placeholder(), name

In [None]:
# DataLoader for inference
eval_dataset = EvalDataset(eval_paths, transform=test_aug)
# No shuffling: predictions are written out in loader order, so a fixed order
# keeps the rows of the output file stable from run to run.
eval_loader = DataLoader(eval_dataset, batch_size=64, shuffle=False, num_workers=2)

In [None]:
# Restore the saved checkpoint and switch the model to evaluation mode for inference
best_state = torch.load(model_save_path, map_location=device)
model.load_state_dict(best_state)
model = model.to(device)
model.eval()

In [None]:
# Run inference: collect the arg-max class and the file name of every image
all_preds = []
all_names = []

with torch.no_grad():
    for imgs, names in tqdm(eval_loader):
        imgs = imgs.to(device)
        outputs = model(imgs)
        preds = torch.max(outputs, 1).indices

        all_names.extend(names)
        all_preds.extend(preds.cpu().numpy())

In [None]:
# Export results: one "file name, predicted digit" row per image, no header or index column
results_df = pd.DataFrame({'Filename': all_names, 'Prediction': all_preds})
results_df.to_csv(
    '/kaggle/working/predictions.csv',
    header=False,
    index=False,
)
print("Done writing predictions.")