# Prompt Processing Group Summary for {{ params.date }}

In [None]:
# Times Square parameters
# Default values for a standalone run; presumably replaced by Times Square
# when the page is rendered (confirm against the notebook's parameter schema).
date = "2024-04-01"  # observing date, YYYY-MM-DD
instrument = "LATISS"  # "LATISS" or "LSSTComCamSim" (see the instrument switch below)
survey = "AUXTEL_PHOTO_IMAGING"  # nextVisit survey / exposure science_program to select
mode = "INFO"  # logging level name for the "analysis" logger

In [None]:
# day_obs as an integer, e.g. "2024-04-01" -> 20240401
dayobs = int("".join(date.split("-")))

In [None]:
from astropy.time import Time, TimeDelta
import boto3
import json

%matplotlib inline
import matplotlib.pyplot as plt
import pandas
import re
from lsst_efd_client import EfdClient

In [None]:
import logging

logger = logging.getLogger("analysis")
# `mode` is a level name such as "INFO" or "DEBUG". Accept any letter case and
# reject names that are not logging levels, rather than failing on getattr or
# handing a non-level object to setLevel.
level = logging.getLevelName(mode.upper())
if not isinstance(level, int):
    raise ValueError(f"Unknown logging level {mode!r}")
logger.setLevel(level)

In [None]:
match instrument:
    case "LATISS":
        sal_index = 2
        n_detector = 1
    case "LSSTComCamSim":
        sal_index = 3
        n_detector = 9
    case _:
        logger.error(f"Unknown instrument {instrument}")

In [None]:
async def get_df_from_next_visit_events(date):
    """Return the night's nextVisit events from the EFD, one row per group.

    The query window is the 24 hours starting at 12:00 UTC on ``date``.
    Rows are restricted to ``coordinateSystem == 2`` and to the notebook-level
    ``sal_index`` and ``survey``, then indexed by ``groupId``. Groups that also
    appear in the ``nextVisitCanceled`` topic are dropped.

    Parameters
    ----------
    date : `str`
        Observing date, ``YYYY-MM-DD``.

    Returns
    -------
    df : `pandas.DataFrame` or `None`
        The selected events, or `None` when the EFD returned no nextVisit
        events at all in the window.
    """
    client = EfdClient("usdf_efd")

    start = Time(date, scale="utc", format="isot") + TimeDelta(
        12 * 60 * 60, format="sec"
    )
    end = start + TimeDelta(1, format="jd")

    topic = "lsst.sal.ScriptQueue.logevent_nextVisit"
    df = await client.select_time_series(topic, ["*"], start.utc, end.utc)
    if df.empty:
        logger.info(f"No events on {date}")
        return None

    # Cancellations only matter once there are events; skip the query otherwise.
    all_canceled = await client.select_time_series(
        topic + "Canceled", ["*"], start.utc, end.utc
    )

    # Only select on-sky events for this instrument's ScriptQueue and survey
    df = df.loc[
        (df["coordinateSystem"] == 2)
        & (df["salIndex"] == sal_index)
        & (df["survey"] == survey)
    ].set_index("groupId")
    logger.info(f"There were {len(df)} {survey} nextVisit events on {date}")

    # Ignore the explicitly canceled groups
    if not all_canceled.empty:
        canceled = df.index.intersection(
            all_canceled.set_index("groupId").index
        ).tolist()
        if canceled:
            logger.info(f"{len(canceled)} events were canceled {canceled}")
            df = df.drop(canceled)

    return df

In [None]:
df_efd = await get_df_from_next_visit_events(date)

In [None]:
from lsst.daf.butler import Butler

butler = Butler("/repo/embargo", writeable=False)

In [None]:
results = butler.registry.queryDimensionRecords(
    "exposure",
    where="exposure.science_program IN (survey) "
    "and instrument=instrument_name and exposure.day_obs=dayobs  ",
    bind={"dayobs": dayobs, "instrument_name": instrument, "survey": survey},
)
logger.info(f"Found {results.count()} raw exposure records in {butler} for {dayobs}")

embargo_records = dict()
for record in results:
    embargo_records[record.id] = record

