# dSTORM Localization Filtering Pipeline

This pipeline processes ELYRA dSTORM localization data with outline masks and blink-density filtering.

## Pipeline Steps
1. **Load Data** - Read localization files (requires x, y columns)
2. **Outline Masking** - Apply periphery/nucleus polygon masks from MATLAB-generated ROI files
3. **Blink Density Filtering** - Retain localizations with ≥6 co-localized events within 50nm
4. **Output** - Save retained localizations and blinking statistics

## Blinking Analysis
The pipeline tracks single-blink percentage per cell. Cells with unusually high single-blink fractions may indicate poor labeling or high noise. Outlier detection using IQR and MAD methods flags these cells.

## File Naming Convention (matches MATLAB output)
- Data file: `{condition}_{time}min_c{cell}.txt`
- Periphery outline: `{condition}_{time}min_c{cell}_periphery_outline.txt`
- Nucleus outline: `{condition}_{time}min_c{cell}_nucleus_outline.txt`

---
## Block 0: Imports and Core Classes

In [None]:
import re
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.path as mpath
from scipy.spatial import cKDTree

try:
    import locan as lc
except Exception as e:
    lc = None
    print("WARNING: locan not available, will use pandas fallback.\n", e)

# Pattern matches: control_5min_c14.txt, iso_20min_c3.txt, etc.
RAW_RE = re.compile(r'^(iso|control)_(\d+)\s*min_c(\d+)\.txt$', re.IGNORECASE)


class ParticleDataManager:
    """Scans directory for data files matching {condition}_{time}min_c{cell}.txt"""

    def __init__(self, base_path=".", recursive=False):
        self.base_path = Path(base_path)
        self.recursive = bool(recursive)
        self.file_catalog = {}
        self.records = {}
        self.scan_for_files()

    @staticmethod
    def extract_cell_number(filepath):
        m = re.search(r'c\s*(\d+)', filepath.stem, flags=re.IGNORECASE)
        return int(m.group(1)) if m else None

    @staticmethod
    def _group_key(cond, minutes):
        return f"{cond.lower()}_{int(minutes)}min"

    def scan_for_files(self):
        if not self.base_path.exists():
            print(f"Warning: directory '{self.base_path}' does not exist")
            return
        entries = list(self.base_path.rglob("*.txt")) if self.recursive else list(self.base_path.glob("*.txt"))
        n_data = 0
        for p in entries:
            m_raw = RAW_RE.match(p.name)
            if not m_raw:
                continue
            cond, minutes, cell = m_raw.group(1).lower(), int(m_raw.group(2)), int(m_raw.group(3))
            self.records[(cond, minutes, cell)] = p
            n_data += 1
        catalog = {}
        for (cond, minutes, cell), path in sorted(self.records.items()):
            gkey = self._group_key(cond, minutes)
            catalog.setdefault(gkey, []).append(path)
        for gkey, files in catalog.items():
            catalog[gkey] = sorted(files, key=lambda p: (self.extract_cell_number(p) or 0, p.name))
        self.file_catalog = catalog
        if self.file_catalog:
            print(f"Scanned '{self.base_path}': {n_data} data files found. Groups:")
            for key, files in sorted(self.file_catalog.items()):
                print(f"  - {key}: {len(files)} cells")
        else:
            print(f"No data files found in '{self.base_path}'")

    def get_available_cells(self, condition, time):
        condition = str(condition).strip().lower()
        minutes = int(re.search(r"(\d+)", str(time)).group(1))
        out = {}
        for (cond, mins, cell), path in self.records.items():
            if cond == condition and mins == minutes:
                out[cell] = path
        return dict(sorted(out.items()))

    def list_available_files(self):
        if not self.file_catalog:
            print("No files found.")
            return
        for key, files in sorted(self.file_catalog.items()):
            print(f"\n{key.upper()}:")
            print("-" * 40)
            for fp in files:
                cnum = self.extract_cell_number(fp)
                print(f"  Cell {cnum}: {fp.name}")


