Pipeline Errors for night of {{ params.day_obs }}
=====

In [None]:
# Notebook parameters: the observing night as a dashed date string, the
# instrument name, and the science program used to select exposures/visits.
day_obs = "2025-05-01"
instrument = "LSSTCam"
survey = "BLOCK-365"

In [None]:
# Record which lsst_distrib product eups reports for this kernel's environment.
!eups list lsst_distrib 

In [None]:
# Print the IMAGE_DESCRIPTION environment variable
# (presumably identifies the container image the notebook runs in -- TODO confirm).
!echo $IMAGE_DESCRIPTION

In [None]:
# Butler repository that is opened with dafButler.Butler further down.
butler_alias = "/repo/embargo"

In [None]:
import itertools
import lsst.daf.butler as dafButler
from lsst.daf.butler import DimensionNameError
from lsst.pipe.base import Pipeline
import tabulate
from dataclasses import dataclass
import pandas as pd
# Lift every pandas display limit so the error tables are rendered in full.
for _option_name in ('display.max_rows', 'display.max_columns', 'display.width', 'display.max_colwidth'):
    pd.set_option(_option_name, None)

@dataclass
class error_summary:
    """Log records collected for one (visit, detector) pair."""

    # Visit (or exposure) id the records belong to.
    visit: int
    # Detector id the records belong to.
    detector: int
    # Log records selected by make_error_summaries; may be empty.
    error_messages: list

# Open the repository with "<instrument>/prompt/output-<day_obs>" as the
# default collection, so every query below is limited to that collection.
b = dafButler.Butler(butler_alias, collections=f"{instrument}/prompt/output-{day_obs:s}")

In [None]:
# The where expressions use day_obs as bare digits (YYYYMMDD), so strip the
# dashes once up front.  Doing this outside the f-strings also avoids reusing
# double quotes inside a double-quoted f-string, which is a SyntaxError on
# Python versions before 3.12.
day_obs_compact = day_obs.replace("-", "")


def _query_record_ids(element):
    """Return the ids of the `element` dimension records for the selected night.

    Parameters
    ----------
    element : str
        Dimension element to query ("exposure" or "visit").

    Returns
    -------
    list
        The ``id`` of every record matching the configured instrument,
        science program and day_obs.  Empty if nothing matches
        (``explain=False`` suppresses the butler's empty-result diagnostics).
    """
    where = (
        f"{element}.instrument='{instrument}' "
        f"AND {element}.science_program='{survey}' "
        f"AND {element}.day_obs={day_obs_compact}"
    )
    return [
        record.id
        for record in b.query_dimension_records(element, where=where, explain=False)
    ]


exposures = _query_record_ids("exposure")
visits = _query_record_ids("visit")

print(f"Number of unique exposure ids: {len(set(exposures))}")
print(f"Number of unique visit ids: {len(set(visits))}")

# Exposures without a matching visit id cannot be looked up by visit later on.
print(f"Exposures not in visit list: {set(exposures) - set(visits)}")

In [None]:
# Cache butler queries which get used multiple times
query_cache = {}


def cached_query(dataset_type, where):
    """Return the (visit-or-exposure, detector) pairs of all matching datasets.

    Results are memoized in ``query_cache`` keyed on ``(dataset_type, where)``,
    so repeating a query does not contact the butler again.

    Parameters
    ----------
    dataset_type : str
        Dataset type name passed to ``b.query_datasets``.
    where : str
        Butler query expression restricting the datasets.

    Returns
    -------
    set of tuple
        One ``(id, detector)`` pair per dataset found, where ``id`` is the
        data ID's ``visit`` value when present and truthy, otherwise its
        ``exposure`` value.  Repeated calls return the same set object.
    """
    key = (dataset_type, where)
    if key not in query_cache:
        query_cache[key] = {
            (ref.dataId.get('visit') or ref.dataId.get('exposure'), ref.dataId['detector'])
            for ref in b.query_datasets(dataset_type, where=where, limit=None, explain=False)
        }
    return query_cache[key]

# Query the (visit or exposure, detector) pairs present for each dataset type.
# Everything goes through cached_query so that a repeated
# (dataset type, where) combination does not contact the butler twice.
exposure_where = f"exposure.science_program='{survey}' AND instrument='{instrument}'"
visit_where = f"visit.science_program='{survey}' AND instrument='{instrument}'"

log_exposure_detector = cached_query("isr_log", exposure_where)
log_visit_detector = cached_query("calibrateImage_log", visit_where)
isr_exposure_detector = cached_query("post_isr_image", exposure_where)
pvi_visit_detector = cached_query("initial_photometry_match_detector", visit_where)
dia_visit_detector = cached_query("apdb_marker", visit_where)

# A calibrateImage log without the corresponding output dataset marks a
# (visit, detector) whose single-frame processing did not complete.
missing_pvis = log_visit_detector - pvi_visit_detector
missing_visits = [x[0] for x in missing_pvis]

