# Chihuahua or Muffin
This notebook demonstrates how we can use Pytorch to work on an image classification problem. It's designed to be run in colab

note: this notebook uses mixed precision to faster the model training. It's modified from chihuahua_muffin_resnet01_simple.
 Changes include
- mixed precision related code
- time function

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms, models
from torchvision.transforms import ToTensor

import matplotlib.pyplot as plt
import numpy
import time
# Mixed precision imports (only if CUDA is available)
if torch.cuda.is_available():
    from torch.cuda.amp import autocast, GradScaler
    
    

In [None]:
!python --version

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')



# prepare directory and files in colab

In [None]:

%cd /content
!git clone https://github.com/porrameth/teach_ai.git
!ls
%cd /content/teach_ai
!ls

In [None]:
%pwd
#%cd CodeProj2/02_Learn/pytorch/
#!ls chihuahua_muffin/
!ls chihuahua_muffin_small2/


# dataset and loader

In [None]:

# Define transformations for image preprocessing
transform = transforms.Compose([
    transforms.Resize(255),  # Adjust size as needed  255
    transforms.CenterCrop(224),  # Adjust size as needed 224
    transforms.RandomHorizontalFlip(), # just try
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
TRAIN ='chihuahua_muffin_small2/train'
VAL = 'chihuahua_muffin_small2/val'
TEST = 'chihuahua_muffin_small2/mytest'
# Create dataset
train_data = datasets.ImageFolder(root= TRAIN, transform=transform)
val_data = datasets.ImageFolder(root= VAL ,transform=transform)
test_data = datasets.ImageFolder(root= TEST  ,transform=transform)



In [None]:
#classes = train_data.classes
#classes
val_data.classes
val_data.class_to_idx

In [None]:
val_data.samples[-10:]

In [None]:
batch_size = 128
train_dataloader = DataLoader(train_data, batch_size=batch_size,num_workers=2)
val_dataloader = DataLoader(val_data, batch_size=batch_size,num_workers=2)
test_dataloader = DataLoader(test_data, batch_size=batch_size,num_workers=2)


In [None]:
len(train_dataloader)
val_dataloader.dataset

In [None]:
for X, y in val_dataloader:
    print(f' {X.shape}')
    print(f'shape of y {y.shape} {y.dtype} ')
    break

In [None]:
img, lab = train_data[1]
print(img.shape)
lab
## plot 3 channels image- need to permute channel to the last dimension
img = img/2 +.5
plt.imshow((img.permute(1,2,0) * 255).numpy().astype('uint16'))


In [None]:
# print sample of data
#import warnings
#warnings.filterwarnings('ignore', message='not allowed')
import random
#img , label = train_data[0]
#print(img.shape, label)


labels = [
   "chihuahua","muffin"
]

row,col=3,10
fig, ax = plt.subplots(row,col,figsize=(12,6))
counter=0
size = len(train_data)
print(f'size {size}')
nums = random.sample(range(size),row*col)
#print(f'nums {nums}')
for r in range(row):
    for c in range(col):

        num  = nums[counter]
        img , lab = train_data[num]
        img= img/2+.5
        img_permute = img.permute(1,2,0)
        img_permute = (img_permute*255).numpy().astype('uint16')
        ax[r,c].imshow(img_permute)
        ax[r,c].set_axis_off()

        #title = "("+str(lab)+")"
        ax[r,c].set_title(labels[lab])
        counter+=1


# Model Architecture

we are using a technique called transfer-learning in order to increase accuracy and speed up the training time

In [None]:
# Get cpu, gpu or mps device for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    #else "mps"
    #if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

#Pre-trained ResNet Model
model = models.resnet50(weights=True)
model = model.to(device)

# Freeze Base Model Parameters
for param in model.parameters():
    param.requires_grad = False

# Replace Final Classifier Layer
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)  # Binary classification
#print(model)

In [None]:
i = 0
for batch, (xx,yy) in enumerate(val_dataloader):
    print(batch, xx.shape,yy.shape)
    i+=1
    if i == 3:
        break

In [None]:
## Method 2
#loss_fn = nn.BCEWithLogitsLoss()

learning_rate = 1e-3
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


def train_one_epoch(epoch_index, tb_writer,training_loader,scaler=None):
    running_loss = 0.
    last_loss = 0.
    #print(f'device = {device}')
    # we use enumerate(training_loader) instead of iter(training_loader) so that we can track the batch index
    for i, data in enumerate(training_loader):
        # Every data instance is an input + label pair
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        #labels= labels.unsqueeze(1).float() ## FOR BCELoss
        # Zero your gradients for every batch!
        optimizer.zero_grad()
        if scaler:
            with autocast():
                outputs = model(inputs)
                loss = loss_fn(outputs,labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            # Make predictions for this batch
            #model = model.to(device)
            outputs = model(inputs)

            # Compute the loss and its gradients
            loss = loss_fn(outputs, labels)
            loss.backward()

            # Adjust weights
            optimizer.step()

        # Gather data and report
        running_loss += loss.item()
        if i % 5 == 0:
            last_loss = running_loss / 5 # loss per batch
            print('  batch {} loss: {}'.format(i + 1, last_loss))
            tb_x = epoch_index * len(training_loader) + i + 1
            tb_writer.add_scalar('Loss/train', last_loss, tb_x)
            running_loss = 0.

    return last_loss



In [17]:
## check cuda
torch.cuda.is_available()

False

In [None]:
from datetime import datetime
from torch.utils.tensorboard import SummaryWriter
# Initializing in a separate cell so we can easily add more epochs to the same run
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp))
epoch_number = 0

