<a href="https://colab.research.google.com/github/parthava-adabala/learning/blob/main/03_Pytorch_ComputerVision.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn

import torchvision
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor

import matplotlib.pyplot as plt

torch.__version__, torchvision.__version__

# Get dataset

In [None]:
# Fetch both FashionMNIST splits into ./data (downloaded if missing);
# ToTensor() is applied to every image, labels are left untouched.
train_data = datasets.FashionMNIST(
    root='data',
    train=True,
    download=True,
    transform=ToTensor(),
    target_transform=None,
)
test_data = datasets.FashionMNIST(
    root='data',
    train=False,
    download=True,
    transform=ToTensor(),
    target_transform=None,
)

In [None]:
len(train_data), len(test_data)

In [None]:
image, label = train_data[0]
image, label

In [None]:
image.shape, label

In [None]:
classes = train_data.classes
classes

In [None]:
class_to_idx = train_data.class_to_idx
class_to_idx

In [None]:
targets = train_data.targets
targets

In [None]:
plt.imshow(image.squeeze())
plt.title(classes[label])
plt.show()

In [None]:
plt.imshow(image.squeeze(), cmap='gray')
plt.title(classes[label])
plt.axis('off')
plt.show()

In [None]:
# Preview the first 25 training images in a 5x5 grid, one class name per tile.
plt.figure(figsize=(10, 10))
for i in range(25):
    image, label = train_data[i]
    # subplot positions are 1-based, hence i + 1
    ax = plt.subplot(5, 5, i + 1)
    plt.imshow(image.squeeze(), cmap='gray', aspect='auto')
    plt.title(classes[label])
    plt.axis('off')
plt.show()

# Prepare data loaders

In [None]:
from torch.utils.data import DataLoader

BATCH_SIZE = 32

