In [None]:
import torch
import torch.nn as nn

from torchinfo import summary

In [None]:
def build_circle_squasher():
    """Build and return a freshly initialised ``CircleSquasher`` module.

    The network has three parts:

    * ``circle_squasher_down``   - convolutional encoder for the image.  Two
      2x2 max-pools halve the spatial size twice, and the channel count grows
      1 -> 8 -> 16 -> 32.
    * ``circle_squasher_middle`` - fully connected stack that expands the
      single radius scale factor into 1024 values (one 32x32 feature map).
    * ``circle_squasher_up``     - decoder with two stride-2 transposed
      convolutions that takes ``32 + 1`` channels back to a 1-channel image.

    The architecture is sized for 1x128x128 input images: ``forward`` reshapes
    the combined features to 32x32 maps and the decoder expects 33 of them.
    """

    def conv3x3(channels_in, channels_out):
        # 3x3 kernel, stride 1, padding 1: spatial size is preserved
        return nn.Conv2d(
            in_channels=channels_in,
            out_channels=channels_out,
            kernel_size=3,
            stride=1,
            padding=1
        )

    def pool2x2():
        # halves height and width
        return nn.MaxPool2d(kernel_size=2, stride=2)

    def upsample2x2(channels_in, channels_out):
        # doubles height and width
        return nn.ConvTranspose2d(
            in_channels=channels_in,
            out_channels=channels_out,
            kernel_size=2,
            stride=2,
            padding=0
        )

    class CircleSquasher(nn.Module):
        def __init__(self):
            super().__init__()

            # The sub-networks are created in the order down, middle, up so
            # that parameter initialisation draws random numbers in a fixed
            # sequence.

            # for a [*, 1, 128, 128] input the output is [*, 32, 32, 32]
            self.circle_squasher_down = nn.Sequential(
                conv3x3(1, 8),
                nn.ReLU(),
                conv3x3(8, 8),
                nn.ReLU(),
                pool2x2(),

                conv3x3(8, 16),
                nn.ReLU(),
                conv3x3(16, 16),
                nn.ReLU(),
                pool2x2(),

                conv3x3(16, 32),
                nn.ReLU(),
                conv3x3(32, 32),
                nn.ReLU(),
            )

            # take a single parameter and expand it to a larger layer
            # output is [*, 1024]
            self.circle_squasher_middle = nn.Sequential(
                nn.Linear(1, 8),
                nn.ReLU(),
                nn.Linear(8, 64),
                nn.ReLU(),
                nn.Linear(64, 1024),
                nn.ReLU(),
            )

            self.circle_squasher_up = nn.Sequential(
                upsample2x2(32 + 1, 16),
                nn.ReLU(),
                conv3x3(16, 16),
                nn.ReLU(),

                upsample2x2(16, 8),
                nn.ReLU(),
                conv3x3(8, 1),
                nn.ReLU(),
            )

        def forward(self, image, radius_scale_factor):
            """Predict the rescaled-circle image.

            ``image`` is a batch of single-channel images and
            ``radius_scale_factor`` holds one value per batch row.
            """
            sample_count = image.shape[0]

            # encode the image and flatten everything except the batch axis
            flat_features = self.circle_squasher_down(image).reshape(sample_count, -1)

            # expand the scalar scale factor to 1024 values per sample
            scale_embedding = self.circle_squasher_middle(radius_scale_factor)

            # append the embedding to the flattened features, then fold the
            # result back into 32x32 maps; the embedding becomes the extra
            # (last) channel
            combined = torch.cat((flat_features, scale_embedding), dim=1)
            combined_maps = combined.reshape(sample_count, -1, 32, 32)

            return self.circle_squasher_up(combined_maps)

    return CircleSquasher()

In [None]:
# Values per sample fed to the decoder: the flattened encoder output
# (32 channels of 32x32) plus the 1024-value radius-scale-factor embedding.
# The encoder ends with 32 channels, not 16.
1*32*32*32 + 1*1024

In [None]:
# Number of 32x32 feature maps after appending the scale-factor embedding to
# the flattened encoder output: 32 encoder channels + 1 embedding channel.
# This must equal the decoder's in_channels (32+1).
(1*32*32*32 + 1*1024) / (32*32)