if embargo_records:
    df_butler = pandas.DataFrame.from_records(
        [embargo_records[num].toDict() for num in embargo_records]
    ).set_index("group_name")
else:
    df_butler = None
    logger.warning("No raw records found in /repo/embargo. Notebook will fail.")

In [None]:
groups_no_raw = set(df_efd.index) - set(df_butler.index)
if groups_no_raw:
    logger.info(
        f"{len(groups_no_raw)} group had records in EFD but no raws in the embargo butler: {groups_no_raw}"
    )

In [None]:
df_efd = df_efd.drop(groups_no_raw)

In [None]:
if len(df_efd) != len(df_butler):
    logger.warning("Counts do not match; need attention")

In [None]:
df_md = pandas.merge(
    df_efd.reset_index(),
    df_butler.reset_index(),
    how="outer",
    left_on="groupId",
    right_on="group_name",
    suffixes=("_efd", "_butler"),
    validate="one_to_one",
).set_index("groupId")

logger.info(f"Total: {len(df_md)} groups in the table")

In [None]:
boring_cols = [
    "instrument_efd",
    "instrument_butler",
    "science_program",
    "observation_reason",
    "observation_type",
    "has_simulated",
    "dome",
    "coordinateSystem",
    "rotationSystem",
    "private_identity",
    "private_origin",
    "private_revCode",
    "salIndex",
    "totalCheckpoints",
    "nimages",
    "day_obs",
    "survey",
    "exposure_time",
]
for col in boring_cols:
    if df_md[col].nunique() == 1:
        logger.debug(f"column {col} has only {df_md[col].unique()}")
        df_md.drop(
            columns=[
                col,
            ],
            inplace=True,
        )
    else:
        logger.warning(f"Column {col} has {df_md[col].unique()}")

In [None]:
for col in ("physical_filter",):
    logger.info(f"Column {col} has {df_md[col].unique()}")

In [None]:
logger.debug(f"Columns: {df_md.columns.to_list()}")

In [None]:
df_md["offset_ra"] = df_md["position0"] - df_md["tracking_ra"]
df_md["offset_dec"] = df_md["position1"] - df_md["tracking_dec"]
df_md["offset_ang"] = (df_md["cameraAngle"] - df_md["sky_angle"])%360.0 

In [None]:
if level < 20:
    display(
        df_md[
            [
                "seq_num",
                "position0",
                "tracking_ra",
                "offset_ra",
                "position1",
                "tracking_dec",
                "offset_dec",
                "cameraAngle",
                "sky_angle",
                "offset_ang",
                "physical_filter",
                "id",
            ]
        ]
    )

In [None]:
df_md[["offset_ra", "offset_dec", "offset_ang"]].describe()

In [None]:
if instrument != "LATISS":
    logger.warning(f"The rest of the notebook is not ready for {instrument}")

In [None]:
def get_exposure_id(butler, dayobs: int, instrument_name: str | None = None):
    """Map each science exposure's group name to its exposure id for one day_obs.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler whose registry is queried.
    dayobs : `int`
        Observing day as ``YYYYMMDD``.
    instrument_name : `str`, optional
        Instrument to query. Defaults to the notebook-level ``instrument``
        parameter.

    Returns
    -------
    df : `pandas.DataFrame`
        Indexed by ``groupId`` with one ``expId`` column; only exposures with
        ``observation_type='science'`` are included.
    """
    if instrument_name is None:
        instrument_name = instrument
    results = butler.registry.queryDimensionRecords(
        "exposure",
        where="instrument=instrument_name and exposure.observation_type='science' and"
        " exposure.day_obs=dayobs",
        bind={"dayobs": dayobs, "instrument_name": instrument_name},
    )

    return pandas.DataFrame(
        [(record.group_name, record.id) for record in results],
        columns=["groupId", "expId"],
    ).set_index("groupId")

