<a href="https://colab.research.google.com/github/kelvin17/ml-project-notebook/blob/main/Yanqing_Project_1_hotdog.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 0. Exercise 1.4 Hotdog -- no hotdog
This is the poster hand-in project for the course. Please see the associated PDF for instructions.

In [None]:
import os
import time
import numpy as np
import glob
import PIL.Image as Image
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, random_split
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

## todo check
from torchvision import transforms, datasets

We always check that we are running on a GPU

In [None]:
import torch

# Query CUDA availability once and reuse the answer for both the message and the device.
gpu_available = torch.cuda.is_available()
if not gpu_available:
    print("The code will run on CPU. Go to Edit->Notebook Settings and choose GPU as the hardware accelerator")
else:
    print("The code will run on GPU.")
# Device used by the rest of the notebook for models and batches.
device = torch.device('cuda' if gpu_available else 'cpu')

We provide you with a class that can load the *hotdog/not hotdog* dataset you should use from /dtu/datasets1/02516/

In [None]:
class Hotdog_NotHotdog(torch.utils.data.Dataset):
    """Image-folder dataset for the hotdog / not-hotdog task.

    Expects the layout ``<data_path>/<train|test>/<class_name>/*.jpg``. Class
    names are the sub-directory names, sorted alphabetically and mapped to the
    integer labels 0, 1, ... (stored in ``name_to_label``).
    """

    def __init__(self, train, transform, data_path='/dtu/datasets1/02516/hotdog_nothotdog'):
        """Index the image files of one split.

        Args:
            train: If truthy the ``train`` sub-folder is used, otherwise ``test``.
            transform: Callable applied to every PIL image in ``__getitem__``.
            data_path: Root folder that contains the ``train`` and ``test`` folders.
        """
        self.transform = transform
        data_path = os.path.join(data_path, 'train' if train else 'test')
        image_classes = sorted(
            os.path.split(d)[1] for d in glob.glob(data_path + '/*') if os.path.isdir(d)
        )
        # e.g. {'hotdog': 0, 'nothotdog': 1}
        self.name_to_label = {name: label for label, name in enumerate(image_classes)}
        # glob returns files in arbitrary, filesystem-dependent order. Sorting makes
        # index-based operations (such as a seeded random_split) reproducible.
        self.image_paths = sorted(glob.glob(data_path + '/*/*.jpg'))

    def __len__(self):
        """Return the total number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            tuple: ``(X, y)`` where ``X`` is ``transform(image)`` and ``y`` is the
            integer label derived from the image's parent folder name.
        """
        image_path = self.image_paths[idx]

        # ".../train/hotdog/img1.jpg" -> dirname ".../train/hotdog" -> basename "hotdog"
        class_name = os.path.basename(os.path.dirname(image_path))
        y = self.name_to_label[class_name]

        # Force three channels: grayscale / RGBA / palette files would otherwise
        # yield tensors with a different channel count and break 3-channel
        # normalisation and batch collation. The context manager closes the file.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')

        X = self.transform(image)
        return X, y

Below is the simple way of converting the images to something that can be fed through a network.
Feel free to use something other than $128\times128$ images.

In [None]:
# Mount Google Drive so the dataset and the training outputs persist across Colab sessions.
from google.colab import drive
drive.mount('/content/drive')
data_path = '/content/drive/MyDrive/hotdog_nothotdog/'  # root holding the train/ and test/ folders
model_save_path = '/content/drive/MyDrive/model_output/hotdog_nothotdog/'  # checkpoints and figures
# torch.save and plt.savefig do not create missing folders, so make sure the
# output directory exists before any training cell writes to it.
os.makedirs(model_save_path, exist_ok=True)


In [None]:
size = 128
batch_size = 64

# Baseline preprocessing: resize to a square image and convert to a tensor.
# Train and test use the same steps here; each gets its own Compose instance.
train_transform, test_transform = (
    transforms.Compose([transforms.Resize((size, size)), transforms.ToTensor()])
    for _ in range(2)
)

# Training split: reshuffled by the loader on every pass.
trainset = Hotdog_NotHotdog(train=True, transform=train_transform, data_path=data_path)
train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=3)

# Test split: fixed order.
testset = Hotdog_NotHotdog(train=False, transform=test_transform, data_path=data_path)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=3)

In [None]:
# Report how many images each split contains.
num_test_images, num_train_images = (
    len(split_loader.dataset) for split_loader in (test_loader, train_loader)
)
print(f'Test set has {num_test_images} images')
print(f'Train set has {num_train_images} images')

Let's look at some images from our data

In [None]:
# images, labels = next(iter(train_loader))
# print(images.shape)
# print(labels.shape)

