In [None]:
import numpy as np
import sys

from geometry_msgs.msg import Point, Pose, Quaternion, Transform, TransformStamped, Vector3
from math import floor, hypot, sin, sqrt
from mcap_ros1.writer import Writer
from pylab import get_cmap
from rospy import Time
from std_msgs.msg import ColorRGBA, String
from tf2_msgs.msg import TFMessage
from visualization_msgs.msg import Marker, MarkerArray

In [55]:
# Colormap that make_color() calls with a scalar to obtain an (r, g, b, a) color.
CMAP = get_cmap("gist_rainbow")

def nanos(t):
    """Convert a time in seconds to an integer number of nanoseconds.

    A time equal to zero is reported as 1 rather than 0 (presumably so that
    the resulting timestamp is never zero -- the reason is not visible here).
    Any other value is ``t * 1e9`` rounded down to an integer.
    """
    return 1 if t == 0.0 else floor(t * 1e+9)

def make_point(x, y, rows):
    """Return the Point for grid cell (x, y) of a rows-sized grid.

    Both coordinates are shifted by half of `rows`, and z is always 0.
    """
    half = rows * 0.5
    return Point(x - half, y - half, 0)

def make_pose(x, y, rows):
    """Return a Pose at the grid cell's point with quaternion (0, 0, 0, 1)."""
    position = make_point(x, y, rows)
    orientation = Quaternion(0, 0, 0, 1)
    return Pose(position, orientation)

def make_color(x, y, rows):
    """Return the ColorRGBA for grid cell (x, y) of a rows-sized grid.

    Each coordinate is divided by ``rows - 1`` and by sqrt(2), and the
    hypotenuse of the two results is used to sample CMAP; cell (0, 0) therefore
    samples 0.0 and cell (rows - 1, rows - 1) samples 1.0. The colormap's own
    alpha is discarded and replaced by a fixed 0.75.

    A single-cell grid (``rows == 1``) samples the colormap at 0.0 instead of
    raising ZeroDivisionError.
    """
    span = rows - 1
    if span == 0:
        # Only one cell: there is no extent to normalise by, so avoid the
        # division by zero and let the lone cell map to the start of CMAP.
        span = 1
    r, g, b, _alpha = CMAP(hypot(x / span / sqrt(2), y / span / sqrt(2)))
    return ColorRGBA(r, g, b, 0.75)

def make_tf(t):
    """Build a TFMessage holding one 'map' -> 'markers' transform stamped at t.

    The translation is (0, 0, sin(t)) and the rotation is the quaternion
    (0, 0, 0, 1).
    """
    translation = Vector3(0, 0, sin(t))
    rotation = Quaternion(0, 0, 0, 1)

    stamped = TransformStamped()
    stamped.child_frame_id = 'markers'
    stamped.transform = Transform(translation, rotation)
    stamped.header.stamp = Time.from_sec(t)
    stamped.header.frame_id = 'map'
    return TFMessage([stamped])

def make_cube(x, y, t, rows):
    """Build a frame-locked CUBE marker for grid cell (x, y) stamped at time t.

    The marker id packs y into bits 16 and up and x into the low bits. Pose
    and color come from make_pose() and make_color() for the same cell.
    """
    cube = Marker()
    cube.type = Marker.CUBE
    cube.action = Marker.ADD
    cube.id = x | (y << 16)
    cube.frame_locked = True

    header = cube.header
    header.frame_id = "markers"
    header.stamp = Time.from_sec(t)

    cube.scale = Vector3(0.75, 0.75, 1)
    cube.color = make_color(x, y, rows)
    cube.pose = make_pose(x, y, rows)
    return cube

def make_cubelist(t, rows):
    """Build one frame-locked CUBE_LIST marker covering a rows x rows grid.

    Points and colors are listed in row-major order (y outer, x inner), one
    entry per cell, using make_point() and make_color().
    """
    # Row-major cell coordinates, shared by the points and colors lists.
    cells = [(x, y) for y in range(rows) for x in range(rows)]

    cube_list = Marker()
    cube_list.type = Marker.CUBE_LIST
    cube_list.action = Marker.ADD
    cube_list.id = 0
    cube_list.frame_locked = True

    cube_list.header.frame_id = "markers"
    cube_list.header.stamp = Time.from_sec(t)

    cube_list.pose.orientation.w = 1
    cube_list.scale = Vector3(0.75, 0.75, 1)

    cube_list.points = [make_point(x, y, rows) for x, y in cells]
    cube_list.colors = [make_color(x, y, rows) for x, y in cells]
    return cube_list

# Build a list of all messages as (nanoseconds, topic_name, message) tuples
print("Generating messages...")
msgs = []
for t in np.arange(0, 10.1, 0.1):
    # Every message produced for this time step shares one timestamp.
    stamp = nanos(t)

    # /tf: one transform message per time step
    msgs.append((stamp, "/tf", make_tf(t)))

    # /cube: a 32x32 grid, one Marker message per cube
    rows = 32
    msgs.extend(
        (stamp, "/cube", make_cube(x, y, t, rows))
        for y in range(rows)
        for x in range(rows)
    )

    # /cubearray: a 64x64 grid packed into a single MarkerArray message
    rows = 64
    cubes = [make_cube(x, y, t, rows) for y in range(rows) for x in range(rows)]
    msgs.append((stamp, "/cubearray", MarkerArray(cubes)))

    # /cubelist: a 256x256 grid carried by a single CUBE_LIST marker
    rows = 256
    msgs.append((stamp, "/cubelist", make_cubelist(t, rows)))

# Order all messages by timestamp (stable, so ties keep generation order)
print("Sorting...")
msgs.sort(key=lambda entry: entry[0])

# Write the MCAP file, using each timestamp for both time arguments
print("Writing benchmark.mcap...")
with open("benchmark.mcap", "wb") as f:
    mcap = Writer(f)
    for stamp, topic, msg in msgs:
        mcap.write_message(topic, msg, stamp, stamp)
    mcap.finish()

print("Done")


Generating messages...
Sorting...
Writing benchmark.mcap...
Done
