In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import waltlabtools as wlt

from config import FIND28_ISINGLE_MULTIPLIER
from utils import (
    # weight_digital,
    # StratifiedGroupKFoldFirst,
    Barcode,
    # BarcodeDtype,
    # BarcodeArray,
    # score_estimator,
    get_isingle_aeb,
)

In [None]:
# Path of the raw validation-run CSV that is loaded with pd.read_csv.
VALIDATION_FILE = "../input/raw/validation/06132024 validation.csv"
# Multiplier applied to measured concentrations in the spike-and-recovery and
# admixture tables. NOTE(review): presumably the sample pre-dilution — confirm.
DILUTION_FACTOR = 2

In [None]:
# Load the raw validation export.
q_raw = pd.read_csv(VALIDATION_FILE)
# Boolean mask: calibrator rows whose nominal concentration is strictly positive.
nonzero_calibrators = (q_raw["Sample Type"] == "Calibrator") & (
    q_raw["Replicate Conc."] > 0
)

# Maps the integer parsed from "Location" to the calibration curve run there.
# NOTE(review): the integer looks like a plate column number — confirm.
cal_curve_cols = {
    7: "Ag85B only",
    9: "LAM only",
    11: "Both",
}

# Every row starts as "Both"; only the nonzero calibrators are reassigned.
q_raw["Curve"] = "Both"
# Take the last whitespace-separated token of "Location", drop its first
# character, parse the rest as an int and look it up in cal_curve_cols.
# Integers that are not keys of cal_curve_cols become NaN.
q_raw.loc[nonzero_calibrators, "Curve"] = (
    q_raw.loc[nonzero_calibrators, "Location"]
    .map(lambda s: int(s.split()[-1][1:]))
    .map(cal_curve_cols)
)

In [None]:
# READ FILES
# Wrap the raw table in a waltlabtools HDX object.
# NOTE(review): assay_defining_cols presumably tells HDX which columns
# distinguish one assay from another — confirm in the waltlabtools docs.
q = wlt.HDX(raw=q_raw, assay_defining_cols=["Curve", "Plex"])
# Sorted plex names; the plotting cells pair these with subplot axes.
plexes = sorted(q.raw.Plex.unique())

# CALCULATE AEBS AND CONCENTRATIONS
# get_isingle_aeb is a project helper from utils; the "700 LAM FIND28" plex is
# given its own multiplier from config.
# NOTE(review): presumably (re)computes "Replicate AEB" — confirm in utils.
q.raw = get_isingle_aeb(
    q.raw, {"700 LAM FIND28": FIND28_ISINGLE_MULTIPLIER}, max_aeb=np.nan
)
for plex in sorted(q.raw.Plex.unique()):
    # The analyte is the second whitespace-separated token of the plex name
    # (e.g. "LAM" in "700 LAM FIND28").
    analyte = plex.split()[1]
    # Calibrator rows of this plex with a non-missing AEB, drawn from the
    # "Both" curve and from the single-analyte curve of this plex's analyte.
    df = q.raw[
        (q.raw["Plex"] == plex)
        & (q.raw["Sample Type"] == "Calibrator")
        & (q.raw["Replicate AEB"].notna())
        & (q.raw["Curve"].isin(["Both", f"{analyte} only"]))
    ]
    # Fit a linear calibration curve and store it under the ("Both", plex) key.
    q.cal_curves["Both", plex] = wlt.CalCurve(model="linear").fit(
        X=df["Replicate Conc."], y=df["Replicate AEB"]
    )


# Concentrations
def get_conc(row):
    """Back-calculate a concentration from one row's replicate AEB.

    Looks up the calibration curve stored under ``("Both", row["Plex"])`` in
    ``q.cal_curves`` and returns its ``conc`` for ``row["Replicate AEB"]``.
    """
    plex_curve = q.cal_curves["Both", row["Plex"]]
    return plex_curve.conc(row["Replicate AEB"])


# Overwrite "Replicate Conc." on specimen rows with the value back-calculated
# from each row's AEB via get_conc.
q.raw.loc[q.raw["Sample Type"] == "Specimen", "Replicate Conc."] = q.raw[
    q.raw["Sample Type"] == "Specimen"
].apply(get_conc, axis=1)


