In [None]:
import logging_pb2
import geometry_pb2
import sensor_pb2

value = 0

def getValue():
    global value
    value += 1
    return value

def generateIMUData():
    imu = sensor_pb2.IMU()
    imu.accelerometer.x = getValue()
    imu.accelerometer.y = getValue()
    imu.accelerometer.z = getValue()
    
    imu.gyro.x = getValue()
    imu.gyro.y = getValue()
    imu.gyro.z = getValue()
    
    return imu

def generateJointData():
    joint = sensor_pb2.Joint()
    
    joint.q = getValue()
    joint.dq = getValue()
    if joint.dq % 3 == 0:
        joint.ddq = getValue()
    
    return joint

def generatePoseData():
    pose = geometry_pb2.Pose()
    pose.position.x = getValue()
    pose.position.y = getValue()
    pose.position.z = getValue()
    pose.orientation.x = getValue()
    pose.orientation.y = getValue()
    pose.orientation.z = getValue()
    pose.orientation.w = getValue()
    return pose

def generateRobotState():
    robot_state = sensor_pb2.RobotState()
    robot_state.pose.CopyFrom(generatePoseData())
    robot_state.imu.CopyFrom(generateIMUData())

    for i in range(4):
        joint = robot_state.joint.add()
        joint.CopyFrom(generateJointData())
        
    return robot_state

generator = { 0 : generateIMUData,
              1 : generateJointData,
              2 : generatePoseData,
              3 : generateRobotState }


In [None]:
import random
import struct
import time

msg_count = { sensor_pb2.RobotState.DESCRIPTOR.full_name : 0,
              sensor_pb2.IMU.DESCRIPTOR.full_name : 0,
              sensor_pb2.Joint.DESCRIPTOR.full_name : 0,
              geometry_pb2.Pose.DESCRIPTOR.full_name : 0 }

with open("log.data", "wb") as f:

    for i in range(40):
        # Generate a random msg:
        msg = generator[random.randint(0,3)]()
    
        record = logging_pb2.LogRecord()
        record.timestamp = time.monotonic()
        record.message_type = msg.DESCRIPTOR.full_name
        record.sequence_id = msg_count[record.message_type]
        msg_count[record.message_type] += 1
        record.serialisation = logging_pb2.LogRecord.PROTOBUF
        # Serialize and insert the actual message into the LogRecord
        record.message = msg.SerializeToString()
    
        data = record.SerializeToString()
        print(len(data))
        
        sz = struct.pack("=I", len(data))
        f.write(sz)
        f.write(data)
        f.write(sz)
        
        time.sleep(0.2)



In [None]:
from importlib import import_module

def deserialize(message, typ):
    """ This allows for generic deserialization of a message.
        Assumes that the module name matches the package name (with a '_pb2' suffix)
        This will create a message object of the correct type, and deserialize the
        'message' data into the object and return it. """
    module_, class_ = typ.rsplit('.', 1)
    class_ = getattr(import_module(module_+"_pb2"), class_)
    rv = class_()
    rv.ParseFromString(message)
    return rv

with open("log.data", "rb") as f:
    while True:
        # Get the size of the data to be read:
        sz = f.read(4)
        if not sz:
            # No data left in file. Be done!
            break;

        size, = struct.unpack("=I", sz)
        print(f"Reading {size} bytes")
    
        # Create a log record and deserialize the data
        record = logging_pb2.LogRecord()
        record.ParseFromString(f.read(size))
        print(record)
        # The log record ends in a size also. Use this to check that the log isn't corrupt
        end_sz = f.read(4)
        end_size, = struct.unpack("=I", end_sz)
        assert size == end_size,f"Expected {size}, found {end_size}"
        
        print("=====================")
        # Deserialize the data!
        msg = deserialize(record.message, record.message_type)
        print(f"{record.message_type} : ")
        print(msg)
        print("---------------------")