In [None]:
# Attach Google Drive to the Colab filesystem so files stored there
# (the dataset archive is unpacked from it next) become reachable.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
!unzip 'drive/MyDrive/food11'

In [None]:
import imageio   
import os       
import glob       
from collections import Counter  
import random 
# Seed value reused by the NumPy / PyTorch RNG setup later in this notebook.
myseed = 12345

from google.colab.patches import cv2_imshow  #display an image


# Let's take a look at one random image.
# The file name is drawn from the same directory it is read from; drawing it
# from validation/ and reading it from training/ only works when a file with
# that exact name happens to exist in both folders.
sample_dir = './food11/training/'
random_pic_file = random.choice(os.listdir(sample_dir))
pic = imageio.imread(sample_dir + random_pic_file)   # reads the image data as an array

# cv2_imshow interprets a 3-channel array as BGR, whereas imageio returns RGB,
# so flip the channel axis before displaying to avoid swapped colours.
if pic.ndim == 3 and pic.shape[2] == 3:
    cv2_imshow(pic[:, :, ::-1].copy())
else:
    cv2_imshow(pic)

# Height, width and number of colour channels (a 2-D array means one channel).
height, width = pic.shape[:2]
channels = pic.shape[2] if pic.ndim == 3 else 1
print(f'original height, width, and channels of each image: {height} {width} {channels}')

# Let's take a look at the label distribution.
# File names start with "<label>_", which is how the label is recovered below.
folder_path_options = ["./food11/training/", "./food11/test/", "./food11/validation/"]
for path in folder_path_options:
    # Test files carry no label prefix, so there is nothing to count.
    if "test" in path:
        continue

    files = glob.glob(path + "*")
    labels = [int(os.path.basename(filename).split("_")[0]) for filename in files]

    counts = Counter(labels)
    total_count = len(labels)
    print(f'label distribution in {path}')
    for value, count in sorted(counts.items()):
        distribution = count / total_count
        print(f'{value}: {count} ({distribution:.2%})')
   

In [None]:

import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn   # neural-network layers and loss functions
# image preprocessing transforms (Resize and ToTensor are used below)
import torchvision.transforms as transforms
from PIL import Image   # opens the image files

from torch.utils.data import ConcatDataset, DataLoader, Subset, Dataset
# These classes are used for handling datasets and creating data loaders in PyTorch.
from torchvision.datasets import DatasetFolder, VisionDataset

from tqdm.auto import tqdm   # progress bar around the data loaders
import random

