# MoCap Arena

---

In [19]:
# Importing modules...
import numpy as np
import scipy as sp
import cv2
import time
import socket

import sys
sys.path.append('../..') # Go back to base directory

from modules.vision.camera import Camera
from modules.vision.epipolar_geometry import build_essential_matrix, build_fundamental_matrix, order_centroids
from modules.vision.blob_detection import detect_blobs

# Instanciating Camera Objects

To generate the data regarding the mathematical model of the vision sensor in *CoppeliaSim*, it will be instanciated a Camera Obejcts that matches the Vision Sensors' intrinsic and extrinsic parameters. 

---

In [20]:
n_cameras = 4
camera = [] # Camera objects list

# Object matrix of Camera 0
base_matrix = np.array([[-7.07106781e-01,  5.00000000e-01, -5.00000000e-01, 2.50000000e+00],
                        [ 7.07106781e-01,  5.00000000e-01, -5.00000000e-01, 2.50000000e+00],
                        [ 1.46327395e-13, -7.07106781e-01, -7.07106781e-01, 2.50000000e+00]])

for ID in range(n_cameras):
    # Spread all cameras uniformely in a circle around the arena
    R = np.array(sp.spatial.transform.Rotation.from_euler('z', (360 / n_cameras) * ID, degrees=True).as_matrix())
    object_matrix = R @ base_matrix

    camera.append(Camera(# Intrinsic Parameters
                         resolution=(720,720), 
                         fov_degrees=60.0,     
         
                         # Extrinsic Parameters
                         object_matrix=object_matrix,
         
                         # Rational Lens Distortion Model
                         # distortion_model='rational',
                         # distortion_coefficients=np.array([0.014, -0.003, -0.0002, -0.000003, 0.0009, 0.05, -0.007, 0.0017]), 
                         
                         # Fisheye Lens Distortion Model
                         distortion_model='fisheye',
                         distortion_coefficients=np.array([0.395, 0.633, -2.417, 2.110]),
         
                         # Image Noise Model
                         snr_dB=13
                         ))

# Computing Fundamental Matrices

For each pair $\{i, j\}$, a fundamental matrix will be calculated between camera's $i$ and $j$. In this case, this information will be storted in a matrix $F$ such that $F_{ij}$ will be the fundamental matrix between camera's $i$ and $j$.

---

In [21]:
fundamental_matrix = np.array(np.zeros((n_cameras, n_cameras, 3, 3)))

for reference in range(n_cameras):
    for auxiliary in range(n_cameras):
        if reference == auxiliary:
            continue

        E = build_essential_matrix(camera[reference].extrinsic_matrix, camera[auxiliary].extrinsic_matrix)

        F = build_fundamental_matrix(camera[reference].intrinsic_matrix, camera[auxiliary].intrinsic_matrix, E)

        fundamental_matrix[reference][auxiliary] = F

# Creating the Server's UDP Socket

---

In [22]:
tag = 'SERVER' 

print(f'[{tag}] Creating socket...')

buffer_size = 1024   # In bytes

# Try to create server socket
try: 
    server_socket = socket.socket(socket.AF_INET,    # Internet
                                  socket.SOCK_DGRAM) # UDP
    print(f'[{tag}] Socket successfully created')
    
except socket.error as err: 
    print(f'[{tag}] Socket creation failed with error {err}\n')
    print(f'[{tag}] Quitting code...')
    exit()

server_ip = '127.0.0.1' # Server IP
server_port = 8888      # Server Port
server_address = (server_ip, server_port) 

server_socket.bind(server_address)
print(f'[{tag}] Bound to port {server_port}')

[SERVER] Creating socket...
[SERVER] Socket successfully created
[SERVER] Bound to port 8888


# Identifying Arena Controller

---

In [23]:
n_clients = 4
simulation_time = 10 # In seconds

# Controller socket info
controller_ip = '127.0.0.1' # Controller IP
controller_port = 7777      # Controller Port
controller_address = (controller_ip, controller_port)

# Send capture info to the controller
message = f'{n_clients},{simulation_time}'

message_bytes = message.encode()

server_socket.sendto(message_bytes, controller_address)
print(f'[{tag}] Capture info sent')

# Wait for controller to setup the scene
server_socket.recvfrom(buffer_size)
print(f'[{tag}] Scene ready')

# Send trigger
server_socket.sendto(''.encode(), controller_address)
print(f'[{tag}] Trigger sent!')


[SERVER] Capture info sent
[SERVER] Scene ready
[SERVER] Trigger sent!


# Identifying Clients Addresses

---

In [24]:
client_addresses = {}

print(f'[{tag}] Waiting for clients...')

# Address lookup 
while len(client_addresses.keys()) < n_clients: # Until all clients are identified
    message_bytes, address = server_socket.recvfrom(buffer_size)

    try:
        ID = int(message_bytes.decode()) # Decode message

    except: # Invalid message for decoding
        continue # Look for another message

    client_addresses[address] = ID

    print(f'\tClient {ID} connected')