class ParticleDataLoader:
    """Loads localization files with automatic column mapping."""
    
    def __init__(self, filepath):
        self.filepath = Path(filepath)
        self.locdata = None
        self.df = None
        self.column_mapping = {}
        self.load_data()
    
    def load_data(self):
        try:
            if lc is not None:
                self.locdata = lc.load_Elyra_file(str(self.filepath))
                self.df = self.locdata.dataframe
            else:
                raise ImportError("locan not available")
        except Exception:
            # Fallback to pandas
            try:
                self.df = pd.read_csv(self.filepath, sep='\t', encoding='latin1')
            except Exception as e:
                print(f"Failed to load {self.filepath}: {e}")
                self.df = pd.DataFrame()
                return
        self.map_columns()
        print(f"Loaded {len(self.df)} localizations from {self.filepath.name}")
    
    def map_columns(self):
        if self.df is None or self.df.empty:
            return
        column_variations = {
            'x': ['Position X [nm]', 'position_x', 'x', 'X', 'x_nm'],
            'y': ['Position Y [nm]', 'position_y', 'y', 'Y', 'y_nm'],
            'photons': ['Number Photons', 'photons', 'n_photons', 'intensity', 'Photons'],
            'precision': ['Precision [nm]', 'precision', 'uncertainty', 'Precision'],
            'frame': ['First Frame', 'frame', 'Frame', 'first_frame'],
            'background': ['Background variance', 'background', 'Background']
        }
        actual_columns = self.df.columns.tolist()
        for key, variations in column_variations.items():
            for var in variations:
                if var in actual_columns:
                    self.column_mapping[key] = var
                    break
                for col in actual_columns:
                    if col.lower() == var.lower():
                        self.column_mapping[key] = col
                        break
        # Fallback to locan coordinate keys
        if ('x' not in self.column_mapping or 'y' not in self.column_mapping) and self.locdata is not None:
            if hasattr(self.locdata, 'coordinate_keys') and len(self.locdata.coordinate_keys) >= 2:
                self.column_mapping['x'] = self.locdata.coordinate_keys[0]
                self.column_mapping['y'] = self.locdata.coordinate_keys[1]

---
## Block 1: Utility Functions

In [None]:
def ensure_xy_columns(loader):
    """Create DataFrame with standardized x,y columns."""
    df = loader.df.copy()
    x_col = loader.column_mapping.get('x')
    y_col = loader.column_mapping.get('y')
    if x_col is None or y_col is None:
        raise KeyError("x and y columns not found in data")
    df['x'] = df[x_col].astype(float)
    df['y'] = df[y_col].astype(float)
    df = df[np.isfinite(df['x']) & np.isfinite(df['y'])]
    return df


def _retained_canonical_df(loader, df, final_mask):
    """Build output DataFrame with available columns."""
    retained = df.loc[final_mask, ['x', 'y']].copy()
    for k in ['photons', 'precision', 'frame', 'background']:
        if k in loader.column_mapping:
            col_name = loader.column_mapping[k]
            if col_name in loader.df.columns:
                retained[k] = loader.df.loc[retained.index, col_name].values
    return retained

---
## Block 2: Blink Density Calculation

Computes the number of localizations within 50nm radius for each point using a KD-tree. This "blinks per site" metric identifies clustered multi-blink events vs isolated single-blink noise.

In [None]:
def compute_blinks_per_site(df, radius_nm=50):
    """Count localizations within radius_nm for each point."""
    if 'blinks_per_site' in df.columns:
        return df
    pts = df[['x', 'y']].to_numpy()
    if pts.size == 0:
        df['blinks_per_site'] = 0
        return df
    tree = cKDTree(pts)
    neigh = tree.query_ball_tree(tree, r=radius_nm)
    df['blinks_per_site'] = np.fromiter((len(n) for n in neigh), dtype=int, count=len(pts))
    return df

---
## Block 3: Outline File Parsing and Masking

Reads periphery and nucleus outline polygon files (from MATLAB) and applies spatial masks:
- Points outside the periphery polygon → removed
- Points inside the nucleus polygon → removed (if file exists)