# One row per sample barcode; columns form a two-level (aggfunc, Plex) index
# holding the median, min, max and count of the replicate concentrations.
Xs = q.raw[q.raw["Sample Type"] == "Specimen"].pivot_table(
    columns="Plex",
    index="Sample Barcode",
    values="Replicate Conc.",
    aggfunc=["median", "min", "max", "count"],
)

## Dropouts (calibration curves with single calibrators to check cross-reactivity)

In [None]:
# Two side-by-side panels; the loop below pairs them, in order, with the
# "Ag85B only" and "LAM only" curves (the keys of xranges).
fig, axs = plt.subplots(ncols=2, figsize=(9, 4.5))

# plex name -> (line/marker color, lod, marker format).
# The middle element is unpacked as `lod` in the loop below but is not used in
# this cell. NOTE(review): presumably a limit of detection — confirm units.
color_lod_fmt = {
    "750 LAM G3": ("#7D3571", 1, "^"),
    "700 LAM FIND28": ("#DD3C4A", 60, "s"),
    "647 LAM S4-20": ("#F2AB0D", 3, "o"),
    "488 Ag85B 182": ("#2E9AB6", 0.2, "d"),
}

# Flat model: a * X**0 evaluates to the single coefficient `a` for every X.
# Its inverse always returns NaN (a constant signal cannot be inverted).
constant_model = wlt.Model(
    func=lambda X, a: a * X**0,
    inverse=lambda y, a: np.nan,
    coef_init=np.array([1]),
    name="constant",
)

# Keyword arguments shared by every errorbar call in the loop below.
kws = {"capsize": 3}
# Per-curve (start, stop) passed to np.geomspace when drawing the fitted curve.
xranges = {"Ag85B only": (1e-3, 8 * np.sqrt(5)), "LAM only": (1e-1, 800 * np.sqrt(5))}
# For every plex and every single-analyte ("dropout") curve: fit a model to the
# calibrators, then draw the per-concentration medians with min-max error bars
# and the fitted curve.
for plex, (color, lod, fmt) in color_lod_fmt.items():
    for ax, curve in zip(axs, xranges.keys()):
        # Calibrators of this plex with a non-missing AEB that belong to this
        # dropout curve, plus the "Calibrator A" rows of the "Both" curve.
        # NOTE(review): "Calibrator A" is presumably the blank — confirm.
        df = q.raw[
            (q.raw["Plex"] == plex)
            & (q.raw["Sample Type"] == "Calibrator")
            & (q.raw["Replicate AEB"].notna())
            & (
                (q.raw["Curve"] == curve)
                | (
                    (q.raw["Curve"] == "Both")
                    & (q.raw["Sample Barcode"] == "Calibrator A")
                )
            )
        ]
        # A plex whose name contains the curve's analyte gets a linear model on
        # unscaled concentrations; any other plex gets the flat model and has
        # its concentrations multiplied by `factor`.
        # NOTE(review): the 0.01 / 100 factors presumably rescale onto the other
        # analyte's concentration axis — confirm.
        if (curve == "Ag85B only") and ("Ag85B" in plex):
            factor = 1
            model = "linear"
        elif (curve == "Ag85B only") and ("Ag85B" not in plex):
            factor = 0.01
            model = constant_model
        elif (curve == "LAM only") and ("LAM" in plex):
            factor = 1
            model = "linear"
        elif (curve == "LAM only") and ("LAM" not in plex):
            factor = 100
            model = constant_model
        else:
            raise ValueError("Unknown curve/plex combination: %s" % curve)
        q.cal_curves[curve, plex] = wlt.CalCurve(model=model).fit(
            X=df["Replicate Conc."] * factor, y=df["Replicate AEB"]
        )

        # Group replicate AEBs by (scaled) concentration.
        aebs = pd.concat(
            [df["Replicate Conc."] * factor, df["Replicate AEB"]], axis=1
        ).groupby("Replicate Conc.")["Replicate AEB"]
        cc_y = aebs.median()
        cc_x = cc_y.index
        # Distances from the median to the min (quantile 0) and max (quantile 1),
        # transposed so the two rows are the lower and upper errors.
        yerr = abs(aebs.quantile([0, 1]).unstack().sub(cc_y, axis=0)).T
        ax.errorbar(cc_x, cc_y, yerr=yerr, fmt=fmt, color=color, **kws)
        # Dummy point at (-inf, -inf) drawn with marker + line; it carries the
        # legend label. plex[4:] drops the first four characters of the plex
        # name (e.g. "750 ").
        ax.errorbar(
            [-np.inf],
            [-np.inf],
            yerr=[1],
            fmt=f"{fmt}-",
            color=color,
            label=plex[4:],
            **kws,
        )

        # Curve
        # x values: 0, then log10(2.56) times the range start, then NaN (which
        # breaks the drawn line), then 50 log-spaced points across the range.
        x = np.array(
            [0, np.log(2.56) / np.log(10) * xranges[curve][0]]
            + [np.nan]
            + list(np.geomspace(*xranges[curve], num=50))
        )
        if model == "linear":
            # The curve drawn here is the one stored under ("Both", plex) by an
            # earlier cell, not the (curve, plex) fit made just above.
            # NOTE(review): confirm this is intentional.
            ax.plot(
                x, q.cal_curves["Both", plex].signal(x), "-", color=color, alpha=0.75
            )
        else:
            # Flat model: evaluate the (curve, plex) fit at zeros of x's shape.
            ax.plot(
                x,
                q.cal_curves[curve, plex].signal(np.zeros_like(x)),
                "-",
                color=color,
                alpha=0.75,
            )