# Look at the first batch only: print type, dtype and shape of the image and
# label tensors. `images` and `labels` stay defined after the loop and are
# reused by the plotting cell that follows.
for images, labels in train_loader:
    for batch_tensor in (images, labels):
        print(type(batch_tensor))
        print(batch_tensor.dtype)
        print(batch_tensor.shape)
        print(batch_tensor.size())
    break

In [None]:
# images, labels = next(iter(train_loader))
# Show the first 21 images of the batch fetched in the previous cell
# (relies on `images` and `labels` still being defined from there).
plt.figure(figsize=(20,10))

for idx in range(21):
    plt.subplot(5, 7, idx + 1)  # 5x7 grid, slots are numbered from 1
    # PyTorch stores images as [C, H, W]; matplotlib expects [H, W, C].
    picture = images[idx].permute(1, 2, 0).numpy()
    plt.imshow(picture)
    plt.title(['hotdog', 'not hotdog'][labels[idx].item()])
    plt.axis('off')


Now create a model and train it!


# 1. Helper function definitions

In [None]:
## 1. calculate accuracy and loss
def evaluate(model, loader, device, loss_func):
    """Compute accuracy and mean loss of ``model`` over all batches of ``loader``.

    The model is switched to eval mode and gradients are disabled.

    Args:
        model: Network returning one row of class scores per sample.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.
        loss_func: Callable ``(outputs, labels) -> scalar tensor`` (batch mean).

    Returns:
        tuple: ``(accuracy_in_percent, mean_loss_per_sample)``.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            logits = model(batch_x)
            # loss_func yields a 0-dim tensor; weight it by the batch size so the
            # final division gives a per-sample mean even with a short last batch.
            batch_loss = loss_func(logits, batch_y)
            loss_sum += batch_loss.item() * batch_x.size(0)
            n_correct += (logits.argmax(1) == batch_y).sum().item()
            n_seen += batch_y.size(0)
    accuracy = 100 * n_correct / n_seen
    mean_loss = loss_sum / n_seen
    return accuracy, mean_loss

## 3. show some figures of the result
def show_result(model, loader, classes, device, num_images=10):
    """Plot predictions for the leading images of the first batch from ``loader``.

    Each tile shows the image with the predicted class, the true class and the
    softmax probability of the prediction; the title is green for a correct
    prediction and red otherwise.

    Args:
        model: Classifier returning one row of logits per image.
        loader: Iterable of ``(images, labels)`` batches; only the first is used.
        classes: Sequence mapping a label index to a class name.
        device: Device the batch is moved to for the forward pass.
        num_images: Maximum number of images to show (default 10). Fewer are
            shown when the batch is smaller, instead of raising an IndexError.
    """
    model.eval()

    images, labels = next(iter(loader))
    images, labels = images.to(device), labels.to(device)

    with torch.no_grad():
        outputs = model(images)
        probs = F.softmax(outputs, dim=1)
        _, pred = torch.max(outputs, 1)

    # Never index past the end of the batch.
    num_images = min(num_images, images.size(0))
    num_cols = 5
    num_rows = max(1, -(-num_images // num_cols))  # ceiling division
    # 2.5 inches per row keeps the original 15x5 figure for the default 10 images.
    fig = plt.figure(figsize=(15, 2.5 * num_rows))

    for i in range(num_images):
        ax = fig.add_subplot(num_rows, num_cols, i + 1, xticks=[], yticks=[])
        # The tensor may live on the GPU and is [C, H, W]; matplotlib needs a
        # CPU array in [H, W, C]. The image is drawn exactly as the loader
        # produced it, so normalised inputs are not mapped back to [0, 1].
        ax.imshow(images[i].permute(1, 2, 0).cpu().numpy())
        pred_idx = int(pred[i])
        pred_class = classes[pred_idx]
        true_class = classes[int(labels[i])]
        pred_prob = probs[i][pred_idx].item() * 100

        color = 'green' if pred_class == true_class else 'red'

        ax.set_title(f"Predicted: {pred_class}\nActual: {true_class}\n prob: {pred_prob:.1f}%", fontsize=10, color=color)
        ax.axis('off')

    plt.show()

def show_result_random(model, loader, classes, device, num_images=10):
    """Plot predictions for randomly chosen images of one batch.

    The batch is the first one yielded by ``loader``; ``num_images`` images are
    drawn from it without replacement (fewer if the batch is smaller). Titles
    are green for correct predictions and red for wrong ones.
    """
    import matplotlib.pyplot as plt
    import numpy as np
    import torch.nn.functional as F
    import torch

    model.eval()

    # Take one batch from the loader.
    batch_images, batch_labels = next(iter(loader))
    batch_images = batch_images.to(device)
    batch_labels = batch_labels.to(device)

    # Guard against batches smaller than the requested number of images.
    available = batch_images.size(0)
    num_images = min(num_images, available)

    # Randomly pick num_images samples from the batch.
    chosen = np.random.choice(available, num_images, replace=False)
    batch_images, batch_labels = batch_images[chosen], batch_labels[chosen]

    with torch.no_grad():
        logits = model(batch_images)
        probs = F.softmax(logits, dim=1)
        _, pred = torch.max(logits, 1)

    n_cols = (num_images + 1) // 2  # two rows
    fig = plt.figure(figsize=(15, 5))

    for pos in range(num_images):
        ax = fig.add_subplot(2, n_cols, pos + 1, xticks=[], yticks=[])
        ax.imshow(batch_images[pos].permute(1, 2, 0).cpu().numpy())
        predicted_idx, actual_idx = pred[pos], batch_labels[pos]
        pred_class, true_class = classes[predicted_idx], classes[actual_idx]
        pred_prob = probs[pos][predicted_idx].item() * 100

        if pred_class == true_class:
            color = 'green'
        else:
            color = 'red'
        ax.set_title(f"Predicted: {pred_class}\nActual: {true_class}\nprob: {pred_prob:.1f}%", fontsize=10, color=color)
        ax.axis('off')

    plt.show()


In [None]:
class EarlyStopping:
    """Stop training once a monitored metric has stopped improving.

    Call the instance once per epoch with the metric and the model. Whenever the
    metric improves, the model's ``state_dict`` is written to ``path``; after
    ``patience`` consecutive epochs without improvement ``early_stop`` is set.
    """

    def __init__(self, patience=10, delta=0.0, mode="min", path="checkpoint.pt"):
        """
        patience: number of consecutive non-improving epochs tolerated before stopping
        delta: minimum change that counts as an improvement
        mode: "min" if lower is better (e.g. loss), "max" if higher is better
              (e.g. accuracy or AUC)
        path: where to save the best model's state_dict

        Raises:
            ValueError: if ``mode`` is neither "min" nor "max".
        """
        if mode not in ("min", "max"):
            # A typo such as "Min" used to be treated silently as "max".
            raise ValueError(f"mode must be 'min' or 'max', got {mode!r}")
        self.patience = patience
        self.delta = delta
        self.mode = mode
        self.path = path
        self.best_score = None  # internally stored so that higher is always better
        self.counter = 0        # consecutive epochs without improvement
        self.early_stop = False

    def __call__(self, metric, model):
        """Record one epoch's ``metric``; checkpoint ``model`` when it improved."""
        # Negate in "min" mode so that a larger score is always better.
        score = -metric if self.mode == "min" else metric
        # Require a strict gain of more than delta. A tie is a plateau and must
        # not reset patience or overwrite the checkpoint.
        if self.best_score is None or score > self.best_score + self.delta:
            self.best_score = score
            self.save_checkpoint(model)
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
                # Report the metric in its original sign.
                best_metric = -self.best_score if self.mode == "min" else self.best_score
                print(f"Early stopping, The best score is {best_metric}")

    def save_checkpoint(self, model):
        """Write ``model.state_dict()`` to ``self.path``, creating its folder if needed."""
        folder = os.path.dirname(self.path)
        if folder:
            os.makedirs(folder, exist_ok=True)
        torch.save(model.state_dict(), self.path)