In [None]:
PAT_PERI_END = re.compile(r'(?i)_periphery_outline(?:_v\d+)?\.txt$')
PAT_NUCL_END = re.compile(r'(?i)_nucleus_outline(?:_v\d+)?\.txt$')


def _read_outline_points(path):
    """Read polygon vertices from outline file."""
    if path is None or not path.exists():
        return None
    try:
        df = pd.read_csv(path, sep=r"\s+", engine="python", comment="#")
        cols = {c.lower(): c for c in df.columns}
        xcol, ycol = cols.get("x"), cols.get("y")
        if xcol is None or ycol is None:
            if len(df.columns) >= 2:
                xcol, ycol = df.columns[:2]
            else:
                return None
        pts = df[[xcol, ycol]].astype(float).to_numpy()
        return pts if len(pts) >= 3 else None
    except Exception:
        return None


def _polygon_area_nm2(pts):
    """Calculate polygon area using shoelace formula."""
    if pts is None or len(pts) < 3:
        return 0.0
    x, y = pts[:, 0], pts[:, 1]
    return 0.5 * abs(np.dot(x, np.roll(y, -1)) - np.dot(y, np.roll(x, -1)))


def _find_outline_file(search_dir, stem, kind):
    """Find matching outline file for a data file."""
    assert kind in ("periphery", "nucleus")
    pattern = PAT_PERI_END if kind == "periphery" else PAT_NUCL_END
    
    # Try exact match first: stem_periphery_outline.txt
    exact = search_dir / f"{stem}_{kind}_outline.txt"
    if exact.exists():
        return exact
    
    # Search for matching files
    for p in search_dir.glob(f"*{kind}_outline*.txt"):
        if pattern.search(p.name) and p.stem.startswith(stem.split('_')[0]):
            # Check if cell number matches
            m_data = re.search(r'c(\d+)', stem, re.IGNORECASE)
            m_file = re.search(r'c(\d+)', p.stem, re.IGNORECASE)
            if m_data and m_file and m_data.group(1) == m_file.group(1):
                return p
    return None


def apply_outline_masks(df, stem, search_dir):
    """Apply periphery and nucleus outline masks."""
    search_dir = Path(search_dir)
    keep = np.ones(len(df), dtype=bool)
    info = {
        "periphery_file": "", "nucleus_file": "",
        "periphery_vertices": 0, "nucleus_vertices": 0,
        "periphery_area_um2": 0.0, "nucleus_area_um2": 0.0, "final_area_um2": 0.0,
        "n_removed_outside_periphery": 0, "n_removed_in_nucleus": 0, "n_kept": len(df)
    }
    
    pts = df[["x", "y"]].to_numpy(np.float64)
    
    # Periphery mask
    peri_file = _find_outline_file(search_dir, stem, "periphery")
    peri_pts = _read_outline_points(peri_file)
    if peri_pts is not None:
        info["periphery_file"] = peri_file.name
        info["periphery_vertices"] = len(peri_pts)
        info["periphery_area_um2"] = _polygon_area_nm2(peri_pts) / 1e6
        inside_peri = mpath.Path(peri_pts).contains_points(pts)
        info["n_removed_outside_periphery"] = int((~inside_peri).sum())
        keep &= inside_peri
    
    # Nucleus mask
    nucl_file = _find_outline_file(search_dir, stem, "nucleus")
    nucl_pts = _read_outline_points(nucl_file)
    if nucl_pts is not None:
        info["nucleus_file"] = nucl_file.name
        info["nucleus_vertices"] = len(nucl_pts)
        info["nucleus_area_um2"] = _polygon_area_nm2(nucl_pts) / 1e6
        inside_nucl = mpath.Path(nucl_pts).contains_points(pts)
        info["n_removed_in_nucleus"] = int(inside_nucl.sum())
        keep &= ~inside_nucl
    
    info["final_area_um2"] = max(info["periphery_area_um2"] - info["nucleus_area_um2"], 0.0)
    info["n_kept"] = int(keep.sum())
    
    return keep, info

