In [None]:
import ast
import csv
import itertools

import numpy as np
import pandas as pd

In [None]:
# Metadata tables for the two splits, read from CSV files in the working directory.
data = pd.read_csv("train.csv")
test = pd.read_csv("test.csv")

In [None]:
# Preview the first 10 rows of the table loaded from train.csv.
data.head(10)

In [None]:
from pathlib import Path

# Root folder that the tracking / annotation sub-folders are resolved against.
root = Path("mabe_data")
print("Root exists:", root.exists())

# Path.iterdir() raises FileNotFoundError on a missing folder, so only list the
# contents when the root really is a directory instead of crashing the cell.
if root.is_dir():
    print("Top-level items:", [p.name for p in root.iterdir()])
else:
    print(f"Top-level items: unavailable ({root} is not a directory)")

In [None]:
import os

# Built with the `/` operator so these stay Path objects like `root`.
# os.path.join() would return plain strings, which cannot be combined with `/`
# later on; Path objects are still accepted by os.path.join().
train_tracking = root / 'train_tracking'
test_tracking = root / 'test_tracking'
train_annotation = root / 'train_annotation'

In [None]:
# Row count of the training metadata before the 'MABe22_' labs are filtered out.
print(f"Original number of training videos: {len(data)}")

In [None]:
# Keep only the videos whose lab_id does not start with 'MABe22_', then renumber the rows.
data = (
    data
    .loc[lambda frame: ~frame['lab_id'].str.startswith('MABe22_')]
    .reset_index(drop=True)
)
print(f"Number of training videos after filtering MABe22: {len(data)}")

# Show a three-row sample of each metadata table.
print("\nTrain Metadata Sample")
display(data.head(3))

print("\nTest Metadata Sample")
display(test.head(3))

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import json

# Two stacked bar charts: videos per lab (top) and videos per number of
# tracked body parts (bottom).
fig, axes = plt.subplots(2, 1, figsize=(14, 9))

lab_counts = data['lab_id'].value_counts()
sns.barplot(x=lab_counts.index, y=lab_counts.values, ax=axes[0], color='pink')
axes[0].set_title('Distribution of Videos per Lab')
axes[0].set_xlabel('Lab ID')
axes[0].set_ylabel('Number of Videos')
axes[0].tick_params(axis='x', rotation=45)

# body_parts_tracked is decoded with json.loads; videos are bucketed by the
# length of the decoded value.
bodypart_counts = data['body_parts_tracked'].apply(lambda x: f"{len(json.loads(x))} parts").value_counts()
sns.barplot(x=bodypart_counts.index, y=bodypart_counts.values, ax=axes[1], color='hotpink')
axes[1].set_title('Distribution of Videos per Number of Tracked Body Parts')
# The x axis carries the "<n> parts" categories and the y axis the video
# counts; the two labels were previously swapped.
axes[1].set_xlabel('Number of Tracked Body Parts')
axes[1].set_ylabel('Number of Videos')

plt.tight_layout()
plt.show()

In [None]:
# Behaviour names of interest, stored as a set.
target_behaviours = {'attack', 'mount', 'chase'}

# Body-part names that load_and_pivot_tracking filters out of videos tracking
# more than 10 distinct body parts.
bodyparts_todrop = [
    'headpiece_bottombackleft',
    'headpiece_bottombackright',
    'headpiece_bottomfrontleft',
    'headpiece_bottomfrontright',
    'headpiece_topbackleft',
    'headpiece_topbackright',
    'headpiece_topfrontleft',
    'headpiece_topfrontright',
    'spine_1',
    'spine_2',
    'tail_middle_1',
    'tail_middle_2',
    'tail_midpoint',
]

