# Feature Engineering Pipeline for CIL Project
This notebook loads all `*spots.csv` files in the current directory, cleans them, detects cell–cell contact events indicative of contact-inhibition-of-locomotion (CIL), and exports frame‑level and track‑level feature tables ready for machine‑learning and ABM calibration.

*Created on 2025-04-29*

In [8]:
from pathlib import Path
import pandas as pd

DATA_DIR   = Path(".")
SPOTS_FILES = sorted(DATA_DIR.glob("*spots.csv"))

def load_spots(path: Path, skip_header_rows: int = 3) -> pd.DataFrame:
    """Return a clean TrackMate spots DataFrame (TrackID, Frame, T, X, Y, Movie)."""
    lines = path.read_text(encoding="latin1").splitlines()

    # ------------------------------------------------------------------ #
    # ➊ Keep everything *after* the three descriptive lines
    # ------------------------------------------------------------------ #
    body = lines[skip_header_rows:]

    # ------------------------------------------------------------------ #
    # ➋ Determine the expected number of commas from the *first* row
    #    (TrackMate always writes a fixed-width table after the header)
    # ------------------------------------------------------------------ #
    header_like   = body[0]
    expected_commas = header_like.count(",")

    good_rows = []
    for ln in body:
        if ln.count(",") == expected_commas:      # perfect width ➜ keep
            good_rows.append(ln)
        # else:  drop the row silently

    # ------------------------------------------------------------------ #
    # ➌ Load the kept rows with pandas (no dtype inference)
    # ------------------------------------------------------------------ #
    from io import StringIO
    csv_buffer = StringIO("\n".join(good_rows))

    # Read *all columns as string* so nothing blows up
    df_raw = pd.read_csv(csv_buffer, dtype=str, header=None)

    # ➍ Use the first row as real column names, then drop it
    df_raw.columns = df_raw.iloc[0]
    df_raw = df_raw.drop(index=df_raw.index[0]).reset_index(drop=True)

    # ➎ Rename and reduce to essentials
    rename = {
        "TRACK_ID": "TrackID",     "TRACK_INDEX": "TrackID",
        "Frame": "Frame",          "FRAME": "Frame",
        "POSITION_X": "X",         "(µm)": "X",
        "POSITION_Y": "Y",         "(µm).1": "Y",
        "POSITION_T": "T",         "(sec)": "T",
    }
    df = df_raw.rename(columns={c: rename.get(c, c) for c in df_raw.columns})
    df = df[["TrackID", "Frame", "T", "X", "Y"]]

    # ➏ Cast numeric columns
    for col in ["TrackID", "Frame", "T", "X", "Y"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # ➐ Drop rows with missing essentials and tag the movie name
    df = df.dropna(subset=["TrackID", "Frame", "X", "Y"])
    df["Movie"] = path.stem
    return df

# ---------- run it on all your files -----------------------------------------
spots_tables = [load_spots(p) for p in SPOTS_FILES]
spots        = pd.concat(spots_tables, ignore_index=True)

print(f"✅ Loaded {len(spots):,} rows from {len(SPOTS_FILES)} files")
spots.head()


TypeError: Cannot convert numpy.ndarray to numpy.ndarray

In [None]:

# basic sanity checks
print('Unique movies:', spots['Movie'].unique())
print('Tracks:', spots['TrackID'].nunique())
print(spots.info())


In [None]:

# Parameter: distance threshold for a 'contact' event (microns)
R_CIL = 25.0

# Detect contacts frame-by-frame within each movie
contacts = []
for movie, grp in spots.groupby('Movie'):
    for frame, fdf in grp.groupby('Frame'):
        coords = fdf[['X','Y']].values
        idx = fdf.index.values
        from scipy.spatial.distance import pdist, squareform
        D = squareform(pdist(coords))
        pairs = np.argwhere(np.triu((D < R_CIL) & (D>0), k=1))
        for i,j in pairs:
            contacts.append({'Movie':movie,'Frame':frame,
                             'Track_i':fdf.loc[idx[i],'TrackID'],
                             'Track_j':fdf.loc[idx[j],'TrackID'],
                             'dist':D[i,j]})
contacts = pd.DataFrame(contacts)
contacts.head()


In [None]:

# Flag each spot row if it is in contact
spots['in_contact'] = False
if not contacts.empty:
    key = contacts[['Movie','Frame','Track_i']]
    spots.loc[pd.MultiIndex.from_frame(key).isin(spots.set_index(['Movie','Frame','TrackID']).index),'in_contact']=True
    key = contacts[['Movie','Frame','Track_j']]
    spots.loc[pd.MultiIndex.from_frame(key).isin(spots.set_index(['Movie','Frame','TrackID']).index),'in_contact']=True
spots.head()


In [None]:

# Compute per-frame instantaneous speed and turn angle
spots = spots.sort_values(['Movie','TrackID','Frame'])
spots['dX'] = spots.groupby(['Movie','TrackID'])['X'].diff()
spots['dY'] = spots.groupby(['Movie','TrackID'])['Y'].diff()
spots['dt'] = spots.groupby(['Movie','TrackID'])['T'].diff()
spots['speed'] = np.sqrt(spots.dX**2 + spots.dY**2)/spots.dt
spots['angle'] = np.arctan2(spots.dY, spots.dX)
spots['dtheta'] = spots.groupby(['Movie','TrackID'])['angle'].diff()

# Export frame‑level features
spots.to_parquet('features_framelevel.parquet')
spots.head()


In [None]:

# Aggregate to track level
track_feats = (spots.groupby(['Movie','TrackID'])
               .agg(total_distance=('speed', lambda s: (s*spots.loc[s.index,'dt']).sum()),
                    mean_speed=('speed','mean'),
                    confinement_ratio=('speed', lambda s: s.mean()/s.max()),
                    contact_fraction=('in_contact','mean'),
                    mean_turn_rate=('dtheta','mean'))
               .reset_index())
track_feats.to_parquet('features_tracklevel.parquet')
track_feats.head()