---
## Block 4: Single-Cell Filter Function

Combines outline masking and blink density filtering for a single cell.

In [None]:
def filter_cell(loader, stem=None, blinks_threshold=5, blinks_radius_nm=50, verbose=True):
    """Apply outline masks and blink filtering to a single cell."""
    if stem is None:
        stem = Path(loader.filepath).stem
    
    data_dir = Path(loader.filepath).parent
    df = ensure_xy_columns(loader)
    n_loaded = len(df)
    
    if verbose:
        print(f"[{stem}] Loaded {n_loaded} points")
    
    # Apply outline masks
    outline_mask, outline_info = apply_outline_masks(df, stem, data_dir)
    if verbose:
        peri_status = "FOUND" if outline_info["periphery_file"] else "missing"
        nucl_status = "FOUND" if outline_info["nucleus_file"] else "missing"
        print(f"[{stem}] Outlines: periphery={peri_status}, nucleus={nucl_status}")
        print(f"[{stem}] Removed: {outline_info['n_removed_outside_periphery']} outside periphery, "
              f"{outline_info['n_removed_in_nucleus']} in nucleus")
    
    # Filter to outline-passing points
    df = df.loc[outline_mask].copy()
    
    # Compute blink density
    df = compute_blinks_per_site(df, radius_nm=blinks_radius_nm)
    n_after_outline = len(df)
    n_single = int((df["blinks_per_site"] == 1).sum())
    pct_single = 100.0 * n_single / n_after_outline if n_after_outline > 0 else 0.0
    
    # Apply blink threshold
    blink_mask = df["blinks_per_site"] >= blinks_threshold
    n_final = int(blink_mask.sum())
    
    if verbose:
        print(f"[{stem}] After outlines: {n_after_outline} points, single-blink: {n_single} ({pct_single:.1f}%)")
        print(f"[{stem}] Blink filter (>={blinks_threshold}): {n_final} retained")
    
    summary = {
        "stem": stem,
        "n_loaded": n_loaded,
        "n_after_outline": n_after_outline,
        "n_removed_outside_periphery": outline_info["n_removed_outside_periphery"],
        "n_removed_in_nucleus": outline_info["n_removed_in_nucleus"],
        "periphery_file": outline_info["periphery_file"],
        "nucleus_file": outline_info["nucleus_file"],
        "periphery_area_um2": outline_info["periphery_area_um2"],
        "nucleus_area_um2": outline_info["nucleus_area_um2"],
        "final_area_um2": outline_info["final_area_um2"],
        "n_singleblink": n_single,
        "pct_singleblink": pct_single,
        "n_final": n_final,
    }
    
    return df, blink_mask, summary

---
## Block 5: Plotting and Report Functions

In [None]:
def plot_filtered_cell(df, keep_mask, title=None, save_path=None, show=True):
    """Plot retained vs removed localizations."""
    if len(df) == 0:
        return None
    x, y = df['x'].to_numpy(), df['y'].to_numpy()
    keep = np.asarray(keep_mask, dtype=bool)
    
    fig, ax = plt.subplots(figsize=(7, 7), dpi=150)
    if (~keep).any():
        ax.scatter(x[~keep], y[~keep], s=1.5, alpha=0.25, c="gray", label="Removed")
    if keep.any():
        ax.scatter(x[keep], y[keep], s=2.0, alpha=0.9, c="tab:blue", label="Retained")
    
    ax.set_aspect("equal")
    ax.set_xlabel("X (nm)")
    ax.set_ylabel("Y (nm)")
    if title:
        ax.set_title(title)
    ax.legend(loc="upper right", frameon=False)
    
    if save_path:
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(save_path, bbox_inches="tight")
    if show:
        plt.show()
    else:
        plt.close(fig)
    return fig