In [None]:
def plot_curve(train_losses, train_accs, test_losses, test_accs, total_epochs, plt_path = None):
    """Draw loss and accuracy curves side by side, one point per epoch.

    Args:
        train_losses, train_accs: Per-epoch training loss / accuracy.
        test_losses, test_accs: Per-epoch held-out loss / accuracy (labelled "Val").
        total_epochs: Number of epochs; the x axis runs from 1 to this value.
        plt_path: If given, the figure is also saved to this path.
    """
    epochs = range(1, total_epochs + 1)
    # (y label, title, train series, train legend, val series, val legend)
    panels = (
        ('Loss', 'Loss Curve', train_losses, 'Train Loss', test_losses, 'Val Loss'),
        ('Accuracy', 'Accuracy Curve', train_accs, 'Train Acc', test_accs, 'Val Acc'),
    )

    plt.figure(figsize=(12,5))
    for position, (ylabel, title, train_series, train_label, val_series, val_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs, train_series, label=train_label)
        plt.plot(epochs, val_series, label=val_label)
        plt.xlabel('Epoch')
        plt.ylabel(ylabel)
        plt.legend()
        plt.title(title)

    # Save before show().
    if plt_path is not None:
        plt.savefig(plt_path, dpi=300, bbox_inches='tight')

    plt.show()

