## Beating Angle (Grade 12 Friendly)

**What this notebook does:**
- It finds the tip of a cilium in each video frame.
- It tracks how far the tip swings left/right.
- It computes the **beating angle** using three points:
  - **P0** = base of the cilium (where it attaches).
  - **P1** and **P2** = two extreme tip positions.

**Why the angle matters:**
A larger angle means the cilium sweeps a wider arc.

**Big picture steps:**
1. Crop a **Region of Interest (ROI)** around the cilium.
2. Make a binary mask (black/white) to isolate the cilium.
3. Skeletonize the mask to get a thin centerline.
4. Find the endpoints (base and tip).
5. Measure the angle between the two farthest tips.

If you want more control, scroll to the config cell and edit the settings.


In [42]:
#Import libraries and package
import cv2
import numpy as np
import csv 
import os 

In [43]:
# Import libraries
import cv2
import numpy as np
import csv
import os
from skimage.morphology import skeletonize

# Paths
video_dir = "./data_videos"
out_dir = "output/beat_angle"
os.makedirs(out_dir, exist_ok=True)

# Video overlay options (optional)
save_annotated_video = False
zoom_roi = False
zoom_scale = 3

# ROI (Region of Interest)
# - Set roi_box = None to auto-detect motion (recommended).
# - Or set a manual box: (x1, y1, x2, y2) in pixels.
roi_box = None

# Base point (P0)
# - Set P0 = None to auto-detect the base.
# - Or set P0 manually: (x, y) in full-frame pixels.
P0 = None

max_frames = 400  # how many frames to analyze from each video

# Mask/threshold settings (tune if the mask looks wrong)
mask_invert = True
mask_blur_ksize = 5
mask_adaptive_block = 31
mask_adaptive_C = 2

# Motion ROI settings (auto-crop based on movement)
roi_sample_frames = 40
roi_padding = 12


In [44]:
from collections import Counter
from cv2 import blur

def euclidean_distance(p1, p2):
    """Calculate the Euclidean distance between two points."""
    p1 = np.asarray(p1, dtype = float)
    p2 = np.asarray(p2, dtype = float)
    return float(np.linalg.norm(p1 - p2))

def compute_beat_angle(p0, p1, p2):
    """Compute the angle formed by three vectors using Arcos and position vectors.
    P0 = base, P1/P2 = two tip differents (different phases):
    returns angle in degrees"""
    
    p0 = np.asarray(p0, dtype=float)
    p1 = np.asarray(p1, dtype=float)
    p2 = np.asarray(p2, dtype=float)
    
    a = euclidean_distance(p0, p1)
    b = euclidean_distance(p0, p2)
    c = euclidean_distance(p1, p2)
    
    denom = max(2.0 * a * b, 1e-8)
    cos_theta = (a*a + b*b - c*c) / denom
    cos_theta = np.clip(cos_theta, -1.0, 1.0)
    
    angle_rad = np.arccos(cos_theta)
    angle_deg = np.degrees(angle_rad)
    return float(angle_deg)

def preprocess_to_mask(gray_roi):
    """Generate a clean binary mask for cilia segmentation."""
    if mask_blur_ksize and mask_blur_ksize > 1:
        gray_roi = cv2.GaussianBlur(gray_roi, (mask_blur_ksize, mask_blur_ksize), 0)
    
    if mask_adaptive_block and mask_adaptive_block >= 3:
        bw = cv2.adaptiveThreshold(
            gray_roi, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV if mask_invert else cv2.THRESH_BINARY,
            mask_adaptive_block, mask_adaptive_C
        )
    else:
        thresh_type = cv2.THRESH_BINARY_INV if mask_invert else cv2.THRESH_BINARY
        _, bw = cv2.threshold(gray_roi, 0, 255, thresh_type + cv2.THRESH_OTSU)

    kernel = np.ones((3, 3), np.uint8)
    bw = cv2.morphologyEx(bw, cv2.MORPH_OPEN, kernel, iterations=1)
    bw = cv2.morphologyEx(bw, cv2.MORPH_CLOSE, kernel, iterations=1)
    return bw