# Wrap each split in a DataLoader; only the training split is shuffled.
train_dataloader = DataLoader(
    dataset=train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
test_dataloader = DataLoader(
    dataset=test_data,
    batch_size=BATCH_SIZE,
    shuffle=False,
)
train_dataloader, test_dataloader

In [None]:
train_dataloader.batch_size, len(train_dataloader)

In [None]:
train_features_batch, train_labels_batch = next(iter(train_dataloader))
train_features_batch.shape, train_labels_batch.shape

In [None]:
# Display one image picked at a (seeded) random position of the fetched batch.
torch.manual_seed(42)
random_idx = torch.randint(0, len(train_features_batch), size=[1]).item()
image = train_features_batch[random_idx]
label = train_labels_batch[random_idx]
plt.imshow(image.squeeze(), cmap='gray', aspect='auto')
plt.title(classes[label])
plt.axis('off')
plt.show()

# Baseline model

In [None]:
# flatten layer
flatten = torch.nn.Flatten()
x = train_features_batch[0]
out = flatten(x)
x.shape, out.shape

In [None]:
out.squeeze()

In [None]:
class FashionMNISTV0(torch.nn.Module):
    """Baseline classifier: flatten the input, then two stacked linear layers.

    No activation sits between the two linear layers, so the whole network
    is an affine function of the flattened input.

    Args:
        input_shape: Number of features per sample after flattening.
        hidden_units: Output width of the first linear layer.
        output_shape: Number of values produced per sample.
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()
        # Created in the same order as they are stacked, so seeded
        # initialisation and state_dict keys are unaffected.
        to_hidden = torch.nn.Linear(in_features=input_shape, out_features=hidden_units)
        to_output = torch.nn.Linear(in_features=hidden_units, out_features=output_shape)
        self.layer_stack = torch.nn.Sequential(torch.nn.Flatten(), to_hidden, to_output)

    def forward(self, x: torch.Tensor):
        """Run ``x`` through the layer stack and return its raw output."""
        return self.layer_stack(x)

In [None]:
torch.manual_seed(42)
model_0 = FashionMNISTV0(input_shape=784, hidden_units=10, output_shape=len(classes))
model_0

In [None]:
dummy_x = torch.rand([1, 1, 28, 28])
model_0(dummy_x)

In [None]:
import requests
from pathlib import Path

# Fetch helper_functions.py next to the notebook unless it is already there.
if Path('helper_functions.py').is_file():
    print('helper_functions.py already exists')
else:
    print('Downloading helper_functions.py')
    # A timeout keeps the cell from hanging forever on a stalled connection.
    request = requests.get(
        'https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/main/helper_functions.py',
        timeout=30,
    )
    # Stop on a non-success HTTP status instead of saving an error page
    # as a Python module that would then fail on import.
    request.raise_for_status()
    with open('helper_functions.py', 'wb') as f:
        f.write(request.content)

In [None]:
# loss function and optimizer and eval metric
from helper_functions import accuracy_fn

loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_0.parameters(), lr=0.1)


In [None]:
from timeit import default_timer as timer
def print_train_time(start: float, end: float, device: torch.device = None):
    """Print and return the time elapsed between two timer readings.

    Args:
        start: Timer reading taken before the measured work.
        end: Timer reading taken after the measured work.
        device: Label interpolated into the printed message as-is.

    Returns:
        ``end - start``.
    """
    elapsed = end - start
    print(f'Train time on {device}: {elapsed:.3f} seconds')
    return elapsed

In [None]:
start_time = timer()
#some code
end_time = timer()
print_train_time(start=start_time, end=end_time, device="cpu")

In [None]:
# training loop
from tqdm.auto import tqdm

torch.manual_seed(42)
train_time_start_on_cpu = timer()

epochs = 3
for epoch in tqdm(range(epochs)):
  # ---- training pass: one optimizer step per batch ----
  # train mode only needs setting once per epoch; eval() is not called
  # again until the batch loop has finished
  model_0.train()
  train_loss = 0
  for batch, (X, y) in enumerate(train_dataloader):
    y_pred = model_0(X)
    loss = loss_fn(y_pred, y)
    train_loss += loss.item()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    if batch % 400 == 0:
      print(f"Looked at {batch * len(X)}/{len(train_dataloader.dataset)} samples")
  # mean loss per batch
  train_loss /= len(train_dataloader)

  # ---- evaluation pass over the test split ----
  model_0.eval()
  test_loss, test_acc = 0, 0
  with torch.inference_mode():
    for X_test, y_test in test_dataloader:
      test_pred = model_0(X_test)
      test_loss += loss_fn(test_pred, y_test)
      test_acc += accuracy_fn(y_true=y_test, y_pred=test_pred.argmax(dim=1))
    # averaging is kept inside the inference_mode block, where test_loss was created
    test_loss /= len(test_dataloader)
    test_acc /= len(test_dataloader)
  print(f"Train loss: {train_loss:.5f} | Test loss: {test_loss:.5f} | Test acc: {test_acc:.2f}%")

train_time_end_on_cpu = timer()
total_train_time_model_0 = print_train_time(
    start=train_time_start_on_cpu,
    end=train_time_end_on_cpu,
    device=str(next(model_0.parameters()).device),
)
total_train_time_model_0

In [None]:
# Make preds
import torch
device = "cuda" if torch.cuda.is_available() else "cpu" # Define device here
def eval_model(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader, loss_fn: torch.nn.Module, accuracy_fn, device=device):
  """Evaluate `model` over `data_loader` and summarise the result.

  Args:
    model: Module to evaluate; it is moved to `device` and put in eval mode.
    data_loader: Iterable of (X, y) batches.
    loss_fn: Called as ``loss_fn(y_pred, y)`` for every batch.
    accuracy_fn: Called as ``accuracy_fn(y_true=..., y_pred=...)`` with the
      argmax over dim 1 of the model output.
    device: Device the model and every batch are moved to.

  Returns:
    Dict with "model_name" (the model's class name), "model_loss" and
    "model_acc", the latter two averaged over the number of batches.
  """
  model.to(device)
  model.eval()
  loss_total, acc_total = 0, 0
  with torch.inference_mode():
    for X, y in tqdm(data_loader):
      X, y = X.to(device), y.to(device)
      # model and batch were both moved to `device` above
      logits = model(X)
      loss_total += loss_fn(logits, y)
      acc_total += accuracy_fn(y_true=y, y_pred=logits.argmax(dim=1))
    num_batches = len(data_loader)
    loss_total /= num_batches
    acc_total /= num_batches
  return {
      "model_name": model.__class__.__name__,
      "model_loss": loss_total.item(),
      "model_acc": acc_total,
  }

model_0_results = eval_model(model=model_0, data_loader=test_dataloader, loss_fn=loss_fn, accuracy_fn=accuracy_fn)
model_0_results

# Device-agnostic code

In [None]:
!nvidia-smi

In [None]:
torch.cuda.is_available()

In [None]:
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
device

In [None]:
# Model 1: same layout as the baseline, with ReLU non-linearities added
class FashionMNISTV1(torch.nn.Module):
    """Flatten -> Linear -> ReLU -> Linear -> ReLU classifier.

    Args:
        input_shape: Number of features per sample after flattening.
        hidden_units: Output width of the first linear layer.
        output_shape: Number of values produced per sample.
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()
        layers = [
            torch.nn.Flatten(),
            torch.nn.Linear(in_features=input_shape, out_features=hidden_units),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=hidden_units, out_features=output_shape),
            # NOTE(review): this final ReLU clamps the outputs to >= 0 before
            # they reach the loss; confirm that is intended.
            torch.nn.ReLU(),
        ]
        self.layer_stack = torch.nn.Sequential(*layers)

    def forward(self, x: torch.Tensor):
        """Run ``x`` through the layer stack and return the result."""
        return self.layer_stack(x)

In [None]:
# create model instance
torch.manual_seed(42)
model_1 = FashionMNISTV1(input_shape=784, hidden_units=10, output_shape=len(classes)).to(device)

In [None]:
from helper_functions import accuracy_fn
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_1.parameters(), lr=0.1)

