In [1]:
import torch
from torch import nn
import cv2

import torchvision 
from torchvision import datasets
from torchvision import transforms
from torchvision.transforms import ToTensor

import matplotlib.pyplot as plt

In [2]:
from pathlib import Path
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

In [3]:
try:
    from going_modular.going_modular import data_setup, engine
except:
    # Get the going_modular scripts
    print("[INFO] Couldn't find going_modular scripts... downloading them from GitHub.")
    !git clone https://github.com/mrdbourke/pytorch-deep-learning
    !mv pytorch-deep-learning/going_modular .
    !rm -rf pytorch-deep-learning
    from going_modular.going_modular import data_setup, engine

[INFO] Couldn't find going_modular scripts... downloading them from GitHub.
Cloning into 'pytorch-deep-learning'...
remote: Enumerating objects: 3710, done.[K
remote: Counting objects: 100% (332/332), done.[K
remote: Compressing objects: 100% (172/172), done.[K
remote: Total 3710 (delta 176), reused 301 (delta 155), pack-reused 3378[K
Receiving objects: 100% (3710/3710), 648.79 MiB | 24.04 MiB/s, done.
Resolving deltas: 100% (2132/2132), done.
Updating files: 100% (248/248), done.


In [4]:
data_path = Path("data/")
image_path = data_path / "CAR"

In [5]:
if image_path.is_dir():
    print(f"{image_path} directory exists.")
else:
    print(f"Did not find {image_path} directory, creating one...")
    image_path.mkdir(parents=True, exist_ok=True)

Did not find data/CAR directory, creating one...


In [6]:
train_dir = image_path/"train"
test_dir = image_path/"test"

if train_dir.is_dir():
    print(f"{image_path} directory exists.")
else:
    print(f"Did not find {train_dir,test_dir} directory, creating one...")
    train_dir.mkdir(parents=True, exist_ok=True)
    test_dir.mkdir(parents=True, exist_ok=True)


Did not find (PosixPath('data/CAR/train'), PosixPath('data/CAR/test')) directory, creating one...


In [7]:
good_train_dir=train_dir/"good"
dent_train_dir=train_dir/"dent"


good_test_dir=test_dir/"good"
dent_test_dir=test_dir/"dent"



In [8]:
if good_train_dir.is_dir():
    print(f"{image_path} directory exists.")
else:
    print(f"Did not find {train_dir,test_dir} directory, creating one...")
    good_train_dir.mkdir(parents=True, exist_ok=True)
    dent_train_dir.mkdir(parents=True, exist_ok=True)

if good_test_dir.is_dir():
    print(f"{image_path} directory exists.")
else:
    print(f"Did not find {train_dir,test_dir} directory, creating one...")
    good_test_dir.mkdir(parents=True, exist_ok=True)
    dent_test_dir.mkdir(parents=True, exist_ok=True)

Did not find (PosixPath('data/CAR/train'), PosixPath('data/CAR/test')) directory, creating one...
Did not find (PosixPath('data/CAR/train'), PosixPath('data/CAR/test')) directory, creating one...


In [9]:
import random
from PIL import Image

#random.seed(42)

image_path_list=list(image_path.glob("*/*/*.jpg"))
image_path_list

random_image_path=random.choice(image_path_list)
print(random_image_path)

image_class=random_image_path.parent.stem
print(image_class)

img=Image.open(random_image_path)

print(f'Random image path: {random_image_path}')
print(f'iamge class: {image_class}')
print(f'Image height: {img.height}')
print(f'Image width: {img.width}')
img

IndexError: ignored

In [None]:
#writing a transform for our image
data_transform=transforms.Compose([
    
    #resize the image
    transforms.Resize(size=(224,224)),

    #Turn the image into a torch tensor
    transforms.ToTensor()
])

In [None]:
from torchvision import datasets
train_data=datasets.ImageFolder(root=train_dir,
                                transform=data_transform,
                                target_transform=None)

test_data=datasets.ImageFolder(root=test_dir,
                               transform=data_transform)

train_data,test_data

In [None]:
classes=train_data.classes
classes

In [None]:
class_to_dict=train_data.class_to_idx
class_to_dict

In [None]:
import os
from torch.utils.data import DataLoader
BATCH_SIZE=1
train_dataloader=DataLoader(dataset=train_data,
                            batch_size=BATCH_SIZE,
                            num_workers=os.cpu_count(),# number of cpu calls to load the data
                            shuffle=True) 

test_dataloader=DataLoader(dataset=test_data,
                           batch_size=BATCH_SIZE,
                           num_workers=os.cpu_count(),
                           shuffle=False)

len(train_dataloader),len(test_dataloader)



In [None]:
img, label = next(iter(train_dataloader))

In [None]:

import os
import pathlib
import torch
from typing import List,Tuple,Dict
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms


In [None]:
weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT
auto_transforms = weights.transforms()
auto_transforms

In [None]:
train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(train_dir=train_dir,
                                                                               test_dir=test_dir,
                                                                               transform=auto_transforms, # perform same data transforms on our own data as the pretrained model
                                                                               batch_size=32) # set mini-batch size to 32

train_dataloader, test_dataloader, class_names

In [None]:
weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT # .DEFAULT = best available weights 
model = torchvision.models.efficientnet_b0(weights=weights).to(device)

In [None]:
for param in model.features.parameters():
    param.requires_grad = False

In [None]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Get the length of class_names (one output unit for each class)
output_shape = len(class_names)

# Recreate the classifier layer and seed it to the target device
model.classifier = torch.nn.Sequential(
    torch.nn.Dropout(p=0.2, inplace=True), 
    torch.nn.Linear(in_features=1280, 
                    out_features=output_shape, # same number of output units as our number of classes
                    bias=True)).to(device)

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Start the timer
from timeit import default_timer as timer 
start_time = timer()

# Setup training and save the results
results = engine.train(model=model,
                       train_dataloader=train_dataloader,
                       test_dataloader=test_dataloader,
                       optimizer=optimizer,
                       loss_fn=loss_fn,
                       epochs=5,
                       device=device)

# End the timer and print out how long it took
end_time = timer()
print(f"[INFO] Total training time: {end_time-start_time:.3f} seconds")

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
class TinyVGG(nn.Module):
    """
    Model architecture copying TinyVGG from: 
    https://poloclub.github.io/cnn-explainer/
    """
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape, 
                      out_channels=hidden_units, 
                      kernel_size=3, # how big is the square that's going over the image?
                      stride=1, # default
                      padding=1), # options = "valid" (no padding) or "same" (output has same shape as input) or int for specific number 
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, 
                      out_channels=hidden_units,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,
                         stride=2) # default stride value is same as kernel_size
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            # Where did this in_features shape come from? 
            # It's because each layer of our network compresses and changes the shape of our inputs data.
            nn.Linear(in_features=hidden_units*16*16,
                      out_features=output_shape)
        )
    
    def forward(self, x: torch.Tensor):
        #x = self.conv_block_1(x)
        # print(x.shape)
        #x = self.conv_block_2(x)
        # print(x.shape)
        #x = self.classifier(x)
        # print(x.shape)
        #return x
        return self.classifier(self.conv_block_2(self.conv_block_1(x))) # <- leverage the benefits of operator fusion

torch.manual_seed(42)
model_0 = TinyVGG(input_shape=3, # number of color channels (3 for RGB) 
                  hidden_units=10, 
                  output_shape=len(train_data.classes)).to(device)
model_0

In [None]:
#TRAINING AND TESTING

# train_step - takes a model and a dataloader as input and trains the model on the dataloader
# test_step - takes a model and a dataloader as input and evaluates the model on the dataloader

def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer):
  
  model.train()

  train_loss,train_acc=0,0

  for batch,(X,y) in enumerate(dataloader):
    X,y=X.to(device),y.to(device)


    y_pred=model(X) # outputs logits

    loss=loss_fn(y_pred,y)
    train_loss+=loss.item()

    optimizer.zero_grad()

    loss.backward()

    optimizer.step()


    y_pred_class=torch.argmax(torch.softmax(y_pred,dim=1),dim=1)
    train_acc+=(y_pred_class==y).sum().item()/len(y_pred)

  train_loss= train_loss/len(dataloader)
  train_acc= train_acc/len(dataloader)

  return train_loss,train_acc


