# Shape detection with CNN demo
Demo notebook to show how a CNN can detect self-generated shapes. Just run it on Colab and don't forget to enable GPU support. It might not make a world of difference, though: the random generation of the shapes takes up a big chunk of the computation time and cannot be moved to the GPU, but at least the training of the models will be faster.

Normally, I would have written this such that it imports the shape-generation and model-generation code. However, with Colab, you then would have to mount the GDrive-folder, import all the repo-code and also have to add the folder to the Python-path of the instance you are running it on, which is a hassle and it does not work like it is supposed to.

So instead, I included all the code from the repo here, which means I will have to copy it over manually any time I want to change the code again. That is still faster than going the "proper" route outlined above.

In [None]:
!pip install --quiet pytorch-lightning

# Define allowable shapes and set hyperparameters first!

In [None]:
from enum import Enum

# Colouring modes understood by the shape generator. This is a regular
# functional-API Enum, so its values start at one.
Colouring = Enum('Colouring', ['SINGLE_CHANNEL', 'SINGLE_COLOUR', 'RANDOM_PIXELS'])

# Shapes to generate, as a space-separated list of names. Edit this string
# (or swap in the shorter variant below) to change which forms are produced.
FORMS = 'CIRCLE CROSS FOUR_CORNERS HOURGLASS LINE PARALLEL_LINES TRIANGLE'
#FORMS = 'CIRCLE CROSS'

# PyTorch expects class indices to start at zero, whereas Enums normally start
# at one. The functional syntax is used here (instead of the more legible
# class syntax) because it lets us override the starting value.
ShapeTypes = Enum('ShapeTypes', FORMS, start=0)

# Image width, image height, number of colour channels, number of classes:
N_x = 50
N_y = 50
N_c = 3
N_target = len(ShapeTypes)

# Colouring mode used for all generated images:
colouring = Colouring.SINGLE_COLOUR

# Training hyperparameters:
batch_size = 1000
learning_rate = 0.001
max_epochs = 100

# If True, analyse the model object that was just trained instead of
# loading one from a checkpoint file.
analyse_last_model_trained = True

# Shape detection code

In [None]:
"""Module to generate various randomised shapes.

Module generates shapes with randomised anchor points and colours.

Usage example (generate_random returns the image together with its class label):
    1. gen = ShapeGenerator(N_x=100, N_y=100)
    2. image, label = gen.generate_random(
        colouring=Colouring.SINGLE_CHANNEL,
        shape_type=None
    )
"""

import random
import numpy as np
from typing import Union, Tuple

from PIL import Image, ImageDraw


