Connect to the database

In [28]:
from pyArango.connection import *

# Connect to the database
conn = Connection(arangoURL="http://iui_arangodb:8529", username="root", password="isaac")

# Open the isaac database / create it if it does not exist
if not conn.hasDatabase("isaac"):
    print("There is no isaac database, did you load it?")
else:
    db = conn["isaac"]
    print("Connected to database")

Connected to database


Specify target info

In [29]:
import numpy as np
import quaternion

# Target Pose
target_position = np.array([1.0, 0, -0.7])
target_attitude = quaternion.from_euler_angles(0, 0, 0)  # order is roll, pitch, yaw

# Parameters
ros_topic_pose = "/gnc/ekf".replace("/", "_")[1:]
ros_topic_image = "/hw/cam_nav/image_color".replace("/", "_")[1:]
max_distance = 1.2
min_distance = 0.1
max_angle = 15


Query the nearby points and get image and pose pair

In [56]:
import matplotlib.pyplot as plt

# Query pose topic for the closest timestamps to the image topic & only get nearby pairs
query = (
    "FOR doc_image IN " + ros_topic_image +
    "  LET closest_pose = (" +
    "    FOR doc_pose IN " + ros_topic_pose +
    "      SORT ABS((doc_pose.header.stamp.secs + doc_pose.header.stamp.nsecs * 0.000000001) - (doc_image.header.stamp.secs + doc_image.header.stamp.nsecs * 0.000000001))" +
    "      LIMIT 1" +
    "      RETURN doc_pose" +
    "    )" +
    "  FILTER closest_pose[0].pose.position.x >= " + str(target_position[0] - max_distance) +
    "    AND closest_pose[0].pose.position.x <= " + str(target_position[0] + max_distance) +
    "    AND closest_pose[0].pose.position.y >= " + str(target_position[1] - max_distance) +
    "    AND closest_pose[0].pose.position.y <= " + str(target_position[1] + max_distance) +
    "    AND closest_pose[0].pose.position.z >= " + str(target_position[2] - max_distance) +
    "    AND closest_pose[0].pose.position.z <= " + str(target_position[2] + max_distance) +
    "\n" +
    "  RETURN {doc_image, closest_pose: closest_pose[0]}"
)

result = list(db.AQLQuery(query, rawResults = True))
print("Found " + str(len(result)) + " results")

Found 1930 results


Eliminate points based on pointing angle

In [58]:
# Perform the quaternion rotation
rotated_vector2 = target_attitude * np.quaternion(0, 1.0, 0.0, 0.0) * target_attitude.conjugate()

# Extract the rotated vector
vector2 = np.array([rotated_vector2.x,
                           rotated_vector2.y,
                           rotated_vector2.z])

for i in range(len(result)-1, -1, -1):
    # Check if time difference between the image and the pose is too far apart
    
    # Calculate the first vector from the difference between the points
    robot_position = np.array([result[i]["closest_pose"]["pose"]["position"]["x"], result[i]["closest_pose"]["pose"]["position"]["y"], result[i]["closest_pose"]["pose"]["position"]["z"]])
    vector1 = target_position - robot_position

    # Calculate the angle between the two vectors
    angle = np.arccos(np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2)))

    # Convert angle to degrees
    angle_degrees = np.degrees(angle)
    
    if (angle_degrees > max_angle):
        del result[i]
        continue
print("Keeping " + str(len(result)) + " results")

Keeping 731 results


Eliminate points based on visual field

In [65]:
# Get the 
import subprocess
import json

# Get nested dictionary key values
def get_nested_value(data, keys):
    value = data
    for key in keys:
        if key in value:
            value = value[key]
        else:
            return None
    return value
 
selected_keys = [
    "doc_image/header",
    "closest_pose/pose/position/x",
    "closest_pose/pose/position/y",
    "closest_pose/pose/position/z",
    "closest_pose/pose/orientation/x",
    "closest_pose/pose/orientation/y",
    "closest_pose/pose/orientation/z",
    "closest_pose/pose/orientation/w"
]

selected_data = [
    [
        item[key] if "/" not in key else get_nested_value(item, key.split("/"))
        for key in selected_keys
    ]
    for item in result
]

# Convert the input list to a JSON string
input_json = json.dumps(selected_data)
cmd = ["rosrun", "analyst_notebook", "query_view_points", input_json]

# Execute the C++ script and pass the input JSON as a command-line argument
stdout = ""
stderr = ""
# popen = subprocess.Popen(
#     cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True
# )
# for stdout_line in iter(popen.stdout.readline, ""):
#     stdout += stdout_line

# popen.stdout.close()
# print(stdout)

# output, _ = process.communicate()

# # Parse the output JSON and convert it back to a list
# output_list = json.loads(output.decode())


# Delete data from initial list
