In [1]:
from rclpy.serialization import serialize_message, deserialize_message
from geometry_msgs.msg import PoseStamped

In [8]:
# Load class from string
# https://stackoverflow.com/questions/1176136/convert-string-to-python-class-object
import importlib

def class_from_string(module_name, class_name):
    """Look up an attribute (typically a class) by module path and name.

    :param module_name: dotted import path of the module, e.g. ``'collections'``
    :param class_name: name of the attribute to fetch from that module
    :return: the object bound to ``class_name`` inside the imported module
    :raises ImportError: if the module cannot be imported
    :raises AttributeError: if the module has no attribute ``class_name``
    """
    loaded_module = importlib.import_module(module_name)
    return getattr(loaded_module, class_name)

def class_string_from_msg(msg):
    """
    Split the class of ``msg`` into its module path and class name.

    Example)
    > p = geometry_msgs.msg.Pose()
    > type(p)
    geometry_msgs.msg._pose.Pose
    > msg_type = class_string_from_msg(p)
    > msg_type
    ('geometry_msgs.msg._pose', 'Pose')

    :param msg: any object; only its type is inspected
    :return: (str, str) tuple of (module path, class name)
    """
    msg_class = type(msg)
    return msg_class.__module__, msg_class.__name__


# 예시를 위한 클래스
class DummyMessage:
    """Example container that keeps a message in serialized form.

    Stores the serialized bytes together with the information needed to
    rebuild the message later: the (module, class name) pair of its type,
    the topic name, and a timestamp.
    """

    def __init__(self, msg, topic_name, timestamp=None):
        """Serialize ``msg`` and record its metadata.

        :param msg: message object to store; it must be accepted by
            ``serialize_message`` and, when ``timestamp`` is omitted, must
            expose ``header.stamp``
        :param topic_name: topic name to record for the message
        :param timestamp: timestamp to record; when ``None`` (the default)
            the message's own ``header.stamp`` is used
        :raises AttributeError: if ``timestamp`` is ``None`` and ``msg`` has
            no ``header.stamp``
        """
        # Compare against None explicitly: a truthiness test would discard a
        # caller-supplied timestamp that happens to be falsy (e.g. 0) and
        # silently substitute the message's header stamp.
        self.timestamp = msg.header.stamp if timestamp is None else timestamp
        # Message class split into (module, name) so it can be stored as strings.
        self.msg_type = class_string_from_msg(msg)
        # Topic name of the message.
        self.topic_name = topic_name
        # Serialized message bytes.
        self.data = serialize_message(msg)

    def deserialize(self):
        """Rebuild the message from the stored bytes.

        :return: the object returned by ``deserialize_message`` for the
            stored data and the class resolved from ``self.msg_type``
        """
        # Turn the stored (module, name) strings back into the message class.
        msg_class = class_from_string(*self.msg_type)
        return deserialize_message(self.data, msg_class)

In [9]:
# Topic name to record alongside the stored message.
topic_name = 'robot_pose'
# Build a default PoseStamped and set only its frame_id; the stamp keeps its
# default value. Serialization happens later, inside DummyMessage.
msg = PoseStamped()
msg.header.frame_id = 'map'
# Bare expression so the notebook displays the message.
msg

geometry_msgs.msg.PoseStamped(header=std_msgs.msg.Header(stamp=builtin_interfaces.msg.Time(sec=0, nanosec=0), frame_id='map'), pose=geometry_msgs.msg.Pose(position=geometry_msgs.msg.Point(x=0.0, y=0.0, z=0.0), orientation=geometry_msgs.msg.Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)))

In [10]:
# Wrap the message; no timestamp is passed, so DummyMessage uses msg.header.stamp.
msg_in_database = DummyMessage(msg, topic_name)

In [11]:
# Recorded timestamp: taken from msg.header.stamp because none was passed in.
msg_in_database.timestamp

builtin_interfaces.msg.Time(sec=0, nanosec=0)

In [12]:
# (module, class name) pair recorded for the message type.
msg_in_database.msg_type

('geometry_msgs.msg._pose_stamped', 'PoseStamped')

In [13]:
# Bytes returned by serialize_message for the stored message.
msg_in_database.data

b'\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00map\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?'

In [14]:
# Rebuild the message object from the stored bytes and type strings.
restored_msg = msg_in_database.deserialize()

In [15]:
# Display the restored message so it can be compared with the original shown earlier.
restored_msg

geometry_msgs.msg.PoseStamped(header=std_msgs.msg.Header(stamp=builtin_interfaces.msg.Time(sec=0, nanosec=0), frame_id='map'), pose=geometry_msgs.msg.Pose(position=geometry_msgs.msg.Point(x=0.0, y=0.0, z=0.0), orientation=geometry_msgs.msg.Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)))