In [1]:
%load_ext autoreload
%autoreload 2

from readers import *
from datalib import *
import config.config_create_trajectory as config

import matplotlib.pyplot as plt
import numpy as np
import math
import scipy.optimize as optimization
from bisect import bisect
import time as t

%matplotlib notebook

Failed to load Python extension for LZ4 support. LZ4 compression will not be available.


In [2]:
start_time = t.time()
print("Reading demonstration data")

position_data = []
velocity_data = []

starting_indices = []
ending_indices = []

for i in range(len(config.demos)):
    demo = config.demos[i]
    print(f'Reading demonstration file {demo}')

    position_data.append(PositionDataSet())
    velocity_data.append(PositionDataSet())

    # Read data
    franka_reader = FrankaStateReader(demo)
    
    while not franka_reader.end():
        dp = franka_reader.next_datapoint()
        time = dp.time
        franka_state = dp.value
        
        position_data[i].append(PositionDataPoint(time, franka_state.position))
        velocity_data[i].append(PositionDataPoint(time, franka_state.velocity))
        
    # Align time
    position_data[i].align_time()
    velocity_data[i].align_time()

print("--- %s seconds ---" % (t.time() - start_time))
print("Done")

Reading demonstration data
Reading demonstration file data/demo1.bag
Reading demonstration file data/demo2.bag
Reading demonstration file data/demo4.bag
Reading demonstration file data/demo10.bag
Reading demonstration file data/demo11.bag
Reading demonstration file data/demo12.bag
Reading demonstration file data/demo13.bag
Reading demonstration file data/demo14.bag
Reading demonstration file data/demo15.bag
Reading demonstration file data/demo18.bag
Reading demonstration file data/demo19.bag
Reading demonstration file data/demo21.bag
--- 54.3469295501709 seconds ---
Done


In [3]:
def fitting_func(t, a, A, gamma, omega, phi, v_min):
    result = []
    if not isinstance(t, (np.ndarray, np.generic)):
        if isinstance(t, list):
            t = np.array(t)
        else:
            t = np.array([t])

    for timestamp in t:
        result.append(v_min + a*timestamp + A*(math.exp(gamma * timestamp) * math.cos(omega*timestamp + phi) - math.cos(phi)))
    return np.array(result)

In [4]:
start_time = t.time()
print("Fitting curves")

velocity_to_be_fitted_data = []
fitted_data = []
fitted_linear_data = []

impact_indices_data = []
ante_impact_velocity_data = []
impact_phase_velocity_data = []
post_impact_velocity_data = []

# config.impact_duration = 0.09

for i in range(len(config.demos)):
    
    velocity_to_be_fitted_data.append(PositionDataSet())
    fitted_data.append(PositionDataSet())
    fitted_linear_data.append(PositionDataSet())
    
    # Impact phase
    ending_index = config.impact_intervals[i][0][0] - config.impact_detection_delays[i][0]
    starting_index = config.impact_intervals[i][0][-1] - config.impact_detection_delays[i][-1]
    impact_indices = []
    for j in range(len(config.impact_intervals[i][0])):
        if j > 0:
            impact_indices.append(config.impact_intervals[i][0][j] - config.impact_detection_delays[i][-1])
        else:
            impact_indices.append(config.impact_intervals[i][0][j] - config.impact_detection_delays[i][0])
    
    impact_indices_data.append(impact_indices)
    
    ante_impact_position = position_data[i][:ending_index].copy()
    impact_phase_position = position_data[i][ending_index:starting_index].copy()
    post_impact_position = position_data[i][starting_index:].copy()
    detected_impact_position = PositionDataSet([position_data[i][j] for j in impact_indices]).copy()
    
    ante_impact_velocity = velocity_data[i][:ending_index].copy()
    impact_phase_velocity = velocity_data[i][ending_index:starting_index].copy()
    post_impact_velocity = velocity_data[i][starting_index:].copy()
    detected_impact_velocity = PositionDataSet([velocity_data[i][j] for j in impact_indices]).copy()
    
    ante_impact_velocity_data.append(ante_impact_velocity)
    impact_phase_velocity_data.append(impact_phase_velocity)
    post_impact_velocity_data.append(post_impact_velocity)
    
    impact_phase_ending_index = 0
    last_impact_time = impact_phase_position[-1].time
    while post_impact_position[impact_phase_ending_index].time < last_impact_time + config.impact_duration:
        impact_phase_ending_index += 1
        
    velocity_data_to_fit = []
    fitted_velocity_data = []
    fitted_linear_velocity_data = []
        
    for j in range(3):   
        
        fitted_linear_velocity_data.append(DataSet())
        fitted_velocity_data.append(DataSet())
        
        data_to_fit = post_impact_velocity[:impact_phase_ending_index].get_index(j).copy()
        time_shift = data_to_fit[0].time
        data_to_fit.align_time()
        
        ante_impact_velocity_value = impact_phase_velocity[-1][j].value
        func = lambda t, a, A, gamma, phi, omega : fitting_func(t, a, A, gamma, omega, phi, ante_impact_velocity_value)
        omega = 2*math.pi / (config.impact_duration/3)
        p0 = [0, 1/100, 0, 0, omega]
        bounds = ((-np.inf, 1/100, -np.log(10)/((3/2)*config.impact_duration/3), -np.pi, (2/3)*omega), (np.inf, np.inf, 0, np.pi, (3.2)*omega))
        
        p, pcov = optimization.curve_fit(func, np.array(data_to_fit.time), np.array(data_to_fit.value), p0=p0,bounds=bounds, maxfev=1000000)
        print(f'a: {"{:.2f}".format(p[0])}, A: {"{:.4f}".format(p[1])}, \N{greek small letter gamma}: {"{:.2f}".format(p[2])}, \N{greek small letter phi}: {"{:.2f}".format(p[3])}, \N{greek small letter omega}: {"{:.2f}".format(p[4])}')
        
        fitted_velocity_data_values = func(data_to_fit.time, p[0], p[1], p[2], p[3], p[4])
        fitted_linear_velocity_data_values = func(data_to_fit.time, p[0], 0, 0, 0, 0)
        
        for k in range(len(fitted_velocity_data_values)):
            fitted_velocity_data[j].append(DataPoint(data_to_fit.time[k] + time_shift, fitted_velocity_data_values[k]))
            fitted_linear_velocity_data[j].append(DataPoint(data_to_fit.time[k] + time_shift, fitted_linear_velocity_data_values[k] - p[1] * math.cos(p[3])))
        
        data_to_fit.align_time(time_shift)
        velocity_data_to_fit.append(data_to_fit)
    
    for k in range(len(fitted_velocity_data[0])):
        time = fitted_velocity_data[0][k].time
        value = []
        fitted_value = []
        fitted_linear_value = []
        for j in range(3):
            value.append(velocity_data_to_fit[j][k].value)
            fitted_value.append(fitted_velocity_data[j][k].value)
            fitted_linear_value.append(fitted_linear_velocity_data[j][k].value)
        velocity_to_be_fitted_data[i].append(PositionDataPoint(time, value))
        fitted_data[i].append(PositionDataPoint(time, fitted_value))
        fitted_linear_data[i].append(PositionDataPoint(time, fitted_linear_value))
        
