# Local quadratic acq model and plot acq stats for 2022-12 PEA test set

This generates plots of the 2022-Dec ASVT data set with MAXMAG clipped with flight data
2019-July-01 (approx start time of MAXMAG clipping in flight products).

Reference page:
https://occweb.cfa.harvard.edu/twiki/bin/view/Aspect/PeaAcqModelCalDec2022Testing

In [None]:
import numpy as np
from pathlib import Path
import os
from collections import Counter
import itertools
import warnings

from scipy import stats
from scipy.stats import binom
from scipy import optimize
from astropy.table import Table, vstack
import matplotlib.pyplot as plt
from matplotlib import patches
import tables
from cxotime import CxoTime
import agasc
from chandra_aca.star_probs import binom_ppf
import matplotlib.style
matplotlib.style.use('bmh')

from utils_stats import flatten_pea_test_data, read_twiki_csv, get_acq_stats_data

%matplotlib inline

In [None]:
SKA = Path(os.environ["SKA"])


In [None]:
topic = "PeaAcqModelCalDec2022Testing/"
name = "pea_analysis_2022_336_AcqProbModel_calibration_test_results.csv"
dat8 = read_twiki_csv(topic + name)


In [None]:
asvt = flatten_pea_test_data(dat8)
# Rename to conventions of acq stats database
asvt.rename_column("star_mag", "mag_aca")
asvt.rename_column("ccd_temp", "t_ccd")
asvt.rename_column("search_success", "obc_id")
asvt.rename_column("search_box_hw", "halfwidth")
# Coerce uint8 columns (which are all actually bool) to bool
asvt["obc_id"] = asvt["obc_id"].astype(bool)
asvt.info


In [None]:
Counter(asvt["t_ccd"])


In [None]:
# plt.hist(datf['star_mag'], bins=20)
Counter(asvt["mag_aca"])


In [None]:
flt = get_acq_stats_data()
ok = ~np.isclose(flt["color"], 1.5)
flt = flt[ok]


In [None]:
flt.info

In [None]:
flt_asvt = vstack([flt, asvt], join_type="inner")


In [None]:
def get_vals_and_bins(vals):
    out_vals = np.array(sorted(set(vals)))
    out_val_centers = (out_vals[1:] + out_vals[:-1]) / 2
    out_val_bins = np.concatenate(
        [
            [out_vals[0] - 0.5],
            out_val_centers,
            [out_vals[-1] + 0.5],
        ]
    )
    return out_vals, out_val_bins


In [None]:
t_ccd_vals, t_ccd_bins = get_vals_and_bins(asvt["t_ccd"])
mag_vals, mag_bins = get_vals_and_bins(asvt["mag_aca"])
halfwidth_vals, halfwidth_bins = get_vals_and_bins(asvt["halfwidth"])


In [None]:
print(mag_vals)
print(t_ccd_vals)
print(halfwidth_vals)

In [None]:
# Aggregate binned number of samples and successes for ASVT data


def get_samples_successes(dat, mag_bins, t_ccd_bins, halfwidth_bins):
    zeros = np.zeros(
        shape=(len(mag_bins) - 1, len(t_ccd_bins) - 1, len(halfwidth_bins) - 1),
        dtype=int,
    )
    n_samp = zeros.copy()
    n_succ = zeros.copy()

    # Bin halfwidths (narrow since ASVT data are all at the same mag, T_ccd)
    for ii, mag0, mag1 in zip(itertools.count(), mag_bins[:-1], mag_bins[1:]):
        ok0 = (dat["mag_aca"] >= mag0) & (dat["mag_aca"] < mag1)
        for jj, t_ccd0, t_ccd1 in zip(
            itertools.count(), t_ccd_bins[:-1], t_ccd_bins[1:]
        ):
            ok1 = (dat["t_ccd"] >= t_ccd0) & (dat["t_ccd"] < t_ccd1)
            for kk, halfwidth0, halfwidth1 in zip(
                itertools.count(), halfwidth_bins[:-1], halfwidth_bins[1:]
            ):
                ok2 = (dat["halfwidth"] >= halfwidth0) & (dat["halfwidth"] < halfwidth1)
                ok = ok0 & ok1 & ok2
                n_samp[ii, jj, kk] = np.count_nonzero(ok)
                n_succ[ii, jj, kk] = np.count_nonzero(dat["obc_id"][ok])

    return n_samp, n_succ