# Turning the training and testing loops into functions

In [None]:
def train_step(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader, loss_fn: torch.nn.Module, optimizer: torch.optim.Optimizer, accuracy_fn, device: torch.device = device):
  """Train `model` for one pass over `data_loader` and print the averages.

  Args:
    model: Module to train; put in train mode here. It is NOT moved to
      `device` by this function.
    data_loader: Iterable of (X, y) batches.
    loss_fn: Called as ``loss_fn(y_pred, y)``; the result is backpropagated.
    optimizer: Stepped once per batch.
    accuracy_fn: Called as ``accuracy_fn(y_true=..., y_pred=...)`` with the
      argmax over dim 1 of the model output.
    device: Device every batch is moved to before the forward pass.

  Returns:
    None. The per-batch mean loss and accuracy are printed.
  """
  train_loss, train_acc = 0.0, 0.0
  model.train()
  for X, y in data_loader:
    X, y = X.to(device), y.to(device)
    y_pred = model(X)
    loss = loss_fn(y_pred, y)
    # Accumulate a plain Python float: adding the loss tensor itself would
    # keep every batch's autograd history reachable for the whole epoch.
    train_loss += loss.item()
    train_acc += accuracy_fn(y_true=y, y_pred=y_pred.argmax(dim=1))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
  train_loss /= len(data_loader)
  train_acc /= len(data_loader)
  print(f"train_loss:{train_loss:.5f}, train_acc:{train_acc:.2f}%")

