In [None]:
path = /data/mtp03_20250801_060149.bmp

In [None]:
#!/usr/bin/env python3
import argparse, sys, os
import cv2
import numpy as np
import pandas as pd

# -------------------------------
# DEFAULT CONFIG (edit as needed)
# -------------------------------

# Fiducial (red/blue checker) detection in HSV
HSV_RED1 = ((0, 80, 60), (10, 255, 255))       # lower red
HSV_RED2 = ((170, 80, 60), (180, 255, 255))    # upper red
HSV_BLUE = ((100, 80, 60), (135, 255, 255))

FIDUCIAL_MIN_AREA = 200
FIDUCIAL_MAX_CANDIDATES = 20

# Blob detector (center spots) — permissive (rounded squares)
BLOB_MIN_AREA = 200
BLOB_MAX_AREA = 8000
BLOB_MIN_CIRCULARITY = 0.08
BLOB_MIN_INERTIA = 0.08
BLOB_MIN_CONVEXITY = 0.2

N_COLS = 12
N_ROWS = 8

# Inner ROI (ellipse) as fraction of cell size
INNER_SCALE_X = 0.45
INNER_SCALE_Y = 0.45

# Glare rejection: drop top X percentile
DEFAULT_GLARE_DROP_TOP_PCT = 5.0

# ---------------------------------
# Utility functions
# ---------------------------------

def imread(path):
    img = cv2.imread(path, cv2.IMREAD_COLOR)
    if img is None:
        raise FileNotFoundError(path)
    return img

def order_corners(pts):
    pts = np.array(pts, dtype=np.float32)
    s = pts.sum(axis=1)
    diff = np.diff(pts, axis=1).reshape(-1)
    rect = np.zeros((4,2), np.float32)
    rect[0] = pts[np.argmin(s)]       # TL
    rect[2] = pts[np.argmax(s)]       # BR
    rect[1] = pts[np.argmin(diff)]    # TR
    rect[3] = pts[np.argmax(diff)]    # BL
    return rect

def get_hsv_masks(bgr):
    hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)
    red1 = cv2.inRange(hsv, np.array(HSV_RED1[0]), np.array(HSV_RED1[1]))
    red2 = cv2.inRange(hsv, np.array(HSV_RED2[0]), np.array(HSV_RED2[1]))
    red = cv2.bitwise_or(red1, red2)
    blue = cv2.inRange(hsv, np.array(HSV_BLUE[0]), np.array(HSV_BLUE[1]))
    return red, blue