In [None]:
n_samp_flt, n_succ_flt = get_samples_successes(
    flt, mag_bins, t_ccd_bins, halfwidth_bins
)


In [None]:
n_samp_asvt, n_succ_asvt = get_samples_successes(
    asvt, mag_bins, t_ccd_bins, halfwidth_bins
)


In [None]:
n_samp = n_samp_flt + n_samp_asvt
n_succ = n_succ_flt + n_succ_asvt
p_succ = n_succ / n_samp
p_fail = 1 - p_succ

In [None]:
def as_table(arr, fmt=None):
    """Turn one of the summary arrays into a readable table"""
    t = Table()
    t["mag"] = [str(val) for val in mag_vals]
    names = [f"{t_ccd:.1f}" for t_ccd in t_ccd_vals]
    for jj, name in enumerate(names):
        t[name] = arr[:, jj]
        if fmt:
            t[name].info.format = fmt
    return t


In [None]:
as_table(n_samp[:, :, -1])

In [None]:
def get_successes_slice(
    dat,
    mag0=None,
    mag1=None,
    t_ccd0=None,
    t_ccd1=None,
    halfwidth0=None,
    halfwidth1=None,
):
    ok = np.ones_like(dat["mag_aca"], dtype=bool)
    if mag0 is not None:
        ok &= dat["mag_aca"] >= mag0
    if mag1 is not None:
        ok &= dat["mag_aca"] < mag1
    if t_ccd0 is not None:
        ok &= dat["t_ccd"] >= t_ccd0
    if t_ccd1 is not None:
        ok &= dat["t_ccd"] < t_ccd1
    if halfwidth0 is not None:
        ok &= dat["halfwidth"] >= halfwidth0
    if halfwidth1 is not None:
        ok &= dat["halfwidth"] < halfwidth1
    return dat[ok]


In [None]:
def calc_p_succ(pars, xs, x0s, order, as_probit=False):
    """
    Binomial probability model

    :param pars: p0, p1, p2 (quadratic in t_ccd) and floor (min p_fail)
    :param t_ccd: t_ccd (degC) or scaled t_ccd if rescale is False.
    :param tc2: (scaled t_ccd) ** 2, this is just for faster fitting
    :param box_delta: delta p_fail for search box size
    :param rescale: rescale t_ccd to about -1 to 1 (makes P0, P1, P2 better-behaved)
    :param probit: return probability as probit instead of 0 to 1.
    """
    xs = np.asarray(xs)
    x0s = np.asarray(x0s)

    p_succ_probit = pars[-1] * np.ones(shape=xs.shape[1:])

    for ii in range(len(x0s)):
        dx = (xs[ii, ...] - x0s[ii])
        if order == 1:
            p_succ_probit[...] += pars[ii] * dx
        elif order == 2:
            p_succ_probit[...] += pars[2 * ii] * dx + pars[2 * ii + 1] * dx ** 2
        else:
            raise ValueError(f"order={order} not supported")

    # Possibly transform from probit to linear probability
    out = p_succ_probit if as_probit else stats.norm.cdf(p_succ_probit)

    return out


In [None]:
def calc_binom_stat(succ, p_succ):
    """
    Calculate log-likelihood for a binomial probability distribution.

    Defining p = model, then probability of seeing data == 1 is p and
    probability of seeing data == 0 is (1 - p).  Note here that ``data``
    is strictly either 0.0 or 1.0, and np.where interprets those float
    values as False or True respectively.

    Parameters
    ----------
    succ : array-like
        Array of successes (True or 1) or failures (False or 0)
    p_succ : array-like (same shape as ``succ``)
        Array of probabilities of success
    """
    bad = (p_succ < 0) | (p_succ > 1)
    if np.any(bad):
        raise ValueError(f"p_succ must be in the range 0 to 1 (got {p_succ[bad]})")
    p_succ = p_succ.clip(1e-8, 1 - 1e-8)
    fit_stat = -np.sum(np.log(np.where(succ, p_succ, 1.0 - p_succ)))
    return fit_stat