In [None]:
def test_step(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader, loss_fn: torch.nn.Module, accuracy_fn, device: torch.device = device):
  """Evaluate `model` over `data_loader` and print the averaged metrics.

  Args:
    model: Module to evaluate; put in eval mode here.
    data_loader: Iterable of (X, y) batches.
    loss_fn: Called as ``loss_fn(test_pred, y)`` for every batch.
    accuracy_fn: Called as ``accuracy_fn(y_true=..., y_pred=...)`` with the
      argmax over dim 1 of the model output.
    device: Device every batch is moved to before the forward pass.

  Returns:
    None. The per-batch mean loss and accuracy are printed.
  """
  model.eval()
  loss_sum, acc_sum = 0, 0
  with torch.inference_mode():
    for features, targets in data_loader:
      features, targets = features.to(device), targets.to(device)
      logits = model(features)
      loss_sum += loss_fn(logits, targets)
      acc_sum += accuracy_fn(y_true=targets, y_pred=logits.argmax(dim=1))
    n_batches = len(data_loader)
    # averaging and printing stay inside the inference_mode block, where
    # loss_sum was created
    loss_sum /= n_batches
    acc_sum /= n_batches
    print(f"test_loss:{loss_sum:.5f}, test_acc:{acc_sum:.2f}%")

In [None]:
# Fix the global torch RNG seed before training model_1
torch.manual_seed(42)

# Wall-clock timing of the whole run. Despite the "_on_gpu" names, the run
# happens on whatever `device` resolved to ("cuda" or "cpu").
from timeit import default_timer as timer
train_time_start_on_gpu = timer()

# Each epoch: one train_step pass, then one test_step pass
epochs = 3
for epoch in tqdm(range(epochs)):
  print(f"Epoch: {epoch}\n-------")
  train_step(model=model_1, data_loader=train_dataloader, loss_fn=loss_fn, optimizer=optimizer, accuracy_fn=accuracy_fn, device = device)
  test_step(model=model_1, data_loader=test_dataloader, loss_fn=loss_fn, accuracy_fn=accuracy_fn, device = device)
train_time_end_on_gpu = timer()
# The printed device label is read from the model's first parameter
total_train_time_model_1 = print_train_time(start=train_time_start_on_gpu, end=train_time_end_on_gpu, device=str(next(model_1.parameters()).device))

In [None]:
model_1_results = eval_model(model=model_1, data_loader=test_dataloader, loss_fn=loss_fn, accuracy_fn=accuracy_fn, device=device)
model_1_results

# Model 2: a convolutional neural network (CNN)

In [None]:
class FashionMNISTV2(torch.nn.Module):
  """Two convolutional blocks followed by a flatten + linear classifier.

  Each block is Conv2d -> ReLU -> Conv2d -> ReLU -> MaxPool2d(kernel_size=2),
  using 3x3 convolutions with stride 1 and padding 1.

  Args:
    input_shape: Number of channels of the input images.
    hidden_units: Number of channels produced by every convolution.
    output_shape: Number of values produced per sample.
  """

  def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
    super().__init__()
    # Blocks are built in the same order as before, so seeded
    # initialisation and state_dict keys are unaffected.
    self.conv_block_1 = self._make_conv_block(input_shape, hidden_units)
    self.conv_block_2 = self._make_conv_block(hidden_units, hidden_units)
    # 7*7 fits 28x28 inputs: the padded 3x3 convolutions keep the spatial
    # size and each MaxPool2d(kernel_size=2) halves it (28 -> 14 -> 7).
    flat_features = hidden_units*7*7
    self.classifier = torch.nn.Sequential(
        torch.nn.Flatten(),
        torch.nn.Linear(in_features=flat_features, out_features=output_shape),
    )

  @staticmethod
  def _make_conv_block(in_channels: int, out_channels: int):
    """Build one Conv2d-ReLU-Conv2d-ReLU-MaxPool2d block."""
    conv_kwargs = dict(kernel_size=3, stride=1, padding=1)
    return torch.nn.Sequential(
        torch.nn.Conv2d(in_channels=in_channels, out_channels=out_channels, **conv_kwargs),
        torch.nn.ReLU(),
        torch.nn.Conv2d(in_channels=out_channels, out_channels=out_channels, **conv_kwargs),
        torch.nn.ReLU(),
        torch.nn.MaxPool2d(kernel_size=2),
    )

  def forward(self, x: torch.Tensor):
    """Apply both convolutional blocks, then the classifier."""
    features = self.conv_block_2(self.conv_block_1(x))
    return self.classifier(features)