def test_step(model : torch.nn.Module,
              dataloader : torch.utils.data.DataLoader,
              loss_fn : torch.nn.Module):
  

  model.eval()

  test_loss,test_acc=0,0

  with torch.inference_mode():

    for batch,(X,y) in enumerate(dataloader):
      X,y=X.to(device),y.to(device)


      test_pred_logits=model(X)

      loss=loss_fn(test_pred_logits,y)
      test_loss+=loss.item()


      test_pred_labels=test_pred_logits.argmax(dim=1)
      test_acc+=(test_pred_labels==y).sum().item()/len(test_pred_labels)

    test_loss= test_loss/len(dataloader)
    test_acc= test_acc/len(dataloader)

    return test_loss,test_acc

        


In [None]:
# write a train function to combine the train_step and test_step
from tqdm import tqdm

def train(model : torch.nn.Module,
          train_dataloader : torch.utils.data.DataLoader,
          test_dataloader : torch.utils.data.DataLoader,
          optimizer : torch.optim.Optimizer,
          loss_fn : torch.nn.Module = nn.CrossEntropyLoss(),
          epochs : int = 5):
  
  results={"train_loss" : [],
           "train_acc" : [],
           "test_loss" : [],
           "test_acc" : []
  }

  for epoch in tqdm(range(epochs)):

    train_loss,train_acc = train_step(model=model,dataloader=train_dataloader,loss_fn=loss_fn,optimizer=optimizer)

    test_loss,test_acc = test_step(model=model,dataloader=test_dataloader,loss_fn=loss_fn)


    print(
              f"Epoch: {epoch+1} | "
              f"train_loss: {train_loss:.4f} | "
              f"train_acc: {train_acc:.4f} | "
              f"test_loss: {test_loss:.4f} | "
              f"test_acc: {test_acc:.4f}"
          )
    
    results["train_loss"].append(train_loss)
    results["train_acc"].append(train_acc)
    results["test_loss"].append(test_loss)
    results["test_acc"].append(test_acc)

  return results

  


