In [1]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
#from torchview import draw_graph
import matplotlib.pyplot as plt
import numpy as np
from auxiliary.visualization import plot_raster

In [2]:
# Prefer the Metal (MPS) backend, which runs the code on the GPU on MacOS (analogous to CUDA).
# Fall back to CUDA and finally to the CPU so the notebook also runs on machines without MPS
# instead of failing later when tensors or the model are moved to the device.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

### Loading the data

In [3]:
# Setting up a Dataset object for DataLoader
class BuildingRasterDataset(Dataset):
    '''Dataset of rasterized building samples.

    Every sample consists of a uuid, the target, context and block rasters (features)
    and the generalization operators of the target (label).
    '''
    def __init__(self, path, transform=None):
        '''Loads the data

        Args:
            path: directory containing blocks_rasterized.npy, contexts_rasterized.npy,
                targets_rasterized.npy, targets_genops.npy and targets_uuids.npy
            transform: optional callable applied once per sample to the three rasters
                stacked along the channel axis (see __getitem__)

        Raises:
            AssertionError: if the loaded arrays differ in their number of samples
        '''
        # Read raster maps
        blocks_rasterized = np.load(f"{path}/blocks_rasterized.npy")
        contexts_rasterized = np.load(f"{path}/contexts_rasterized.npy")
        targets_rasterized = np.load(f"{path}/targets_rasterized.npy")
        # Read generalization operators, specified in the following order
        # ["deletion", "aggregation", "typification", "displacement", "enlargement", "simplification"]
        targets_genops = np.load(f"{path}/targets_genops.npy")
        # Read target uuids
        self.uuid = np.load(f"{path}/targets_uuids.npy")

        # Check whether all parts have the same number of samples
        assert blocks_rasterized.shape[0] == contexts_rasterized.shape[0] == targets_rasterized.shape[0] \
                == targets_genops.shape[0] == self.uuid.shape[0]

        # Convert numpy array to tensor and add singleton dimension / channel for rasters (binary raster)
        # now they have shape (n_samples, 1, height, width)
        self.block = torch.from_numpy(blocks_rasterized).unsqueeze(1)
        self.context = torch.from_numpy(contexts_rasterized).unsqueeze(1)
        self.target = torch.from_numpy(targets_rasterized).unsqueeze(1)
        self.genops = torch.from_numpy(targets_genops)

        # store number of samples
        self.n_samples = self.uuid.shape[0]

        # store transformation
        self.transform = transform

    def __getitem__(self, index):
        '''Enables indexing, returns uuid, target, context and block raster as features and generalization operators as label

        If a transform was given, it is called exactly once on the three rasters concatenated
        along the channel axis, so a random transformation (e.g. a rotation) draws its
        parameters a single time and target, context and block stay spatially aligned.
        The transform therefore has to accept a tensor of shape (..., 3, height, width)
        and must keep the number of channels unchanged.
        '''
        uuid = self.uuid[index]
        target = self.target[index]
        context = self.context[index]
        block = self.block[index]
        genops = self.genops[index]

        # apply the given transformation jointly to the three rasters
        if self.transform:
            # dim=-3 is the channel axis for a single sample (1, H, W) as well as for
            # a batch of samples (N, 1, H, W) obtained through slice indexing
            stacked = torch.cat((target, context, block), dim=-3)
            stacked = self.transform(stacked)
            # split back into three single-channel rasters
            target, context, block = torch.split(stacked, 1, dim=-3)

        return uuid, target, context, block, genops

    def __len__(self):
        '''Enables dataset length calculation'''
        return self.n_samples

In [None]:
# Ensures that the same transform is applied to target, context and block
class SameTransform:
    '''Applies one randomly drawn rotation and horizontal flip to a whole group of images.

    The random parameters are drawn once per call and reused for every image, so all
    images of the group are transformed identically.
    '''
    def __init__(self, degrees=(0, 360), p=0.5):
        '''Stores the transformations that provide the random parameters

        Args:
            degrees: range of rotation angles, passed on to transforms.RandomRotation
            p: probability of the horizontal flip, passed on to transforms.RandomHorizontalFlip
        '''
        self.rotation = transforms.RandomRotation(degrees=degrees)
        self.flip = transforms.RandomHorizontalFlip(p=p)

    def __call__(self, imgs):
        '''Transforms all images with the same angle and flip decision

        Args:
            imgs: iterable of images, e.g. (target, context, block)

        Returns:
            List with the transformed images in the order they were given
        '''
        # Draw the random parameters once so that every image gets the identical transformation
        angle = self.rotation.get_params(self.rotation.degrees)
        # Read the probability from the stored flip transform instead of repeating the constant
        do_flip = torch.rand(1).item() < self.flip.p

        transformed_imgs = []
        for img in imgs:
            transformed_img = transforms.functional.rotate(img, angle)
            if do_flip:
                transformed_img = transforms.functional.hflip(transformed_img)
            transformed_imgs.append(transformed_img)

        return transformed_imgs

