# Orchestrate

Convert a collection of `ParsedEvent` objects into an `EventOrchestra` containing relevant ML-Tracking analysis data.

*Hope you have some memory to spare...*

## Setup

In [None]:
import json
import pickle
from collections import defaultdict
from pathlib import Path
from datetime import datetime
from typing import Optional, Tuple

import progressbar
from timesignal import EventOrchestra, EventSignal

from lib.parse import parse_tracks
from lib.models import ParsedEvent
from lib.timestamp import lcm_timestamp_to_seconds, pdt_timestamp_seconds_to_utc_datetime

In [None]:
# INPUT: Directory where the ParsedEvent JSON files are stored
JSON_DIR = Path('data/json/parsed_lcm/')
# Sorted so the files are always loaded in the same (lexicographic) order;
# the raw directory listing order depends on the filesystem.
JSON_FILES = sorted(JSON_DIR.glob('*.json'))

# OUTPUT: Where to write the orchestra
ORCHESTRA_FILE = Path('data/pickle/orchestra.pkl')

In [None]:
def get_extractor(*keys: str, map: Optional[dict] = None):
    """
    Build a function that pulls one signal sample out of a parsed event.

    The returned function walks `keys` in order, starting from `ev.event`, to reach the raw value.
    If `map` is given, the raw value is used as a key into it and the looked-up value is used instead.
    The sample is returned as a `(datetime, float)` pair, with the datetime derived from `ev.meta.timestamp`.
    """
    def extract(ev: ParsedEvent) -> Tuple[datetime, float]:
        # Descend through the nested event payload, one key at a time
        value = ev.event
        for key in keys:
            value = value[key]
        # Optional remapping of the raw value
        sample = value if map is None else map[value]
        seconds = lcm_timestamp_to_seconds(ev.meta.timestamp)
        moment = pdt_timestamp_seconds_to_utc_datetime(seconds)
        return moment, float(sample)
    return extract

## Specify `ParsedEvent` extraction for non-track events

In [None]:
# For each channel: signal name -> sequence of keys leading to the value inside the event payload.
_SIGNAL_KEY_PATHS = {
    'SUPERVISOR_CFG': {
        'search_timeout': ('search_timeout',),
        'acquire_timeout': ('acquire_timeout',),
        'track_timeout': ('track_timeout',),
        'track_duration': ('track_duration',),
        'state_number': ('state_number',),
    },
    'MWT_CONTROL_STAT': {
        'x_enabled': ('is_x_effort_enabled',),
        'x_setpoint': ('x_traj', 'set_point'),
        'x_cmd': ('x_control', 'cmd'),
        'x_measure': ('x_control', 'measure'),
        'y_enabled': ('is_y_effort_enabled',),
        'y_setpoint': ('y_traj', 'set_point'),
        'y_cmd': ('y_control', 'cmd'),
        'y_measure': ('y_control', 'measure'),
        'z_enabled': ('is_z_effort_enabled',),
        'z_setpoint': ('z_traj', 'set_point'),
        'z_cmd': ('z_control', 'cmd'),
        'z_measure': ('z_control', 'measure'),
        'yaw_cmd': ('yaw_control', 'cmd'),
        'yaw_measure': ('yaw_control', 'measure'),
        'pilot_enabled': ('is_pilot_enabled',),
    },
    'MWT_SEARCH_STAT': {
        'x_mode': ('x_mode',),
        'x_effort_cmd': ('x_effort_cmd',),
        'y_mode': ('y_mode',),
        'y_effort_cmd': ('y_effort_cmd',),
        'z_mode': ('z_mode',),
        'z_effort_cmd': ('z_effort_cmd',),
        'yaw_mode': ('yaw_mode',),
        'yaw_effort_cmd': ('yaw_effort_cmd',),
        'control_mode': ('control_mode',),
    },
}

# channel -> signal name -> extractor function built from the key path above
BASIC_EXTRACTOR_SPEC = {
    channel: {
        signal: get_extractor(*key_path)
        for signal, key_path in key_paths.items()
    }
    for channel, key_paths in _SIGNAL_KEY_PATHS.items()
}

