# TẢI BỘ DỮ LIỆU

In [34]:
import os
import random
import time

import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.utils.data as data

import torchvision.transforms as transforms
import torchvision.datasets as datasets

from torchsummary import summary

import matplotlib.pyplot as plt
from PIL import Image

In [35]:
ROOT = './data'
train_data = datasets.MNIST(
    root=ROOT,
    train=True,
    download=True
)

test_data = datasets.MNIST(
    root=ROOT,
    train=False,
    download=True,
    transform=test_transforms
)

# TIỀN XỬ LÝ DỮ LIỆU

In [36]:
#   Split training: Validation = 0.9 : 0.1
VALID_RAIO = 0.9

n_train_examples = int(len(train_data) * VALID_RAIO)
n_valid_examples = len(train_data) - n_train_examples

train_data, valid_data = data.random_split(train_data, [n_train_examples, n_valid_examples])

# Compute mean and std for normalization
mean = train_data.dataset.data.float().mean() / 255
std = train_data.dataset.data.float().std() / 255

train_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[mean], std=[std])
])
test_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[mean], std=[std])
])

train_data.dataset.transform = train_transforms
valid_data.dataset.transform = test_transforms

#   Create dataloader
BATCH_SIZE = 256

train_dataloader = data.DataLoader(
    train_data,
    batch_size=BATCH_SIZE,
    shuffle=True
)

valid_dataloader = data.DataLoader(
    valid_data,
    batch_size=BATCH_SIZE,
)

# Mô hình LeNet

In [37]:
class LeNetClassifier(nn.Module):
    def __init__(self, num_classes=10):
        super(LeNetClassifier, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=2)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)

        # Fully connected layers
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(16 * 5 * 5, 120)  # Adjust input size
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        # Convolutional layers
        x = self.pool(F.relu(self.conv1(x)))  # Output: [batch_size, 6, 14, 14]
        x = self.pool(F.relu(self.conv2(x)))  # Output: [batch_size, 16, 5, 5]

        # Fully connected layers
        x = self.flatten(x)                  # Output: [batch_size, 16*5*5]
        x = F.relu(self.fc1(x))              # Output: [batch_size, 120]
        x = F.relu(self.fc2(x))              # Output: [batch_size, 84]
        x = self.fc3(x)                      # Output: [batch_size, num_classes]
        return x

# HUẤN LUYỆN MÔ HÌNH

In [38]:
#   Training function
def train(model, optimizer, criterion, train_dataloader, device, epoch=0, log_interval=50):
    model.train()
    total_acc, total_count = 0, 0
    losses = []
    start_time = time.time()

    for idx, (inputs, labels) in enumerate(train_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        predictions = model(inputs)

        # Compute loss
        loss = criterion(predictions, labels)
        losses.append(loss.item())

        # Backward
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (predictions.argmax(1) == labels).sum().item()
        total_count += labels.size(0)
        if idx % log_interval == 0 and idx > 0:
            elapsed_time = time.time() - start_time
            print(
                "| Epoch {:3d} | {:5d}/{:5d} batches | "
                "| Accuracy {:8.3f}".format(
                    epoch, idx, len(train_dataloader), total_acc / total_count
                )
            )
            total_acc, total_count = 0, 0
            start_time = time.time()

    epoch_acc = total_acc / total_count
    epoch_loss = sum(losses) / len(losses)
    return epoch_acc, epoch_loss

# Evaluation function
def evaluate(model, criterion, device, valid_dataloader):
    model.eval()
    total_acc, total_count = 0, 0
    losses = []

    with torch.no_grad():
        for idx, (inputs, labels) in enumerate(valid_dataloader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            predictions = model(inputs)

            loss = criterion(predictions, labels)
            losses.append(loss.item())

            total_acc += (predictions.argmax(1) == labels).sum().item()
            total_count += labels.size(0)

    epoch_acc = total_acc / total_count
    epoch_loss = sum(losses) / len(losses)
    return epoch_acc, epoch_loss

## Training

In [39]:
num_classes = len(train_data.dataset.classes)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

lenet = LeNetClassifier(num_classes=num_classes)
lenet.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(lenet.parameters())

num_epochs = 10
save_model = './model'

train_accs, train_losses = [], []
eval_accs, eval_losses = [], []
best_loss_eval = 100

for epoch in range(1, num_epochs + 1):
    epoch_start_time = time.time()
    # Training
    train_acc, train_loss = train(
        lenet, optimizer, criterion, train_dataloader, device, epoch
    )
    train_accs.append(train_acc)
    train_losses.append(train_loss)

    # Evaluation
    eval_acc, eval_loss = evaluate(lenet, criterion, device, valid_dataloader)
    eval_accs.append(eval_acc)
    eval_losses.append(eval_loss)

    # Save best model
    if eval_loss < best_loss_eval:
        best_loss_eval = eval_loss
        torch.save(lenet.state_dict(), save_model + '/lenet_model.pt')

    # Print loss, acc and epoch
    print("-" * 59)
    print(
        "| End of epoch {:3d} | Time: {:5.2f}s | Train Accuracy {:8.3f} | Train Loss {:8.3f} | "
        "| Valid Accuracy {:8.3f} | Valid Loss {:8.3f} |".format(
            epoch, time.time() - epoch_start_time, train_acc, train_loss, eval_acc, eval_loss
        )
    )
    print("-" * 59)

    # Load best model
    lenet.load_state_dict(torch.load(save_model + '/lenet_model.pt'))
    lenet.eval()

| Epoch   1 |    50/  211 batches | | Accuracy    0.653
| Epoch   1 |   100/  211 batches | | Accuracy    0.902
| Epoch   1 |   150/  211 batches | | Accuracy    0.929
| Epoch   1 |   200/  211 batches | | Accuracy    0.943
-----------------------------------------------------------
| End of epoch   1 | Time: 13.98s | Train Accuracy    0.952 | Train Loss    0.479 | | Valid Accuracy    0.960 | Valid Loss    0.136 |
-----------------------------------------------------------
| Epoch   2 |    50/  211 batches | | Accuracy    0.961
| Epoch   2 |   100/  211 batches | | Accuracy    0.966
| Epoch   2 |   150/  211 batches | | Accuracy    0.969
| Epoch   2 |   200/  211 batches | | Accuracy    0.973
-----------------------------------------------------------
| End of epoch   2 | Time: 14.37s | Train Accuracy    0.970 | Train Loss    0.105 | | Valid Accuracy    0.970 | Valid Loss    0.091 |
-----------------------------------------------------------
| Epoch   3 |    50/  211 batches | | Accura

# ĐÁNH GIÁ TRÊN TẬP TEST

In [41]:

test_dataloader = data.DataLoader(
    test_data,
    batch_size=BATCH_SIZE
)
test_acc, test_loss = evaluate(lenet, criterion, device, test_dataloader)
test_acc, test_loss

TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'PIL.Image.Image'>