In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
from utils import rotate_and_reposition, get_2D_coords_from_3D, get_3D_coords_from_2D, jitter_2D_coords, \
    deg_to_rad, rad_to_deg, CameraMetadata, center_3D_coordinates, rotation_matrix_to_euler_angles

<h1> Run simple cube experiment </h1>

<h2> Generate base model </h2>

In [None]:
# Base model: the 8 corner vertices of an axis-aligned cube with unit edges,
# one (x, y, z) row per vertex. Row order matters: compute_orthonormal_basis
# reads rows 0, 3 and 7 to recover the cube's edge directions.
base_3D_coordinates = np.array(
    [
        # face at y = 0.5
        (-0.5, 0.5, -0.5),
        (-0.5, 0.5, 0.5),
        (0.5, 0.5, 0.5),
        (0.5, 0.5, -0.5),
        # face at y = 1.5
        (0.5, 1.5, -0.5),
        (0.5, 1.5, 0.5),
        (-0.5, 1.5, 0.5),
        (-0.5, 1.5, -0.5),
    ]
)

In [None]:
# Stereo camera parameters passed to get_2D_coords_from_3D / get_3D_coords_from_2D.
# NOTE(review): units are not stated in this notebook; the field names suggest
# metres for lengths and pixels for counts -- confirm against utils.CameraMetadata.
camera_metadata = CameraMetadata(
    focal_length=4050 * 3.45e-6,  # focal_length_pixel times 3.45e-6 (presumably the pixel pitch -- TODO confirm)
    focal_length_pixel=4050,
    baseline_m=0.105,
    pixel_count_width=4000,
    pixel_count_height=3096,
    image_sensor_width=0.01412,
    image_sensor_height=0.01035
)

In [None]:
def compute_orthonormal_basis(coords):
    """Approximate an orthonormal basis aligned with a (possibly jittered) cube.

    The first axis follows the edge from vertex 0 to vertex 3. The second axis
    is the edge from vertex 0 to vertex 7 with its component along the first
    axis removed (one Gram-Schmidt step). The third axis is their cross product.

    Args:
        coords: array-like of shape (8, 3) holding the cube vertices in the
            same row order as ``base_3D_coordinates``. Only rows 0, 3 and 7
            are read.

    Returns:
        A (3, 3) ndarray whose columns are the unit vectors u, v and w.
    """
    # Accept plain nested lists as well as ndarrays.
    coords = np.asarray(coords, dtype=float)

    origin = coords[0]
    u = coords[3] - origin
    v = coords[7] - origin

    u = u / np.linalg.norm(u)
    # Remove the part of v that lies along u so the two axes are orthogonal
    # even when jitter has skewed the cube.
    v = v - np.dot(u, v) * u
    v = v / np.linalg.norm(v)

    w = np.cross(u, v)
    w = w / np.linalg.norm(w)

    # Stack as rows, then transpose so each basis vector becomes a column.
    return np.vstack((u, v, w)).T

In [None]:
# Sanity check: rotate the base cube with angle arguments of (0, 1, 0) degrees
# and a target position of [0, 0, 0], then recover Euler angles from the
# estimated basis. The last expression is displayed as the cell output.
sanity_check_coords = rotate_and_reposition(
    base_3D_coordinates,
    deg_to_rad(0),
    deg_to_rad(1),
    deg_to_rad(0),
    [0, 0, 0],
)
B = compute_orthonormal_basis(sanity_check_coords)
rotation_matrix_to_euler_angles(B)

In [None]:
B  # display the basis matrix estimated in the previous cell

<h2> Validate data and functions via 2D / 3D rendering </h2>

In [None]:
from matplotlib import pyplot as plt
import pandas as pd
import plotly.express as px

In [None]:
def transform_into_df(coords):
    """Wrap an (N, 3) coordinate array in a DataFrame with columns x, y and z.

    Args:
        coords: 2-D array whose first three columns are read as x, y and z.

    Returns:
        A pandas DataFrame with one row per point, ready for plotting.
    """
    columns = {}
    for axis_index, axis_name in enumerate(('x', 'y', 'z')):
        columns[axis_name] = list(coords[:, axis_index])
    return pd.DataFrame(columns)

