In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from ipywidgets import widgets
from IPython.display import display, clear_output
import random
import mlflow
import tensorflow as tf
from tensorflow import keras
import os

# Default link lengths of the arm (l1, l2) and the fixed vertical offset (l3).
L1 = 1.0
L2 = 1.5
L3 = 0.5


def forward_kinematics_3dof(theta1, theta2, theta3, l1=L1, l2=L2, l3=L3):
    """Return the end-effector position (x, y, z) for the given joint angles.

    Angles are in radians. ``theta1`` rotates the arm about the z axis,
    ``theta2`` is measured from the +z axis, and ``theta3`` is added to
    ``theta2`` to orient the second link. ``l3`` is a constant offset along z.

    Scalars and NumPy arrays are both accepted; the three returned values
    follow NumPy broadcasting of the inputs.
    """
    cos_t1 = np.cos(theta1)
    sin_t1 = np.sin(theta1)
    sin_t2 = np.sin(theta2)
    sin_t23 = np.sin(theta2 + theta3)

    x = l1 * cos_t1 * sin_t2 + l2 * cos_t1 * sin_t23
    y = l1 * sin_t1 * sin_t2 + l2 * sin_t1 * sin_t23
    z = l1 * np.cos(theta2) + l2 * np.cos(theta2 + theta3) + l3
    return x, y, z

def calculate_arm_positions(theta1, theta2, theta3, l1=L1, l2=L2, l3=L3):
    """Return the four points that trace the arm for the given joint angles.

    The result is a (4, 3) array whose rows are, in order: the origin, the
    point ``l3`` above it on the z axis, the end of the first link, and the
    end effector as computed by ``forward_kinematics_3dof``.
    """
    origin = np.array([0, 0, 0])
    link_start = np.array([0, 0, l3])
    link1_end = np.array([
        l1 * np.cos(theta1) * np.sin(theta2),
        l1 * np.sin(theta1) * np.sin(theta2),
        l3 + l1 * np.cos(theta2),
    ])
    end_effector = np.array(forward_kinematics_3dof(theta1, theta2, theta3, l1, l2, l3))
    return np.vstack((origin, link_start, link1_end, end_effector))

def load_and_preprocess_data(filename='robot_arm_dataset_10M.npz', data_dir='./Data'):
    """Load an ``.npz`` dataset and return normalized train/test splits.

    The archive must contain the arrays ``inputs`` and ``outputs``.

    * Inputs are standardized per column: ``(inputs - mean) / std``, where the
      statistics are taken over the whole array (train and test rows alike).
    * Outputs are divided by ``pi / 2``.
    * The first 90% of rows form the training split and the remainder the
      test split; rows are not shuffled.

    Args:
        filename: Name of the ``.npz`` file inside ``data_dir``.
        data_dir: Directory that holds the dataset. Defaults to ``./Data``.

    Returns:
        ``((train_inputs, train_outputs), (test_inputs, test_outputs),
        input_mean, input_std)``. ``input_mean`` and ``input_std`` are the raw
        per-column statistics, so ``x * input_std + input_mean`` recovers the
        original inputs.
    """
    # np.load on an .npz returns an archive object backed by an open file;
    # the context manager makes sure that handle is released.
    with np.load(os.path.join(data_dir, filename)) as data:
        inputs, outputs = data['inputs'], data['outputs']

    input_mean = np.mean(inputs, axis=0)
    input_std = np.std(inputs, axis=0)

    # A constant column has zero std; divide by 1 there so it normalizes to 0
    # instead of NaN. The raw std is still returned for denormalization.
    safe_std = np.where(input_std == 0, 1.0, input_std)
    inputs_normalized = (inputs - input_mean) / safe_std
    outputs_normalized = outputs / (np.pi / 2)

    split_index = int(0.9 * len(inputs))
    train_inputs, test_inputs = inputs_normalized[:split_index], inputs_normalized[split_index:]
    train_outputs, test_outputs = outputs_normalized[:split_index], outputs_normalized[split_index:]
    return (train_inputs, train_outputs), (test_inputs, test_outputs), input_mean, input_std

@tf.function
def forward_kinematics_tf(theta):
    """TensorFlow version of the forward kinematics using the module link lengths.

    ``theta`` is unstacked along axis 1 into exactly three angle tensors, so
    that axis must have size 3. The x, y and z coordinates are computed with
    ``L1``, ``L2`` and ``L3`` and stacked back along axis 1.
    """
    q1, q2, q3 = tf.unstack(theta, axis=1)

    cos_q1 = tf.cos(q1)
    sin_q1 = tf.sin(q1)
    sin_q2 = tf.sin(q2)
    sin_q23 = tf.sin(q2 + q3)

    x = L1 * cos_q1 * sin_q2 + L2 * cos_q1 * sin_q23
    y = L1 * sin_q1 * sin_q2 + L2 * sin_q1 * sin_q23
    z = L1 * tf.cos(q2) + L2 * tf.cos(q2 + q3) + L3

    return tf.stack([x, y, z], axis=1)

