<img style="display: block; margin: auto; border-radius: 50%;" src="https://yt3.ggpht.com/SQPVly-us6iK-A-3PK4nwzZjzoXAdJU1pN1YKeYkyCQoIGWdAcKSVbbnmjpBGmcMdsLxu4doTg=s600-c-k-c0x00ffffff-no-rj-rp-mo" width="100px" height="100px">

<h3>Convolutional Neural Network for Image Classification [PyTORCH + CUSTOM ARCH + MAC M3 MPS] -- [VR Imersed]</h3>

https://www.youtube.com/channel/UCeS3HdDzVUCfl8WsOExR-UA

Subscribe for more videos

Links:<br>
https://www.kaggle.com/datasets/apollo2506/eurosat-dataset<br>
https://pytorch.org/docs/stable/notes/mps.html

<br>

Install and import all required packages

In [None]:
!pip3 install opendatasets numpy matplotlib tqdm torch torchvision

In [None]:
import os
import time
import shutil
import random
import torch
import torchvision
import opendatasets

import numpy as np

from torch import nn as nn
from torch.nn import functional as functional
from torch import optim as optim
from torchvision import transforms as transforms
from matplotlib import pyplot as plt
from tqdm import tqdm

Check for MPS (Metal Performance Shaders), the GPU backend on Apple Silicon Macs; fall back to the CPU otherwise

In [None]:
# Prefer the Metal (MPS) backend when PyTorch reports it as available; otherwise use the CPU.
mps_ready = torch.backends.mps.is_available()
device = torch.device("mps" if mps_ready else "cpu")

if mps_ready:
    # Smoke test: allocate a tiny tensor directly on the MPS device and show it.
    x = torch.ones(1, device=device)
    print(x)
else:
    print("MPS device not available.")

Download the dataset from Kaggle

In [None]:
# Kaggle page of the EuroSAT dataset used throughout this notebook.
eurosat_url = "https://www.kaggle.com/datasets/apollo2506/eurosat-dataset"
opendatasets.download(eurosat_url)

In [1]:
# Folder that holds one sub-directory per class
# (presumably where the download step extracts the archive; verify after downloading).
dataset_dir = "./eurosat-dataset/EuroSAT"
# Destination roots of the train / test split created further down.
train_dataset_dir, test_dataset_dir = "./data/train", "./data/test"

Classes identification

In [None]:
# Show the raw content of the dataset folder (class directories plus any other entries).
os.listdir(dataset_dir)

In [None]:
# Class names = sub-directories of the dataset folder, in alphabetical order.
classes = sorted(
    entry
    for entry in os.listdir(dataset_dir)
    if os.path.isdir(os.path.join(dataset_dir, entry))
)

In [None]:
# Display the sorted class names; their position in this list is the class index used below.
classes

Prepare train and test directories

In [3]:
# Remove the results of a previous split. Each directory is handled on its own, so a
# missing train directory no longer prevents the test directory from being cleaned,
# and unexpected errors (e.g. permissions) are raised instead of being swallowed.
removed_any = False
for stale_dir in (train_dataset_dir, test_dataset_dir):
    if os.path.isdir(stale_dir):
        shutil.rmtree(stale_dir)
        removed_any = True

if not removed_any:
    print("Working dir is empty xD")

Working dir is empty xD


In [None]:
# Create the (empty) root folders of both splits, including missing parent folders.
for split_root in (train_dataset_dir, test_dataset_dir):
    os.makedirs(split_root)

Map class names by index

In [None]:
# Class index -> human readable class name (alphabetical order of the class names).
class_map = dict(enumerate((
    'AnnualCrop',
    'Forest',
    'HerbaceousVegetation',
    'Highway',
    'Industrial',
    'Pasture',
    'PermanentCrop',
    'Residential',
    'River',
    'SeaLake',
)))

Split the data into train and test directories

In [None]:
# Share of every class that is copied to the train split; the remainder forms the test split.
train_fraction = 0.8
# Dedicated, seeded generator: the split is reproducible after a kernel restart
# and the global `random` state is left untouched.
split_rng = random.Random(42)

# File names that ended up in each split, over all classes.
train_image_names = []
test_image_names = []

for cur_class_index, class_name in enumerate(classes):
    class_source_dir = os.path.join(dataset_dir, class_name)
    # os.listdir() returns entries in arbitrary order; sorting makes the seeded sample deterministic.
    image_list_for_class = sorted(os.listdir(class_source_dir))
    sample_size = int(len(image_list_for_class) * train_fraction)

    # The split is computed per class, so it does not rely on file names being unique across classes.
    class_train_names = split_rng.sample(image_list_for_class, sample_size)
    picked_for_train = set(class_train_names)
    class_test_names = [name for name in image_list_for_class if name not in picked_for_train]

    # Every class gets a folder named after its index inside both split roots.
    for split_root, split_names in (
        (train_dataset_dir, class_train_names),
        (test_dataset_dir, class_test_names),
    ):
        split_class_dir = os.path.join(split_root, str(cur_class_index))
        os.mkdir(split_class_dir)
        for image_name in split_names:
            shutil.copy2(os.path.join(class_source_dir, image_name), split_class_dir)

    train_image_names.extend(class_train_names)
    test_image_names.extend(class_test_names)

