# Affine Transform - Generate Neural Network

## Introduction

In this code, we generate a neural network that predicts the transformed (rotated) value of the input point.

We then enforce a constraint on the output set such that we require the transformed (rotated) points to stay within a certain box. These constraints can be defined in the form of $Ax < b$. 

The idea is to repair the layers of the originally trained neural network so that the following two cases hold:

1. If the true transformed (rotated) points satisfy the constraint, the predicted transformed (rotated) points are close to the true transformed (rotated) points and also satisfy the constraint; and

2. If the true transformed (rotated) points violate the constraint, the predicted transformed (rotated) points are still close to the true transformed (rotated) points but nevertheless satisfy the constraint.
-----------------


In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from tensorflow import keras
import numpy as np
from shapely.geometry import Polygon, Point
from affine_utils import gen_rand_points_within_poly, Batch
import pickle
import os
import argparse
from matplotlib import pyplot as plt
import matplotlib as mpl

In [2]:
# Root output directory for everything this notebook saves.
path = os.path.join(os.getcwd(), "original_net")
try:
    # EAFP: attempt the creation directly so there is no window between an
    # existence check and the mkdir call.
    os.makedirs(path)
except FileExistsError:
    # Already present (e.g. from an earlier run); nothing to do.
    pass
else:
    print("The new directory is created!")
    

## Data Generation

Start by creating a dataset for the neural network that learns an Affine Transform in a 2d space.

In [3]:
# Dataset size and the fraction of it used for training.
num_pts = 300
train2test_ratio = 0.7

## affine transformation matrices (3x3, homogeneous coordinates)
# translation matrix 1: identity with (+2.5, +2.5) in the last column
translate1 = np.eye(3)
translate1[:2, 2] = 2.5
# translation matrix 2: identity with (-2.5, -2.5) in the last column
translate2 = np.eye(3)
translate2[:2, 2] = -2.5
# rotation matrix for an angle of pi / 4
rot_angle = np.pi / 4
rotate = np.array(
    [
        [np.cos(rot_angle), -np.sin(rot_angle), 0],
        [np.sin(rot_angle), np.cos(rot_angle), 0],
        [0, 0, 1],
    ]
)
## original, transformed, and constraint Polygons
poly_orig = Polygon([(1, 1), (4, 1), (4, 4), (1, 4)])
poly_trans = Polygon(
    [(2.5, 4.621), (4.624, 2.5), (2.5, 0.3787), (0.3787, 2.5)]
)

# Constraint vertices in input space: one homogeneous column per vertex
# (row 0 = x, row 1 = y, row 2 = 1).
vert_const_inp = np.array(
    [
        [1.25, 3.75, 3.75, 1.25],
        [1.25, 1.25, 3.75, 3.75],
        [1, 1, 1, 1],
    ]
)
# Constraint vertices in output space: apply translate2, then rotate, then
# translate1 to every column.
vert_const_out = translate1 @ rotate @ translate2 @ vert_const_inp
poly_const = Polygon(
    [(vert_const_out[0, col], vert_const_out[1, col]) for col in range(4)]
)


In [4]:
# Draw the input points (presumably sampled inside poly_orig -- confirm in
# affine_utils) and push them through the composite affine map to obtain the
# regression targets.
x = gen_rand_points_within_poly(poly_orig, num_pts)
affine_map = translate1 @ rotate @ translate2
y = (affine_map @ x.T).T

## construct a data batch class
batch_size = int(train2test_ratio * num_pts)
batch = Batch(x, y, batch_size)
x_train, y_train, x_test, y_test = batch.get_batch()

# Summary of the split, one item per line.
print(
    "Data size: {}".format(num_pts),
    "Train/Test Split: {}".format(train2test_ratio),
    f"Training Set Size = {x_train.shape}",
    f"Testing Set Size = {x_test.shape}",
    sep="\n",
)


Data size: 300
Train/Test Split: 0.7
Training Set Size = (210, 3)
Testing Set Size = (90, 3)


In [5]:
# Cast every split to float32 before handing it to PyTorch.
x_train_tensor, y_train_tensor, x_test_tensor, y_test_tensor = (
    split.astype(np.float32) for split in (x_train, y_train, x_test, y_test)
)

# Each dataset item is an [input_row, target_row] pair.
train_set = [
    [inp_row, tgt_row] for inp_row, tgt_row in zip(x_train_tensor, y_train_tensor)
]
test_set = [
    [inp_row, tgt_row] for inp_row, tgt_row in zip(x_test_tensor, y_test_tensor)
]