## Read events from JSON files

This looks in the `data/json/parsed_lcm` directory for `*.json` files. Each file is assumed to contain a JSON array of serialized `ParsedEvent` objects.

In [None]:
# Load every JSON file and collect its decoded contents (plain Python objects,
# not yet ParsedEvent instances) into one flat list.
ALL_EVENTS = []
for json_file in JSON_FILES:
    # Be explicit about the encoding instead of relying on the platform default,
    # which is not UTF-8 everywhere.
    with json_file.open('r', encoding='utf-8') as f:
        events = json.load(f)
    # The file is already closed here; only the decoded data is kept.
    ALL_EVENTS.extend(events)

In [None]:
# Turn each raw dict into a ParsedEvent, writing the result back into the same
# list slot so that no second list is built.
# This is slow... working directly with the dicts may be a reasonable alternative.
for idx, raw_event in enumerate(ALL_EVENTS):
    ALL_EVENTS[idx] = ParsedEvent.from_dict(raw_event)

In [None]:
# Group the parsed events into one list per channel, keeping their order within each channel.
# NOTE(review): the progress-bar keyword is `max`; some progressbar releases name it `max_value` — confirm it is honoured.
EVENTS_BY_CHANNEL = defaultdict(list)
for event in progressbar.progressbar(ALL_EVENTS, max=len(ALL_EVENTS)):
    channel_events = EVENTS_BY_CHANNEL[event.meta.channel]
    channel_events.append(event)

## Extract non-track sub-orchestras

In [None]:
def make_event_orchestra(channel: str) -> EventOrchestra:
    """
    Build an EventOrchestra holding one EventSignal per entry in BASIC_EXTRACTOR_SPEC[channel].

    Each signal is fed by applying that entry's extractor to every event in EVENTS_BY_CHANNEL[channel],
    and is created with 'nearest' interpolation.
    """
    signals = {}
    for name, extractor in BASIC_EXTRACTOR_SPEC[channel].items():
        # map() binds this iteration's extractor immediately, even though the samples are produced lazily
        samples = map(extractor, EVENTS_BY_CHANNEL[channel])
        signals[name] = EventSignal(samples, interpolation='nearest')
    return EventOrchestra(signals)

In [None]:
# Create the basic supervisor/control/search orchestras, one per channel.
# Each channel name must be a key of BASIC_EXTRACTOR_SPEC.
supervisor_config_eo = make_event_orchestra('SUPERVISOR_CFG')
control_status_eo = make_event_orchestra('MWT_CONTROL_STAT')
search_status_eo = make_event_orchestra('MWT_SEARCH_STAT')

## Extract tracks and generate track sub-orchestras

In [None]:
# Parse Track objects out of the full event list.
# NOTE(review): presumably parse_tracks selects the BOX_STEREO_TRACK channel events itself — confirm in lib.parse.
# The result is consumed below as a mapping of track ID -> Track.
track_dict = parse_tracks(ALL_EVENTS)

In [None]:
# Turn every Track into its own EventOrchestra, keyed by the stringified track ID...
orchestras_by_track_id = {
    str(tid): trk.to_event_orchestra() for tid, trk in track_dict.items()
}
# ...and wrap them all in a single parent EventOrchestra
tracks_orchestra = EventOrchestra(orchestras_by_track_id)

## Compose all sub-orchestras into single orchestra

In [None]:
# Top-level layout of the final orchestra: one named child per sub-orchestra
root_children = {
    'supervisor': supervisor_config_eo,
    'control': control_status_eo,
    'search': search_status_eo,
    'tracks': tracks_orchestra,
}
root_orchestra = EventOrchestra(root_children)

## Write orchestra

In [None]:
# Pickle the root orchestra.
# Create the output directory first so that a missing folder cannot make the
# write fail after all the processing has already been done.
ORCHESTRA_FILE.parent.mkdir(parents=True, exist_ok=True)
with ORCHESTRA_FILE.open('wb') as f:
    pickle.dump(root_orchestra, f)