def train(model, epochs_num, train_loader, val_loader, device, optimizer, loss_fc, scheduler = None, early_stopping = None, is_plot_curve = False, model_path = None, plt_path=None):
    """Train ``model`` and validate it after every epoch.

    Args:
        model: Network to optimise (already on ``device``).
        epochs_num: Maximum number of epochs.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of validation batches, passed to ``evaluate``.
        device: Device the batches are moved to.
        optimizer: Optimiser holding the trainable parameters.
        loss_fc: Loss callable ``(outputs, labels) -> scalar tensor``.
        scheduler: Optional LR scheduler, stepped once per epoch.
        early_stopping: Optional callable ``(val_acc, model)`` exposing an
            ``early_stop`` flag; training stops when the flag is set.
        is_plot_curve: If true, ``plot_curve`` is called after training.
        model_path: Where to save the final weights. Only used when
            ``early_stopping`` is None.
        plt_path: Optional path forwarded to ``plot_curve``.

    Returns:
        dict: per-epoch lists under the keys ``train_loss``, ``train_acc``,
        ``val_loss`` and ``val_acc``.
    """
    train_losses, train_accs = [], []
    val_losses, val_accs = [], []
    total_runned_epochs = 0  # stays defined even when epochs_num == 0

    for epoch in range(epochs_num):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        start = time.time()

        for imgs, labels in train_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()  # clear old gradients before this step's backward pass
            outputs = model(imgs)

            loss = loss_fc(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * imgs.size(0)
            _, pred = outputs.max(1)
            correct += (pred == labels).sum().item()
            total += labels.size(0)

        # Average over the samples actually seen this epoch, not over a
        # notebook-level global dataset that may not belong to train_loader.
        train_loss = running_loss / total
        train_acc = correct / total * 100
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        val_acc, val_loss = evaluate(model, val_loader, device, loss_fc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        if scheduler is not None:
            scheduler.step()

        elapsed = time.time() - start
        # Report 1-based epoch numbers so the last line reads "N/N".
        print(f"Epoch {epoch + 1}/{epochs_num}, Train Loss: {train_loss:.4f},Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, train_acc: {train_acc:.2f}%, Time: {elapsed:.2f}s")

        total_runned_epochs = epoch + 1

        if early_stopping is not None:
            # The early-stopping helper is called with the validation accuracy
            # and the model; it is responsible for saving the best weights.
            early_stopping(val_acc, model)
            if early_stopping.early_stop:
                break
        else:
            # Without early stopping, save the weights once after the last epoch.
            if (total_runned_epochs == epochs_num) and (model_path is not None):
                torch.save(model.state_dict(), model_path)

    if is_plot_curve:
        plot_curve(train_losses, train_accs, val_losses, val_accs, total_runned_epochs, plt_path)

    return {
        'train_loss': train_losses,
        'train_acc': train_accs,
        'val_loss': val_losses,
        'val_acc': val_accs,
    }

# 2. Training


## 2.1 Small CNN
Performance - 80.5%

In [None]:
## small CNN
class SmallCNN(nn.Module):
    """Four conv + max-pool stages followed by two fully connected layers.

    Every stage halves the spatial size, so ``fc1`` (``256 * 8 * 8`` inputs)
    requires 3-channel 128x128 images. The network returns 2 logits per image.
    """

    def __init__(self):
        super().__init__()
        # 3x3 convolutions with padding=1 keep the spatial size.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)  # halves height and width
        self.fc1 = nn.Linear(256 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 2)

    def forward(self, x):
        """Return logits of shape ``(batch, 2)`` for a ``(batch, 3, 128, 128)`` input."""
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = self.pool(F.relu(conv(x)))
        # Flatten per sample. The previous view(-1, 256*8*8) silently moved
        # surplus features of a wrongly sized input into the batch dimension;
        # now fc1 raises a shape error instead.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
# size = 128
batch_size = 64
epochs_num = 50

# Training pipeline: random geometric / colour augmentation, then ImageNet normalisation.
transform_train = transforms.Compose([
    transforms.RandomResizedCrop(128, scale=(0.6, 1.0)),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

# Evaluation pipeline: deterministic resize + normalisation only.
transform_test = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

# Two views of the same training files: augmented for fitting, deterministic
# for validation. Validating on randomly augmented images makes the metric that
# drives early stopping noisy and not comparable with the test metric.
train_full_aug = Hotdog_NotHotdog(train=True, transform=transform_train, data_path=data_path)
train_full_eval = Hotdog_NotHotdog(train=True, transform=transform_test, data_path=data_path)
# Share one file list so an index refers to the same image in both views.
train_full_eval.image_paths = train_full_aug.image_paths

val_ratio = 0.2
total_size = len(train_full_aug)
val_size = int(total_size * val_ratio)
train_size = total_size - val_size

# Seeded split; only the index list of the validation part is reused.
trainset, val_split = random_split(train_full_aug, [train_size, val_size], generator=torch.Generator().manual_seed(42))
valset = torch.utils.data.Subset(train_full_eval, val_split.indices)

train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=3)
# Evaluation does not depend on sample order, so no shuffling is needed.
val_loader = DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=3)

testset = Hotdog_NotHotdog(train=False, transform=transform_test, data_path=data_path)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=3)

In [None]:
# training processing
model = SmallCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4)
# Halve the learning rate every 10 epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