def skeleton_endpoints(skel):
    """Find endpoints: skeleton pixels with exactly 1 neighbor in 8-connectivity.
    Returns list of (x,y) points in ROI coordinates."""
    sk = (skel > 0).astype(np.uint8)
    ys, xs = np.where(sk == 1)
    ends = []
    for y, x in zip(ys, xs):
        y0, y1 = max(0, y-1), min(sk.shape[0], y+2)
        x0, x1 = max(0, x-1), min(sk.shape[1], x+2)
        neighbors = int(np.sum(sk[y0:y1, x0:x1]) - 1)
        if neighbors == 1:
            ends.append((x, y))
    return ends

def pick_base_endpoint(bw, endpoints_roi, window=11):
    """Pick the endpoint with the thickest local mask (proxy for base)."""
    if not endpoints_roi:
        return None
    half = window // 2
    best_pt = None
    best_score = -1
    for x, y in endpoints_roi:
        y0, y1 = max(0, y - half), min(bw.shape[0], y + half + 1)
        x0, x1 = max(0, x - half), min(bw.shape[1], x + half + 1)
        score = int(np.sum(bw[y0:y1, x0:x1] > 0))
        if score > best_score:
            best_score = score
            best_pt = (x, y)
    return best_pt

def tip_from_endpoints(endpoints_roi, P0_roi):
    """Choose tip as endpoint farthest from base (P0) in ROI coords."""
    if len(endpoints_roi) == 0:
        return None
    pts = np.asarray(endpoints_roi, float)
    d = np.linalg.norm(pts - np.asarray(P0_roi, float)[None, :], axis=1)
    tip = pts[int(np.argmax(d))]
    return (int(tip[0]), int(tip[1]))

def pick_extremes(tips_xy):
    """Finds two extreme tip positions (max pairwise distance)"""
    tips = [t for t in tips_xy if t is not None]
    if len(tips) < 2:
        return None, None, np.nan
    tips = np.asarray(tips, dtype=float)
    best_d = -1.0
    best_i, best_j = 0, 1
    for i in range(len(tips)):
        for j in range(i+1, len(tips)):
            d = np.linalg.norm(tips[i] - tips[j])
            if d > best_d:
                best_d = d
                best_i, best_j = i, j
    P1 = tips[best_i]
    P2 = tips[best_j]
    return P1, P2, best_d

