In [None]:
import torch
import torch.nn as nn
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from torch.utils.data import Dataset , DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import joblib

In [None]:
def setSeed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
seed = 59
setSeed(seed)

In [None]:
root_dir = '/kaggle/input/weather-dataset/dataset'

classes = {
    label_idx : class_name \
    for label_idx, class_name in enumerate(sorted(os.listdir(root_dir)))
}
classes

In [None]:
img_paths = []
labels = []
for label_idx, class_name in classes.items():
    class_dir = os.path.join(root_dir, class_name)
    for img_filename in os.listdir(class_dir):
        img_path = os.path.join(class_dir, img_filename)
        img_paths.append(img_path)
        labels.append(label_idx)

In [None]:
val_size = 0.2
test_size = 0.125
is_shuffle = True
X_train, X_val, y_train, y_val = train_test_split(img_paths, labels, test_size = val_size, random_state = seed, shuffle = is_shuffle)

X_train, X_test, y_train, y_test = train_test_split(X_train, y_train, test_size = test_size, random_state = seed, shuffle = is_shuffle)

In [None]:
class WeatherDataset(Dataset):
    def __init__(self, X, y, transform = None):
        self.transform = transform
        self.img_paths = X
        self.labels = y
    def __len__(self):
        return len(self.img_paths)
    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        img = Image.open(img_path).convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]

In [None]:
def transform(img, img_size = (224, 224)):
    img = img.resize(img_size)
    img = np.array(img)[..., :3]
    img = torch.tensor(img).permute(2,0,1).float()
    normalized_img = img / 255.0
    
    return normalized_img

In [None]:
train_dataset = WeatherDataset(X_train, y_train, transform = transform)
val_dataset = WeatherDataset(X_val, y_val, transform = transform)
test_dataset = WeatherDataset(X_test, y_test, transform = transform)

In [None]:
train_batch_size = 512
test_batch_size = 8
train_loader = DataLoader(train_dataset, batch_size = train_batch_size, shuffle = True)
val_loader = DataLoader(val_dataset, batch_size = test_batch_size, shuffle = False)
test_loader = DataLoader(test_dataset, batch_size = test_batch_size, shuffle = False)

In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride = 1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)
        self.downsample = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size = 1, stride = stride), nn.BatchNorm2d(out_channels))
        self.relu = nn.ReLU()
    def forward(self, x):
        shortcut = x.clone()
        x = self.conv1(x)
        x = self.batch_norm1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.batch_norm2(x)
        x += self.downsample(shortcut)
        x = self.relu(x)
        return x
            

In [None]:
class ResNet(nn.Module):
    def __init__(self, residual_block, n_blocks_lst, n_classes):
        super(ResNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 64 , kernel_size = 7, stride = 2, padding=3)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.conv2 = self.create_layer(residual_block, 64, 64, n_blocks_lst[0], 1)
        self.conv3 = self.create_layer(residual_block, 64, 128, n_blocks_lst[1], 2)
        self.conv4 = self.create_layer(residual_block, 128, 256, n_blocks_lst[2], 2)
        self.conv5 = self.create_layer(residual_block, 256, 512, n_blocks_lst[3], 2)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(512, n_classes)
    def create_layer(self, residual_block, in_channels, out_channels, n_blocks, stride):
        blocks = []
        first_block = residual_block(in_channels, out_channels, stride)
        blocks.append(first_block)
        for idx in range(1, n_blocks):
            block = residual_block(out_channels, out_channels, stride = 1)
            blocks.append(block)
        block_sequential = nn.Sequential(*blocks)

        return block_sequential

    def forward(self, x):
        x = self.conv1(x)
        x = self.batch_norm1(x)
        x = self.maxpool(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.avgpool(x)
        x = self.flatten(x)
        x = self.fc1(x)
        return x

In [None]:
n_classes = len(list(classes.keys()))
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = ResNet(residual_block = ResidualBlock, n_blocks_lst = [2,2,2,2], n_classes = n_classes).to(device)

In [None]:
def evaluate(model, dataloader, criterion, device):
    model.eval()
    correct = 0
    total = 0
    losses = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            losses.append(loss.item())
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    loss = sum(losses)/len(losses)
    acc = correct/total
    return loss, acc

In [None]:
def save_model(model, save_path, file_name):
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    
    full_path = os.path.join(save_path, file_name)
    
    joblib.dump(model, full_path)
    
    print(f"Đã lưu model bằng Joblib tại: {full_path}")

In [None]:
def fit(model, train_loader, val_loader, criterion, 
        optimizer, device, epochs, model_name):
    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        batch_train_losses = []
        model.train()
        progress_bar = tqdm(train_loader)
        for idx, (inputs, labels) in enumerate(progress_bar):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            progress_bar.set_description(f'Epoch {epoch + 1}:\t loss: {loss:.4f}')
            loss.backward()
            optimizer.step()
            batch_train_losses.append(loss.item())

        train_loss = sum(batch_train_losses)/len(batch_train_losses)
        train_losses.append(train_loss)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)
        val_losses.append(val_loss)
    print(f'Train loss: {train_loss:.4f}\tVal loss: {val_loss:.4f}')
    save_model(model, "/kaggle/working", f"model_{model_name}.pth")
    return train_losses, val_losses

