<a href="https://colab.research.google.com/github/mathun3003/sightseeing_muenster/blob/main/notebooks/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import shutil
import random
from tqdm.auto import tqdm
from pathlib import Path

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

import torchvision
from torchvision import transforms, datasets

from google.colab import drive

In [None]:
# setup pytorch specs
# os.cpu_count() returns None when the CPU count cannot be determined; fall
# back to 0 so the DataLoaders load in the main process instead of failing.
NUM_WORKERS = os.cpu_count() or 0
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Seed the RNGs used by this notebook (random.shuffle for the train/test split,
# torch for weight initialisation and dropout) so a fresh run is repeatable.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

print(device)

cuda


In [None]:
# Mount Google Drive at /content/drive/ (forcing a remount if it is already mounted).
drive.mount('/content/drive/', force_remount=True)

# Root of the image data plus its three sub-folders:
# the unsplit images ('all') and the derived 'train' / 'test' splits.
data_dir = Path("/content/drive/My Drive/sight_seeing_ms/data/")
all_dir, train_dir, test_dir = (data_dir / sub for sub in ('all/', 'train/', 'test/'))

Mounted at /content/drive/


In [None]:
# Create the 'train' and 'test' folders *before* inspecting them: os.listdir
# raises FileNotFoundError on a missing directory, which would break a first run.
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# only run if train and test dirs are empty
if (not os.listdir(train_dir)) and (not os.listdir(test_dir)):

    # Iterate through each class folder (sorted, so the split is reproducible
    # for a fixed random seed regardless of the file system's listing order)
    for folder_name in tqdm(sorted(os.listdir(all_dir)), f'Creating train and test dirs from {all_dir}'):
        folder_path = os.path.join(all_dir, folder_name)

        # Ignore stray files next to the class folders
        if not os.path.isdir(folder_path):
            continue

        # Create 'train' and 'test' subfolders for the current class
        train_subfolder = os.path.join(train_dir, folder_name)
        test_subfolder = os.path.join(test_dir, folder_name)
        os.makedirs(train_subfolder, exist_ok=True)
        os.makedirs(test_subfolder, exist_ok=True)

        # Collect the regular files of the class folder (nested directories
        # cannot be copied with shutil.copy and are skipped)
        image_files = sorted(
            f for f in os.listdir(folder_path)
            if os.path.isfile(os.path.join(folder_path, f))
        )

        # Shuffle the image files randomly
        random.shuffle(image_files)

        # Calculate the split point based on the 80/20 ratio
        split_point = int(len(image_files) * 0.8)

        # Copy the first 80% to 'train' and the remaining 20% to 'test'
        for split_name, split_files, dst_folder in (
            ('train', image_files[:split_point], train_subfolder),
            ('test', image_files[split_point:], test_subfolder),
        ):
            for file_name in tqdm(split_files, f"Copying files for {split_name} set from {folder_name}"):
                src_path = os.path.join(folder_path, file_name)
                dst_path = os.path.join(dst_folder, file_name)
                shutil.copy(src_path, dst_path)
else:
    print('Train and Test directories already exist.')

Train and Test directories already exist.


In [None]:
def _channel_mean_std(loader, label):
    """Compute per-channel mean and standard deviation over a whole DataLoader.

    Two passes are made over ``loader``: the first accumulates the channel
    means, the second the squared deviations from those means.

    Args:
        loader: DataLoader yielding ``(images, target)`` batches, where
            ``images`` is viewed as (batch, channels, pixels).
        label: Value shown in the progress-bar descriptions.

    Returns:
        A ``(mean, std)`` tuple of 1-D tensors with one entry per channel.
    """
    # Pass 1: sum the per-image channel means, then average over all images.
    channel_mean_sum = 0.0
    for images, _ in tqdm(loader, f"Calculating mean for {label}"):
        flat = images.view(images.size(0), images.size(1), -1)
        channel_mean_sum += flat.mean(2).sum(0)
    mean = channel_mean_sum / len(loader.dataset)

    # Pass 2: sum the squared deviations per channel.
    sq_dev_sum = 0.0
    pixels_per_channel = 0
    for images, _ in tqdm(loader, f"Calculating std for {label}"):
        flat = images.view(images.size(0), images.size(1), -1)
        sq_dev_sum += ((flat - mean.unsqueeze(1)) ** 2).sum([0, 2])
        # The sum above is per channel, so the divisor must count the values
        # of ONE channel (batch * pixels). Using nelement() would also count
        # the channel dimension and shrink the std by sqrt(num_channels).
        pixels_per_channel += flat.size(0) * flat.size(2)
    std = torch.sqrt(sq_dev_sum / pixels_per_channel)

    return mean, std


with torch.device('cpu'):

    # get mean and std for image normalization

    norm_params = {train_dir: {'mean': 0, 'std': 0}, test_dir: {'mean': 0, 'std': 0}}

    for split_dir in tqdm([train_dir, test_dir], "Progress"):
        dataset = datasets.ImageFolder(split_dir,
                                       transform=transforms.Compose([
                                           transforms.Resize(size=(224, 224)),
                                           transforms.ToTensor()
                                       ]))

        loader = DataLoader(dataset,
                            batch_size=10,
                            num_workers=NUM_WORKERS,
                            shuffle=False,
                            drop_last=False)

        mean, std = _channel_mean_std(loader, split_dir)

        norm_params[split_dir]['mean'] = mean
        norm_params[split_dir]['std'] = std

Progress:   0%|          | 0/2 [00:00<?, ?it/s]

Calculating mean for /content/drive/My Drive/sight_seeing_ms/data/train:   0%|          | 0/57 [00:00<?, ?it/s…

Calculating std for /content/drive/My Drive/sight_seeing_ms/data/train:   0%|          | 0/57 [00:00<?, ?it/s]

Calculating mean for /content/drive/My Drive/sight_seeing_ms/data/test:   0%|          | 0/15 [00:00<?, ?it/s]

Calculating std for /content/drive/My Drive/sight_seeing_ms/data/test:   0%|          | 0/15 [00:00<?, ?it/s]

In [None]:
# transform data
# Both splits are normalised with the statistics of the TRAINING split.
# Normalising the test images with their own statistics would feed the model
# differently preprocessed inputs at evaluation time than it was trained on.
train_norm = norm_params[train_dir]

train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(**train_norm)
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(**train_norm)
])

