# CDV position snapshot (single frame)

Build a `TrajectorySet` from a CSV file (the `t` column is treated as a frame index), collect the x/y coordinates of every trajectory at a chosen time `t0`, assemble a table of displacements over a lag of `delta_t_frames` frames, and draw a scatter plot of the positions found at `t0`.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from trajkit import Trajectory, TrajectorySet

# Notebook parameters.
# csv_path is read with pandas below; the columns "id", "t", "x" and "y" are
# the ones this notebook accesses.
csv_path = "/Users/mehdi/particleTraj/res_xyti_time11.csv"  # update to your path
# t0 and time_tol are expressed in frames: trajectories are created with
# frame_rate_hz=1.0, which is intended to make time_seconds() equal the frame index.
t0 = 0.0        # time of interest (frames, since we treat t as frame index)
time_tol = 1e-3 # tolerance (frames) when matching t0 inside each trajectory


def normalize_track_id(raw_id):
    """Return a string label for a track id.

    Ids whose value is a whole number (for example the float 1.0) are rendered
    without a fractional part ("1"). Anything else, including ids that cannot
    be converted to a number at all, is returned as ``str(raw_id)``.
    """
    try:
        whole = int(raw_id)
        # Only accept the integer form when no fractional part was truncated.
        canonical = str(whole) if float(raw_id) == whole else None
    except Exception:
        # Best effort: non-numeric labels simply keep their own text.
        canonical = None
    return str(raw_id) if canonical is None else canonical


# Build TrajectorySet
# Load the table and order it by track, then by time within each track.
df = pd.read_csv(csv_path)
df = df.sort_values(["id", "t"])

ts = TrajectorySet(
    dataset_id="airwater_1um",
    units={"t": "frame", "x": "pixel"},
    meta={"source": str(csv_path)},
)

# One Trajectory per distinct "id"; groups are visited in the order they
# appear in the sorted table.
for tid, df_tid in df.groupby("id", sort=False):
    ordered = df_tid.sort_values("t").reset_index(drop=True)
    trajectory = Trajectory(
        track_id=normalize_track_id(tid),
        x=ordered[["x", "y"]].to_numpy(dtype=float),
        frame=ordered["t"].to_numpy(dtype=float),  # treat 't' column as frame index
        frame_rate_hz=1.0,  # so time_seconds() == frame index
    )
    ts.add(trajectory)

# Last expression of the cell: shown as the cell output.
ts.summary_table().head()

In [None]:
# Coordinates for the chosen time across all trajectories
def find_index_at_time(t, target, tol):
    """Locate the sample of `t` nearest to `target`.

    Only the two entries adjacent to the insertion point reported by
    ``np.searchsorted`` are examined, so `t` is expected to be sorted in
    ascending order.

    Returns the index of the nearer entry when its absolute distance to
    `target` is at most `tol`; returns None otherwise, or when `t` is empty.
    An exact tie goes to the entry at the insertion point.
    """
    insert_at = np.searchsorted(t, target)
    n_samples = len(t)

    nearest = None
    nearest_gap = None
    # Visit the insertion point before its left neighbour: with the strict
    # comparison below, a tie is then resolved in favour of the insertion point.
    for cand in (insert_at, insert_at - 1):
        if not 0 <= cand < n_samples:
            continue
        gap = abs(t[cand] - target)
        if nearest is None or gap < nearest_gap:
            nearest, nearest_gap = cand, gap

    if nearest is None or not nearest_gap <= tol:
        return None
    return nearest

# Pair every trajectory with the index of its sample at t0 (None when the
# trajectory has no sample within time_tol of t0), then keep the matches.
located = (
    (tr, find_index_at_time(tr.time_seconds(), t0, time_tol))
    for tr in ts.trajectories.values()
)
points = [tr.x[hit] for tr, hit in located if hit is not None]

# Spatial dimension, needed to shape the empty result; defaults to 2 when the
# set holds no trajectories.
if ts.trajectories:
    dim = next(iter(ts.trajectories.values())).D
else:
    dim = 2

if points:
    coords_at_t = np.vstack(points)
else:
    coords_at_t = np.empty((0, dim))

print(f"Found {len(coords_at_t)} points at t0={t0}s (within tol={time_tol}s)")
coords_at_t[:5]

In [None]:
# Build displacement table across all trajectories (delta_t in frames)
#
# Every sample i of a trajectory is paired with the sample j of the same
# trajectory whose time is nearest to t[i] + delta_t_frames, provided that
# time lies within time_tol of the target. Each pair yields one row holding
# the track id, the time and frame of sample i, its position (x0, x1, ...)
# and the displacement to sample j (dx0, dx1, ...).
delta_t_frames = 1.0
disp_rows = []
for tid, tr in ts.trajectories.items():
    t = np.asarray(tr.time_seconds())
    n_samples = len(t)
    if n_samples == 0:
        continue
    # Fall back to positional indices when the trajectory stores no frames.
    frames = tr.frame if tr.frame is not None else np.arange(n_samples, dtype=int)
    targets = t + delta_t_frames

    # searchsorted gives the first sample at or after each target. The sample
    # just before it may be closer, so both neighbours are compared; looking
    # only at the right-hand one would miss partners that fall slightly below
    # the target while still being within time_tol.
    idx = np.searchsorted(t, targets, side="left")
    upper = np.minimum(idx, n_samples - 1)
    lower = np.maximum(idx - 1, 0)
    gap_upper = np.abs(t[upper] - targets)
    gap_lower = np.abs(t[lower] - targets)
    # Ties go to the right-hand neighbour.
    partner = np.where(gap_lower < gap_upper, lower, upper)
    matched = np.minimum(gap_lower, gap_upper) <= time_tol

    for i in np.flatnonzero(matched):
        j = partner[i]
        dx = tr.x[j] - tr.x[i]
        row = {
            "track_id": tid,
            "t": float(t[i]),
            "frame": int(frames[i]),
        }
        for k in range(tr.D):
            row[f"x{k}"] = float(tr.x[i, k])
            row[f"dx{k}"] = float(dx[k])
        disp_rows.append(row)

disp_df = pd.DataFrame(disp_rows)
print(f"Displacement rows: {len(disp_df)} (delta_t={delta_t_frames} frames)")
disp_df.head()


In [None]:
# Scatter plot of all x/y positions at the requested time
n_found = len(coords_at_t)
if n_found > 0:
    # Columns 0 and 1 of the stacked positions are drawn as x and y.
    xs = coords_at_t[:, 0]
    ys = coords_at_t[:, 1]

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.scatter(xs, ys, s=24, alpha=0.8)
    ax.set_xlabel("x (pixel)")
    ax.set_ylabel("y (pixel)")
    ax.set_title(f"Positions at t0={t0}s ({n_found} tracks)")
    ax.axis("equal")  # same scale on both axes
    ax.grid(True)
    plt.show()
else:
    print("No positions found at the requested time (check t0 or time_tol)")