In [None]:
from google.colab import drive

# Mount Google Drive inside the Colab VM so the project folder is reachable as a regular path.
drive.mount('/content/gdrive')
# Project folder on Drive; relative paths used later (e.g. the "./animals" default) resolve against it.
path = '/content/gdrive/My Drive/MyCode/animal-code'
# IPython magic: make the project folder the kernel's working directory.
%cd {path}

Mounted at /content/gdrive
/content/gdrive/.shortcut-targets-by-id/1PX4wlgoBojg5BTTQ3yOEDoz8vk2IF1ju/animal-code


In [None]:
import os
import shutil
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.optim.lr_scheduler import MultiStepLR
from torchvision.transforms import Compose, ToTensor, Resize
from torch.optim import SGD, Adagrad, Adam
from torch.utils.tensorboard import SummaryWriter
import pickle
import cv2
import numpy as np
from google.colab.patches import cv2_imshow
from torchsummary import summary
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import argparse
from tqdm.autonotebook import tqdm

import warnings
# Silence every Python warning from here on (this also hides deprecation and
# numerical warnings emitted by the libraries imported above).
warnings.filterwarnings("ignore")

In [None]:
class AnimalDataset(Dataset):
    """Image-classification dataset read from a ``<root>/<split>/<category>/`` tree.

    Every entry found in a category folder becomes one sample whose label is
    the position of that category in ``self.categories``.

    Args:
        root: Directory containing the ``train`` and ``test`` sub-folders.
        train: Read ``<root>/train`` when True, ``<root>/test`` otherwise.
        transform: Optional callable applied to the decoded image in
            ``__getitem__``.

    Raises:
        FileNotFoundError: Propagated from ``os.listdir`` when a category
            folder does not exist.
    """

    def __init__(self, root = "animals", train=True, transform = None):
        self.categories = ["butterfly", "cat", "chicken", "cow", "dog", "elephant", "horse", "sheep", "spider", "squirrel"]

        data_path = os.path.join(root, "train" if train else "test")

        self.image_paths = []
        self.labels = []

        # enumerate() yields the label directly instead of a linear
        # categories.index() lookup for every single image.
        for label, category in enumerate(self.categories):
            category_path = os.path.join(data_path, category)
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # index -> sample mapping reproducible between runs and machines.
            for image_name in sorted(os.listdir(category_path)):
                self.image_paths.append(os.path.join(category_path, image_name))
                self.labels.append(label)
        self.transform = transform

    def __len__(self):
        """Return the number of samples discovered on disk."""
        return len(self.image_paths)

    def __getitem__(self, item):
        """Load sample ``item`` and return ``(image, label)``.

        Raises:
            OSError: If OpenCV cannot decode the file at the stored path.
        """
        image_path = self.image_paths[item]
        # cv2.imread signals failure by returning None rather than raising;
        # fail here with the offending path instead of deep inside the
        # transform or the DataLoader collate step.
        # NOTE: OpenCV decodes to BGR channel order; it is intentionally left
        # untouched so inputs stay consistent with already-trained checkpoints.
        image = cv2.imread(image_path)
        if image is None:
            raise OSError("cv2.imread could not read image: {}".format(image_path))
        if self.transform:
            image = self.transform(image)

        return image, self.labels[item]

In [None]:
class CNN(nn.Module):
    """Five convolutional blocks followed by a three-layer classifier head.

    Each block halves the spatial resolution, so a 224x224 input reaches the
    head as a 64x7x7 feature map (3136 features). An adaptive pooling layer
    pins the feature map to 7x7 for any other input size, so the network no
    longer crashes when the image size is changed from 224.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes = 10):
        super().__init__()

        self.conv1 = self.make_block(in_channels = 3, out_channels = 16)
        self.conv2 = self.make_block(in_channels = 16, out_channels = 32)
        self.conv3 = self.make_block(in_channels = 32, out_channels = 64)
        self.conv4 = self.make_block(in_channels = 64, out_channels = 64)
        self.conv5 = self.make_block(in_channels = 64, out_channels = 64)

        # Parameter-free (adds nothing to the state_dict, so existing
        # checkpoints still load). For 224x224 inputs the map is already 7x7
        # and every output bin averages exactly one element, i.e. an identity.
        self.pool = nn.AdaptiveAvgPool2d((7, 7))

        self.fc1 = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features=64 * 7 * 7, out_features=1024),
            nn.LeakyReLU()
        )

        self.fc2 = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features=1024, out_features=512),
            nn.LeakyReLU()
        )

        # NOTE(review): the trailing LeakyReLU is applied to the logits that
        # the training loop feeds to CrossEntropyLoss. It is kept so that
        # already-trained checkpoints produce the same outputs; consider
        # dropping it when retraining from scratch.
        self.fc3 = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features=512, out_features=num_classes),
            nn.LeakyReLU()
        )

    def make_block(self, in_channels, out_channels):
        """Build one (Conv3x3 -> BatchNorm -> LeakyReLU) x 2 -> MaxPool block.

        The 3x3 convolutions use padding 1 and therefore keep the spatial
        size; only the final max-pool halves height and width.
        """
        return nn.Sequential(
            nn.Conv2d(in_channels = in_channels, out_channels = out_channels, kernel_size = 3, padding = 1),
            nn.BatchNorm2d(num_features=out_channels),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels = out_channels, out_channels = out_channels, kernel_size = 3, padding = 1),
            nn.BatchNorm2d(num_features=out_channels),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size=2) # stride defaults to kernel_size
          )

    def forward(self, x):
        """Map a batch of images ``(N, 3, H, W)`` to logits ``(N, num_classes)``."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.pool(x)
        x = torch.flatten(x, 1) # keep the batch dimension, merge all the others
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