EPOCHS = 9

best_vloss = 1_000_000.
time_total  = 0
# Mixed precision scaler (only if CUDA is available)
scaler = GradScaler() if torch.cuda.is_available() else None
for epoch in range(EPOCHS):
    print('EPOCH {}:'.format(epoch_number + 1))

    # gradient is on
    model =model.to(device)
    model.train(True)
    ## add timer
    start_time = time.time()
    avg_loss = train_one_epoch(epoch_number, writer,train_dataloader,scaler)


    running_vloss = 0.0
    # Set the model to evaluation mode
    model.eval()

    # Disable gradient computation
    correct=0
    with torch.no_grad():
        for i, vdata in enumerate(val_dataloader):

            vinputs, vlabels = vdata
            vinputs, vlabels = vinputs.to(device), vlabels.to(device)
            #vlabels = vlabels.unsqueeze(1).float() ## For BCELoss
            
            # if scaler:  # If scaler is provided, use mixed precision
            #     with autocast():
            #         outputs = model(inputs)
            #         loss = criterion(outputs, labels)
            # else:  # Fallback to full precision
            #     outputs = model(inputs)
            #     loss = criterion(outputs, labels)
            if scaler:
                with autocast():
                    voutputs = model(vinputs)
                    vloss = loss_fn(voutputs, vlabels)
            else:
                voutputs = model(vinputs)
                vloss = loss_fn(voutputs, vlabels)
            running_vloss += vloss
            #correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            correct += (voutputs.argmax(1) == vlabels).type(torch.float).sum().item()

    ## calculate time for each epoch
    epoch_time = time.time() - start_time
    time_total+= epoch_time
    avg_vloss = running_vloss / (i + 1)
    print(f'--correct = {correct}, validation size ={len(val_dataloader.dataset)}')
    correct /= len(val_dataloader.dataset)
    val_accuracy = 100*correct
    print(f'LOSS train {avg_loss} valid {avg_vloss:>8f}  Val Accuracy {(val_accuracy):>0.4f}% Time: {epoch_time:.2f}')
    #print(f"Val Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {avg_vloss:>8f} \n")


    #test(test_dataloader,model, loss_fn)
    # Log the running loss averaged per batch
    # for both training and validation
    writer.add_scalars('Training vs. Validation Loss',
                    { 'Training' : avg_loss, 'Validation' : avg_vloss },
                    epoch_number + 1)
    writer.flush()

    # Track best loss, and save the model's state
    if avg_vloss < best_vloss:
        best_vloss = avg_vloss
        model_path = 'model_{}_{}_{:.0f}'.format(timestamp, epoch_number,round(val_accuracy,2)*100)
        torch.save(model.state_dict(), model_path)

    epoch_number += 1
print(f'Total training time = {time_total}')

In [None]:
#     time_total+= epoch_time

#     avg_vloss = running_vloss / (i + 1)
#     print(f'--correct = {correct}, validation size ={len(val_dataloader.dataset)}')
#     correct /= len(val_dataloader.dataset)
#     val_accuracy = 100*correct
#     print(f'LOSS train {avg_loss} valid {avg_vloss:>8f}  Val Accuracy {(val_accuracy):>0.4f}% Time: {epoch_time:.2f}')
#     #print(f"Val Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {avg_vloss:>8f} \n")


