In [None]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import os
import csv
import zipfile
import optuna
import numpy as np
import tifffile as tiff
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.gridspec import GridSpec
from sklearn.model_selection import ParameterGrid

import sys
sys.path.append("..")
from peak_detection_bright import peak_detection_bright

# Absolute path of the directory one level above the current working directory;
# the example-data folder is located relative to it further down.
parentdir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))

In [None]:
def match_points_2d(pred_xy, gt_xy, dr=3.0):
    """
    Greedily pair predicted points with ground-truth points (one-to-one).

    Ground-truth points are visited in input order. Each one claims the
    closest prediction that has not been claimed yet and lies within a
    Euclidean distance of ``dr`` (inclusive); on equal distances the
    prediction with the lowest index wins.

    pred_xy, gt_xy: list/array of (x, y)
    dr: maximum allowed distance for a match

    Returns tp, fp, fn, matches where matches is a list of
    (prediction index, ground-truth index) tuples.
    """
    pred = np.asarray(pred_xy, float)
    gt = np.asarray(gt_xy, float)

    # Nothing to pair when either side is empty.
    if pred.size == 0:
        return 0, 0, len(gt), []
    if gt.size == 0:
        return 0, len(pred), 0, []

    claimed = np.zeros(len(pred), dtype=bool)
    matches = []

    for gt_idx, (gx, gy) in enumerate(gt):
        # Distance from this ground-truth point to every prediction.
        dists = np.hypot(pred[:, 0] - gx, pred[:, 1] - gy)
        candidates = np.flatnonzero(~claimed & (dists <= dr))
        if candidates.size == 0:
            continue
        # argmin returns the first minimum, i.e. the lowest index on ties.
        winner = int(candidates[np.argmin(dists[candidates])])
        claimed[winner] = True
        matches.append((winner, gt_idx))

    tp = len(matches)
    return tp, len(pred) - tp, len(gt) - tp, matches

def precision_recall_fbeta(tp, fp, fn, beta=1.0):
    """
    Compute precision, recall and the F-beta score from detection counts.

    tp, fp, fn: true-positive, false-positive and false-negative counts
    beta: recall weight; beta > 1 favours recall, beta < 1 favours precision

    Returns (precision, recall, fbeta). Any quantity whose denominator is
    zero is reported as 0.0.
    """
    n_predicted = tp + fp
    n_actual = tp + fn
    prec = tp / n_predicted if n_predicted != 0 else 0.0
    rec = tp / n_actual if n_actual != 0 else 0.0

    beta_sq = beta * beta
    weighted_tp = (1 + beta_sq) * tp
    denom = weighted_tp + beta_sq * fn + fp
    if denom == 0:
        fbeta = 0.0
    else:
        fbeta = weighted_tp / denom
    return prec, rec, fbeta

def run_peak_pipeline_single_frame(img, params):
    """
    Run the peak-detection pipeline on one frame (swap in your own pipeline here).

    img: frame passed straight through to ``peak_detection_bright``
    params: dict of keyword arguments forwarded to ``peak_detection_bright``

    Returns an (N, 2) float array built from the first two columns of the
    first value returned by the pipeline, or an empty (0, 2) array when the
    pipeline yields ``None`` or no coordinates. Callers treat the columns as
    (x, y); NOTE(review): confirm that ``peak_detection_bright`` emits that order.
    """
    detected, *_ = peak_detection_bright(
        img,
        prev_frames=None,
        binary_mask=None,
        exinfo=None,
        presetROIsize=True,
        **params,
    )
    if detected is not None:
        detected = np.asarray(detected)
        if detected.size > 0:
            # Keep only the two coordinate columns.
            return detected[:, :2].astype(float)
    return np.empty((0, 2), dtype=float)

def fbeta(tp, fp, fn, beta=2.0):
    """
    F-beta score from true-positive, false-positive and false-negative counts.

    beta > 1 weights recall more heavily, beta < 1 weights precision more
    heavily. Returns 0.0 when the denominator is zero.
    """
    beta_sq = beta * beta
    weighted_tp = (1 + beta_sq) * tp
    denom = weighted_tp + beta_sq * fn + fp
    if denom == 0:
        return 0.0
    return weighted_tp / denom

def evaluate_params(conf_data, manual_events, cav_params, dr=5.0, beta=0.25):
    """
    Score one parameter set against the manual annotations of a single frame.

    conf_data: frame handed to the peak pipeline
    manual_events: reference (x, y) positions
    cav_params: keyword arguments for the peak pipeline
    dr: maximum matching distance
    beta: F-beta weighting

    Returns (score, tp, fp, fn, pred_events, matches).
    """
    detections = run_peak_pipeline_single_frame(conf_data, cav_params)
    matching = match_points_2d(detections, manual_events, dr=dr)
    tp, fp, fn, matches = matching
    return fbeta(tp, fp, fn, beta=beta), tp, fp, fn, detections, matches

def load_manual_events_from_zip(folder, file):
    """
    Read manually annotated event positions from the entry names of a zip file.

    Every entry name is parsed as ``<y>-<x>``: the text before the first '-'
    (up to its first '.') is y, the text between the first and second '-'
    (up to its first '.') is x. The entry contents are never read.
    NOTE(review): presumably an ROI set whose entries are named by position;
    confirm the naming scheme if names carry more than two '-' separated fields.

    folder: directory containing the archive
    file: archive file name inside ``folder``

    Returns a list of (x, y) integer tuples sorted by x (entries with equal x
    keep their archive order).

    Raises IndexError if an entry name contains no '-', and ValueError if a
    coordinate field is not an integer.
    """
    # Context manager guarantees the archive handle is closed, even on error.
    with zipfile.ZipFile(os.path.join(folder, file)) as archive:
        roinames = archive.namelist()

    manual_events = []
    for roiname in roinames:
        fields = roiname.split('-')
        y = int(fields[0].split('.')[0])
        x = int(fields[1].split('.')[0])
        manual_events.append((x, y))
    manual_events.sort(key=lambda v: v[0])
    return manual_events

