# Pinch Detection Lab – Latest Notebook
_Generated: 2025-09-01 03:57_

This notebook contains the latest stationary and walking detectors, plots, and exporters.

In [2]:
import os, numpy as np, pandas as pd, matplotlib.pyplot as plt
from scipy import signal as sp_signal
plt.rcParams['figure.figsize']=(10,4); plt.rcParams['figure.dpi']=120
TIME_CANDS=['time','timestamp','ts','t','date','datetime','elapsed','elapsed_s','seconds','seconds_elapsed']
def resolve_path(p):
    for c in (p, p.strip(), p.replace('  ',' ')):
        if os.path.exists(c): return c
    return p
def _guess_time_col(df):
    for c in df.columns:
        if str(c).lower() in TIME_CANDS: return c
    return df.columns[0]
def _normalize_time_series(t_raw):
    t=np.asarray(t_raw).astype(float); d=np.diff(t); d=d[np.isfinite(d)&(d>0)]
    if len(d)==0: fs=100.0; return np.arange(len(t))/fs, fs
    md=float(np.median(d))
    scale=1.0 if 1e-4<=md<=10 else (1e-3 if 10<md<=5000 else (1e-6 if 5000<md<=5e6 else None))
    if scale is None: fs=100.0; return np.arange(len(t))/fs, fs
    t=t*scale; d=np.diff(t); d=d[np.isfinite(d)&(d>0)]; fs=1.0/float(np.median(d)) if len(d) else 100.0
    if not np.isfinite(fs) or fs<=0 or fs>1000: fs=100.0
    return t, fs
def _guess_axes_cols(df):
    lower={c:str(c).lower() for c in df.columns}; m={}
    for c,lc in lower.items():
        if any(p in lc for p in ['useraccelerationx','accelx','acc_x','acceleration_x']) or lc=='x': m['ax']=c
        elif any(p in lc for p in ['useraccelerationy','accely','acc_y','acceleration_y']) or lc=='y': m['ay']=c
        elif any(p in lc for p in ['useraccelerationz','accelz','acc_z','acceleration_z']) or lc=='z': m['az']=c
        elif 'rotationratex' in lc or 'gyro_x' in lc or 'rotationx' in lc: m['gx']=c
        elif 'rotationratey' in lc or 'gyro_y' in lc or 'rotationy' in lc: m['gy']=c
        elif 'rotationratez' in lc or 'gyro_z' in lc or 'rotationz' in lc: m['gz']=c
    need=['ax','ay','az','gx','gy','gz']
    if not all(k in m for k in need):
        num=[c for c in df.columns if np.issubdtype(df[c].dtype, np.number) and str(c).lower() not in TIME_CANDS]
        if len(num)>=6:
            vs=sorted([(df[c].var(),c) for c in num], reverse=True)
            for k,(_,c) in zip(need,vs[:6]):
                if k not in m: m[k]=c
    if not all(k in m for k in need): raise ValueError('set accel/gyro columns manually')
    return m['ax'],m['ay'],m['az'],m['gx'],m['gy'],m['gz']
def load_accel(path,time_col=None,ax=None,ay=None,az=None,gx=None,gy=None,gz=None):
    p=resolve_path(path); df=pd.read_csv(p)
    if time_col is None: time_col=_guess_time_col(df)
    if None in (ax,ay,az,gx,gy,gz): ax,ay,az,gx,gy,gz=_guess_axes_cols(df)
    t,fs=_normalize_time_series(df[time_col].values) if time_col in df.columns else (np.arange(len(df))/100.0,100.0)
    A=df[[ax,ay,az]].astype(float).copy(); G=df[[gx,gy,gz]].astype(float).copy()
    a=np.sqrt((A**2).sum(axis=1)).values; g=np.sqrt((G**2).sum(axis=1)).values
    return {'df':df,'time':t,'fs':fs,'axes':A,'gyro_axes':G,'acc_mag':a,'gyro_mag':g}
def hp_moving_mean(x,fs,win=0.5):
    w=max(1,int(round(win*fs))); return x - pd.Series(x).rolling(w,1,center=True).mean().values
def bandpass(x,fs,lo,hi,order=3):
    b,a=sp_signal.butter(order,[lo/(fs/2),hi/(fs/2)],btype='band'); return sp_signal.filtfilt(b,a,x)
def gait_lowband(x,fs,lo=0.7,hi=3.0,order=3):
    b,a=sp_signal.butter(order,[lo/(fs/2),hi/(fs/2)],btype='band'); return sp_signal.filtfilt(b,a,x)
def rms_envelope(x,fs,win=0.06):
    w=max(1,int(round(win*fs))); return np.sqrt(pd.Series(x**2).rolling(w,1,center=True).mean().values + 1e-12)