model_path = model_save_path+'/best_small_model.pth'
# mode="max": train() hands the validation accuracy to the early-stopping helper.
early_stopping = EarlyStopping(patience=10, mode="max", path=model_path)
plt_path = model_save_path+'/small_model.png'

train(model, epochs_num, train_loader, val_loader, device, optimizer, criterion, scheduler, early_stopping, is_plot_curve = True, plt_path=plt_path)

# Evaluation on Test: reload the checkpoint written by early stopping.
best_model = SmallCNN()
# map_location lets a checkpoint saved on a GPU be loaded in a CPU-only session.
best_model.load_state_dict(torch.load(model_path, map_location=device))
best_model.to(device)

test_acc, _ = evaluate(best_model, test_loader, device, criterion)
print(f"Test Accuracy: {test_acc:.2f}%, according model saved to {model_path}")

In [None]:
# Class names in label order (name_to_label is built from the sorted folder names).
classes = list(testset.name_to_label.keys())
# Show predictions of the reloaded best checkpoint, i.e. the model whose test
# accuracy was reported above, rather than the weights of the last epoch.
show_result(best_model, test_loader, classes, device)

## 2.2 Optimized Small CNN
Performance - 81.63%
1. Optimized Model layer
  - using batch normalization

In [None]:
## 2. Improved CNN model
batch_size = 64  # mini-batch size used by the DataLoaders built further down in this cell
epochs_num = 50  # maximum number of epochs passed to train() below

# -------------------------------
# Improved small CNN definition
# -------------------------------
class OptimizedSmallCNN(nn.Module):
    """SmallCNN variant with batch normalisation after every convolution.

    Four conv -> batch-norm -> ReLU -> max-pool stages followed by two fully
    connected layers. ``fc1`` (``256*8*8`` inputs) requires 3-channel 128x128
    images; the network returns 2 logits per image.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32, momentum=0.01)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(64, momentum=0.01)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(128, momentum=0.01)
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(256, momentum=0.01)
        self.pool = nn.MaxPool2d(2,2)  # halves height and width
        self.fc1 = nn.Linear(256*8*8, 128)
        self.fc2 = nn.Linear(128, 2)

    def forward(self, x):
        """Return logits of shape ``(batch, 2)`` for a ``(batch, 3, 128, 128)`` input."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, bn in stages:
            x = self.pool(F.relu(bn(conv(x))))
        # Flatten per sample. The previous view(-1, 256*8*8) silently moved
        # surplus features of a wrongly sized input into the batch dimension;
        # now fc1 raises a shape error instead.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# -------------------------------
# Augmentation and Normalize
# -------------------------------
# Training pipeline: random geometric / colour augmentation, then ImageNet normalisation.
transform_train = transforms.Compose([
    transforms.RandomResizedCrop(128, scale=(0.6, 1.0)),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

# Evaluation pipeline: deterministic resize + normalisation only.
transform_test = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

# Plain pipeline without augmentation or normalisation. It is not used by the
# training below and is kept only as a baseline alternative.
transform = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
])

# Two views of the same training files: augmented for fitting, deterministic
# for validation. Validating on randomly augmented images makes the metric that
# drives early stopping noisy and not comparable with the test metric.
train_full_aug = Hotdog_NotHotdog(train=True, transform=transform_train, data_path=data_path)
train_full_eval = Hotdog_NotHotdog(train=True, transform=transform_test, data_path=data_path)
# Share one file list so an index refers to the same image in both views.
train_full_eval.image_paths = train_full_aug.image_paths

val_ratio = 0.2
total_size = len(train_full_aug)
val_size = int(total_size * val_ratio)
train_size = total_size - val_size

# Seeded split; only the index list of the validation part is reused.
trainset, val_split = random_split(train_full_aug, [train_size, val_size], generator=torch.Generator().manual_seed(42))
valset = torch.utils.data.Subset(train_full_eval, val_split.indices)

train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=3)
# Evaluation does not depend on sample order, so no shuffling is needed.
val_loader = DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=3)

testset = Hotdog_NotHotdog(train=False, transform=transform_test, data_path=data_path)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=3)


# -------------------------------
# instantiate loss function, optimizer and learning-rate scheduler
# -------------------------------
model = OptimizedSmallCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4)
# Halve the learning rate every 10 epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

# -------------------------------
# training loop
# -------------------------------
model_path = model_save_path+'/best_optimized_model.pth'
# mode="max": train() hands the validation accuracy to the early-stopping helper.
early_stopping = EarlyStopping(patience=10, mode="max", path=model_path)
plt_path = model_save_path+'/optimized_model.png'