class ShapeGenerator(object):
    """Factory for small images that each show one randomly placed shape.

    Every shape is first drawn as a mask: a PIL canvas with a black background
    (all zeros) on which the PIL draw functions paint the outline with the
    value one in every channel.

    The mask is multiplied with the colour(s) afterwards (see `colour_in`), as
    we want very special colouring options which the PIL draw functions do not
    support by default.

    For this reason, for now, only a black background is allowed, though nothing
    keeps you from later adding an inversion function or replacing black with
    any other colour.
    """
    # TODO: Make this generator-style?
    # TODO: The frequent calls to self.colour_in(im, colour) smell like a generator is in order!

    def __init__(self, N_x: int = 256, N_y: int = 256):
        """Sets static values for the images to be created.

        Parameters:
            N_x: Number of pixels in image's x-axis.
            N_y: Number of pixels in image's y-axis.
        """
        self.N_x = N_x
        self.N_y = N_y
        self.background = (0, 0, 0)
        # Drawing with ones turns the canvas into a 0/1 mask which can simply
        # be multiplied with the colour values later on.
        self.shape_drawing_colour = (1, 1, 1)

        # Lookup from the name of a ShapeTypes member to the method drawing it:
        self.generator_function_lookup = {
            'CIRCLE': self.generate_circle,
            'CROSS': self.generate_cross,
            'FOUR_CORNERS': self.generate_four_corners,
            'HOURGLASS': self.generate_hourglass,
            'LINE': self.generate_line,
            'PARALLEL_LINES': self.generate_parallel_lines,
            'TRIANGLE': self.generate_triangle
        }
        # Names to choose randomly from, built once so that we do not have to
        # do that with every call again:
        self.generator_function_candidates = [k.name for k in ShapeTypes]

    def generate_random(
        self,
        colouring: Union[Colouring, np.ndarray] = None,
        shape_type: ShapeTypes = None
    ) -> Tuple[Image.Image, int]:
        """Generates a random shape.

        Chooses a random shape (unless a shape is specified) with the chosen colouring.

        Parameters:
            colouring: Determines the colouring mode for the image.
                       Colouring.SINGLE_CHANNEL uses only one, randomly chosen single channel with a random value.
                       Colouring.SINGLE_COLOUR randomly generates one multi-channel colour to use.
                       Colouring.RANDOM_PIXELS generates a random colour for each pixel.
                       If given a uint8 array of shape (N_channels,), then this colour will be used for all pixels.
                       If None, then a random colouring mode will be chosen.
            shape_type: Determines the type of the shape. Please check ShapeTypes above to check which shapes
                        are available. They are rather self-explanatory (I hope). If you want to visually inspect
                        them, check out the script `draw_shapes.py` in the same folder.
                        If None, a random shape will be chosen.

        Returns:
            Tuple of the coloured PIL image and the integer value of the
            ShapeTypes member that was drawn (usable as class label).

        Raises:
            ValueError: If shape_type or colouring are not of a supported kind.
            TypeError: If a colour array is passed which is not of dtype np.uint8.
        """
        # First determine a shape, if we haven't gotten one passed:
        if shape_type is None:
            shape_type = ShapeTypes[random.choice(self.generator_function_candidates)]

        try:
            generator_function = self.generator_function_lookup[shape_type.name]
        except (AttributeError, KeyError) as err:
            raise ValueError(
                f'Parameter shape_type has to be either None or of ShapeTypes, not {shape_type}'
            ) from err

        # The documented default: no colouring given means a random mode.
        if colouring is None:
            colouring = random.choice(list(Colouring))

        # Then determine the colour values. Arrays have to be checked first:
        # comparing an array to an Enum member with `==` is evaluated
        # element-wise and cannot be used as a condition.
        if isinstance(colouring, np.ndarray):
            colour = colouring

        elif colouring is Colouring.SINGLE_CHANNEL:
            colour = np.zeros((3,), dtype=np.uint8)
            # The +1 shifts the value into 1..255, so the channel is never zero:
            colour[np.random.randint(3)] = np.random.randint(255) + 1

        elif colouring is Colouring.SINGLE_COLOUR:
            colour = np.random.randint(256, size=3, dtype=np.uint8)

        elif colouring is Colouring.RANDOM_PIXELS:
            # Please note that we have to generate the pixelwise random colours
            # with flipped axis, so N_y first, then N_x, due to how PIL handles these things:
            colour = np.random.randint(256, size=(self.N_y, self.N_x, 3), dtype=np.uint8)

        else:
            raise ValueError(f"Colouring parameter has to be of type Colouring, not {colouring}")

        return generator_function(colour), shape_type.value

    def get_canvas(self) -> Tuple:
        """Creates an image and a draw object to work on."""
        im = Image.new('RGB', (self.N_x, self.N_y), self.background)
        draw = ImageDraw.Draw(im)
        return im, draw

    def colour_in(self, image: Image.Image, colours: np.ndarray) -> Image.Image:
        """Colours a mask.

        Multiplies the mask with the colours and returns the result as PIL image.

        Parameters:
            image: PIL image which will act as a mask. So it should be binary in nature,
                   containing zeros where no colour should be and ones where colours should be.
            colours: numpy-array which contains the colour values before masking. It has to
                     broadcast against the mask array, e.g. shape (3,) or (1, 1, 3) for a
                     single colour or the full mask shape for per-pixel colours. The data
                     type has to be uint8, so that the result will be a valid image.

        Raises:
            TypeError: If colours is not of dtype np.uint8.
        """
        if colours.dtype != np.uint8:
            raise TypeError('Colours array has the wrong data type. Please make sure it is np.uint8.')

        return Image.fromarray(np.array(image) * colours)

    def _draw_lines(self, draw, segments) -> None:
        """Draws each (start, end) point pair as a one pixel wide line of the mask."""
        for segment in segments:
            draw.line(segment, width=1, fill=self.shape_drawing_colour)

    def generate_circle(self, colouring: np.ndarray) -> Image.Image:
        """Draws the outline of a circle with random centre and radius."""
        im, draw = self.get_canvas()

        # Generate a center-point which should lie somewhere in the middle.
        center = (
            np.random.uniform(low=im.size[0]//4, high=(im.size[0] * 3) // 4),
            np.random.uniform(low=im.size[1]//4, high=(im.size[1] * 3) // 4)
        )

        # Determine the max radius, i.e. the distance to the closest border:
        max_radius = min(
            center[0], center[1], im.size[0] - center[0], im.size[1] - center[1]
        )

        # Determine the actual radius:
        radius = np.random.uniform(low=max_radius // 4, high=(max_radius * 3) // 4)

        # Draw the circle via its bounding box:
        draw.ellipse(
            (
                (center[0] - radius, center[1] - radius),
                (center[0] + radius, center[1] + radius)
            ), fill=self.background, outline=self.shape_drawing_colour
        )

        return self.colour_in(im, colouring)

    @staticmethod
    def four_points(im: Image.Image):
        """Creates four coordinates for four points, where one lies in each quadrant.

        Parameters:
            im: PIL.Image which determines the quadrants.

        Returns:
            Tuple in the ordering: top_left, top_right, bottom_left, bottom_right
        """
        # Draw two x- and two y-values from each half of the respective axis:
        low_x = np.random.uniform(low=0, high=im.size[0]//2 - 1, size=2)
        high_x = np.random.uniform(low=im.size[0]//2, high=im.size[0] - 1, size=2)
        low_y = np.random.uniform(low=0, high=im.size[1]//2 - 1, size=2)
        high_y = np.random.uniform(low=im.size[1]//2, high=im.size[1] - 1, size=2)

        return (low_x[0], low_y[0]), (high_x[0], low_y[1]), (low_x[1], high_y[0]), (high_x[1], high_y[1])

    def generate_cross(self, colouring: np.ndarray) -> Image.Image:
        """Draws the two diagonals between four random quadrant points."""
        im, draw = self.get_canvas()

        top_left, top_right, bottom_left, bottom_right = self.four_points(im)

        self._draw_lines(draw, (
            (top_left, bottom_right),
            (top_right, bottom_left),
        ))

        return self.colour_in(im, colouring)

    def generate_four_corners(self, colouring: np.ndarray) -> Image.Image:
        """Draws the quadrilateral connecting four random quadrant points."""
        im, draw = self.get_canvas()

        top_left, top_right, bottom_left, bottom_right = self.four_points(im)

        # Draw the four sides:
        self._draw_lines(draw, (
            (top_left, top_right),
            (top_right, bottom_right),
            (bottom_right, bottom_left),
            (bottom_left, top_left),
        ))

        return self.colour_in(im, colouring)

    def generate_hourglass(self, colouring: np.ndarray) -> Image.Image:
        """Draws both diagonals plus the top and bottom edge between four random quadrant points."""
        im, draw = self.get_canvas()

        top_left, top_right, bottom_left, bottom_right = self.four_points(im)

        # Draw the four lines:
        self._draw_lines(draw, (
            (top_left, bottom_right),
            (top_left, top_right),
            (top_right, bottom_left),
            (bottom_left, bottom_right),
        ))

        return self.colour_in(im, colouring)

    def generate_line(self, colouring: np.ndarray) -> Image.Image:
        """Draws a single line between two random points of the canvas."""
        im, draw = self.get_canvas()

        x_coords = np.random.uniform(low=0, high=im.size[0], size=(2))
        y_coords = np.random.uniform(low=0, high=im.size[1], size=(2))

        self._draw_lines(draw, (
            ((x_coords[0], y_coords[0]), (x_coords[1], y_coords[1])),
        ))

        return self.colour_in(im, colouring)

    def generate_parallel_lines(self, colouring: np.ndarray) -> Image.Image:
        """Draws a random line and a copy of it shifted by a random offset."""
        im, draw = self.get_canvas()

        x_coords = np.random.uniform(low=im.size[0]//10, high=(im.size[0] * 9) // 10, size=(2))
        y_coords = np.random.uniform(low=im.size[1]//10, high=(im.size[1] * 9) // 10, size=(2))

        # Offset between the two lines:
        x_translation = np.random.uniform(low=im.size[0]//20, high=im.size[0]//10)
        y_translation = np.random.uniform(low=im.size[1]//20, high=im.size[1]//10)

        self._draw_lines(draw, (
            (
                (x_coords[0], y_coords[0]),
                (x_coords[1], y_coords[1])
            ),
            (
                (x_coords[0] + x_translation, y_coords[0] + y_translation),
                (x_coords[1] + x_translation, y_coords[1] + y_translation)
            ),
        ))

        return self.colour_in(im, colouring)

    def generate_triangle(self, colouring: np.ndarray) -> Image.Image:
        """Draws the outline of a triangle through three of four random quadrant points."""
        im, draw = self.get_canvas()

        points = self.four_points(im)
        # Pick three distinct corners out of the four:
        choice = np.random.choice(list(range(4)), size=3, replace=False)
        triangle_points = tuple(points[c] for c in choice)

        draw.polygon(triangle_points, outline=self.shape_drawing_colour, fill=self.background)

        return self.colour_in(im, colouring)

    @staticmethod
    def scale_translate_and_rotate(image: np.ndarray) -> np.ndarray:
        """Method to randomly scale, translate and rotate an image.

        Use this if you feel that the generator-process above is a bit too easy
        for the model (it places the points in fixed coordinates, after all).

        This method is meant as a wrapper around other methods to rescale, translate
        and rotate a shape to have more variety in the output.

        Please note that this method works on the 3D-array, not the 2D-shape-mask!
        This is to make use of all the ready-made PIL-functions to manipulate images.
        However, since we want to stay with numpy for as long as possible, we will not
        give back a PIL-image, but the resulting 3D-array.

        Currently not implemented: the image is returned unchanged.
        """
        # TODO: Shouldn't there be some code here that actually does something? ;)
        return image

# PL-Module definition with data loaders

In [None]:
"""Defines a loader for the training and validation.

The thing to look out for is that there is no fixed loader, 
but we will create all the data on the fly.
"""

from typing import Optional

import torch
import pytorch_lightning as pl
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, IterableDataset


class ShapeIterableDataset(IterableDataset):
    """Iterable dataset which creates freshly generated random shapes on every pass.

    Nothing is stored: each iteration over the dataset asks the ShapeGenerator
    for new images, so no two passes see the same samples.

    Parameters:
        N_x: Number of pixels in the images' x-axis.
        N_y: Number of pixels in the images' y-axis.
        colouring: Colouring mode, handed over to ShapeGenerator.generate_random
                   unchanged.
        batch_size: Number of samples produced per pass over the dataset.
                    Note that this is the length of one pass, not the size of
                    the batches a DataLoader forms from it.
    """

    def __init__(self, N_x=256, N_y=256, colouring=None, batch_size=10):
        # Zero-argument super(): the one-argument form `super(cls)` creates an
        # unbound super object, whose __init__ does not run the parent's.
        super().__init__()
        self.shape_generator = ShapeGenerator(N_x, N_y)
        self.colouring = colouring
        # Converts the generated PIL images to tensors:
        self.transforms = transforms.ToTensor()
        self.batch_size = batch_size

    def __iter__(self):
        """Yields `batch_size` pairs of [image tensor, integer class label]."""
        # Generate lazily, so that not all samples of a pass have to be held in
        # memory before the first one is handed out.
        for _ in range(self.batch_size):
            im, label = self.shape_generator.generate_random(colouring=self.colouring)
            yield [self.transforms(im), label]


class ShapeIterableDataLoader(pl.LightningDataModule):
    """Lightning data module serving on-the-fly generated shapes.

    Training, validation and test loaders are all built the same way: each one
    wraps its own new ShapeIterableDataset with the settings stored on this
    module, so no data is shared between them.

    Parameters:
        N_x: Number of pixels in the images' x-axis.
        N_y: Number of pixels in the images' y-axis.
        batch_size: Handed to ShapeIterableDataset as its `batch_size`.
        colouring: Colouring mode handed to ShapeIterableDataset.
    """

    def __init__(self, N_x=50, N_y=50, batch_size=10, colouring=Colouring.SINGLE_COLOUR):
        super().__init__()
        self.N_x, self.N_y = N_x, N_y
        self.batch_size = batch_size
        self.colouring = colouring

    def prepare_data(self) -> None:
        # Nothing to download or store; defer to the parent implementation.
        return super().prepare_data()

    def prepare_data_per_node(self):
        # Intentionally empty.
        pass

    def setup(self, stage: Optional[str] = None) -> None:
        # No per-stage state to build; defer to the parent implementation.
        return super().setup(stage)

    def _fresh_loader(self):
        """Builds a DataLoader around a newly created ShapeIterableDataset."""
        dataset = ShapeIterableDataset(
            self.N_x,
            self.N_y,
            colouring=self.colouring,
            batch_size=self.batch_size,
        )
        return DataLoader(dataset)

    def train_dataloader(self):
        return self._fresh_loader()

    def val_dataloader(self):
        return self._fresh_loader()

    def test_dataloader(self):
        return self._fresh_loader()


In [None]:
"""Simple CNN model to verify the basic idea of being able to identify shapes."""


class ShapeDetectorModelCNN(pl.LightningModule):
    """LeNet-like CNN classifying which shape an image shows.

    Architecture: two stages of 5x5 convolution, ReLU, 2x2 max-pooling and
    batch normalisation, followed by a hidden linear layer with dropout and
    the output layer.

    Parameters:
        N_c: Number of colour channels of the input images.
        N_target: Number of classes to distinguish.
        learning_rate: Learning rate of the SGD optimiser.
        N_x: Number of pixels in the images' x-axis (default matches the
             previously hard-coded 50 pixels).
        N_y: Number of pixels in the images' y-axis (default matches the
             previously hard-coded 50 pixels).

    Raises:
        ValueError: If the images are too small for the two conv/pool stages.
    """

    def __init__(self, N_c=3, N_target=10, learning_rate=0.001, N_x=50, N_y=50):
        super().__init__()

        # Hyperparameters:
        self.lr = learning_rate
        self.loss = torch.nn.CrossEntropyLoss()

        # Basic architecture, reminiscent of LeNet with a bit of batch normalisation:
        self.conv1 = torch.nn.Conv2d(in_channels=N_c, out_channels=20, kernel_size=(5, 5))
        self.relu1 = torch.nn.ReLU()
        self.maxpool1 = torch.nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.bn1 = torch.nn.BatchNorm2d(20)

        self.conv2 = torch.nn.Conv2d(in_channels=20, out_channels=50, kernel_size=(5, 5))
        self.relu2 = torch.nn.ReLU()
        self.maxpool2 = torch.nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.bn2 = torch.nn.BatchNorm2d(50)

        # Size of the flattened output of the second stage: 50 channels times
        # the remaining feature map. For 50x50 images this is 50 * 9 * 9.
        map_x = self._feature_map_size(N_x)
        map_y = self._feature_map_size(N_y)
        if map_x < 1 or map_y < 1:
            raise ValueError(
                f'Images of size {N_x}x{N_y} are too small for this architecture.'
            )
        self.linear3 = torch.nn.Linear(in_features=50 * map_x * map_y, out_features=500)
        self.relu3 = torch.nn.ReLU()

        self.dropout = torch.nn.Dropout(p=0.5)

        self.linear4 = torch.nn.Linear(in_features=500, out_features=N_target)
        # Despite its (kept) name, this layer is a plain softmax. It is only
        # applied in forward(), not when computing the loss.
        self.logsoftmax4 = torch.nn.Softmax(dim=1)

    @staticmethod
    def _feature_map_size(n_pixels):
        """Returns the side length left of `n_pixels` after both conv/pool stages."""
        # An unpadded 5x5 convolution removes four pixels, the 2x2 pooling with
        # stride two halves the size (rounding down).
        return ((n_pixels - 4) // 2 - 4) // 2

    def _logits(self, x):
        """Computes the unnormalised class scores for a batch of images."""
        z1 = self.bn1(self.maxpool1(self.relu1(self.conv1(x))))
        z2 = self.bn2(self.maxpool2(self.relu2(self.conv2(z1))))
        z3 = self.dropout(self.relu3(self.linear3(torch.flatten(z2, 1))))
        return self.linear4(z3)

    def forward(self, x):
        """Returns the class probabilities (softmax over dim 1) for a batch of images."""
        return self.logsoftmax4(self._logits(x))

    def configure_optimizers(self):
        return torch.optim.SGD(self.parameters(), lr=self.lr)

    def _shared_step(self, batch, metric_name):
        """Computes and logs the loss of one batch under the given metric name."""
        x, y = batch
        # CrossEntropyLoss applies the (log-)softmax itself, so it has to be fed
        # with the raw logits rather than the probabilities from forward().
        loss = self.loss(self._logits(x), y)
        self.log(metric_name, loss, prog_bar=True, logger=True)
        return loss

    def training_step(self, batch, batch_idx):
        return self._shared_step(batch, 'train_loss')

    def validation_step(self, batch, batch_idx):
        return self._shared_step(batch, 'val_loss')

    def test_step(self, batch, batch_idx):
        return self._shared_step(batch, 'test_loss')

# Training setup

In [None]:
# Set up tensorboard to monitor the trainings:
%load_ext tensorboard
%tensorboard --logdir ./lightning_logs

In [None]:
# Keep only the single best checkpoint, judged by the validation loss.
# `monitor` takes the name of exactly one logged metric, not a list.
model_checkpoint_callback = pl.callbacks.ModelCheckpoint(
    dirpath='checkpoints',
    verbose=True,
    save_top_k=1,
    filename='shape_identification_best',
    monitor='val_loss',
    mode='min',
)

# Stop once the validation loss has not improved for `patience` checks.
early_stopping_callback = pl.callbacks.EarlyStopping(
    monitor='val_loss', patience=5, verbose=True, mode='min'
)

logger = pl.loggers.TensorBoardLogger('lightning_logs', name='shape_identification')

trainer = pl.Trainer(
    # Callback instances have to be registered through `callbacks`; the
    # Trainer's `checkpoint_callback` argument is not a slot for an instance.
    callbacks=[model_checkpoint_callback, early_stopping_callback],
    # Validation (and with it 'val_loss') is only computed every 5th epoch:
    check_val_every_n_epoch=5,
    logger=logger,
    # Use the GPU if the runtime has one, otherwise fall back to the CPU:
    gpus=1 if torch.cuda.is_available() else None,
    max_epochs=max_epochs,
)

In [None]:
# Build the data module and the network from the hyperparameters defined at
# the top of the notebook.
data_module = ShapeIterableDataLoader(
    N_x=N_x,
    N_y=N_y,
    batch_size=batch_size,
    colouring=colouring,
)
shape_cnn = ShapeDetectorModelCNN(
    N_c=N_c,
    N_target=N_target,
    learning_rate=learning_rate,
)

# Run the training with the trainer configured above.
trainer.fit(model=shape_cnn, datamodule=data_module)

# Training result analysis

In [None]:
!ls lightning_logs/shape_identification/version_7/checkpoints

In [None]:
if analyse_last_model_trained:
    # Analyse the model object exactly as the training run left it.
    shape_cnn_best = shape_cnn
else:
    # Load a model from a checkpoint file instead. Prefer the checkpoint the
    # trainer of this session recorded; this avoids having to look up the
    # version folder of the last run by hand.
    try:
        checkpoint_callback = trainer.checkpoint_callback
    except NameError:
        # No trainer in this session (e.g. after a kernel restart).
        checkpoint_callback = None
    # best_model_path is an empty string as long as nothing has been saved.
    best_model_path = checkpoint_callback.best_model_path if checkpoint_callback is not None else ''

    # Fallback: the manually looked-up checkpoint of an earlier run. Adapt the
    # version folder and file name to your own run if this path is needed.
    fallback_path = 'lightning_logs/shape_identification/version_7/checkpoints/epoch=99-step=100000.ckpt'

    shape_cnn_best = ShapeDetectorModelCNN.load_from_checkpoint(
        checkpoint_path=best_model_path or fallback_path,
        N_c=N_c, N_target=N_target
    )

# Then let's generate a bunch of data to predict on:
test_data = ShapeIterableDataset(N_x, N_y, colouring, batch_size=batch_size)

In [None]:
from tqdm.notebook import tqdm

# Predicted and true class index of every generated test sample:
predictions = []
labels = []

# freeze() switches the model to evaluation mode and disables gradients.
shape_cnn_best.freeze()
try:
    for im_tensor, label in tqdm(test_data):
        # The model expects a batch dimension; also make sure that the input
        # lives on the same device as the model's weights.
        batch = im_tensor.unsqueeze(dim=0).to(shape_cnn_best.device)
        yp = shape_cnn_best(batch)
        # Index of the highest score; int() also brings the result back from
        # whichever device the model ran on.
        predictions.append(int(yp.argmax()))
        labels.append(label)
finally:
    # Leave the model trainable again, even if the loop above failed.
    shape_cnn_best.unfreeze()

In [None]:
# Print the confusion matrix:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Fix the class order explicitly, so that the matrix always has one row and
# column per shape type (even if a class happens to be missing from the
# sample) and stays aligned with the tick labels below.
class_ids = [s.value for s in ShapeTypes]
class_names = [s.name for s in ShapeTypes]

sns.set(rc={"figure.figsize": (15, 10)})
ax = sns.heatmap(
    # normalize='true' scales each row (true label) to sum to one:
    confusion_matrix(labels, predictions, labels=class_ids, normalize='true'),
    annot=True
)
_ = ax.set(
    xlabel='predicted as',
    ylabel='true label',
    xticklabels=class_names,
    yticklabels=class_names
)
plt.show()