In [None]:
import csv
import os
from typing import Callable, DefaultDict, Dict, List, Optional, Set, Tuple, Union

import IPython
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pandas_bokeh
from bokeh.plotting import Figure, save
from bokeh.plotting import output_notebook, figure, show
from caret_analyze import Architecture, Application, check_procedure, Lttng, LttngEventFilter
from caret_analyze.exceptions import UnsupportedTypeError
from caret_analyze.plot import Plot, message_flow, callback_graph, chain_latency, callback_sched
from caret_analyze.runtime.callback import CallbackBase, CallbackType

output_notebook()

In [None]:
# Location of the recorded trace, rooted at the current user's home directory.
# Alternative trace locations that can be swapped in instead:
# tracing_log_path = os.environ['HOME'] + '/.ros/tracing/caret_sample'
# tracing_log_path = '/work/misc/autoware_dummy/caret_sample/'
tracing_log_path = f"{os.environ['HOME']}/work/tracedata/universe/autoware_launch_trace_20220826-105249_universe_rosbag/"

In [None]:
# Build the architecture description and the trace reader from the same trace
# directory, then combine both into the Application object.
arch = Architecture('lttng', tracing_log_path)
lttng = Lttng(
    tracing_log_path,
    # NOTE(review): strip_filter(15, 10) presumably trims the first 15 s and the
    # last 10 s of the recording -- confirm against the caret_analyze docs.
    event_filters=[LttngEventFilter.strip_filter(15, 10)],
)
app = Application(arch, lttng)

# Save the loaded architecture next to the notebook.
# NOTE(review): force=True presumably allows overwriting an existing file -- confirm.
arch.export('architecture.yaml', force=True)

In [None]:
def round_value(value: float):
    """Snap a value to a nominal figure when it lies inside a known band.

    Values strictly inside (0.5, 1.5), (8, 12), (30, 35) or (45, 55) are
    replaced by 1.0, 10.0, 33.0 or 50.0 respectively. Every other value,
    including the band edges themselves, is returned unchanged.
    """
    # Each entry: (exclusive lower bound, exclusive upper bound, nominal value).
    snap_bands = (
        (0.5, 1.5, 1.0),
        (8, 12, 10.0),
        (30, 35, 33.0),
        (45, 55, 50.0),
    )
    for lower, upper, nominal in snap_bands:
        if lower < value < upper:
            return nominal
    return value

In [None]:
# Build one expectation record per callback that produced usable frequency data
# in the trace, then store the records (without a header row) in
# expectation_callback.csv.
expectation_list = []
for node in app.nodes:
    for callback in node.callbacks:
        is_timer = callback.callback_type == CallbackType.TIMER
        is_subscription = callback.callback_type == CallbackType.SUBSCRIPTION
        expectation = {
            # Second '/'-separated field of the node name ('/a/b' -> 'a').
            'component_name': node.node_name.split('/')[1],
            'node_name': node.node_name,
            'callback_name': callback.callback_name,
            'callback_type': callback.callback_type.type_name,
            'period_ns': callback.timer.period_ns if is_timer else 0,
            'topic_name': callback.subscription.topic_name if is_subscription else None,
        }

        try:
            frequency_plot = Plot.create_callback_frequency_plot([callback])
            # Keep only column 1 (the frequency values) of the NaN-free rows and
            # drop the last two rows, because the frequency measured at the end
            # of the trace becomes small.
            frequencies = frequency_plot.to_dataframe().dropna().iloc[:-2, 1]
            if len(frequencies) < 2:
                raise ValueError('too few frequency samples')
            if is_timer:
                # Timers use their configured period instead of the measured mean.
                value = 1e9 / callback.timer.period_ns
            else:
                value = round_value(float(frequencies.mean()))
        except Exception:
            # `except Exception` (not a bare `except`) so that Ctrl-C / kernel
            # interrupt still stops this potentially long loop.
            print(f'This callback is not called: {callback.callback_name}')
            continue

        expectation['value'] = value
        # Accept +/-20 % around the expected value.
        expectation['lower_limit'] = value * 0.8
        expectation['upper_limit'] = value * 1.2
        # NOTE(review): the meaning of 'ratio' and 'continue_num' is defined by
        # the consumer of this CSV, which is not visible here.
        expectation['ratio'] = 0.2 if value > 1 else 0.5
        expectation['continue_num'] = 2
        expectation_list.append(expectation)