def huber_loss(y_true, y_pred, delta=1.0):
    """Element-wise Huber loss between ``y_true`` and ``y_pred``.

    Where the absolute residual is at most ``delta`` the loss is half the
    squared residual; elsewhere it is ``delta * (|residual| - delta / 2)``.
    No reduction is applied: the result has the shape of the residual.
    """
    residual = y_true - y_pred
    abs_residual = tf.abs(residual)
    quadratic_branch = tf.square(residual) / 2
    linear_branch = delta * (abs_residual - delta / 2)
    return tf.where(abs_residual <= delta, quadratic_branch, linear_branch)

def custom_loss(fk_weight=1, delta=0.1):
    """Build a loss that combines a joint-angle term and an end-effector term.

    The returned ``loss_fn(y_true, y_pred)`` computes:

    * the mean Huber loss between the two tensors, plus
    * ``fk_weight`` times the mean Huber loss between their images under
      ``forward_kinematics_tf``.

    Both Huber terms use the same ``delta``.

    NOTE(review): ``load_and_preprocess_data`` divides the angle targets by
    pi/2. If this loss receives those scaled values, the forward kinematics is
    evaluated on scaled rather than true angles. Confirm this is intended.
    """
    def loss_fn(y_true, y_pred):
        # Mean Huber penalty taken directly on the model outputs.
        joint_term = tf.reduce_mean(huber_loss(y_true, y_pred, delta))

        # Mean Huber penalty on the positions those outputs map to.
        position_term = tf.reduce_mean(
            huber_loss(
                forward_kinematics_tf(y_true),
                forward_kinematics_tf(y_pred),
                delta,
            )
        )

        return joint_term + fk_weight * position_term

    return loss_fn

def load_model(run_id):
    """Load the Keras model stored under an MLflow run.

    The run's artifact URI is looked up through ``MlflowClient`` and the
    sub-path ``model/data/model`` is appended to it. The model is then
    deserialized with the custom loss pieces registered, so Keras can resolve
    them by name.

    NOTE(review): ``artifact_uri`` is joined with ``os.path.join``. Confirm the
    resulting location is loadable on the platform this runs on.
    """
    # Names Keras may need to resolve while deserializing the saved model.
    # 'loss_fn' is built with custom_loss()'s default arguments.
    keras_custom_objects = {
        'custom_loss': custom_loss,
        'loss_fn': custom_loss(),
        'forward_kinematics_tf': forward_kinematics_tf,
        'huber_loss': huber_loss
    }

    # Resolve where the run stored its artifacts.
    tracking_client = mlflow.tracking.MlflowClient()
    artifact_root = tracking_client.get_run(run_id).info.artifact_uri
    saved_model_location = os.path.join(artifact_root, "model/data/model")

    return keras.models.load_model(saved_model_location, custom_objects=keras_custom_objects)

def _format_angles_deg(angles):
    """Format three joint angles (radians) as '(a°, b°, c°)' with one decimal each."""
    return (
        f"({np.round(np.degrees(angles[0]), 1):.1f}°, "
        f"{np.round(np.degrees(angles[1]), 1):.1f}°, "
        f"{np.round(np.degrees(angles[2]), 1):.1f}°)"
    )