# Left panel: Ag85B concentration axis; log-scaled AEB axis.
axs[0].set(
    xlabel="Ag85B (pg/mL)",
    ylabel="AEB",
    ylim=(0.01, 30),
    xlim=(-0.00025, 8 * np.sqrt(5)),
    yscale="log",
)
# Right panel: LAM concentration axis; same AEB limits, no y label.
axs[1].set(
    xlabel="LAM (pg/mL)", ylim=(0.01, 30), xlim=(-0.025, 800 * np.sqrt(5)), yscale="log"
)
# symlog x axes: linear within +/- linthresh so x = 0 can be shown, log beyond.
axs[0].set_xscale("symlog", linthresh=1e-3)
axs[1].set_xscale("symlog", linthresh=1e-1)


axs[0].legend(loc="upper left", reverse=True, framealpha=1)
axs[1].legend(loc="upper left", reverse=True, framealpha=1)

plt.tight_layout()
# Saving this figure is currently disabled.
# plt.savefig("../output/figures/f_dropouts_v0.pdf")

In [None]:
def get_barcode(barcode: str) -> dict:
    """Parse a sample barcode string into a metadata record.

    Formats are checked in this order:

    * Starts with ``"Calibrator"``: purpose ``"Dropouts"``; the whole string is
      kept as the specimen barcode.
    * Contains ``" X "``: purpose ``"Dilution Linearity"``. The first token is
      the specimen (wrapped in ``Barcode`` when that succeeds, otherwise kept as
      a plain string) and the last token is the dilution factor as a float.
    * Exactly three tokens, ``<specimen> <analyte> <amount>``: purpose
      ``"Spike and Recovery"``; the integer amount is stored under
      ``"<analyte> Spike"``.
    * Exactly two tokens: purpose ``"Admixture Linearity"``. The last token is a
      letter ``a``-``e`` mapped to the fraction of sample 1 (1.0 down to 0.0);
      the last character of the first token is the specimen barcode.

    Parameters
    ----------
    barcode : str
        Raw sample barcode.

    Returns
    -------
    dict
        Metadata record; its keys depend on the recognized format.

    Raises
    ------
    ValueError
        If the barcode matches none of the formats above. Without this, the
        function would fall off the end and return ``None``, which only fails
        later, and obscurely, when the records are assembled into a DataFrame.
    KeyError
        If a two-token barcode ends in a letter other than ``a``-``e``.
    """
    if barcode.startswith("Calibrator"):
        return {"Specimen Barcode": barcode, "Purpose": "Dropouts"}

    split_barcode = barcode.split()
    if " X " in barcode:
        # Fall back to the raw string when the specimen token is not a valid
        # Barcode.
        try:
            sample_barcode = Barcode(split_barcode[0])
        except ValueError:
            sample_barcode = split_barcode[0]
        return {
            "Specimen Barcode": sample_barcode,
            "Dilution Factor": float(split_barcode[-1]),
            "Purpose": "Dilution Linearity",
        }

    if len(split_barcode) == 3:
        return {
            "Specimen Barcode": Barcode(split_barcode[0]),
            f"{split_barcode[1]} Spike": int(split_barcode[-1]),
            "Purpose": "Spike and Recovery",
        }

    if len(split_barcode) == 2:
        sample_1_frac = {"a": 1.0, "b": 0.75, "c": 0.5, "d": 0.25, "e": 0.0}[
            split_barcode[-1]
        ]
        return {
            "Specimen Barcode": split_barcode[0][-1],
            "Sample 1 Fraction": sample_1_frac,
            "Sample 2 Fraction": 1 - sample_1_frac,
            "Purpose": "Admixture Linearity",
        }

    # No format matched: fail loudly instead of silently returning None.
    raise ValueError(f"Unrecognized barcode format: {barcode!r}")