def robust_z(x,fs,win=3.0):
    w=max(3,int(round(win*fs))); s=pd.Series(x)
    med=s.rolling(w,max(1,w//4),center=True).median()
    mad=s.rolling(w,max(1,w//4),center=True).apply(lambda v: np.median(np.abs(v-np.median(v))),raw=False)
    mad=mad.replace(0,np.nan).fillna(mad.median() if np.isfinite(mad.median()) else 1.0)
    return ((s-med)/(1.4826*mad + 1e-9)).values
def plot_score_threshold(t,score,thr,idxs=None,title='score',out_png=None):
    plt.figure(figsize=(10,4)); plt.title(title); plt.plot(t,score,label='score'); plt.plot(t,thr,label='thr')
    if idxs is not None and len(idxs): plt.scatter(t[idxs],score[idxs],marker='x',label='events')
    plt.legend(); plt.xlabel('s'); plt.ylabel('score');
    (plt.savefig(out_png,dpi=150,bbox_inches='tight') or plt.close()) if out_png else plt.show()


In [3]:
def detect_stationary(data,k_mad=5.5,acc_gate=0.025,gyro_gate=0.10,hp_win=0.5,thr_win=3.0,
                      refractory_s=0.12,peakwin_s=0.04,gatewin_s=0.18,min_iei_s=0.10):
    t,fs=data['time'],data['fs']; a=data['acc_mag']; g=data['gyro_mag']
    a_hp=hp_moving_mean(a,fs,hp_win); da=np.gradient(a_hp,1.0/fs); dg=np.gradient(g,1.0/fs)
    z_a=robust_z(a_hp,fs,thr_win); z_g=robust_z(g,fs,thr_win)
    z_da=robust_z(np.abs(da),fs,thr_win); z_dg=robust_z(np.abs(dg),fs,thr_win)
    score=np.sqrt(np.maximum(z_a,0)**2+np.maximum(z_g,0)**2+np.maximum(z_da,0)**2+np.maximum(z_dg,0)**2)
    wthr=max(3,int(round(thr_win*fs)))
    thr=pd.Series(score).rolling(wthr,max(1,int(round(0.75*fs))),center=True)\
        .apply(lambda v: np.median(v)+k_mad*(1.4826*np.median(np.abs(v-np.median(v)))+1e-9),raw=False).values
    n=len(score); refr=int(round(refractory_s*fs)); pw=int(round(peakwin_s*fs)); gate=int(round(gatewin_s*fs))
    idxs=[]; last=-10**9
    for i in np.where(score>thr)[0]:
        if i-last<refr: continue
        i0=max(0,i-pw); i1=min(n,i+pw+1)
        if i != i0 + np.argmax(score[i0:i1]): continue
        g0=max(0,i-gate); g1=min(n,i+gate+1)
        if np.nanmax(a_hp[g0:g1])<acc_gate or np.nanmax(g[g0:g1])<gyro_gate: continue
        if len(idxs) and (i-idxs[-1])<int(round(min_iei_s*fs)): continue
        idxs.append(i); last=i
    return {'idxs':np.array(idxs,dtype=int),'score':score,'thr':thr,'a_hp':a_hp}


In [4]:
def detect_walking(data,k_mad=5.0,acc_gate=0.035,gyro_gate=0.13,hp_win=0.5,bp_lo=6.0,bp_hi=22.0,env_win=0.06,
                   align_tol_s=0.10,rise_max_s=0.16,decay_dt_s=0.14,decay_frac_max=0.65,
                   energy_ratio_min=0.02,low_lo=0.7,low_hi=3.0,corr_lag_s=0.08,corr_min=0.40,
                   thr_win=3.0,refractory_s=0.12,peakwin_s=0.04,gatewin_s=0.20,min_iei_s=0.10):
    t,fs=data['time'],data['fs']; a=data['acc_mag']; g=data['gyro_mag']
    a_hp=hp_moving_mean(a,fs,hp_win); a_bp=bandpass(a_hp,fs,bp_lo,bp_hi); g_bp=bandpass(g,fs,bp_lo,bp_hi)
    a_env=rms_envelope(a_bp,fs,env_win); g_env=rms_envelope(g_bp,fs,env_win)
    a_low=gait_lowband(a,fs,low_lo,low_hi); g_low=gait_lowband(g,fs,low_lo,low_hi)
    z_a=robust_z(a_env,fs,thr_win); z_g=robust_z(g_env,fs,thr_win)
    score=np.sqrt(np.maximum(z_a,0)**2 + np.maximum(z_g,0)**2)
    wthr=max(3,int(round(thr_win*fs)))
    thr=pd.Series(score).rolling(wthr,max(1,int(round(0.75*fs))),center=True)\
        .apply(lambda v: np.median(v)+k_mad*(1.4826*np.median(np.abs(v-np.median(v)))+1e-9),raw=False).values
    n=len(score); refr=int(round(refractory_s*fs)); pw=int(round(peakwin_s*fs)); gate=int(round(gatewin_s*fs))
    align=int(round(align_tol_s*fs)); decay_dt=int(round(decay_dt_s*fs)); rise_max=int(round(rise_max_s*fs))
    idxs=[]; last=-10**9; meta=[]
    for i in np.where(score>thr)[0]:
        if i-last<refr: continue
        i0=max(0,i-pw); i1=min(n,i+pw+1)
        if i != i0 + np.argmax(score[i0:i1]): continue
        g0=max(0,i-gate); g1=min(n,i+gate+1)
        if np.nanmax(a_hp[g0:g1])<acc_gate or np.nanmax(g[g0:g1])<gyro_gate: continue
        ia=g0+np.argmax(a_env[g0:g1]); ig=g0+np.argmax(g_env[g0:g1])
        if abs(ia-ig)>align: continue
        pre=max(0, ia - int(round(0.2*fs)))
        peak=float(a_env[ia]); lvl10=0.1*peak; lvl90=0.9*peak
        sub=a_env[pre:ia+1]; b10=np.where(sub<=lvl10)[0]; b90=np.where(sub<=lvl90)[0]
        if len(b10)==0 or len(b90)==0: continue
        rise=(pre+b90[-1])-(pre+b10[-1])
        dec_a=float(a_env[min(n-1, ia+decay_dt)])
        if not (0<=rise<=rise_max and dec_a<=decay_frac_max*peak): continue
        w=int(round(0.15*fs)); j0=max(0,i-w); j1=min(n,i+w+1)
        E_pinch=float(np.sum(a_bp[j0:j1]**2 + g_bp[j0:j1]**2)); E_low=float(np.sum(a_low[j0:j1]**2 + g_low[j0:j1]**2))+1e-12
        if (E_pinch/E_low)<energy_ratio_min: continue
        seg_a=a_bp[j0:j1]-np.mean(a_bp[j0:j1]); seg_g=g_bp[j0:j1]-np.mean(g_bp[j0:j1])
        denom=np.sqrt(np.sum(seg_a**2)*np.sum(seg_g**2))+1e-12
        import scipy.signal as ss
        xcorr=ss.correlate(seg_a,seg_g,mode='full'); lags=np.arange(-len(seg_a)+1,len(seg_a))
        corr_lag=int(round(corr_lag_s*fs)); mask=np.where(np.abs(lags)<=corr_lag)[0]
        corr_peak=np.max(np.abs(xcorr[mask]))/denom if denom>0 else 0.0
        if corr_peak<corr_min: continue
        if len(idxs) and (i-idxs[-1])<int(round(min_iei_s*fs)): continue
        idxs.append(i); last=i; meta.append(dict(index=int(i), time=float(t[i]), corr_peak=float(corr_peak)))
    return {'idxs':np.array(idxs,dtype=int),'score':score,'thr':thr,'a_bp':a_bp,'g_bp':g_bp,'a_env':a_env,'g_env':g_env,'meta':meta}


In [8]:
def export_events(t,idxs,prefix):
    os.makedirs('./data',exist_ok=True)
    out=f'./data/{prefix}_events.csv'
    pd.DataFrame({'index':idxs.astype(int),'time_sec':t[idxs]}).to_csv(out,index=False)
    print('Wrote',out); return out
def summarize_events(t,idxs):
    times=t[idxs] if len(idxs) else np.array([]); iei=np.diff(times) if len(times)>=2 else np.array([])
    return dict(n=len(iei), median=(np.median(iei) if len(iei) else np.nan))


In [9]:
# Update paths if needed
P_STATIONARY='./data/WristMotionStationary.csv'
P_WALK_PIN  ='./data/WristMotionPinchesWalking.csv'
P_WALK_BASE ='./data/WristMotionWalinkBaseline.csv'
for name, P in [('stationary',P_STATIONARY),('walk_pin',P_WALK_PIN),('walk_base',P_WALK_BASE)]:
    print(f'\n=== {name}: {P} ===')
    data=load_accel(P); t,fs=data['time'],data['fs']
    if name=='stationary':
        det=detect_stationary(data); idxs=det['idxs']
        export_events(t,idxs,'stationary_relaxedv2_new')
        plot_score_threshold(t,det['score'],det['thr'],idxs,title='Stationary – relaxed‑v2',out_png='./data/stationary_relaxedv2_new_score.png')
    else:
        det=detect_walking(data); idxs=det['idxs']
        prefix='walking_pinches_corr' if name=='walk_pin' else 'walking_baseline_corr'
        export_events(t,idxs,prefix)
        plot_score_threshold(t,det['score'],det['thr'],idxs,title=('Walking with pinches' if name=='walk_pin' else 'Walking baseline')+' – corr',out_png=f'./data/{prefix}_overview.png')



=== stationary: ./data/WristMotionStationary.csv ===
Wrote ./data/stationary_relaxedv2_new_events.csv

=== walk_pin: ./data/WristMotionPinchesWalking.csv ===
Wrote ./data/walking_pinches_corr_events.csv

=== walk_base: ./data/WristMotionWalinkBaseline.csv ===
Wrote ./data/walking_baseline_corr_events.csv
