In [None]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from sklearn.model_selection import train_test_split
import random

# **1. Scene Classification**

# *1. Data preprocessing*

In [None]:
def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's global generator and PyTorch (CPU and
    all CUDA devices), then switches cuDNN to deterministic mode and turns
    its benchmark autotuner off.

    Args:
        seed: Integer seed handed to every generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


# Single seed shared by the rest of the notebook.
seed = 59
set_seed(seed)

In [None]:
# Root of the training split; each immediate sub-directory is treated as one class.
data_path = '/kaggle/input/aio-datasets/Data/img_cls_scenes_classification/scenes_classification/train'
# os.walk reports sub-directories in arbitrary filesystem order. Sorting them
# makes the class -> index mapping derived from `folders` identical on every run.
folders = sorted(next(os.walk(data_path))[1])
folders

In [None]:
# Build two parallel lists for the training split: image paths and integer
# labels. A label is the position of the class directory inside `folders`.
count_dict = {}
labels = []
data_paths = []
for (i, label) in enumerate(folders):
    file_path = os.path.join(data_path, label)
    # File names also come back in arbitrary order; sorting them keeps the
    # sample order (and therefore any seeded split of it) reproducible.
    files_list = sorted(next(os.walk(file_path))[2])
    count_dict[label] = len(files_list)
    for file in files_list:
        path = os.path.join(file_path, file)
        data_paths.append(path)
        labels.append(i)

print('data_len: ', len(data_paths), len(labels))
print(count_dict)

In [None]:
# Root of the held-out split (the 'val' directory); one sub-directory per class.
test_data_path = '/kaggle/input/aio-datasets/Data/img_cls_scenes_classification/scenes_classification/val'
# os.walk reports sub-directories in arbitrary filesystem order; sort them so
# the listing is deterministic.
test_folders = sorted(next(os.walk(test_data_path))[1])
test_folders

In [None]:
# Build parallel lists of image paths and integer labels for the held-out split.
#
# The label of a class must be the index the model was trained with, i.e. the
# position of that class name in the training `folders` list. Enumerating
# `test_folders` independently would only agree with that by coincidence,
# because os.walk does not guarantee a directory order.
class_to_idx = {name: idx for idx, name in enumerate(folders)}

count_dict = {}
test_label = []
test_data = []
for label in test_folders:
    if label not in class_to_idx:
        raise ValueError(
            f"class '{label}' found in {test_data_path} has no matching training class"
        )
    file_path = os.path.join(test_data_path, label)
    _, _, files_list = next(os.walk(file_path))
    count_dict[label] = len(files_list)
    for file in files_list:
        path = os.path.join(file_path, file)
        test_data.append(path)
        test_label.append(class_to_idx[label])

print('data_len: ', len(test_data), len(test_label))
print(count_dict)

In [None]:
# Class distribution bar chart. `count_dict` was last filled from the held-out
# ('val') directory, so these are the per-class counts of that split.
fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(list(count_dict.keys()), list(count_dict.values()))
ax.set_xlabel('class')
ax.set_ylabel('number of images')
ax.set_title('Images per class (held-out split)');

In [None]:
# Hold out 20% of the training images for validation. Passing `random_state`
# ties the split to the notebook seed, so re-running this cell alone gives the
# same partition instead of depending on the current global NumPy RNG state.
train_data, valid_data, train_label, valid_label = train_test_split(
    data_paths, labels, test_size=0.2, shuffle=True, random_state=seed
)
print('train, valid, test: ', len(train_data), len(valid_data), len(test_data))

In [None]:
class SceneDataset(Dataset):
    """Map-style dataset that loads an image from disk and pairs it with a label.

    Args:
        img_paths: Sequence of image file paths.
        label_paths: Sequence of labels, indexed in parallel with ``img_paths``.
        transform: Optional callable applied to the RGB PIL image before it is
            returned.
    """

    def __init__(self, img_paths, label_paths, transform=None):
        self.img_paths = img_paths
        self.label_paths = label_paths
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.img_paths)

    def __getitem__(self, id):
        """Return ``(image, label)`` for position ``id``.

        The image is converted to RGB and, when a transform was given, passed
        through it.
        """
        # Image.open reads lazily and keeps the file open. The context manager
        # closes it as soon as the RGB copy exists, so handles do not
        # accumulate while iterating over a large dataset.
        with Image.open(self.img_paths[id]) as raw:
            img = raw.convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, self.label_paths[id]