In [None]:
# Basic reproducibility setup for PyTorch.
# Ask cuDNN for deterministic algorithms and turn off its auto-tuner, which
# could otherwise select different kernels from run to run.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Seed every RNG the notebook uses: Python's `random` (previously left
# unseeded), NumPy, and PyTorch on CPU and on all visible GPUs.
random.seed(myseed)
np.random.seed(myseed)
torch.manual_seed(myseed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(myseed)

In [None]:
## Preprocessing pipelines: resize each PIL image to a fixed 128 x 128
## (height = width = 128) and convert it to a Tensor.

# Applied to training images.
train_tfm = transforms.Compose(
    [
        transforms.Resize(size=(128, 128)),
        transforms.ToTensor(),
    ]
)

# Applied to validation / test images (currently the same steps as training).
test_tfm = transforms.Compose(
    [
        transforms.Resize(size=(128, 128)),
        transforms.ToTensor(),
    ]
)

In [None]:
class FoodDataset(Dataset):
    """Image dataset whose labels are encoded in the file names.

    Each sample is a ``.jpg`` file. The label is the integer before the first
    underscore of the file name (``"3_17.jpg"`` -> ``3``); files whose name
    does not start with an integer prefix get the label ``-1``.

    Args:
        path: Directory that is scanned for ``.jpg`` files when ``files`` is
            not given.
        tfm: Callable applied to the opened PIL image; its result is returned
            as the sample.
        files: Optional explicit collection of image paths. When supplied it
            is used instead of scanning ``path``.
    """

    def __init__(self, path, tfm=test_tfm, files=None):
        super().__init__()
        self.path = path
        if files is not None:
            # Explicit file list: no need to touch the directory at all.
            self.files = list(files)
        else:
            self.files = sorted(
                os.path.join(path, x) for x in os.listdir(path) if x.endswith(".jpg")
            )

        if self.files:
            print(f"One {path} sample", self.files[0])
        else:
            # Previously this indexed files[0] and died with a bare IndexError.
            print(f"No .jpg files found for {path}")
        self.transform = tfm

    def __len__(self):
        """Return the number of image files in the dataset."""
        return len(self.files)

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)`` for the file at ``idx``."""
        fname = self.files[idx]
        # The context manager closes the file handle; convert("RGB") gives
        # every sample three channels (grayscale / CMYK files would otherwise
        # produce tensors the 3-channel model cannot consume).
        with Image.open(fname) as im:
            im = self.transform(im.convert("RGB"))

        try:
            label = int(os.path.basename(fname).split("_")[0])
        except ValueError:
            label = -1  # test files have no label prefix
        return im, label

In [None]:
class FirstCNN(nn.Module):
    """Five-stage convolutional classifier with 11 output logits.

    Every stage is Conv(3x3, stride 1, padding 1) -> BatchNorm -> ReLU ->
    MaxPool(2x2), so each stage halves the spatial size. The fully connected
    head expects 512 * 4 * 4 features, i.e. a 3 x 128 x 128 input.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels) of each stage; for a 128 x 128 input the
        # feature map goes 128 -> 64 -> 32 -> 16 -> 8 -> 4.
        stage_channels = [(3, 64), (64, 128), (128, 256), (256, 512), (512, 512)]

        conv_layers = []
        for c_in, c_out in stage_channels:
            conv_layers.extend(
                [
                    nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1),
                    nn.BatchNorm2d(c_out),
                    nn.ReLU(),
                    nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
                ]
            )
        # Layers are registered in the same order under the same indices, so
        # parameter names (cnn.0.weight, ...) are unchanged.
        self.cnn = nn.Sequential(*conv_layers)

        # Classifier head: 8192 -> 1024 -> 512 -> 11.
        self.fc = nn.Sequential(
            nn.Linear(512 * 4 * 4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 11),
        )

    def forward(self, x):
        """Run the conv stack, flatten per sample, and return the logits."""
        features = self.cnn(x)
        flattened = features.view(features.shape[0], -1)
        return self.fc(flattened)



# Load training and validation dataset

In [None]:
_exp_name = "sample"      # prefix for the checkpoint and log file names
batch_size = 64
_dataset_dir = "./food11"

# Page-locked host memory is only useful when batches are copied to a GPU.
_pin_memory = torch.cuda.is_available()

# Construct datasets and loaders.
# Training batches are reshuffled every epoch.
train_set = FoodDataset(os.path.join(_dataset_dir, "training"), tfm=train_tfm)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=_pin_memory)

# Validation only measures the model, so a fixed order is used: shuffling adds
# nothing and changes which samples land in the smaller final batch.
valid_set = FoodDataset(os.path.join(_dataset_dir, "validation"), tfm=test_tfm)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=_pin_memory)

One ./food11/training sample ./food11/training/0_0.jpg
One ./food11/validation sample ./food11/validation/0_0.jpg


In [None]:

device = "cuda" if torch.cuda.is_available() else "cpu"

n_epochs = 4
# Number of consecutive non-improving epochs tolerated before early stopping.
# NOTE: with n_epochs = 4 this threshold can never be reached.
patience = 300
model = FirstCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0003, weight_decay=1e-5)
stale = 0      # epochs since the last validation-accuracy improvement
best_acc = 0   # best validation accuracy seen so far