In [None]:
torch.manual_seed(42)
model_2 = FashionMNISTV2(input_shape=1, hidden_units=10, output_shape=len(classes)).to(device)
model_2

# Stepping through nn.Conv2d

In [None]:
torch.manual_seed(42)

# create a batch of images
images = torch.randn(size=(32, 3, 64, 64))
test_image = images[0]
images.shape, test_image.shape, images

In [None]:
# create a single conv2d layer
torch.manual_seed(42)
conv_layer = torch.nn.Conv2d(in_channels=3, out_channels=10, kernel_size=(3,3), stride=1, padding=1)
conv_out = conv_layer(test_image.unsqueeze(dim=0))
conv_out.shape, conv_out

# Stepping through nn.MaxPool2d

In [None]:
test_image.shape

In [None]:
max_pool_layer = torch.nn.MaxPool2d(kernel_size=2)
max_pool_out = max_pool_layer(test_image.unsqueeze(dim=0))
max_pool_out.shape, max_pool_out

# set up loss function and optimizer

In [None]:
from helper_functions import accuracy_fn
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_2.parameters(), lr=0.1)

In [None]:
model_2.state_dict()

In [None]:
# Fix the CPU and CUDA RNG seeds before training model_2
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Wall-clock timing of the whole model_2 training run
from timeit import default_timer as timer
train_time_start_model_2 = timer()

# Each epoch: one train_step pass, then one test_step pass
epochs = 3
for epoch in tqdm(range(epochs)):
  print(f"Epoch: {epoch}\n-------")
  train_step(model=model_2, data_loader=train_dataloader, loss_fn=loss_fn, optimizer=optimizer, accuracy_fn=accuracy_fn, device = device)
  test_step(model=model_2, data_loader=test_dataloader, loss_fn=loss_fn, accuracy_fn=accuracy_fn, device = device)
train_time_end_model_2 = timer()
# The printed device label is read from the model's first parameter
total_train_time_model_2 = print_train_time(start=train_time_start_model_2, end=train_time_end_model_2, device=str(next(model_2.parameters()).device))

In [None]:
model_2_results = eval_model(model=model_2, data_loader=test_dataloader, loss_fn=loss_fn, accuracy_fn=accuracy_fn, device=device)
model_2_results

# Compare model results and training time

In [None]:
import pandas as pd
compare_results = pd.DataFrame([model_0_results, model_1_results, model_2_results])
compare_results

In [None]:
compare_results["training time (s)"] = [total_train_time_model_0, total_train_time_model_1, total_train_time_model_2]
compare_results

In [None]:
#visualize
compare_results.set_index('model_name')['model_acc'].plot(kind='barh')
plt.xlabel('Accuracy (%)')
plt.ylabel('Model')
plt.show()

In [None]:
# make and evaluate random predictions with our best model
def make_predictions(model: torch.nn.Module, data: list, device: torch.device = device):
  """Return the softmax probabilities `model` assigns to each sample.

  Args:
    model: Module used for prediction; put in eval mode here. It is NOT
      moved to `device` by this function.
    data: Samples without a batch dimension; each one gets a leading
      dimension of size 1 before being passed to the model.
    device: Device every sample is moved to before the forward pass.

  Returns:
    The per-sample probability tensors (moved to CPU) stacked together.
  """
  model.eval()
  collected = []
  with torch.inference_mode():
    for sample in data:
      batch_of_one = sample.unsqueeze(dim=0).to(device)
      logits = model(batch_of_one)
      # squeeze() drops the size-1 batch dimension, so dim=0 is what remains
      probs = torch.softmax(logits.squeeze(), dim=0)
      collected.append(probs.cpu())
  return torch.stack(collected)

In [None]:
import random
# Pick 9 random (image, label) pairs from the test split.
random.seed(42)
test_samples = []
test_labels = []
# Sample positions rather than `list(test_data)`: materialising the list
# loads and transforms every test image just to keep nine of them.
for idx in random.sample(range(len(test_data)), k=9):
  sample, label = test_data[idx]
  test_samples.append(sample)
  test_labels.append(label)