In [None]:
# Worked example on a single cube: scale, rotate, reposition and re-centre it,
# then recover the Euler angles from the estimated orthonormal basis.
scaling_factor = 1.0
volume = scaling_factor**3
# Edge lengths scale with the linear factor, not with the volume. This matches
# the dataset-generation loop, which uses scaling_factor = volume ** (1 / 3).
rescaled_3D_coordinates = scaling_factor * base_3D_coordinates
yaw_deg, pitch_deg, roll_deg = 10, 20, -10
yaw, pitch, roll = [deg_to_rad(x) for x in (yaw_deg, pitch_deg, roll_deg)]
new_centroid_position = [0, 0, 0]
repositioned_3D_coords = rotate_and_reposition(rescaled_3D_coordinates, yaw, pitch, roll, new_centroid_position)
centered_3D_coords = center_3D_coordinates(repositioned_3D_coords)
B = compute_orthonormal_basis(centered_3D_coords)
local_yaw, local_pitch, local_roll = rotation_matrix_to_euler_angles(B)
# Convert the recovered angles from radians to degrees for display.
print(np.array([local_yaw, local_pitch, local_roll]) * 180.0 / np.pi)


In [None]:
# Build one plotting frame per processing stage (rescaled, repositioned,
# re-centred) and show the first stage as an interactive 3D scatter.
df1, df2, df3 = (
    transform_into_df(stage_coords)
    for stage_coords in (rescaled_3D_coordinates, repositioned_3D_coords, centered_3D_coords)
)
fig = px.scatter_3d(df1, x='x', y='y', z='z')
fig.show()

In [None]:
# Cube after rotate_and_reposition (df2 holds repositioned_3D_coords).
fig = px.scatter_3d(df2, x='x', y='y', z='z')
fig.show()

In [None]:
# Cube after center_3D_coordinates (df3 holds centered_3D_coords).
fig = px.scatter_3d(df3, x='x', y='y', z='z')
fig.show()

In [None]:
# Project the base cube into both camera images and overlay the two point sets:
# left image in blue, right image in red.
X_left, X_right = get_2D_coords_from_3D(base_3D_coordinates, camera_metadata)
for image_points, marker_color in ((X_left, 'blue'), (X_right, 'red')):
    plt.scatter(image_points[:, 0], image_points[:, 1], color=marker_color)
plt.grid()
plt.show()

<h2> Generate large dataset </h2>

In [None]:
from collections import defaultdict

In [None]:
# Sampling ranges for the synthetic cubes (angles in degrees).
volume_range = [0.5, 10.0]
yaw_range_deg = [-50, 50]
pitch_range_deg = [-50, 50]
roll_range_deg = [-50, 50]
centroid_range_x = [-0.5, 0.5]
centroid_range_y = [0.5, 1.5]
centroid_range_z = [-0.5, 0.5]

N = 500000
jitter_std = 10
random_seed = 0

# Seed NumPy's global generator so a fresh kernel reproduces the same dataset.
# NOTE(review): jitter_2D_coords lives in utils; if it draws from a different
# random source it must be seeded separately -- confirm.
np.random.seed(random_seed)

dataset = defaultdict(list)
for t in range(N):

    # Sample a target volume and scale the unit cube's edges by its cube root.
    volume = np.random.uniform(*volume_range)
    scaling_factor = volume**(1.0 / 3)
    rescaled_3D_coordinates = scaling_factor * base_3D_coordinates

    # Sample an orientation; the helpers take radians.
    yaw = deg_to_rad(np.random.uniform(*yaw_range_deg))
    pitch = deg_to_rad(np.random.uniform(*pitch_range_deg))
    roll = deg_to_rad(np.random.uniform(*roll_range_deg))

    # Place the cube, project it into both images, jitter the 2D points and
    # triangulate the jittered points back into 3D.
    new_centroid_position = np.array([np.random.uniform(*x) for x in (centroid_range_x, centroid_range_y, centroid_range_z)])
    repositioned_3D_coords = rotate_and_reposition(rescaled_3D_coordinates, yaw, pitch, roll, new_centroid_position)
    repositioned_X_left, repositioned_X_right = get_2D_coords_from_3D(repositioned_3D_coords, camera_metadata)
    jittered_X_left, jittered_X_right = jitter_2D_coords(repositioned_X_left, repositioned_X_right, jitter_std)
    jittered_3D_coords = get_3D_coords_from_2D(jittered_X_left, jittered_X_right, camera_metadata)

    # Re-centre the noisy cube and estimate its orientation from the noisy points.
    centered_3D_coords = center_3D_coordinates(jittered_3D_coords)
    B = compute_orthonormal_basis(centered_3D_coords)
    local_yaw, local_pitch, local_roll = rotation_matrix_to_euler_angles(B)

    # Model input ('X'), regression target ('y'), and both the sampled and the
    # recovered angles in degrees.
    dataset['X'].append(centered_3D_coords.tolist())
    dataset['y'].append(volume)
    dataset['yaw'].append(rad_to_deg(yaw))
    dataset['pitch'].append(rad_to_deg(pitch))
    dataset['roll'].append(rad_to_deg(roll))

    dataset['local_yaw'].append(rad_to_deg(local_yaw))
    dataset['local_pitch'].append(rad_to_deg(local_pitch))
    dataset['local_roll'].append(rad_to_deg(local_roll))

    # Coarse progress indicator.
    if t % 1000 == 0:
        print(t)
    
    
    