# Mini-batches of 16 for training; single samples for testing.
trainloader = torch.utils.data.DataLoader(train_set, batch_size=16, shuffle=True)
testloader = torch.utils.data.DataLoader(test_set, batch_size=1, shuffle=True)

## Neural Network Specification

Now that we have the dataset ready, let us start by defining the neural network to learn this affine transform.

In [6]:
## Network
input_dim = 3
output_dim = 3
hid_dim_0 = 20
hid_dim_1 = 10
architecture = [input_dim, hid_dim_0, hid_dim_1, output_dim]
regularizer_rate = 0.001
train_epochs = 1000
learning_rate = 0.003

In [7]:
class Net(nn.Module):
    """Fully connected network: two ReLU hidden layers and a linear output.

    Args:
        layer_sizes: Optional sequence of four ints
            ``[input, hidden_0, hidden_1, output]`` giving the layer widths.
            When omitted, the module-level ``architecture`` list is used, so
            ``Net()`` keeps working as before.

    Raises:
        ValueError: If ``layer_sizes`` does not contain exactly four entries.
    """

    def __init__(self, layer_sizes=None):
        super().__init__()
        if layer_sizes is None:
            # Fall back to the notebook-level configuration.
            layer_sizes = architecture
        if len(layer_sizes) != 4:
            raise ValueError(
                "layer_sizes must hold exactly 4 widths "
                f"(input, hidden_0, hidden_1, output); got {len(layer_sizes)}"
            )
        in_dim, hid_0, hid_1, out_dim = layer_sizes

        # Each layer is an affine operation: y = Wx + b
        self.fc1 = nn.Linear(in_dim, hid_0)
        self.fc2 = nn.Linear(hid_0, hid_1)
        self.fc3 = nn.Linear(hid_1, out_dim)

    def forward(self, x):
        """Apply fc1 and fc2 with ReLU, then fc3 without an activation."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # No activation on the last layer: the output is an unconstrained
        # regression value.
        x = self.fc3(x)
        return x
# Instantiate the network and print its layer summary.
net = Net()
print(net)

Net(
  (fc1): Linear(in_features=3, out_features=10, bias=True)
  (fc2): Linear(in_features=10, out_features=3, bias=True)
  (fc3): Linear(in_features=3, out_features=20, bias=True)
)


Now, we train the neural network

In [8]:
# Adam over all network parameters; MSE is the data-fitting term of the loss.
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

# The optimizer updates these tensors in place, so the list can be built once
# instead of on every batch.
params = list(net.parameters())

loss_hist = []  # mean training loss of each epoch
for e in range(train_epochs):
    running_loss = 0
    for x, y in trainloader:
        # Training pass
        optimizer.zero_grad()
        output = net(x)

        # L2 penalty: sum of the mean squared value of every parameter
        # tensor. It must stay a tensor (no .item()) so that it remains in
        # the autograd graph and actually contributes gradients; converting
        # it to a Python float would only add a constant to the loss.
        regu = sum(torch.mean(p ** 2) for p in params)

        loss = criterion(output, y) + (0.5 * regularizer_rate * regu)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # End of epoch: report and record the mean loss over all batches.
    epoch_loss = running_loss / len(trainloader)
    print(f"Training loss: {epoch_loss}")
    loss_hist.append(epoch_loss)

  return F.mse_loss(input, target, reduction=self.reduction)


RuntimeError: The size of tensor a (20) must match the size of tensor b (3) at non-singleton dimension 1

In [None]:
import matplotlib.pyplot as plt

def getMSE(dataloader, model=None, loss_fn=None):
    """Return the mean loss of a model over every sample of ``dataloader``.

    Args:
        dataloader: Iterable yielding ``(inputs, targets)`` batches.
        model: Callable mapping a batch of inputs to predictions. Defaults
            to the module-level ``net``.
        loss_fn: Callable returning the mean loss of one batch. Defaults to
            the module-level ``criterion``.

    Returns:
        The per-batch mean losses averaged with each batch weighted by its
        number of samples, so a smaller final batch does not skew the result.
        The value has whatever type ``loss_fn`` returns (a 0-dim tensor for
        ``nn.MSELoss``).

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    if model is None:
        model = net
    if loss_fn is None:
        loss_fn = criterion

    weighted_loss = 0
    num_samples = 0
    with torch.no_grad():  # evaluation only; no gradients are needed
        for x, y in dataloader:
            batch_len = len(y)
            # Undo the per-batch mean so batches of different sizes combine
            # into one mean over all samples.
            weighted_loss += loss_fn(model(x), y) * batch_len
            num_samples += batch_len

    if num_samples == 0:
        raise ValueError("dataloader yielded no samples")
    # Normalise by the samples of the loader that was passed in, not by the
    # size of some other loader.
    return weighted_loss / num_samples