In [None]:
# Print a layer-by-layer torchinfo summary of a freshly built model, using a
# dummy batch of two 1x128x128 images with one radius scale factor each.
summary(
    build_circle_squasher(),
    input_data=(torch.ones(2, 1, 128, 128), torch.ones(2, 1)),
    col_names=("input_size", "output_size", "num_params")
)

In [None]:
import itertools

import matplotlib.pyplot as plt
import numpy as np
from PIL import Image, ImageDraw

In [None]:
def get_empty_image(image_width, image_height):
    """Return a new PIL image of the given size with every pixel set to 255.

    The image uses mode "F" (32-bit floating point pixels).  An alternative
    would be mode "LA" (8-bit grayscale with an alpha channel).
    """
    image_size = (image_width, image_height)
    return Image.new(mode="F", size=image_size, color=255)

In [None]:
def draw_a_circle(target_image, circle_e1, circle_e2, circle_radius, outline_color=255):
    """Draw a one-pixel-wide circle outline onto ``target_image`` in place.

    The circle is centred on (``circle_e1``, ``circle_e2``).  Note that
    ``circle_radius`` is the side length of the bounding box, i.e. the
    diameter of the drawn circle: the box extends ``circle_radius / 2`` from
    the centre in each direction.

    Returns the same ``target_image`` object for convenience.
    """
    half_extent = circle_radius/2
    bounding_box = (
        circle_e1 - half_extent,
        circle_e2 - half_extent,
        circle_e1 + half_extent,
        circle_e2 + half_extent
    )

    # a full 0..360 degree arc is a closed circle outline
    ImageDraw.ImageDraw(target_image).arc(
        bounding_box,
        start=0,
        end=360,
        width=1,
        fill=outline_color
    )

    return target_image


In [None]:
def fill_a_circle(target_image, circle_e1, circle_e2, circle_radius, circle_fill_color=255):
    """Draw a filled circle onto ``target_image`` in place.

    The circle is centred on (``circle_e1``, ``circle_e2``).  As in
    ``draw_a_circle``, ``circle_radius`` is the side length of the bounding
    box (the diameter of the circle).  No explicit outline colour is passed.

    Returns the same ``target_image`` object for convenience.
    """
    half_extent = circle_radius/2
    bounding_box = (
        circle_e1 - half_extent,
        circle_e2 - half_extent,
        circle_e1 + half_extent,
        circle_e2 + half_extent
    )

    ImageDraw.ImageDraw(target_image).ellipse(
        bounding_box,
        width=1,
        fill=circle_fill_color
    )

    return target_image

In [None]:
def image_of_circles_with_scaling(circle_count):
    """Generate one random (input, target) pair of 128x128 circle images.

    ``circle_count`` circles are drawn with random size, centre and fill
    colour.  A single random radius scale factor from the interval
    [1/2, 3/2) is then chosen for the whole image.

    * The input image contains the filled circles at their original size.
    * The target image contains the same filled circles with their size
      multiplied by the scale factor, plus an outline (colour 0) of every
      circle at its original size.

    Returns the tuple
    ``(circle_radius_list, radius_scale_factor, input_image, target_image)``.
    """
    input_image = get_empty_image(image_width=128, image_height=128)
    target_image = get_empty_image(image_width=128, image_height=128)

    # Random draws per circle happen in the order radius, e1, e2, fill colour.
    circle_parameters_list = []
    for _ in range(circle_count):
        radius = np.random.randint(low=10, high=40)
        centre_e1 = np.random.randint(low=20, high=80)
        centre_e2 = np.random.randint(low=20, high=80)
        fill_color = np.random.randint(low=100, high=200)

        circle_parameters_list.append(
            {
                "circle_radius": radius,
                "circle_e1": centre_e1,
                "circle_e2": centre_e2,
                "circle_fill_color": fill_color
            }
        )

    circle_radius_list = [
        parameters["circle_radius"] for parameters in circle_parameters_list
    ]

    # one scale factor shared by every circle in this image
    radius_scale_factor = np.random.random() + 0.5

    for parameters in circle_parameters_list:
        # input training image: circles at their original size
        fill_a_circle(target_image=input_image, **parameters)

        # target training image: the same circles, squashed or inflated
        scaled_parameters = dict(parameters)
        scaled_parameters["circle_radius"] = parameters["circle_radius"] * radius_scale_factor
        fill_a_circle(target_image=target_image, **scaled_parameters)

    # outline the original (unscaled) circles on top of the target image
    for parameters in circle_parameters_list:
        draw_a_circle(
            target_image=target_image,
            outline_color=0,
            circle_radius=parameters["circle_radius"],
            circle_e1=parameters["circle_e1"],
            circle_e2=parameters["circle_e2"],
        )

    return (circle_radius_list, radius_scale_factor, input_image, target_image)