print(f"Number of ISR records in butler: {len(log_exposure_detector)}")
print(f"Number of calibrateImage records in butler: {len(log_visit_detector)}")
print(f"Number of successful ISR results: {len(isr_exposure_detector)}")
print(f"Number of successful processCcd results: {len(pvi_visit_detector)}")
print(f"Number of unsuccessful processCcd attempts: {len(missing_pvis)}")
print(f"Number of successful DIA attempts: {len(dia_visit_detector)}")

In [None]:
def make_error_summaries(log_dataset_types_exposure, log_dataset_types_visit, data_ids):
    """Collect error-level log records for each (visit, detector) pair.

    Parameters
    ----------
    log_dataset_types_exposure : iterable of str
        Log dataset types fetched with an ``exposure`` data ID (the visit id
        is used as the exposure id).
    log_dataset_types_visit : iterable of str
        Log dataset types fetched with a ``visit`` data ID.
    data_ids : iterable of tuple
        ``(visit, detector)`` pairs to summarize.

    Returns
    -------
    list of error_summary
        One entry per input pair, in iteration order.  ``error_messages``
        holds the selected records from the exposure-keyed logs followed by
        those from the visit-keyed logs; it is empty when nothing was selected.

    Notes
    -----
    Records are selected when ``levelno > 30`` (30 is the standard logging
    WARNING level, so this keeps ERROR and above).  Visit-keyed logs also keep
    any record whose message mentions "SIGTERM".  A ``DimensionNameError``
    from a visit-keyed lookup is treated as "no log available"; any other
    exception propagates unchanged.
    """
    error_summaries = []
    for visit, detector in data_ids:
        visit_errors = []

        for ds_type in log_dataset_types_exposure:
            log_messages = b.get(ds_type, dataId={"instrument": instrument, "exposure": visit, "detector": detector})
            visit_errors.extend(msg for msg in log_messages if msg.levelno > 30)

        for ds_type in log_dataset_types_visit:
            try:
                log_messages = b.get(ds_type, dataId={"instrument": instrument, "visit": visit, "detector": detector})
            except DimensionNameError:
                # Visit records can be missing due to corrupted headers.
                continue
            # Extend only after a successful read: doing this in a `finally`
            # would touch an unbound or stale list when `b.get` fails with an
            # unexpected exception, hiding the real error.
            visit_errors.extend(
                msg for msg in log_messages if msg.levelno > 30 or "SIGTERM" in msg.message
            )

        error_summaries.append(error_summary(visit=visit, detector=detector, error_messages=visit_errors))
    return error_summaries

In [None]:
def make_url_from_visit(visit, instrument_name=None):
    """Build the RubinTV URL of the image associated with a visit.

    Parameters
    ----------
    visit : int or str
        Visit id whose first eight digits are the date (YYYYMMDD) and whose
        remaining digits are the sequence counter within that day.
    instrument_name : str, optional
        Instrument to build the URL for.  Defaults to the notebook-level
        ``instrument`` setting.

    Returns
    -------
    str
        URL of the monitor image (LATISS) or focal-plane mosaic
        (LSSTComCam, LSSTCam) for the visit.

    Raises
    ------
    ValueError
        If the instrument is not one of LATISS, LSSTComCam or LSSTCam.
    """
    if instrument_name is None:
        instrument_name = instrument

    s = str(visit)
    day_string = f"{s[0:4]}-{s[4:6]}-{s[6:8]}"
    counter = int(s[8:])
    base = "https://usdf-rsp.slac.stanford.edu/rubintv/summit-usdf"

    if instrument_name == "LATISS":
        # Example: https://usdf-rsp.slac.stanford.edu/rubintv/summit-usdf/auxtel/event?key=auxtel/2024-08-12/monitor/000351/auxtel_monitor_2024-08-12_000351.png
        return f"{base}/auxtel/event?key=auxtel/{day_string}/monitor/{counter:06d}/auxtel_monitor_{day_string}_{counter:06d}.png"

    # The remaining cameras share one URL layout and differ only in short name.
    mosaic_short_names = {"LSSTComCam": "comcam", "LSSTCam": "lsstcam"}
    if instrument_name not in mosaic_short_names:
        raise ValueError(f"No RubinTV URL pattern known for instrument {instrument_name!r}")
    short_name = mosaic_short_names[instrument_name]
    # Example: https://usdf-rsp.slac.stanford.edu/rubintv/summit-usdf/comcam/event?key=comcam/2024-11-23/focal_plane_mosaic/000336/comcam_focal_plane_mosaic_2024-11-23_000336.jpg
    return f"{base}/{short_name}/event?key={short_name}/{day_string}/focal_plane_mosaic/{counter:06d}/{short_name}_focal_plane_mosaic_{day_string}_{counter:06d}.jpg"

In [None]:
# Load the instrument's ApPipe pipeline definition; the "#prompt" fragment
# selects part of that file.
pipeline = Pipeline.from_uri(f"$AP_PIPE_DIR/pipelines/{instrument}/ApPipe.yaml#prompt")
# Placeholder value for associateApdb's apdb_config_url
# (presumably needed only so the graph can be built -- TODO confirm).
pipeline.addConfigOverride("associateApdb", "apdb_config_url", "dummy")

# Resolve the pipeline against the butler registry; only the task labels
# (pipeline_graph.tasks) are used further down.
pipeline_graph = pipeline.to_graph(registry=b.registry)