In [None]:
def transform(img, size=(224, 224, 3)):
    """Resample an image to a fixed size and return it as a CHW float tensor.

    ``np.resize`` must not be used for this: it only repeats or truncates the
    flattened pixel buffer to fill the requested shape, which scrambles the
    picture instead of scaling it. The image is resampled with bilinear
    interpolation instead.

    Args:
        img: Anything ``np.array`` can turn into an ``(H, W, C)`` or ``(H, W)``
            array, e.g. a PIL image or a NumPy array with values in 0..255.
        size: ``(height, width, channels)`` of the target. Only the height and
            width are used for resampling; the channel count of the output is
            whatever the input has.

    Returns:
        ``torch.float32`` tensor of shape ``(C, height, width)`` scaled by
        ``1 / 255``.
    """
    pixels = torch.tensor(np.array(img))
    if pixels.ndim == 2:
        # Single-channel input: add the channel axis so the permute below works.
        pixels = pixels.unsqueeze(-1)
    chw = pixels.permute(2, 0, 1).float()
    # F.interpolate expects a batch dimension, so add one and drop it afterwards.
    resized = F.interpolate(
        chw.unsqueeze(0),
        size=(size[0], size[1]),
        mode='bilinear',
        align_corners=False,
    )
    return resized.squeeze(0) / 255.0

In [None]:
# One SceneDataset per split; all three use the same `transform` callable.
train_dataset, valid_dataset, test_dataset = (
    SceneDataset(paths, targets, transform=transform)
    for paths, targets in (
        (train_data, train_label),
        (valid_data, valid_label),
        (test_data, test_label),
    )
)

In [None]:
train_batch_size, valid_batch_size = 64, 16

# Only the training loader shuffles; validation and held-out data keep their order.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=train_batch_size)
valid_loader, test_loader = (
    DataLoader(dataset=split, shuffle=False, batch_size=valid_batch_size)
    for split in (valid_dataset, test_dataset)
)


# *2. Define Model*

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

***ResNet***
* The number of channels in the ***first module*** is the same as the number of input channels.
* In the ***first residual block*** of each subsequent module, the number of channels is ***doubled*** compared with that of the previous module, and the height and width are halved.