print(f'[{tag}] All clients connected!')

[SERVER] Waiting for clients...
	Client 0 connected
	Client 1 connected
	Client 2 connected
	Client 3 connected
[SERVER] All clients connected!


# Running Simulation

For a running simulation, in each Presentation Timestamp (PTS), the following process will happen:

1. Get images of all cameras;
2. Detect blobs;
3. Undistort detected blob coordinates;
4. Order blobs based on epipolar lines;
5. Triangulate markers.

For each PTS the data will be saved for later analysis.

To evaluate how well the system is able to process at run-time, the Real Time Factor of the system will be calculated:

$${RTF} = \frac{t_S}{t_R}$$

Where $t_S$ is the total simulation time, and $t_R$ is the total ammount of real time that the system took to ran.

The `synchronous` flag will define if the *CoppeliaSim* simulation will wait for the trigger to go to its next step. If so, the system is synchronous, if not, the simulation will run by itself and will not wait for the calculations to be done in the code, making the system asynchronous.

---

In [25]:
'''# Synchronous system flag
synchronous = True

# Make system synchronous
sim.setStepping(synchronous)

simulation_time = 10 # In seconds
simulation_step = sim.getSimulationTimeStep()

# Triangulation pair
pair = (0, 1)
reference, auxiliary = pair

images = {} # Images of all cameras in each presentation timestamps
true_marker_positions = {} # Ground truth marker positions in each presentation timestamps
estimated_marker_positions = {} # Triangulated marker positions in each presentation timestamps

# Simulation begins here
sim.startSimulation()

# Create triangulation tag
tag_handle = sim.createDummy(0.015)

# Change triangulation tag color properties
sim.setObjectColor(tag_handle, 0, sim.colorcomponent_ambient_diffuse, list([0, 0, 0]))
sim.setObjectColor(tag_handle, 0, sim.colorcomponent_specular,        list([0, 0, 0])) 
sim.setObjectColor(tag_handle, 0, sim.colorcomponent_emission,        list([0, 1, 0])) 

timer_start = time.time() # Start timer

while (t := sim.getSimulationTime()) < simulation_time:
    # Get and save images
    sync_images = [camera[ID].get_coppelia_image(sim.getVisionSensorImg) for ID in pair]
    images[t] = sync_images 

    # Detect blobs on the distorted image
    distorted_blobs = [detect_blobs(sync_images[ID]) for ID in pair]

    # If no blobs were detected then images are invalid
    if any(blob is None for blob in distorted_blobs):
        if synchronous:
            sim.step()
        continue # Jump to next iteration
    
    # If blob count is different between each image
    if distorted_blobs[reference].shape != distorted_blobs[auxiliary].shape:
        if synchronous:
            sim.step()
        continue # Jump to next iteration
    
    # Undistort blobs
    undistorted_blobs = [camera[ID].undistort_points(distorted_blobs[ID].T.reshape(1, -1, 2).astype(np.float32), 
                                                     camera[ID].intrinsic_matrix, 
                                                     camera[ID].distortion_coefficients,
                                                     np.array([]),
                                                     camera[ID].intrinsic_matrix).reshape(-1,2).T for ID in pair]

    # Order blobs
    epilines_auxiliary = cv2.computeCorrespondEpilines(points=undistorted_blobs[reference].T, 
                                                       whichImage=1, 
                                                       F=fundamental_matrix[reference][auxiliary]).reshape(-1,3)
    
    undistorted_blobs[auxiliary] = order_centroids(undistorted_blobs[auxiliary], epilines_auxiliary)

    # Triangulate markers
    triangulated_positions = cv2.triangulatePoints(camera[reference].projection_matrix.astype(float), 
                                                   camera[auxiliary].projection_matrix.astype(float), 
                                                   undistorted_blobs[reference].astype(float), 
                                                   undistorted_blobs[auxiliary].astype(float))
    
    # Normalize homogeneous coordinates and save
    triangulated_positions = (triangulated_positions / triangulated_positions[-1])[:-1, :]
    estimated_marker_positions[t] = triangulated_positions

    # Position triangulation tag
    sim.setObjectPosition(tag_handle, triangulated_positions.reshape(-1, 3)[0].tolist())

    # Get ground truth marker position for comparison
    true_marker_position = np.array(sim.getObjectPosition(sim.getObject('/Marker'))).reshape(3,-1)
    true_marker_positions[t] = true_marker_position

    # Call next simulation step
    if synchronous:
        sim.step()

timer_end = time.time() # Stop timer

# Simulation ends here
sim.stopSimulation()

print(f'Real Time Factor: {simulation_time / (timer_end - timer_start)}')'''