In [None]:
def get_df_file(df_expId, butler, datasetType, where="", collections=...):
    """
    Return last-modified times of a dataset type's files in the USDF bucket-based repo.

    Parameters
    ----------
    df_expId : `pandas.DataFrame`
        ``groupId``-indexed table with an ``expId`` column; used to attach a
        group to every file.
    butler : `lsst.daf.butler.Butler`
        Butler to query; each dataset's URI is checked with an S3 ``head_object``.
    datasetType : `str`
        Dataset type to look up.
    where : `str`, optional
        Butler query expression.
    collections : optional
        Collections to search; ``...`` is passed straight through to the Butler.

    Returns
    -------
    df : `pandas.DataFrame`
        Indexed by ``groupId`` with ``expId``, ``detector`` and ``file``
        (S3 ``LastModified``) columns, one row per dataset.
    """
    s3client = boto3.client("s3", endpoint_url="https://s3dfrgw.slac.stanford.edu")
    refs = butler.registry.queryDatasets(
        datasetType=datasetType,
        collections=collections,
        where=where,
    )

    rows = []
    id_key = None
    if not refs.count():
        logger.warning(f"No {datasetType} were found. Expect errors.")
    for ref in refs:
        data_id = ref.dataId
        # Visit-keyed datasets report the visit id, others the exposure id;
        # either way it lands in the expId column.
        if not id_key:
            id_key = "visit" if "visit" in data_id else "exposure"
        uri = butler.getURI(ref)
        head = s3client.head_object(Bucket=uri.netloc, Key=uri.relativeToPathRoot)
        rows.append((data_id[id_key], data_id["detector"], head["LastModified"]))

    files = pandas.DataFrame.from_records(data=rows, columns=["expId", "detector", "file"])
    merged = pandas.merge(
        files,
        df_expId.reset_index(),
        left_on="expId",
        right_on="expId",
        how="left",
        validate="many_to_one",
    )
    return merged.set_index("groupId")

In [None]:
# Group name -> exposure id for this day_obs (science exposures only).
df_expId = get_exposure_id(butler, dayobs)

In [None]:
# Last-modified time of every raw file for this survey/instrument/day_obs.
# NOTE(review): survey and instrument are interpolated straight into the query
# string here, whereas the exposure-record query above passes them through
# bind=; a value containing a quote would break this query.
df_raw = get_df_file(
    df_expId,
    butler,
    "raw",
    collections=[f"{instrument}/raw/all"],
    where=f"exposure.science_program IN ('{survey}') and instrument='{instrument}' and exposure.day_obs={dayobs}",
)

In [None]:
# Expect one raw per detector for every group in df_md.
if len(df_raw) != len(df_md) * n_detector:
    logger.warning("Counts of raw files do not match; need attention")

In [None]:
# Attach raw-file timestamps and detector numbers to the per-group metadata;
# each group becomes one row per raw file (one_to_many on the groupId index).
df_md2 = df_md.merge(
    df_raw[["file", "detector"]],
    how="outer",
    left_index=True,
    right_index=True,
    validate="one_to_many",
)

In [None]:
df_md2.rename(columns={"file": "ts_raw"}, inplace=True)

In [None]:
# Flatten the prompt-processing output chain for this date into its member collections.
output_chain = f"{instrument}/prompt/output-{date}"
collections = [_ for _ in butler.registry.queryCollections(output_chain, flattenChains=True)]
logger.info(f"Output chain {output_chain} has {collections}")

In [None]:
def count_files(dataset_type="isr_log"):
    """Log how many ``dataset_type`` files exist in the output collections.

    Uses the notebook-level ``df_expId``, ``butler`` and ``collections``.

    Returns
    -------
    df : `pandas.DataFrame`
        The per-file timestamp table from `get_df_file`.
    """
    df = get_df_file(df_expId, butler, dataset_type, collections=collections)
    stored = int(df["file"].notnull().sum())
    logger.info(f"{stored} {dataset_type} were stored")
    return df

In [None]:
tasks = ["isr", "calibrateImage", "retrieveTemplate", "subtractImages",
         "detectAndMeasure", "diffimTaskCore", "filterDiaSrcCat",
         "transformDiaSrcCat", "diaPipe"]

In [None]:
for dataset_type in [_+"_log" for _ in tasks]:
    count_files(dataset_type)

In [None]:
df_output = count_files("initial_pvi")
count = len(df_output[df_output["file"].notnull()].index)
df_output = count_files("apdb_marker")

In [None]:
if count:
    df_md2 = df_md2.merge(
        df_output,
        how="outer",
        on=["groupId", "detector"],
        validate="one_to_one",
    )
    df_md2.rename(columns={"file": "ts_output"}, inplace=True)

