<a href="https://colab.research.google.com/github/thaianh1210/DeepLearning-exercise/blob/main/Untitled49.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch.nn as nn
import torch
import os
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
root_dir = '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset'
img_paths = []
labels = []
classes = {
    label_idx: class_name \
    for label_idx, class_name in enumerate(sorted(os.listdir(root_dir)))
}

print(classes)

{0: 'dew', 1: 'fogsmog', 2: 'frost', 3: 'glaze', 4: 'hail', 5: 'lightning', 6: 'rain', 7: 'rainbow', 8: 'rime', 9: 'sandstorm', 10: 'snow'}


In [4]:
for label_idx, class_name in classes.items():
  class_dir = os.path.join(root_dir, class_name)
  for img_filename in os.listdir(class_dir):
    img_path = os.path.join(class_dir, img_filename)
    img_paths.append(img_path)
    labels.append(label_idx)
print(img_paths[:10])

['/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0830.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0834.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0835.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0846.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0838.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0844.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0845.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0840.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0843.jpg', '/content/drive/MyDrive/img_cls_weather_dataset/weather-dataset/dataset/snow/0831.jpg']


In [5]:
seed = 0
val_size = 0.2
test_size = 0.1
is_shuffle = True

X_train, X_test, y_train, y_test = train_test_split(
    img_paths, labels, test_size = test_size, random_state = seed, shuffle = is_shuffle
)

X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size = val_size, random_state = seed, shuffle = is_shuffle
)


In [6]:
class WeatherDataset(Dataset):
  def __init__(self, X, y, transform = None):
    self.transform = transform
    self.img_paths = X
    self.labels = y

  def __len__(self):
    return len(self.img_paths)

  def __getitem__(self, idx):
    img_path = self.img_paths[idx]
    img = Image.open(img_path).convert('RGB')

    if self.transform:
      img = self.transform(img)

    return img, self.labels[idx]

def transform(img, img_size = (224, 224)):
    img = img.resize(img_size)
    img =  np.array(img)[..., :3]
    img = torch.tensor(img).permute(2, 0, 1).float()
    normalize_img = img/ 255.0
    return normalize_img

In [7]:
train_dataset = WeatherDataset(X_train, y_train, transform = transform)
val_dataset = WeatherDataset(X_val, y_val, transform = transform)
test_dataset = WeatherDataset(X_test, y_test, transform = transform)

In [8]:
train_batch_size = 512
test_batch_size = 8

train_loader = DataLoader(train_dataset, batch_size = train_batch_size, shuffle = True)
val_loader = DataLoader(val_dataset, batch_size = test_batch_size, shuffle = False)
test_loaderr = DataLoader(test_dataset, batch_size = test_batch_size, shuffle = False)

In [9]:
#Create ResNet for training
#First, create a Class of Residual Block

class ResidualBlock(nn.Module):
  def __init__(self, in_channels, out_channels, stride = 1):
    super(ResidualBlock, self).__init__()
    self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1)
    self.batch_norm1 = nn.BatchNorm2d(out_channels)
    self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1)
    self.batch_norm2 = nn.BatchNorm2d(out_channels)
    self.downsample = nn.Sequential()

    if stride != 1 or in_channels != out_channels:
      self.downsample = nn.Sequential(
          nn.Conv2d(in_channels, out_channels, kernel_size = 1, stride = stride),
          nn.BatchNorm2d(out_channels)
      )
    self.relu = nn.ReLU()

  def forward(self, x):
    shortcut = x.clone()
    x = self.conv1(x)
    x = self.batch_norm1(x)
    x = self.relu(x)
    x = self.conv2(x)
    x = self.batch_norm2(x)
    x += self.downsample(shortcut)
    x = self.relu(x)
    return x