In [None]:
'''torch.manual_seed(42)
torch.cuda.manual_seed(42)

NUM_EPOCHS=5

model_0=TinyVGG(input_shape=3, #3 color channels
                hidden_units=10,
                output_shape=len(train_data.classes)).to(device)

loss_fn=nn.CrossEntropyLoss()
optimizer=torch.optim.Adam(params=model_0.parameters(),lr=0.001)


from timeit import default_timer as timer
start_time=timer()



model_0_results=train(model=model_0,
                      train_dataloader=train_dataloader,
                      test_dataloader=test_dataloader,
                      optimizer=optimizer,
                      loss_fn=loss_fn,
                      epochs=NUM_EPOCHS)


end_time = timer()
print(f"Total training time: {end_time-start_time:.3f} seconds")'''



In [None]:
def make_predictions(model: torch.nn.Module,
                      data: list,
                      device: torch.device='cpu'):
   
   
   pred_probs=[]
   model.eval()
   with torch.inference_mode():
     for sample in data:
       # Here we unsqueeze and a single image in order to add a batch dimension
       sample=torch.unsqueeze(sample,dim=0)

       # Forward pass(model outputs raw logits)
       pred_logit=model(sample)

       # Converting these logits to prediction probabilities
       pred_prob=torch.softmax(pred_logit.squeeze(),dim=0)

       # Append to the list
       pred_probs.append(pred_prob)
   return torch.stack(pred_probs)


In [None]:
import random 
#random.seed(42)
test_samples=[]
test_labels=[]

for sample,label in random.sample(list(test_data),k=9):
  test_samples.append(sample)
  test_labels.append(label)

test_samples[0].shape

In [None]:
pred_probs=make_predictions(model=model,data=test_samples)

pred_probs[:2]
pred_classes=pred_probs.argmax(dim=1)

In [None]:
plt.figure(figsize=(9,9))
nrows=3
ncols=3
for i,sample in enumerate(test_samples):
   
   #create subplots
   plt.subplot(nrows,ncols,i+1)

   #plot target image
   plt.imshow(sample.permute(1,2,0))

   #prediction in text format
   pred_label=classes[pred_classes[i]]

   #get the truth label
   truth_label=classes[test_labels[i]]

   # Create the title text of the plot
   title_text = f"Pred: {pred_label} | Truth: {truth_label}"
  
   # Check for equality and change title colour accordingly
   if pred_label == truth_label:
      plt.title(title_text, fontsize=10, c="g") # green text if correct
   else:
      plt.title(title_text, fontsize=10, c="r") # red text if wrong
   plt.axis(False);