In [None]:
# One row per simulated cube; the columns are the keys appended to `dataset`.
df = pd.DataFrame(dataset)

<h1> Train neural network architecture </h1>

In [None]:
from keras.layers import Input, Dense, Flatten
from keras.models import Model
import keras
import torch
from torch import nn
from sklearn.linear_model import LinearRegression

In [None]:
class Network(nn.Module):
    """Fully connected regressor: 24 inputs -> 256 -> 128 -> 64 -> 1 output.

    A ReLU follows each of the three hidden layers; the output layer is linear.
    The layer widths match the Keras model built by ``get_model`` so that
    ``convert_to_pytorch`` can copy its weights across.
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(24, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return the scalar prediction for each sample in the batch.

        ``x`` is flattened to (batch, -1) first, so any trailing layout with
        24 values per sample is accepted.
        """
        return self.output(self.forward_intermediate(x))

    def forward_intermediate(self, x):
        """Return the activations of the last hidden layer, shape (batch, 64)."""
        batch_size = x.shape[0]
        hidden = x.view(batch_size, -1)
        for layer in (self.fc1, self.fc2, self.fc3):
            hidden = self.relu(layer(hidden))
        return hidden
        

def convert_to_pytorch(model):
    """Copy the weights of a trained Keras model into a new ``Network``.

    ``model.get_weights()`` is read as four consecutive (kernel, bias) pairs,
    one per layer in the order fc1, fc2, fc3, output. Each array is transposed
    before it is assigned, which turns a 2-D kernel into the layout stored by
    ``nn.Linear`` and leaves a 1-D bias unchanged.

    Args:
        model: object exposing ``get_weights()`` that returns at least eight
            NumPy arrays.

    Returns:
        A ``Network`` instance whose parameters hold the copied values.
    """
    pytorch_model = Network()
    weights = model.get_weights()

    target_layers = (
        pytorch_model.fc1,
        pytorch_model.fc2,
        pytorch_model.fc3,
        pytorch_model.output,
    )
    for position, layer in enumerate(target_layers):
        kernel = weights[2 * position]
        bias = weights[2 * position + 1]
        layer.weight.data = torch.from_numpy(np.transpose(kernel))
        layer.bias.data = torch.from_numpy(np.transpose(bias))

    return pytorch_model


def apply_final_layer_ols(pytorch_model, X=None, y=None):
    """Refit the output layer by ordinary least squares, in place.

    The last hidden-layer activations for ``X`` are used as regressors and
    ``y`` as the target. The fitted coefficients and intercept then replace
    the weights and bias of ``pytorch_model.output``.

    Args:
        pytorch_model: a ``Network`` whose hidden layers are already trained.
        X: NumPy array of inputs. Defaults to the notebook-level ``X_train``.
        y: NumPy array of targets. Defaults to the notebook-level ``y_train``.

    Returns:
        None. ``pytorch_model`` is modified in place.
    """
    # Fall back to the notebook globals so existing one-argument calls still work.
    if X is None:
        X = X_train
    if y is None:
        y = y_train

    hidden = pytorch_model.forward_intermediate(torch.from_numpy(X).float())
    X_ols = hidden.detach().numpy()
    lr = LinearRegression().fit(X_ols, y)

    # Cast explicitly to float32: a float64 weight or bias would make the
    # forward pass fail with a dtype mismatch on float32 inputs.
    coef = np.array(lr.coef_, dtype=np.float32).reshape(1, -1)
    # reshape(-1) yields a 1-D bias whether intercept_ is a scalar or an array.
    intercept = np.array(lr.intercept_, dtype=np.float32).reshape(-1)

    pytorch_model.output.weight.data = torch.from_numpy(coef)
    pytorch_model.output.bias.data = torch.from_numpy(intercept)