In [None]:
def index_tracking_paths(metadata,mode='train',base_dir=None):
    """Locate the tracking parquet file of every video listed in ``metadata``.

    Each file is expected at ``<base>/<lab_id>/<video_id>.parquet``.

    Parameters
    ----------
    metadata : pd.DataFrame
        Must provide ``lab_id`` and ``video_id`` columns.
    mode : {'train', 'test'}
        Selects the module-level ``train_tracking`` or ``test_tracking`` folder
        when ``base_dir`` is not given.
    base_dir : str or path-like, optional
        Explicit folder to search instead of the one implied by ``mode``.

    Returns
    -------
    pd.DataFrame
        One row per file that exists on disk, with columns ``lab_id``,
        ``video_id`` (both as they appear in ``metadata``) and
        ``tracking_path`` (a ``pathlib.Path``). Videos whose file is missing
        are counted in the printed summary and left out.
    """
    assert mode in ['train','test'], "Mode must be 'train' or 'test'."

    if base_dir is None:
        base_dir = train_tracking if mode == "train" else test_tracking
    # Wrap in Path so the `/` joins below work even when the folder is a plain string.
    base = Path(base_dir)

    records = []
    missing = 0
    for lab, vid in zip(metadata["lab_id"], metadata["video_id"]):
        path = base / str(lab) / f"{vid}.parquet"
        if not path.exists():
            missing += 1
            continue
        # Keep the identifiers unconverted so callers can match them back to `metadata`.
        records.append({"lab_id": lab, "video_id": vid, "tracking_path": path})

    print(f"[{mode}] tracking files: found {len(records)}, missing {missing} under {base}")
    # Explicit columns keep the schema stable even when nothing was found.
    return pd.DataFrame(records, columns=["lab_id", "video_id", "tracking_path"])

In [None]:
def load_and_pivot_tracking(path,drop_body_parts=None,pix_per_cm=None):
    """Read one tracking parquet file and pivot it to one row per video frame.

    The file must provide the columns ``video_frame``, ``mouse_id``,
    ``bodypart``, ``x`` and ``y``.

    Parameters
    ----------
    path : str or path-like
        Location of the tracking parquet file.
    drop_body_parts : iterable of str, optional
        Body-part names to discard when the file tracks more than 10 distinct
        body parts. Defaults to the module-level ``bodyparts_todrop``.
    pix_per_cm : float, optional
        When positive, every coordinate is divided by this value. ``None``,
        NaN, zero and negative values leave the coordinates untouched.

    Returns
    -------
    pd.DataFrame
        Indexed by ``video_frame`` with three column levels ordered as
        (mouse_id, bodypart, coordinate), where coordinate is ``'x'`` or ``'y'``.
    """
    tracking = pd.read_parquet(path)

    if drop_body_parts is None:
        drop_body_parts = bodyparts_todrop

    # Only files with more than 10 distinct body parts are thinned out.
    if tracking['bodypart'].nunique() > 10:
        tracking = tracking[~tracking['bodypart'].isin(list(drop_body_parts))]

    pivoted = tracking.pivot(
        index='video_frame',
        columns=['mouse_id', 'bodypart'],
        values=['x', 'y'])

    # pivot() puts the value name ('x'/'y') on the outermost level; move it to
    # the innermost one so a mouse, then a body part, can be selected first.
    pivoted = pivoted.reorder_levels([1, 2, 0], axis=1).sort_index(axis=1)

    # `NaN > 0` is False, so a missing scale is skipped as well.
    if pix_per_cm is not None and pix_per_cm > 0:
        pivoted = pivoted / pix_per_cm

    return pivoted

In [None]:
def parse_behaviours_labelled(raw)->pd.DataFrame|None:
    raw=row.get("behaviours_labeled", None)

    parsed=None
    if isinstance(raw,list):
        parsed=raw
    elif isinstance(raw,str):
        s=raw.strip()
        if s:
            try:
                parsed=json.loads(s)
            except json.JSONDecodeError:
                try:
                    parsed=ast.literal_eval(s)
                except Exception:
                    parsed=None
    if not parsed:
        return None

    rows=[]
    for b in parsed:
        parts=None
        if isinstance(b,str):
            parts=next(csv.reader([b]))
        elif isinstance(b,(list,tuple)):
            parts=list(b)
        elif isinstance(b,dict):
            parts=[b.get("agent"),b.get("target"),b.get("action")]
        else:
            continue
        if len(parts)>=3:
            rows.append(parts[:3])
    if not rows:
        return None
    behaviour=pd.DataFrame(rows,columns["agent","target","action"])

    return behaviour