def find_fiducial_corners(bgr, use_overlap=False, debug=False):
    """Return 4 points [TL, TR, BR, BL] if found, else None."""
    h, w = bgr.shape[:2]
    red, blue = get_hsv_masks(bgr)
    kernel = np.ones((5,5), np.uint8)
    if use_overlap:
        cand = cv2.bitwise_and(cv2.dilate(red,kernel), cv2.dilate(blue,kernel))
    else:
        cand = cv2.dilate(cv2.bitwise_or(red,blue), kernel)

    cnts, _ = cv2.findContours(cand, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    picks = []
    for c in cnts:
        a = cv2.contourArea(c)
        if a < FIDUCIAL_MIN_AREA: 
            continue
        x,y,wc,hc = cv2.boundingRect(c)
        cx, cy = x+wc/2, y+hc/2
        picks.append((a, (cx,cy)))
    picks.sort(key=lambda x: -x[0])
    picks = picks[:FIDUCIAL_MAX_CANDIDATES]
    if not picks:
        return None

    # choose closest to four image corners
    targets = [(0,0), (w-1,0), (w-1,h-1), (0,h-1)]
    chosen = []
    pool = picks[:]
    for tx,ty in targets:
        if not pool: return None
        d = [np.hypot(cx-tx, cy-ty) for _,(cx,cy) in pool]
        ix = int(np.argmin(d))
        chosen.append(pool.pop(ix)[1])

    return order_corners(chosen)

def warp_by_corners(bgr, corners):
    h, w = bgr.shape[:2]
    dst = np.array([[0,0],[w-1,0],[w-1,h-1],[0,h-1]], dtype=np.float32)
    M = cv2.getPerspectiveTransform(order_corners(corners), dst)
    out = cv2.warpPerspective(bgr, M, (w,h), flags=cv2.INTER_LINEAR)
    return out

def detect_plate_rectangle(bgr):
    gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (9,9), 0)
    edges = cv2.Canny(blur, 50, 150)
    lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=200, minLineLength=450, maxLineGap=30)
    if lines is None: 
        return None
    lines = lines.reshape(-1,4)

    def ang(l): 
        x1,y1,x2,y2 = l; return np.degrees(np.arctan2(y2-y1, x2-x1))
    horiz = [l for l in lines if abs(ang(l)) < 15]
    vert  = [l for l in lines if abs(ang(l)) > 75]
    if len(horiz) < 2 or len(vert) < 2: 
        return None

    def length(l): 
        x1,y1,x2,y2 = l; return np.hypot(x2-x1, y2-y1)
    horiz.sort(key=length, reverse=True)
    vert.sort(key=length, reverse=True)
    horiz = horiz[:10]; vert = vert[:10]

    xs = [(min(l[0],l[2]), max(l[0],l[2]), l) for l in vert]
    ys = [(min(l[1],l[3]), max(l[1],l[3]), l) for l in horiz]
    left  = min(xs, key=lambda t:t[0])[2]
    right = max(xs, key=lambda t:t[1])[2]
    top   = min(ys, key=lambda t:t[0])[2]
    bot   = max(ys, key=lambda t:t[1])[2]

    def line(p1,p2):
        x1,y1 = p1; x2,y2 = p2
        A = y2-y1; B = x1-x2; C = A*x1 + B*y1
        return A,B,C
    def inter(L1,L2):
        A1,B1,C1 = L1; A2,B2,C2 = L2
        d = A1*B2 - A2*B1
        if abs(d) < 1e-6: return None
        x = (C1*B2 - C2*B1)/d
        y = (A1*C2 - A2*C1)/d
        return (x,y)

    L_left  = line((left[0],left[1]), (left[2],left[3]))
    L_right = line((right[0],right[1]), (right[2],right[3]))
    L_top   = line((top[0],top[1]), (top[2],top[3]))
    L_bot   = line((bot[0],bot[1]), (bot[2],bot[3]))

    tl = inter(L_left, L_top)
    tr = inter(L_right,L_top)
    br = inter(L_right,L_bot)
    bl = inter(L_left, L_bot)
    if None in (tl,tr,br,bl): 
        return None
    return order_corners([tl,tr,br,bl])

def build_blob_detector():
    p = cv2.SimpleBlobDetector_Params()
    p.filterByArea = True; p.minArea = BLOB_MIN_AREA; p.maxArea = BLOB_MAX_AREA
    p.filterByCircularity = True; p.minCircularity = BLOB_MIN_CIRCULARITY
    p.filterByInertia = True; p.minInertiaRatio = BLOB_MIN_INERTIA
    p.filterByConvexity = True; p.minConvexity = BLOB_MIN_CONVEXITY
    p.filterByColor = True; p.blobColor = 255  # light blobs on dark
    return cv2.SimpleBlobDetector_create(p)

def detect_centers_blob(rectified_bgr):
    gray = cv2.cvtColor(rectified_bgr, cv2.COLOR_BGR2GRAY)
    tophat = cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (25,25)))
    norm = cv2.normalize(tophat, None, 0, 255, cv2.NORM_MINMAX)
    blur = cv2.GaussianBlur(norm, (7,7), 0)
    det = build_blob_detector()
    kps = det.detect(blur)
    pts = np.array([kp.pt for kp in kps], dtype=np.float32) if kps else np.empty((0,2), np.float32)
    return pts