#     #test(test_dataloader,model, loss_fn)
#     # Log the running loss averaged per batch
#     # for both training and validation
#     writer.add_scalars('Training vs. Validation Loss',
#                     { 'Training' : avg_loss, 'Validation' : avg_vloss },
#                     epoch_number + 1)
#     writer.flush()

#     # Track best loss, and save the model's state
#     if avg_vloss < best_vloss:
#         best_vloss = avg_vloss
#         model_path = 'model_{}_{}_{:.0f}'.format(timestamp, epoch_number,round(val_accuracy,2)*100)
#         torch.save(model.state_dict(), model_path)

#     epoch_number += 1
# print(f'Total training time = {time_total}')

In [None]:
!ls -l

In [None]:
#torch.save(model.state_dict(), "model_muffin_weights-resnet-medium.pth")
#torch.save(model.state_dict(), "model_muffin_weights-resnet-big.pth")
torch.save(model.state_dict(), "model_muffin_weights-resnet-small2.pth")


#torch.save(model.state_dict(), "model_muffin-s.pth")
print("Saved PyTorch Model State to model_muffin_weights-resnet-xxxx.pth")

#Load trained model and use it on test set

In [None]:
!ls -l

In [None]:
#model = Convo().to(device)
model1 = models.resnet50(weights =False)
num_ftrs = model1.fc.in_features
model1.fc = nn.Linear(num_ftrs, 2)
#model1.load_state_dict(torch.load("model_muffin_weights-resnet.pth"))
#model1.load_state_dict(torch.load("model_muffin_weights-resnet-big.pth"))
#model1.load_state_dict(torch.load("model_muffin_weights-resnet-medium.pth"))
model1.load_state_dict(torch.load("model_muffin_weights-resnet-small2.pth"))
#model1.load_state_dict(torch.load("model_20240816_091207_8_8735"))
model1.to(device)
model1.eval()
print("Model loaded")

In [None]:
def sample_incorrect_images(dataset):
    row,col=2,5
    counter=0

    if len(dataset) >= row*col:
        nums = random.sample(range(len(dataset)),row*col)
        fig, ax = plt.subplots(row,col,figsize=(12,6))
        for r in range(row):
            for c in range(col):
                #print(f'counter = {counter}')
                num = nums[counter]
                img , lab, pred = dataset[num]
                img = img.to('cpu')
                img = img/2 +.5
                img_permute = img.permute(1,2,0)
                img_permute = (img_permute*255).numpy().astype('uint16')
                ax[r,c].imshow(img_permute)
                ax[r,c].set_axis_off()

                #title = "("+str(lab)+")"
                ax[r,c].set_title(f'Pred:{labels[pred]}\nActual:{labels[lab]}')
                counter+=1
    elif len(dataset) == 1:
        img , lab, pred = dataset[0]
        img = img.to('cpu')
        img = img/2 +.5
        img_permute = img.permute(1,2,0)
        img_permute = (img_permute*255).numpy().astype('uint16')
        plt.imshow(img_permute)
        plt.suptitle(f'Pred:{labels[pred]}\nActual:{labels[lab]}')
        plt.axis('off')
    else:
        print(f'Else: length of incorrect dataset = {len(dataset)}')
        fig, ax = plt.subplots(len(dataset),figsize=(15,8))
        for idx, data in enumerate(dataset):
            #print(f'idx={idx}')
            img, lab, pred = data
            img = img.to('cpu')
            img = img/2 +.5
            img_permute = img.permute(1,2,0)
            img_permute = (img_permute*255).numpy().astype('uint16')
            ax[idx].imshow(img_permute)
            ax[idx].set_axis_off()
                #title = "("+str(lab)+")"
            ax[idx].set_title(f'Pred:{labels[pred]}\nActual:{labels[lab]}')




def test_testset(model=model1, dataloader=test_dataloader):
    model.eval()
    test_loss = 0
    correct = 0
    incorrect_preds = []
    with torch.no_grad():
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            #target = target.unsqueeze(1).float() ## for BCELoss
            output = model(data)
            #output = output.unsqueeze(1).float()
            test_loss += loss_fn(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            for idx in range(len(data)):
                if pred[idx] != target[idx]:
                    incorrect_preds.append((data[idx], target[idx], pred[idx]))



    test_loss /= len(dataloader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(dataloader.dataset),
        100. * correct / len(dataloader.dataset)))
    if len(incorrect_preds) > 0:
      sample_incorrect_images(incorrect_preds)

In [None]:
#test_testset(model=model)

In [None]:
test_testset()

In [None]:
test_data[0][0].shape

In [None]:
test_data[0][1]