In [None]:
def get_loki_command_with_phrase(time_start, time_end, phrase, extra=f" |~ \"{instrument}\""):
    """Build the logcli shell command that queries prompt-service logs in Loki.

    Parameters
    ----------
    time_start, time_end : `str`
        Query window bounds, passed to ``--from``/``--to``.
    phrase : `str`
        Text for the ``|~ "..."`` line filter. It may itself contain
        ``" |~ "`` to chain a further LogQL filter.
    extra : `str`
        LogQL text appended after the phrase filter; defaults to an
        instrument filter built when this cell runs.

    Returns
    -------
    command : `str`
        A shell command line; the LogQL query is a single shell-quoted word.
    """
    import shlex

    query = (
        '{namespace="vcluster--usdf-prompt-processing", container="user-container",'
        ' pod=~"prompt-proto-service-.+"}'
        f' |~ "{phrase}" {extra} '
    )
    # The command is executed by a shell (IPython `!`). Quote the query as one
    # shell word so a `'` in phrase/extra cannot terminate the quoting early.
    # For queries without `'` this yields exactly '<query>'.
    command = (
        "/sdf/data/rubin/user/hchiang2/times_square/logcli-linux-amd64"
        " --output=jsonl --tls-skip-verify query"
        " --addr=http://sdfloki.slac.stanford.edu:80 --timezone=UTC -q --limit=2000"
        f' --from="{time_start}" --to="{time_end}"'
        " --proxy-url=http://sdfproxy.sdf.slac.stanford.edu:3128"
        f"  {shlex.quote(query)} "
    )
    return command

In [None]:
def get_df_from_loki(date, search_phrase="Waiting for snaps", extra=f" |~ \"{instrument}\""):
    start = Time(date, scale="utc", format="isot") + TimeDelta(
        12 * 60 * 60, format="sec"
    )
    end = start + TimeDelta(1, format="jd")
    command = get_loki_command_with_phrase(
        start.strftime("%Y-%m-%dT%H:%M:%SZ"),
        end.strftime("%Y-%m-%dT%H:%M:%SZ"),
        search_phrase,
        extra,
    )
    results = !{command}
    logger.debug(f"Got {len(results)} Loki records for {search_phrase}")
    if not results:
        return pandas.DataFrame(columns=["group", "detector", "ts"])

    data = [json.loads(_) for _ in results]
    df = pandas.json_normalize(data)
    df = df.merge(
        pandas.json_normalize(df["line"].apply(json.loads)),
        left_index=True,
        right_index=True,
    ).drop(columns=["line"])

    if "group" not in df.columns and "message" in df.columns:
        df["group"] = df["message"].str.extract(r"groupId='([T:.\d-]*)',")
        df["detector"] = df["message"].str.extract(r"detector=(\d*),")
    if "detector" not in df.columns:
        logger.info(f"Missing detector info in Loki querying {search_phrase}")
        df["detector"] = -1
    else:
        df["detector"] = df["detector"].astype({'detector': 'int32'})
    df["ts"] = pandas.to_datetime(df["timestamp"])

    return df[["group", "detector", "ts"]]

In [None]:
# Loki line filters keyed by a short phase label. A value may embed `" |~ "`
# to chain a second LogQL line filter (see unpckMsg and dbConErr).
phrases = dict(
    unpckMsg=f'Unpacked message as " |~ "{survey}',
    prepBtlr="Preparing Butler for visit",
    waitSnap="Waiting for snaps",
    runPipe1="Running pipeline",
    pipeSucc="Pipeline successfully run",
    # Can be logged more than once per exposure when a run is retried.
    outputSa="Pipeline products saved to collection",
    timeout1="Timed out waiting for image after receiving exposures ",
    dbConErr='SSL connection has been closed unexpectedly" |~ "Traceback',
    brokrErr="Failed to get metadata: Local: Broker transport failure",
)