def visualize_arm_positions(target, true_angles, predicted_angles, l1=L1, l2=L2, l3=L3):
    """Draw the true and predicted arm poses together with the target point in 3D.

    Args:
        target: The (x, y, z) point to mark in red.
        true_angles: Three joint angles (radians) of the reference pose.
        predicted_angles: Three joint angles (radians) of the predicted pose.
        l1, l2, l3: Link lengths / offset, used for both the drawn arms and
            the positions reported in the title.

    The title reports the end-effector position and joint angles (in degrees)
    of both poses and the Euclidean distance between the two end effectors.
    The figure is shown with ``plt.show()``; nothing is returned.
    """
    true_positions = calculate_arm_positions(*true_angles, l1, l2, l3)
    predicted_positions = calculate_arm_positions(*predicted_angles, l1, l2, l3)

    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111, projection='3d')

    # Plot target point
    ax.scatter(*target, color='red', s=100, label='Target Point')

    # Plot true arm position
    ax.plot(true_positions[:, 0], true_positions[:, 1], true_positions[:, 2],
            'bo-', linewidth=2, markersize=6, label='True Arm Position')

    # Plot predicted arm position
    ax.plot(predicted_positions[:, 0], predicted_positions[:, 1], predicted_positions[:, 2],
            'go-', linewidth=2, markersize=6, label='Predicted Arm Position')

    # Set labels
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')

    # End-effector positions for the title. The link lengths are passed
    # explicitly so the title agrees with the arms drawn above when the caller
    # overrides l1/l2/l3.
    target_pos = forward_kinematics_3dof(*true_angles, l1, l2, l3)
    predicted_pos = forward_kinematics_3dof(*predicted_angles, l1, l2, l3)

    # Separate names for the labels so the angle parameters are not rebound to strings.
    true_angles_label = _format_angles_deg(true_angles)
    predicted_angles_label = _format_angles_deg(predicted_angles)

    # Calculate Euclidean error
    error = np.linalg.norm(np.array(target_pos) - np.array(predicted_pos))

    # Create title with requested information
    title = f"Target Arm: ({target_pos[0]:.2f}, {target_pos[1]:.2f}, {target_pos[2]:.2f}) Angles: {true_angles_label}\n"
    title += f"Predicted Arm: ({predicted_pos[0]:.2f}, {predicted_pos[1]:.2f}, {predicted_pos[2]:.2f}) Angles: {predicted_angles_label}\n"
    title += f"Euclidean Error: {error:.4f}"

    ax.set_title(title)

    # Give all three axes the same span, centred on their current midpoints.
    # np.ptp is the function form (the ndarray.ptp method is gone in NumPy 2.0).
    # axis=1 takes the span per axis, so the largest single-axis span sets the cube.
    limits = np.array([ax.get_xlim(), ax.get_ylim(), ax.get_zlim()])
    half_span = np.ptp(limits, axis=1).max() / 2.0
    mid_x, mid_y, mid_z = limits.mean(axis=1)
    ax.set_xlim(mid_x - half_span, mid_x + half_span)
    ax.set_ylim(mid_y - half_span, mid_y + half_span)
    ax.set_zlim(mid_z - half_span, mid_z + half_span)

    ax.legend()
    plt.tight_layout()
    plt.show()

def plot_true_vs_predicted(model, test_inputs, test_outputs, input_mean, input_std, num_samples=10, seed=None):
    """Show an interactive viewer comparing true and predicted arm poses.

    ``num_samples`` rows are drawn at random (without replacement) from the
    test set. A slider selects which of them is rendered through
    ``visualize_arm_positions``.

    Args:
        model: Object exposing ``predict(inputs, batch_size=...)`` that returns
            angles scaled by ``1 / (pi / 2)``.
        test_inputs: Normalized inputs, one row per sample.
        test_outputs: Target angles scaled by ``1 / (pi / 2)``, aligned with
            ``test_inputs``.
        input_mean, input_std: Statistics used to undo the input normalization
            (``x * input_std + input_mean``).
        num_samples: How many samples to offer on the slider. Values larger
            than the test set are clamped to its size.
        seed: Optional seed for a reproducible selection. When ``None`` the
            global ``random`` module state is used, as before.

    Raises:
        ValueError: If there is no sample to display (empty test set or
            ``num_samples`` < 1).
    """
    total_samples = len(test_inputs)
    num_samples = min(num_samples, total_samples)
    if num_samples < 1:
        raise ValueError("plot_true_vs_predicted needs at least one sample to display")

    # Randomly select samples
    sampler = random if seed is None else random.Random(seed)
    selected_indices = sampler.sample(range(total_samples), num_samples)

    # Only the selected rows are ever displayed, so restrict all further work
    # (including the model call) to them instead of the whole test set.
    selected_inputs = np.asarray(test_inputs)[selected_indices]

    # Denormalize inputs
    targets = selected_inputs * input_std + input_mean

    # Make predictions and undo the pi/2 output scaling
    predicted_angles = model.predict(pd.DataFrame(selected_inputs), batch_size=2**16) * (np.pi/2)

    # Denormalize true angles
    true_angles = np.asarray(test_outputs)[selected_indices] * (np.pi/2)

    def update(sample_index):
        clear_output(wait=True)
        visualize_arm_positions(
            targets[sample_index],
            true_angles[sample_index],
            predicted_angles[sample_index],
        )

    # Create a slider for sample selection
    sample_slider = widgets.IntSlider(value=0, min=0, max=num_samples-1, step=1, description='Sample')

    # Create interactive output
    out = widgets.interactive_output(update, {'sample_index': sample_slider})

    # Display widgets and output
    display(widgets.VBox([sample_slider, out]))

2024-08-21 00:00:36.539211: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-08-21 00:00:36.660622: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-08-21 00:00:36.688234: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [3]:
# Load the dataset once; the call returns the two splits plus the input statistics.
train_split, test_split, input_mean, input_std = load_and_preprocess_data()
train_inputs, train_outputs = train_split
test_inputs, test_outputs = test_split

# Fetch the trained model from the MLflow run below.
run_id = '187b953c00db430a8b838e9d37a6186c'  # Replace with your actual run ID
model = load_model(run_id)

# Open the slider-driven viewer on 100 random test samples.
plot_true_vs_predicted(model, test_inputs, test_outputs, input_mean, input_std, num_samples=100)



VBox(children=(IntSlider(value=0, description='Sample', max=99), Output()))