In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
%pip install pillow
%pip install pillow_heif

In [None]:
import glob
from torchvision.transforms.v2 import ToTensor
import torchvision.transforms.v2 as transforms
from torchvision.io import decode_image
from torchvision.transforms.v2.functional import to_image
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import os
from torchvision import models
import torch.nn as nn
from PIL import Image
import pillow_heif
pillow_heif.register_heif_opener()

In [None]:
image_lists = []
data_dir = '/kaggle/input/do-an-ml-an/ML-Do-An'

for subfolder in glob.glob(os.path.join(data_dir, '*', 'hand_written_digit', '??52????')):
    for num in range(10):
        pattern = os.path.join(subfolder, f'{num}_*.*')
        matched = glob.glob(pattern)
        image_lists.extend(matched)

print(f'FOUND {len(image_lists)} IMAGES.')

In [None]:
class custom_image_dataset(Dataset):
    def __init__(self, image_lists, transform=None):
        self.image_lists = image_lists
        self.transform = transform

    def __read_image(self, img_path):
        try:
            image = decode_image(img_path, mode="RGB")
            label = int(os.path.basename(img_path)[0])
            if self.transform:
                image = self.transform(image)
            return image, label
        except:
            return torch.zeros(3, 224, 224), 0

    def __getitem__(self, idx):
        return self.__read_image(self.image_lists[idx])

    def __len__(self):
        return len(self.image_lists)

In [None]:
aug_train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomResizedCrop((224, 224), scale=(0.8, 1.0), ratio=(0.9, 1.1)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.1, hue=0.05),
    transforms.RandomAffine(degrees=15),
    transforms.RandomVerticalFlip(),
    transforms.ToImage(),
    transforms.ToDtype(torch.float32, scale=True), # ToTensor
    transforms.Normalize(   [0.485, 0.456, 0.406],
                            [0.229, 0.224, 0.225])
])

non_aug_train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToImage(),
    transforms.ToDtype(torch.float32, scale=True),
    transforms.Normalize(   [0.485, 0.456, 0.406],
                            [0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToImage(),
    transforms.ToDtype(torch.float32, scale=True),
    transforms.Normalize(   [0.485, 0.456, 0.406],
                            [0.229, 0.224, 0.225])
])


In [None]:
split = int(0.9 * len(image_lists))

aug_train_dataset = custom_image_dataset( image_lists[:split],
                                    transform=aug_train_transform )

non_aug_train_dataset = custom_image_dataset( image_lists[:split],
                                    transform=non_aug_train_transform )

test_dataset = custom_image_dataset( image_lists[split:],
                                    transform=test_transform )

aug_train_loader = DataLoader(aug_train_dataset, batch_size=64, shuffle=True, num_workers=0)

non_aug_train_loader = DataLoader(non_aug_train_dataset, batch_size=64, shuffle=True, num_workers=0)

test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=0)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

In [None]:
torch.backends.cudnn.benchmark = True

In [None]:
model_effb1 = models.efficientnet_b1(weights='IMAGENET1K_V1')
num_ftrs_eff = model_effb1.classifier[1].in_features
model_effb1.classifier = nn.Sequential(
    nn.Dropout(p=0.3, inplace=True),
    nn.Linear(in_features=num_ftrs_eff, out_features=10)  # 10 digits
)

model_effb1 = model_effb1.to(device)
optimizer_b1 = torch.optim.SGD(model_effb1.parameters(), lr=0.001, momentum=0.9)
scheduler_b1 = torch.optim.lr_scheduler.StepLR(optimizer_b1, step_size=7, gamma=0.1)

In [None]:
criterion = nn.CrossEntropyLoss()

In [None]:
import time
import copy
import csv
from tqdm import tqdm
import torch

def train_model(model, criterion, optimizer, scheduler,
                train_loader, test_loader, instance = 'model', num_epochs=25):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    log_rows = []

    log_path = f'/kaggle/working/{instance}_training_log.csv'

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
                dataloader = train_loader
            else:
                model.eval()
                dataloader = test_loader

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in tqdm(dataloader, desc=f'{phase} epoch {epoch}'):
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / len(dataloader.dataset)
            epoch_acc = running_corrects.double() / len(dataloader.dataset)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            log_rows.append({
                'epoch': epoch,
                'phase': phase,
                'loss': epoch_loss,
                'accuracy': epoch_acc.item()
            })

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print(f'\nTraining complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    with open(log_path, 'w', newline='') as csvfile:
        fieldnames = ['epoch', 'phase', 'loss', 'accuracy']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for row in log_rows:
            writer.writerow(row)

    print(f"Logs saved to {log_path}")

    model.load_state_dict(best_model_wts)
    return model

In [None]:
model_b1_aug = train_model(model_effb1,  
                           criterion, optimizer_b1, 
                           scheduler_b1, 
                           aug_train_loader, 
                           test_loader, instance='effb1_aug', num_epochs=25)

In [None]:
modelb1_save_path = '/kaggle/working/effb1_digits_weights.pth'
torch.save(model_b1_aug.state_dict(), modelb1_save_path)
print(f"Model weights saved to {modelb1_save_path}")

In [None]:
model_b1_non_aug = train_model(model_effb1, 
                               criterion, optimizer_b1, 
                               scheduler_b1, non_aug_train_loader, 
                               test_loader, instance='effb1_non_aug', 
                               num_epochs=25)

In [None]:
non_aug_modelb1_save_path = '/kaggle/working/non_aug_effb1_digits_weights.pth'
torch.save(model_b1_non_aug.state_dict(), non_aug_modelb1_save_path)
print(f"Model weights saved to {non_aug_modelb1_save_path}")