In [None]:
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a skip connection.

    Args:
        in_channel: Number of channels of the input tensor.
        out_channel: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the projection
            shortcut when one is needed).
    """

    def __init__(self, in_channel, out_channel, stride=1):
        super(ResidualBlock, self).__init__()

        self.conv2d_1 = nn.Conv2d(in_channel, out_channel,
                                  kernel_size=3, padding=1, stride=stride)
        self.bn1 = nn.BatchNorm2d(num_features=out_channel)

        self.conv2d_2 = nn.Conv2d(out_channel, out_channel,
                                  kernel_size=3, padding=1, stride=1)
        self.bn2 = nn.BatchNorm2d(num_features=out_channel)

        if (stride != 1) or (in_channel != out_channel):
            # Projection shortcut. It must use the same stride as the main
            # path; a hard-coded stride of 2 would halve the skip tensor even
            # when stride == 1 and make the addition in forward() fail.
            self.downsampling = nn.Sequential(
                nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channel)
            )
        else:
            # Shapes already match, so the input is added back unchanged.
            self.downsampling = None

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = x
        out = F.relu(self.bn1(self.conv2d_1(x)))
        out = self.bn2(self.conv2d_2(out))
        if self.downsampling is not None:
            shortcut = self.downsampling(shortcut)
        out = out + shortcut
        return F.relu(out)

class FirstBlock(nn.Module):
    """Network stem: 7x7 stride-2 convolution, batch norm, ReLU, 3x3 stride-2 max pool.

    Args:
        in_channel: Number of channels of the input image.
        out_channel: Number of feature maps produced by the stem.
    """

    def __init__(self, in_channel, out_channel):
        super().__init__()
        self.conv = nn.Conv2d(in_channel, out_channel, kernel_size=7, stride=2, padding=3)
        self.bn = nn.BatchNorm2d(out_channel)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        """Apply conv -> batch norm -> ReLU -> max pool."""
        # Normalise before the non-linearity, the same order ResidualBlock
        # uses (relu(bn(conv(x)))).
        return self.maxpool(F.relu(self.bn(self.conv(x))))

class ResNet(nn.Module):
    """ResNet-style classifier: a stem, four two-block stages and a linear head.

    Args:
        first_block: Class used for the stem; it is called as
            ``first_block(in_channel=..., out_channel=64)``.
        residual_block: Class used for every residual block; it is called as
            ``residual_block(in_channel, out_channel, stride=...)``.
        in_channel: Number of channels of the input images.
        out_classes: Number of output logits.
    """

    def __init__(self, first_block, residual_block, in_channel ,out_classes):
        super().__init__()
        # Use the constructor arguments rather than hard-coded values so the
        # caller's choice of input channels and block type takes effect.
        self.conv1 = first_block(in_channel=in_channel, out_channel=64)
        self.conv2 = self.create_module(residual_block=residual_block, in_channel=64, out_channel=64,
                                          num_residual_block=2, stride=1)
        self.conv3 = self.create_module(residual_block=residual_block, in_channel=64, out_channel=128,
                                          num_residual_block=2, stride=2)
        self.conv4 = self.create_module(residual_block=residual_block, in_channel=128, out_channel=256,
                                          num_residual_block=2, stride=2)
        self.conv5 = self.create_module(residual_block=residual_block, in_channel=256, out_channel=512,
                                          num_residual_block=2, stride=2)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(512, out_classes)

    def create_module(self, residual_block,
                      in_channel, out_channel,
                      num_residual_block, stride=1):
        """Stack ``num_residual_block`` blocks into one ``nn.Sequential`` stage.

        Only the first block changes the channel count and applies ``stride``;
        the remaining blocks map ``out_channel`` to ``out_channel`` with
        stride 1.
        """
        module = []
        for i in range(num_residual_block):
            if i == 0:
                module.append(residual_block(in_channel, out_channel, stride=stride))
            else:
                module.append(residual_block(out_channel, out_channel, stride=1))

        return nn.Sequential(*module)

    def forward(self, x):
        """Return class logits of shape ``(batch, out_classes)``."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.avg_pool(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x

***DenseNet***

In [None]:
class DenseBlock(nn.Module):
    """Chain of bottleneck conv layers whose outputs are concatenated to the input.

    Args:
        num_conv: Number of conv layers in the block.
        in_channel: Number of channels entering the block.
        growth_rate: Number of channels each layer appends.
    """

    def __init__(self, num_conv, in_channel, growth_rate):
        super().__init__()
        # Layer k receives the block input plus the k feature maps appended so far.
        layers = [
            self.conv_block(in_channel + k * growth_rate, growth_rate, growth_rate)
            for k in range(num_conv)
        ]
        self.conv_module = nn.Sequential(*layers)

    def conv_block(self, in_channel, out_channel, growth_rate):
        """Build one BN-ReLU-Conv1x1 / BN-ReLU-Conv3x3 layer.

        ``out_channel`` is accepted but not used; the layer always emits
        ``growth_rate`` channels.
        """
        bottleneck_width = 4 * growth_rate
        stages = [
            # Bottleneck: 1x1 convolution to 4*growth_rate channels ahead of the 3x3 convolution.
            nn.BatchNorm2d(in_channel),
            nn.ReLU(),
            nn.Conv2d(in_channel, bottleneck_width, kernel_size=1, stride=1),
            # 3x3 convolution that produces the growth_rate new feature maps.
            nn.BatchNorm2d(bottleneck_width),
            nn.ReLU(),
            nn.Conv2d(bottleneck_width, growth_rate, kernel_size=3, stride=1, padding=1),
        ]
        return nn.Sequential(*stages)

    def forward(self, x):
        """Return ``x`` with every layer's output concatenated along the channel axis."""
        features = x
        for layer in self.conv_module:
            features = torch.cat((features, layer(features)), dim=1)
        return features