# Evaluate on the training loader first, then on the test loader, and finish
# with the per-epoch training-loss curve.
train_mse = getMSE(trainloader)
print(f"Model MSE on Train Data Set = {train_mse}")
test_mse = getMSE(testloader)
print(f"Model MSE on Test Data Set = {test_mse}")
plt.plot(loss_hist)

## Input/Output Visualization

In [None]:
plt.rcParams["text.usetex"] = False
# Matplotlib 3.6 renamed the bundled seaborn styles to "seaborn-v0_8*" and
# newer releases no longer accept the old name, so use whichever one exists.
seaborn_style = (
    "seaborn-v0_8" if "seaborn-v0_8" in mpl.style.available else "seaborn"
)
mpl.style.use(seaborn_style)

# Boundary coordinates of the original polygon and of the target polygon.
x_poly_trans_bound, y_poly_trans_bound = poly_trans.exterior.xy
x_poly_orig_bound, y_poly_orig_bound = poly_orig.exterior.xy

plt.plot(
    x_poly_orig_bound,
    y_poly_orig_bound,
    color="plum",
    alpha=0.7,
    linewidth=3,
    solid_capstyle="round",
    zorder=2,
    label="Original Set",
)
plt.plot(
    x_poly_trans_bound,
    y_poly_trans_bound,
    color="tab:blue",
    alpha=0.7,
    linewidth=3,
    solid_capstyle="round",
    zorder=2,
    label="Target Set",
)
# Ground-truth targets of the training split (first two coordinates).
plt.scatter(
    y_train[:, 0],
    y_train[:, 1],
    color="tab:blue",
    label="Original Target",
)
# Network predictions for the same inputs; no gradients are needed here.
x_train_tensor = torch.Tensor(x_train)
with torch.no_grad():
    y_predict_tensor = net(x_train_tensor)
y_predict_train = y_predict_tensor.numpy()

plt.scatter(
    y_predict_train[:, 0],
    y_predict_train[:, 1],
    color="mediumseagreen",
    label="Predicted Target",
)
# plt.legend(loc="upper left", frameon=False, fontsize=20)
plt.title(r"In-place Rotation (Training dataset)", fontsize=25)
plt.xlabel("x", fontsize=25)
plt.ylabel("y", fontsize=25)
plt.show()

In [None]:
# Same figure as the training plot, but for the held-out test split. The
# polygon boundary arrays are the ones computed in the training-plot cell.
plt.plot(
    x_poly_orig_bound,
    y_poly_orig_bound,
    color="plum",
    alpha=0.7,
    linewidth=3,
    solid_capstyle="round",
    zorder=2,
    label="Original Set",
)
plt.plot(
    x_poly_trans_bound,
    y_poly_trans_bound,
    color="tab:blue",
    alpha=0.7,
    linewidth=3,
    solid_capstyle="round",
    zorder=2,
    label="Target Set",
)
# Ground-truth targets of the test split (first two coordinates).
plt.scatter(
    y_test[:, 0],
    y_test[:, 1],
    color="tab:blue",
    label="Original Target",
)
# Network predictions for the test inputs; no gradients are needed here.
x_test_tensor = torch.Tensor(x_test)
with torch.no_grad():
    y_predict_tensor = net(x_test_tensor)
y_predict_test = y_predict_tensor.numpy()

plt.scatter(
    y_predict_test[:, 0],
    y_predict_test[:, 1],
    color="mediumseagreen",
    label="Predicted Target",
)
plt.legend(loc="upper left", frameon=False, fontsize=20)
# This figure shows the test split, so the title must say so.
plt.title(r"In-place Rotation (Testing dataset)", fontsize=25)
plt.xlabel("x", fontsize=25)
plt.ylabel("y", fontsize=25)
plt.show()

## Saving Data and Model

In [None]:
# List every entry of the model's state_dict together with its tensor size.
print("Model's state_dict:")
for entry_name, entry_tensor in net.state_dict().items():
    print(entry_name, "\t", entry_tensor.size())

# Print optimizer's state_dict
print("Optimizer's state_dict:")
for var_name, var_value in optimizer.state_dict().items():
    print(var_name, "\t", var_value)

In [None]:
# exist_ok=True makes directory creation idempotent without a separate,
# race-prone existence check.
model_dir = os.path.join(path, "model")
os.makedirs(model_dir, exist_ok=True)

model_scripted = torch.jit.script(net)  # Export to TorchScript
model_scripted.save(
    os.path.join(model_dir, "affine_transform_pt_orig_model.pt")
)