# create data sets
train_data = datasets.ImageFolder(train_dir, transform=train_transform)
test_data = datasets.ImageFolder(test_dir, transform=test_transform)

# class names and the class -> index mapping come from the training folder layout
class_names = train_data.classes
class_dict = train_data.class_to_idx

# Check the lengths
print(len(train_data), len(test_data))

570 146


In [None]:
def _build_loader(dataset, shuffle):
    """Wrap ``dataset`` in a DataLoader using the notebook's shared settings.

    Every call creates its own CPU ``torch.Generator``; only ``shuffle``
    differs between the train and test loaders.
    """
    return DataLoader(
        dataset=dataset,
        batch_size=10,
        num_workers=NUM_WORKERS,
        generator=torch.Generator(device='cpu'),
        shuffle=shuffle,
    )


# Training batches are reshuffled, test batches keep the dataset order.
train_dataloader = _build_loader(train_data, shuffle=True)
test_dataloader = _build_loader(test_data, shuffle=False)

In [None]:
# define model
class EfficientNetV2S(nn.Module):
    def __init__(self, fan_out: int, class_to_idx: dict[str, int]):
        """
        Creates a pretrained EfficientNetV2S model with a custom classifier.
        :param fan_out: Number of classes.
        :param: class_to_idx: Class to index mapping dictionary.
        """
        super().__init__()
        self.class_to_idx = class_to_idx
        self.weights = torchvision.models.EfficientNet_V2_S_Weights.DEFAULT
        self.model = torchvision.models.efficientnet_v2_s(weights=self.weights)
        self.model.classifier = torch.nn.Sequential(
            nn.Dropout(p=0.2, inplace=True),
            nn.Linear(in_features=1280,
                      out_features=fan_out,
                      bias=True))
        # freeze base layers
        for param in self.model.features.parameters():
            param.requires_grad = False

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.model(x)

    def save(self, model_path: str | Path):
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'class_to_idx': self.class_to_idx
        }, model_path)

In [None]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> tuple[float, float]:
    """Trains a PyTorch model for a single epoch.

    Turns a target PyTorch model to training mode and then
    runs through all of the required training steps (forward
    pass, loss calculation, optimizer step).

    The ``model`` annotation is plain ``torch.nn.Module``: the previously
    referenced ``OptimizedModule`` is not imported in this notebook, and
    modules returned by ``torch.compile`` are ``nn.Module`` instances as well.

    Args:
      model: A PyTorch model to be trained.
      dataloader: A DataLoader instance for the model to be trained on.
      loss_fn: A PyTorch loss function to minimize.
      optimizer: A PyTorch optimizer to help minimize the loss function.
      device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
      A tuple of training loss and training accuracy metrics, each averaged
      over the batches of the dataloader.
      In the form (train_loss, train_accuracy). For example:

      (0.1112, 0.8743)
    """
    # Put model in train mode
    model.train()

    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0

    # Loop through data loader data batches
    for X, y in dataloader:
        # Send data to target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metric across all batches.
        # argmax is taken on the logits directly (softmax does not change the
        # ranking), mirroring how test_step derives its labels.
        y_pred_class = y_pred.argmax(dim=1)
        train_acc += (y_pred_class == y).sum().item() / len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