train(model, epochs_num, train_loader, val_loader, device, optimizer, criterion, scheduler, early_stopping, is_plot_curve = True, plt_path=plt_path)

# Evaluation on Test: reload the checkpoint written by early stopping.
best_model = OptimizedSmallCNN()
# map_location lets a checkpoint saved on a GPU be loaded in a CPU-only session.
best_model.load_state_dict(torch.load(model_path, map_location=device))
best_model.to(device)

test_acc, _ = evaluate(best_model, test_loader, device, criterion)
print(f"Test Accuracy: {test_acc:.2f}%, according model saved to {model_path}")

In [None]:
# Class names in label order (name_to_label is built from the sorted folder names).
classes = list(testset.name_to_label.keys())
# Show predictions of the reloaded best checkpoint, i.e. the model whose test
# accuracy was reported above, rather than the weights of the last epoch.
show_result(best_model, test_loader, classes, device)

## 2.3 Transfer learning on resNet18
Performance - 92.64% on test
1. Fine-tune ResNet18 (layer4 + a new fc layer for 2 classes)
2. Data augmentation
3. scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
4. Save the parameters with the best validation accuracy
5. Loss and accuracy curves for training and validation

In [None]:
epochs_num = 50
batch_size = 64
tune_block4 = True  # also fine-tune layer4, not only the new classifier head
# -------------------------------
# 1. Load pre-trained model and configure fine-tuning
# -------------------------------
# 1.1 load the ImageNet pre-trained model
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

# 1.2 freeze every pre-trained parameter
for param in model.parameters():
    param.requires_grad = False

# 1.3 replace the last fc layer with a 2-class head (new layers are trainable by default)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)

# 1.4 optionally unfreeze layer4
# NOTE(review): train() calls model.train(), so BatchNorm layers inside the
# frozen blocks still update their running statistics. Confirm this is intended.
if tune_block4:
    for name, param in model.named_parameters():
        if name.startswith("layer4"):
            param.requires_grad = True

model = model.to(device)

# -------------------------------
# 2. Augmentation and Normalize
# -------------------------------
transform_train = transforms.Compose([
    transforms.RandomResizedCrop(128, scale=(0.6, 1.0)),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.5)  # operates on tensors, hence placed after ToTensor
])

# Evaluation pipeline: deterministic resize + normalisation only.
transform_test = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])


# Two views of the same training files: augmented for fitting, deterministic
# for validation. Validating on randomly augmented (and partly erased) images
# makes the metric that drives early stopping noisy and not comparable with
# the test metric.
train_full_aug = Hotdog_NotHotdog(train=True, transform=transform_train, data_path=data_path)
train_full_eval = Hotdog_NotHotdog(train=True, transform=transform_test, data_path=data_path)
# Share one file list so an index refers to the same image in both views.
train_full_eval.image_paths = train_full_aug.image_paths

val_ratio = 0.2
total_size = len(train_full_aug)
val_size = int(total_size * val_ratio)
train_size = total_size - val_size

# Seeded split; only the index list of the validation part is reused.
trainset, val_split = random_split(train_full_aug, [train_size, val_size], generator=torch.Generator().manual_seed(42))
valset = torch.utils.data.Subset(train_full_eval, val_split.indices)

train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=3)
# Evaluation does not depend on sample order, so no shuffling is needed.
val_loader = DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=3)

testset = Hotdog_NotHotdog(train=False, transform=transform_test, data_path=data_path)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=3)

In [None]:
# -------------------------------
# 3. Loss function & Optimizer for fine tune & learning scheduler
# -------------------------------
criterion = nn.CrossEntropyLoss()
if tune_block4:
    # Separate learning rates: smaller for the pre-trained layer4, larger for the new head.
    optimizer = optim.AdamW([
        {'params': model.layer4.parameters(), 'lr': 1e-4},
        {'params': model.fc.parameters(), 'lr': 3e-4}
    ], weight_decay=1e-4)
else:
    # Only parameters that still require gradients are optimised.
    optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=3e-4, weight_decay=1e-4)


# Halve the learning rate(s) every 10 epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

# -------------------------------
# 4. Train loop
# -------------------------------
if tune_block4:
    model_path = model_save_path+'/best_resnet18_model-b4-fc.pth'
    plt_path = model_save_path+'/resnet18_model-b4-fc.png'
else:
    model_path = model_save_path+'/best_resnet18_model-fc.pth'
    plt_path = model_save_path+'/resnet18_model-fc.png'

# mode="max": train() hands the validation accuracy to the early-stopping helper.
early_stopping = EarlyStopping(patience=10, mode="max", path=model_path)