In [None]:
def examine_log(key):
    """Report Loki hits for the phrase ``phrases[key]``, then retire that key.

    Matching (group, detector) pairs are logged at DEBUG level. The key is
    removed from the notebook-level ``phrases`` dict, so later loops over
    ``phrases`` no longer include it.
    """
    phrase = phrases[key]
    found = get_df_from_loki(date, phrase)
    hits = found.loc[found["ts"].notnull(), ["group", "detector"]]
    if not hits.empty:
        logger.debug(f"{len(hits)}   with error {phrase}: {hits} ")
    phrases.pop(key)

In [None]:
for _ in ("timeout1", "dbConErr", "brokrErr"):
    examine_log(_)

In [None]:
df_loki = pandas.DataFrame(columns=["group", "detector"])
for phrase in phrases:
    df2 = get_df_from_loki(date, phrases[phrase]).rename(columns={"ts": "ts_" + phrase})
    df_loki = df_loki.merge(
        df2,
        on=["group", "detector"],
        how="outer",
        validate="one_to_one",
    )
df_loki = df_loki.rename(columns={"group": "groupId"}).set_index(["groupId", "detector"])


In [None]:
if df_loki.empty and not df_md2.empty:
    logger.warning("No Loki query results. Possible data loss or service issue.")

In [None]:
phases = ["raw"] + list(phrases.keys())
groups = dict()
groups["raw"] = set(df_md2.index)
for ph in ["unpckMsg", "prepBtlr", "waitSnap", "runPipe1", "pipeSucc", "outputSa"]:
    groups[ph] = set(df_loki["ts_" + ph].dropna().reset_index("detector").index)

for i in range(len(groups)):
    no_raw = set(groups[phases[i]]) - set(groups["raw"])
    extra = f"{len(no_raw)} no files {no_raw}" if no_raw else ""
    logger.info(f"{len(groups[phases[i]])} groups at {phases[i]}.  {extra} ")

    if i > 0:
        groups_diff = set(groups[phases[i - 1]]) - set(groups[phases[i]])
        if groups_diff:
            logger.info(
                " " * 4
                + f"{len(groups_diff)} {phases[i-1]}-{phases[i]}\n"
                + " ".join(groups_diff)
            )
        if i == 5:
            groups_diff = set(groups[phases[i + 1]]) - set(groups[phases[i]])
            if groups_diff:
                logger.info(
                    " " * 4
                    + f"{len(groups_diff)} {phases[i+1]}-{phases[i]}, likely partial outputs\n"
                    + " ".join(groups_diff)
                )

In [None]:
def get_loki_command_with_phrase_fanout(time_start, time_end, phrase, extra=""):
    """Build the logcli shell command that queries next-visit fan-out logs in Loki.

    Parameters
    ----------
    time_start, time_end : `str`
        Query window bounds, passed to ``--from``/``--to``.
    phrase : `str`
        Text for the ``|~ "..."`` line filter, applied after a ``|~ "group"``
        filter.
    extra : `str`, optional
        LogQL text appended after the phrase filter.

    Returns
    -------
    command : `str`
        A shell command line; the LogQL query is a single shell-quoted word.
    """
    import shlex

    query = (
        '{app="vcluster--usdf-prompt-processing",pod=~"next-visit-fan-out-.+"}'
        f' |~ "group" |~ "{phrase}" {extra} '
    )
    # The command is executed by a shell (IPython `!`). Quote the query as one
    # shell word so a `'` in phrase/extra cannot terminate the quoting early.
    # For queries without `'` this yields exactly '<query>'.
    command = (
        "/sdf/data/rubin/user/hchiang2/times_square/logcli-linux-amd64"
        " --output=jsonl --tls-skip-verify query"
        " --addr=http://sdfloki.slac.stanford.edu:80 --timezone=UTC -q --limit=2000"
        f' --from="{time_start}" --to="{time_end}"'
        " --proxy-url=http://sdfproxy.sdf.slac.stanford.edu:3128"
        f"  {shlex.quote(query)} "
    )
    return command

