# Streaming a custom scenario: An example with the comma2k19 dataset
This notebook shows an example of the workflow to reuse the semantic player presented in this repository with a custom scenario.

### Overview of the example comma2k19 dataset
As a demonstration, we use the public [comma2k19 dataset](https://github.com/commaai/comma2k19).
It contains multiple journeys (also called routes) recorded in a commute area in California, United States.
The dataset has the following structure:
```
Dataset_chunk_n
|
+-- route_id (dongle_id|start_time)
    |
    +-- segment_number
        |
        +-- preview.png (first frame video)
        +-- raw_log.bz2 (raw capnp log, can be read with openpilot-tools: logreader)
        +-- video.hevc (video file, can be read with openpilot-tools: framereader)
        +-- processed_log/ (processed logs as numpy arrays, see format for details)
        +-- global_pos/ (global poses of camera as numpy arrays, see format for details)
```

In this example, we focus on the `processed_log/` folder.
This folder has several features available. 
According to its documentation, we have the following:
```
processed_log
|
+--IMU ([forward, right, down])
|  |
|  +--acceleration: (m/s^2)
|  +--gyro_uncalibrated (rad/s)
|  +--gyro_bias: android gyro bias estimate (rad/s)
|  +--gyro: with android bias correction (rad/s)
|  +--magnetic_uncalibrated: (T)
|  +--magnetic: with android calibration(T)
|
+--CAN data:
|  |
|  +--car_speed (m/s)
|  +--steering_angle (deg)
|  +--wheel_speeds: [front_left, front_right, rear_left, rear_right] (m/s)
|  +--radar: [forward distance (m),
|  |          left distance (m),
|  |          relative speed (m/s),
|  |          nan,
|  |          nan,
|  |          address,
|  |          new_track (bool)]
|  +--raw CAN: This is not stored as a value array but as three separate arrays [src, address, data]
|
+--GNSS
   |
   +--live_gnss_qcom: [latitude (deg),
   |                   longitude (deg),
   |                   speed (m/s),
   |                   utc_timestamp (s),
   |                   altitude (m),
   |                   bearing (deg)]
   +--live_gnss_ublox: [latitude (deg),
   |                    longitude (deg),
   |                    speed (m/s),
   |                    utc_timestamp (s),
   |                    altitude (m),
   |                    bearing (deg)]
   |
   +--raw_gnss_qcom: every row represents a measurement
   |                 of 1 satellite at 1 epoch; can easily
   |                 be manipulated with laika.
   |                 [prn (nmea_id, see laika),
   |                  week of gps_time of reception (gps_week),
   |                  time of week of gps_time of reception (s),
   |                  nan,
   |                  pseudorange (m),
   |                  pseudorange_std (m),
   |                  pseudorange_rate (m/s),
   |                  pseudorange_rate_std (m/s)]
   +--raw_gnss_ublox: every row represents a measurement
                      of 1 satellite at 1 epoch; can easily
                      be manipulated with laika.
                      [prn (nmea_id, see laika),
                       week of gps_time of reception (gps_week),
                       time of week of gps_time of reception (s),
                       GLONASS channel number (-7..6) nan if not GLONASS,
                       pseudorange (m),
                       pseudorange_std (m),
                       pseudorange_rate (m/s),
                       pseudorange_rate_std (m/s)]
```
To keep the demonstration clear, this example shows the workflow only for the features `car_speed (m/s)` and `steering_angle (deg)`.
However, it can easily be extended to any other feature.

### Overview of the workflow
The suggested workflow is illustrated in the following figure:

![Image](figures/flow_for_custom_scenario.png)

In the figure, we can distinguish three main parts, which are detailed in the rest of this notebook:
* [(a) Dataset preparation.](#preparation) Preprocess and transform the dataset into a common shape/structure (e.g., a pandas `DataFrame`). This step is required when we work with datasets that have been previously stored. When working with data streams (e.g., observing a real sensor in real time), this step may be skipped.
* [(b) Semantic annotation.](#semantify) Semantically annotate the record(s) of the dataset/stream (e.g., obtaining its corresponding RDF graph data).
* [(c) Stream it out.](#stream_it_out) Broadcast the semantic version of the record(s) sequentially.

## (a) Dataset preparation <a id='preparation'></a>


### Dependencies and reference paths

In [1]:
import os
import numpy as np
import pandas as pd
from dataclasses import dataclass

> **Note:** Make sure you provide the proper path to the dataset, which must match the location where you stored it.

In [2]:
journey_name = 'b0c9d2329ad1606b|2018-08-24--18-40-57'
path_to_datasets = os.path.dirname(os.path.dirname(os.getcwd()))  # replace this with your own path
journey_path = f'{path_to_datasets}/datasets/comma2k19/Chunk_2/{journey_name}/'
segments_numbers = sorted([int(d.name) for d in os.scandir(journey_path) if d.is_dir()])  # get all the available segment numbers
print(f'The segments available for the journey {journey_name} are:\n {segments_numbers}')

The segments available for the journey b0c9d2329ad1606b|2018-08-24--18-40-57 are:
 [4, 5, 6, 7, 8, 9, 10]


In [3]:
# Select one segment
segment_num = 9
segment_path = f'{journey_path}{segment_num}/'  # journey_path already ends with '/'

### Feature selection and mapping reference to ontology concepts

Let us define the mapping (according to the documentation of the dataset) to the corresponding ontology concepts.
In this example, we use the [Vehicle Signal Specification Ontology (VSSo)](https://github.com/w3c/vsso) as the reference for the instantiation of the data points.
Thus, the names assigned in the dictionary match the VSSo concept names.
> **Note:** The VSSo ontology is under development, so you might encounter differences between the names used here and the ones described in the ontology. If you find any mismatch, please raise an issue on GitHub so that the notebook can be updated.

In [4]:
@dataclass
class Feature:
    """Class for characterizing a feature of the dataset."""
    name: str
    dtype: str
    path: str

Where possible, the target names in the mapping below follow the VSSo concept names, which simplifies the instantiation of the concepts.

In [5]:
# Mapping dictionary for the names of the features
column_names_mapping = {
    # Used in this example
    'speed':'Speed',
    'steering_angle':'SteeringWheelAngle',
    
    # Others: Modify/Extend it as needed
    'radar_value_0':'radar_forward_distance',
    'radar_value_1':'radar_left_distance',
    'radar_value_2':'radar_relative_speed',
    'radar_value_5':'radar_address',
    'radar_value_6':'radar_new_track',
    'wheel_speed_value_0':'wheel_speed_front_left',
    'wheel_speed_value_1':'wheel_speed_front_right',
    'wheel_speed_value_2':'wheel_speed_rear_left',
    'wheel_speed_value_3':'wheel_speed_rear_right',
    'live_gnss_ublox_value_0':'CurrentPositionLatitude',
    'live_gnss_ublox_value_1':'CurrentPositionLongitude',
    'live_gnss_ublox_value_2':'CurrentPositionSpeed',
    'live_gnss_ublox_value_3':'CurrentPosition_utc_timestamp',
    'live_gnss_ublox_value_4':'CurrentPositionAltitude',
    'live_gnss_ublox_value_5':'CurrentPositionBearing',
    'accelerometer_value_0':'AccelerationLongitudinal',
    'accelerometer_value_1':'AccelerationLateral',
    'accelerometer_value_2':'AccelerationVertical'
}

In [6]:
def get_features_of_interest(segment_path):
    """Returns a list with the features of interest (instances of the dataclass Feature) for the given segment path."""

    # Used in this example
    
    # speed
    speed = Feature(name='speed',
                    dtype='float64',  # a string, as declared in the Feature dataclass
                    path=f'{segment_path}/processed_log/CAN/speed/')

    # steering_wheel_angle
    steering_angle = Feature(name='steering_angle',
                             dtype='float64',
                             path=f'{segment_path}/processed_log/CAN/steering_angle/')
    
    # Others: Modify/Extend it as needed

    # radar
    radar = Feature(name='radar',
                    dtype='float64',
                    path=f'{segment_path}/processed_log/CAN/radar/')

    # wheel_speed
    wheel_speed = Feature(name='wheel_speed',
                          dtype='float64',
                          path=f'{segment_path}/processed_log/CAN/wheel_speed/')

    # live_gnss_ublox
    live_gnss_ublox = Feature(name='live_gnss_ublox',
                             dtype='float64',
                             path=f'{segment_path}/processed_log/GNSS/live_gnss_ublox/')
    
    # accelerometer
    accelerometer = Feature(name='accelerometer',
                            dtype='float64',
                            path=f'{segment_path}/processed_log/IMU/accelerometer/')

    return [speed, steering_angle]  # select here the desired features to include

In [7]:
def get_feature_from_df(index_name, feat_name, df):
    """Returns the selected feature as a pandas DataFrame indexed by index_name, without null rows."""
    # reset_index() returns a copy, so the DataFrame passed in is not modified
    feature_df = df.reset_index()[[index_name, feat_name]]  # select feature of interest
    return feature_df.set_index(index_name).dropna()

In [8]:
features_of_interest = get_features_of_interest(segment_path)
print('Selected features from the dataset:')
for f in features_of_interest:
    print(f.name)

Selected features from the dataset:
speed
steering_angle


### Load data of the selected features into one DataFrame with time as index
Now, let's load the data associated with each of those features into a pandas DataFrame and collect all the DataFrames in a list called `all_features`.

In [9]:
all_features = []

# Iterate over the desired features
for f in features_of_interest:

    # Load times and values of the current feature into numpy arrays
    times = np.load(f.path+'t')
    values = np.squeeze(np.load(f.path+'value'))
    
    # Initialize a DataFrame with the numpy arrays
    current_df = pd.DataFrame()
    
    if len(values.shape) == 1:
        # Use the feature name directly when only one variable is available
        current_df = pd.DataFrame(data=values.tolist(), index=times, columns=[f.name])
    else:
        # Add numeration to the feature name when multiple variables are available
        current_df = pd.DataFrame(data=values.tolist(), index=times, columns=[f'{f.name}_value_{c}' for c in range(len(values[0]))])

    print(current_df.head(5))
    
    # Delete columns where all values are null
    current_df.dropna(how='all', axis=1, inplace=True)    
        
    # Parse the index timestamps as DateTime
    current_df.index = pd.to_datetime(current_df.index, unit='s')  # dataset documentation says the unit is seconds
    current_df.index.name = 'datetime'
    
    # Drop rows where an index is repeated to avoid conflicts while resampling
    current_df = current_df[~current_df.index.duplicated(keep='first')]
    
    # Resample the DataFrame to a regular 250 ms grid using forward fill
    current_df.sort_index(inplace=True)
    current_df = current_df.resample('250ms').ffill()
    
    # Append the DataFrame to a list
    all_features.append(current_df)

                   speed
105521.459948  27.577778
105521.485107  27.563889
105521.498111  27.537500
105521.508899  27.536111
105521.515962  27.552778
               steering_angle
105521.475753            -0.8
105521.492700            -0.9
105521.508846            -0.9
105521.514293            -0.9
105521.526352            -0.9
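
The raw time values printed above are small (about 105521 s), so parsing them as seconds since the Unix epoch, as the loading loop does, yields dates in early January 1970. The following minimal sketch reproduces that arithmetic with the standard library, using the first `speed` timestamp from the output above; the variable names are illustrative only.

```python
from datetime import datetime, timedelta

# First raw timestamp of the speed feature, copied from the output above
raw_seconds = 105521.459948

# Interpret the value as seconds elapsed since 1970-01-01 00:00:00,
# which is how a numeric value is read when the unit is seconds
epoch = datetime(1970, 1, 1)
parsed = epoch + timedelta(seconds=raw_seconds)

# Keep whole seconds only, to avoid depending on float rounding
print(parsed.replace(microsecond=0))  # prints 1970-01-02 05:18:41
```

The resulting dates are therefore mainly useful for ordering and spacing the observations. If absolute dates are needed, an offset would have to come from another source, possibly the `utc_timestamp` field listed for the GNSS features.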


For example, one of the features looks like the following:

In [10]:
all_features[0].head()

| datetime | speed |
|---|---|
| 1970-01-02 05:18:41.250 | NaN |
| 1970-01-02 05:18:41.500 | 27.5375 |
| 1970-01-02 05:18:41.750 | 27.622222 |
| 1970-01-02 05:18:42.000 | 27.554167 |
| 1970-01-02 05:18:42.250 | 27.625 |
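
To see what the duplicate removal and the `resample('250ms').ffill()` step in the loading loop do, the following sketch applies them to a small synthetic series. The timestamps and values are made up for illustration; only the pandas calls mirror the loading cell.

```python
import pandas as pd

# Synthetic, irregularly sampled readings; the timestamp 0.10 s is repeated
times = [0.10, 0.10, 0.30, 0.55, 0.90]
values = [1.0, 9.0, 2.0, 3.0, 4.0]

df = pd.DataFrame({'speed': values}, index=pd.to_datetime(times, unit='s'))
df.index.name = 'datetime'

# Keep only the first row for each repeated timestamp, then sort by time
df = df[~df.index.duplicated(keep='first')].sort_index()

# Put the readings on a regular 250 ms grid; each grid point takes the
# most recent reading at or before it (forward fill)
resampled = df.resample('250ms').ffill()
print(resampled)
```

The first grid point has no earlier reading to copy, so it is null, which is why the first row of the table above has no speed value. Readings that are superseded by a newer one before the next grid point (or that come after the last grid point) do not appear in the resampled result.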


The `all_features` variable has a list of DataFrames corresponding to the selected features. 
It is now time to merge them all into one `segment_df`.

In [11]:
# Merge all features of the segment into one DataFrame
segment_df = pd.DataFrame()
for f_df in all_features:
    if segment_df.empty:
        segment_df = f_df
    else:
        segment_df = segment_df.join(f_df, how='outer')

In [12]:
# Change the name of the columns (if needed) according to the column_names_mapping dict
segment_df.rename(columns=column_names_mapping, inplace=True)

# Drop null values
segment_df.dropna(inplace=True)

segment_df.reset_index(inplace=True)

segment_df.index.name = 'id'

segment_df.head()

| id | datetime | Speed | SteeringWheelAngle |
|---|---|---|---|
| 0 | 1970-01-02 05:18:41.500 | 27.5375 | -0.9 |
| 1 | 1970-01-02 05:18:41.750 | 27.622222 | -0.5 |
| 2 | 1970-01-02 05:18:42.000 | 27.554167 | -0.4 |
| 3 | 1970-01-02 05:18:42.250 | 27.625 | -0.5 |
| 4 | 1970-01-02 05:18:42.500 | 27.640278 | -1.2 |


The preprocessing yields a table-like structure with one row per timestamp.
Resampling is optional.
Without it, however, the merged table may contain many null values because, in practice, observations of different features rarely occur at exactly the same time.
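
A minimal sketch of that effect, using two made-up features whose timestamps only partly coincide (the values and times are invented for illustration):

```python
import pandas as pd

# Two synthetic features observed at slightly different instants
speed = pd.DataFrame(
    {'Speed': [27.5, 27.6]},
    index=pd.to_datetime([1.00, 1.25], unit='s'),
)
steering = pd.DataFrame(
    {'SteeringWheelAngle': [-0.9, -0.5]},
    index=pd.to_datetime([1.01, 1.25], unit='s'),
)

# Outer join on the raw timestamps: one row for every distinct instant,
# with nulls wherever a feature has no observation at that instant
merged = speed.join(steering, how='outer')
print(merged)

# Only the instant shared by both features survives dropna()
print(merged.dropna())
```

Here three distinct instants produce three rows, two of which are incomplete, so dropping null values would discard most of the data.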

## (b) Semantic annotation <a id='semantify'></a>
The data of one segment is now in a table-like structure.
The next step is to obtain its semantic version (e.g., with RDF triples).
In a streaming setup, we do that by iterating over the rows of the DataFrame, which are sorted by time, and applying a mapping function to each row.
Such a mapping function requires a schema defined from a semantic model.
For instance, a schema for the annotation of vehicle properties is illustrated below:

<img src="figures/vsso_sosa_schema_simplified.png" width="500">


For example, the following RML rules map the observations of the speed and steering wheel angle:
```turtle
@prefix rr: <http://www.w3.org/ns/r2rml#> .
@prefix rml: <http://semweb.mmlab.be/ns/rml#> .
@prefix ql: <http://semweb.mmlab.be/ns/ql#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix schema: <http://schema.org/> .
@prefix sem-rc: <http://stream-reasoning-challenge.org/>.
@prefix sem-speed-obs: <http://stream-reasoning-challenge/vehicle/speed/observation/>.
@prefix indv: <http://stream-reasoning-challenge.org/indv/>.
@prefix vsso: <https://github.com/w3c/vsso#> .
@prefix sosa: <http://www.w3.org/ns/sosa/> .

#Dataset
sem-rc:speed a rr:TriplesMap ;
    rml:logicalSource
        [
            rml:source "comma.csv" ;
            rml:referenceFormulation ql:CSV
        ] ;
#subject
    rr:subjectMap
        [
            rr:template "http://stream-reasoning-challenge/vehicle/speed/observation/{id}" ;
            rr:class sosa:Observation
        ] ;

    rr:predicateObjectMap
        [
            rr:predicateMap [ rr:constant sosa:observedProperty ] ;
            rr:objectMap
                [
                    rr:constant indv:Speed
                ]
        ] ;

    rr:predicateObjectMap
        [
            rr:predicate sosa:hasSimpleResult;
            rr:objectMap
                [
                    rml:reference "speed" ;
                    rr:datatype xsd:float
                ]
        ] ;

    rr:predicateObjectMap
        [
            rr:predicate sosa:resultTime;
            rr:objectMap
                [
                    rml:reference "datetime" ;
                    rr:datatype xsd:dateTime
                ]
        ] ;
.

#Dataset
sem-rc:steering a rr:TriplesMap ;
    rml:logicalSource
        [
            rml:source "comma.csv" ;
            rml:referenceFormulation ql:CSV
        ] ;
#subject
    rr:subjectMap
        [
            rr:template "http://stream-reasoning-challenge/vehicle/steering/observation/{id}" ;
            rr:class sosa:Observation
        ] ;

    rr:predicateObjectMap
        [
            rr:predicateMap [ rr:constant sosa:observedProperty ] ;
            rr:objectMap
                [
                    rr:constant indv:SteeringWheelAngle
                ]
        ] ;

    rr:predicateObjectMap
        [
            rr:predicate sosa:hasSimpleResult;
            rr:objectMap
                [
                    rml:reference "steering" ;
                    rr:datatype xsd:float
                ]
        ] ;

    rr:predicateObjectMap
        [
            rr:predicate sosa:resultTime;
            rr:objectMap
                [
                    rml:reference "datetime" ;
                    rr:datatype xsd:dateTime
                ]
        ] ;
.

```

The associated background knowledge is the ontology from which the individuals are instantiated:
```turtle
@prefix indv: <http://stream-reasoning-challenge.org/indv/>.
@prefix vsso: <https://github.com/w3c/vsso#> .

indv:Speed a vsso:Speed .
indv:SteeringWheelAngle a vsso:SteeringWheelAngle .
```

The resulting RDF data (at time `1970-01-02 05:18:41.750`) looks like the following: 
```turtle
@prefix sosa: <http://www.w3.org/ns/sosa/> .
@prefix vsso: <https://github.com/w3c/vsso#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

<http://stream-reasoning-challenge/vehicle/speed/observation/1> a sosa:Observation;
  sosa:hasSimpleResult "27.622222222222224"^^xsd:float;
  sosa:observedProperty vsso:Speed;
  sosa:resultTime "1970-01-02 05:18:41.750"^^xsd:dateTime .

<http://stream-reasoning-challenge/vehicle/steering/observation/1> a sosa:Observation;
  sosa:hasSimpleResult "-0.5"^^xsd:float;
  sosa:observedProperty vsso:SteeringWheelAngle;
  sosa:resultTime "1970-01-02 05:18:41.750"^^xsd:dateTime .

```
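
As a rough illustration of what a mapping function does per row, the sketch below builds the Turtle text for one record with plain string formatting. It is a hand-written approximation of the RML rules above, not an RML engine; the function name `semantify_row`, the constants `PREFIXES` and `TEMPLATES`, and the ISO 8601 timestamp format are assumptions made for this example.

```python
from datetime import datetime

PREFIXES = (
    '@prefix sosa: <http://www.w3.org/ns/sosa/> .\n'
    '@prefix indv: <http://stream-reasoning-challenge.org/indv/> .\n'
    '@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .\n'
)

# Observation IRI templates, following the subject maps of the RML rules above
TEMPLATES = {
    'Speed': 'http://stream-reasoning-challenge/vehicle/speed/observation/{id}',
    'SteeringWheelAngle': 'http://stream-reasoning-challenge/vehicle/steering/observation/{id}',
}


def semantify_row(row_id, row):
    """Return Turtle text with one sosa:Observation per mapped property (hypothetical helper)."""
    result_time = row['datetime'].isoformat()
    blocks = []
    for prop, template in TEMPLATES.items():
        subject = template.format(id=row_id)
        blocks.append(
            f'<{subject}> a sosa:Observation ;\n'
            f'  sosa:hasSimpleResult "{row[prop]}"^^xsd:float ;\n'
            f'  sosa:observedProperty indv:{prop} ;\n'
            f'  sosa:resultTime "{result_time}"^^xsd:dateTime .\n'
        )
    return PREFIXES + '\n' + '\n'.join(blocks)


# One record shaped like a row of segment_df (values taken from the table above)
record = {
    'datetime': datetime(1970, 1, 2, 5, 18, 41, 750000),
    'Speed': 27.622222,
    'SteeringWheelAngle': -0.5,
}
message = semantify_row(1, record)
print(message)
```

A row yielded by `segment_df.iterrows()` could be passed in the same way, since it supports lookup by column name and pandas timestamps also provide `isoformat()`.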

This data will be broadcast in a message by a custom player.

> **Note:** Please refer to the `semantization/` folder to see the corresponding mapping to `RDF` data using the RDF Mapping Language (RML).

## (c) Message broadcast <a id='stream_it_out'></a>

To complete the workflow, we need to broadcast the message with the current semantic data.
For that purpose, we can use the [`simple_player.py` script](https://github.com/patrik999/stream-reasoning-challenge/blob/master/src/simple_player.py) provided in the repository as a reference.
A custom player for our semantic data would look like the following:


```python
import time
from abstract_player import AbstractPlayer 

class CustomPlayer(AbstractPlayer):
 
    def init(self, stream_id, template_id):  # __init__
        self.streamID = stream_id
        self.templateID = template_id

    def start(self, freq_in_ms, replay):
        '''Modify this function according to your needs.'''
        
        self.frequency = freq_in_ms
        
        # ------------------------------
        # TODO Implement here your own functionality
        # ------------------------------
        
        # Load the dataset
        dataset = load_the_dataset()  # TODO use your own function/code
        
        # Prepare the dataset (as explained in this notebook)
        df = prepare_dataset(dataset)  # TODO use your own function/code
        
        # Iterate over the rows of the data frame
        for _, row_of_records in df.iterrows():  # iterrows() yields (index, row) pairs
            
            # Semantify the current row of records
            msgText = semantify_this_row(row_of_records)  # TODO use your own function/code
            yield msgText
            time.sleep(self.frequency/1000.0)
        
    def modify(self, freq_in_ms):
        self.frequency = freq_in_ms
```
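
Because `start` contains a `yield`, calling it returns a generator, and nothing is loaded or sent until a consumer iterates over it. The sketch below illustrates this with a stand-in class, `DemoPlayer`, which is hypothetical and does not inherit from the repository's `AbstractPlayer`. How the repository actually consumes and transmits the messages is not covered here; the loop only shows the pull-based flow and the effect of `modify`.

```python
import time


class DemoPlayer:
    """Stand-in for a custom player (hypothetical; not the repository's AbstractPlayer)."""

    def __init__(self, messages):
        self.messages = messages
        self.frequency = 0

    def start(self, freq_in_ms, replay=False):
        # Generator: yields one message, then waits before producing the next
        self.frequency = freq_in_ms
        for msg_text in self.messages:
            yield msg_text
            time.sleep(self.frequency / 1000.0)

    def modify(self, freq_in_ms):
        # Takes effect on the next sleep because start() re-reads self.frequency
        self.frequency = freq_in_ms


player = DemoPlayer(['message 1', 'message 2', 'message 3'])
received = []
for msg in player.start(freq_in_ms=10, replay=False):
    received.append(msg)  # a real consumer would send the message out here
    player.modify(5)      # change the pace while the stream is running
print(received)
```

Reading `self.frequency` inside the loop, rather than copying it to a local variable, is what lets `modify` change the pace of a running stream.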