# 521153S Deep Learning Final Project

In [None]:
# import necessary packages
import os
import requests
import zipfile
import sys
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms, datasets
from torchvision.io import read_image
from torchvision.models import resnet18, resnet34, resnet50, resnet101, resnet152, ResNet18_Weights, ResNet34_Weights, ResNet50_Weights, ResNet101_Weights, ResNet152_Weights
import gdown
import urllib
import os
import random
import tarfile
import time
import shutil

In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# If you encounter some issues regarding cuda device, e.g., "RuntimeError: CUDA Out of memory error",
# try to switch the device to cpu by using the following code

# device = torch.device('cpu')
print('Device:', device)

In [None]:
# download the dataset
val_url = 'https://drive.google.com/u/0/uc?id=1hSMUMj5IRpf-nQs1OwgiQLmGZCN0KDWl'
train_url = 'https://drive.google.com/u/0/uc?id=107FTosYIeBn5QbynR46YG91nHcJ70whs'
test_url = 'https://drive.google.com/u/0/uc?id=1yKyKgxcnGMIAnA_6Vr2ilbpHMc9COg-v'
eurosat_url = 'https://zenodo.org/records/7711810/files/EuroSAT_RGB.zip?download=1'

val_file = './data/val.tar'
train_file = './data/train.tar'
test_file = './data/test.tar'
eurosat_file = './data/eurosatrgb.zip'


if not os.path.exists('./data'):
    print('Creating data directory')
    os.mkdir('./data')

if not os.path.exists('./data/val.tar'):
    print('Downloading val.tar')
    gdown.download(val_url, val_file)
    val_tar = tarfile.open(val_file)
    val_tar.extractall('./data/')
    val_tar.close()

if not os.path.exists(train_file):
    print('Downloading train.tar')
    gdown.download(train_url, train_file)
    train_tar = tarfile.open(train_file)
    train_tar.extractall('./data/')
    train_tar.close()

if not os.path.exists(test_file):
    print('Downloading test.tar')
    gdown.download(test_url, test_file)
    test_tar = tarfile.open(test_file)
    test_tar.extractall('./data/')
    test_tar.close()

if not os.path.exists(eurosat_file):
    print('Downloading EuroSAT_RGB.zip')
    response = urllib.request.urlretrieve(eurosat_url, eurosat_file)
    eurosat_zip = zipfile.ZipFile(eurosat_file)
    eurosat_zip.extractall('./data/eurosat')
    eurosat_zip.close()

In [None]:
# Image Size
image_size = 32

# Batch size during training
batch_size = 100
num_workers = 1

# Learning rate for optimizers
lr = 0.0002


In [None]:
# Number of training epochs
num_epochs = 10
model_resnet18 = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
model_resnet34 = torch.hub.load('pytorch/vision:v0.10.0', 'resnet34', pretrained=True)
model_resnet50 = torch.hub.load('pytorch/vision:v0.10.0', 'resnet50', pretrained=True)
model_resnet101 = torch.hub.load('pytorch/vision:v0.10.0', 'resnet101', pretrained=True)
model_resnet152 = torch.hub.load('pytorch/vision:v0.10.0', 'resnet152', pretrained=True)

train_dataset = torchvision.datasets.ImageFolder(
    root='./data/train',
    transform=transforms.Compose([
        transforms.Resize(image_size),
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])
)

test_dataset = torchvision.datasets.ImageFolder(
    root='./data/test',
    transform=transforms.Compose([
        transforms.Resize(image_size),
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])
)

validation_dataset = torchvision.datasets.ImageFolder(
    root='./data/val',
    transform=transforms.Compose([
        transforms.Resize(image_size),
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])
)


dataloader_train = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
dataloader_test = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
dataloader_validation = torch.utils.data.DataLoader(validation_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

In [None]:
model = model_resnet18

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [None]:
def evaluate(model, data_loader):

    model.eval() 
    # Evaluation on test set
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in enumerate(data_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_accuracy = correct / total
    print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

In [None]:
def train(model, dataloader_train, criterion, optimizer, num_epochs, device):
    model.to(device)
    train_loss = []
    train_acc = []

    # Start the training.
    if torch.cuda.is_available():
        print("Using CUDA")

    print('Training')
    for epoch in range(num_epochs):
        model.train()
        train_running_loss = 0.0
        train_running_correct = 0
        counter = 0

        for i, data in enumerate(dataloader_train,0):
            counter += 1
            image, labels = data
            image = image.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            # Forward pass.
            outputs = model(image)
            # Calculate the loss.
            loss = criterion(outputs, labels)
            train_running_loss += loss.item()
            # Calculate the accuracy.
            _, preds = torch.max(outputs.data, 1)
            train_running_correct += (preds == labels).sum().item()
            # Backpropagation
            loss.backward()
            # Update the weights.
            optimizer.step()    # Output training 
            
            if counter % 100 == 0:
                print(f'Epoch {epoch+1}/{num_epochs}, Step {counter}/{len(dataloader_train)}, Loss: {loss.item():.4f}')
        train_epoch_loss = train_running_loss / counter
        train_loss.append(train_epoch_loss)
        # valid_loss.append(valid_epoch_loss)
        train_epoch_acc = 100 * train_running_correct/len(train_dataset)
        train_acc.append(train_epoch_acc )
        # valid_acc.append(valid_epoch_acc)
        print(f"Training loss: {train_epoch_loss :.3f}, training acc: {train_epoch_acc:.3f}")
        # print(f"Validation loss: {valid_epoch_loss:.3f}, validation acc: {valid_epoch_acc:.3f}")
        print('-'*50)


    return model

In [None]:
# Choose 100 images from EuroSAT dataset

if not os.path.exists('./data/eurosat_selected'):
    print('Creating data directory')
    os.mkdir('./data/eurosat_selected')

if not os.path.exists('./data/eurosat_training'):
    print('Creating training directory')
    os.mkdir('./data/eurosat_training')

# Load the EuroSAT Categories
eurosat_categories = [name for name in os.listdir('./data/eurosat/EuroSAT_RGB/') if os.path.isdir(os.path.join('./data/eurosat/EuroSAT_RGB/', name))]
# Randomly select 5 categories
print(f"Selecting 5 categories from {eurosat_categories} categories")
selected_categories = np.random.choice(eurosat_categories, 5, replace=False)
print(f"Selected categories: {selected_categories}")

selected_images = []
training_images = []

# From each directory, randomly select 20 images
for category in selected_categories:
    images = os.listdir(os.path.join('./data/eurosat/EuroSAT_RGB/', category))
    selected = random.sample(images, 20)
    selected_images.extend([(category, image) for image in selected])
    print(f"Selected {selected} from {category}")
    # Copy selected images to the selected directory
    for image in selected:
        shutil.copyfile(os.path.join('./data/eurosat/EuroSAT_RGB', category, image), os.path.join('./data/eurosat_selected', image))

# From these 100 images, randomly select 5 images from each category for the training set
for category in selected_categories:
    category_images = [image for (cat, image) in selected_images if cat == category]
    training = random.sample(category_images, 5)
    training_images.extend(training)
    print(f"Selected {training} for training")

    # Copy training images to the training directory
    for image in training:
        shutil.copyfile(os.path.join('./data/eurosat_selected', image), os.path.join('./data/eurosat_training', image))