In [None]:
def calc_fit_stat(pars, xs, x0s, succ, order):
    p_succ = calc_p_succ(pars, xs, x0s, order)
    fit_stat = calc_binom_stat(succ, p_succ)
    return fit_stat

In [None]:
xs = [np.random.uniform(0, 3, size=1000), np.random.uniform(0, 3, size=1000)]
x0s = [1.5, 1.0]
pars = [1, 0.5, 0.25]


In [None]:
p_succ = calc_p_succ(pars, xs, x0s, order=1)

In [None]:
succ = np.random.uniform(size=p_succ.shape) < p_succ

In [None]:
calc_fit_stat(pars, xs, x0s, succ, order=1)

In [None]:
calc_fit_stat([1, 1.0], xs, x0s, succ, order=1)

In [None]:
%time optimize.minimize(calc_fit_stat, [0.0, 0.0, 0.0], args=(xs, x0s, succ, 1))

In [None]:
dok = get_successes_slice(
    flt_asvt,
    mag0=10.125,
    mag1=10.375,
    t_ccd0=-10,
    t_ccd1=-5,
    halfwidth0=55,
    halfwidth1=65,
)

In [None]:
np.mean(dok["obc_id"])


In [None]:
len(dok)

In [None]:
%%time
xs = [dok['mag_aca'], dok['t_ccd']]
x0s = [8.0, -14.0]
res = optimize.minimize(calc_fit_stat, [0.0, 0.0, 0.0], args=(xs, x0s, dok['obc_id'], 1))
res

In [None]:
from itertools import count

In [None]:
def iterbins(bin_edges, width):
    """
    Centers:     0   1   2   3   4   5   6   7   8   9
    Edges:     0   1   2   3   4   5   6   7   8   9   10
    width=1 answers:
       (0, 2), (0, 3), (1, 4) ...
    """
    i_min = 0
    i_max = len(bin_edges) - 1
    for i_center in range(len(bin_edges) - 1):
        i_edge0 = np.clip(i_center - width, i_min, i_max)
        i_edge1 = np.clip(i_center + width + 1, i_min, i_max)
        yield i_center, bin_edges[i_edge0], bin_edges[i_edge1]

In [None]:
for i_center, mag0, mag1 in iterbins(mag_bins, 2):
    print(i_center, mag0, mag1)

In [None]:
n_succ.size

In [None]:
print(mag_vals)
print(mag_bins)

In [None]:
p_fit = np.zeros(shape=n_succ.shape, dtype=float)
p_succ = np.zeros_like(p_fit)

for i_mag, mag0, mag1 in iterbins(mag_bins, width=1):
    mag = mag_vals[i_mag]
    print(f"{mag=} {i_mag=} ({mag0} to {mag1})")

    for i_t_ccd, t_ccd0, t_ccd1 in iterbins(t_ccd_bins, width=1):
        t_ccd = t_ccd_vals[i_t_ccd]
        print(f"{t_ccd=} {i_t_ccd=} ({t_ccd0} to {t_ccd1})")

        for i_halfwidth, halfwidth0, halfwidth1 in iterbins(halfwidth_bins, width=1):
            halfwidth = halfwidth_vals[i_halfwidth]
            # print(f"{halfwidth=} {i_halfwidth=} ({halfwidth0} to {halfwidth1})")
            dok = get_successes_slice(
                flt_asvt,
                mag0=mag0,
                mag1=mag1,
                t_ccd0=t_ccd0,
                t_ccd1=t_ccd1,
                halfwidth0=halfwidth0,
                halfwidth1=halfwidth1,
            )
            xs = [dok["mag_aca"]]  # , dok["t_ccd"], dok["halfwidth"]]
            x0s = [mag]  # , t_ccd, halfwidth]
            res = optimize.minimize(
                calc_fit_stat, [0.0, 0.0], args=(xs, x0s, dok["obc_id"], 1)
            )
            p_succ0 = np.count_nonzero(dok["obc_id"]) / len(dok)
            p_fit0 = stats.norm.cdf(res.x[-1])
            p_fit[i_mag, i_t_ccd, i_halfwidth] = p_fit0
            p_succ[i_mag, i_t_ccd, i_halfwidth] = p_succ0
            # print(f"{mag=} {t_ccd=} {halfwidth=} {p_fit0=:.2f} {p_succ=:.2f}")