In [None]:
lr = 1e-2
epochs = 50
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = lr)


In [None]:
train_losses, val_losses = fit(
    model, train_loader, val_loader, criterion, optimizer,
    device, epochs, 'weather'
)

In [None]:
val_loss, val_acc = evaluate(model, val_loader, criterion, device)
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
print('Val accuracy: ', val_acc)
print('Test accuracy: ', test_acc)

In [None]:
root_dir = '/kaggle/input/intel-image-classification'
train_dir = os.path.join(root_dir, 'seg_train/seg_train')
test_dir = os.path.join(root_dir, 'seg_test/seg_test')

classes = {
    label_idx : class_name \
    for label_idx, class_name in enumerate(sorted(os.listdir(train_dir)))
}
classes

In [None]:
X_train = []
y_train = []
X_test = []
y_test = []

In [None]:
for dataset_path in [train_dir, test_dir]:
    for label_idx, class_name in classes.items():
        class_dir = os.path.join(dataset_path, class_name)
        for img_filename in os.listdir(class_dir):
            img_path = os.path.join(class_dir, img_filename)
            if 'seg_train' in dataset_path:
                X_train.append(img_path)
                y_train.append(label_idx)
            else:
                X_test.append(img_path)
                y_test.append(label_idx)

In [None]:
seed = 0
val_size = 0.2
is_shuffle = True

X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size = val_size, random_state=seed,
    shuffle = is_shuffle
)
    

In [None]:
class ScenesDataset(Dataset):
    def __init__(self, X, y, transform = None):
        self.transform = transform
        self.img_paths = X
        self.labels = y
    def __len__(self):
        return len(self.img_paths)
    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        img = Image.open(img_path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]


In [None]:
def transform(img, img_size=(224,224)):
    img = img.resize(img_size)
    img = np.array(img)[..., :3]
    img = torch.tensor(img).permute(2,0,1).float()
    normalized_img = img/255.0
    return normalized_img

In [None]:
train_dataset = ScenesDataset(X_train, y_train, transform=transform)
val_dataset = ScenesDataset(X_val, y_val, transform=transform)
test_dataset = ScenesDataset(X_test, y_test, transform=transform)


In [None]:
train_batch_size = 64
test_batch_size = 8
train_loader = DataLoader(train_dataset, batch_size = train_batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size = test_batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size = test_batch_size, shuffle=False)

In [None]:
class BottleneckBlock(nn.Module):
    def __init__(self, in_channels, growth_rate):
        super(BottleneckBlock,self).__init__()
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.conv1 = nn.Conv2d(in_channels, 4*growth_rate,
                               kernel_size =1, bias = False)
        self.bn2 = nn.BatchNorm2d(4*growth_rate)
        self.conv2 = nn.Conv2d(4*growth_rate, growth_rate,
                              kernel_size=3, padding=1, bias=False)
        self.relu = nn.ReLU()

    def forward(self, x):
        res = x.clone().detach()
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv1(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = torch.cat([res,x], 1)
        return x

class DenseBlock(nn.Module):
    def __init__(self, num_layers, in_channels, growth_rate):
        super(DenseBlock,self).__init__()
        layers = []
        for i in range(num_layers):
            layers.append(BottleneckBlock(in_channels + i*growth_rate
                         , growth_rate))
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)

In [None]:
class DenseNet(nn.Module):
    def __init__(self, num_blocks, growth_rate, num_classes):
        super(DenseNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 2*growth_rate, kernel_size=7,
                              padding=3, stride=2, bias=False)
        self.bn1 = nn.BatchNorm2d(2*growth_rate)
        self.pool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.dense_blocks = nn.ModuleList()
        in_channels = 2*growth_rate
        for i, num_layers in enumerate(num_blocks):
            self.dense_blocks.append(DenseBlock(
                num_layers, in_channels, growth_rate
            ))
            in_channels += num_layers * growth_rate
            if i != len(num_blocks) -1:
                out_channels = in_channels // 2
                self.dense_blocks.append(nn.Sequential(
                    nn.BatchNorm2d(in_channels),
                    nn.Conv2d(in_channels, out_channels,
                    kernel_size=1, bias=False),
                ))
                in_channels = out_channels

        self.bn2 = nn.BatchNorm2d(in_channels)
        self.pool2 = nn.AvgPool2d(kernel_size=7)
        self.relu = nn.ReLU()
        self.fc = nn.Linear(in_channels, num_classes)

    def forward(self,x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.pool1(x)
        for block in self.dense_blocks:
            x = block(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.pool2(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

In [None]:
n_classes = len(list(classes.keys()))
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = DenseNet([6,12,24,16], growth_rate=32, num_classes=n_classes).to(device)

In [None]:
lr = 1e-2
epochs = 50
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = lr)

In [None]:
train_losses, val_losses = fit(model, train_loader, val_loader,
                              criterion, optimizer, device, epochs, 'scenes')

In [None]:
val_loss, val_acc = evaluate(model, val_loader, criterion, device)
test_loss, test_acc = evaluate(model,test_loader, criterion, device)
print('Evaluation on val/test dataset')
print('Val accuracy: ', val_acc)
print('Test accuracy: ', test_acc)