# Alert Production Performance Report for {{ params.instrument }} on {{ params.date }}

In [26]:
# Parameters.  Set defaults here.
# The report title refers to these values as params.date and params.instrument.

# Night to report on, as a "YYYY-MM-DD" string (converted to an integer day_obs below).
date = "2024-09-04"
#date = "2024-09-24"

# Instrument name; the cells below recognize "LATISS" and "LSSTComCamSim".
instrument = "LATISS"

In [27]:
# Integer YYYYMMDD form of `date`; bound to `dayobs` in the raw exposure query below.
day_obs = int(date.replace("-", ""))

In [28]:
match instrument:
    case "LATISS":
        sal_index = 2
        n_detector = 1
        survey = "BLOCK-306" 
    case "LSSTComCamSim":
        sal_index = 3
        n_detector = 9
        survey = "BLOCK-297" 
    case _:
        logger.error(f"Unknown instrument {instrument}")

1.3.15 Level 1 Performance Report Definition

ID: DMS-REQ-0099 (Priority: 1b)

Specification: The DMS shall produce a Level 1 Performance Report that provides indicators 
of how the DMS has performed in processing the night's observations, including at least:
number of observations successfully processed through each pipeline; number of
observations for each pipeline that had recoverable failures (with a record of the failure type
and recovery mechanism); number of observations for each pipeline that had unrecoverable
failures; number of observations archived at each DMS Facility; number of observations
satisfying the science criteria for each active science program.

In [29]:
from dataclasses import dataclass
import pandas
import tabulate
from astropy.time import Time, TimeDelta
from lsst_efd_client import EfdClient
from lsst.daf.butler import Butler
from lsst.pipe.base import Pipeline

In [30]:
async def get_df_from_next_visit_events(date):
    """Fetch one night's nextVisit events for the selected survey from the EFD.

    Relies on the notebook-level ``sal_index`` and ``survey`` values to
    filter the events.

    Parameters
    ----------
    date : `str`
        Date string parsed by astropy with ``format="isot"``.  The query
        window starts 12 hours after that time (UTC) and lasts one day.

    Returns
    -------
    df : `pandas.DataFrame`
        The matching events indexed by ``groupId``, excluding any group that
        also appears in the ``nextVisitCanceled`` topic.  A plain empty
        DataFrame is returned when the topic has no events in the window.
    """
    import logging

    log = logging.getLogger(__name__)
    client = EfdClient("usdf_efd")

    start = Time(date, scale="utc", format="isot") + TimeDelta(
        12 * 60 * 60, format="sec"
    )
    end = start + TimeDelta(1, format="jd")

    topic = "lsst.sal.ScriptQueue.logevent_nextVisit"
    df = await client.select_time_series(topic, ["*"], start.utc, end.utc)

    if df.empty:
        log.info(f"No events on {date}")
        return pandas.DataFrame()

    # Only fetch cancellations once we know there are events to cancel.
    all_canceled = await client.select_time_series(
        topic + "Canceled", ["*"], start.utc, end.utc
    )

    # Only select imaging survey
    df = df.loc[
        (df["coordinateSystem"] == 2)
        & (df["salIndex"] == sal_index)
        & (df["survey"] == survey)
    ].set_index("groupId")
    print(f"There were {len(df)} {survey} nextVisit events on {date}")

    # Ignore the explicitly canceled groups
    if not all_canceled.empty:
        canceled = df.index.intersection(
            all_canceled.set_index("groupId").index
        ).tolist()
        if canceled:
            print(f"{len(canceled)} events were canceled {canceled}")
            df = df.drop(canceled)

    return df

In [31]:
# nextVisit events for the selected night and survey, indexed by groupId.
# Uses top-level await, so this cell needs a kernel that supports it.
df_efd = await get_df_from_next_visit_events(date)

There were 32 BLOCK-306 nextVisit events on 2024-09-24


In [32]:
# LATISS is read through the "embargo_old" repository alias; every other
# instrument goes through "embargo_new".
butler_alias = "embargo_old" if instrument == "LATISS" else "embargo_new"

# Read-only butler that searches the instrument's raw collection first and
# then the prompt-processing output collection for the selected date.
butler = Butler(
    butler_alias,
    collections=[
        f"{instrument}/raw/all",
        f"{instrument}/prompt/output-{date:s}",
    ],
    writeable=False,
)

In [33]:
import logging

# Raw exposure dimension records for the selected survey, instrument and night.
results = butler.registry.queryDimensionRecords(
    "exposure",
    where="exposure.science_program IN (survey) "
    "and instrument=instrument_name and day_obs=dayobs  ",
    bind={"dayobs": day_obs, "instrument_name": instrument, "survey": survey},
)
print(f"{results.count()} raw exposure records are in {butler} for {day_obs}")

# Key the records by exposure ID.
embargo_records = {record.id: record for record in results}