In [None]:
def pair_samples(metadata:pd.DataFrame,mode="train",train_annotation=Path("mabe_data/train_annotation"),drop_body_parts=None,target_behaviours=None):
    """Yield one sample per ordered (agent, target) mouse pair of every video.

    A pair is emitted only when the video's labelled-behaviours metadata lists
    at least one action from ``target_behaviours`` for that agent/target
    combination. With the default ``target_behaviours=None`` nothing matches,
    so nothing is yielded.

    Parameters
    ----------
    metadata : pd.DataFrame
        Video metadata; ``lab_id``, ``video_id`` and ``behaviours_labeled``
        are read, plus ``pix_per_cm_approx`` when present.
    mode : {'train', 'test'}
        In ``'train'`` mode per-frame labels are built from the annotation files.
    train_annotation : str or path-like
        Folder holding ``<lab_id>/<video_id>.parquet`` annotation files.
    drop_body_parts : iterable of str, optional
        Forwarded to ``load_and_pivot_tracking``.
    target_behaviours : iterable of str, optional
        Action names to keep.

    Yields
    ------
    tuple
        ``('pair', tracking, pair_metadata, labels)`` in train mode, where
        ``labels`` is a 0/1 frame indexed like ``tracking`` with one column
        per relevant action; ``('pair', tracking, pair_metadata, actions)`` in
        test mode, where ``actions`` is the sorted list of relevant actions.
        ``tracking`` has the column levels ('agent'|'target', bodypart, coord).
    """
    base_tracks = index_tracking_paths(metadata, mode=mode)
    target_behaviours = set(target_behaviours or [])

    for _, m in base_tracks.iterrows():
        row = metadata[(metadata["lab_id"] == m["lab_id"]) & (metadata["video_id"] == m["video_id"])].iloc[0]

        # NOTE(review): the column name is used exactly as the original code
        # spelled it; confirm it matches the header of the metadata CSV.
        behaviours = parse_behaviours_labelled(row.get("behaviours_labeled"))
        if behaviours is None:
            # Nothing labelled for this video, so no pair can qualify; skip the
            # (comparatively expensive) tracking load altogether.
            continue

        pivoted = load_and_pivot_tracking(m["tracking_path"], drop_body_parts=drop_body_parts, pix_per_cm=row.get("pix_per_cm_approx"))

        try:
            available_mice = pivoted.columns.get_level_values('mouse_id').unique()
        except (KeyError, ValueError):
            available_mice = pivoted.columns.get_level_values(0).unique()

        # Read the annotation file once per video rather than once per pair.
        annotations = None
        if mode == 'train':
            annotation_path = os.path.join(train_annotation, str(m['lab_id']), f"{m['video_id']}.parquet")
            if os.path.exists(annotation_path):
                annotations = pd.read_parquet(annotation_path)

        for agent_id, target_id in itertools.permutations(available_mice, 2):
            agent_str = f"mouse{agent_id}"
            target_str = f"mouse{target_id}"

            pair_actions = behaviours[
                (behaviours['agent'] == agent_str) &
                (behaviours['target'] == target_str)
            ]['action'].unique()

            # Sorted so that the label columns come out in a reproducible order.
            relevant_actions = sorted(set(pair_actions) & target_behaviours)
            if not relevant_actions:
                continue

            agent_data = pivoted[agent_id]
            target_data = pivoted[target_id]
            pair_tracking_data = pd.concat([agent_data, target_data], axis=1, keys=['agent', 'target'])

            pair_metadata = pd.DataFrame({
                'video_id': m['video_id'],
                'video_frame': pair_tracking_data.index,
                'agent_id': agent_str,
                'target_id': target_str})

            if mode == 'train':
                pair_labels = pd.DataFrame(0, index=pair_tracking_data.index, columns=relevant_actions)

                # Without an annotation file the labels stay all-zero.
                if annotations is not None:
                    pair_annotations = annotations[
                        (annotations['agent_id'] == agent_id) &
                        (annotations['target_id'] == target_id) &
                        (annotations['action'].isin(relevant_actions))]

                    for _, annotations_row in pair_annotations.iterrows():
                        # Label-based slice: both start_frame and stop_frame are included.
                        pair_labels.loc[annotations_row['start_frame']:annotations_row['stop_frame'], annotations_row['action']] = 1

                yield 'pair', pair_tracking_data, pair_metadata, pair_labels

            else:
                yield 'pair', pair_tracking_data, pair_metadata, relevant_actions

    