In [None]:
import matplotlib.pyplot as plt

# generate 4 sets of input/output
radius_scale_factor_list = list()
circle_radiuses_list = list()
input_images = list()
output_images = list()

for _ in range(4):
    circle_count = np.random.randint(low=1, high=5)
    circle_radiuses, radius_scale_factor, input_circles_image, output_circles_image = image_of_circles_with_scaling(
        circle_count=circle_count,
    )
    radius_scale_factor_list.append(radius_scale_factor)
    circle_radiuses_list.append(circle_radiuses)

    input_images.append(input_circles_image)
    output_images.append(output_circles_image)

# first figure: the four input images, one per panel
fig, axs = plt.subplots(nrows=2, ncols=2)
fig.suptitle("input images")
for i, (r, c) in enumerate(itertools.product(range(2), range(2))):
    print(f"circle radiuses: {circle_radiuses_list[i]}")
    print(f"radius scale factor: {radius_scale_factor_list[i]}")

    axs[r][c].imshow(input_images[i], origin="lower")

# second figure: the matching target images in the same panel order
fig, axs = plt.subplots(nrows=2, ncols=2)
fig.suptitle("target images")
for i, (r, c) in enumerate(itertools.product(range(2), range(2))):
    # Index the per-image list here.  `circle_radiuses` is only the loop
    # variable left over from the last generated image, so printing it would
    # repeat the same radii for every panel.
    print(f"circle radiuses: {circle_radiuses_list[i]}")

    axs[r][c].imshow(output_images[i], origin="lower")


In [None]:
# a class to interact with DataLoaders
class CircleImageDataset:
    """Map-style dataset of randomly generated circle images held in memory.

    Every sample is created up front in ``__init__``.  ``circle_image_list``
    is a list of tuples shaped like::

        (target_image, (input_image, radius_scale_factor))

    where both images are numpy arrays with a leading 'channel' axis and
    ``radius_scale_factor`` is a one-element float32 numpy array.
    """

    def __init__(self, circle_image_count):
        self.circle_image_list = [
            self._generate_sample() for _ in range(circle_image_count)
        ]

    @staticmethod
    def _with_channel_axis(pil_image):
        # The PIL image is converted to a 2D numpy array, then an extra
        # leading dimension is inserted for 'channel', which PyTorch
        # convolutional networks expect.
        return np.expand_dims(np.array(pil_image), axis=0)

    def _generate_sample(self):
        # between 1 and 4 circles per image
        circle_count = np.random.randint(low=1, high=5)
        _, radius_scale_factor, input_circles_image, target_circles_image = image_of_circles_with_scaling(
            circle_count=circle_count
        )

        target_array = self._with_channel_axis(target_circles_image)
        input_array = self._with_channel_axis(input_circles_image)
        # single precision, so the scale factor is stored as float32
        scale_factor_array = np.array([radius_scale_factor], dtype=np.float32)

        return (target_array, (input_array, scale_factor_array))

    def __getitem__(self, index):
        # each entry looks like
        #   (target_image, (input_image, radius_scale_factor))
        return self.circle_image_list[index]

    def __len__(self):
        return len(self.circle_image_list)