def get_model():
    """Build the uncompiled Keras regressor: 24 inputs -> 256 -> 128 -> 64 -> 1.

    The three hidden Dense layers use ReLU; the single-unit output layer has
    no activation.
    """
    inputs = Input(shape=(24,))
    hidden = inputs
    for units in (256, 128, 64):
        hidden = Dense(units, activation='relu')(hidden)
    pred = Dense(1)(hidden)
    return Model(inputs, pred)


def train_model(model, X_train, y_train, X_val, y_val, train_config):
    """Compile and fit a Keras regression model with early stopping.

    Args:
        model: uncompiled Keras model.
        X_train, y_train: training inputs and targets.
        X_val, y_val: validation inputs and targets, monitored for early stopping.
        train_config: dict with the keys 'epochs', 'batch_size',
            'learning_rate' and 'patience'.

    Returns:
        The same model instance after fitting.
    """
    epochs = train_config['epochs']
    batch_size = train_config['batch_size']
    lr = train_config['learning_rate']
    patience = train_config['patience']

    # Stop once validation loss has not improved for `patience` epochs.
    # NOTE(review): the weights from the final epoch are kept, not the best
    # epoch; consider restore_best_weights=True if that is not intended.
    callbacks = [keras.callbacks.EarlyStopping(monitor='val_loss',
                                               min_delta=0,
                                               patience=patience,
                                               verbose=0,
                                               mode='auto')]

    optimizer = keras.optimizers.Adam(learning_rate=lr)
    # This is a regression problem, so report mean absolute error alongside the
    # MSE loss; classification accuracy is meaningless for continuous targets.
    model.compile(optimizer=optimizer,
                  loss='mean_squared_error',
                  metrics=['mean_absolute_error'])
    model.fit(X_train, y_train, validation_data=(X_val, y_val), callbacks=callbacks,
              batch_size=batch_size, epochs=epochs)

    return model

In [None]:
# Sequential 60 / 20 / 20 split by row position. The test set is simply
# whatever follows the validation rows, so test_pct is informational only.
train_pct, val_pct, test_pct = 0.6, 0.2, 0.2
n_rows = df.shape[0]
train_idx = int(train_pct * n_rows)
val_idx = int((train_pct + val_pct) * n_rows)

row_ids = df.index
train_mask = row_ids < train_idx
val_mask = (row_ids >= train_idx) & (row_ids < val_idx)
test_mask = row_ids >= val_idx

In [None]:
def _flatten_split(frame):
    """Return (X, y) for one split.

    X is the 'X' column stacked into an array and flattened to one row of
    features per sample; y is the 'y' column as a NumPy array.
    """
    stacked = np.array(list(frame.X.values))
    # Flatten to (n_samples, n_features), the rank the Keras model's
    # Input(shape=(24,)) expects. The previous reshape kept a trailing
    # singleton axis, i.e. (n_samples, 24, 1).
    return stacked.reshape(stacked.shape[0], -1), frame.y.values


X_train, y_train = _flatten_split(df[train_mask])
X_val, y_val = _flatten_split(df[val_mask])
X_test, y_test = _flatten_split(df[test_mask])


In [None]:
# Build a fresh, untrained Keras model.
model = get_model()

In [None]:
# Print the model's layer summary.
model.summary()

In [None]:
# Hyperparameters read by train_model.
train_config = {
    'epochs': 1000,
    'batch_size': 64,
    'learning_rate': 1e-4,
    'patience': 30,
}

train_model(model, X_train, y_train, X_val, y_val, train_config)

<h1> Accuracy Reporting </h1>

In [None]:
import seaborn as sns

In [None]:
# Port the trained Keras weights to PyTorch, then refit the output layer by OLS.
pytorch_model = convert_to_pytorch(model)
apply_final_layer_ols(pytorch_model)