"# Synchronous system flag\nsynchronous = True\n\n# Make system synchronous\nsim.setStepping(synchronous)\n\nsimulation_time = 10 # In seconds\nsimulation_step = sim.getSimulationTimeStep()\n\n# Triangulation pair\npair = (0, 1)\nreference, auxiliary = pair\n\nimages = {} # Images of all cameras in each presentation timestamps\ntrue_marker_positions = {} # Ground truth marker positions in each presentation timestamps\nestimated_marker_positions = {} # Triangulated marker positions in each presentation timestamps\n\n# Simulation begins here\nsim.startSimulation()\n\n# Create triangulation tag\ntag_handle = sim.createDummy(0.015)\n\n# Change triangulation tag color properties\nsim.setObjectColor(tag_handle, 0, sim.colorcomponent_ambient_diffuse, list([0, 0, 0]))\nsim.setObjectColor(tag_handle, 0, sim.colorcomponent_specular,        list([0, 0, 0])) \nsim.setObjectColor(tag_handle, 0, sim.colorcomponent_emission,        list([0, 1, 0])) \n\ntimer_start = time.time() # Start timer\n\nwhile

# Analysing Precision and Accuracy 

Since the ground truth data of the marker's position is available, it's possible to evaluate the Precision and Accuracy of the system for the capture of a single stationary marker on the scene.

Calculating the average difference between the position of the marker it's ground truth will determine the **Precision**. Calculating the standard deviation of the marker's position will determine how accurate the system is by it's **Jitter**.

---

In [26]:
'''static_capture = False # Flag if the marker is moving or not in the scene 

# Gathering Data
all_marker_positions = None
triangulation_errors = None
for frame, pts in enumerate(estimated_marker_positions.keys()):
    if all_marker_positions is None:
        all_marker_positions = estimated_marker_positions[pts]
    else:
        all_marker_positions = np.hstack((all_marker_positions, estimated_marker_positions[pts]))

    if triangulation_errors is None:
        triangulation_errors = np.linalg.norm(estimated_marker_positions[pts] - true_marker_positions[pts])
    else:
        triangulation_errors = np.hstack((triangulation_errors, np.linalg.norm(estimated_marker_positions[pts] - true_marker_positions[pts])))

# For static single marker capture
if static_capture == True:
    true_position = np.ravel(list(true_marker_positions.values())[0]) # Ground truth
    average_position = np.mean(all_marker_positions, axis=1) # For Precision
    std_dev_position = np.std(all_marker_positions, axis=1)  # For Accuracy 

    print('Ground Truth Position:', true_position)
    print('Average Position:', average_position) 
    print(f'Average Reconstruction Error: {np.mean(triangulation_errors) * 1e3 :.2f} mm') # Precision
    print(f'Standard Deviation: {np.linalg.norm(std_dev_position) * 1e3 :.2f} mm')        # Accuracy

# For moving single marker capture
else:
    print(f'Average Reconstruction Error: {np.mean(triangulation_errors) * 1e3 :.2f} mm') # Precision'''

"static_capture = False # Flag if the marker is moving or not in the scene \n\n# Gathering Data\nall_marker_positions = None\ntriangulation_errors = None\nfor frame, pts in enumerate(estimated_marker_positions.keys()):\n    if all_marker_positions is None:\n        all_marker_positions = estimated_marker_positions[pts]\n    else:\n        all_marker_positions = np.hstack((all_marker_positions, estimated_marker_positions[pts]))\n\n    if triangulation_errors is None:\n        triangulation_errors = np.linalg.norm(estimated_marker_positions[pts] - true_marker_positions[pts])\n    else:\n        triangulation_errors = np.hstack((triangulation_errors, np.linalg.norm(estimated_marker_positions[pts] - true_marker_positions[pts])))\n\n# For static single marker capture\nif static_capture == True:\n    true_position = np.ravel(list(true_marker_positions.values())[0]) # Ground truth\n    average_position = np.mean(all_marker_positions, axis=1) # For Precision\n    std_dev_position = np.std(all_

# Playing Back the Camera's Feed

The following cell will replicate a real time camera feed of the simulation. Change the `ID` parameter to switch between camera views.

---

In [27]:
'''# Playback a camera image feed in real time
ID = 0 # Camera ID to be watched
timestep = int(1e3 * simulation_time/len(images.keys())) # In milliseconds
for frame, pts in enumerate(images.keys()):
    cv2.imshow(f'Camera {ID}', images[pts][ID])
    cv2.waitKey(timestep)

# Closing all open windows 
cv2.destroyAllWindows()'''

"# Playback a camera image feed in real time\nID = 0 # Camera ID to be watched\ntimestep = int(1e3 * simulation_time/len(images.keys())) # In milliseconds\nfor frame, pts in enumerate(images.keys()):\n    cv2.imshow(f'Camera {ID}', images[pts][ID])\n    cv2.waitKey(timestep)\n\n# Closing all open windows \ncv2.destroyAllWindows()"