def write_filter_report(summary, out_dir, retained_count):
    """Write filter report matching MATLAB _areas.txt format."""
    out_dir = Path(out_dir)
    stem = summary["stem"]
    
    lines = [
        f"File: {stem}.txt",
        f"Periphery area (um^2): {summary['periphery_area_um2']:.6f}",
        f"Core/Nucleus area (um^2): {summary['nucleus_area_um2']:.6f}",
        f"Final area (um^2): {summary['final_area_um2']:.6f}",
        f"Original points: {summary['n_loaded']}",
        f"Final points: {retained_count}",
        f"Total removed: {summary['n_loaded'] - retained_count}",
        f"Periphery outline file: {summary['periphery_file'] or '(none)'}",
        f"Nucleus outline file: {summary['nucleus_file'] or '(none)'}",
        f"Single-blink percentage: {summary['pct_singleblink']:.2f}%",
    ]
    
    report_path = out_dir / f"{stem}_filter_report.txt"
    report_path.write_text("\n".join(lines), encoding="utf-8")
    return report_path

---
## Block 6: Batch Processing Function

In [None]:
def process_all_cells(pdm, out_dir=None, blinks_threshold=5, blinks_radius_nm=50,
                      require_periphery=True, save_png=False):
    """Process all cells and save filtered outputs."""
    out_dir = Path(out_dir if out_dir else "retained_filtered")
    out_dir.mkdir(parents=True, exist_ok=True)
    
    print(f"\n{'='*50}")
    print(f"BATCH PROCESSING")
    print(f"{'='*50}")
    print(f"Output: {out_dir.resolve()}")
    print(f"Blink threshold: >= {blinks_threshold} (within {blinks_radius_nm}nm)")
    print(f"Require periphery outline: {require_periphery}\n")
    
    summaries = []
    n_processed, n_skipped = 0, 0
    
    for key in pdm.file_catalog:
        condition, time_str = key.split("_", 1)
        available = pdm.get_available_cells(condition, time_str)
        
        for cell_num, filepath in sorted(available.items()):
            stem = filepath.stem
            search_dir = filepath.parent
            
            # Check for periphery outline if required
            if require_periphery:
                peri_file = _find_outline_file(search_dir, stem, "periphery")
                if peri_file is None:
                    print(f"[SKIP] {stem}: periphery outline missing")
                    n_skipped += 1
                    continue
            
            try:
                print(f"\n[{stem}] Processing...")
                loader = ParticleDataLoader(filepath)
                
                df, blink_mask, summary = filter_cell(
                    loader, stem=stem,
                    blinks_threshold=blinks_threshold,
                    blinks_radius_nm=blinks_radius_nm,
                    verbose=True
                )
                
                # Add metadata
                summary["condition"] = condition
                summary["time"] = time_str
                summary["cell_num"] = cell_num
                summaries.append(summary)
                
                # Save retained points
                retained = _retained_canonical_df(loader, df, blink_mask)
                out_txt = out_dir / f"{stem}_retained.txt"
                retained.to_csv(out_txt, sep="\t", index=False)
                print(f"[{stem}] Saved: {out_txt.name} ({len(retained)} rows)")
                
                # Save report
                write_filter_report(summary, out_dir, len(retained))
                
                # Save PNG if requested
                if save_png:
                    out_png = out_dir / f"{stem}_filtered.png"
                    title = f"{stem}: {len(retained)}/{summary['n_after_outline']} retained"
                    plot_filtered_cell(df, blink_mask, title=title, save_path=out_png, show=False)
                
                n_processed += 1
                
            except Exception as e:
                print(f"[ERROR] {stem}: {e}")
    
    # Save batch summary
    summary_df = pd.DataFrame(summaries) if summaries else pd.DataFrame()
    if not summary_df.empty:
        summary_df.to_csv(out_dir / "batch_summary.csv", index=False)
    
    print(f"\n{'='*50}")
    print(f"COMPLETE: {n_processed} processed, {n_skipped} skipped")
    print(f"{'='*50}")
    
    return summary_df

---
## Block 7: Blinking Analysis and Outlier Detection

Identifies cells with unusually high single-blink percentages using:
- **IQR method**: Points outside Q1 - 1.5×IQR or Q3 + 1.5×IQR
- **MAD method**: Modified z-score > 3.5

