because I'm trying to lean torch I am going to build the same thing again but this time in torch

In [None]:
import os
import pandas as pd 
import torch
from collections import defaultdict




In [None]:
import kagglehub
download = kagglehub.dataset_download("mahmoudreda55/satellite-image-classification")

data_dir = os.path.join(download, "DATA")
print(data_dir)



In [None]:
dir_paths_to_classes = []

for sub_class in ['cloudy','water', 'green_area', 'desert']:
    sub_class_dir = os.path.join(data_dir, sub_class)
    if not os.path.exists(sub_class_dir):
        print(f"Directory {sub_class_dir} not found.")
    else:
        print(f"Directory {sub_class_dir} exists.")
        dir_paths_to_classes.append(os.path.join(data_dir, sub_class))

print(dir_paths_to_classes)

In [None]:
from PIL import Image

resolution_dict = defaultdict(list)
# Iterate through each directory and get the size of each image

for path in dir_paths_to_classes:
    for image in os.listdir(path):
        # Open the image file
        image_path = os.path.join(path, image)
        image_class = path.split("/")[-1]
        try:
            with Image.open(image_path) as img:
                # Get the size of the image
                size = img.size
                # Append the size to the dictionary
                resolution_dict[image_class].append(size)
        except Exception as e:
            print(f"Error opening image {image_class}: {e}")
            continue

class_stats = {}



# Print the sizes of images in each directory
for image_class, resolutions in resolution_dict.items():

    widths, heights = zip(*resolutions)
    average_width = sum(widths) / len(widths)
    average_height = sum(heights) / len(heights)

    class_stats[image_class] = {
        'average_width': average_width,
        'average_height': average_height,
        'num_images': len(resolutions)
    }
    

print("Average image sizes:")
print(class_stats)

        

In [None]:
from torchvision import transforms, datasets
from torch.utils.data import DataLoader

transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) 
])

dataset =  datasets.ImageFolder(root=data_dir, transform=transform)

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

# Create data loaders which will be used to load the data in batches which is more efficient
# and allows for shuffling the data


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [None]:
import torch.nn as nn

print(dataset.classes)
print(len(dataset.classes))

# adjust the class weights based on the number of samples in each class
class_sizes = [1500, 1131, 1500, 1500]
class_weights = 1.0 / torch.tensor(class_sizes, dtype=torch.float)
class_weights = class_weights / class_weights.sum() * len(class_sizes)
print("Class weights:", class_weights)

#use a weighted loss function to account for the class imbalance
loss_fn = nn.CrossEntropyLoss(weight=class_weights)

In [None]:
import torch.nn as nn

class SmallCNN(nn.Module):
    def __init__(self, num_classes=4):
        super(SmallCNN, self).__init__()

        self.normalize = lambda x: x / 255.0

        self.conv1 = nn.Conv2d(3,16, kernel_size=3, padding=1)
        self.act1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(16,32, kernel_size=3, padding=1)
        self.act2 = nn.ReLU()
        self.pool2= nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(32,64, kernel_size=3, padding=1)
        self.act3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=3, stride=2)

        flattened_size = 14400

        self.fc1 = nn.Linear(flattened_size, 128)
        self.act4 = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.normalize(x)

        #convolutional bocks
        x = self.pool1(self.act1(self.conv1(x)))
        x = self.pool2(self.act2(self.conv2(x)))
        x = self.pool3(self.act3(self.conv3(x)))

    
        
        # Flatten
        x = x.view(x.size(0), -1)


        x = self.act4(self.fc1(x))
        x = self.fc2(x)

        return x


In [46]:
model = SmallCNN(num_classes=4)
print(model)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
print(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.CrossEntropyLoss(weight=class_weights)

SmallCNN(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (act1): ReLU()
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (act2): ReLU()
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (act3): ReLU()
  (pool3): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=16384, out_features=128, bias=True)
  (act4): ReLU()
  (fc2): Linear(in_features=128, out_features=4, bias=True)
)
cpu


In [None]:
# import torchvision.models as models

# # Load a pre-trained ResNet model

# model = models.resnet50(weights='IMAGENET1K_V1')
# # Modify the final layer to match the number of classes in your dataset
# num_classes = len(dataset.classes)

# num_features = model.fc.in_features
# model.fc = nn.Sequential(
#     nn.Linear(num_features, 512),
#     nn.ReLU(),
#     nn.Dropout(0.4),
#     nn.Linear(512, num_classes)
# )
# # Move the model to the GPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = model.to(device)

# #Only train the final layer, rest is frozen
# optimizer = torch.optim.Adam(model.fc.parameters(), lr=0.001)

In [47]:

num_epochs = 20

best_val_accuracy = 0.0

train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

for epoch in range(num_epochs):
    print(f"Starting epoch {epoch+1}/{num_epochs}")

    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    batch_count =0
    #remember, train_loader loads the data in batches
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        batch_count += 1
        if batch_count % 10 == 0:  # Print every 10 batches
            print(f"Epoch {epoch+1}, batch {batch_count}")

        

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Calculate running loss and accuracy
        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    epoch_accuracy = correct / total
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_accuracy)


    # Validation phase
    model.eval()
    val_running_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)

            # Calculate running loss and accuracy
            val_running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            val_correct += (preds == labels).sum().item()
            val_total += labels.size(0)

    val_epoch_loss = val_running_loss / len(val_loader.dataset)
    val_epoch_accuracy = val_correct / val_total
    print(f"Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_accuracy:.4f}")

    val_losses.append(val_epoch_loss)
    val_accuracies.append(val_epoch_accuracy)


    # Save the model if the validation accuracy is improved
    if val_epoch_accuracy > best_val_accuracy:
        best_val_accuracy = val_epoch_accuracy
        torch.save(model.state_dict(), "best_model.pth")
        print("Model saved!")
    else:
        print("Model not saved, validation accuracy did not improve.")

    print('-'*30)

Starting epoch 1/20
Feature map shape before flattening: torch.Size([32, 64, 15, 15])
Flattened shape: torch.Size([32, 14400])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x14400 and 16384x128)

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10,4))
plt.subplot(1,2,1)
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label ='validation Loss')
plt.xlabel=('Epoch')
plt.legend()

plt.subplot(1,2,1)
plt.plot(train_accuracies, label = 'Training Accuracy')
plt.plot(val_accuracies, label = 'Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()