data_dir = os.path.join(path, "data")
os.makedirs(data_dir, exist_ok=True)
# Persist the exact train/test split so later steps reuse the same samples.
with open(os.path.join(data_dir, "input_output_data_tc1.pickle"), "wb") as data:
    pickle.dump([x_train, y_train, x_test, y_test], data)

In [None]:
# Reload the [x_train, y_train, x_test, y_test] list written earlier in this
# notebook. NOTE: pickle.load must only be used on trusted files such as this
# self-generated one.
with open(path + "/data/input_output_data_tc1.pickle", "rb") as pickle_file:
    dataset = pickle.load(pickle_file)

## affine transformation matrices (3x3, homogeneous coordinates)
# translation matrix 1: identity with (+2.5, +2.5) in the last column
translate1 = np.eye(3)
translate1[:2, 2] = 2.5
# translation matrix 2: identity with (-2.5, -2.5) in the last column
translate2 = np.eye(3)
translate2[:2, 2] = -2.5
# rotation matrix for an angle of pi / 4
rot_angle = np.pi / 4
rotate = np.array(
    [
        [np.cos(rot_angle), -np.sin(rot_angle), 0],
        [np.sin(rot_angle), np.cos(rot_angle), 0],
        [0, 0, 1],
    ]
)

## original, transformed, and constraint Polygons
poly_orig = Polygon([(1, 1), (4, 1), (4, 4), (1, 4)])
poly_trans = Polygon(
    [(2.5, 4.621), (4.624, 2.5), (2.5, 0.3787), (0.3787, 2.5)]
)
# Constraint vertices in input space: one homogeneous column per vertex.
inp_const_vertices = np.array(
    [
        [1.25, 3.75, 3.75, 1.25],
        [1.25, 1.25, 3.75, 3.75],
        [1, 1, 1, 1],
    ]
)
# Constraint vertices in output space: apply translate2, then rotate, then
# translate1 to every column.
out_const_vertices = translate1 @ rotate @ translate2 @ inp_const_vertices
poly_const = Polygon(
    [
        (out_const_vertices[0, col], out_const_vertices[1, col])
        for col in range(4)
    ]
)

# divide training dataset
# dataset[0] holds the training inputs and dataset[1] the matching targets.
x_inside, y_inside = [], []
x_outside, y_outside = [], []

for x_row, y_row in zip(dataset[0], dataset[1]):
    # A sample counts as "inside" when the first two coordinates of its true
    # target lie within the constraint polygon.
    target_point = Point([y_row[0], y_row[1]])
    if target_point.within(poly_const):
        x_inside.append(x_row)
        y_inside.append(y_row)
    else:
        x_outside.append(x_row)
        y_outside.append(y_row)

data_dir = path + "/data"
if not os.path.exists(data_dir):
    os.makedirs(data_dir)

inside_file = path + "/data/input_output_data_inside_train_tc1.pickle"
with open(inside_file, "wb") as out_file:
    print(f"number of training points inside: {len(y_inside)}")
    pickle.dump([np.array(x_inside), np.array(y_inside)], out_file)

outside_file = path + "/data/input_output_data_outside_train_tc1.pickle"
with open(outside_file, "wb") as out_file:
    print(f"number of training points outside: {len(y_outside)}")
    pickle.dump([np.array(x_outside), np.array(y_outside)], out_file)

# divide testing dataset
# dataset[2] holds the test inputs and dataset[3] the matching targets.
x_inside, y_inside = [], []
x_outside, y_outside = [], []

for x_row, y_row in zip(dataset[2], dataset[3]):
    # A sample counts as "inside" when the first two coordinates of its true
    # target lie within the constraint polygon.
    target_point = Point([y_row[0], y_row[1]])
    if target_point.within(poly_const):
        x_inside.append(x_row)
        y_inside.append(y_row)
    else:
        x_outside.append(x_row)
        y_outside.append(y_row)

data_dir = path + "/data"
if not os.path.exists(data_dir):
    os.makedirs(data_dir)

inside_file = path + "/data/input_output_data_inside_test_tc1.pickle"
with open(inside_file, "wb") as out_file:
    print(f"number of testing points inside: {len(y_inside)}")
    pickle.dump([np.array(x_inside), np.array(y_inside)], out_file)

outside_file = path + "/data/input_output_data_outside_test_tc1.pickle"
with open(outside_file, "wb") as out_file:
    print(f"number of testing points outside: {len(y_outside)}")
    pickle.dump([np.array(x_outside), np.array(y_outside)], out_file)