In [None]:
def test_circle_image_dataset():
    """Smoke test: build a 100-sample dataset and print details of the last sample."""
    dataset = CircleImageDataset(100)
    print(f"len(circle_image_dataset): {len(dataset)}")

    # each sample is (target_image, (input_image, radius_scale_factor))
    target_image, input_and_scale = dataset[99]
    input_image, scale_factor = input_and_scale

    print(f"target image.shape : {target_image.shape}")
    print(f"input image.shape : {input_image.shape}")
    print(f"radius scale factor: {scale_factor}")

test_circle_image_dataset()

In [None]:
from torch.utils.data import DataLoader

In [None]:
def test_circle_image_dataloader():
    """Smoke test: collate one batch of 10 samples and run it through an untrained model.

    Prints the shape and dtype of every tensor in the batch and of the
    model's prediction.
    """
    loader = DataLoader(CircleImageDataset(circle_image_count=100), batch_size=10)

    # only the first batch is needed
    first_batch = next(iter(loader))
    target_circle_images, (input_circle_images, radius_scale_factors) = first_batch

    print(f"target_circle_images.shape: {target_circle_images.shape}")
    print(f"target_circle_images.dtype: {target_circle_images.dtype}")
    print(f"input_circle_images.shape:  {input_circle_images.shape}")
    print(f"input_circle_images.dtype:  {input_circle_images.dtype}")
    print(f"radius_scale_factors.shape: {radius_scale_factors.shape}")
    print(f"radius_scale_factors.dtype: {radius_scale_factors.dtype}")
    print(f"radius_scale_factors: {radius_scale_factors}")

    untrained_model = build_circle_squasher()
    predicted_circle_images = untrained_model(input_circle_images, radius_scale_factors)
    print(f"predicted_circle_images.shape: {predicted_circle_images.shape}")
    print(f"predicted_circle_images.dtype: {predicted_circle_images.dtype}")

test_circle_image_dataloader()

In [None]:
# Data loaders used by the training run.
#
# Notes from earlier experiments with the training-set size:
# 100,000, no shuffle, works, 20 epochs is ok but sometimes training does not progress
# 100,000 with shuffling has more stable training
# 10,000, no shuffle, works, 50 epochs is ok
#
# Training set: 10,000 freshly generated samples, reshuffled each epoch.
train_circle_image_loader = DataLoader(
    CircleImageDataset(circle_image_count=10000),
    batch_size=200,  # one batch must fit in the GPU memory
    shuffle=True
)

# Held-out set: 1,000 independently generated samples, iterated in a fixed
# order.  Note the batch size differs from the training loader's.
test_circle_image_loader = DataLoader(
    CircleImageDataset(circle_image_count=1000),
    batch_size=100
)

# A separate validation loader is not currently used.
#validate_circle_image_loader = DataLoader(CircleImageDataset(circle_image_count=1000), batch_size=100)

In [None]:
# number of samples in the training dataset (not the number of batches)
len(train_circle_image_loader.dataset)

In [None]:
def _run_circle_squasher_epoch(
    circle_squasher_model,
    loss_function,
    dataloader,
    device,
    optimizer=None
):
    """Run one pass over ``dataloader`` and return the mean loss per sample.

    When ``optimizer`` is given the model is put in training mode and its
    parameters are updated after every batch.  When it is ``None`` the model
    is put in evaluation mode and autograd is disabled, so no computation
    graph is built.

    Each batch must unpack as
    ``(target_images, (input_images, radius_scale_factors))``.

    Returns ``nan`` if the dataloader yields no samples.
    """
    is_training = optimizer is not None
    circle_squasher_model.train(is_training)

    # With 'mean' reduction (the default of the torch.nn losses) each batch
    # loss is already averaged over the batch and has to be weighted by the
    # batch size before summing.  With 'sum' reduction it is already a total.
    loss_is_summed = getattr(loss_function, "reduction", "mean") == "sum"

    total_loss = 0.0
    sample_count = 0
    with torch.set_grad_enabled(is_training):
        for correct_squashed_circle_images, (circle_images, radius_scale_factors) in dataloader:
            # torch calls circle_images 'inputs'
            circle_images = circle_images.to(device)
            correct_squashed_circle_images = correct_squashed_circle_images.to(device)
            radius_scale_factors = radius_scale_factors.to(device)

            if is_training:
                optimizer.zero_grad()

            predicted_squashed_circle_images = circle_squasher_model(
                circle_images,
                radius_scale_factors
            )
            loss = loss_function(
                predicted_squashed_circle_images,
                correct_squashed_circle_images
            )

            if is_training:
                loss.backward()
                optimizer.step()

            batch_sample_count = circle_images.shape[0]
            batch_loss = loss.item()
            total_loss += batch_loss if loss_is_summed else batch_loss * batch_sample_count
            sample_count += batch_sample_count

    if sample_count == 0:
        return float("nan")
    return total_loss / sample_count