if embargo_records:
    # One row per exposure, indexed by group so it can be compared against
    # the groupId index of the EFD events.
    df_butler = pandas.DataFrame.from_records(
        [record.toDict() for record in embargo_records.values()]
    ).set_index("group")
else:
    # Use an empty frame rather than None so the set operations on
    # df_butler.index in the following cells still work and report every
    # EFD group as missing its raw.
    df_butler = pandas.DataFrame(index=pandas.Index([], name="group"))
    logging.getLogger(__name__).warning(f"No raw records found in {butler}.")

32 raw exposure records are in Butler(collections=<lsst.daf.butler.direct_butler._direct_butler_collections.DirectButlerCollections object at 0x7fd4f7e1b710>, run=None, datastore='s3://rubin-summit-users/', registry='PostgreSQL@lsstdb1:embargo_old') for 20240924


In [34]:
# Groups announced in the EFD for which the butler has no raw exposure record.
groups_no_raw = set(df_efd.index).difference(df_butler.index)
if len(groups_no_raw) > 0:
    print(
        f"{len(groups_no_raw)} group had records in EFD but no raws in the embargo butler: {groups_no_raw}"
    )

In [35]:
# Groups present both in the EFD events and in the butler's raw exposure records.
groups = set(df_efd.index).intersection(df_butler.index)

In [36]:
# Report how many nextVisit groups also have a raw exposure record in the butler.
print(f"Number of groups with raw from {survey} nextVisit: {len(groups)}")

Number of groups with raw from BLOCK-306 nextVisit: 32


In [37]:
def _count_isr_logs(pipeline_glob):
    """Count the survey's ``isr_log`` datasets in matching prompt output runs.

    Parameters
    ----------
    pipeline_glob : `str`
        Glob for the last component of the collection name under
        ``{instrument}/prompt/output-{date}/``.

    Returns
    -------
    count : `int`
        Number of datasets found across all matching collections
        (``find_first=False``); 0 when nothing matches.
    """
    return len(
        butler.query_datasets(
            "isr_log",
            # Follow the selected instrument rather than assuming LATISS.
            collections=f"{instrument}/prompt/output-{date}/{pipeline_glob}",
            where="exposure.science_program IN (survey)",
            bind={"survey": survey},
            find_first=False,
            explain=False,
        )
    )


# ISR logs found in the SingleFrame* runs and in the ApPipe* runs.
sfm_counts = _count_isr_logs("SingleFrame*")
dia_counts = _count_isr_logs("ApPipe*")

In [38]:
# Summarize how many ISR logs were found in SingleFrame* runs versus ApPipe* runs.
print("Number of PP pipeline runs: {:d} single-frame-only, {:d} ApPipe".format(sfm_counts, dia_counts))

Number of PP pipeline runs: 32 single-frame-only, 0 ApPipe


In [39]:
# The single-frame-only count is reported as the number that skipped the full ApPipe.
print(f"{sfm_counts} did not run the full ApPipe, likely due to no template coverage.")

32 did not run the full ApPipe, likely due to no template coverage.


In [41]:
# (exposure, detector) pairs that have an ISR log for this survey.
log_visit_detector = {
    (ref.dataId["exposure"], ref.dataId["detector"])
    for ref in butler.query_datasets(
        "isr_log",
        explain=False,
        where=f"exposure.science_program='{survey}'",
    )
}
print("Number of ISR records in butler: {:d}".format(len(log_visit_detector)))

# Dataset type whose presence is counted as a successful processCcd.
last_pvi_type = "initial_photometry_match_detector"
pvi_visit_detector = {
    (ref.dataId["visit"], ref.dataId["detector"])
    for ref in butler.query_datasets(
        last_pvi_type,
        explain=False,
        where=f"visit.science_program='{survey}'",
    )
}
print("Number of successful processCcd: {:d}".format(len(pvi_visit_detector)))

# (visit, detector) pairs with an apdb_marker dataset, counted as successful DIA.
dia_visit_detector = {
    (ref.dataId["visit"], ref.dataId["detector"])
    for ref in butler.query_datasets(
        "apdb_marker",
        explain=False,
        where=f"visit.science_program='{survey}'",
    )
}
print("Number of successful DIA: {:d}".format(len(dia_visit_detector)))

Number of ISR records in butler: 32
Number of successful processCcd: 29
Number of successful DIA: 0


In [42]:
@dataclass
class error_summary:
    """Log messages above WARNING level collected for one (visit, detector) pair."""

    # ID used as both the exposure and the visit when the logs are fetched.
    visit: int
    # Detector ID the logs belong to.
    detector: int
    # Log records with levelno > 30, in the order they were collected.
    error_messages: list