In [10]:
class ResNet(nn.Module):
  def __init__(self, residual_block, n_block_lst, n_classes):
    super(ResNet, self).__init__()
    self.conv1 = nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3)
    self.batch_norm1 = nn.BatchNorm2d(64)
    self.relu = nn.ReLU()
    self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
    self.conv2 = self.create_layer(residual_block, 64, 64, n_block_lst[0], 1)
    self.conv3 = self.create_layer(residual_block, 64, 128, n_block_lst[1], 2)
    self.conv4 = self.create_layer(residual_block, 128, 256, n_block_lst[2], 2)
    self.conv5 = self.create_layer(residual_block, 256, 512, n_block_lst[3], 2)
    self.avgpool = nn.AdaptiveAvgPool2d(1)
    self.fc = nn.Linear(512, n_classes)
    self. flatten = nn.Flatten()

    # Correct the indentation of create_layer function
  def create_layer(self, residual_block, in_channels, out_channels, n_blocks, stride):
      blocks = []
      first_block = residual_block(in_channels, out_channels, stride)
      blocks.append(first_block)

      for idx in range(1, n_blocks):
        block = residual_block(out_channels, out_channels, stride)
        blocks.append(block)
      return nn.Sequential(*blocks)

    # Correct the indentation of forward function
  def forward(self, x):
      x = self.conv1(x)
      x = self.batch_norm1(x) # Fix typo: batchnorm1 -> batch_norm1
      x = self.maxpool(x) # Fix typo: maxpool1 -> maxpool
      x = self.relu(x)
      x = self.conv2(x)
      x = self.conv3(x)
      x = self.conv4(x)
      x = self.conv5(x)
      x = self.avgpool(x)
      x = self.flatten(x)
      x = self.fc(x)
      return x

In [11]:
n_classes = len(list(classes.keys()))
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = ResNet(residual_block = ResidualBlock, n_block_lst = [2, 2, 2, 2], n_classes = n_classes).to(device)

In [12]:
def evaluate(model, val_loader, criterion, device):
  model.eval()
  correct = 0
  total = 0
  losses = []
  with torch.no_grad():
    for images, labels, in val_loader:
      images = images.to(device)
      labels = labels.to(device)
      y_hat = model(images)
      loss = criterion(y_hat, labels)
      losses.append(loss.item())
      _, predicted = torch.max(y_hat.data, 1)
      total += labels.size(0)
      correct += (predicted == labels).sum().item()
  loss = sum(losses) / len(losses)
  acc = correct / total
  return loss, acc

In [13]:
def fit(model, train_loader, val_loader, criterion, optimizer, device, epochs):
  train_losses = []
  val_losses = []
  for epoch in range(epochs):
    batch_train_losses = []
    model.train()
    for idx, (inputs, labels) in enumerate(train_loader):
      inputs, labels = inputs.to(device), labels.to(device)
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      batch_train_losses.append(loss)
    train_loss = sum(batch_train_losses) / len(batch_train_losses)
    train_losses.append(train_loss)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    val_losses.append(val_loss)
    print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')
  return train_losses, val_losses


In [14]:
lr = 1e-3
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = lr)

In [15]:
train_losses, val_losses = fit(model, train_loader, val_loader, criterion, optimizer, device, epochs = 200)

Epoch 1/200, Train Loss: 2.6542, Val Loss: 2.4223, Val Acc: 0.0000
Epoch 2/200, Train Loss: 1.7605, Val Loss: 2.4218, Val Acc: 0.0000
Epoch 3/200, Train Loss: 1.4780, Val Loss: 2.4143, Val Acc: 0.0000
Epoch 4/200, Train Loss: 1.0839, Val Loss: 2.4062, Val Acc: 0.0000
Epoch 5/200, Train Loss: 0.8230, Val Loss: 2.3908, Val Acc: 0.0000
Epoch 6/200, Train Loss: 0.6968, Val Loss: 2.3732, Val Acc: 0.0000
Epoch 7/200, Train Loss: 0.6225, Val Loss: 2.3542, Val Acc: 0.0000
Epoch 8/200, Train Loss: 0.4749, Val Loss: 2.3336, Val Acc: 0.0000
Epoch 9/200, Train Loss: 0.4163, Val Loss: 2.3098, Val Acc: 0.0000
Epoch 10/200, Train Loss: 0.2957, Val Loss: 2.2859, Val Acc: 1.0000
Epoch 11/200, Train Loss: 0.2441, Val Loss: 2.2613, Val Acc: 1.0000
Epoch 12/200, Train Loss: 0.2010, Val Loss: 2.2378, Val Acc: 1.0000
Epoch 13/200, Train Loss: 0.1806, Val Loss: 2.2136, Val Acc: 1.0000
Epoch 14/200, Train Loss: 0.1640, Val Loss: 2.1945, Val Acc: 1.0000
Epoch 15/200, Train Loss: 0.1448, Val Loss: 2.1730, Val A