# 1- Jetson Stats from ROS2 Bag

Reads `/diagnostics` from a ROS2 bag (recorded with **jetson_stats**) and plots CPU usage/frequency, GPU usage, memory, temperature, and power over time.

In [None]:
!pip install rosbag2-py matplotlib seaborn pandas numpy==1.25.0 pyyaml

In [None]:
import os
import rosbag2_py
import rclpy
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def _bag_storage_id(bag_path):
    """Detect rosbag2 storage format from bag directory: 'mcap' or 'sqlite3'."""
    if not os.path.isdir(bag_path):
        return "sqlite3"
    for name in os.listdir(bag_path):
        if name.endswith(".mcap"):
            return "mcap"
    return "sqlite3"


def read_bag_diagnostics(bag_path):
    """Read diagnostics messages from a ROS2 bag (jetson_stats format on /diagnostics). Supports SQLite and MCAP."""
    try:
        rclpy.init()
    except Exception:
        pass

    storage_id = _bag_storage_id(bag_path)
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id=storage_id)
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader = rosbag2_py.SequentialReader()
    reader.open(storage_options, converter_options)

    topic_types = reader.get_all_topics_and_types()
    type_map = {topic.name: topic.type for topic in topic_types}

    timestamps = []
    cpu_usage = {f'cpu_{i}': [] for i in range(8)}
    cpu_freq = {f'cpu_{i}': [] for i in range(8)}
    gpu_data = []
    memory_data = []
    temp_data = {}
    power_data_curr = []
    power_data_avg = []

    while reader.has_next():
        topic, data, timestamp = reader.read_next()
        if topic != '/diagnostics':
            continue

        msg_type = get_message(type_map[topic])
        msg = deserialize_message(data, msg_type)
        time_sec = timestamp / 1e9
        timestamps.append(time_sec)

        for status in msg.status:
            if 'jetson_stats cpu' in status.name:
                cpu_num = int(status.name.split()[-1])
                cpu_usage_val = float(status.message.rstrip('%'))
                cpu_usage[f'cpu_{cpu_num}'].append(cpu_usage_val)
                freq_val = None
                for value in status.values:
                    if value.key == 'Freq':
                        freq_val = float(value.value) / 1000.0
                        break
                cpu_freq[f'cpu_{cpu_num}'].append(freq_val if freq_val is not None else 0)

            elif 'jetson_stats gpu' in status.name:
                gpu_data.append(float(status.message.rstrip('%')))

            elif 'jetson_stats mem ram' in status.name:
                mem_info = status.message.split('/')
                memory_data.append(float(mem_info[0].rstrip('GB')))

            elif 'jetson_stats temp' in status.name:
                for value in status.values:
                    if value.key in ['cpu', 'gpu', 'soc0', 'soc1', 'soc2']:
                        temp_val = float(value.value)
                        if temp_val > 0:
                            temp_data.setdefault(value.key, []).append(temp_val)

            elif 'jetson_stats power' in status.name:
                power_info = status.message.split()
                power_data_curr.append(int(power_info[0].split('=')[1].rstrip('mW')))
                power_data_avg.append(int(power_info[1].split('=')[1].rstrip('mW')))

    return {
        'timestamps': timestamps,
        'cpu_usage': cpu_usage,
        'cpu_freq': cpu_freq,
        'gpu_data': gpu_data,
        'memory_data': memory_data,
        'temp_data': temp_data,
        'power_data_curr': power_data_curr,
        'power_data_avg': power_data_avg,
    }