def kmeans_sorted_1d(values, K):
    Z = np.float32(values.reshape(-1,1))
    criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.1)
    attempts = 10
    compactness, labels, centers = cv2.kmeans(Z, K, None, criteria, attempts, cv2.KMEANS_PP_CENTERS)
    centers = centers.reshape(-1)
    order = np.argsort(centers)
    lab_map = {old:new for new,old in enumerate(order)}
    labels_sorted = np.vectorize(lab_map.get)(labels.reshape(-1))
    return centers[order], labels_sorted

def assign_grid_from_points(pts, n_cols, n_rows):
    xs = pts[:,0]; ys = pts[:,1]
    cx, lx = kmeans_sorted_1d(xs, n_cols)
    cy, ly = kmeans_sorted_1d(ys, n_rows)
    grid = [[None for _ in range(n_cols)] for _ in range(n_rows)]
    for (x,y), ix, iy in zip(pts, lx, ly):
        grid[iy][ix] = (float(x), float(y))
    # interpolate any missing
    for r in range(n_rows):
        for c in range(n_cols):
            if grid[r][c] is None:
                row_vals = [(cc, grid[r][cc]) for cc in range(n_cols) if grid[r][cc] is not None]
                col_vals = [(rr, grid[rr][c]) for rr in range(n_rows) if grid[rr][c] is not None]
                x_est = np.interp(c, [cc for cc,_ in row_vals], [v[0] for _,v in row_vals]) if len(row_vals)>=2 else None
                y_est = np.interp(r, [rr for rr,_ in col_vals], [v[1] for _,v in col_vals]) if len(col_vals)>=2 else None
                if x_est is not None and y_est is not None:
                    grid[r][c] = (float(x_est), float(y_est))
    return grid, cx, cy

def robust_intensity_L(bgr, center, dx, dy, drop_top_pct):
    x, y = center
    rx = dx * INNER_SCALE_X
    ry = dy * INNER_SCALE_Y
    h, w = bgr.shape[:2]
    mask = np.zeros((h,w), np.uint8)
    cv2.ellipse(mask, (int(round(x)), int(round(y))), (int(round(rx)), int(round(ry))), 0, 0, 360, 255, -1)
    L = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB)[:,:,0].astype(np.float32)
    vals = L[mask==255]
    if vals.size == 0: return np.nan
    if drop_top_pct > 0:
        cutoff = np.percentile(vals, 100 - drop_top_pct)
        vals = vals[vals <= cutoff]
        if vals.size == 0: return np.nan
    return float(np.median(vals))

# ---------------------------------
# Main pipeline
# ---------------------------------