print("--- %s seconds ---" % (t.time() - start_time))
print("Done")

Fitting curves
a: 1.53, A: 0.0100, γ: -0.00, φ: -0.32, ω: 489.77
a: 0.62, A: 0.0429, γ: -115.13, φ: 2.96, ω: 314.16
a: -1.69, A: 0.0353, γ: -115.13, φ: 0.32, ω: 463.26
a: 0.53, A: 0.0362, γ: -115.13, φ: -1.17, ω: 314.16
a: -2.74, A: 0.0497, γ: -115.13, φ: -3.14, ω: 314.16
a: -2.43, A: 0.0733, γ: -115.13, φ: -3.03, ω: 314.16
a: 7.03, A: 0.1127, γ: -115.13, φ: -0.16, ω: 525.17
a: 0.83, A: 0.0414, γ: -115.13, φ: -3.14, ω: 564.28
a: -1.42, A: 0.0713, γ: -115.13, φ: 2.75, ω: 314.16
a: -0.22, A: 0.0100, γ: -115.13, φ: -3.14, ω: 807.48
a: 0.94, A: 0.0100, γ: -6.29, φ: -0.75, ω: 451.60
a: -0.51, A: 0.0453, γ: -115.13, φ: -0.72, ω: 314.16
a: -0.61, A: 0.0479, γ: -115.13, φ: 2.44, ω: 314.16
a: 0.79, A: 0.0220, γ: -53.72, φ: -1.18, ω: 476.60
a: -0.84, A: 0.0154, γ: -115.13, φ: -0.48, ω: 314.16
a: 0.11, A: 0.0290, γ: -115.13, φ: -3.14, ω: 765.58
a: 2.66, A: 0.0100, γ: -7.42, φ: 1.33, ω: 415.37
a: -0.23, A: 0.0894, γ: -115.13, φ: -0.24, ω: 314.16
a: 2.16, A: 0.0371, γ: -113.59, φ: 0.45, ω: 338.35
a

In [5]:
# Plot velocity