train(model, epochs_num, train_loader, val_loader, device, optimizer, criterion, scheduler, early_stopping, is_plot_curve = True, plt_path=plt_path)

# 5. test_acc - reload the best model
best_model = models.resnet18()
num_ftrs = best_model.fc.in_features
best_model.fc = torch.nn.Linear(num_ftrs, 2)  # same 2-class head as the trained model
# map_location lets a checkpoint saved on a GPU be loaded in a CPU-only session.
best_model.load_state_dict(torch.load(model_path, map_location=device))
best_model.to(device)

test_acc, _ = evaluate(best_model, test_loader, device, criterion)
print(f"Test Accuracy: {test_acc:.2f}%, according model saved to {model_path}")

In [None]:
def show_wrong_predictions_limited(model, loader, classes, device, num_wrong=10):
    """Plot up to ``num_wrong`` misclassified samples from ``loader``.

    Batches are scanned in loader order until enough misclassified samples are
    found. Each tile shows the image with the predicted class, the true class
    and the softmax probability of the (wrong) prediction.

    Args:
        model: Classifier returning one row of logits per image.
        loader: Iterable of ``(images, labels)`` batches.
        classes: Sequence mapping a label index to a class name.
        device: Device used for the forward pass.
        num_wrong: Maximum number of misclassified samples to show.
    """
    import math

    if num_wrong <= 0:
        # Nothing was requested; avoid creating an empty, zero-height figure.
        return

    model.eval()

    wrong_images = []
    wrong_labels = []
    wrong_preds = []
    wrong_probs = []
    total_wrong = 0  # running count, instead of re-summing the list every batch

    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            probs = F.softmax(outputs, dim=1)
            _, preds = torch.max(outputs, 1)

            # Select the misclassified samples of this batch.
            mask = preds != labels
            if mask.any():
                # Move to the CPU so the collected samples do not hold GPU memory.
                wrong_images.append(images[mask].cpu())
                wrong_labels.append(labels[mask].cpu())
                wrong_preds.append(preds[mask].cpu())
                wrong_probs.append(probs[mask].cpu())
                total_wrong += int(mask.sum().item())

            # Stop as soon as enough misclassified samples were collected.
            if total_wrong >= num_wrong:
                break

    if total_wrong == 0:
        print("没有错误预测的样本！")
        return

    # Merge the per-batch pieces and keep only the first num_wrong samples.
    wrong_images = torch.cat(wrong_images)[:num_wrong]
    wrong_labels = torch.cat(wrong_labels)[:num_wrong]
    wrong_preds = torch.cat(wrong_preds)[:num_wrong]
    wrong_probs = torch.cat(wrong_probs)[:num_wrong]

    # Grid with at most 5 images per row; the figure grows with the row count.
    num_cols = 5
    num_rows = math.ceil(len(wrong_images) / num_cols)
    fig = plt.figure(figsize=(num_cols * 4, num_rows * 4))

    for i in range(len(wrong_images)):
        ax = fig.add_subplot(num_rows, num_cols, i+1, xticks=[], yticks=[])
        ax.imshow(wrong_images[i].permute(1, 2, 0).numpy())
        pred_idx = int(wrong_preds[i])
        pred_class = classes[pred_idx]
        true_class = classes[int(wrong_labels[i])]
        pred_prob = wrong_probs[i][pred_idx].item()*100
        ax.set_title(f"Predicted: {pred_class}\nActual: {true_class}\nprob: {pred_prob:.1f}%", fontsize=10, color='red')
        ax.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Rebuild the fine-tuned ResNet18 architecture and load the checkpoint that
# performed best (layer4 + fc fine-tuning).
model_path = model_save_path+'/best_resnet18_model-b4-fc.pth' # This is the best one
model = models.resnet18()
num_ftrs = model.fc.in_features
model.fc = torch.nn.Linear(num_ftrs, 2)  # same 2-class head as during training
# map_location lets a checkpoint saved on a GPU be loaded in a CPU-only session,
# e.g. when this cell is run in a fresh runtime after training.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

# Class names in label order (name_to_label is built from the sorted folder names).
classes = list(testset.name_to_label.keys())
# print(classes)
show_wrong_predictions_limited(model, test_loader, classes, device, num_wrong=10)
# show_result_random(model, test_loader, classes, device, num_images=15)

# saliency map analysis

In [None]:
# Install OpenCV into the running environment.
# NOTE(review): no cell visible in this notebook imports cv2 - confirm the install is still needed.
!pip install opencv-python

In [None]:
# Load the fine-tuned ResNet-18 model
model_path = model_save_path+'/best_resnet18_model-b4-fc.pth' # This is the best one
model = models.resnet18()
num_ftrs = model.fc.in_features
model.fc = torch.nn.Linear(num_ftrs, 2)  # same 2-class head as during training
# map_location lets a checkpoint saved on a GPU be loaded in a CPU-only session.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()


