In [None]:
import torch
from torch import nn
from torchvision import transforms, datasets, models

device = "cuda" if torch.cuda.is_available() else "cpu"

### Import dataset

In [None]:
# path to dataset: /scratch/g/plaviole/srubenstein/DeepLearning/bigger_dataset
import random
from PIL import Image
# import matplotlib.pyplot as plt
from pathlib import Path

data_path = Path("/scratch/g/plaviole/srubenstein/DeepLearning/bigger_dataset")

# 1. Get all image paths
image_path_list = list(data_path.glob("*/*/*.tiff"))

# 2. Pick a random image path
random_image_path = random.choice(image_path_list)

# 3. Get image class from path name
image_class = random_image_path.parent.stem

### Transform data

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# write a transform for image
data_transform = transforms.Compose([
    # resize to be half as large
    transforms.Resize(250),
    transforms.RandomCrop(240),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
    transforms.RandomRotation(30),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    # crop to resnet input size
    transforms.CenterCrop(224),
    # Flip images randomly on the horizontal
    transforms.RandomHorizontalFlip(p=0.5),
    # Turn the image into a torch.Tensor
    transforms.ToTensor()
])

train_dir = data_path / "train"
test_dir = data_path / "test"

train_data = datasets.ImageFolder(root=train_dir, transform=data_transform, target_transform=None)
test_data = datasets.ImageFolder(root=test_dir, transform=data_transform)

# get class names as a list
class_names = train_data.classes

### Put data in DataLoaders and get model

In [None]:
import os
from torch.utils.data import DataLoader
from torchvision.models import resnet101
import torch
model = resnet101(weights='DEFAULT')
model.to(device)
train_dataloader = DataLoader(dataset=train_data, batch_size=1, num_workers=0, shuffle=True)
test_dataloader = DataLoader(dataset=test_data, batch_size=1, num_workers=0, shuffle=True)
if(next(model.parameters()).is_cuda):   
    print("Model on GPU")
else:
    print("Model on CPU")

### Define loss function, optimizer, and scheduler

In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch import nn

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.001)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=6, min_lr=0.001)

### Function to calculate accuraacy

In [1]:
# Calculate accuracy (a classification metric)
def accuracy_fn(y_true, y_pred):
    """Calculates accuracy between truth labels and predictions.

    Args:
        y_true (torch.Tensor): Truth labels for predictions.
        y_pred (torch.Tensor): Predictions to be compared to predictions.

    Returns:
        [torch.float]: Accuracy value between y_true and y_pred, e.g. 78.45
    """

    # for testing:
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

### Function for printing train time

In [None]:
from timeit import default_timer as timer

def print_train_time(start: float,
                     end: float,
                     device: torch.device = None):
  """Prints difference between start and end time."""
  total_time = end-start
  print(f"Train time on {device}: {total_time:.3f} seconds")
  return total_time

### Train and test model

In [None]:
train_time_start_on_cpu = timer()
epochs = 20

# 2. Create empty results dictionary
results = {"train_loss": [],
            "train_acc": [],
            "test_loss": [],
            "test_acc": []}

for epoch in range(epochs):
    print(f"Epoch: {epoch}\n----")

    # train 
    train_loss, train_acc = 0, 0

    # loop through dataloader
    for batch, (X, y) in enumerate(train_dataloader):
        model.train()
        X, y = X.to(device), y.to(device)
        # forward pass
        y_pred = model(X)
        # calculate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss
        train_acc += accuracy_fn(y_true=y, y_pred=y_pred.argmax(dim=1))
        # optimizer zero grad
        optimizer.zero_grad()
        # loss backward
        loss.backward()
        # optimizer step
        optimizer.step()

        # print what's happening
        if(batch % 100 == 0):
            print(f"Looked at {batch * len(X)}/{len(train_dataloader.dataset)}")

    # Divide total train loss by length of train dataloader
    train_loss /= len(train_dataloader)
    train_acc /= len(train_dataloader)

    # testing 
    test_loss, test_acc = 0, 0
    model.eval()
    
    with torch.inference_mode():
        for X_test, y_test in test_dataloader:
            X_test, y_test = X_test.to(device), y_test.to(device)
            # forward pass
            test_pred = model(X_test)
            # calculate the loss
            test_loss += loss_fn(test_pred, y_test)
            # calculate the accuracy
            test_acc += accuracy_fn(y_true=y_test, y_pred=test_pred.argmax(dim=1))
            
        # calculate the test loss average per batch
        test_loss /= len(test_dataloader)
        test_acc /= len(test_dataloader)

    scheduler.step(test_loss)

    print(f"\nTrain loss: {train_loss:.4f} | Train acc: {train_acc:.4f} | Test loss: {test_loss:.4f} | Test acc: {test_acc:.4f}")

    # update results dictionary
    results["train_loss"].append(train_loss)
    results["train_acc"].append(train_acc)
    results["test_loss"].append(test_loss)
    results["test_acc"].append(test_acc)

    # Calculate training time
    train_time_end_on_cpu = timer()
    total_trian_time_model_0 = print_train_time(start=train_time_start_on_cpu,
                                            end=train_time_end_on_cpu,
                                            device=str(next(model.parameters()).device))

In [None]:
# install matplotlib 
import sys
print(sys.executable)
# !{sys.executable} -m pip install matplotlib

### Function to calculate loss

In [None]:
import matplotlib.pyplot as plt
def plot_loss_curves(results: dict[str, list[float]]):
  """Plots training curves of a results dictionary."""
  # Get the loss values of the results dictionary(training and test)
  loss = torch.tensor(results["train_loss"]).cpu()
  loss = [torch.detach(tensor) for tensor in loss]
  test_loss = torch.tensor(results["test_loss"]).cpu()

  # Get the accuracy values of the results dictionary (training and test)
  accuracy = results["train_acc"]
  test_accuracy = results["test_acc"]

  # Figure out how mnay epochs there were
  epochs = range(len(results["train_loss"]))

  # Setup a plot
  plt.figure(figsize=(15, 7))

  # Plot the loss
  plt.subplot(1, 2, 1)
  print(f"type of epochs: {type(epochs)} | type of loss: {type(loss)}")
  plt.plot(epochs, loss, label="train_loss")
  plt.plot(epochs, test_loss, label="test_loss")
  plt.title("Loss")
  plt.xlabel("Epochs")
  plt.legend() 

  # Plot the accuracy
  plt.subplot(1, 2, 2)
  plt.plot(epochs, accuracy, label="train_accuracy")
  plt.plot(epochs, test_accuracy, label="test_accuracy")
  plt.title("Accuracy")
  plt.xlabel("Epochs")
  plt.legend()

In [None]:
plot_loss_curves(results)