def optimize_single_frame(img, manual_xy, dr=3.0, beta=1.0, n_trials=200, seed=0):
    """
    Tune the peak-pipeline parameters on one frame with an Optuna TPE study.

    img: 2D numpy array
    manual_xy: list of (x,y) reference positions
    dr: maximum matching distance
    beta: F-beta weighting used as the optimisation target
    n_trials: number of Optuna trials
    seed: seed for the TPE sampler

    Returns the finished study. Each trial stores tp, fp, fn, precision and
    recall as user attributes.
    """
    # (name, type, low, high) -- edit ranges to match your pipeline.
    # Entries with low == high pin the parameter to a single value.
    # The order here is the order in which parameters are suggested.
    search_space = (
        ("maxfilter_kersize", float, 5.0, 5.0),
        ("peak_min_dist", float, 5.0, 20.0),
        ("num_peaks", int, 200, 200),
        ("thresh_abs", float, 5.0, 100.0),
        ("smoothing_radius", float, 0.5, 2.0),
        ("border_limit", int, 15, 15),
        ("init_smooth", int, 1, 1),
        ("roi_border", int, 5, 5),
        ("roi_th_factor", int, 6, 6),
    )

    def objective(trial):
        params = {}
        for name, kind, low, high in search_space:
            if kind is float:
                params[name] = trial.suggest_float(name, low, high)
            else:
                params[name] = trial.suggest_int(name, low, high)

        detections = run_peak_pipeline_single_frame(img, params)
        # Plain list of float tuples for the matcher.
        pred_xy = [(float(px), float(py)) for px, py in detections]

        tp, fp, fn, _ = match_points_2d(pred_xy, manual_xy, dr=dr)
        prec, rec, score = precision_recall_fbeta(tp, fp, fn, beta=beta)

        # Keep the raw counts and rates alongside the objective value.
        extras = (
            ("tp", int(tp)),
            ("fp", int(fp)),
            ("fn", int(fn)),
            ("precision", float(prec)),
            ("recall", float(rec)),
        )
        for key, value in extras:
            trial.set_user_attr(key, value)

        return score

    study = optuna.create_study(
        direction="maximize",
        sampler=optuna.samplers.TPESampler(seed=seed),
    )
    study.optimize(objective, n_trials=n_trials)
    return study

def load_conf_frame(folder, timelapse, frameidx=0):
    """
    Load one frame from the ``timelapse``-th confocal timelapse TIFF in ``folder``.

    Candidate files contain "conftimelapse", end in ".tif" and do not contain
    'temporal'; they are sorted by name and indexed with ``timelapse``.

    folder: directory to search
    timelapse: index into the sorted candidate list
    frameidx: index along the first axis of the loaded stack

    Returns (frame, file name).
    """
    candidates = sorted(
        name
        for name in os.listdir(folder)
        if "conftimelapse" in name and name.endswith(".tif") and 'temporal' not in name
    )
    file_conf = candidates[timelapse]

    # A constant 2**15 is subtracted from the whole stack before the frame is picked.
    # NOTE(review): presumably removes a fixed acquisition offset -- confirm.
    stack = tiff.imread(os.path.join(folder, file_conf)) - 2**15
    return stack[frameidx], file_conf

In [None]:
# Candidate data folders. The path is assembled from individual components so it
# resolves on any operating system, not only where '\\' is the path separator.
folders = [os.path.join(parentdir, 'exampledata', 'cav1', 'conftimelapse')]
folderidx = 0  # which entry of `folders` to use
folder = folders[folderidx]
timelapse = 0  # index of the timelapse (and its annotation file) inside `folder`

In [None]:
# beta > 1 favours recall, beta < 1 favours precision. Around 0.25 is a good balance here,
# i.e. precision is treated as four times as important as recall: there are many events,
# so missing some is acceptable, but false positives should be avoided.
beta = 0.25
dr = 5          # maximum distance for a detection to count as matching a manual event
n_trials = 100  # number of Optuna trials

conf_data, file_conf = load_conf_frame(folder, timelapse)
print("Loaded:", file_conf, "shape:", conf_data.shape)

allfiles = os.listdir(folder)
# os.listdir returns names in arbitrary order; sort so that index `timelapse` selects the
# annotation file in the same (sorted) position as the confocal file chosen by load_conf_frame.
manualeventsfiles = sorted(file for file in allfiles if 'manualevents' in file)
manual_events = load_manual_events_from_zip(folder, file=manualeventsfiles[timelapse])
print("Manual events:", len(manual_events))

study = optimize_single_frame(conf_data, manual_events, dr=dr, beta=beta, n_trials=n_trials, seed=0)
print("Best Fβ:", study.best_value)
print("Best params:", study.best_params)

# Re-run the pipeline with the winning parameters to get the detections and matches themselves.
best_params = study.best_params
score, tp, fp, fn, pred_events, matches = evaluate_params(
    conf_data, manual_events, best_params, dr=dr, beta=beta
)
# Label the score with the beta actually used (it is not an F2 score).
print(f"\nBest Fβ (β={beta})={score:.4f}  TP={tp}  FP={fp}  FN={fn}")