# Trajectory extraction from Rosbag file



The goal of this script is to take a raw rosbag recording of a scene and partition out a set of discrete throw demonstrations.

## Raw rosbag conversion to csv with time-series position data

Imports:

In [2]:
from bagpy import bagreader
import pandas as pd

Constants - file paths, topic handles and other:

In [None]:
IFILE="rawdata/record_all_demonstrations.bag"
OFILE="processed_data/combined_timeseries.csv"
BOTTLE="/mocap_node/Bottle/Odom"
CATCHER="/mocap_node/CatchNet/Odom"
GRIPPER="/mocap_node/TrashPickup/Odom"
NAME_INDEX = 2
TIME="Time"

Odometry extraction from a topic csv file. There is a large number of superfluous columns that can be discarded early on. To make merging the odometry time series of different objects possible it's necessary to rename the coordinate columns:

In [None]:
def odom_extract(df: pd.DataFrame, name: str) -> pd.DataFrame:
    relevant = [
        "Time",
        "pose.pose.position.x",
        "pose.pose.position.y",
        "pose.pose.position.z",
        "pose.pose.orientation.x",
        "pose.pose.orientation.y",
        "pose.pose.orientation.z",
        "pose.pose.orientation.w",
    ]
    name_map = {s:s.replace("pose.pose",name) for s in relevant[1:]}
    out = df[relevant]
    return out.rename(columns=name_map)

Convenience functions for reading each topic and merging them all:

In [None]:
def topic_read(topic: str, bg: bagreader) -> pd.DataFrame:
    name = topic.split("/")[NAME_INDEX]
    topic_csv = bg.message_by_topic(topic)
    topic_df = odom_extract(pd.read_csv(topic_csv), name)
    return topic_df

def df_merge(*dfs) -> pd.DataFrame:
    combined = pd.concat(dfs)
    return combined.sort_values(by=[TIME])

Putting it all together, execution (takes a while because the rosbag recordings are large files):

In [None]:
ds = bagreader(IFILE)
bottle = topic_read(BOTTLE, ds)
catch_net = topic_read(CATCHER, ds)
gripper = topic_read(GRIPPER, ds)
output = df_merge(bottle, catch_net, gripper)
output.to_csv(OFILE)

## Position interpolation, shared argument axis

Fill all the missing values and produce regularly spaced timestamp values.

Imports and constants:

In [None]:
from cmath import nan
import pandas as pd
import numpy as np
from typing import Tuple
IFILE="processed_data/combined_timeseries.csv"
OFILE="processed_data/regular_timeseries.csv"
TIME="Time"

Resampling code - produce a dataframe with regularly spaced timestamps, initially filled wiht NaN values:

In [None]:
def trim_float(f, step):
    inv = 1 / step
    return float((int(f * inv) + 1)) / inv

def resample(df: pd.DataFrame, step=0.01) -> Tuple[pd.DataFrame, np.ndarray]:
    start = trim_float(df.iloc[0][TIME], step)
    end = trim_float(df.iloc[-1][TIME], step)
    times = np.arange(start=start, stop=end, step=step)
    d={"Time": times, "Unnamed: 0": "Not a number"}
    cols = df.columns
    regular = pd.DataFrame(data=d, columns=cols)
    return regular, times

"Interpolate" - actually just forward fill the missing values (for now):

In [None]:
def interpolate(df: pd.DataFrame, reg: pd.DataFrame) -> pd.DataFrame:
    combined = pd.concat([df, reg])
    combined: pd.DataFrame = combined.drop_duplicates(subset=[TIME]).sort_values(by=[TIME])
    interp = combined.fillna(method="ffill")
    return interp.loc[lambda d: d["Unnamed: 0"] == "Not a number"]

Execution code:

In [None]:
df = pd.read_csv(IFILE)
rows, index = resample(df)
interpolated = interpolate(df, rows)
interpolated.to_csv(OFILE)

## Throw separation

Here each throw is separated out into a different dataframe and the intervening data are discarded.

Imports and constants. This time there's a need to rename columns that would get duplicated, various panda DataFrame operations like max() and diff() keep the name of whatever column they were applied along in the output.

In [None]:
from os import times
import pandas as pd
from datetime import datetime
from typing import List
IFILE="processed_data/regular_timeseries.csv"
OFILE_BASE="processed_data/demo-"
TIME="Time"
GRIPPER_X="TrashPickup.position.x"
GRIPPER_XLIMU="TrashPickup.position.xlim+"
GRIPPER_XLIML="TrashPickup.position.xlim-"
BOTTLE_X="Bottle.position.x"
BOTTLE_XLIMU="Bottle.position.xlim+"
BOTTLE_XLIML="Bottle.position.xlim-"
WINDOW=150
STEPS=500

This function finds the starting index of each throw demonstration. First, rolling maxima and minima are computed for the entire dataframe. Then the combined dataframe is checked against upper and lower limits found by visually inspecting the trajectories in PlotJuggler. Finally, the thresholded points are diffed - only ones more than 5 seconds after the preceding data point are considered starting indices.

In [None]:
def demo_starts(df: pd.DataFrame) -> pd.DataFrame:
    upper = df.rolling(150).max()[[GRIPPER_X, BOTTLE_X]].rename(columns={GRIPPER_X:GRIPPER_XLIMU, BOTTLE_X:BOTTLE_XLIMU})
    lower = df.rolling(150).min()[[GRIPPER_X, BOTTLE_X]].rename(columns={GRIPPER_X:GRIPPER_XLIML, BOTTLE_X:BOTTLE_XLIML})
    combined = pd.concat([df, upper, lower], axis=1)
    starts = combined.loc[lambda d: (d[GRIPPER_XLIMU] <= 1.2) &
                                    (d[GRIPPER_XLIML] >= 1.1) &
                                    (d[BOTTLE_XLIMU] <= 1.2) &
                                    (d[BOTTLE_XLIML] >= 1.1)]
    diffs = starts[TIME].diff()
    return diffs.loc[lambda d: d > 5].index

This helper function produces a STEPS timestep long dataframe for each starting index:

In [None]:
def subframes(df: pd.DataFrame, indices) -> List[pd.DataFrame]:
    dfs = []
    for index in indices:
        dfs.append(df.iloc[index:index+STEPS])
    return dfs

But this one converts the timestamp of the first observation to a datetime string, creates a filename and outputs the dataframe to a csv file:

In [None]:
def name_and_write(df: pd.DataFrame):
    t = df.iloc[0][TIME]
    timestamp = datetime.utcfromtimestamp(t).strftime('%d-%m-%Y-%H:%M:%S')
    fname = f"{OFILE_BASE}{timestamp}.csv"
    df.to_csv(fname)

## Gripper separation event detection

The goal of this section is to produce a timestamp for when the gripper and bottle have separated, for each demonstration trajectory.