In [None]:
def get_df_from_loki_fanout(date, search_phrase="status code 502", extra=""):
    start = Time(date, scale="utc", format="isot") + TimeDelta(
        12 * 60 * 60, format="sec"
    )
    end = start + TimeDelta(1, format="jd")
    command = get_loki_command_with_phrase_fanout(
        start.strftime("%Y-%m-%dT%H:%M:%SZ"),
        end.strftime("%Y-%m-%dT%H:%M:%SZ"),
        search_phrase,
        extra,
    )
    results = !{command}
    logger.debug(f"Got {len(results)} Loki records with fanout {search_phrase}")
    fanout = dict()
    pattern = re.compile(
        r".*group(Id': '| id )(?P<group>[^' ]*).*timestamp\":\"(?P<timestamp>\S*)\""
    )
    for result in results:
        m = pattern.match(result)
        if m:
            fanout[m["group"]] = [pandas.to_datetime(m["timestamp"])]

    df = pandas.DataFrame.from_dict(fanout, orient="index", columns=["ts"])
    df = df.rename_axis("groupId")

    return df

In [None]:
# Groups for which the fan-out service logged "status code 502".
groups502 = get_df_from_loki_fanout(date, "status code 502").index.to_list()

In [None]:
# 502s for groups with no raw in the embargo Butler.
# NOTE: this rebinds `groups`, which earlier held the per-phase group sets.
groups = set(groups502) - set(df_butler.index.to_list())
if groups:
    logger.debug(f"These had no image but also got status 502\n" + " ".join(groups))

In [None]:
# 502s for groups that do have raws; logged at INFO even when the set is empty.
groups = set(groups502) & set(df_butler.index.to_list())
logger.info(
    f"{len(groups)} groups likely had their pods shutdown prematurely\n"
    + " ".join(groups)
)

In [None]:
# This plot uses only Loki timestamps, so it includes groups with no data taken.
# Every phase is shown as seconds since the group's ts_unpckMsg line.
df1 = pandas.DataFrame(index=df_loki.index)
ref = "ts_unpckMsg"
for col_name in df_loki.columns:
    # A row whose ref timestamp is missing (NaT) becomes NaN in every df1
    # column, presumably for groups that have other log lines but no
    # "Unpacked message" line.
    df1[col_name] = (df_loki[col_name] - df_loki[ref]).dt.total_seconds()

ax = df1.drop(columns=[ref]).plot(
    kind="hist",
    title=f"{date}; ref={ref}",
    xlabel="seconds",
    ylabel="",
    bins=50,
    # subplots=True, layout=(2,4), figsize=(12,6),
    alpha=0.5,
    rot=45,
)

# First rows of the differences next to the raw Loki timestamps.
pandas.merge(
    df1,
    df_loki,
    how="outer",
    left_index=True,
    right_index=True,
    suffixes=("_diff", "_loki"),
    validate="one_to_one",
).head()

In [None]:
# Exposure start/end from the Butler timespan, as UTC-aware timestamps.
df_md2["ts_begin"] = df_md2["timespan"].apply(
    lambda _: pandas.Timestamp(_.begin.utc.datetime, tz="UTC")
)
df_md2["ts_end"] = df_md2["timespan"].apply(
    lambda _: pandas.Timestamp(_.end.utc.datetime, tz="UTC")
)

# Group name parsed as a TAI time and converted to UTC;
# typically a small fraction of seconds before sndStamp
df_md2["ts_group_utc"] = df_md2["group_name"].apply(
    lambda _: pandas.Timestamp(Time(_, scale="tai").utc.datetime, tz="UTC")
)

# time of visit publication; TAI in unix seconds
df_md2["ts_sndStamp"] = df_md2["private_sndStamp"].apply(
    lambda _: pandas.Timestamp(Time(_, format="unix_tai").utc.datetime, tz="UTC")
)
# time of visit publication; UTC in unix seconds
df_md2["ts_efdStamp"] = df_md2["private_efdStamp"].apply(
    lambda _: pandas.Timestamp.fromtimestamp(_, tz="UTC")
)
# Receive and Kafka stamps, also read as UTC unix seconds; not used by the plots below.
df_md2["ts_rcvStamp"] = df_md2["private_rcvStamp"].apply(
    lambda _: pandas.Timestamp.fromtimestamp(_, tz="UTC")
)
df_md2["ts_kafkaStamp"] = df_md2["private_kafkaStamp"].apply(
    lambda _: pandas.Timestamp.fromtimestamp(_, tz="UTC")
)