class DenseNet(nn.Module):
    """DenseNet-style classifier: stem, dense blocks joined by transitions, linear head.

    Args:
        dense_block: Class used for each dense block; it is called as
            ``dense_block(num_conv, in_channel, growth_rate)`` and must output
            ``in_channel + num_conv * growth_rate`` channels.
        in_channel: Number of channels of the input images.
        growth_rate: Channels added by every layer of a dense block; the stem
            outputs ``2 * growth_rate`` channels.
        transition_channel: Number of channels every transition layer reduces to.
        num_convs: Number of layers in each dense block, one entry per block.
        n_classes: Number of output logits.
    """

    def __init__(self, dense_block,
                 in_channel, growth_rate,
                 transition_channel=10,
                 num_convs=(6, 12, 24, 16),
                 n_classes = 1000):
        super().__init__()
        # first layer:
        self.conv1 = self.first_layer(in_channel, growth_rate)

        dense_module = []
        channels = 2 * growth_rate
        last_block = len(num_convs) - 1
        for i, num_conv in enumerate(num_convs):
            dense_module.append(dense_block(num_conv, channels, growth_rate))
            channels = channels + num_conv * growth_rate

            # A transition follows every dense block except the last. Only
            # then does the channel count drop to `transition_channel`.
            if i != last_block:
                dense_module.append(self.transition_layer(channels, transition_channel))
                channels = transition_channel

        self.dense_net = nn.Sequential(*dense_module)
        # Global average pooling. This equals the former 7x7 average pool for
        # 224x224 inputs but also accepts other input resolutions.
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.flatten = nn.Flatten()
        # The head width is whatever the last dense block emits (522 for the
        # default arguments with growth_rate=32), not a hard-coded constant.
        self.fc = nn.Linear(channels, n_classes)

    def first_layer(self, in_channel, growth_rate):
        """Stem: 7x7 stride-2 convolution to ``2 * growth_rate`` channels, then 3x3 stride-2 max pool."""
        return nn.Sequential(
                   nn.Conv2d(in_channel, 2*growth_rate, kernel_size=7, stride=2, padding=3),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
               )

    def transition_layer(self, in_channel, out_channel):
        """ReLU, 1x1 convolution to ``out_channel`` channels, then 2x2 average pool."""
        return nn.Sequential(
            # nn.BatchNorm2d(in_channel),
            nn.ReLU(),
            nn.Conv2d(in_channel, out_channel, kernel_size=1),
            nn.AvgPool2d(kernel_size=2, stride=2)
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, n_classes)``."""
        x = self.conv1(x)
        x = self.dense_net(x)
        x = self.avgpool(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x

In [None]:
!pip install torchsummary
from torchsummary import summary

In [None]:
# denseblock = DenseBlock(num_conv=64, in_channel=64, growth_rate=32).to(device)
# summary(denseblock, (64, 56, 56))

In [None]:
# Build the network on `device`, wrap it in DataParallel, then print a
# layer-by-layer summary for a 3x224x224 input.
densenet121 = nn.DataParallel(
    DenseNet(
        dense_block=DenseBlock,
        in_channel=3,
        growth_rate=32,
        transition_channel=10,
        num_convs=[6, 12, 24, 16],
        n_classes=6,
    ).to(device)
)
summary(densenet121, (3, 224, 224))

# ***3. Training***

In [None]:
def compute_accucary(predicted, labels):
    """Return the percentage of positions where ``predicted`` equals ``labels``.

    Args:
        predicted: Sequence of predicted class indices.
        labels: Sequence of ground-truth class indices, same length as
            ``predicted``.

    Returns:
        Accuracy in the range 0..100. Empty input yields ``0.0`` rather than a
        division by zero.

    Raises:
        ValueError: If the two inputs do not have the same shape. Comparing
            them anyway would not be element-wise and would report a
            meaningless score.
    """
    predicted = np.asarray(predicted)
    labels = np.asarray(labels)
    if predicted.shape != labels.shape:
        raise ValueError(
            f"predicted and labels must have the same shape, got {predicted.shape} and {labels.shape}"
        )
    if predicted.size == 0:
        return 0.0
    return np.sum(predicted == labels) * 100 / predicted.shape[0]

In [None]:
def train_one_epoch(train_loader, valid_loader,
                    model, device,
                    loss_func, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Args:
        train_loader: Iterable of ``(images, labels)`` batches.
        valid_loader: Accepted for signature compatibility; not used here.
        model: Network to optimise; switched to training mode.
        device: Device the batches are moved to.
        loss_func: Callable returning the loss for ``(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.

    Returns:
        ``(mean loss over batches, accuracy in percent)`` for the epoch.
    """
    model.train()
    running_loss = 0.0
    seen_targets, seen_preds = [], []

    for images, targets in train_loader:
        images = images.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        logits = model(images)
        loss = loss_func(logits, targets)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()

        # Keep plain Python lists so accuracy is computed once over the whole epoch.
        seen_preds.extend(logits.argmax(dim=1).cpu().tolist())
        seen_targets.extend(targets.cpu().tolist())

    mean_loss = running_loss / len(train_loader)
    accuracy = compute_accucary(seen_preds, seen_targets)
    return mean_loss, accuracy