for i in range(len(config.demos)):
    for j in range(3):
        plt.figure(figsize=config.figsize,dpi=config.dpi)
        plt.rcParams['xtick.labelsize'] = config.fontsize2
        plt.rcParams['ytick.labelsize'] = config.fontsize2
        
        plt.plot(ante_impact_velocity_data[i].time, ante_impact_velocity_data[i].get_index(j).value, f'C{4}-*', linewidth=config.linewidth, markersize=config.markersize3,label='Ante-impact phase')
        plt.plot(impact_phase_velocity_data[i].time, impact_phase_velocity_data[i].get_index(j).value, f'C{5}-*', linewidth=config.linewidth, markersize=config.markersize3,label='Impact phase')
        plt.plot(post_impact_velocity_data[i].time, post_impact_velocity_data[i].get_index(j).value, f'C{6}-*', linewidth=config.linewidth, markersize=config.markersize3,label='Post-impact phase')
        detected_impacts = velocity_data[i][impact_indices_data[i]]
        plt.plot(detected_impacts.time, detected_impacts.get_index(j).value, 'C0*', markersize=config.markersize1,label='Detected impacts')
    
        handles, labels = plt.gca().get_legend_handles_labels()
        by_label = dict(zip(labels, handles))
        plt.legend(by_label.values(), by_label.keys(),fontsize=config.fontsize2)
        plt.xlabel('Time [s]',fontsize=config.fontsize2)
        plt.ylabel('Velocity [m/s]',fontsize=config.fontsize2)
        plt.title(f'{config.labels[j]} velocity of different phases ' + config.demos[i],fontsize=config.fontsize1)
        if config.xlim is not None:
            x_min = impact_phase_velocity_data[i][0].time + config.xlim[0]
            x_max = impact_phase_velocity_data[i][-1].time + config.xlim[-1]
            sub_ydata = velocity_data[i][bisect(velocity_data[i].time, x_min):bisect(velocity_data[i].time, x_max)].get_index(j).value
            plt.xlim((x_min, x_max))
            plt.ylim((min(sub_ydata) - 0.05, max(sub_ydata) + 0.05))

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

  """


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [6]:
# Plot fitted velocity

for i in range(len(config.demos)):    
    for j in range(3):
        
        plt.figure(figsize=config.figsize,dpi=config.dpi)
        plt.rcParams['xtick.labelsize'] = config.fontsize2
        plt.rcParams['ytick.labelsize'] = config.fontsize2
        
        plt.plot(impact_phase_velocity_data[i].time, impact_phase_velocity_data[i].get_index(j).value, 'C3-*', linewidth=config.linewidth, markersize=config.markersize3, label='Impact phase')
        plt.plot(post_impact_velocity_data[i].time, post_impact_velocity_data[i].get_index(j).value, f'C{4}-*', linewidth=config.linewidth, markersize=config.markersize3, label='Post-impact phase')
        plt.plot(velocity_to_be_fitted_data[i].time, velocity_to_be_fitted_data[i].get_index(j).value, f'C{5}-*', linewidth=config.linewidth, markersize=config.markersize3, label='Data used for fitting')
        plt.plot(fitted_data[i].time, fitted_data[i].get_index(j).value, f'C{6}-*', linewidth=config.linewidth, markersize=config.markersize3, label='Fitted function')
        plt.plot(fitted_linear_data[i].time, fitted_linear_data[i].get_index(j).value, f'C{0}-*', linewidth=config.linewidth, markersize=config.markersize3, label='Fitted linear term')
        plt.plot(fitted_linear_data[i][0].time, fitted_linear_data[i][0][j].value, f'C{0}*', linewidth=config.linewidth, markersize=config.markersize1, label='Post-impact velocity')
        
        handles, labels = plt.gca().get_legend_handles_labels()
        by_label = dict(zip(labels, handles))
        plt.legend(by_label.values(), by_label.keys(),fontsize=config.fontsize2)
        plt.xlabel('Time [s]',fontsize=config.fontsize2)
        plt.ylabel('Velocity [m/s]',fontsize=config.fontsize2)
        plt.title(f'{config.labels[j]} post-impact velocity ' + config.demos[i],fontsize=config.fontsize1)
        if config.xlim is not None:
            x_min = velocity_to_be_fitted_data[i][0].time + config.xlim[0]
            x_max = velocity_to_be_fitted_data[i][-1].time + config.xlim[-1]
            sub_ydata1 = post_impact_velocity_data[i][bisect(post_impact_velocity_data[i].time, x_min):bisect(post_impact_velocity_data[i].time, x_max)].get_index(j).value
            sub_ydata2 = fitted_data[i][bisect(fitted_data[i].time, x_min):bisect(fitted_data[i].time, x_max)].get_index(j).value
            sub_ydata3 = fitted_linear_data[i][bisect(fitted_linear_data[i].time, x_min):bisect(fitted_linear_data[i].time, x_max)].get_index(j).value
            sub_ydata4 = impact_phase_velocity_data[i][bisect(impact_phase_velocity_data[i].time, x_min):bisect(impact_phase_velocity_data[i].time, x_max)].get_index(j).value
            
            y_min = min([min(sub_ydata1), min(sub_ydata2), min(sub_ydata3), min(sub_ydata4)]) - 0.05
            y_max = max([max(sub_ydata1), max(sub_ydata2), max(sub_ydata3), max(sub_ydata4)]) + 0.05
            plt.xlim((x_min, x_max))
            plt.ylim((y_min, y_max))

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>