In [None]:
def analyze_blinking_outliers(summary_df, iqr_k=1.5, mad_z=3.5):
    """Detect outlier cells based on single-blink percentage."""
    if summary_df is None or len(summary_df) == 0:
        print("No data to analyze.")
        return pd.DataFrame()
    
    if "pct_singleblink" not in summary_df.columns:
        print("Missing pct_singleblink column.")
        return pd.DataFrame()
    
    df = summary_df.copy()
    df["pct"] = pd.to_numeric(df["pct_singleblink"], errors="coerce")
    
    if "condition" in df.columns:
        groups = df.groupby("condition")["pct"]
    else:
        df["_all"] = "all"
        groups = df.groupby("_all")["pct"]
    
    # IQR method
    q1 = groups.transform(lambda x: x.quantile(0.25))
    q3 = groups.transform(lambda x: x.quantile(0.75))
    iqr = q3 - q1
    df["outlier_iqr"] = (df["pct"] < q1 - iqr_k * iqr) | (df["pct"] > q3 + iqr_k * iqr)
    
    # MAD method
    median = groups.transform("median")
    mad = groups.transform(lambda x: np.median(np.abs(x - np.median(x))))
    df["z_mod"] = 0.6745 * (df["pct"] - median) / mad.replace(0, np.nan)
    df["outlier_mad"] = df["z_mod"].abs() > mad_z
    
    df["outlier_confirmed"] = df["outlier_iqr"] & df["outlier_mad"]
    
    # Print summary
    print(f"\nBlinking Outlier Analysis")
    print(f"  IQR method (k={iqr_k}): {df['outlier_iqr'].sum()} flagged")
    print(f"  MAD method (z>{mad_z}): {df['outlier_mad'].sum()} flagged")
    print(f"  Confirmed (both): {df['outlier_confirmed'].sum()} flagged")
    
    return df[df["outlier_confirmed"]]


def show_blinking_summary(summary_df):
    """Display single-blink statistics."""
    if summary_df is None or len(summary_df) == 0:
        print("No data.")
        return
    
    cols = ["condition", "time", "cell_num", "pct_singleblink", "n_singleblink", "n_after_outline", "n_final"]
    cols = [c for c in cols if c in summary_df.columns]
    
    out = summary_df[cols].copy()
    if "pct_singleblink" in out.columns:
        out["pct_singleblink"] = out["pct_singleblink"].round(2)
    
    sort_cols = [c for c in ["condition", "time", "cell_num"] if c in out.columns]
    if sort_cols:
        out = out.sort_values(sort_cols)
    
    try:
        from IPython.display import display
        display(out)
    except Exception:
        print(out.to_string())

---
## Configuration and Execution

In [None]:
# Set your data directory
BASE_DIR = Path(r"C:\Users\venturasubirav2\dSTORM\ALL_DATA")
OUT_DIR = BASE_DIR / "retained_filtered"

print(f"Input: {BASE_DIR}")
print(f"Output: {OUT_DIR}")

In [None]:
# Scan for data files
pdm = ParticleDataManager(base_path=BASE_DIR, recursive=False)
pdm.list_available_files()

In [None]:
# Run batch processing
summary_df = process_all_cells(
    pdm=pdm,
    out_dir=OUT_DIR,
    blinks_threshold=6,        # Keep localizations with >= 5 blinks within 50nm
    blinks_radius_nm=50,
    require_periphery=True,    # Skip cells without periphery outline
    save_png=True,
)

---
## Results: Blinking Analysis

In [None]:
show_blinking_summary(summary_df)

In [None]:
outliers = analyze_blinking_outliers(summary_df)

if len(outliers) > 0:
    print("\nConfirmed outlier cells:")
    try:
        from IPython.display import display
        display(outliers[["stem", "pct_singleblink", "n_final"]])
    except Exception:
        print(outliers[["stem", "pct_singleblink", "n_final"]].to_string())
else:
    print("\nNo confirmed outliers.")