In [43]:
def make_error_summaries(log_dataset_types_exposure, log_dataset_types_visit, data_ids):
    """Collect the above-WARNING log messages for each (visit, detector) pair.

    Uses the notebook-level ``butler`` and ``instrument``.

    Parameters
    ----------
    log_dataset_types_exposure : iterable of `str`
        Log dataset types looked up with an ``exposure`` data ID.
    log_dataset_types_visit : iterable of `str`
        Log dataset types looked up with a ``visit`` data ID.
    data_ids : iterable of `tuple`
        ``(visit, detector)`` pairs; the same ID is used as the exposure and
        as the visit.

    Returns
    -------
    error_summaries : `list` of `error_summary`
        One entry per input pair, in iteration order.  ``error_messages`` is
        empty when no log message is above WARNING level.
    """
    import logging

    # The notebook's import cell does not bring this name into scope.
    from lsst.daf.butler import DimensionNameError

    def _errors_only(log_messages):
        # Keep records strictly above WARNING (ERROR and higher).
        return [msg for msg in log_messages if msg.levelno > logging.WARNING]

    error_summaries = []
    for visit, detector in data_ids:
        visit_errors = []

        for ds_type in log_dataset_types_exposure:
            log_messages = butler.get(
                ds_type,
                dataId={"instrument": instrument, "exposure": visit, "detector": detector},
            )
            visit_errors.extend(_errors_only(log_messages))

        for ds_type in log_dataset_types_visit:
            try:
                log_messages = butler.get(
                    ds_type,
                    dataId={"instrument": instrument, "visit": visit, "detector": detector},
                )
            except DimensionNameError:
                # Visit records can be missing due to corrupted headers.
                continue
            # Done outside any `finally` so that an unexpected exception
            # propagates cleanly instead of re-adding a previous iteration's
            # messages on the way out.
            visit_errors.extend(_errors_only(log_messages))

        error_summaries.append(
            error_summary(visit=visit, detector=detector, error_messages=visit_errors)
        )
    return error_summaries

In [44]:
# Load the instrument's ApPipe pipeline definition; the "#prompt" fragment
# presumably selects a labeled subset of it — TODO confirm.
pipeline = Pipeline.from_uri(f"$AP_PIPE_DIR/pipelines/{instrument}/ApPipe.yaml#prompt")
# Override diaPipe's apdb_config_url with a placeholder value.
# NOTE(review): presumably this lets the graph be built without a real APDB config — confirm.
pipeline.addConfigOverride("diaPipe", "apdb_config_url", "dummy")

# Build the pipeline graph against the butler registry; its task names drive
# the per-task failure scan below.
pipeline_graph = pipeline.to_graph(registry=butler.registry)

In [51]:
# Walk the pipeline's tasks in graph order.  For each task, the
# (visit, detector) pairs that had a log for the previous task but have none
# for this one are attributed to the previous task, and the first
# above-WARNING message found for each pair becomes a table row.
table_contents = []
last_task = "isr"
last_records = log_visit_detector
for task in pipeline_graph.tasks:
    # ISR is the starting point; its records are already in log_visit_detector.
    if task == "isr":
        continue
    records = {
        (ref.dataId["visit"], ref.dataId["detector"])
        for ref in butler.query_datasets(
            task + "_log",
            explain=False,
            where=f"visit.science_program='{survey}'",
        )
    }
    error_summaries = make_error_summaries(
        ["isr_log"],
        [last_task + "_log"],
        set(last_records).difference(records),
    )
    # Messages are cut to their first 1000 characters for the table.
    table_contents.extend(
        (summary.visit, summary.detector, last_task, summary.error_messages[0].message[:1000])
        for summary in error_summaries
        if summary.error_messages
    )
    last_task, last_records = task, records

In [54]:
# One table row was recorded per (visit, detector) that stopped progressing and logged an error.
print(f"Number of unrecoverable pipeline failures: {len(table_contents)}")

Number of unrecoverable pipeline failures: 3


In [52]:
# Render the failure rows, sorted by (visit, detector, ...), as the cell's output.
# NOTE(review): 'unsafehtml' presumably leaves the log message text unescaped
# in the generated HTML — confirm that is intended (tabulate also offers 'html').
table = tabulate.tabulate(sorted(table_contents), tablefmt='unsafehtml', headers=("Visit", "Det", "Last Task", "Error Message"))
table

Visit,Det,Last Task,Error Message
2024092400389,0,calibrateImage,Failure fitting astrometry. MatcherFailure: No matches found: {'iterations': 1}
2024092400393,0,calibrateImage,Failure fitting astrometry. MatcherFailure: Not enough catalog objects (3) to make a shape for the matcher (need 5).: {'iterations': 1}
2024092400395,0,calibrateImage,"Task 'calibrateImage' on quantum {instrument: 'LATISS', detector: 0, visit: 2024092400395, band: 'g', day_obs: 20240924, physical_filter: 'SDSSg_65mm~empty'} exited with partial outputs; considering this a qualified success and proceeding."