In [None]:
def create_pair_features(pair_data,body_parts=None):
    """Compute per-frame distances between agent and target body parts.

    Parameters
    ----------
    pair_data : pd.DataFrame
        Columns are a MultiIndex of ('agent'|'target', bodypart, 'x'|'y').
    body_parts : iterable of str, optional
        Restricts the body parts considered on both mice. ``None`` uses every
        body part present on either mouse.

    Returns
    -------
    pd.DataFrame
        Indexed like ``pair_data`` with one ``distance_<agent part>_<target part>``
        column per retained combination. Empty (index only) when ``pair_data``
        lacks the 'agent' or 'target' group.
    """
    top_level = pair_data.columns.get_level_values(0)
    if "agent" not in top_level or "target" not in top_level:
        return pd.DataFrame(index=pair_data.index)

    agent_parts = pair_data['agent'].columns.get_level_values(0).unique()
    target_parts = pair_data['target'].columns.get_level_values(0).unique()

    if body_parts is not None:
        allowed = set(body_parts)
    else:
        allowed = set(agent_parts) | set(target_parts)

    # Extract each target part once instead of once per agent part.
    target_xy = {
        p2: pair_data['target'][p2][['x', 'y']].to_numpy(copy=False)
        for p2 in target_parts if p2 in allowed
    }

    # Collect the columns first and build the frame in one go; inserting them
    # one at a time fragments the DataFrame.
    features = {}
    for p1 in agent_parts:
        if p1 not in allowed:
            continue
        a = pair_data['agent'][p1][['x', 'y']].to_numpy(copy=False)
        for p2, t in target_xy.items():
            difference = a - t
            features[f'distance_{p1}_{p2}'] = np.sqrt((difference ** 2).sum(axis=1))

    return pd.DataFrame(features, index=pair_data.index)

In [None]:
def get_xy(df,part):
    """Return the x/y coordinates of ``part`` as an array, or ``None``.

    ``df`` must have MultiIndex columns. Two layouts are tried in turn:
    (part, coordinate) first, then the same with levels 0 and 1 swapped.
    Any error raised while probing a layout is ignored and counts as "not found".
    """
    if not isinstance(df.columns, pd.MultiIndex):
        return None

    wanted = {"x", "y"}

    # Layout 1: the body part sits on level 0, the coordinate on level 1.
    try:
        outer = df.columns.get_level_values(0)
        inner = df.columns.get_level_values(1)
        if part in outer and wanted.issubset(set(inner)):
            part_frame = df[part]
            if wanted.issubset(part_frame.columns):
                return part_frame[["x", "y"]].to_numpy(copy=False)
    except Exception:
        pass

    # Layout 2: swap levels 0 and 1 on a copy, then look the part up again.
    try:
        flipped = df.copy()
        flipped.columns = flipped.columns.swaplevel(0, 1)
        flipped = flipped.sort_index(axis=1)
        if part in flipped.columns.get_level_values(0) and wanted.issubset(flipped.columns.get_level_values(1)):
            part_frame = flipped.xs(part, axis=1, level=0)
            return part_frame[["x", "y"]].to_numpy(copy=False)
    except Exception:
        pass

    return None

In [None]:
def agent_elongation(pair_data:pd.DataFrame,col_name="agent_elongation")->pd.DataFrame:
    """Per-frame ratio of the agent's nose-to-tail-base distance to its ear-to-ear distance.

    Parameters
    ----------
    pair_data : pd.DataFrame
        Columns are a MultiIndex whose first level contains 'agent'; the body
        parts 'nose', 'tail_base', 'ear_left' and 'ear_right' are looked up
        under it through ``get_xy``.
    col_name : str
        Name of the single output column.

    Returns
    -------
    pd.DataFrame
        Indexed like ``pair_data`` with one column ``col_name``. The column is
        entirely NaN when the 'agent' group or any of the four body parts is
        missing, and NaN on frames where the ear-to-ear distance is not finite
        or not above 1e-9.
    """
    X = pd.DataFrame(index=pair_data.index)

    # Without an 'agent' group there is nothing to measure.
    if "agent" not in pair_data.columns.get_level_values(0):
        X[col_name] = np.nan
        return X
    agent = pair_data["agent"]

    nose = get_xy(agent, "nose")
    tail_base = get_xy(agent, "tail_base")
    ear_left = get_xy(agent, "ear_left")
    ear_right = get_xy(agent, "ear_right")

    if any(arr is None for arr in (nose, tail_base, ear_left, ear_right)):
        X[col_name] = np.nan
        return X

    nose_tail_distance = np.linalg.norm(nose - tail_base, axis=1)
    ear_ear_distance = np.linalg.norm(ear_left - ear_right, axis=1)

    # Divide only where the denominator is usable; every other frame keeps
    # the NaN that `out` was pre-filled with.
    denominator_okay = np.isfinite(ear_ear_distance) & (ear_ear_distance > 1e-9)
    out = np.full_like(nose_tail_distance, np.nan, dtype=float)
    np.divide(nose_tail_distance, ear_ear_distance, out=out, where=denominator_okay)

    X[col_name] = out
    return X


