In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
import time
import matplotlib.pyplot as plt
import torch
from torch import nn
import torchvision
from torchvision import transforms, models
from sklearn.model_selection import train_test_split

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Dataset and dataloader

def get_dataset(batch_size):
    train_transforms = transforms.Compose([transforms.RandomRotation(30),
                                          transforms.RandomResizedCrop(224),
                                          transforms.RandomHorizontalFlip(),
                                          transforms.ToTensor(),
                                          transforms.Normalize([0.485, 0.456, 0.406],
                                                                [0.229, 0.224, 0.225])])

    # Normalize the test set same as training set without augmentation
    eval_transforms = transforms.Compose([transforms.Resize(255),
                                          transforms.CenterCrop(224),
                                          transforms.ToTensor(),
                                          transforms.Normalize([0.485, 0.456, 0.406],
                                                              [0.229, 0.224, 0.225])])

    validation_size = 0.2

    # Split to train, test, validation
    trainval = torchvision.datasets.Food101(root='./datasets', split='train', download=True, transform=train_transforms)
    trainset, valset = train_test_split(trainval, test_size=validation_size)
    testset = torchvision.datasets.Food101(root='./datasets', split='test', download=True, transform=eval_transforms)

    train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=True)
    
    return train_loader, test_loader, val_loader

In [None]:
# Transfer learning - change DenseNet's final layer

def define_model():

  model = models.densenet121(weights='IMAGENET1K_V1')

  for param in model.parameters():
      param.requires_grad = False

  model.classifier = nn.Sequential(nn.Linear(1024,101)
                                  # nn.LeakyReLU(),
                                  # nn.Linear(512,101)
                              )
  return model

In [None]:
# Hyperparameters

num_epochs = 30
learning_rate = 0.001
batch_size = 64

In [None]:
# Train

def train(num_epochs, train_loader, val_loader, model, optimizer, criterion):
    model = model.to(device)

    start = time.time()

    val_acc_history = []  # List to store validation accuracy history
    train_acc_history = []  # List to store training accuracy history

    num_of_batches = len(train_loader) 

    best_acc = 0.0

    for epoch in range(num_epochs):

        model.train()
        running_loss = 0.0
        running_corrects = 0 

        for i, data in enumerate(train_loader): 
            inputs, labels = data
            inputs = inputs.to(device) 
            labels = labels.to(device)

            # Forward
            outputs = model(inputs) 
            loss = criterion(outputs, labels)

            # Backward
            optimizer.zero_grad() 
            loss.backward() 
            optimizer.step()

            # Find loss and accuracy
            _, train_preds = torch.max(outputs, 1)  # Get the predicted labels
            running_loss += loss.item() * inputs.size(0)  # Update the running loss
            running_corrects += torch.sum(train_preds == labels)  # Update the number of correct predictions

            # Print status every 100 steps
            if (i + 1) % 100 == 0:
                time_cost = time.time() - start
                print('Epoch [{}/{}], Step [{}/{}], Time:{}'.format(epoch + 1, num_epochs, i + 1, num_of_batches,
                                                                     time_cost))

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset) * 100

        train_acc_history.append(epoch_acc)

        model.eval()

        running_val_loss = 0.0 
        running_val_corrects = 0 

        # Find score on validation
        for i, data in enumerate(val_loader):
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            with torch.no_grad():

                # Forward
                val_outputs = model(inputs) 
                val_loss = criterion(val_outputs, labels)

                _, val_preds = torch.max(val_outputs, 1)  # Get the predicted labels
                running_val_loss += val_loss.item() * inputs.size(0)  # Update the running loss
                running_val_corrects += torch.sum(val_preds == labels)  # Update the number of correct predictions

        epoch_val_loss = running_val_loss / len(val_loader.dataset)
        epoch_val_acc = running_val_corrects.double() / len(val_loader.dataset) * 100

        # Avg accuracy and loss for each epoch
        time_cost = time.time() - start
        print('Done Epoch [{}/{}], Time:{}'.format(epoch + 1, num_epochs, time_cost))
        print('-' * 10)
        print('Train Loss: {:.4f} Train Acc: {:.4f}'.format(epoch_loss, epoch_acc))
        print('Test Loss: {:.4f} Test Acc: {:.4f}'.format(epoch_val_loss, epoch_val_acc))

        # Save the model if the validation accuracy improves
        if epoch_val_acc > best_acc:
            print("saved model with higher accuracy")
            best_acc = epoch_val_acc
            torch.save(model.state_dict(), '/content/drive/MyDrive/dl_project/models/food101_less_loss.pth')

        val_acc_history.append(epoch_val_acc)

        model.train()  # Set the model back to training mode

    return train_acc_history, val_acc_history


In [None]:
model = define_model()

optimizer = torch.optim.Adam(model.classifier.parameters(), lr=learning_rate, betas=[0.9, 0.999])

criterion = nn.CrossEntropyLoss()

train_loader, test_loader, val_loader = get_dataset(batch_size)

In [None]:
# Display graph of train and validation accuracy

train_acc_history, val_acc_history = train(num_epochs, train_loader, val_loader, model, optimizer, criterion)

fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(range(num_epochs), train_acc_history, label='train accuracy')
ax.plot(range(num_epochs), val_acc_history, label='validation accuracy')
ax.set_xlabel("Epochs")
ax.set_ylabel("Accuracy")
ax.grid()
ax.legend()
plt.show()

In [None]:

# Save the final model's weights

torch.save(model.state_dict(), '/content/drive/MyDrive/dl_project/models/final_model.pth')

In [None]:
# Test on testset

model.eval()

test_loss = 0.0
test_accuracy = 0.0

with torch.no_grad():
    for i, data in enumerate(test_loader):
        images, labels = data
        images = images.to(device) 
        labels = labels.to(device)

        # Forward
        outputs = model(images)
        batch_loss = criterion(outputs, labels)
        test_loss += batch_loss.item()

        # Find top5 accuracy score
        _, top_labels = outputs.topk(5, dim=1)
        labels = labels.view(-1, 1)

        results = top_labels == labels
        results = results.sum(1)

        # Avg accuracy and loss for each epoch
        test_accuracy += torch.mean(results.float()).item()
        avg_test_accuracy = test_accuracy / (i + 1)
        avg_test_loss = test_loss / (i + 1)

        time_cost = time.time() - start
        print('Step [{}/{}], Loss: {:.4f}, Accuracy Top5: {:.3%}, Time:{}'.format(i + 1, len(test_loader),
                                                                                   avg_test_loss, avg_test_accuracy,
                                                                                   time_cost))
