In [1]:
import time
import pulsar
import producer_app.files.schema_exec_pb2 as schema_exec_pb2
import producer_app.files.schema_sensor_health_pb2 as schema_sensor_health_pb2
from google.protobuf.timestamp_pb2 import Timestamp
from mcap_protobuf.writer import Writer
import random
import io
import pyarrow as pa
import pyarrow.parquet as pq
from mcap_protobuf.decoder import DecoderFactory
from mcap.reader import make_reader
from google.protobuf.json_format import MessageToDict
from datetime import datetime
from uuid import uuid4
import os
import threading
import influxdb_client
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from mcap.exceptions import RecordLengthLimitExceeded, EndOfFile


In [2]:

def generate_mcap(file_path: str):
    """
    Generates an MCAP file with synthetic data and stores it at the specified path.

    Two Protobuf messages are written: an ``AppExec`` message on
    ``topic/app_exec`` (two apps, each with ``random.uniform(1, 3)`` exec time
    and CPU usage) and a ``SensorHealth`` message on ``topic/sensor_health``
    (three ``random.uniform(20, 40)`` temperatures). A single clock reading is
    used for the timestamp embedded in both messages and for the MCAP
    ``log_time`` / ``publish_time`` of both records.

    :param file_path: Path to store the generated MCAP file.
    """
    # Read the clock once so the embedded timestamp and the MCAP record times
    # agree; separate time.time_ns() calls would each return a different value.
    timestamp_ns = time.time_ns()
    seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
    timestamp = Timestamp(seconds=seconds, nanos=nanos)

    # Synthetic data for the AppExec message
    apps = [
        schema_exec_pb2.AppInfo(
            app_name=app_name,
            exec_time=random.uniform(1, 3),
            cpu_usage=random.uniform(1, 3),
        )
        for app_name in ("App1", "App2")
    ]
    app_exec_message = schema_exec_pb2.AppExec(
        timestamp=timestamp, num_apps=len(apps), apps=apps
    )

    # Synthetic data for the SensorHealth message
    sensor_temps = [random.uniform(20, 40) for _ in range(3)]
    sensor_health_message = schema_sensor_health_pb2.SensorHealth(
        timestamp=timestamp,
        num_sensors=len(sensor_temps),
        sensor_temps=sensor_temps,
    )

    messages_by_topic = (
        ("topic/app_exec", app_exec_message),
        ("topic/sensor_health", sensor_health_message),
    )

    # Build the log in memory first so the target file is only opened once the
    # MCAP content is complete; the `with` block releases the buffer even if
    # writing fails part-way.
    with io.BytesIO() as mcap_stream:
        mcap_writer = Writer(mcap_stream)
        for topic, message in messages_by_topic:
            mcap_writer.write_message(
                topic=topic,
                message=message,
                log_time=timestamp_ns,
                publish_time=timestamp_ns,
            )
        mcap_writer.finish()

        # Save to file
        with open(file_path, "wb") as f:
            f.write(mcap_stream.getvalue())

    print(f"MCAP log saved to {file_path}")


In [3]:
# Write a sample log into the working directory; the read-back cell further
# down opens this same "output.mcap" path.
generate_mcap("output.mcap")

MCAP log saved to output.mcap


In [4]:
def protobuf_sensor_health_to_line_protocol(msg):
    """
    Converts a SensorHealth Protobuf message into a list of InfluxDB points.

    One ``sensor_health`` point is built per entry of the message's
    ``sensorTemps`` list. Each point is tagged with ``num_sensors`` and with
    ``sensor_id`` (the entry's index), carries the temperature in the
    ``sensor_temp`` field, and is stamped with the message timestamp in
    nanoseconds.

    :param msg: Message exposing ``timestamp.seconds`` / ``timestamp.nanos``
        and accepted by ``MessageToDict``.
    :return: List of ``Point`` objects; empty when the message has no
        temperatures.
    """
    msg_dict = MessageToDict(msg)
    # Integer arithmetic only: multiplying by the float 1e9 routes the value
    # through float64, which cannot hold a nanosecond epoch exactly (at current
    # magnitudes it rounds to a multiple of 256 ns).
    ts = msg.timestamp.seconds * 1_000_000_000 + msg.timestamp.nanos
    num_sensors = msg_dict.get("numSensors", 0)
    return [
        Point("sensor_health")
        .tag("num_sensors", num_sensors)
        .tag("sensor_id", sensor_id)
        .field("sensor_temp", sensor_temp)
        .time(ts, WritePrecision.NS)
        for sensor_id, sensor_temp in enumerate(msg_dict.get("sensorTemps", []))
    ]

In [5]:
def protobuf_app_exec_to_line_protocol(msg):
    """
    Converts an AppExec Protobuf message into a list of InfluxDB points.

    One ``app_exec`` point is built per entry of the message's ``apps`` list.
    Each point is tagged with ``app_name`` and ``num_apps``, carries the
    ``exec_time`` and ``cpu_usage`` fields, and is stamped with the message
    timestamp in nanoseconds.

    :param msg: Message exposing ``timestamp.seconds`` / ``timestamp.nanos``
        and accepted by ``MessageToDict``.
    :return: List of ``Point`` objects; empty when the message has no apps.
    """
    msg_dict = MessageToDict(msg)
    # Integer arithmetic only: multiplying by the float 1e9 routes the value
    # through float64, which cannot hold a nanosecond epoch exactly (at current
    # magnitudes it rounds to a multiple of 256 ns).
    ts = msg.timestamp.seconds * 1_000_000_000 + msg.timestamp.nanos
    num_apps = msg_dict.get("numApps", 0)
    # NOTE(review): a key missing from an app dict is forwarded as None --
    # confirm how the InfluxDB client treats None field values.
    return [
        Point("app_exec")
        .tag("app_name", app.get("appName"))
        .tag("num_apps", num_apps)
        .field("exec_time", app.get("execTime"))
        .field("cpu_usage", app.get("cpuUsage"))
        .time(ts, WritePrecision.NS)
        for app in msg_dict.get("apps", [])
    ]


In [6]:
# Each known topic maps to the converter that turns its decoded Protobuf
# message into InfluxDB points; messages on any other topic are ignored.
topic_converters = {
    "topic/app_exec": protobuf_app_exec_to_line_protocol,
    "topic/sensor_health": protobuf_sensor_health_to_line_protocol,
}

store = []
with open("output.mcap", "rb") as mcap_file:
    # The whole file is loaded into memory and decoded from that buffer.
    mcap_stream = io.BytesIO(mcap_file.read())
    reader = make_reader(mcap_stream, decoder_factories=[DecoderFactory()])
    for _schema, channel, _message, proto_msg in reader.iter_decoded_messages():
        convert = topic_converters.get(channel.topic)
        if convert is not None:
            store.extend(convert(proto_msg))

In [8]:
# Show every collected point in InfluxDB line-protocol form, one per line.
for point in store:
    line = point.to_line_protocol()
    print(line)

app_exec,app_name=App1,num_apps=2 cpu_usage=2.5842957901265793,exec_time=2.4047865370480617 1740690767828762880
app_exec,app_name=App2,num_apps=2 cpu_usage=2.252740691541364,exec_time=2.171073277275747 1740690767828762880
sensor_health,num_sensors=3,sensor_id=0 sensor_temp=32.83455491033759 1740690767828762880
sensor_health,num_sensors=3,sensor_id=1 sensor_temp=39.548044608649214 1740690767828762880
sensor_health,num_sensors=3,sensor_id=2 sensor_temp=32.64090362307218 1740690767828762880