def train(
    circle_squasher_model,
    optimizer,
    loss_function,
    train_dataloader,
    test_dataloader,
    epoch_count
):
    """Train ``circle_squasher_model`` and print the losses after every epoch.

    The model is moved to the GPU when one is available, otherwise it stays
    on the CPU.  Each epoch performs one optimisation pass over
    ``train_dataloader`` followed by one gradient-free evaluation pass over
    ``test_dataloader``.

    The printed losses are means per sample, so they do not depend on the
    batch size and the training and test figures are directly comparable.

    Parameters:
        circle_squasher_model: module called as ``model(images, radius_scale_factors)``.
        optimizer: optimizer already bound to the model's parameters.
        loss_function: callable ``loss_function(predicted, target)`` returning a scalar tensor.
        train_dataloader: yields ``(target_images, (input_images, radius_scale_factors))``.
        test_dataloader: same batch structure as ``train_dataloader``.
        epoch_count: number of epochs to run.

    Returns:
        None.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    circle_squasher_model.to(device)

    for epoch_i in range(epoch_count):
        training_loss = _run_circle_squasher_epoch(
            circle_squasher_model,
            loss_function,
            train_dataloader,
            device,
            optimizer=optimizer
        )
        test_loss = _run_circle_squasher_epoch(
            circle_squasher_model,
            loss_function,
            test_dataloader,
            device
        )

        print(
            'Epoch: {}, Training Loss: {:.2f}, Test Loss: {:.2f}'.format(
                epoch_i, training_loss, test_loss
            )
        )

In [None]:
import torch.optim

# Build a fresh model and train it for 100 epochs.
# Adam is constructed with its default hyperparameters and the loss is
# torch.nn.MSELoss with its default settings.
# train() moves the model to the GPU when one is available.
circle_squasher = build_circle_squasher()
train(
    circle_squasher,
    torch.optim.Adam(circle_squasher.parameters()),
    torch.nn.MSELoss(),
    train_circle_image_loader,
    test_circle_image_loader,
    epoch_count=100
)

In [None]:
# try out the circle squasher
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# make sure the model is on the same device the inputs are sent to below
circle_squasher.to(device)
circle_squasher.eval()

# ten fresh samples, one per batch, so each figure shows a single example
a_circle_image_dataloader = DataLoader(CircleImageDataset(10), batch_size=1)
for a_circle_target_image, (a_circle_input_image, radius_scale_factor) in a_circle_image_dataloader:
    print(f"input shape : {a_circle_input_image.shape}")
    print(f"radius_scale_factor: {radius_scale_factor}")

    # inference only: no computation graph is needed
    with torch.no_grad():
        a_squashed_circle_tensor = circle_squasher(
            a_circle_input_image.to(device),
            radius_scale_factor.to(device)
        )
    a_squashed_circle_image = a_squashed_circle_tensor.cpu().numpy()

    print(f"output shape: {a_squashed_circle_image.shape}")

    # left: input, middle: target, right: model prediction
    fig, axs = plt.subplots(nrows=1, ncols=3)
    axs[0].imshow(a_circle_input_image[0, 0, :, :], origin="lower")
    axs[0].set_title("input")
    axs[1].imshow(a_circle_target_image[0, 0, :, :], origin="lower")
    axs[1].set_title("target")
    # batch_size is 1, so the batch holds exactly one scale factor
    axs[1].set_xlabel(f"radius scale factor: {radius_scale_factor.item():.3f}")
    axs[2].imshow(a_squashed_circle_image[0, 0, :, :], origin="lower")
    axs[2].set_title("prediction")