def process(image_path, out_csv, show=False, debug=False, method="auto", drop_top_pct=DEFAULT_GLARE_DROP_TOP_PCT):
    bgr = imread(image_path)
    H0, W0 = bgr.shape[:2]

    rectified = None
    used_rect = None

    if method in ("auto","fiducial"):
        # Try fiducial-based (first with overlap mask, then relaxed OR mask)
        for overlap in (True, False):
            corners = find_fiducial_corners(bgr, use_overlap=overlap, debug=debug)
            if corners is not None:
                rectified = warp_by_corners(bgr, corners)
                used_rect = "fiducials"
                break

    if rectified is None and method in ("auto","lines"):
        # Try plate rectangle detection
        corners = detect_plate_rectangle(bgr)
        if corners is not None:
            # Warp to a rectangle sized by detected quad
            tl,tr,br,bl = corners
            width_top = np.hypot(*(tr - tl))
            width_bot = np.hypot(*(br - bl))
            height_left = np.hypot(*(bl - tl))
            height_right= np.hypot(*(br - tr))
            W = int(max(width_top, width_bot))
            H = int(max(height_left, height_right))
            dst = np.array([[0,0],[W-1,0],[W-1,H-1],[0,H-1]], dtype=np.float32)
            M = cv2.getPerspectiveTransform(corners, dst)
            rectified = cv2.warpPerspective(bgr, M, (W,H))
            used_rect = "lines"

    # If still None, carry on without rectification
    working = rectified if rectified is not None else bgr
    if debug:
        print(f"Rectification: {used_rect if used_rect else 'none'}; working size = {working.shape[1]}x{working.shape[0]}")

    # Try to detect centers; if too few, we’ll fall back to uniform grid
    pts = detect_centers_blob(working)
    use_uniform_grid = (pts.shape[0] < (N_ROWS * N_COLS * 0.6))  # permissive; require ~60% found

    if not use_uniform_grid:
        grid, cx, cy = assign_grid_from_points(pts, N_COLS, N_ROWS)
        dx = float(np.median(np.diff(cx))) if len(cx)>1 else working.shape[1]/N_COLS
        dy = float(np.median(np.diff(cy))) if len(cy)>1 else working.shape[0]/N_ROWS
    else:
        # Even subdivision
        H, W = working.shape[:2]
        dx = W / N_COLS
        dy = H / N_ROWS
        grid = [[((c+0.5)*dx, (r+0.5)*dy) for c in range(N_COLS)] for r in range(N_ROWS)]

    # Measure
    records = []
    for r in range(N_ROWS):
        for c in range(N_COLS):
            center = grid[r][c]
            if center is None:
                val = np.nan
            else:
                val = robust_intensity_L(working, center, dx, dy, drop_top_pct)
            records.append({
                "row": chr(ord('A')+r),
                "col": c+1,
                "well": f"{chr(ord('A')+r)}{c+1}",
                "intensity_L": val
            })

    df = pd.DataFrame.from_records(records)
    df.to_csv(out_csv, index=False)

    # Print targets
    mask = df["row"].isin(["D","E"]) & df["col"].between(3,10)
    targets = df.loc[mask].copy()
    print("Targets (D3..D10, E3..E10):")
    print(targets.to_string(index=False))
    print(f"\nSaved all wells to: {out_csv}")

    # Optional visualization
    if show or debug:
        vis = working.copy()
        # draw ellipses + labels
        for r in range(N_ROWS):
            for c in range(N_COLS):
                center = grid[r][c]
                if center is None: continue
                x,y = map(int, center)
                rx = int(dx*INNER_SCALE_X); ry = int(dy*INNER_SCALE_Y)
                cv2.ellipse(vis, (x,y), (rx,ry), 0, 0, 360, (255,0,0), 2)
                cv2.putText(vis, f"{chr(65+r)}{c+1}", (x-12,y+4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,255,0), 1, cv2.LINE_AA)
        try:
            import matplotlib.pyplot as plt
            plt.figure(figsize=(10,10))
            plt.imshow(cv2.cvtColor(vis, cv2.COLOR_BGR2RGB))
            title = "Rectified" if rectified is not None else "Unrectified"
            plt.title(f"{title} grid & measurement ROIs")
            plt.axis("off")
            plt.show()
        except Exception:
            pass

    return df, targets

def main():
    ap = argparse.ArgumentParser(description="96-well plate color intensity extractor")
    ap.add_argument("--image", required=True)
    ap.add_argument("--out", default="well_intensities.csv")
    ap.add_argument("--method", choices=["auto","fiducial","lines","grid"], default="auto",
                    help="auto=fiducials→lines→grid; grid=just subdivide")
    ap.add_argument("--drop-top", type=float, default=DEFAULT_GLARE_DROP_TOP_PCT,
                    help="glare trimming: drop top percentile within ROI")
    ap.add_argument("--show", action="store_true")
    ap.add_argument("--debug", action="store_true")
    args = ap.parse_args()

    global DEFAULT_GLARE_DROP_TOP_PCT
    DEFAULT_GLARE_DROP_TOP_PCT = args.drop_top

    if args.method == "grid":
        # force no rectification + uniform grid
        df, targets = process(args.image, args.out, show=args.show, debug=args.debug, method="lines", drop_top_pct=args.drop_top)
    else:
        df, targets = process(args.image, args.out, show=args.show, debug=args.debug, method=args.method, drop_top_pct=args.drop_top)

if __name__ == "__main__":
    main()