In [None]:
def plot_confusion_matrix(writer, cm, class_names, epoch):
    """
    Plot a confusion matrix and log the figure through ``writer.add_figure``.

    Cells are coloured by the raw counts in ``cm`` and annotated with the
    row-normalised value (share of each true class), rounded to two decimals.
    Nothing is returned; the figure is handed to ``writer``.

    Args:
       writer: object exposing ``add_figure(tag, figure, global_step)``
       cm (array, shape = [n, n]): a confusion matrix of integer classes
       class_names (array, shape = [n]): String names of the integer classes
       epoch (int): step under which the figure is logged
    """
    cm = np.asarray(cm)

    figure = plt.figure(figsize=(20, 20))
    # color map: https://matplotlib.org/stable/gallery/color/colormap_reference.html
    plt.imshow(cm, interpolation='nearest', cmap="cool")
    plt.title("Confusion matrix")
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Row-normalise for the cell annotations. A class without any ground-truth
    # sample has a zero row sum; divide by 1 there so the row reads 0.0
    # instead of NaN.
    row_totals = cm.sum(axis=1)[:, np.newaxis].astype('float')
    row_totals[row_totals == 0] = 1.0
    normalized = np.around(cm.astype('float') / row_totals, decimals=2)

    # Use white text if squares are dark; otherwise black. The decision is
    # made on the raw counts because those are what imshow actually colours.
    threshold = cm.max() / 2.

    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            color = "white" if cm[i, j] > threshold else "black"
            plt.text(j, i, normalized[i, j], horizontalalignment="center", color=color)

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout pass last so the axis labels are taken into account.
    plt.tight_layout()
    writer.add_figure('confusion_matrix', figure, epoch)


In [None]:
def get_args():
    """Parse the training configuration from the command line.

    Arguments that the parser does not recognise are ignored rather than
    rejected (``parse_known_args``) - presumably so that extra argv entries
    injected by the notebook kernel do not abort parsing.

    Returns:
        argparse.Namespace with ``data_path``, ``batch_size``, ``epochs``,
        ``lr``, ``image_size``, ``checkpoint_path``, ``tensorboard_path`` and
        ``trained_path``.
    """
    # (short flag, long flag, type, default) - one row per option.
    option_table = (
        ('-p', '--data_path', str, "./animals"),
        ('-b', '--batch_size', int, 16),
        ('-e', '--epochs', int, 100),
        ('-l', '--lr', float, 1e-3),  # SGD: lr = 1e-2. Adam: lr = 1e-3
        ('-s', '--image_size', int, 224),
        ('-c', '--checkpoint_path', str, None),
        ('-t', '--tensorboard_path', str, "tensorboard"),
        ('-r', '--trained_path', str, "trained_models"),
    )

    parser = argparse.ArgumentParser(description='Animal classifier')
    for short_flag, long_flag, value_type, default_value in option_table:
        parser.add_argument(short_flag, long_flag, type=value_type, default=default_value)

    known_args, _ignored = parser.parse_known_args()
    return known_args