# Parse every specimen barcode into a metadata row aligned with Xs, then fill
# missing values with defaults: dilution factor 2 and zero spike amounts.
# NOTE(review): the literal 2 presumably mirrors DILUTION_FACTOR — confirm.
meta = pd.DataFrame(
    (get_barcode(barcode) for barcode in Xs.index), index=Xs.index
).fillna({"Dilution Factor": 2, "LAM Spike": 0, "Ag85B Spike": 0})
# Attach the metadata to Xs under a top-level "meta" column group.
Xs[[("meta", col) for col in meta.columns]] = meta

## Spike and Recovery

In [None]:
# 2x2 grid of axes; the plotting loop pairs the flattened axes with `plexes`.
fig, axs = plt.subplots(ncols=2, nrows=2, figsize=(9, 10))


def get_recovery_df(plex: str, aggfunc: str) -> pd.DataFrame:
    """Build the spike-and-recovery table for one plex and one aggregate.

    Rows are the spike amounts of the plex's analyte, columns are specimen
    barcodes, and values are the ``aggfunc`` concentrations from ``Xs``
    multiplied by ``DILUTION_FACTOR``. Specimen columns containing any missing
    value are dropped.
    """
    analyte = plex.split()[1]
    value_col = (aggfunc, plex)
    spike_rows = Xs[Xs[("meta", "Purpose")] == "Spike and Recovery"]
    pivoted = spike_rows.pivot_table(
        index=[("meta", f"{analyte} Spike")],
        columns=[("meta", "Specimen Barcode")],
        values=[value_col],
    )
    complete = pivoted.dropna(axis=1)
    scaled = complete * DILUTION_FACTOR
    return scaled[value_col]


# Median recovered concentration per plex (rows: spike amount, cols: specimen).
recovered_conc = {plex: get_recovery_df(plex, "median") for plex in plexes}

# Fractional recovery: (spiked rows minus the zero-spike row) divided by the
# spike amount. Not referenced again in the visible cells.
recovered_pct = {
    key: (value.iloc[1:] - value.loc[0]).div(value.index[1:], axis=0)
    for key, value in recovered_conc.items()
}

# Per analyte: (arguments for set_xticks, arguments for set_yticks); each inner
# tuple is unpacked positionally as (ticks[, labels]).
ticks = {
    "Ag85B": (([0, 5, 10, 15, 20],), ([0, 5, 10, 15, 20, 25, 30],)),
    "LAM": (
        (
            [0, 40, 200, 400, 800, 1200, 1600],
            ["0  ", "  40", "200", "400", "800", "1200", "1600"],
        ),
        ([0, 40, 200, 400, 800, 1200, 1600, 2000],),
    ),
}