for epoch in range(n_epochs):

    # ---------- Training ----------
    model.train()

    train_losses = []
    train_accs = []

    for imgs, labels in tqdm(train_loader):
        # A batch consists of image data and corresponding labels;
        # move both to the device once.
        imgs, labels = imgs.to(device), labels.to(device)

        logits = model(imgs)
        loss = criterion(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        # Clip the gradient norm to keep updates stable.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)
        optimizer.step()

        acc = (logits.argmax(dim=-1) == labels).float().mean()

        # Record plain Python floats so no tensors are kept alive on the device.
        train_losses.append(loss.item())
        train_accs.append(acc.item())

    # Epoch metrics are the mean of the per-batch values.
    train_loss = sum(train_losses) / len(train_losses)
    train_acc = sum(train_accs) / len(train_accs)

    # Print the information.
    print(f"[ Train | {epoch + 1:03d}/{n_epochs:03d} ] loss = {train_loss:.5f}, acc = {train_acc:.5f}")

    # ---------- Validation ----------
    model.eval()

    # These are used to record information in validation.
    valid_losses = []
    valid_accs = []

    # Iterate the validation set by batches.
    for imgs, labels in tqdm(valid_loader):
        imgs, labels = imgs.to(device), labels.to(device)

        # No gradients are needed for evaluation.
        with torch.no_grad():
            logits = model(imgs)
            loss = criterion(logits, labels)

        # Compute the accuracy for current batch.
        acc = (logits.argmax(dim=-1) == labels).float().mean()

        # Record the loss and accuracy.
        valid_losses.append(loss.item())
        valid_accs.append(acc.item())

    # The average loss and accuracy for entire validation set is the average of the recorded values.
    valid_loss = sum(valid_losses) / len(valid_losses)
    valid_acc = sum(valid_accs) / len(valid_accs)

    # Print the information.
    print(f"[ Valid | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

    # ---------- Logging ----------
    improved = valid_acc > best_acc
    log_line = f"[ Valid | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}"
    if improved:
        log_line += " -> best"
    # Append the line to the log file (the file used to be opened but the
    # message was only printed to stdout) and echo it to the cell output.
    with open(f"./{_exp_name}_log.txt", "a") as log_file:
        print(log_line, file=log_file)
    print(log_line)

    # ---------- Checkpointing / early stopping ----------
    if improved:
        # Report the 1-based epoch number, consistent with the lines above.
        print(f"Best model found at epoch {epoch + 1}, saving model")
        torch.save(model.state_dict(), f"{_exp_name}_best.ckpt") # only save best to prevent output memory exceed error
        best_acc = valid_acc
        stale = 0
    else:
        stale += 1
        if stale > patience:
            print(f"No improvment {patience} consecutive epochs, early stopping")
            break

# Apply the best model on test dataset

In [None]:
# # set up test data loader
# test_set = FoodDataset(os.path.join(_dataset_dir,"test"), tfm=test_tfm)
# test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)

# model_best = FirstCNN().to(device)
# model_best.load_state_dict(torch.load(f"{_exp_name}_best.ckpt"))
# model_best.eval()
# prediction = []
# with torch.no_grad():
#     for data,_ in test_loader:
#         test_pred = model_best(data.to(device))
#         test_label = np.argmax(test_pred.cpu().data.numpy(), axis=1)
#         prediction += test_label.squeeze().tolist()
  
# #create test csv
# def pad4(i):
#     return "0"*(4-len(str(i)))+str(i)
# df = pd.DataFrame()
# df["Id"] = [pad4(i) for i in range(1,len(test_set)+1)]
# df["Category"] = prediction
# df.to_csv("test_prediction.csv",index = False)

In [None]:
import matplotlib.pyplot as plt

# Set up the test data loader; order is kept fixed so predictions[i]
# corresponds to test_set[i].
test_set = FoodDataset(os.path.join(_dataset_dir, "test"), tfm=test_tfm)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0,
                         pin_memory=torch.cuda.is_available())

# Rebuild the network and load the best checkpoint saved during training.
# map_location lets a checkpoint written on a GPU be loaded on a CPU-only runtime.
model_best = FirstCNN().to(device)
model_best.load_state_dict(torch.load(f"{_exp_name}_best.ckpt", map_location=device))
model_best.eval()

predictions = []
with torch.no_grad():
    for data, _ in test_loader:
        test_pred = model_best(data.to(device))
        test_label = np.argmax(test_pred.cpu().numpy(), axis=1)
        # No squeeze(): for a final batch of size 1 it collapses the array to
        # 0-d, tolist() then returns a bare int and extend() raises TypeError.
        predictions.extend(test_label.tolist())

# Show each test image together with its predicted label
# (one figure per test image).
for i, prediction in enumerate(predictions):
    image, _ = test_set[i]
    fig, ax = plt.subplots()
    # Tensors are (channels, height, width); imshow expects (height, width, channels).
    ax.imshow(image.permute(1, 2, 0))
    ax.set_title(f"Predicted Label: {prediction}")
    ax.axis('off')
    plt.show()
    plt.close(fig)  # release the figure so memory does not grow with the loop
