In [1]:
# import libraries 
import os
import numpy as np
import matplotlib.pyplot as plt

import torch
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import torch.nn as nn
import torchvision.models as models

from timeit import default_timer as timer 
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

import PIL

In [2]:
data_dir = 'G:/My Drive/ICS504/github/ICS-504/download_images'

# Write transform for image
data_transform = transforms.Compose([
    transforms.Resize(size=(64, 64)),# Resize the images to 64x64
    transforms.ToTensor() # Turn the image into a torch.Tensor
])

# loading training data
train_data = datasets.ImageFolder(root=data_dir, # target folder of images
                                  transform=data_transform, # transforms to perform on data (images)
                                  target_transform=None) # transforms to perform on labels (if necessary)

# loading test data
# test_data = datasets.ImageFolder(root=data_dir, 
#                                  transform=data_transform)

print(f"Train data:\n{train_data}")

# Get class names as a list
class_dict = train_data.class_to_idx
class_dict

Train data:
Dataset ImageFolder
    Number of datapoints: 6885
    Root location: G:/My Drive/ICS504/github/ICS-504/download_images
    StandardTransform
Transform: Compose(
               Resize(size=(64, 64), interpolation=bilinear, max_size=None, antialias=None)
               ToTensor()
           )


{'0000000000000000': 0,
 '0000000000000010': 1,
 '0000000000000100': 2,
 '0000000000000110': 3,
 '0000000000100000': 4,
 '0000000000100010': 5,
 '0000000000100100': 6,
 '0000000000100110': 7,
 '0000000001000000': 8,
 '0000000001000010': 9,
 '0000000001000100': 10,
 '0000000001100000': 11,
 '0000000001100010': 12,
 '0000001000000000': 13,
 '0000001000000010': 14,
 '0000001000000100': 15,
 '0000001000100000': 16,
 '0000001000100010': 17,
 '0010000000000000': 18,
 '0010000000000010': 19,
 '0010000000000100': 20,
 '0010000000000110': 21,
 '0010000000100000': 22,
 '0010000000100010': 23,
 '0010000000100110': 24,
 '0010000001000000': 25,
 '0010000001000100': 26,
 '0010000001100000': 27,
 '0010001000000000': 28,
 '0010001000000010': 29,
 '0010001000000100': 30,
 '0010001000000110': 31,
 '0010001000100000': 32,
 '0010001000100010': 33,
 '0010001001000000': 34,
 '0010001001000100': 35,
 '0010001001100000': 36,
 '0100000000000000': 37,
 '0100000000000010': 38,
 '0100000000000100': 39,
 '01000000

In [3]:
# ## zero centering
mean = 0.
std = 0.
for images, _ in train_data:
    mean += images.mean(dim=(1,2))
    std += images.std(dim=(1,2))

# Compute the mean and standard deviation of the pixel values for each channel
mean /= len(train_data)
std /= len(train_data)


# mean = [0.5644, 0.5995, 0.6298]
# std  = [0.1767, 0.1662, 0.2231]

print("train mean", mean)
print('train std', std)



KeyboardInterrupt



In [None]:
data_transform = transforms.Compose([
    transforms.Resize(size=(255, 255)),# Resize the images to 64x64
    transforms.ToTensor() # Turn the image into a torch.Tensor
    transforms.Normalize(
        mean=mean,
        std=std
    )
])

# loading training data
train_data = datasets.ImageFolder(root=data_dir, # target folder of images
                                  transform=data_transform, # transforms to perform on data (images)
                                  target_transform=None) # transforms to perform on labels (if necessary)


# Split the dataset into training and validation sets
train_subdata, val_subdata = random_split(train_data, [int(len(train_data)*0.8), len(train_data)-int(len(train_data)*0.8)])

# Setup batch size and number of workers 
BATCH_SIZE = 32
NUM_WORKERS = os.cpu_count()
print(f"Creating DataLoader's with batch size {BATCH_SIZE} and {NUM_WORKERS} workers.")

# Create DataLoader's
train_dataloader = DataLoader(train_subdata, 
                                     batch_size=BATCH_SIZE, 
                                     shuffle=True, 
                                     num_workers=NUM_WORKERS)

val_dataloader = DataLoader(val_subdata, 
                                     batch_size=BATCH_SIZE, 
                                     shuffle=True, 
                                     num_workers=NUM_WORKERS)

# test_dataloader = DataLoader(test_data, 
#                                     batch_size=BATCH_SIZE, 
#                                     shuffle=False, 
#                                     num_workers=NUM_WORKERS)

train_dataloader, val_dataloader#test_dataloader, 

In [None]:
# Get class names as a list
class_dict = train_data.class_to_idx
class_dict

In [None]:
class CNNmodel(nn.Module):
    
    def __init__(self, input_shape: int, output_shape: int) -> None:
        super().__init__()
        #1st block CNN with 32 filters size 7x7 strid 1 no padding with relu activation function
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),
        )
        
        
        #7th block fully connected NN 
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=25088, out_features=496, bias=True),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=4096, out_features=4096, bias=True),
            nn.Linear(in_features=4096, out_features=1024, bias=True),
            nn.ReLU(),
            nn.Linear(in_features=1024,out_features=output_shape)
        )
        
        #8th block is included in the cross entropy 
    
    def forward(self, x: torch.Tensor):
        x = self.features(x)
        # print(x.shape)
        x = self.classifier(x)
        # print(x.shape)
        return x
        