Pre-processing and data loader creation

In [None]:
# ToTensor turns an image into a float tensor; Normalize then shifts/scales every channel
# with mean 0.5 and std 0.5. No resizing happens here, so all images are assumed to
# already share the same dimensions - TODO confirm for other datasets.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

In [None]:
# Training images are read from the per-class folders and served in shuffled batches of 64.
train_dataset = torchvision.datasets.ImageFolder(train_dataset_dir, transform=transform)
train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=2,
)

In [None]:
# Test images are served one at a time, in random order.
test_dataset = torchvision.datasets.ImageFolder(test_dataset_dir, transform=transform)
test_dataloader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=1,
    shuffle=True,
    num_workers=2,
)

Display images and labels

In [None]:
def show_img(img):
    """Plot an image tensor laid out as (channels, height, width).

    The tensor is first mapped back with ``img / 2 + 0.5`` (the inverse of a
    mean 0.5 / std 0.5 normalization) and then shown with matplotlib.
    """
    restored = (img / 2 + 0.5).numpy()
    # matplotlib expects the channel axis last: (C, H, W) -> (H, W, C).
    channels_last = restored.transpose(1, 2, 0)

    plt.imshow(channels_last)
    plt.show()

In [None]:
# Pull one batch from the training loader to inspect what the model will receive.
first_batch = next(iter(train_dataloader))
images, labels = first_batch
print("Image count:", len(images), "Label count:", len(labels))
labels

In [None]:
# Show the first images of the batch side by side, followed by their class indices.
preview_count = 8
show_img(torchvision.utils.make_grid(images[:preview_count]))
label_text = ' '.join(str(labels[j].item()) for j in range(preview_count))
print("Labels: ", label_text)

Model architecture

In [None]:
class CodeSpaceNet(nn.Module):
    """CNN classifier: three 3x3 convolutions, one 2x2 max-pool and a four-layer dense head.

    Args:
        num_classes: Size of the output layer (number of classes).
        in_channels: Number of channels of the input images.
        image_size: Side length of the square input images; it determines the input
            size of the first fully connected layer.

    With the defaults (3 channels, 64x64 pixels, 10 classes) the layer sizes are identical
    to the original hard-coded architecture: 256 * 29 * 29 = 215296 flattened features.

    ``forward`` returns log-probabilities (``log_softmax``), one row per input image.
    """

    def __init__(self, num_classes: int = 10, in_channels: int = 3, image_size: int = 64):
        super().__init__()

        # Unpadded 3x3 convolutions with stride 1: every layer trims 2 pixels per spatial axis.
        self.conv1 = nn.Conv2d(in_channels, 64, 3, 1)
        self.conv2 = nn.Conv2d(64, 128, 3, 1)
        self.conv3 = nn.Conv2d(128, 256, 3, 1)

        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)

        self.fc1 = nn.Linear(self.flattened_features(image_size), 2048)
        self.fc2 = nn.Linear(2048, 512)
        self.fc3 = nn.Linear(512, 128)
        self.fc4 = nn.Linear(128, num_classes)

    @staticmethod
    def flattened_features(image_size: int, channels: int = 256) -> int:
        """Return the number of values per image that reach ``fc1``.

        Three unpadded 3x3 convolutions remove 6 pixels per axis and the 2x2 max-pool
        halves the result (rounding down).

        Raises:
            ValueError: If ``image_size`` is too small to survive the convolutions and pooling.
        """
        pooled_side = (image_size - 3 * 2) // 2
        if pooled_side < 1:
            raise ValueError(f"image_size={image_size} is too small for this architecture (minimum is 8)")
        return channels * pooled_side * pooled_side

    def forward(self, x):
        # Extract features (borders, textures, shapes...) of the images by applying filters (kernels).
        x = self.conv1(x)
        # The ReLU activation introduces non-linearity and mitigates the vanishing-gradient issue.
        x = functional.relu(x)

        x = self.conv2(x)
        x = functional.relu(x)

        x = self.conv3(x)
        x = functional.relu(x)
        # Max pooling keeps the maximum of every 2x2 window, which halves the spatial dimensions.
        x = functional.max_pool2d(x, 2)
        # Regularization against overfitting: randomly zeroes some activations during training.
        x = self.dropout1(x)
        # Flatten everything except the batch dimension to feed the fully connected layers.
        x = torch.flatten(x, 1)
        # The dense head works like an MLP classifier: every neuron is connected to all
        # neurons of the previous layer.
        x = self.fc1(x)
        x = functional.relu(x)
        x = self.dropout2(x)  # Reduces co-dependency between neurons of the dense layers.

        x = self.fc2(x)
        x = functional.relu(x)

        x = self.fc3(x)
        x = functional.relu(x)

        x = self.fc4(x)
        # Log-probability of every class (exponentiate to obtain probabilities).
        return functional.log_softmax(x, dim=1)