test_samples[0].shape

In [None]:
plt.imshow(test_samples[0].squeeze(), cmap='gray')
plt.title(classes[test_labels[0]])

In [None]:
# Make predictions
pred_probs = make_predictions(model=model_2, data=test_samples)
pred_probs[:2]

In [None]:
pred_classes = torch.argmax(pred_probs, dim=1)
pred_classes

In [None]:
# Plot the sampled images in a 3x3 grid; a title is green when the
# predicted class matches the true class and red otherwise.
plt.figure(figsize=(9, 9))
nrows, ncols = 3, 3
for i, image in enumerate(test_samples):
  pred_label = classes[pred_classes[i]]
  true_label = classes[test_labels[i]]
  title_text = f"Pred: {pred_label} | True: {true_label}"
  title_color = 'g' if pred_label == true_label else 'r'

  plt.subplot(nrows, ncols, i+1)
  plt.imshow(image.squeeze(), cmap='gray')
  plt.title(title_text, fontsize=10, c=title_color)
  plt.axis('off')
plt.show()

In [None]:
# Predict a class index for every sample in the test split with model_2.
y_preds = []
model_2.eval()
with torch.inference_mode():
  for X, y in tqdm(test_dataloader, desc="Making predictions"):
    X, y = X.to(device), y.to(device)
    y_logit = model_2(X)
    # Normalise across the class dimension (dim=1). Using dim=0 would
    # normalise across the samples of the batch and change which class
    # wins the argmax. No squeeze(): it would drop the batch dimension of
    # a size-1 batch and make argmax(dim=1) fail.
    y_pred = torch.softmax(y_logit, dim=1).argmax(dim=1)
    y_preds.append(y_pred.cpu())
# Join the per-batch predictions into one tensor
y_pred_tensor = torch.cat(y_preds)
y_pred_tensor[:10]


In [None]:
len(y_pred_tensor)

# make a confusion matrix

In [None]:
try:
  import torchmetrics, mlxtend
  assert int(mlxtend.__version__.split(".")[1]>=19)
except:
  !pip install -q torchmetrics mlxtend
  import torchmetrics, mlxtend

In [None]:
from torchmetrics import ConfusionMatrix
from mlxtend.plotting import plot_confusion_matrix

# Build a multiclass confusion matrix (one class per entry of `classes`)
# from the predicted labels and the test-set targets.
confmat = ConfusionMatrix(
    task="multiclass",
    num_classes=len(classes),
)
confmat_tensor = confmat(y_pred_tensor, test_data.targets)
confmat_tensor

In [None]:
fig, ax = plot_confusion_matrix(conf_mat=confmat_tensor.numpy(), figsize=(8, 8), class_names=classes)
plt.title("Confusion Matrix")

# Saving model

In [None]:
from pathlib import Path

MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, exist_ok=True)
MODEL_NAME = "fashion_mnist_cnn_model_v2.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME
MODEL_SAVE_PATH

In [None]:
torch.save(obj=model_2.state_dict(), f=MODEL_SAVE_PATH)

In [None]:
# Rebuild the architecture with the same hyperparameters used for model_2,
# then restore the saved weights into it.
loaded_model_2 = FashionMNISTV2(input_shape=1, hidden_units=10, output_shape=len(classes))
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved from a CUDA model still loads on a CPU-only runtime.
loaded_model_2.load_state_dict(torch.load(f=MODEL_SAVE_PATH, map_location=device))
loaded_model_2.to(device)

In [None]:
model_2_results

In [None]:
loaded_model_2_results = eval_model(model=loaded_model_2, data_loader=test_dataloader, loss_fn=loss_fn, accuracy_fn=accuracy_fn)
loaded_model_2_results

In [None]:
# Check if the model results are close
torch.isclose(torch.tensor(model_2_results['model_loss']), torch.tensor(loaded_model_2_results['model_loss']), atol=1e-02)