In [1]:
# built-in
import os
from datetime import datetime

from dataclasses import dataclass
from typing import Dict, List, Tuple, Union, Callable, Optional
from enum import Enum

# 3rd party lib
import pandas as pd
import numpy as np
import yaml

import matplotlib.pyplot as plt

# ros:
import rosbag

# 3rd party util
from icecream import ic

In [2]:
# ours:
from uwarl_helper.uwarl_util import create_all_folders
from uwarl_bag_utils.bag_parser import BagParser, TYPES_VAR
from uwarl_bag_utils.uwarl_common import PARSER_CALLBACKS

In [3]:
BP = BagParser(PARSER_CALLBACKS)

In [4]:
DIRECTORY = "/home/jx/.ros/bag_replay_recorder_files"
BAG_DICT = {
    "EE1-RVR-Pt-L/R": "EE-1-8_S5-E30_8_DEMO_23_recording_2023-04-06-16-18-37_2023-04-11-11-25-29.bag",
    "EE1-FWD-Pt-U/D": "EE-1-9_S5-E30_9_DEMO_18_recording_2023-04-06-16-19-17_2023-04-11-11-26-32.bag",
    "EE1-RVR-Pt-U/D": "EE-1-10_S5-E30_10_DEMO_24_recording_2023-04-06-16-20-02_2023-04-11-11-27-38.bag",
}

FIG_OUT_DIR = "/home/jx/UWARL_catkin_ws/src/vins-research-pkg/research-project/output/vins_analysis"

class AnalysisManager:
    _auto_save :bool = True
    _auto_close :bool = False # Turn on to close all figures after saving
    _output_dir :str = FIG_OUT_DIR
    
    def __init__(self, output_dir: str=FIG_OUT_DIR, bag_dict: Dict[str, str]=BAG_DICT, run_name: str="vins_analysis"):
        # create output folder
        self._create_dir(output_dir, run_name)
        # save info
        self._save_info(bag_dict=bag_dict)
    
    def _create_dir(self, output_dir: str, run_name: str):
        self._output_dir = f"{output_dir}/{run_name}"
        create_all_folders(self._output_dir)

    def _save_info(self, bag_dict: Dict[str, str]):
        if self._auto_save:
            with open(f"{self._output_dir}/info.yaml", "w") as f:
                yaml.dump(bag_dict, f)
            
    def save_fig(self, fig, tag):
        if self._auto_save:
            fig.savefig("{}/plot_{}.png".format(self._output_dir, tag), bbox_inches = 'tight')
        if self._auto_close:
            plt.close(fig)

AM = AnalysisManager(run_name="run_{}".format(datetime.now().strftime("%Y-%m-%d_%H")))

In [5]:
class ProcessedData:
    def __init__(self, DIRECTORY, BAG_PATH):
        self._bag_path = BAG_PATH
        # load bag file
        BP.bind_bagfile(bagfile=f"{DIRECTORY}/{BAG_PATH}")
        BP.load_bag_topics()
        self.bag_info = BP.get_bag_info_safe()
        self.bag_topics = BP.get_bag_topics_lut_safe()
        BP.process_all_bag_msgs()
        self.bag_data = BP.get_processed_bag_safe()
        # unbind toolchain
        BP.unbind_bagfile()
        self._init_process()
    
    def _init_process(self):
        self.T0=datetime.fromtimestamp(self.bag_info["start"])
        self.T1=datetime.fromtimestamp(self.bag_info["end"])
        self.dT = (self.T1 - self.T0).total_seconds()
        
        prefix = self._bag_path.split("_recording_")[0]
        ic(prefix)
        unique_id, t_window, run_id, _, demo_id = prefix.split("_")
        
        self.description = {
            "cam-session-run": unique_id,
            "dt": t_window,
            "demo": demo_id,
        }
        ic(self.description)
        

In [6]:
pData={}
for label, path in BAG_DICT.items():
    pData[label] = ProcessedData(DIRECTORY, path)

  self._info_dict = yaml.load(self._bag_data._get_yaml_info())
ic| prefix: 'EE-1-8_S5-E30_8_DEMO_23'
ic| self.description: {'cam-session-run': 'EE-1-8', 'demo': '23', 'dt': 'S5-E30'}
ic| prefix: 'EE-1-9_S5-E30_9_DEMO_18'
ic| self.description: {'cam-session-run': 'EE-1-9', 'demo': '18', 'dt': 'S5-E30'}
ic| prefix: 'EE-1-10_S5-E30_10_DEMO_24'
ic| self.description: {'cam-session-run': 'EE-1-10', 'demo': '24', 'dt': 'S5-E30'}


In [7]:
class Battery_Plot:
    DEFAULT_FIGSIZE = (5, 5)
    # list of data:
    list_of_note = []
    list_of_battery_v = []
    list_of_dT_s = []
    list_of_bag_labels = []
    N_bags = 0
    NAME: str = None
    
    def __init__(self, bags: Dict[str, ProcessedData]): 
        for label, data in bags.items():
            self.N_bags += 1
            self.list_of_bag_labels.append(label)
            self.list_of_note.append(
                data.description)
            self.list_of_battery_v.append(
                data.bag_data["/uwarl/robotnik_base_hw/voltage"][TYPES_VAR.VOLTAGE])
            self.list_of_dT_s.append(data.dT)
        
        ic(np.shape(self.list_of_battery_v))
        
    def plot_concatenated(self):
        N_bags = self.N_bags
        # concatenate all the data:
        dT_s = np.sum(self.list_of_dT_s)
        battery_v = np.concatenate(self.list_of_battery_v)
        # plot the battery:
        t_s = np.arange(0, dT_s, dT_s/len(battery_v))
        fig = plt.figure(figsize=(self.DEFAULT_FIGSIZE[0]*N_bags, self.DEFAULT_FIGSIZE[1]))
        plt.plot(t_s, battery_v)
        plt.xlabel("Time (s)")
        plt.ylabel("Voltage (V)")
        plt.title(f"Waterloo Steel Voltage over Time ({N_bags} bags)")
        
        # (Auto-label) Segment bag files:
        t_end = 0
        y_range = plt.ylim()
        # if N_bags > 1:
        plt.axvline(x=0, color = 'r', ls='--', alpha=0.5)
        for i in range(N_bags):
            # segment bag_files
            t_end += self.list_of_dT_s[i]
            label = self.list_of_bag_labels[i]
            plt.axvline(x=t_end, color = 'r', ls='--', alpha=0.5)
            plt.text(t_end, y_range[1], f" [{label}]", color='r', verticalalignment='top', horizontalalignment='right')
        AM.save_fig(fig, "battery_concatenated.png")

    def plot_comparison(self):
        list_of_t_s = []
        N_bags = self.N_bags
        fig = plt.figure(figsize=self.DEFAULT_FIGSIZE)
        plt.xlabel("Time (hours)")
        plt.ylabel("Voltage (V)")
        plt.title(f"Waterloo Steel Voltage over Time ({N_bags} bags)")
        for i in range(N_bags):
            dT_s = self.list_of_dT_s[i]
            battery_v = self.list_of_battery_v[i]
            t_s = np.arange(0, dT_s, dT_s/len(battery_v))
            plt.plot(t_s, battery_v, label=self.list_of_bag_labels[i])
        plt.legend()
        AM.save_fig(fig, "battery_comparison.png")
    
    

In [8]:
BatPlot = Battery_Plot(pData)
BatPlot.plot_concatenated()
BatPlot.plot_comparison()



  result = asarray(a).shape
ic| np.shape(self.list_of_battery_v): (3,)