def plot_diagnostics(data):
    """Plot CPU/GPU/memory/temperature/power from parsed diagnostics."""
    timestamps = np.array(data['timestamps'])
    if len(timestamps) > 0:
        timestamps = timestamps - timestamps[0]

    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    fig.suptitle('Jetson Orin Diagnostics', fontsize=16)

    colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#0F9E97', '#32E184']

    ax1 = axes[0, 0]
    ax1_freq = ax1.twinx()
    for i, (cpu_name, cpu_usage_vals) in enumerate(data['cpu_usage'].items()):
        if len(cpu_usage_vals) > 0:
            c = colors[i % len(colors)]
            ax1.plot(timestamps[:len(cpu_usage_vals)], cpu_usage_vals, color=c, linestyle='-', linewidth=2, label=f'{cpu_name} usage', alpha=0.8)
            freq_vals = data['cpu_freq'][cpu_name]
            if len(freq_vals) > 0:
                ax1_freq.plot(timestamps[:len(freq_vals)], freq_vals, color=c, linestyle='--', linewidth=1.5, label=f'{cpu_name} freq', alpha=0.6)
    ax1.set_title('CPU Usage (%) and Frequency (MHz)')
    ax1.set_xlabel('Time (s)')
    ax1.set_ylabel('CPU Usage (%)', color='black')
    ax1_freq.set_ylabel('CPU Frequency (MHz)', color='gray')
    ax1.set_ylim(0, 100)
    ax1.grid(True, alpha=0.3)
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax1_freq.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left', fontsize=8)

    ax2 = axes[0, 1]
    if len(data['gpu_data']) > 0:
        ax2.plot(timestamps[:len(data['gpu_data'])], data['gpu_data'], 'g-', marker='o', markersize=3, label='GPU', linewidth=2)
    ax2.set_title('GPU Usage (%)')
    ax2.set_xlabel('Time (s)')
    ax2.set_ylabel('Usage (%)')
    ax2.set_ylim(0, 100)
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    ax3 = axes[1, 0]
    if len(data['memory_data']) > 0:
        ax3.plot(timestamps[:len(data['memory_data'])], data['memory_data'], 'r-', marker='o', markersize=3, label='RAM Used', linewidth=2)
    ax3.set_title('Memory Usage (GB)')
    ax3.set_xlabel('Time (s)')
    ax3.set_ylabel('Memory (GB)')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    ax4 = axes[1, 1]
    temp_colors = {'cpu': 'red', 'gpu': 'green', 'soc0': 'blue', 'soc1': 'orange', 'soc2': 'purple'}
    for temp_name, temp_values in data['temp_data'].items():
        if len(temp_values) > 0:
            ax4.plot(timestamps[:len(temp_values)], temp_values, color=temp_colors.get(temp_name, 'black'), marker='o', markersize=2, label=temp_name, linewidth=2)
    ax4.set_title('Temperature (°C)')
    ax4.set_xlabel('Time (s)')
    ax4.set_ylabel('Temperature (°C)')
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    if any(len(v) > 0 for v in data['cpu_usage'].values()):
        fig2, (ax_u, ax_f) = plt.subplots(2, 1, figsize=(14, 8))
        for i, (cpu_name, cpu_usage_vals) in enumerate(data['cpu_usage'].items()):
            if len(cpu_usage_vals) > 0:
                ax_u.plot(timestamps[:len(cpu_usage_vals)], cpu_usage_vals, color=colors[i % len(colors)], marker='o', markersize=2, label=cpu_name, linewidth=2)
        ax_u.set_title('CPU Usage per Core (%)')
        ax_u.set_ylabel('Usage (%)')
        ax_u.set_ylim(0, 100)
        ax_u.legend()
        ax_u.grid(True, alpha=0.3)
        for i, (cpu_name, cpu_freq_vals) in enumerate(data['cpu_freq'].items()):
            if len(cpu_freq_vals) > 0:
                ax_f.plot(timestamps[:len(cpu_freq_vals)], cpu_freq_vals, color=colors[i % len(colors)], marker='s', markersize=2, label=cpu_name, linewidth=2)
        ax_f.set_title('CPU Frequency per Core (MHz)')
        ax_f.set_xlabel('Time (s)')
        ax_f.set_ylabel('Frequency (MHz)')
        ax_f.legend()
        ax_f.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    if len(data['power_data_curr']) > 0:
        plt.figure(figsize=(12, 4))
        plt.plot(timestamps[:len(data['power_data_curr'])], data['power_data_curr'], color='purple', marker='o', markersize=3, label='Current Power', linewidth=2)
        if len(data['power_data_avg']) > 0:
            plt.plot(timestamps[:len(data['power_data_avg'])], data['power_data_avg'], color='orange', marker='s', markersize=3, label='Average Power', linewidth=2)
        plt.title('Power Consumption')
        plt.xlabel('Time (s)')
        plt.ylabel('Power (mW)')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()


In [None]:
# Change this to your bag path (directory containing the bag DB and metadata)
BAG_PATH = "../bags/test1"

data = read_bag_diagnostics(BAG_PATH)
plot_diagnostics(data)