# One-shot iterator of (color, marker) pairs. There are eight markers, so
# next() raises StopIteration once more than eight barcodes need a style.
# wlt._plot is an underscore-prefixed attribute of waltlabtools.
color_fmt = zip(wlt._plot.COLORS["colorblind"], "o^sd*hXv")
# barcode -> (color, marker); filled lazily so a barcode keeps the same style in
# every subplot.
barcode_color_fmt = {}
for ax, plex in zip(np.ravel(axs), plexes):
    analyte = plex.split()[1]
    y = get_recovery_df(plex, "median")
    # Asymmetric error bars: distance from the median up to max / down to min.
    yerr_pos = get_recovery_df(plex, "max") - y
    yerr_neg = y - get_recovery_df(plex, "min")
    # Axis limits: 10% beyond the largest spike in x, a further 20% in y.
    xmax = y.index.max() * 1.1
    ymax = xmax * 1.2

    # Disabled: dropping the 40 spike level for the FIND28 plex.
    # if plex == "700 LAM FIND28":
    #     y = y.drop(index=40)
    #     yerr_pos = yerr_pos.drop(index=40)
    #     yerr_neg = yerr_neg.drop(index=40)

    for barcode in y.columns:
        if barcode not in barcode_color_fmt:
            barcode_color_fmt[barcode] = next(color_fmt)
        color, marker = barcode_color_fmt[barcode]
        # Mean over the nonzero spike levels of
        # (recovered - zero-spike value) / spike amount.
        avg_recovery = (
            (y[barcode].iloc[1:] - y[barcode].loc[0]) / y[barcode].index[1:]
        ).mean()
        x = y.index
        yerr = [yerr_neg[barcode], yerr_pos[barcode]]
        # The legend label shows the average recovery, not the barcode.
        ax.errorbar(
            x,
            y[barcode],
            yerr=yerr,
            fmt=f"{marker}-",
            label=f"average {avg_recovery:.0%}",
            alpha=0.75,
            capsize=3,
            color=color,
        )
    # Shaded band between the lines y = 0.8 x and y = 1.2 x.
    ax.fill_between(
        [0, xmax],
        [0, 0.8 * xmax],
        [0, 1.2 * xmax],
        color="k",
        alpha=0.05,
        lw=0,
        label="80–120% recovery",
        zorder=-7,
    )
    ax.legend(framealpha=1, loc="upper left")
    ax.set_xticks(*ticks[analyte][0])
    ax.set_yticks(*ticks[analyte][1])
    ax.set(
        title=plex,
        xlabel=f"{analyte} Spike (pg/mL)",
        ylabel=f"Recovered {analyte} concentration (pg/mL)",
        aspect="equal",
        xlim=(0, xmax),
        ylim=(0, ymax),
    )

plt.tight_layout()
plt.savefig("../results/f_sr_v0.pdf")

In [None]:
# Display the barcode -> (color, marker) mapping built by the plotting loop.
barcode_color_fmt

## Dilution linearity

In [None]:
# 2x2 grid of axes; the plotting loop pairs the flattened axes with `plexes`.
fig, axs = plt.subplots(ncols=2, nrows=2, figsize=(9, 10))


def get_dilution_df(plex: str, aggfunc: str) -> pd.DataFrame:
    """Build the dilution-linearity table for one plex and one aggregate.

    Rows are dilution factors, columns are specimen barcodes, and values are
    the ``aggfunc`` concentrations from ``Xs`` (not rescaled). The value found
    at dilution factor ``inf`` under the ``"DilutionLinearity"`` barcode is
    copied into the ``inf`` row of every column. Columns left with fewer than
    two non-missing values are dropped.
    """
    value_col = (aggfunc, plex)
    is_dilution = Xs[("meta", "Purpose")] == "Dilution Linearity"
    table = Xs[is_dilution].pivot_table(
        index=[("meta", "Dilution Factor")],
        columns=[("meta", "Specimen Barcode")],
        values=[value_col],
    )[value_col]
    # Share the single infinite-dilution measurement across all specimens.
    table.loc[np.inf] = table.loc[np.inf, "DilutionLinearity"]
    return table.dropna(thresh=2, axis=1)


# Median measured concentration per plex (rows: dilution factor, cols: specimen).
diluted_conc = {plex: get_dilution_df(plex, "median") for plex in plexes}

# Measured concentration multiplied back by its dilution factor.
# Note: this rebinds `recovered_conc`, which the spike-and-recovery cell also
# assigns.
recovered_conc = {
    key: value.mul(value.index, axis=0) for key, value in diluted_conc.items()
}

# Keyword arguments for set_xticks: tick positions in 1/dilution-factor units
# with fraction glyphs as labels.
xticks = {
    "ticks": [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1],
    "labels": ["0 ", "¹/₁₆", "⅛", "¼", "½", "1"],
}