In [None]:
def agent_motion_stats(pair_data:pd.DataFrame,fps:float|None=None)->pd.DataFrame:
    if 'body_center' in agent_parts:
        center_x=pair_data['agent']['body_center']['x']
        center_y=pair_data['agent']['body_center']['y']

        velocity_x=center_x.diff()
        velocity_y=center_y.diff()
        speed=np.sqrt(velocity_x**2+velocity_y**2)

        acceleration_x=velocity_x.diff()
        acceleration_y=velocity_y.diff()
        acceleration=np.sqrt(acceleration_x**2+acceleration_y**2)

        for w in [5,15,45]:
           
            X[f'agent_speed_mean_{w}']=speed.rolling(w,min_periods=1,center=True).mean()
            X[f'agent_speed_std_{w}']=speed.rolling(w,min_periods=1,center=True).std()

            X[f'agent_acceleration_mean_{w}']=acceleration.rolling(w,min_periods=1,center=True).mean()
            X[f'agent_acceleration_max_{w}']=acceleration.rolling(w,min_periods=1,center=True).max()

        return X

In [None]:
def add_pair_distance_features(pair_data,X,fps=None):
    """Add body-centre distance and relative-motion features to ``X``.

    Parameters
    ----------
    pair_data : pd.DataFrame
        Columns are a MultiIndex of ('agent'|'target', bodypart, 'x'|'y');
        ``body_center`` of both mice is used.
    X : pd.DataFrame
        Feature frame to extend; it is modified in place and also returned.
    fps : float, optional
        When positive, per-frame displacements are multiplied by it before the
        velocity-based features are computed.

    Returns
    -------
    pd.DataFrame
        ``X`` with four added columns:

        * ``distance`` - distance between the two body centres.
        * ``distance_change`` - frame-to-frame difference of ``distance``
          (not scaled by ``fps``).
        * ``approach_rate`` - relative velocity projected on the agent-minus-target
          direction; positive while the distance is growing.
        * ``velocity_correlation`` - dot product of the two velocities divided
          by the product of their speeds plus 1e-6.

        All four columns are NaN when either ``body_center`` is unavailable.
    """
    feature_names = ('distance', 'distance_change', 'approach_rate', 'velocity_correlation')

    try:
        agent = pair_data['agent']['body_center'][['x', 'y']]
        target = pair_data['target']['body_center'][['x', 'y']]
    except (KeyError, IndexError, TypeError, ValueError):
        # Keep the output schema identical on the fallback path.
        for name in feature_names:
            X[name] = np.nan
        return X

    relative_position = agent.to_numpy(copy=False) - target.to_numpy(copy=False)
    distance = np.sqrt((relative_position**2).sum(axis=1))
    distance_series = pd.Series(distance, index=agent.index)

    agent_delta = agent.diff()
    target_delta = target.diff()
    if fps and fps > 0:
        agent_delta *= fps
        target_delta *= fps

    agent_velocity = agent_delta.to_numpy()
    target_velocity = target_delta.to_numpy()
    relative_velocity = agent_velocity - target_velocity

    # Guard against a zero distance when normalising the projection.
    denominator = np.maximum(distance, 1e-9)
    approach_rate = (relative_position * relative_velocity).sum(axis=1) / denominator

    agent_speed = np.linalg.norm(agent_velocity, axis=1)
    target_speed = np.linalg.norm(target_velocity, axis=1)
    # "->i" keeps one dot product per row; "->" would sum over every frame
    # and produce a single scalar for the whole video.
    dot = np.einsum("ij,ij->i", np.nan_to_num(agent_velocity), np.nan_to_num(target_velocity))
    velocity_correlation = dot / (agent_speed * target_speed + 1e-6)

    X['distance'] = distance_series
    X['distance_change'] = distance_series.diff()
    X['approach_rate'] = pd.Series(approach_rate, index=agent.index)
    X['velocity_correlation'] = pd.Series(velocity_correlation, index=agent.index)

    return X

In [None]:
def finalize_features(X:pd.DataFrame)->pd.DataFrame:
    """Sort a feature frame by index and fill its gaps.

    Missing values are back-filled first and forward-filled second; columns
    that are still entirely NaN afterwards are dropped. The input is not modified.

    Parameters
    ----------
    X : pd.DataFrame
        Feature frame.

    Returns
    -------
    pd.DataFrame
        The sorted, gap-filled frame.
    """
    ordered = X.sort_index()
    # DataFrame.bfill()/ffill() replace fillna(method=...), which pandas has deprecated.
    filled = ordered.bfill().ffill()
    return filled.dropna(axis=1, how='all')