In [140]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt  
import glob 
import os 
from scipy.ndimage import generic_filter
from scipy.signal import find_peaks 

In [154]:
def flat_region_filter(data: np.array, window_size = 200): 
    stddev = generic_filter(data, np.std, size=window_size) 
    movmean_stddev = pd.Series(stddev).rolling(window=window_size).mean() 
    idx_min_std = np.argmin(movmean_stddev) 
    idx_flat = np.arange(max([0,idx_min_std-window_size]), idx_min_std).astype(int) 
    data_flat_filt = data[idx_flat]  
    return idx_flat, data_flat_filt 

def filter_wrench_flat(data: pd.DataFrame, window_size=200) -> np.array: 
    FZ = data.loc[:,'FZ'].to_numpy()
    idx_flat, _ = flat_region_filter(FZ, window_size) 
    wrench_filtered = data.loc[idx_flat,['FX','FY','FZ','TX','TY','TZ']].to_numpy()  
    wrench_filtered_mean = np.mean(wrench_filtered, axis=0)
    return wrench_filtered_mean 

def clip_past_contact(data: pd.DataFrame) -> [pd.DataFrame, int, list]: 
    FZ = data.loc[:,'FZ'].to_numpy()
    # idx_peaks, properties = find_peaks(np.diff(np.array(-data.loc[:,'FZ'])), distance=500)
    idx_peaks, properties = find_peaks((np.array(-data.loc[:,'FZ'])), distance=500, height=20)
    properties['peak_heights'] if 'peak_heights' in properties else np.diff(np.array(-data.loc[idx_peaks,'FZ']))
    num_peaks = min([2,len(idx_peaks)])
    min_samples = 250 
    
    if len(idx_peaks) != 0: 
        idx_clip = idx_peaks[-num_peaks]  
    else: 
        idx_clip = len(data)-min_samples 

    idx_clip = min(idx_clip, len(data)-min_samples) # have at least min num of samples 
    data_clipped = data.loc[idx_clip:,:] 
    data_clipped = data_clipped.reset_index(drop=True)

    return data_clipped, idx_clip, idx_peaks 

# read all csv files in folder 
directory = '../../../wrench_timeseries_data/trial_17/' 
file_pattern = '*.csv'
files = glob.glob(os.path.join(directory, file_pattern)) 
# print(files)

# read specific file 
df_list = [] 
for j, file in enumerate(files):     
    df = pd.read_csv(file)
    # df = df.iloc[:500,:] 
    df_list.append(df) 

# find peak in data 

# filter data 
idx_flat = [] 
idx_peaks_list = [] 
for i, df in enumerate(df_list): 
    clipped_data, idx_clip, idx_peaks = clip_past_contact(df) 
    print(len(clipped_data)) 
    idx_flat_i, _ = flat_region_filter(clipped_data.loc[:,'FZ'].to_numpy(), window_size=200) 
    idx_flat.append(idx_clip + idx_flat_i)
    idx_peaks_list.append(idx_peaks)

    # throw error 
    if len(idx_flat_i) == 0: 
        print(f"Empty indices for case {i}!") 


# plot data 

%matplotlib qt 

dim_plots = ['FX','FY','FZ','TX','TY','TZ'] 
dim_coord = np.array([[0,0],[0,1],[0,2],[1,0],[1,1],[1,2]])

num_rows = max(dim_coord[:,0]) + 1
num_cols = max(dim_coord[:,1]) + 1

fig, axs = plt.subplots(num_rows, num_cols, figsize=(20,10))  
for i, dim in enumerate(dim_plots): 
    axs_handle = axs[dim_coord[i,0],dim_coord[i,1]]
    for j in [2]: #range(len(df_list)): 
        axs_handle.plot(df_list[j][dim]) 
        axs_handle.plot(idx_flat[j], df_list[j].loc[idx_flat[j], dim].to_numpy(), color='red') 
        axs_handle.scatter(idx_peaks_list[j], df_list[j].loc[idx_peaks_list[j], dim].to_numpy(), marker='x', color='red')
    axs_handle.set_title(dim) 
    axs_handle.legend(range(2*len(files))) 
    # axs_handle.set_xlim((0,500)) 

704
446
364


In [147]:
df_list[0]

IndexError: list index out of range

In [108]:
# plot kuka data stream 

wk = pd.read_csv("../../../kuka_wrench_data_stream.csv")
wk.columns =  ['FX','FY','FZ','TX','TY','TZ'] 

wk = wk.loc[1500:3000,:] # clip data 

dim_plots = ['FX','FY','FZ','TX','TY','TZ'] 
dim_coord = np.array([[0,0],[0,1],[0,2],[1,0],[1,1],[1,2]])

num_rows = max(dim_coord[:,0]) + 1
num_cols = max(dim_coord[:,1]) + 1

fig, axs = plt.subplots(num_rows, num_cols, figsize=(20,10))  
for i, dim in enumerate(dim_plots): 
    axs_handle = axs[dim_coord[i,0],dim_coord[i,1]]
    for j in [1]: #range(len(df_list)): 
        axs_handle.plot(wk[dim]) 
    axs_handle.set_title(dim) 