def minimize_L1_objective(x, y_min, y_max):
    """Fit a line through the origin, ``y = alpha * x``, to interval data.

    Point ``j`` is penalized by the distance of ``alpha * x[j]`` from the
    interval ``[y_min[j], y_max[j]]`` (zero inside it); the summed penalty is
    minimized over ``alpha``.

    The penalty is convex and piecewise linear in ``alpha`` with breakpoints at
    ``y_min / x`` and ``y_max / x``, so its minimum lies either on a breakpoint
    or along a flat segment between two neighbouring breakpoints. Both the
    breakpoints and the midpoints between neighbours are therefore evaluated.
    Midpoints win exact ties, so a whole interval of equally good slopes still
    yields its centre.

    Parameters
    ----------
    x, y_min, y_max : array-like of equal length
        Abscissae and lower/upper bounds. Points where any of the three values
        is non-finite, or where ``x`` is zero, are ignored. Otherwise a NaN
        bound would produce a NaN candidate slope with zero penalty.

    Returns
    -------
    best_alpha : float or None
        Minimizing slope, or ``None`` when no usable point remains.
    min_penalty : float
        Penalty at ``best_alpha`` (``inf`` when no usable point remains).
    """
    x = np.asarray(x, dtype=float)
    y_min = np.asarray(y_min, dtype=float)
    y_max = np.asarray(y_max, dtype=float)

    usable = np.isfinite(x) & (x != 0) & np.isfinite(y_min) & np.isfinite(y_max)
    x, y_min, y_max = x[usable], y_min[usable], y_max[usable]
    if x.size == 0:
        return None, float("inf")

    # np.unique returns the distinct breakpoints already sorted.
    critical_points = np.unique(np.concatenate((y_min / x, y_max / x)))
    midpoints = (critical_points[:-1] + critical_points[1:]) / 2
    # Midpoints come first so that np.argmin prefers them on exact ties.
    candidates = np.concatenate((midpoints, critical_points))

    # Row i holds the fitted values alpha_i * x for candidate slope alpha_i.
    fitted = candidates[:, np.newaxis] * x
    penalties = np.where(
        fitted < y_min,
        y_min - fitted,
        np.where(fitted > y_max, fitted - y_max, 0.0),
    ).sum(axis=1)

    best = int(np.argmin(penalties))
    return float(candidates[best]), float(penalties[best])


# One-shot iterator of (color, marker) pairs; eight markers are available.
color_fmt = zip(wlt._plot.COLORS["rcParams"], "o^sd*hXv")
# barcode -> (color, marker); filled lazily so a barcode keeps the same style in
# every subplot. Rebinds the name used by the spike-and-recovery cell.
barcode_color_fmt = {}
for ax, plex in zip(np.ravel(axs), plexes):
    analyte = plex.split()[1]
    y = get_dilution_df(plex, "median")
    y_max = get_dilution_df(plex, "max")
    y_min = get_dilution_df(plex, "min")
    # xmax / ymax are computed but not used in this cell.
    xmax = y.index.max() * 1.1
    ymax = xmax * 1.2
    for barcode in y.columns:
        if barcode not in barcode_color_fmt:
            barcode_color_fmt[barcode] = next(color_fmt)
        color, marker = barcode_color_fmt[barcode]
        # Plot against the reciprocal dilution factor (inf -> 0).
        x = 1 / y.index
        yerr = [(y - y_min)[barcode], (y_max - y)[barcode]]
        # Fit a line through the origin to the [min, max] intervals, leaving
        # out the last row.
        # NOTE(review): the last row is presumably the infinite-dilution point
        # (x = 0), assuming the index is sorted ascending — confirm.
        alpha, min_penalty = minimize_L1_objective(
            x[:-1], y_min[barcode].iloc[:-1], y_max[barcode].iloc[:-1]
        )
        ax.errorbar(
            x,
            y[barcode],
            yerr=yerr,
            fmt=f"{marker}",
            label=barcode,
            alpha=0.75,
            capsize=3,
            color=color,
        )
        # Draw the fitted line only at x positions where this barcode has data.
        ax.plot(
            x[y[barcode].notna()],
            alpha * x[y[barcode].notna()],
            color=color,
            alpha=0.75,
        )
    ax.legend(framealpha=1)
    ax.set_xticks(**xticks)
    # Keep the automatically chosen upper y limit but pin the lower one to 0.
    ax.set(
        title=plex,
        xlabel="1/dilution factor",
        ylabel=f"{analyte} concentration (pg/mL)",
        xlim=(0, 1),
        ylim=(0, ax.get_ylim()[1]),
    )

plt.tight_layout()
plt.savefig("../results/f_dl_v0.pdf")

## Admixture linearity

In [None]:
# 2x2 grid of axes; the plotting loop pairs the flattened axes with `plexes`.
fig, axs = plt.subplots(ncols=2, nrows=2, figsize=(9, 10))