### Training pipeline

1) Design model (input, output size, forward pass)
2) Construct loss and optimizer
3) Training loop
     * Forward pass: Compute prediction
     * Backward pass: Compute gradients
     * Update weights

### Model design

In [4]:
class CNN(nn.Module):
    '''Small convolutional network: three conv + ReLU + max-pool stages and a linear classifier.

    The defaults reproduce the original architecture (3 input channels, 32x32 pixels,
    10 outputs, e.g. CIFAR-10).
    '''
    def __init__(self, in_channels=3, num_classes=10, input_size=(32, 32)):
        '''Builds the layers

        Args:
            in_channels: number of channels of the input image
            num_classes: number of outputs of the fully connected layer
            input_size: spatial size of the input, either an int (square input) or (height, width)
        '''
        super(CNN, self).__init__()
        if isinstance(input_size, int):
            input_size = (input_size, input_size)
        height, width = input_size

        # First convolutional layer taking in_channels input channels, 16 output channels, kernel size 3
        self.conv1 = nn.Conv2d(in_channels, 16, kernel_size=3, padding=1)
        # Second convolutional layer, taking 16 input channels, outputting 32 channels, kernel size 3
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        # Third convolutional layer, taking 32 input channels, outputting 64 channels, kernel size 3
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        # A MaxPooling Layer to reduce the spatial dimensions of the output from convolutional layers
        self.pool = nn.MaxPool2d(2, 2)
        # Fully connected layer taking the flattened output from the last convolutional layer.
        # The padded convolutions keep the spatial size and each of the three poolings halves it
        # (rounding down), so 64 * (height // 8) * (width // 8) features remain (64 * 4 * 4 for 32x32).
        self.fc = nn.Linear(64 * (height // 8) * (width // 8), num_classes)

    def forward(self, x):
        '''Computes the outputs for a batch x of shape (batch, in_channels, height, width)

        Raises:
            RuntimeError: if the spatial size of x does not match input_size
        '''
        # Apply convolutions followed by a ReLU activation and max pooling
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten everything except the batch dimension for the fully connected layer.
        # Unlike view(-1, n_features) this never folds surplus features into the batch
        # dimension, so an input of the wrong size fails loudly in the linear layer.
        x = x.flatten(start_dim=1)
        x = self.fc(x)
        return x

# Example of using the model: instantiate it and move its parameters to the selected device.
# nn.Module.to() returns the module itself, so the layer summary is displayed once by ending
# the cell with the model (a separate print would show the same summary a second time).
model = CNN().to(device)
model

CNN(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc): Linear(in_features=1024, out_features=10, bias=True)
)


CNN(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc): Linear(in_features=1024, out_features=10, bias=True)
)

### Training loop

In [5]:
# Training configuration
num_epochs = 10
batch_size = 4

# Random augmentation of the input rasters: rotation by an arbitrary angle
# followed by a horizontal flip
transform = transforms.Compose([
    transforms.RandomRotation(degrees=(0, 360)),
    transforms.RandomHorizontalFlip(),
])

dataset = BuildingRasterDataset("../training_samples.nosync/raster", transform=transform)
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)

# Number of batches per epoch (the last batch may hold fewer than batch_size samples)
total_samples = len(dataset)
n_iterations = math.ceil(total_samples / batch_size)

for epoch in range(num_epochs):
    for i, (uuid, target, context, block, genops) in enumerate(dataloader):
        # Optional visual check of the first sample in the batch:
        #print(uuid[0])
        #plot_raster(target.numpy()[0, 0])
        #plot_raster(context.numpy()[0, 0])
        #plot_raster(block.numpy()[0, 0])

        # Report progress on every fifth batch
        if (i + 1) % 5 == 0:
            print(f"epoch {epoch+1}/{num_epochs}, step {i+1}/{n_iterations}")

In [None]:
# prediction evaluations should not be part of the computational graph
#with torch.no_grad():    