# With no records there is no first row to take the column names from; an
# empty file is written instead of failing with IndexError.
fieldnames = list(expectation_list[0].keys()) if expectation_list else []
if not expectation_list:
    print('No callback expectation was generated; expectation_callback.csv is written empty.')
# newline='' is required by the csv module so row terminators are not translated.
with open('expectation_callback.csv', 'w', encoding='utf-8', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writerows(expectation_list)


In [None]:
# Read expectation_callback.csv back and print every row as a check.
# The file has no header row, so the column names are taken from the first
# in-memory record; with no records there is nothing to map, so reading is skipped.
if expectation_list:
    # newline='' is required by the csv module for correct line-ending handling.
    with open('expectation_callback.csv', 'r', encoding='utf-8', newline='') as csvfile:
        for row in csv.DictReader(csvfile, list(expectation_list[0].keys())):
            print(row)
else:
    print('expectation_list is empty; nothing to read back.')

In [None]:
# Build one expectation record per communication whose publisher produced
# usable frequency data in the trace, then store the records (without a header
# row) in expectation_topic.csv.
expectation_list = []
for comm in app.communications:
    expectation = {
        'topic_name': comm.topic_name,
        'publish_node_name': comm.publish_node_name,
        # NOTE(review): the key below is misspelled ('pubilsh'). It is kept as-is
        # because other cells may look it up; no header row is written, so the
        # spelling does not reach the CSV file.
        # The value is the second '/'-separated field of the node name ('/a/b' -> 'a').
        'pubilsh_component_name': comm.publish_node_name.split('/')[1],
        'subscribe_node_name': comm.subscribe_node_name,
        'subscribe_component_name': comm.subscribe_node_name.split('/')[1],
    }

    try:
        # The rate is taken from the publisher of the communication.
        # Alternative: Plot.create_communication_frequency_plot(comm)
        frequency_plot = Plot.create_publish_subscription_frequency_plot(comm.publisher)
        # Keep only column 1 (the frequency values) of the NaN-free rows and
        # drop the last two rows, because the frequency measured at the end
        # of the trace becomes small.
        frequencies = frequency_plot.to_dataframe().dropna().iloc[:-2, 1]
        if len(frequencies) < 2:
            raise ValueError('too few frequency samples')
        value = round_value(float(frequencies.mean()))
    except Exception:
        # `except Exception` (not a bare `except`) so that Ctrl-C / kernel
        # interrupt still stops this potentially long loop.
        print(f'This topic is not called: {comm.topic_name}')
        continue

    expectation['value'] = value
    # Accept +/-20 % around the expected value.
    expectation['lower_limit'] = value * 0.8
    expectation['upper_limit'] = value * 1.2
    # NOTE(review): the meaning of 'ratio' and 'continue_num' is defined by the
    # consumer of this CSV, which is not visible here.
    expectation['ratio'] = 0.2 if value > 1 else 0.5
    expectation['continue_num'] = 2
    expectation_list.append(expectation)

# With no records there is no first row to take the column names from; an
# empty file is written instead of failing with IndexError.
fieldnames = list(expectation_list[0].keys()) if expectation_list else []
if not expectation_list:
    print('No topic expectation was generated; expectation_topic.csv is written empty.')
# newline='' is required by the csv module so row terminators are not translated.
with open('expectation_topic.csv', 'w', encoding='utf-8', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writerows(expectation_list)