def get_admixture_df(plex: str, aggfunc: str) -> pd.DataFrame:
    """Build the admixture-linearity table for one plex and one aggregate.

    Rows are the fraction of sample 1 in the mixture, columns are specimen
    barcodes, and values are the ``aggfunc`` concentrations from ``Xs``
    multiplied by ``DILUTION_FACTOR``.
    """
    value_col = (aggfunc, plex)
    admixture_rows = Xs[Xs[("meta", "Purpose")] == "Admixture Linearity"]
    pivoted = admixture_rows.pivot_table(
        index=[("meta", "Sample 1 Fraction")],
        columns=[("meta", "Specimen Barcode")],
        values=[value_col],
    )
    return pivoted[value_col] * DILUTION_FACTOR


# Keyword arguments for set_xticks on the admixture plots (fraction of sample 1).
# Rebinds the `xticks` name used by the dilution-linearity cell.
xticks = {
    "ticks": [0, 0.25, 0.5, 0.75, 1],
    "labels": ["0", "0.25", "0.5", "0.75", "1"],
}


from scipy.optimize import minimize


def objective(params, x, y_min, y_max):
    """Total distance of the line ``alpha * x + beta`` from the data intervals.

    ``params`` is the pair ``(alpha, beta)``. Each point contributes how far the
    fitted value lies above ``y_max`` plus how far it lies below ``y_min``; the
    contributions are summed.
    """
    slope, intercept = params
    fitted = slope * x + intercept
    overshoot = np.maximum(0, fitted - y_max)
    undershoot = np.maximum(0, y_min - fitted)
    return np.sum(overshoot + undershoot)


def minimize_L1_objective_linear(x, y_min, y_max):
    """Fit ``y = alpha * x + beta`` to interval data by minimizing ``objective``.

    Runs ``scipy.optimize.minimize`` with the L-BFGS-B method, starting from
    ``alpha = beta = 0``.

    Returns
    -------
    tuple
        ``(alpha, beta, min_penalty)``: the optimizer's solution and the
        objective value it reports there.
    """
    fit = minimize(
        objective,
        [0.0, 0.0],
        args=(x, y_min, y_max),
        method="L-BFGS-B",
    )
    slope, intercept = fit.x
    return slope, intercept, fit.fun


# One-shot iterator of (color, marker) pairs; eight markers are available.
color_fmt = zip(wlt._plot.COLORS["rcParams"], "o^sd*hXv")
# barcode -> (color, marker); filled lazily so a barcode keeps the same style in
# every subplot. Rebinds the name used by earlier plotting cells.
barcode_color_fmt = {}
for ax, plex in zip(np.ravel(axs), plexes):
    analyte = plex.split()[1]
    y = get_admixture_df(plex, "median")
    y_max = get_admixture_df(plex, "max")
    y_min = get_admixture_df(plex, "min")
    # xmax / ymax are computed but not used in this cell.
    xmax = y.index.max() * 1.1
    ymax = xmax * 1.2
    for barcode in y.columns:
        if barcode not in barcode_color_fmt:
            barcode_color_fmt[barcode] = next(color_fmt)
        color, marker = barcode_color_fmt[barcode]
        yerr = [(y - y_min)[barcode], (y_max - y)[barcode]]
        # Fit a straight line (slope + intercept) to the [min, max] intervals
        # as a function of the sample-1 fraction.
        alpha, beta, min_penalty = minimize_L1_objective_linear(
            y.index, y_min[barcode], y_max[barcode]
        )
        ax.errorbar(
            y.index,
            y[barcode],
            yerr=yerr,
            fmt=f"{marker}",
            label=barcode,
            alpha=0.75,
            capsize=3,
            color=color,
        )
        ax.plot(
            y.index,
            alpha * y.index + beta,
            color=color,
            alpha=0.75,
        )
    ax.legend(framealpha=1)
    ax.set_xticks(**xticks)
    # The xticks / xticklabels passed here replace the labels set just above
    # with two-line "sample A % / sample B %" labels.
    ax.set(
        title=plex,
        xlabel="% sample A and B, respectively",
        ylabel=f"{analyte} concentration (pg/mL)",
        xlim=(-0.01, 1.01),
        ylim=(0, ax.get_ylim()[1]),
        xticks=[0, 0.25, 0.5, 0.75, 1],
        xticklabels=["0\n100", "25\n75", "50\n50", "75\n25", "100\n0"],
    )

plt.tight_layout()
plt.savefig("../results/f_al_v0.pdf")