In [None]:
# When fan-out handles a group
df = get_df_from_loki_fanout(date, "message deserialized", f'|~ "{survey}"')
df.rename(columns={"ts": "ts_fanout"}, inplace=True)
df_loki = df_loki.merge(
    df,
    on="groupId",
    how="outer",
    validate="one_to_one",
)

In [None]:
columns = ["ts_sndStamp", "ts_begin", "ts_end", "ts_raw", "ts_output"]
df0 = pandas.merge(
    df_md2[columns],
    df_loki,
    # only those with data in butler, not those groups with events but no data taken.
    how="left",
    left_index=True,
    right_index=True,
    validate="one_to_one",
)

In [None]:
# Seconds since visit publication (sndStamp) for every timestamp column in df0.
df2 = pandas.DataFrame(index=df0.index)
ref = "ts_sndStamp"
for col_name in df0.columns:
    df2[col_name] = (df0[col_name] - df0[ref]).dt.total_seconds()

df2 = df2[["ts_sndStamp", "ts_fanout", "ts_unpckMsg"]]
# ax.legend(labels=...) pairs labels with the histograms in column order, so
# put the columns in median order. The sorted-median legend then names the
# right colors.
medians = df2.median().sort_values()
df2 = df2[medians.index]
ax = df2.plot(
    kind="hist",
    title=f"{date}; ref={ref}",
    xlabel="seconds",
    ylabel="",
    bins=150,
    alpha=0.5,
    rot=45,
)

ax.legend(
    labels=medians.to_string().split("\n"),
    loc="upper right",
    title="median",
)

In [None]:
# Timeline relative to visit publication (sndStamp): activator, exposure,
# pipeline, raw-arrival and output timestamps, in seconds.
df2 = pandas.DataFrame(index=df0.index)
ref = "ts_sndStamp"
for col_name in df0.columns:
    df2[col_name] = (df0[col_name] - df0[ref]).dt.total_seconds()

df2[
    [
        "ts_sndStamp",
        "ts_waitSnap",
        "ts_begin",
        "ts_end",
        "ts_runPipe1",
        "ts_outputSa",
        "ts_raw",
        "ts_output",
    ]
].plot(
    kind="hist",
    title=f"{date}; ref={ref}",
    xlabel="seconds",
    ylabel="",
    bins=150,
    alpha=0.5,
    rot=45,
)

df2.describe()

In [None]:
# Same timeline, relative to exposure end (ts_end).
df2 = pandas.DataFrame(index=df0.index)
ref = "ts_end"
for col_name in df0.columns:
    df2[col_name] = (df0[col_name] - df0[ref]).dt.total_seconds()

ax = df2[
    [
        "ts_sndStamp",
        "ts_waitSnap",
        "ts_begin",
        "ts_end",
        "ts_raw",
        "ts_runPipe1",
        "ts_output",
    ]
].plot(
    kind="hist",
    title=f"{date}; ref={ref}",
    xlabel="seconds since exposure end",
    ylabel="counts",
    bins=150,
    alpha=0.5,
    rot=45,
)

ax.minorticks_on()
# Human-readable labels; must stay in the same order as the columns selected above.
ax.legend(
    [
        "visit event publication",
        "activator ready",
        "exposure start",
        "exposure end",
        "raw file arrival at USDF",
        "pipeline processing start",
        "output written",
    ]
)
# Dashed line and marker at 120 s after exposure end.
ax.axvline(x=120, color="black", linestyle="--", linewidth=1)
ax.plot(120, 1.0, "v", color="k")

df2.describe()

In [None]:
# Horizontal box plot of all columns of the previous cell's df2 (ref=ts_end),
# ordered by descending median; the legend is used as a text table of the
# medians (handlelength=0 hides the handles).
ax = df2.plot.box(
    title=f"{date}; ref={ref}",
    ylabel="seconds",
    figsize=(10, 6),
    column=df2.median().sort_values(ascending=False).index.tolist(),
    vert=False,
    widths=0.3,
)
ax.legend(
    labels=df2.median().sort_values().to_string().split("\n"),
    loc="lower left",
    title="median",
    handlelength=0,
)
ax.minorticks_on()
ax.secondary_xaxis("top").minorticks_on()