In [None]:
# training
import time

def _evaluate(loader, model, device, loss_func):
    """Score ``model`` on ``loader`` without gradients; return (mean batch loss, accuracy %)."""
    model.eval()
    total_loss = 0.0
    targets_seen = []
    preds_seen = []
    with torch.no_grad():
        for imgs, batch_labels in loader:
            imgs = imgs.to(device)
            batch_labels = batch_labels.to(device)
            predicted = model(imgs)
            total_loss += loss_func(predicted, batch_labels).item()
            preds_seen += predicted.argmax(dim=1).cpu().tolist()
            targets_seen += batch_labels.cpu().tolist()
    return total_loss / len(loader), compute_accucary(preds_seen, targets_seen)


def train(train_loader, valid_loader, model, device, num_epochs, loss_func, optimizer):
    """Train ``model`` for ``num_epochs`` epochs and checkpoint the best one.

    After every epoch the model is scored on ``valid_loader``. Whenever the
    validation accuracy improves, the weights are written to
    ``./best_model.pt``.

    Args:
        train_loader: Loader of training batches.
        valid_loader: Loader of validation batches.
        model: Network to train; may be wrapped in ``nn.DataParallel``.
        device: Device the batches are moved to.
        num_epochs: Number of passes over ``train_loader``.
        loss_func: Callable returning the loss for ``(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.

    Returns:
        Four lists with one entry per epoch: training losses, validation
        losses, training accuracies and validation accuracies.
    """
    max_valid_acc = 0.0

    train_losses = []
    valid_losses = []
    train_accuracies = []
    valid_accuracies = []

    # A parallel wrapper prefixes every state_dict key with 'module.'. Saving
    # the wrapped network's weights instead keeps the checkpoint loadable into
    # a plain, unwrapped model.
    wrappers = (nn.DataParallel, nn.parallel.DistributedDataParallel)
    model_to_save = model.module if isinstance(model, wrappers) else model

    for epoch in range(num_epochs):
        start_time = time.time()
        avg_train_loss,  train_accuracy_score = train_one_epoch(train_loader, valid_loader,
                                                                model, device,
                                                                loss_func, optimizer)

        # evaluate
        avg_valid_loss, valid_accuracy_score = _evaluate(valid_loader, model, device, loss_func)

        # save log
        train_losses.append(avg_train_loss)
        valid_losses.append(avg_valid_loss)
        train_accuracies.append(train_accuracy_score)
        valid_accuracies.append(valid_accuracy_score)

        #save best model
        if max_valid_acc < valid_accuracy_score:
            max_valid_acc = valid_accuracy_score
            print(f'save best model at epoch {epoch+1}')
            torch.save(model_to_save.state_dict(), './best_model.pt')
        # print log
        print(f"Epoch {epoch+1}/{num_epochs} | {time.time() - start_time}s:")
        print(f"Train Loss: {avg_train_loss:.4f} | Train Accuracy: {train_accuracy_score:.2f}% | Valid Loss: {avg_valid_loss:.4f} | Valid Accuracy: {valid_accuracy_score:.2f}%")
        print('-------------------------------------------------------------------------------------------')
    return train_losses, valid_losses, train_accuracies, valid_accuracies