In [None]:
# Substrings that identify already-known, recurring error messages.  They are
# matched case-insensitively against the "Error Message" column to split the
# error table into known and other errors.
recurrent_errors = {
    "Exception BadAstrometryFit: Poor quality astrometric fit",
    "Exception NonfinitePsfShapeError: Failed to determine PSF",
    "Exception NormalizedCalibrationFluxError",
    "Exception MeasureApCorrError: Unable to measure aperture correction",
    "Exception ObjectSizeNoGoodSourcesError",
    "MatcherFailure: No matches found",
    "MatcherFailure: Not enough catalog objects",
    "MatcherFailure: Not enough refcat objects",
    "MatcherFailure: No matches to use for photocal",
    "NoPsfStarsToStarsMatchError",
    "PsfexTooFewGoodStarsError",
    "RuntimeError: Cannot compute PSF matching kernel: too few sources selected",
    "RuntimeError: No good PSF candidates to pass to PSFEx",
    "PsfexNoGoodStarsError",
    "RuntimeError: No objects passed our cuts for consideration as psf stars",
    "SIGTERM",
    "ValueError: cannot convert float NaN to integer",
    "Exception AllCentroidsFlaggedError",
}

In [None]:
# Walk the pipeline tasks in the order pipeline_graph.tasks yields them.  A
# (visit, detector) that has a log from the previous task but none from the
# current one is attributed to the previous task, and the last error message
# found in that task's log is tabulated.  The trailing "end" pseudo-task
# compares the final task's records against the apdb_marker datasets.
task_log_where = f"visit.science_program='{survey}' AND instrument='{instrument}'"

last_records = log_exposure_detector
table_contents = []
last_task = "isr"
for task in itertools.chain(pipeline_graph.tasks, ["end"]):
    if task in ("isr", "getRegionTimeFromVisit"):
        continue
    if task != "end":
        records = {
            (x.dataId['visit'], x.dataId['detector'])
            for x in b.query_datasets(task + "_log", where=task_log_where, limit=None, explain=False)
        }
        error_summaries = make_error_summaries(["isr_log"], [last_task + "_log"], last_records - records)
        print(f"  {len(records):d} {task} records")
    else:
        # The pseudo-task has no log to query; carry the previous records
        # forward so `records` is always bound for the bookkeeping below.
        records = last_records
        error_summaries = make_error_summaries(["isr_log"], [last_task + "_log"], last_records - dia_visit_detector)

    for e in error_summaries:
        if e.error_messages:
            # Keep only the last record, truncated to keep the table readable.
            msg = e.error_messages[-1].message[:1000]
        # Ignore those that did not run rewarpTemplate; likely single frame only.
        elif task == "rewarpTemplate":
            continue
        elif task == "end":
            msg = "?"
        else:
            msg = ""
        table_contents.append((e.visit, e.detector, make_url_from_visit(e.visit), last_task, msg))

    last_task = task
    last_records = records

In [None]:
# Report how many (visit, detector) failures were tabulated.
print(f"{len(table_contents):d} errors, not including those falling back from ApPipe to SingleFrame")

In [None]:
# Build the error table; rows are ordered by sorting the
# (visit, detector, url, task, message) tuples.
table = pd.DataFrame(sorted(table_contents), columns=("Visit", "Det", "Img", "Last Task", "Error Message"))

In [None]:
def _is_known_error(message):
    """Return True if `message` contains any recurrent error substring, ignoring case."""
    if pd.isnull(message):
        return False
    lowered = message.lower()
    return any(err.lower() in lowered for err in recurrent_errors)


# Create a mask for rows containing any of the substrings.  It is computed even
# for an empty table (with an explicit bool dtype so it stays usable as an
# indexer), which keeps `mask` and `known_errs` defined for the cells below.
mask = table['Error Message'].apply(_is_known_error).astype(bool)

# Filter the DataFrame to show matching rows
known_errs = table[mask]

if table.empty:
    print("There were no pipeline errors")
else:
    # Count occurrences of each substring; iterate in sorted order so the
    # printed summary is the same on every run.
    error_counts = {
        err: known_errs['Error Message'].str.contains(err, case=False, na=False, regex=False).sum()
        for err in sorted(recurrent_errors)
    }

    for err, counts in error_counts.items():
        if counts != 0:
            print(f"- {counts} {err}")

    print("Matching rows:")
    display(known_errs)

In [None]:
# Rows whose message matched none of the recurrent errors.  An empty table has
# nothing to filter, so it is used as-is; this keeps `other_errs` defined for
# the CSV export on a night without errors.
other_errs = table if table.empty else table[~mask]
if not table.empty:
    # Jupyter only renders a bare expression when it is the last top-level
    # statement of a cell, so show the frame explicitly.
    display(other_errs)

In [None]:
# Save the known-error rows to a per-night CSV in the working directory.
known_errs.to_csv(f"known_errors_{day_obs}.csv")

In [None]:
# Save the remaining (not previously known) error rows to a per-night CSV.
other_errs.to_csv(f"new_errors_{day_obs}.csv")