In [None]:
dok


In [None]:
p_fit[-1].round(3)

In [None]:
p_succ[-1].round(3)


In [None]:
n_samp[-1]


In [None]:
n_succ[-1]


In [None]:
AXIS_LABELS = ["mag", "t_ccd", "halfwidth"]
X_VALS = [mag_vals, t_ccd_vals, halfwidth_vals]


def make_plots_grid(x_axis="t_ccd", x_grid="halfwidth", y_grid="mag"):
    idx_x_axis = AXIS_LABELS.index(x_axis)
    idx_x_grid = AXIS_LABELS.index(x_grid)
    idx_y_grid = AXIS_LABELS.index(y_grid)

    n_x_axis = n_samp.shape[idx_x_axis]
    n_x_grid = n_samp.shape[idx_x_grid]
    n_y_grid = n_samp.shape[idx_y_grid]

    size_per_plot = 2.0
    figsize = (size_per_plot * n_x_grid, size_per_plot * n_y_grid)
    fig, axes = plt.subplots(
        nrows=n_y_grid, ncols=n_x_grid, figsize=figsize, sharex=True, sharey=True
    )
    idxs_point = [None, None, None]

    for i_row in range(n_y_grid):
        idxs_point[idx_y_grid] = i_row

        for i_col in range(n_x_grid):
            idxs_point[idx_x_grid] = i_col
            x = X_VALS[idx_x_axis]
            y = np.empty(shape=(n_x_axis,))
            y_fit = np.empty(shape=(n_x_axis,))
            yerr = np.empty(shape=(2, n_x_axis))

            for i_x in range(n_x_axis):
                idxs_point[idx_x_axis] = i_x
                ijk = tuple(idxs_point)
                k = n_succ[ijk]
                n = n_samp[ijk]
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    sig_low, sig_high = binom_ppf(k, n, [0.16, 0.84])
                p = k / n
                y[i_x] = p
                y_fit[i_x] = p_fit[ijk]
                yerr[0, i_x] = p - sig_low
                yerr[1, i_x] = sig_high - p

            ax = axes[i_row, i_col]
            ax.errorbar(x, y, yerr=yerr, fmt="o-", color="C0")
            ax.plot(x, y_fit, "-", color="C1")
            ax.set_ylim(0, 1)
            ax.text(
                0.05,
                0.05,
                f"{x_grid}={X_VALS[idx_x_grid][i_col]}",
                horizontalalignment="left",
                verticalalignment="bottom",
                transform=ax.transAxes,
                fontsize="small",
            )
            ax.text(
                0.05,
                0.15,
                f"{y_grid}={X_VALS[idx_y_grid][i_row]}",
                horizontalalignment="left",
                verticalalignment="bottom",
                transform=ax.transAxes,
                fontsize="small",
            )
            # ax.set_title(f"mag={mag_vals[i_mag]:.1f}")

    fig.subplots_adjust(hspace=0, wspace=0)
    fig.tight_layout()


In [None]:
make_plots_grid(x_axis="t_ccd", x_grid="halfwidth", y_grid="mag")

In [None]:
make_plots_grid(x_axis="halfwidth", x_grid="mag", y_grid="t_ccd")

In [None]:
make_plots_grid(x_axis="mag", x_grid="halfwidth", y_grid="t_ccd")