# Flatten every sample in df to 24 values (with a trailing singleton axis).
# NOTE(review): predictions cover all rows, so the errors reported below
# include the training and validation rows, not only the test split.
X = np.array(list(df.X.values))
n_samples, n_points, n_dims = X.shape
X = X.reshape(n_samples, n_points * n_dims, -1)

model_inputs = torch.from_numpy(X).float()
y_pred = pytorch_model(model_inputs).detach().numpy().squeeze()
df['y_pred'] = y_pred
# Signed relative error of the predicted value against the true value.
df['pct_error'] = (df['y_pred'] - df['y']) / df['y']

In [None]:
# Bucket edges in degrees for each angle: -50, -45, ..., 50 (5-degree buckets).
yaw_bucket_cutoffs, pitch_bucket_cutoffs, roll_bucket_cutoffs = (
    np.arange(-50, 55, 5) for _ in range(3)
)

In [None]:
def produce_heatmap(df, angle_1, angle_2, bucket_cutoffs_1, bucket_cutoffs_2):
    """Plot the mean prediction error (%) per pair of angle buckets.

    For each bucket pair the value shown is
    ``100 * (mean(y_pred) - mean(y)) / mean(y)`` over the rows falling in both
    buckets, rounded to two decimals. Pairs with no rows are NaN.

    Args:
        df: DataFrame with the columns ``angle_1``, ``angle_2``, 'y' and 'y_pred'.
        angle_1: column name plotted along the x axis.
        angle_2: column name plotted along the y axis.
        bucket_cutoffs_1: increasing bucket edges for ``angle_1``.
        bucket_cutoffs_2: increasing bucket edges for ``angle_2``.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    buckets_1 = list(zip(bucket_cutoffs_1, bucket_cutoffs_1[1:]))
    buckets_2 = list(zip(bucket_cutoffs_2, bucket_cutoffs_2[1:]))
    heatmap_arr = np.full((len(buckets_1), len(buckets_2)), np.nan)

    y_true = df['y']
    y_pred = df['y_pred']

    # Buckets are half-open, [low, high), so a value sitting exactly on an
    # interior edge is counted once instead of being dropped.
    # The angle_2 masks do not depend on the outer loop; build them once.
    angle_2_masks = [(df[angle_2] >= low) & (df[angle_2] < high) for low, high in buckets_2]

    for i, (angle_1_low, angle_1_high) in enumerate(buckets_1):
        angle_1_mask = (df[angle_1] >= angle_1_low) & (df[angle_1] < angle_1_high)
        for j, angle_2_mask in enumerate(angle_2_masks):
            orientation_mask = angle_1_mask & angle_2_mask
            mean_true = y_true[orientation_mask].mean()
            mean_error_pct = (y_pred[orientation_mask].mean() - mean_true) / mean_true
            heatmap_arr[i][j] = round(100 * mean_error_pct, 2)

    angle_1_buckets = ['{} <-> {}'.format(low, high) for low, high in buckets_1]
    angle_2_buckets = ['{} <-> {}'.format(low, high) for low, high in buckets_2]

    plt.figure(figsize=(15, 10))
    # heatmap_arr is indexed [angle_1 bucket, angle_2 bucket]. seaborn draws
    # rows along the y axis and columns along the x axis, so transpose to put
    # angle_1 on the x axis, matching the tick labels and axis titles below.
    sns.heatmap(heatmap_arr.T, xticklabels=angle_1_buckets, yticklabels=angle_2_buckets, annot=True)
    plt.xlabel('{} range (degrees)'.format(angle_1))
    plt.ylabel('{} range (degrees)'.format(angle_2))
    plt.title('Error percentage (%) broken down by Orientation Bucket')
    plt.show()

In [None]:
# Mean error per (local_yaw, local_pitch) bucket pair.
produce_heatmap(df, 'local_yaw', 'local_pitch', yaw_bucket_cutoffs, pitch_bucket_cutoffs)

In [None]:
# Mean error per (local_yaw, local_roll) bucket pair.
produce_heatmap(df, 'local_yaw', 'local_roll', yaw_bucket_cutoffs, roll_bucket_cutoffs)

In [None]:
# Mean error per (local_pitch, local_roll) bucket pair.
produce_heatmap(df, 'local_pitch', 'local_roll', pitch_bucket_cutoffs, roll_bucket_cutoffs)