In [None]:
# Optimisation settings: 20 epochs of Adam at a fixed learning rate,
# minimising cross-entropy.
num_epochs = 20
lr = 0.001
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(densenet121.parameters(), lr=lr)

# Run training and keep the per-epoch curves for plotting.
train_losses, valid_losses, train_accuracies, valid_accuracies = train(
    train_loader=train_loader,
    valid_loader=valid_loader,
    model=densenet121,
    device=device,
    num_epochs=num_epochs,
    loss_func=loss_func,
    optimizer=optimizer,
)

In [None]:
# Training curves: loss on the left panel, accuracy on the right; one point per epoch.
fig, ax = plt.subplots(1, 2, figsize=(20, 10))

panels = (
    (ax[0], 'loss', (train_losses, 'train_loss'), (valid_losses, 'valid_loss')),
    (ax[1], 'accuracy', (train_accuracies, 'train_accuracy'), (valid_accuracies, 'valid_accuracy')),
)
for axis, y_name, train_curve, valid_curve in panels:
    # Epochs are numbered from 1 on the x axis.
    for (values, legend_name), line_color in ((train_curve, 'blue'), (valid_curve, 'orange')):
        axis.plot(range(1, len(values) + 1), values, label=legend_name, color=line_color)
    axis.set_xlabel('epoch')
    axis.set_ylabel(y_name)

ax[0].legend()
ax[1].legend()

In [None]:
# Rebuild the architecture and restore the best checkpoint written during training.
model = DenseNet(dense_block=DenseBlock,
                 transition_channel=10,
                 in_channel=3, growth_rate=32,
                 num_convs=[6, 12, 24, 16],
                 n_classes=6).to(device)

# map_location lets a checkpoint saved on GPU be restored on a CPU-only machine.
state_dict = torch.load('./best_model.pt', map_location=device, weights_only=True)

# A checkpoint saved from an nn.DataParallel wrapper has every key prefixed with
# 'module.'. This plain DenseNet has no such prefix, so strip it when present;
# keys without the prefix pass through unchanged.
prefix = 'module.'
state_dict = {
    (key[len(prefix):] if key.startswith(prefix) else key): value
    for key, value in state_dict.items()
}
model.load_state_dict(state_dict)

In [None]:
# Score the restored model on the held-out loader: mean batch loss and accuracy.
model.eval()
test_batch_loss = 0.0
test_labels, test_preds = [], []

with torch.no_grad():
    for test_imgs, batch_test_labels in test_loader:
        test_imgs, batch_test_labels = test_imgs.to(device), batch_test_labels.to(device)
        predicted = model(test_imgs)
        test_loss = loss_func(predicted, batch_test_labels)
        test_batch_loss += test_loss.item()
        test_preds.extend(predicted.argmax(dim=1).cpu().tolist())
        test_labels.extend(batch_test_labels.cpu().tolist())

avg_test_loss = test_batch_loss / len(test_loader)
test_accuracy_score = compute_accucary(test_preds, test_labels)

print(f"Test_loss: {avg_test_loss}")
print(f"Test_acc: {test_accuracy_score}")