In [None]:
def _run_train_epoch(model, dataloader, criterion, optimizer, device, writer, epoch, total_epochs):
    """Run one optimisation pass over ``dataloader``, logging the running mean loss per step."""
    model.train()
    losses = []
    num_iters = len(dataloader)
    progress_bar = tqdm(dataloader, colour="yellow")
    for step, (images, labels) in enumerate(progress_bar):
        # Move tensor to configured device:
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        predictions = model(images)
        loss = criterion(predictions, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loss_value = loss.item()
        progress_bar.set_description("Epoch {}/{}. Loss value: {:.4f}".format(epoch + 1, total_epochs, loss_value))
        losses.append(loss_value)
        # Global step = batches seen so far across all epochs.
        writer.add_scalar("Train/Loss", np.mean(losses), epoch * num_iters + step)


def _run_validation(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader``; return (batch losses, true labels, predicted labels)."""
    model.eval()
    losses = []
    all_predictions = []
    all_gts = []
    with torch.no_grad():  # with torch.inference_mode():  # pytorch 1.9
        for images, labels in dataloader:
            # Move tensor to configured device:
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            predictions = model(images)
            max_idx = torch.argmax(predictions, 1)

            loss = criterion(predictions, labels)
            losses.append(loss.item())
            all_gts.extend(labels.tolist())
            all_predictions.extend(max_idx.tolist())
    return losses, all_gts, all_predictions


def train(args):
    """Train the CNN on the animal dataset, validating and checkpointing after every epoch.

    Args:
        args: Namespace providing ``data_path``, ``batch_size``, ``epochs``,
            ``lr``, ``image_size``, ``checkpoint_path``, ``tensorboard_path``
            and ``trained_path`` (see ``get_args``).

    Side effects:
        * ``args.tensorboard_path`` is deleted and recreated, then filled with
          the loss / accuracy scalars and one confusion-matrix figure per epoch.
        * ``<trained_path>/last.pt`` is overwritten every epoch and
          ``<trained_path>/best.pt`` whenever validation accuracy improves.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    transform = Compose([
        ToTensor(),
        Resize((args.image_size, args.image_size))
    ])
    train_set = AnimalDataset(root=args.data_path, train=True, transform=transform)
    valid_set = AnimalDataset(root=args.data_path, train=False, transform=transform)

    training_params = {
        "batch_size": args.batch_size,
        "shuffle": True,
        "drop_last": True,
        "num_workers": 6
    }

    valid_params = {
        "batch_size": args.batch_size,
        "shuffle": False,
        "drop_last": False,
        "num_workers": 6
    }
    train_dataloader = DataLoader(train_set, **training_params)
    valid_dataloader = DataLoader(valid_set, **valid_params)

    num_classes = len(train_set.categories)
    model = CNN(num_classes=num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=args.lr)

    if args.checkpoint_path and os.path.isfile(args.checkpoint_path):
        # map_location lets a checkpoint written on a GPU runtime be resumed
        # on a CPU-only one (and vice versa).
        checkpoint = torch.load(args.checkpoint_path, map_location=device)
        model.load_state_dict(checkpoint["model"])
        optimizer.load_state_dict(checkpoint["optimizer"])
        start_epoch = checkpoint["epoch"] + 1
        best_acc = checkpoint["best_acc"]
    else:
        start_epoch = 0
        best_acc = 0

    # Build the scheduler only now so it can be told where training resumes.
    # The checkpoint stores the learning rate used during its own epoch, and
    # constructing with last_epoch=start_epoch - 1 performs the one pending
    # step, so the milestones keep their absolute epoch numbers instead of
    # restarting from zero. A fresh run gets last_epoch=-1, the default.
    if start_epoch > 0:
        for group in optimizer.param_groups:
            # Required by the scheduler when last_epoch != -1; normally already
            # present because the checkpointed optimizer had a scheduler attached.
            group.setdefault("initial_lr", group["lr"])
    scheduler = MultiStepLR(optimizer, milestones=[30, 60, 90], gamma=0.1, last_epoch=start_epoch - 1)

    if os.path.isdir(args.tensorboard_path):
        shutil.rmtree(args.tensorboard_path)
    # makedirs also handles nested output paths, which os.mkdir cannot create.
    os.makedirs(args.tensorboard_path, exist_ok=True)
    os.makedirs(args.trained_path, exist_ok=True)

    writer = SummaryWriter(args.tensorboard_path)
    try:
        for epoch in range(start_epoch, args.epochs):
            # TRAIN
            _run_train_epoch(model, train_dataloader, criterion, optimizer, device, writer, epoch, args.epochs)

            # VALIDATE
            val_losses, all_gts, all_predictions = _run_validation(model, valid_dataloader, criterion, device)

            writer.add_scalar("Val/Loss", np.mean(val_losses), epoch)
            acc = accuracy_score(all_gts, all_predictions)
            writer.add_scalar("Val/Accuracy", acc, epoch)
            # Fix the label set so the matrix is always num_classes x num_classes,
            # even when a class is absent from both targets and predictions.
            conf_matrix = confusion_matrix(all_gts, all_predictions, labels=list(range(num_classes)))
            plot_confusion_matrix(writer, conf_matrix, train_set.categories, epoch)

            # Update best_acc before building the checkpoint so that both files
            # record this epoch's result; otherwise a resumed run starts from a
            # stale best_acc and may overwrite best.pt with a worse model.
            is_best = acc > best_acc
            if is_best:
                best_acc = acc

            checkpoint = {
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "epoch": epoch,
                "best_acc": best_acc,
                "batch_size": args.batch_size
            }

            torch.save(checkpoint, os.path.join(args.trained_path, "last.pt"))
            if is_best:
                torch.save(checkpoint, os.path.join(args.trained_path, "best.pt"))
            scheduler.step()
    finally:
        # Flush pending events even when training is interrupted.
        writer.close()


In [1]:
# Entry point: build the configuration (defaults apply when no flags are
# passed) and run the full training loop.
if __name__ == '__main__':
    args = get_args()
    train(args)