In [None]:
# Load source data
SIGNALS_FILE = 'falstad-data/CAN_signals.csv'

FREQUENCY_SIGNAL = 'ESP_m_Raddrehz'
POSITION_SIGNAL = 'ESP_Wegimp_VA'
POSITION_WRAPVAL = 1<<11

import pandas as pd

df = pd.read_csv(SIGNALS_FILE, index_col='time')

# Index is a Unix-epoch time
df.index = pd.to_datetime(df.index, unit='s')

#
# Tools
#
def unwrap_column(df: pd.DataFrame, column: str, wrap_val: float) -> None:
    """Adjust wrapped around column (change of more than half of the wrap-around value)"""
    # Assignment to "df.iloc[:,col_idx]" is much faster than to "df.iloc[:][column]"
    col_idx = df.columns.get_loc(column)
    wraps = df.iloc[:, col_idx].to_numpy()
    wraps = abs(wraps[1:] - wraps[:-1]) > wrap_val/2
    wraps = wraps.nonzero()[0]
    for idx in wraps:
        adj = wrap_val if df.iloc[idx, col_idx] > df.iloc[idx+1, col_idx] else -wrap_val
        df.iloc[idx+1:, col_idx] += adj
def date2offset(dates:pd.DatetimeIndex, start:pd.Timestamp=None) -> pd.Index:
    """Convert dates to float offsets"""
    if start is None:
        start = dates[0]
    return (dates - start).astype(int) / 1e9
def offset2date(date:pd.Index, start:pd.Timestamp) -> pd.DatetimeIndex:
    """Convert float offsets to dates"""
    return pd.to_timedelta(date, unit='s') + start

# Detect and adjust wrap-arounds in 'ESP_Wegimp_VA
position_raw = df[POSITION_SIGNAL].copy()
unwrap_column(df, POSITION_SIGNAL, POSITION_WRAPVAL)

In [None]:
# Visualize source data
import plotly.graph_objects as go

fig = go.FigureWidget()
fig.update_layout(
    title='Source data',
    hovermode='x unified',
    yaxis2=dict(overlaying='y', side='right'))

# Reorder columns
df = df[[POSITION_SIGNAL, FREQUENCY_SIGNAL]]
for idx, column in enumerate(df.columns):
    fig.add_scatter(x=df.index, y=df[column], yaxis='y%d'%(idx+1), name=column)

# Add original (unwrapped) position 
fig.add_scatter(x=position_raw.index, y=position_raw, yaxis='y1', line_dash='dash',
        visible='legendonly', name=POSITION_SIGNAL + ' (raw)')

fig.update_traces(xhoverformat='%s.%L')
fig

In [None]:
# Visualize approximated derivatives
import numpy as np
import plotly.graph_objects as go
import poly_approx

MAX_DERIVS = 6
NUM_DERIVS = 4
SRC_FREQ_SCALE = 100

approx = poly_approx.approximator()

# Allow data pre-procesing / reduction of the sample rate
start_t = df.index[0]
src_data = np.array((df[POSITION_SIGNAL], date2offset(df.index, start_t))).T

def lin_filter(arr: np.array, width: int, *, axis: int=0) -> np.array:
    """Linear (averaging) filter of array"""
    idx = np.arange(arr.shape[axis] - width + 1) + np.arange(width)[...,np.newaxis]
    return np.take(arr, idx, axis=axis).sum(axis) / width

fig = go.FigureWidget()
fig.update_layout(
    title='Approximated derivatives',
    hovermode='x unified',
    yaxis2=dict(overlaying='y', side='right'),
    yaxis3=dict(overlaying='y', visible=False),
    yaxis4=dict(overlaying='y', visible=False))

# Prepare scatter data from 4 derivatives
scat_vals = tuple(([],[]) for _ in range(NUM_DERIVS or 4))

# Experimental filtering
if False:
    # Replace time (index)
    src_data[...,1] = np.linspace(src_data[0,1], src_data[-1,1], src_data.shape[0], endpoint=True)
if False:
    # Blur data (index and position)
    for _ in range(4):
        src_data = lin_filter(src_data, 10) # 10 ~ .5 sec

for v, t in src_data:
    approx.approximate(v, t)
    # Drop high-rank components
    if MAX_DERIVS is not None:
        approx.reduce(max_rank=MAX_DERIVS)
    tmp_obj = approx.copy()
    # Convert to derivatives
    if NUM_DERIVS is not None:
        d_idx = min(NUM_DERIVS, tmp_obj.num_deltas()) - 1
        d_time = tmp_obj.get_value_time(d_idx)[1]
        tmp_obj.make_derivs(time=d_time, delta_rank=d_idx)
    for idx, scatt in zip(range(tmp_obj.num_deltas()), scat_vals):
        v, t = tmp_obj.get_value_time(idx, as_deriv=True)
        scatt[0].append(t)
        scatt[1].append(v)

for idx, scatt in enumerate(scat_vals):
    fig.add_scatter(x=offset2date(scatt[0], start_t), y=scatt[1], yaxis='y%d'%(idx+1), name='Deriv%d'%idx,
            visible=True if idx<2 else 'legendonly')

# Source data for reference (scaled)
fig.add_scatter(x=df.index, y=df[POSITION_SIGNAL], mode='markers',
        yaxis='y1', name='Position', visible='legendonly')
fig.add_scatter(x=df.index, y=df[FREQUENCY_SIGNAL] * SRC_FREQ_SCALE,
        yaxis='y2', name='Frequency', visible='legendonly')

fig.update_traces(xhoverformat='%s.%L (%M:%S)')
fig