def test_step(model: torch.nn.Module | OptimizedModule,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.device) -> tuple[float, float]:
  """Tests a PyTorch model for a single epoch.

  Turns a target PyTorch model to "eval" mode and then performs
  a forward pass on a testing dataset.

  Args:
    model: A PyTorch model to be tested.
    dataloader: A DataLoader instance for the model to be tested on.
    loss_fn: A PyTorch loss function to calculate loss on the test data.
    device: A target device to compute on (e.g. "cuda" or "cpu").

  Returns:
    A tuple of testing loss and testing accuracy metrics.
    In the form (test_loss, test_accuracy). For example:

    (0.0223, 0.8985)
  """
  # Put model in eval mode
  model.eval()

  # Setup test loss and test accuracy values
  test_loss, test_acc = 0, 0

  # Turn on inference context manager
  with torch.inference_mode():
      # Loop through DataLoader batches
      for batch, (X, y) in enumerate(dataloader):
          # Send data to target device
          X, y = X.to(device), y.to(device)

          # 1. Forward pass
          test_pred_logits = model(X)

          # 2. Calculate and accumulate loss
          loss = loss_fn(test_pred_logits, y)
          test_loss += loss.item()

          # Calculate and accumulate accuracy
          test_pred_labels = test_pred_logits.argmax(dim=1)
          test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

  # Adjust metrics to get average loss and accuracy per batch
  test_loss = test_loss / len(dataloader)
  test_acc = test_acc / len(dataloader)
  return test_loss, test_acc

def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs: int,
          device: torch.device) -> dict[str, list]:
    """Trains and tests a PyTorch model.

    Passes a target PyTorch models through train_step() and test_step()
    functions for a number of epochs, training and testing the model
    in the same epoch loop.

    Calculates, prints and stores evaluation metrics throughout.

    The ``model`` annotation is plain ``torch.nn.Module``: the previously
    referenced ``OptimizedModule`` is not imported in this notebook, and
    modules returned by ``torch.compile`` are ``nn.Module`` instances as well.

    Args:
      model: A PyTorch model to be trained and tested.
      train_dataloader: A DataLoader instance for the model to be trained on.
      test_dataloader: A DataLoader instance for the model to be tested on.
      optimizer: A PyTorch optimizer to help minimize the loss function.
      loss_fn: A PyTorch loss function to calculate loss on both datasets.
      epochs: An integer indicating how many epochs to train for.
      device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
      A dictionary of training and testing loss as well as training and
      testing accuracy metrics. Each metric has a value in a list for
      each epoch.
      In the form: {train_loss: [...],
                    train_acc: [...],
                    test_loss: [...],
                    test_acc: [...]}
      For example if training for epochs=2:
                   {train_loss: [2.0616, 1.0537],
                    train_acc: [0.3945, 0.3945],
                    test_loss: [1.2641, 1.5706],
                    test_acc: [0.3400, 0.2973]}
    """
    # Create empty results dictionary
    results = {"train_loss": [],
               "train_acc": [],
               "test_loss": [],
               "test_acc": []}

    # Loop through training and testing steps for a number of epochs
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer,
                                           device=device)
        test_loss, test_acc = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn,
                                        device=device)

        # Print out what's happening
        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )

        # Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

    # Return the filled results at the end of the epochs
    return results

In [None]:
# One output logit per class found in the training folder.
model = EfficientNetV2S(class_to_idx=class_dict, fan_out=len(class_names))

# Cross-entropy objective, optimised with Adam.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Run the train/test loop; the model is moved to the target device right here.
train_kwargs = dict(
    model=model.to(device),
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    optimizer=optimizer,
    loss_fn=loss_fn,
    epochs=20,
    device=device,
)
results = train(**train_kwargs)

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch: 1 | train_loss: 1.4831 | train_acc: 0.5596 | test_loss: 1.0303 | test_acc: 0.8244
Epoch: 2 | train_loss: 0.8811 | train_acc: 0.7965 | test_loss: 0.6876 | test_acc: 0.9200
Epoch: 3 | train_loss: 0.6636 | train_acc: 0.8667 | test_loss: 1.4547 | test_acc: 0.9400
Epoch: 4 | train_loss: 0.4949 | train_acc: 0.8965 | test_loss: 0.4447 | test_acc: 0.9467
Epoch: 5 | train_loss: 0.4602 | train_acc: 0.8860 | test_loss: 0.3706 | test_acc: 0.9667
Epoch: 6 | train_loss: 0.4262 | train_acc: 0.9070 | test_loss: 0.3088 | test_acc: 0.9733
Epoch: 7 | train_loss: 0.3733 | train_acc: 0.9105 | test_loss: 0.2800 | test_acc: 0.9489
Epoch: 8 | train_loss: 0.3744 | train_acc: 0.9193 | test_loss: 0.2896 | test_acc: 0.9533
Epoch: 9 | train_loss: 0.3279 | train_acc: 0.9158 | test_loss: 0.3111 | test_acc: 0.9489
Epoch: 10 | train_loss: 0.2858 | train_acc: 0.9246 | test_loss: 0.2422 | test_acc: 0.9600
Epoch: 11 | train_loss: 0.2699 | train_acc: 0.9246 | test_loss: 0.2218 | test_acc: 0.9489
Epoch: 12 | train_l

In [None]:
# save model parameters
model_path = Path('/content/drive/My Drive/sight_seeing_ms/models/')
# Create the target folder up front: saving into a missing directory fails,
# which would discard the result of the whole training run.
model_path.mkdir(parents=True, exist_ok=True)

model_name = "01_pytorch_efficientnetv2s.pth"
model_save_path = model_path / model_name

model.save(model_save_path)