torch.manual_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNmodel(input_shape=3, # number of color channels (3 for RGB) 
                  output_shape=len(train_data.classes)).to(device)
model

In [None]:
# 1. Get a batch of images and labels from the DataLoader
img_batch, label_batch = next(iter(train_dataloader))

# 2. Get a single image from the batch and unsqueeze the image so its shape fits the model
img_single, label_single = img_batch[0].unsqueeze(dim=0), label_batch[0]
print(f"Single image shape: {img_single.shape}\n")

# 3. Perform a forward pass on a single image
model.eval()
with torch.inference_mode():
    pred = model(img_single.to(device))
    
# 4. Print out what's happening and convert model logits -> pred probs -> pred label
print(f"Output logits:\n{pred}\n")
print(f"Output prediction probabilities:\n{torch.softmax(pred, dim=1)}\n")
print(f"Output prediction label:\n{torch.argmax(torch.softmax(pred, dim=1), dim=1)}\n")
print(f"Actual label:\n{label_single}")

In [None]:
def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer):
    # Put model in train mode
    model.train()
    
    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0
    
    # Loop through data loader data batches
    i = 1
    for batch, (X, y) in enumerate(dataloader):
        i = i+1
        # Send data to target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item() 

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metric across all batches
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

      
    # Adjust metrics to get average loss and accuracy per batch 
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

In [None]:
def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module):
    # Put model in eval mode
    model.eval() 
    
    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0
    
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)
    
            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()
            
            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))
            
    # Adjust metrics to get average loss and accuracy per batch 
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [None]:
# 1. Take in various parameters required for training and test steps
def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5,
          early_stop=0):
    
    # 2. Create empty results dictionary
    results = {"train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": []
    }
    
    # 3. Loop through training and testing steps for a number of epochs
    past_val_loss=999999999
    loss_count = 1
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer)
        test_loss, test_acc = test_step(model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn)
        
        # 4. Print out what's happening
        lr = 0.
        weight_decay = 0.
        for param_group in optimizer.param_groups:
            lr = param_group['lr']
            weight_decay = param_group['weight_decay']
        print(
            f"Epoch: {epoch+1} | "
            f"lr: {lr:.4f} | "
            f"weight_decay: {weight_decay:.4f} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )

        # 5. Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

        if(past_val_loss < test_loss):
          loss_count = loss_count + 1

        if(loss_count == early_stop):
          print('early stop')
          break #early stop
        else:
          loss_count = 1
        past_val_loss = test_loss

    # 6. Return the filled results at the end of the epochs
    return results

In [None]:
# Set random seeds
torch.manual_seed(42) 
torch.cuda.manual_seed(42)

# Set number of epochs
NUM_EPOCHS = 61

# Recreate an instance of the model
model = CNNmodel(input_shape=3, # number of color channels (3 for RGB) 
                  output_shape=len(train_data.classes)).to(device)

# Setup loss function and optimizer
loss_fn = nn.CrossEntropyLoss() #include softmax
adam_optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001,weight_decay=0)

# Start the timer
start_time = timer()

# Train model_0 
model_results = train(model=model, 
                        train_dataloader=train_dataloader,
                        test_dataloader=val_dataloader,
                        optimizer=adam_optimizer,
                        loss_fn=loss_fn, 
                        epochs=NUM_EPOCHS)

# End the timer and print out how long it took
end_time = timer()
print(f"Total training time: {end_time-start_time:.3f} seconds")

model_results

In [None]:
from __future__ import print_function, division

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
from collections import defaultdict


model_ft = models.resnet18(pretrained=True)

model_ft
# num_ftrs = model_ft.fc.in_features
# model_ft.fc = nn.Linear(num_ftrs, 5) # last arg here, # classes? -gw

# model_ft = model_ft.to(device)

# criterion = nn.CrossEntropyLoss()

# # Observe that all parameters are being optimized
# optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)

# # Decay LR by a factor of 0.1 every 7 epochs
# exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)