In [1]:
import symforce
symforce.set_epsilon_to_symbol()

In [2]:
import numpy as np
from symforce.values import Values

import rosbag2_py
from rclpy.serialization import deserialize_message

In [29]:
bag_path = "../bags/dataset_drive_square_no_vid"
# Copied the following from https://qiita.com/nonanonno/items/8f7bce03953709fd5af9
storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id="sqlite3")
converter_options = rosbag2_py.ConverterOptions(input_serialization_format="cdr", output_serialization_format="cdr")

sr = rosbag2_py.SequentialReader()
sr.open(storage_options, converter_options)
sr.set_filter(rosbag2_py.StorageFilter(topics=["/odom", "/detections", "/camera/camera_info"]))

[INFO] [1698635001.892230717] [rosbag2_storage]: Opened database '../bags/dataset_drive_square_no_vid/rosbag2_2023_10_25-11_14_04_0.db3' for READ_ONLY.


In [30]:
# Create a dictionary of each topic's message type
# This will be used for deserializing the messages.
import importlib
def get_type_from_str(type_str: str):
    # dynamic load message package
    pkg = importlib.import_module(".".join(type_str.split("/")[:-1]))
    return eval(f"pkg.{type_str.split('/')[-1]}")

msg_type_table = {}
for topic in sr.get_all_topics_and_types():
    msg_type_table[topic.name] = get_type_from_str(topic.type)

In [62]:
# Now we extract messages from the entire bag
sr.seek(0)
msgs = []
while sr.has_next():
    msgs.append(sr.read_next())

[INFO] [1698636241.743269991] [rosbag2_storage]: Opened database '../bags/dataset_drive_square_no_vid/rosbag2_2023_10_25-11_14_04_0.db3' for READ_ONLY.


In [63]:
# Extract detections and odometry messages
f_deserialize_msg = lambda msg: deserialize_message(msg[1], msg_type_table[msg[0]])
def get_first_msg_of_topic(topic):
    return next(deserialize_message(msg[1], msg_type_table[msg[0]]) for msg in msgs if msg[0] == topic)

example_odom_data = get_first_msg_of_topic("/odom")
example_detections_data = get_first_msg_of_topic("/detections")
example_camera_info_data = get_first_msg_of_topic("/camera/camera_info")

print(example_odom_data)
print("---")
print(example_detections_data)
print("---")
print(example_camera_info_data)

nav_msgs.msg.Odometry(header=std_msgs.msg.Header(stamp=builtin_interfaces.msg.Time(sec=1697911413, nanosec=507019835), frame_id='odom'), child_frame_id='base_footprint', pose=geometry_msgs.msg.PoseWithCovariance(pose=geometry_msgs.msg.Pose(position=geometry_msgs.msg.Point(x=0.002499930899425671, y=2.0161891289617244e-06, z=0.0), orientation=geometry_msgs.msg.Quaternion(x=0.0, y=0.0, z=-0.010080474430627446, w=0.9999491907268356)), covariance=array([1.e-01, 0.e+00, 0.e+00, 0.e+00, 0.e+00, 0.e+00, 0.e+00, 1.e-01,
       0.e+00, 0.e+00, 0.e+00, 0.e+00, 0.e+00, 0.e+00, 1.e-01, 0.e+00,
       0.e+00, 0.e+00, 0.e+00, 0.e+00, 0.e+00, 1.e+05, 0.e+00, 0.e+00,
       0.e+00, 0.e+00, 0.e+00, 0.e+00, 1.e+05, 0.e+00, 0.e+00, 0.e+00,
       0.e+00, 0.e+00, 0.e+00, 1.e+05])), twist=geometry_msgs.msg.TwistWithCovariance(twist=geometry_msgs.msg.Twist(linear=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0), angular=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0)), covariance=array([0., 0., 0., 0., 0., 0., 

In [66]:
# Get the example messages into np form to play around with them
example_corners = np.array([
    np.array([corner.x, corner.y])
    for corner in example_detections_data.detections[0].corners
])

camera_K = np.reshape(example_camera_info_data.k, (3, 3))

In [65]:
## Phase 1: Integrate odometry using Symforce?
# Define the odometry residual function?
import symforce.symbolic as sf
from symforce.codegen.geo_factors_codegen import prior_factor, between_factor

def residual_between(pose_1: sf.Pose3, pose_2: sf.Pose3, rdelta_measured: sf.Pose3, epsilon: sf.Scalar):
    # Compute the expected body-frame displacement
    # pose_1 * rdelta = pose_2
    rdelta_expected = pose_1.inverse(epsilon) * pose_2

    # The "error": rdelta_expected ⊟ rdelta_measured
    error = rdelta_measured.local_coordinates(rdelta_expected, epsilon)
    return error


def get_tag_corner_residual(i_corner: np.int_, camera_cal: sf.CameraCal):
    ## Get the position of the tags in the camera frame
    # Define the displacements from the tag center to each tag corner
    R = sf.Rot3.identity()
    w = 0.203
    # Corner order: start in bottom left (-x, -y) and go ccw
    g_tag_c1 = sf.Pose3(R, [-w/2, -w/2, 0])
    g_tag_c2 = sf.Pose3(R, [-w/2, w/2, 0])
    g_tag_c3 = sf.Pose3(R, [w/2, w/2, 0])
    g_tag_c4 = sf.Pose3(R, [w/2, -w/2, 0])
    g_tag_corners = [g_tag_c1, g_tag_c2, g_tag_c3, g_tag_c4]
    g_tag_ci = g_tag_corners[i_corner]

    def residual_tag_obs(camera_pose: sf.Pose3, tag_pose: sf.Pose3, corners_px_measured: sf.M21, camera_cal: sf.CameraCal):
        # Measure the error between the expected pixel coordinates of the tag corners
        # versus the actual measured ones.
    
        # Compute the expected tag corner positions in the camera frame
        # based on the hypothesized camera and tag poses.
        g_cam_tag = camera_pose.inverse() * tag_pose
        g_cam_corner = g_cam_tag * g_tag_ci
    
        # Project the tag corner positions into the camera frame using the camera model
        corners_px_expected = camera_cal.pixel_from_camera_point(g_cam_corner)[0]
    
        return corners_px_measured - corners_px_expected
    