def estimate_motion_roi(cap, sample_frames=30, padding=10):
    """Estimate a tight ROI based on frame-to-frame motion energy."""
    n = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if n == 0:
        return None
    frame_indices = np.linspace(0, max(n - 1, 0), num=min(sample_frames, n), dtype=int)
    prev = None
    motion = None
    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
        ok, frame = cap.read()
        if not ok:
            continue
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (5, 5), 0)
        if prev is None:
            prev = gray
            continue
        diff = cv2.absdiff(gray, prev)
        _, mask = cv2.threshold(diff, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        motion = mask if motion is None else cv2.bitwise_or(motion, mask)
        prev = gray

    if motion is None:
        return None
    ys, xs = np.where(motion > 0)
    if len(xs) == 0 or len(ys) == 0:
        return None
    x1, x2 = xs.min(), xs.max()
    y1, y2 = ys.min(), ys.max()
    h, w = motion.shape
    x1 = max(0, x1 - padding)
    y1 = max(0, y1 - padding)
    x2 = min(w, x2 + padding)
    y2 = min(h, y2 + padding)
    if x2 - x1 < 5 or y2 - y1 < 5:
        return None
    return (int(x1), int(y1), int(x2), int(y2))


In [45]:
def process_single_video(video_path, out_dir, roi_box, P0, max_frames):
    #Video processing
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video file: {video_path}")

    fps = float(cap.get(cv2.CAP_PROP_FPS))
    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    N = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if roi_box is None:
        roi_box = estimate_motion_roi(cap, roi_sample_frames, roi_padding)
    if roi_box is None:
        roi_box = (0, 0, W, H)

    x1, y1, x2, y2 = roi_box
    x1 = max(0, min(x1, W - 1))
    x2 = max(1, min(x2, W))
    y1 = max(0, min(y1, H - 1))
    y2 = max(1, min(y2, H))
    roi_box = (x1, y1, x2, y2)

    P0_roi = None
    if P0 is not None:
        P0_roi = (max(0, P0[0] - x1), max(0, P0[1] - y1))

    writer = None
    annot_path = None
    if save_annotated_video:
        base = os.path.splitext(os.path.basename(video_path))[0]
        annot_path = os.path.join(out_dir, f"{base}_annotated.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(annot_path, fourcc, fps, (W, H))

    tips_global = [] # list of (frame_idx, x, y)
    base_global = []
    masks_debug_saved = False

    n_to_read = min(N, max_frames)
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    for fi in range(n_to_read):
        ok, frame = cap.read()
        if not ok:
            break

        # Crop ROI
        h, w = frame.shape[:2]
        roi = frame[max(0,y1):min(h,y2), max(0,x1):min(w,x2)]
        if roi.shape[0] == 0 or roi.shape[1] == 0:
            continue  # skip bad frame

        gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)

        # Mask + skeleton + endpoints
        bw = preprocess_to_mask(gray)
        skel = skeletonize(bw)
        skel = skeletonize(bw).astype(np.uint8) * 255  # bool → uint8 for cv2.imwrite
        ends = skeleton_endpoints(skel)

        if P0_roi is None:
            base_roi = pick_base_endpoint(bw, ends)
            if base_roi is not None:
                base_global.append((base_roi[0] + x1, base_roi[1] + y1))
            P0_frame = base_roi
        else:
            P0_frame = P0_roi

        tip_roi = None
        if P0_frame is not None:
            tip_roi = tip_from_endpoints(ends, P0_frame)

        tip_g = None
        if tip_roi is not None:
            tip_g = (tip_roi[0] + x1, tip_roi[1] + y1)
            tips_global.append((fi, tip_g[0], tip_g[1]))

        # Save one debug image set (so you can show proof in report)
        if not masks_debug_saved:
            base = os.path.splitext(os.path.basename(video_path))[0]
            cv2.imwrite(os.path.join(out_dir, f"{base}_debug_gray.png"), gray)
            cv2.imwrite(os.path.join(out_dir, f"{base}_debug_mask.png"), bw)
            cv2.imwrite(os.path.join(out_dir, f"{base}_debug_skel.png"), skel)
            masks_debug_saved = True

        # Draw proof overlay on full frame (optional)
        if save_annotated_video:
            disp = frame.copy()
            cv2.rectangle(disp, (x1, y1), (x2, y2), (0, 255, 0), 2) # ROI box
            if P0 is not None:
                cv2.circle(disp, P0, 5, (0, 255, 0), -1)
                cv2.putText(disp, "P0", (P0[0]+8, P0[1]-8),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2)
            elif base_global:
                base_med = np.median(np.asarray(base_global), axis=0)
                base_med = (int(base_med[0]), int(base_med[1]))
                cv2.circle(disp, base_med, 5, (0, 255, 0), -1)
                cv2.putText(disp, "P0", (base_med[0]+8, base_med[1]-8),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2)

            if tip_g is not None:
                cv2.circle(disp, tip_g, 5, (0, 0, 255), -1)
                cv2.putText(disp, "tip", (tip_g[0]+8, tip_g[1]-8),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,255), 2)
                if P0 is not None:
                    cv2.line(disp, P0, tip_g, (255, 255, 0), 1)

            cv2.putText(disp, f"fps={fps:.1f} frame={fi}/{n_to_read}",
                    (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

            #show magnified ROI in the corner (looks like a paper figure)
            if zoom_roi:
                roi_zoom = cv2.resize(roi, None, fx=zoom_scale, fy=zoom_scale, interpolation=cv2.INTER_NEAREST)
                zh, zw = roi_zoom.shape[:2]
                y0, x0 = 40, max(0, W - zw - 10)
                if y0 + zh < H and x0 + zw < W:
                    disp[y0:y0+zh, x0:x0+zw] = roi_zoom
                cv2.rectangle(disp, (x0, y0), (x0+zw, y0+zh), (255,255,255), 1)
                cv2.putText(disp, "Zoom ROI", (x0, y0-8),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)

            if writer is not None:
                writer.write(disp)

    cap.release()
    if writer is not None:
        writer.release()

    if P0 is None and base_global:
        base_med = np.median(np.asarray(base_global), axis=0)
        P0_final = (float(base_med[0]), float(base_med[1]))
    else:
        P0_final = P0

    tips_xy = [(x, y) for _, x, y in tips_global]
    P1, P2, sweep_dist = pick_extremes(tips_xy)

    angle_deg = None
    if P1 is not None and P2 is not None and P0_final is not None:
        angle_deg = compute_beat_angle(P0_final, P1, P2)

    #save the files 
    base = os.path.splitext(os.path.basename(video_path))[0]
    tips_csv = os.path.join(out_dir, f"{base}_tips.csv")
    summary_csv = os.path.join(out_dir, f"{base}_summary.csv")

    with open(tips_csv, "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["video", "fps", "frame", "tip_x", "tip_y"])
        w.writeheader()
        for fi, x, y in tips_global:
            w.writerow({"video": base, "fps": fps, "frame": fi, "tip_x": x, "tip_y": y})

    with open(summary_csv, "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["video", "fps", "P0", "P1", "P2", "beat_angle_deg", "tip_sweep_dist_px", "n_tips", "roi_box"])
        w.writeheader()
        w.writerow({
            "video": base,
            "fps": fps,
            "P0": "" if P0_final is None else str(P0_final),
            "P1": str(P1),
            "P2": str(P2),
            "beat_angle_deg": "" if angle_deg is None else float(angle_deg),
            "tip_sweep_dist_px": "" if np.isnan(sweep_dist) else float(sweep_dist),
            "n_tips": len(tips_global),
            "roi_box": str(roi_box)
        })

    print(f"Saved: {tips_csv}, {summary_csv}")
    if annot_path is not None:
        print(f"     {annot_path}")
    print("Debug images:")
    print(f"     {os.path.join(out_dir, f'{base}_debug_gray.png')}")
    print(f"     {os.path.join(out_dir, f'{base}_debug_mask.png')}")
    print(f"     {os.path.join(out_dir, f'{base}_debug_skel.png')}")

    return angle_deg, sweep_dist, len(tips_global)


In [46]:
#Save the CSV file!!
def save_results_csv(csv_path, rows):
    """rows: list of dicts with same keys"""
    if not rows:
        return
    fieldnames = list(rows[0].keys())
    with open(csv_path, "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows)

# RUN ON ALL VIDEOS
summary_rows = []
for fname in os.listdir(video_dir):
    if not fname.lower().endswith((".mp4", ".avi", ".mov")):
        continue
    
    video_path = os.path.join(video_dir, fname)
    print(f"\n Processing {fname}...")
    angle, sweep, n_tips = process_single_video(video_path, out_dir, roi_box, P0, max_frames)
    summary_rows.append({
        "video": os.path.splitext(fname)[0],
        "beat_angle_deg": angle,
        "tip_sweep_dist_px": sweep,
        "n_tips": n_tips
    })

# Global summary
all_summary_csv = os.path.join(out_dir, "ALL_beating_angles.csv")
save_results_csv(all_summary_csv, summary_rows)
print(f"\n🎉 COMPLETE! All results: {all_summary_csv}")



 Processing healthy_5.mp4...
Saved: output/beat_angle/healthy_5_tips.csv, output/beat_angle/healthy_5_summary.csv
Debug images:
     output/beat_angle/healthy_5_debug_gray.png
     output/beat_angle/healthy_5_debug_mask.png
     output/beat_angle/healthy_5_debug_skel.png

🎉 COMPLETE! All results: output/beat_angle/ALL_beating_angles.csv