# Same deterministic preprocessing as the normalised test pipeline used for
# the ResNet18 evaluation (resize to 128x128, ImageNet mean/std).
preprocess = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

# 'hot dog' class is index 0, non-hot-dog is index of 1

import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import torch

def get_saliency_map(img_path, target_class=0):
    """Show a vanilla-gradient saliency map for one image.

    The image is preprocessed with the notebook-level ``preprocess`` pipeline
    and passed through the notebook-level ``model``. The gradient of the
    ``target_class`` logit with respect to the input pixels is reduced to one
    value per pixel (maximum absolute value over the colour channels) and
    displayed next to the original image and as an overlay.

    Args:
        img_path: Path of the image file.
        target_class: Index of the logit to explain. The default 0 is the
            'hotdog' class in this notebook's label mapping.
    """
    # The context manager closes the file once the RGB copy has been made.
    with Image.open(img_path) as raw_image:
        img_pil = raw_image.convert("RGB")
    input_tensor = preprocess(img_pil).unsqueeze(0).to(device)
    input_tensor.requires_grad_()  # gradients are needed w.r.t. the input pixels

    # Forward pass
    output = model(input_tensor)
    score = output[0, target_class]

    # Backward pass
    model.zero_grad()
    score.backward()

    # Per-pixel saliency: largest absolute gradient across the colour channels.
    saliency, _ = torch.max(input_tensor.grad.detach().abs(), dim=1)
    saliency = saliency.squeeze().cpu().numpy()

    # Scale to [0, 1]. A constant map (e.g. all-zero gradient) would otherwise
    # divide by zero and fill the map with NaN.
    value_range = saliency.max() - saliency.min()
    if value_range > 0:
        saliency = (saliency - saliency.min()) / value_range
    else:
        saliency = np.zeros_like(saliency)

    # Convert original image to numpy
    img = np.array(img_pil)

    # Resize saliency to the original image size; PIL's resize takes (width, height).
    saliency_resized = np.array(Image.fromarray(saliency).resize((img.shape[1], img.shape[0])))

    # Colour-map the saliency and drop the alpha channel.
    overlay = np.uint8(plt.cm.jet(saliency_resized) * 255)
    overlay = overlay[..., :3]
    overlay = np.float32(overlay) / 255

    # Blend original and saliency map with equal weights.
    blended = np.clip(overlay * 0.5 + np.float32(img) / 255 * 0.5, 0, 1)

    # Display in a row
    plt.figure(figsize=(7,2))

    plt.subplot(1,3,1)
    plt.title("Original Image")
    plt.imshow(img)
    plt.axis('off')

    plt.subplot(1,3,2)
    plt.title("Saliency Map")
    plt.imshow(saliency_resized, cmap='hot')
    plt.axis('off')

    plt.subplot(1,3,3)
    plt.title("Overlay")
    plt.imshow(blended)
    plt.axis('off')

    plt.show()
    plt.close()
    torch.cuda.empty_cache()


# Test images to explain. To inspect the other class, swap in files from
# data_path + "/test/nothotdog/" such as "pets (683).jpg", "pets (682).jpg",
# "pets (679).jpg" or "pets (677).jpg".
img_paths = [
    data_path + "/test/hotdog/" + file_name
    for file_name in (
        "hotdog (344).jpg",
        "hotdog (343).jpg",
        "hotdog (342).jpg",
        "hotdog (341).jpg",
    )
]

# One saliency figure per image.
for img_path in img_paths:
    get_saliency_map(img_path)

In [None]:
# Inspect the raw model output for a single test image.
model_path = model_save_path+'/best_resnet18_model-b4-fc.pth' # This is the best one
model = models.resnet18()
num_ftrs = model.fc.in_features
model.fc = torch.nn.Linear(num_ftrs, 2)  # same 2-class head as during training
# map_location lets a checkpoint saved on a GPU be loaded in a CPU-only session.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

# The former `preprocess = test_transform` line was removed: it was overwritten
# immediately and pointed at the baseline pipeline without normalisation.
preprocess = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

img_path = data_path+"/test/hotdog/"+"hotdog (344).jpg"
# The context manager closes the file once the RGB copy has been made.
with Image.open(img_path) as raw_image:
    img_pil = raw_image.convert("RGB")
input_tensor = preprocess(img_pil).unsqueeze(0).to(device)
input_tensor.requires_grad_()

# Logits for the single image, one value per class.
output = model(input_tensor)
print('***'*20 + 'output' + "***"*20)
print(output)

# Logit of class index 0 ('hotdog' in this notebook's label mapping).
score = output[0, 0]
print('***'*20 + 'score' + "***"*20)
print(score)