In [None]:
# Instantiate the network with its default configuration.
model = CodeSpaceNet()

In [None]:
# Print the layer overview of the network.
print(model)

In [None]:
# Move the model to the device selected at the top of the notebook (MPS or CPU).
model.to(device)

In [None]:
# The model's forward pass already ends with log_softmax, i.e. it outputs log-probabilities.
# NLLLoss is the loss defined on log-probabilities; CrossEntropyLoss would apply
# log_softmax a second time on top of them.
criterion = nn.NLLLoss()

In [None]:
# Adam over all model parameters; no hyper-parameters are passed, so the library defaults apply.
optimizer = optim.Adam(model.parameters())

In [None]:
# Number of full passes over the training data.
num_epochs = 30

In [None]:
# Iterator over the test loader, created once so successive next() calls keep advancing through it.
test_iter = iter(test_dataloader)

In [None]:
print("Train start")

# Accuracy counters live at notebook scope because the evaluation code further down
# accumulates into them; they start from zero.
total_correct = 0
total_samples = 0

for epoch in range(num_epochs):

    # Make sure dropout is active, even if model.eval() was called before this cell is (re-)run.
    model.train()

    running_loss = 0.0
    num_batches = 0
    start = time.time()

    for inputs, labels in (pbar := tqdm(train_dataloader)):
        description = f"\nEpoch {epoch}"
        # The MPS allocator is only queried when training really runs on MPS,
        # so the CPU fallback selected at the top of the notebook keeps working.
        if device.type == "mps":
            allocated_gb = round(torch.mps.current_allocated_memory() / 1024 / 1024 / 1024, 2)
            description += f" GPU Mem.: {allocated_gb} GB"
        pbar.set_description(description)

        inputs, labels = inputs.to(device), labels.to(device)

        # Gradients accumulate by default, so they are cleared before every step.
        optimizer.zero_grad()

        outputs = model(inputs)

        loss = criterion(outputs, labels)
        # Back-propagates the loss to calculate the gradients.
        loss.backward()
        # Updates the neural network weights.
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    end = time.time()
    # max(..., 1) avoids a division by zero when the loader yields no batch at all.
    mean_loss = running_loss / max(num_batches, 1)
    print(f"Epoch {epoch}, Loss: {mean_loss}, Time: {round((end - start) / 60, 2)} min.")

print("Train stop")

In [None]:
# Running totals for the accuracy computation over the test set.
total_correct = 0
total_samples = 0

In [None]:
# Start from zero on every run so that re-executing this cell never double-counts
# and the cell does not depend on counters left behind by other cells.
total_correct = 0
total_samples = 0

# Evaluation mode disables dropout.
model.eval()

# No gradients are needed for inference.
with torch.no_grad():

    for inputs, labels in tqdm(test_dataloader, desc="Model eval."):
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model(inputs)

        # The index of the highest score per row is the predicted class.
        _, predicted = torch.max(outputs, 1)

        total_samples += labels.size(0)
        total_correct += (predicted == labels).sum().item()

In [None]:
# Fraction of evaluated samples whose predicted class matched the label.
accuracy = total_correct / total_samples
print(accuracy)

In [None]:
# Dropout must be disabled for inference, also when this cell is run on its own.
model.eval()

with torch.no_grad():
    # Take the first image of one batch from the test loader.
    inputs, _ = next(iter(test_dataloader))

    # Keep a batch dimension of 1: the model expects (batch, channels, height, width).
    image = inputs[0].unsqueeze(0).to(device)

    outputs = model(image)
    _, predicted = torch.max(outputs, 1)

# Undo the mean 0.5 / std 0.5 normalization (same mapping as show_img) instead of
# min-max stretching, which distorts colors and divides by zero on a constant image.
np_img = image.squeeze(0).cpu().numpy() / 2 + 0.5
# Guard against tiny floating point overshoots outside the displayable [0, 1] range.
np_img = np.clip(np_img, 0.0, 1.0)
# matplotlib expects the channel axis last: (C, H, W) -> (H, W, C).
np_img = np.transpose(np_img, (1, 2, 0))

plt.figure(figsize=(3, 3))
plt.imshow(np_img)
# The title shows the predicted class name.
plt.title(class_map[predicted.item()])
plt.axis("off")
plt.show()