In [None]:
# Set the range of dayobs values to search.
# Each bound is assigned twice: the second, explicit YYYY-MM-DD string
# overrides the "Today" placeholder assigned just before it.
# NOTE(review): how "Today" is resolved is not shown in this part of the notebook.
day_obs_min = "Today"
day_obs_min = "2024-11-25"
day_obs_max = "Today"
day_obs_max = "2024-11-26"
# Requested ordering of the output (not consumed in the cells visible here).
time_order = 'newest first'

# EFD Scripts + Logs 

In [None]:
import os
import warnings
import numpy as np
import pandas as pd
from pandas import option_context
from IPython.display import display, Markdown, HTML
from astropy.time import Time, TimeDelta
import datetime
import astropy.units as u
import requests
import yaml
from enum import Enum

# A tiny gap in time (1 ms), used to place one event just after another.
EPS_TIME = np.timedelta64(1, 'ms')
# unix_tai value 0 expressed as a naive UTC datetime. Converted EFD timestamps
# are compared against this; presumably a zero means "never set" - TODO confirm.
TIMESTAMP_ZERO = Time(0, format='unix_tai').utc.datetime

from lsst_efd_client import EfdClient

In [None]:
# lsst-ts-xml is in conda
# See https://github.com/lsst-ts/ts_xml/blob/develop/python/lsst/ts/xml/enums 
from lsst.ts.xml.sal_enums import State as CSCState
from lsst.ts.xml.enums.ScriptQueue import ScriptProcessState, SalIndex
from lsst.ts.xml.enums.Script import ScriptState
from lsst.ts.xml.enums.Watcher import AlarmSeverity

from lsst.summit.utils import getSite
from lsst.summit.utils.efdUtils import makeEfdClient

In [None]:
# Intended for DataFrame.apply(..., axis=1): called once per row.
def apply_enum(x: pd.Series, column: str, enumvals: Enum) -> str:
    """Return the enum member name for the raw value held in ``x[column]``.

    Parameters
    ----------
    x : `pd.Series`
        A single row (or any mapping-like Series) containing ``column``.
    column : `str`
        Label of the entry holding the raw enum value.
    enumvals : `Enum`
        The enum class used to look up the value.

    Returns
    -------
    name : `str`
        The ``name`` of the matching enum member.
    """
    raw_value = x[column]
    member = enumvals(raw_value)
    return member.name


def get_clients() -> dict:
    """Return site-specific client connections.

    Returns
    -------
    endpoints : `dict`
        Dictionary with `efd`, `obsenv`,
        `narrative_log`, and `exposure_log`
        connection information.
        For the obsenv, narrative log and exposure log, these are only
        defined for the summit or USDF; at "tucson" or "base" they are
        all `None`.
    """
    site = getSite()
    # This will fail if site is "unknown"
    # although could go with usdf_efd ..
    efd_client = makeEfdClient()
    if site == "summit":
        api_base = "https://summit-lsp.lsst.codes/"
        obsenv_client = EfdClient('summit_efd', db_name='lsst.obsenv')
    elif site in ("tucson", "base"):
        # I don't know what the logging looks like here,
        # but suspect it doesn't exist
        obsenv_client = None
        api_base = None
    else:
        obsenv_client = EfdClient('usdf_efd', db_name='lsst.obsenv')
        api_base = "https://usdf-rsp.slac.stanford.edu/"

    if api_base is None:
        # No logging services at this site: report None instead of trying
        # to concatenate None with a path (which raises TypeError).
        narrative_log_url = None
        exposure_log_url = None
    else:
        narrative_log_url = api_base + "narrativelog/messages"
        exposure_log_url = api_base + "exposurelog/messages"
    return {'efd': efd_client, 'obsenv': obsenv_client,
            'narrative_log': narrative_log_url, 'exposure_log': exposure_log_url}


def query_logging_services(API_ENDPOINT: str, params: dict, return_dataframe: bool =True) -> pd.DataFrame:
    """Send query to narrative log or exposure log services.
    
    Parameters
    ----------
    API_ENDPOINT : `str`
        The URL to send the query to.
        Usually like `https://usdf-rsp.slac.stanford.edu/narrativelog/messages`
    params : `dict`
        Dictionary of parameters for the REST API query.
        See docs for each service for more details.

    Returns
    -------
    messages : `pd.DataFrame`
        The returned log messages (if any available), in a dataframe.
    """
    # Very often, requests from the logging endpoints fail the first time.
    response = requests.get(API_ENDPOINT, params)
    # Try twice.
    if response.status_code != 200:
        response = requests.get(API_ENDPOINT, params)
    if response.status_code != 200:
        err_string = f"{API_ENDPOINT} "
        err_string += " unavailable."
        print(err_string)
        messages = []
    else:
        messages = response.json()
    messages = pd.DataFrame(messages)
    return messages

In [None]:
# Query any EFD topic for the timespan day_obs_min to day_obs_max, when you don't already know the fields
# topic = 'lsst.sal.ScriptQueue.command_add'
# fields = await efd_client.get_fields(topic)
# fields = [f for f in fields if 'private' not in f and f != 'name' and f!= "duration"]
# dd = await efd_client.select_time_series(topic, fields, tstart, tend)
# or top 5 .. 
# dd = await efd_client.select_top_n(topic, fields, 5)

In [None]:
async def get_script_stream(t_start: Time, t_end: Time, efd_client: EfdClient) -> pd.DataFrame:
    """Get script description and configuration from lsst.sal.Script.logevent_description
    and lsst.sal.Script.command_configure topics.

    Parameters
    ----------
    t_start : `astropy.Time`
        Start of the time range to query.
    t_end : `astropy.Time`
        End of the time range to query.
    efd_client : `EfdClient`
        EfdClient to query the efd.

    Returns
    -------
    script_stream : `pd.DataFrame`
        Inner join of the description and configure records on
        `script_salIndex`. Columns present in both inputs get the
        suffixes `_d` (description) and `_r` (configure).
        If either topic returned no rows, an empty frame holding only an
        integer `script_salIndex` column is returned.
    """
    # Script will find information about how scripts are configured.
    # The description topic gives a more succinct human name to the scripts
    topic = 'lsst.sal.Script.logevent_description'
    fields = ['classname', 'description', 'salIndex']
    scriptdescription = await efd_client.select_time_series(topic, fields, t_start, t_end)

    # This gets us more information about the script parameters, how they were configured.
    # All non-private fields of the topic are requested.
    topic = 'lsst.sal.Script.command_configure'
    fields = await efd_client.get_fields(topic)
    fields = [f for f in fields if 'private' not in f]
    # note blockId is only filled for JSON BLOCK activities
    scriptconfig = await efd_client.select_time_series(topic, fields, t_start, t_end)

    if len(scriptdescription) == 0 or len(scriptconfig) == 0:
        # Nothing to join (an inner join would be empty anyway); return a frame
        # that still carries the join key so callers can merge on it.
        return pd.DataFrame({'script_salIndex': pd.Series(dtype='int64')})

    scriptdescription = scriptdescription.rename(columns={'salIndex': 'script_salIndex'})
    scriptconfig = scriptconfig.rename(columns={'salIndex': 'script_salIndex'})

    # Merge these together on script_salIndex which is unique over tinterval
    # Found that (command_configure - script description) index time is mostly << 1 second for each script and < 1 second over a night
    script_stream = pd.merge(scriptdescription, scriptconfig, on='script_salIndex', suffixes=('_d', '_r'))
    return script_stream


async def get_script_state(t_start: Time, t_end: Time, queueIndex: int | None, efd_client: EfdClient) -> pd.DataFrame:
    """Get script status from lsst.sal.ScriptQueue.logevent_script topic.

    Parameters
    ----------
    t_start : `astropy.Time`
        Start of the time range to query.
    t_end : `astropy.Time`
        End of the time range to query.
    queueIndex : `int` or `None`
        Passed as ``index`` to the EFD query. An integer restricts the
        query to a single queue; None queries all queues.
    efd_client : `EfdClient`
        EfdClient to query the efd.

    Returns
    -------
    script_status : `pd.DataFrame`
        One row per `script_salIndex` (the index), consolidating the
        individual status events of that script. Timestamp columns are
        converted from unix_tai to UTC datetimes and `finalScriptState`
        holds the `ScriptState` member name.
        If the query returns no rows, an empty frame with the same columns
        and an integer `script_salIndex` index is returned.
    """
    # The status of each of these scripts is stored in scriptQueue.logevent_script
    # so find the status of each of these scripts (this is status at individual stages).
    topic = 'lsst.sal.ScriptQueue.logevent_script'
    fields = ['blockId', 'path', 'processState', 'scriptState', 'salIndex', 'scriptSalIndex',
              'timestampProcessStart', 'timestampConfigureStart', 'timestampConfigureEnd',
              'timestampRunStart', 'timestampProcessEnd']
    # Providing an integer salIndex will restrict this query to a single queue, but None will query all queues.
    scripts = await efd_client.select_time_series(topic, fields, t_start, t_end, index=queueIndex)

    # How each column is consolidated across the status events of one script.
    aggregations = {'path': 'first',
                    'salIndex': 'max',
                    'finalScriptState': 'max',
                    'scriptState': 'unique',
                    'processState': 'unique',
                    'timestampProcessStart': 'min',
                    'timestampConfigureStart': 'min',
                    'timestampConfigureEnd': 'max',
                    'timestampRunStart': 'max',
                    'timestampProcessEnd': 'max'}

    if len(scripts) == 0:
        # No script events in this interval: return an empty frame that still
        # has the expected columns and an integer index to merge against.
        return pd.DataFrame(columns=list(aggregations),
                            index=pd.Index([], dtype='int64', name='script_salIndex'))

    scripts = scripts.rename(columns={'scriptSalIndex': 'script_salIndex'})

    # Group scripts on 'script_salIndex' to consolidate the information about its status stages
    # Make a new column which we will fill with the max script state (== final state, given enum)
    # (new column so we don't have to deal with multi-indexes from multiple aggregation methods)
    scripts['finalScriptState'] = scripts['scriptState']
    script_status = scripts.groupby('script_salIndex').agg(aggregations)
    # Convert timestamp columns from unix_tai timestamps for readability.
    # Yes, these timestamps really are unix_tai.
    for col in [c for c in script_status.columns if c.startswith('timestamp')]:
        script_status[col] = Time(script_status[col], format='unix_tai').utc.datetime
    # Apply ScriptState enum for readability of final state
    script_status['finalScriptState'] = script_status.apply(apply_enum, args=['finalScriptState', ScriptState], axis=1)
    # Will apply 'best time' index after merge with script_stream
    return script_status

In [None]:
async def get_script_status(t_start: Time, t_end: Time, efd_client: EfdClient) -> pd.DataFrame:
    """Given a start and end time, appropriately query each ScriptQueue to find
    script descriptions, configurations and status.

    Parameters
    ----------
    t_start : `astropy.Time`
        The time to start searching for script events.
    t_end : `astropy.Time`
        The time at which to end searching for script events.
    efd_client : `EfdClient`
        EfdClient to query the efd.

    Returns
    -------
    script_status : `pd.DataFrame`
        DataFrame containing script description, configuration, timing information and states.
        Sorted by its (UTC) time index. An empty frame is returned as-is
        when no scripts are found.

    Note
    ----
    The index of the returned dataframe is chosen from the timestamps recorded for the script.
    In order to best place the script message inline with other events such as acquired images,
    the time used is the `timestampRunStart` if available, `timestampConfigureEnd` next, and
    then falls back to `timestampConfigureStart` or `timestampProcessStart` if those are also not
    available.
    """

    # The script_salIndex is ONLY unique during the time that a particular queue remains not OFFLINE
    # However, each queue can go offline independently, so the time intervals that are required for each queue
    # can be different, and requires inefficient querying of the lsst.sal.Script topics (which don't include
    # the queue identification explicitly). Furthermore, the downtime is infrequent, so probably we'd
    # most of the time prefer to do the efficient thing and query everything all at once.

    # So first - see if that's possible.
    state_topic = 'lsst.sal.ScriptQueue.logevent_summaryState'
    state_fields = ['salIndex', 'summaryState']
    offline_state = CSCState.OFFLINE.value
    # Were there breaks in any queue?
    all_queue_states = await efd_client.select_time_series(state_topic, state_fields, t_start, t_end)
    if len(all_queue_states) == 0:
        offline_events = 0
    else:
        offline_events = int((all_queue_states['summaryState'] == offline_state).sum())

    if offline_events == 0:
        print(f"No OFFLINE events during time interval {t_start} to {t_end} for any queue.")
        # So then go ahead and just do a single big query.
        script_stream = await get_script_stream(t_start, t_end, efd_client)
        script_state = await get_script_state(t_start, t_end, None, efd_client)
        script_status = pd.merge(script_stream, script_state, left_on='script_salIndex', right_index=True, suffixes=('', '_s'))

    else:
        # The ScriptQueues can be started and stopped independently, so run needs to run per-scriptqueue, per-uptime
        # Gap between consecutive query intervals, given explicitly in seconds
        # so its size does not depend on how a unit-less value would be read.
        eps = TimeDelta(EPS_TIME / np.timedelta64(1, 's'), format='sec')
        chunks = []
        for queue in SalIndex:
            # Were there breaks in this particular queue?
            queue_states = await efd_client.select_time_series(state_topic, state_fields, t_start, t_end, index=queue)
            tstops = []
            if len(queue_states) > 0:
                is_offline = (queue_states['summaryState'] == offline_state).to_numpy()
                if is_offline.any():
                    tstops = list(Time(queue_states.index[is_offline].values))

            # Split [t_start, t_end] at every OFFLINE event; each following
            # interval starts a tiny step after the event that closed the last.
            tintervals = []
            interval_start = t_start
            for ts in tstops:
                tintervals.append([interval_start, ts])
                interval_start = ts + eps
            tintervals.append([interval_start, t_end])

            if len(tstops) == 0:
                print(f"For {queue.name}, found 0 ScriptQueue OFFLINE events in the time period  {t_start} to {t_end}.")
            else:
                print(f"For {queue.name}, found {len(tstops)} ScriptQueue restarts in the time period {t_start} to {t_end}, so will query in {len(tstops)+1} chunks")

            # Do the script queue queries for each time interval in this queue
            for tinterval in tintervals:
                script_stream_t = await get_script_stream(tinterval[0], tinterval[1], efd_client)
                script_state_t = await get_script_state(tinterval[0], tinterval[1], queue, efd_client)
                # Merge with script_stream so we get better descriptions and configuration information
                chunk = pd.merge(script_stream_t, script_state_t, left_on='script_salIndex', right_index=True, suffixes=('', '_s'))
                chunks.append(chunk)
                print(f"Found {len(chunk)} script-status messages during {[e.iso for e in tinterval]} for {queue.name}")
        # Convert to a single dataframe
        script_status = pd.concat(chunks)

    print(f"Found {len(script_status)} script status messages")

    # script_status columns:
    # ['classname', 'description', 'script_salIndex', 'ScriptID', 'blockId',
    # 'config', 'executionId', 'logLevel', 'pauseCheckpoint',
    # 'stopCheckpoint', 'path', 'salIndex', 'finalScriptState', 'scriptState',
    # 'processState', 'timestampProcessStart', 'timestampConfigureStart',
    # 'timestampConfigureEnd', 'timestampRunStart', 'timestampProcessEnd']
    # columns used in final merged dataframe:
    # ['time', 'name', 'description', 'config', 'script_salIndex', 'salIndex',
    # 'finalStatus', 'timestampProcessStart', 'timestampConfigureEnd', 'timestampRunStart', 'timestampProcessEnd']

    if len(script_status) == 0:
        # Nothing to index or sort.
        return script_status

    def _find_best_script_time(x):
        # Try run start first, then fall back through earlier stages while
        # the candidate still equals the zero timestamp.
        best_time = x.timestampRunStart
        if best_time == TIMESTAMP_ZERO:
            best_time = x.timestampConfigureEnd
        if best_time == TIMESTAMP_ZERO:
            best_time = x.timestampConfigureStart
        if best_time == TIMESTAMP_ZERO:
            best_time = x.timestampProcessStart
        return best_time

    # Create an index that will slot this into the proper place for runtime / image acquisition, etc
    best_times = script_status.apply(_find_best_script_time, axis=1)
    script_status.index = pd.DatetimeIndex(best_times.to_numpy()).tz_localize("UTC")

    # Only the script records are returned: the summaryState / per-chunk
    # frames used above are not appended to the result.
    script_status = script_status.sort_index()
    return script_status


async def get_scheduler_configs(t_start: Time, t_end: Time, efd_client: EfdClient, obsenv_client: EfdClient | None) -> pd.DataFrame:
    """Collect Scheduler dependency, Scheduler configuration and obsenv records.

    Parameters
    ----------
    t_start : `astropy.Time`
        Start of the time range of interest.
    t_end : `astropy.Time`
        End of the time range of interest.
    efd_client : `EfdClient`
        EfdClient to query the Scheduler topics.
    obsenv_client : `EfdClient` or `None`
        EfdClient to query `lsst.obsenv.summary`. If None, no obsenv
        records are included and `ts_config_ocs` is reported as 'Unknown'.

    Returns
    -------
    sched_config : `pd.DataFrame`
        Time-sorted records with columns `classname`, `description`,
        `config`, `salIndex` (when available), `script_salIndex`,
        `timestampProcessStart` and `finalScriptState`. Contains all
        records at or after `t_start`, plus the last record of each kind
        found before `t_start`.
    """
    out_cols = ['classname', 'description', 'config', 'salIndex', 'script_salIndex']

    # ---- Scheduler dependency information ----
    t_start_local = t_start
    topic = 'lsst.sal.Scheduler.logevent_dependenciesVersions'
    fields = await efd_client.get_fields(topic)
    fields = [f for f in fields if "private" not in f]
    deps = await efd_client.select_time_series(topic, fields, t_start_local, t_end)
    # Sometimes the scheduler hasn't been set up, if it's a limited timespan.
    if len(deps) == 0:
        # Look one day further back and keep the most recent record found;
        # the wider window is also used for the queries below.
        t_start_local = t_start - TimeDelta(1, format='jd')
        deps = await efd_client.select_time_series(topic, fields, t_start_local, t_end)
        deps = deps.iloc[-1:].copy()
    if len(deps) > 0:
        # Reconfigure output to fit into script_status fields
        models = [c for c in deps.columns if 'observatory' in c or 'Model' in c]

        def build_dep_string(x, models):
            return ', '.join(f"{m}: {x[m]}" for m in models)

        deps['classname'] = "Scheduler dependencies"
        deps['description'] = deps['scheduler'] + ' ' + deps['seeingModel']
        deps['config'] = deps.apply(build_dep_string, args=[models], axis=1)
        deps['script_salIndex'] = -1

    # ---- obsenv: ts_config_ocs and scripts versions ----
    # Need find the previous version of ts_config_ocs
    obsenv_fields = ['summit_extras', 'ts_standardscripts', 'ts_externalscripts', 'ts_config_ocs']
    if obsenv_client is None:
        obsenv = pd.DataFrame([])
    else:
        # Query longer time period for obsenv, so we can be sure to know how scheduler enables
        obsenv = await obsenv_client.select_time_series('lsst.obsenv.summary', obsenv_fields,
                                                        t_start_local - TimeDelta(1, format='jd'), t_end)
    if len(obsenv) > 0:
        # A record equal to its predecessor is a "Check", otherwise an "Update";
        # the first record has no predecessor and is labelled plain "Obsenv".
        unchanged = np.all((obsenv[obsenv_fields][1:].values == obsenv[obsenv_fields][:-1].values), axis=1)
        later_names = np.where(unchanged, "Obsenv Check", "Obsenv Update")
        obsenv['classname'] = np.concatenate([np.array(['Obsenv']), later_names])
        obsenv['description'] = ("ts_config_ocs: " + obsenv['ts_config_ocs'] +
                                 " summit_extras: " + obsenv['summit_extras'])
        obsenv['config'] = ("ts_standardscripts: " + obsenv['ts_standardscripts'] +
                            " ts_externalscripts: " + obsenv['ts_externalscripts'])
        obsenv['salIndex'] = 1
        obsenv['script_salIndex'] = -1

    # ---- Scheduler configuration ----
    # I think these should be every time scheduler is "ENABLED"
    topic = 'lsst.sal.Scheduler.logevent_configurationApplied'
    fields = await efd_client.get_fields(topic)
    fields = [f for f in fields if "private" not in f]
    con = await efd_client.select_time_series(topic, fields, t_start_local, t_end)
    if len(con) > 0:
        con['classname'] = "Scheduler configuration"
        # Build description from schemaVersion (just in case) and the
        # ts_config_ocs of the last obsenv record before each configuration.
        ts_config_ocs_in_place = []
        for event_time in con.index:
            prev_obsenv = obsenv.loc[obsenv.index < event_time] if len(obsenv) > 0 else obsenv
            if len(prev_obsenv) == 0:
                ts_config_ocs_in_place.append('Unknown')
            else:
                ts_config_ocs_in_place.append(prev_obsenv.iloc[-1]['ts_config_ocs'])
        con['ts_config_ocs'] = ts_config_ocs_in_place
        con['description'] = 'ts_config_ocs ' + con['ts_config_ocs'] + ' ' + con['schemaVersion']
        con = con.rename(columns={'configurations': 'config'})
        con['script_salIndex'] = -1

    # ---- Combine results ----
    frames = [frame for frame in (deps, con, obsenv) if len(frame) > 0]
    if len(frames) == 0:
        print("Found 0 scheduler configuration records")
        return pd.DataFrame(columns=out_cols + ['timestampProcessStart', 'finalScriptState'])
    dd = pd.concat(frames)
    # Trim back results to t_start, keeping last previous update information
    # Trim obsenv back to range for other values
    # But keep last entry so we have easy record
    tt = pd.to_datetime(t_start.utc.datetime).tz_localize("UTC")
    # Keep last scheduler configuration update
    old_dd_sched = dd.query('index < @tt and classname == "Scheduler configuration"')[-1:]
    old_dd_deps = dd.query('index < @tt and classname == "Scheduler dependencies"')[-1:]
    old_dd_obsenv = dd.query('index < @tt and classname.str.contains("Obsenv")')[-1:]
    dd = dd.query('index >= @tt')
    sched_config = pd.concat([old_dd_sched, old_dd_obsenv, old_dd_deps, dd])

    # ---- Reformat ----
    drop_cols = [c for c in sched_config.columns if c not in out_cols]
    sched_config = sched_config.drop(columns=drop_cols).sort_index()
    sched_config['timestampProcessStart'] = sched_config.index.copy().tz_localize(None).astype('datetime64[ns]')
    sched_config['finalScriptState'] = "Configuration"
    print(f"Found {len(sched_config)} scheduler configuration records")
    return sched_config

In [None]:
async def get_error_codes(t_start: Time, t_end: Time, efd_client: EfdClient) -> pd.DataFrame:
    """Get all messages from logevent_errorCode topics.

    Parameters
    ----------
    t_start : `astropy.Time`
        Start of the time range to query.
    t_end : `astropy.Time`
        End of the time range to query.
    efd_client : `EfdClient`
        EfdClient to query the efd.

    Returns
    -------
    errs : `pd.DataFrame`
        Time-sorted error records from every topic whose name contains
        'errorCode', with columns renamed to match the narrative log
        (`error_code`, `message_text`, `origin`) plus `component`,
        `salIndex`, `finalStatus` and `timestampProcessStart`.
        An empty DataFrame if no errors were found.
    """
    # Get error codes
    topics = await efd_client.get_topics()
    err_topics = [t for t in topics if 'errorCode' in t]

    frames = []
    for topic in err_topics:
        df = await efd_client.select_time_series(topic, ['errorCode', 'errorReport'], t_start, t_end)
        if len(df) > 0:
            df['topic'] = topic
            frames.append(df)

    if len(frames) == 0:
        # Always hand back a DataFrame so callers can concatenate the result.
        errs = pd.DataFrame([])
    else:
        errs = pd.concat(frames).sort_index()

        def strip_csc(topic_name):
            # Reduce the topic name to the CSC name and label it as an error.
            return topic_name.replace("lsst.sal", "").replace("logevent_errorCode", "").replace(".", "") + "CSC error"

        errs['component'] = errs['topic'].map(strip_csc)
        # Rename some columns to match narrative log columns
        errs = errs.rename(columns={'errorCode': 'error_code', 'errorReport': 'message_text', 'topic': 'origin'})
        # Add a salindex so we can color-code based on this as a "source"
        errs['salIndex'] = 4
        errs['finalStatus'] = "ERR"
        errs['timestampProcessStart'] = errs.index.values.copy()

    print(f"Found {len(errs)} error messages")
    return errs

async def get_watcher_alarms(t_start: Time, t_end: Time, efd_client: EfdClient) -> pd.DataFrame:
    """Get and consolidate watcher alarms from lsst.sal.Watcher.logevent_alarm topic.

    Parameters
    ----------
    t_start : `astropy.Time`
        Start of the time range to query.
    t_end : `astropy.Time`
        End of the time range to query.
    efd_client : `EfdClient`
        EfdClient to query the efd.

    Returns
    -------
    watcher_messages : `pd.DataFrame`
        Consolidated alarms, indexed (and sorted) by the UTC
        `timestampSeverityOldest`, with columns renamed to line up with
        the error-code records (`message_text`, `component`, `origin`,
        `error_code`). An empty DataFrame if no alarms were found.
    """
    topic = 'lsst.sal.Watcher.logevent_alarm'
    fields = await efd_client.get_fields(topic)
    fields = [f for f in fields if ('private' not in f) and (f != 'name') and (f != 'duration')]
    watcher_messages = await efd_client.select_time_series(topic, fields, t_start, t_end)
    if len(watcher_messages) == 0:
        # Nothing to consolidate; the column operations below need data.
        print("Found 0 watcher messages")
        return pd.DataFrame([])

    # Convert severity to readable string.
    watcher_messages['severity'] = watcher_messages.apply(apply_enum, args=('severity', AlarmSeverity), axis=1)
    # Convert times for readability.
    for col in [c for c in watcher_messages.columns if 'timestamp' in c]:
        watcher_messages[col] = Time(watcher_messages[col], format='unix_tai').utc.datetime
    # Consolidate: keep the first message per (reason, timestampAcknowledged) ...
    watcher_messages = watcher_messages.groupby(['reason', 'timestampAcknowledged']).first()
    watcher_messages = watcher_messages.reset_index(drop=False)
    # ... and then the first message per timestampSeverityOldest.
    watcher_messages = watcher_messages.groupby('timestampSeverityOldest').first()
    watcher_messages = watcher_messages.reset_index(drop=False)
    # Index by timestampSeverityOldest (kept as a column too), in UTC.
    watcher_messages.index = watcher_messages['timestampSeverityOldest'].copy()
    watcher_messages.index.names = [None]
    watcher_messages.index = watcher_messages.index.tz_localize("UTC")
    # Rename some columns for merge with errors
    watcher_messages = watcher_messages.rename(columns={'reason': 'message_text', 'escalateTo': 'component',
                                                        'acknowledgedBy': 'origin', 'severity': 'error_code'})
    watcher_messages['salIndex'] = 4
    # NOTE(review): this overwrites the severity name that was just renamed
    # to error_code - confirm that discarding it is intended.
    watcher_messages['error_code'] = 0
    watcher_messages['finalStatus'] = "ALARM"
    print(f"Found {len(watcher_messages)} watcher messages")
    return watcher_messages.sort_index()


def get_narrative_log(t_start: Time, t_end: Time, narrative_log_endpoint: str) -> pd.DataFrame:
    """Get the narrative log entries.

    Parameters
    ----------
    t_start : `astropy.Time`
        Earliest `date_begin` to request.
    t_end : `astropy.Time`
        Latest `date_begin` to request.
    narrative_log_endpoint : `str`
        URL of the narrative log `messages` service.

    Returns
    -------
    messages : `pd.DataFrame`
        Narrative log messages indexed by their UTC `date_begin`, reshaped
        to resemble the error-code and watcher records (`component`,
        `origin`, `error_code`, `salIndex`, `finalStatus`,
        `timestampProcessStart`, `timestampRunStart`,
        `timestampProcessEnd`). Empty if nothing was returned.
    """
    log_limit = 50000
    params = {"is_human" : "either",
              "is_valid" : "true",
              "has_date_begin" : True,
              "min_date_begin" : t_start.to_datetime(),
              "max_date_begin" : t_end.to_datetime(),
              "order_by" : "date_begin",
              "limit": log_limit,
             }
    messages = query_logging_services(narrative_log_endpoint, params)
    # Modify narrative log content to match dataframes from errors and watcher topics better.
    if len(messages) > 0:
        def to_utc_datetime(value):
            # The log dates are read as ISOT strings on the TAI scale.
            # A missing date becomes NaT instead of raising.
            # NOTE(review): presumably date_end can be null - confirm against the service schema.
            if value is None or (isinstance(value, float) and np.isnan(value)):
                return pd.NaT
            return Time(value, format='isot', scale='tai').utc.datetime

        def clarify_log(components):
            # Prefix with "Log" and list the components, when there are any.
            if components is None or (isinstance(components, float) and np.isnan(components)):
                return "Log"
            return "Log " + " ".join(components)

        # Strip excessive \r\n and \n\n from messages for nicer printing in dataframe.
        messages['message_text'] = (messages['message_text']
                                    .str.replace("\r\n", "\n", regex=False)
                                    .str.replace("\n\n", "\n", regex=False)
                                    .str.rstrip("\n"))
        # Add a time index (computed once and reused for timestampProcessStart).
        begin_times = messages['date_begin'].map(to_utc_datetime).to_numpy()
        messages.index = pd.DatetimeIndex(begin_times, name='time').tz_localize("UTC")
        # Join the components and add "Log" explicitly
        messages['component'] = messages['components'].map(clarify_log)
        # rename some columns to match error data
        messages = messages.rename(columns={'time_lost_type': 'error_code', 'user_id': 'origin'})
        # Add a salindex so we can color-code based on this as a "source"
        messages['salIndex'] = 0
        messages['error_code'] = 0
        messages['finalStatus'] = "Log"
        messages['timestampProcessStart'] = begin_times
        messages['timestampRunStart'] = messages['date_added'].map(to_utc_datetime).to_numpy()
        messages['timestampProcessEnd'] = messages['date_end'].map(to_utc_datetime).to_numpy()
    print(f"Found {len(messages)} messages in the narrative log")
    if len(messages) == log_limit:
        print(f"Whoops, likely lost some log messages due to limit of {log_limit}.")
    return messages

In [None]:
async def get_narrative_and_errs(t_start: Time, t_end: Time, efd_client: EfdClient, narrative_log_endpoint: str | None, include_watcher : bool = True) -> pd.DataFrame:
    """Get narrative, errorCode and (possibly) watcher alarms.

    Parameters
    ----------
    t_start : `astropy.Time`
        Start of the time range to query.
    t_end : `astropy.Time`
        End of the time range to query.
    efd_client : `EfdClient`
        EfdClient to query the efd.
    narrative_log_endpoint : `str` or `None`
        URL of the narrative log service; if None the narrative log is skipped.
    include_watcher : `bool`, optional
        Whether to query and include Watcher alarms.

    Returns
    -------
    narrative_and_errs : `pd.DataFrame`
        All non-empty sources concatenated and sorted by index.
        An empty DataFrame if every source was empty.
    """

    if narrative_log_endpoint is not None:
        messages = get_narrative_log(t_start, t_end, narrative_log_endpoint)
    else:
        messages = pd.DataFrame([])

    errs = await get_error_codes(t_start, t_end, efd_client)
    if include_watcher:
        watcher = await get_watcher_alarms(t_start,  t_end, efd_client)
    else:
        # Maybe we'll get some of the messages, for start of the night state, for now
        # watcher = await get_watcher_alarms(t_start,  t_end, efd_client)
        # watcher = watcher.query('message_text.str.len() > 100')
        watcher = pd.DataFrame([])
        print(f"Kept {len(watcher)} watcher messages")

    # Merge narrative log messages and error messages.
    # Only real, non-empty DataFrames are concatenated: a source may hand
    # back an empty list when it finds nothing, which pd.concat rejects.
    frames = [frame for frame in (errs, watcher, messages)
              if isinstance(frame, pd.DataFrame) and len(frame) > 0]
    if len(frames) == 0:
        return pd.DataFrame([])
    narrative_and_errs = pd.concat(frames).sort_index()
    return narrative_and_errs

In [None]:
async def get_exposure_info(t_start: Time, t_end: Time, efd_client: EfdClient, exposure_log_endpoint: str | None) -> pd.DataFrame:
    """Get exposure information from lsst.sal.CCCamera.logevent_endOfImageTelemetry
    and join it with exposure log information.

    Parameters
    ----------
    t_start : `astropy.Time`
        Start of the time range to query.
    t_end : `astropy.Time`
        End of the time range to query.
    efd_client : `EfdClient`
        EfdClient to query the efd.
    exposure_log_endpoint : `str` or `None`
        URL of the exposure log service; if None the exposure log is skipped.

    Returns
    -------
    image_acquisition : `pd.DataFrame`
        Image records indexed by UTC acquisition start time, interleaved
        (when available) with exposure log messages placed just after the
        image they refer to. Exposure log messages whose image is not among
        the returned images get a NaT index. If no images are found, the
        empty query result is returned and the exposure log is not queried.
    """
    # Find exposure information
    topic = 'lsst.sal.CCCamera.logevent_endOfImageTelemetry'
    fields = ['imageName', 'imageIndex', 'exposureTime', 'darkTime', 'measuredShutterOpenTime',
              'additionalValues', 'timestampAcquisitionStart', 'timestampDateEnd', 'timestampDateObs']
    image_acquisition = await efd_client.select_time_series(topic, fields, t_start, t_end)
    if len(image_acquisition) == 0:
        # No images: there is nothing to reformat and nothing to attach
        # exposure log messages to.
        print("Found 0 image times")
        return image_acquisition

    for col in [c for c in image_acquisition.columns if c.startswith("timestamp")]:
        image_acquisition[col] = Time(image_acquisition[col], format='unix_tai').utc.datetime
    image_acquisition['salIndex'] = -1
    image_acquisition['script_salIndex'] = 0
    image_acquisition['finalStatus'] = "Image Acquired"

    def make_config_col_for_image(x):
        return f"exp {x.exposureTime} // dark {x.darkTime} // open {x.measuredShutterOpenTime} "

    image_acquisition['config'] = image_acquisition.apply(make_config_col_for_image, axis=1)
    image_acquisition.index = image_acquisition['timestampAcquisitionStart'].copy()
    image_acquisition.index = image_acquisition.index.tz_localize("UTC")
    print(f"Found {len(image_acquisition)} image times")

    # Now get exposure log information if exposure_log_endpoint defined.
    if exposure_log_endpoint is not None:
        log_limit = 50000
        # A cheap conversion to dayobs int
        min_dayobs_int = int(t_start.iso[0:10].replace('-', ''))
        max_dayobs_int = int(t_end.iso[0:10].replace('-', ''))
        params = {"is_human" : "either",
                  "is_valid" : "true",
                  "min_day_obs" : min_dayobs_int,
                  "max_day_obs" : max_dayobs_int,
                  "limit": log_limit,
                 }

        exp_logs = query_logging_services(exposure_log_endpoint, params)
        print(f"Found {len(exp_logs)} messages in the exposure log")

        # Modify exposure log and match with exposures to add time tag.
        if len(exp_logs) > 0:
            # Look up each message's image start time by image name, so the
            # result lines up with exp_logs row-for-row even if an image
            # name appears more than once in the image records.
            start_by_image = pd.Series(image_acquisition['timestampAcquisitionStart'].to_numpy(),
                                       index=image_acquisition['imageName'].to_numpy())
            start_by_image = start_by_image[~start_by_image.index.duplicated(keep='first')]
            # Set the time for the exposure log just slightly after the image start time
            exp_log_image_time = exp_logs['obs_id'].map(start_by_image) + EPS_TIME
            exp_logs.index = pd.DatetimeIndex(exp_log_image_time.to_numpy(), name='img_time').tz_localize("UTC")
            exp_logs['salIndex'] = 0
            exp_logs['script_salIndex'] = 0
            # Rename some columns in the exposure log so that we can consolidate them here
            exp_logs = exp_logs.rename(columns={'obs_id': 'imageName', 'user_id': 'config',
                                                'message_text': 'additionalValues', 'exposure_flag': 'finalStatus'})
            image_acquisition = pd.concat([image_acquisition, exp_logs]).sort_index()
            print("Joined exposure and exposure log")
    return image_acquisition

In [None]:
# from lsst.summit.utils import ConsDbClient
# # Not sure of summit consdb access, just use USDF for now
# os.environ["LSST_CONSDB_PQ_URL"] = "http://consdb-pq.consdb:8080/consdb"
# os.environ["no_proxy"] += ",.consdb"

# day_obs_int_min = int(day_obs_min.replace('-', ''))
# day_obs_int_max = int(day_obs_max.replace('-', ''))

# # Use the ConsDB Client, and add a couple of tries 
# consdb = ConsDbClient()

# instrument = 'lsstcomcam'
# visit_query = f'''
#     SELECT * 
#     FROM cdb_{instrument}.visit1
#      WHERE day_obs >= {day_obs_int_min}
#      and day_obs  <= {day_obs_int_max}
# '''
# try:
#     visits = consdb.query(visit_query).to_pandas()
# except (requests.HTTPError, requests.JSONDecodeError):
#     # Try twice
#     visits = consdb.query(visit_query).to_pandas()
    
# visits.set_index('visit_id', inplace=True)
# (Time(visits.query('exposure_name == "CC_O_20241115_000347"')['exp_midpt'].values[0], format='isot', scale='tai') - TimeDelta(15 * u.second)).utc.iso

In [None]:
async def get_consolidated_messages(t_start: Time, t_end: Time, include_watcher: bool = False) -> tuple[pd.DataFrame, list]:
    """Get consolidated messages from EFD ScriptQueue, errorCodes, CCCamera, exposure and narrative logs.

    The three sources (scripts + scheduler configs, narrative log + errors,
    images + exposure log) are each renamed onto a shared set of column names,
    concatenated and sorted by their index.

    Parameters
    ----------
    t_start : `astropy.Time`
        Time of the start of the messages.
    t_end : `astropy.Time`
        Time of the end of the messages.
    include_watcher : `bool`
        Include messages from Watcher.logevent_alarms?

    Returns
    -------
    efd_and_messages : `pd.DataFrame`
        All records combined and sorted by their original index. The index is
        then reset to integers, with the former index kept as a column
        (renamed from ``index`` to ``time``).
    cols : `list` [`str`]
        The shared column names, in display order, to select from
        ``efd_and_messages``.
    """

    endpoints = get_clients()
    print(endpoints)

    # Now rename columns so we can put these all into the same dataframe
    # goal columns :
    cols = ['time', 'name', 'description', 'config', 'script_salIndex', 'salIndex', 'finalStatus', 'timestampProcessStart', 'timestampConfigureEnd', 'timestampRunStart', 'timestampProcessEnd']

    # columns from scripts
    # script_cols = ['classname', 'description', 'config', 'script_salIndex', 'salIndex', 'blockId', 'finalScriptState', 'scriptState', 'timestampProcessStart', 'timestampConfigureEnd', 'timestampRunStart', 'timestampProcessEnd']
    script_status = await get_script_status(t_start, t_end, endpoints['efd'])
    scheduler_configs = await get_scheduler_configs(t_start, t_end, endpoints['efd'], endpoints['obsenv'])
    script_status = pd.concat([scheduler_configs, script_status]).rename(
        {'classname': 'name', 'finalScriptState': 'finalStatus'}, axis=1
    )

    # columns from narrative and errors
    # narrative_cols = ['component', 'origin', 'message_text', 'error_code', 'salIndex']
    # if include_watcher:
    #     narrative_cols = narrative_cols + ['timestampSeverityOldest', 'timestampAcknowledged', 'timestampMaxSeverity']
    narrative_and_errs = await get_narrative_and_errs(t_start, t_end, endpoints['efd'], endpoints['narrative_log'], include_watcher=include_watcher)
    narrative_and_errs = narrative_and_errs.rename(
        {'component': 'name', 'origin': 'config', 'message_text': 'description', 'error_code': 'script_salIndex'}, axis=1
    )
    if 'timestampSeverityOldest' in narrative_and_errs.columns:
        # Watcher alarm timestamps are mapped onto the script timestamp columns
        narrative_and_errs = narrative_and_errs.rename(
            {'timestampSeverityOldest': 'timestampProcessStart', 'timestampAcknowledged': 'timestampConfigureEnd', 'timestampMaxSeverity': 'timestampRunStart'}, axis=1
        )

    # columns from images_and_logs
    # image_cols = ['imageName', 'additionalValues', 'config', 'finalStatus', 'script_salIndex', 'salIndex', 'timestampAcquisitionStart', 'timestampDateObs', 'timestampDateEnd']
    image_and_logs = await get_exposure_info(t_start, t_end, endpoints['efd'], endpoints['exposure_log'])
    image_and_logs = image_and_logs.rename(
        {'imageName': 'name', 'additionalValues': 'description',
         'timestampAcquisitionStart': 'timestampProcessStart', 'timestampDateObs': 'timestampRunStart', 'timestampDateEnd': 'timestampProcessEnd'}, axis=1
    )

    efd_and_messages = pd.concat([script_status, narrative_and_errs, image_and_logs]).sort_index()
    # Wrap description, which may have long messages without spaces in the errors
    efd_and_messages['description'] = efd_and_messages['description'].str.wrap(100)
    # use an integer index, which makes it easier to pull up values plus avoids occasional failures of time uniqueness
    efd_and_messages = efd_and_messages.reset_index(drop=False).rename({'index': 'time'}, axis=1)

    print(f"Total combined messages {len(efd_and_messages)}")
    return efd_and_messages, cols

# Add a custom formatter to handle YAML-like strings with dynamic background colors
def format_config_as_yaml_with_colors(row):
    """Render a script's ``config`` value as an HTML ``<pre>`` block.

    Intended for use with ``DataFrame.apply(..., axis=1)``.

    Parameters
    ----------
    row : `pd.Series` or mapping
        One record; must provide ``config``, ``salIndex`` and
        ``script_salIndex``.

    Returns
    -------
    formatted : `str` or original value
        For rows with ``script_salIndex > 0``, ``salIndex`` in 1/2/3 and a
        string ``config``: an HTML ``<pre>`` block, colored by ``salIndex``,
        holding the config re-dumped as block-style YAML (or the raw text if
        it does not parse as YAML). The text is HTML-escaped so that ``<``,
        ``>`` and ``&`` in a config are shown literally rather than being
        interpreted as markup. Any other row returns ``config`` unchanged.
    """
    import html  # stdlib; local import so this block does not depend on the notebook's import cell

    config_value = row['config']
    sal_index = row['salIndex']
    script_salindex = row['script_salIndex']

    # Define background colors based on salIndex
    background_colors = {
        1: '#b4c546',  # Simonyi queue
        2: '#bab980',  # Aux Tel queue
        3: '#b2baad',  # OCS queue
    }
    # Default color if salIndex doesn't match any condition
    background_color = background_colors.get(sal_index, '#f9f9f9')

    if not (script_salindex > 0 and sal_index in [1, 2, 3] and isinstance(config_value, str)):
        # Not a script queue row, or nothing textual to format: return as-is
        return config_value

    try:
        # Parse the YAML-like string, then format back to YAML with proper indentation
        text = yaml.dump(yaml.safe_load(config_value), default_flow_style=False)
    except yaml.YAMLError:
        # If parsing fails, fall back to the plain text in the same styled block
        text = config_value

    # quote=False: only &, < and > need escaping inside element content
    return (
        f"<pre style='background: {background_color}; padding: 10px; border: 1px solid #ddd; margin: 0;'>"
        f"{html.escape(text, quote=False)}</pre>"
    )
    

def pretty_print_messages(efd_and_messages: pd.DataFrame, cols: list, time_order: str) -> None:
    """Display the consolidated messages as a color-coded HTML table.

    The input frame is not modified: the formatted ``config`` column is
    written to a new frame used only for display.

    Parameters
    ----------
    efd_and_messages : `pd.DataFrame`
        Consolidated messages; must contain ``config``, ``salIndex`` and
        ``script_salIndex`` plus every column named in ``cols``.
    cols : `list` [`str`]
        Columns to show, in order. Must include ``salIndex``, which drives
        the row coloring.
    time_order : `str`
        ``"newest first"`` reverses the row order; any other value keeps the
        incoming order.
    """
    # Row background by salIndex. Colors from https://medialab.github.io/iwanthue/
    row_colors = {
        0: '#cf7ddc',  # narrative log
        4: '#9cb5d5',  # error messages
        1: '#b4c546',  # simonyi queue
        2: '#bab980',  # aux tel queue
        3: '#b2baad',  # ocs queue
        # -1: '#b6ecf5',  # image (disabled)
    }

    def highlight_salindex(s):
        # One CSS declaration per cell in the row; unstyled if salIndex is not in the table
        color = row_colors.get(s.salIndex)
        css = f'background-color: {color}' if color is not None else ''
        return [css] * len(s)

    print("Color coding by salIndex (1/2/3 scriptqueue index) + data source (narrative or exposure log, or EFD logevent_errorCode messages)")
    print('')

    ordered = efd_and_messages.iloc[::-1] if time_order == "newest first" else efd_and_messages

    # Apply yaml-like formatting conditionally. `.assign` returns a new frame, so
    # neither the reversed slice nor the caller's frame is written to.
    display_table = ordered.assign(config=ordered.apply(format_config_as_yaml_with_colors, axis=1))

    # Adjust the display call to include the formatted column
    styled_table = (
        display_table[cols]
        .style.apply(highlight_salindex, axis=1)  # Preserve color formatting for other columns
        .set_table_styles([dict(selector='th', props=[('text-align', 'left')])])
        .set_properties(**{'text-align': 'left'})
    )

    # Render with HTML
    display(HTML(styled_table.format().to_html()))

In [None]:
# Set a range of times to search, based on dayobs
if day_obs_min == "Today":
    # Shift the 12hour offset following the definition of day_obs in https://sitcomtn-032.lsst.io/    
    # Drop the hours, minutes, seconds to get the ISO formatted day_obs
    day_obs_min = Time(np.floor(Time.now().mjd - 0.5), format='mjd', scale='utc').iso[0:10]

if day_obs_min == "Yesterday":
    # Shift the 12hour offset following the definition of day_obs in https://sitcomtn-032.lsst.io/
    # Drop the hours, minutes, seconds to get the ISO fromatted day_obs
    day_obs_min = (Time(np.floor(Time.now().mjd - 0.5), format='mjd', scale='utc') - TimeDelta(1, format='jd')).iso[0:10]


# Set a range of times to search, based on dayobs
if day_obs_max == "Today":
    # Shift the 12hour offset following the definition of day_obs in https://sitcomtn-032.lsst.io/    
    # Drop the hours, minutes, seconds to get the ISO formatted day_obs
    day_obs_max = Time(np.floor(Time.now().mjd - 0.5), format='mjd', scale='utc').iso[0:10]

if day_obs_max == "Yesterday":
    # Shift the 12hour offset following the definition of day_obs in https://sitcomtn-032.lsst.io/
    # Drop the hours, minutes, seconds to get the ISO fromatted day_obs
    day_obs_max = (Time(np.floor(Time.now().mjd - 0.5), format='mjd', scale='utc') - TimeDelta(1, format='jd')).iso[0:10]

try:
    t_start = Time(f"{day_obs_min}T12:00:00", format='isot', scale='utc')
except ValueError:
    print(f"Is day_obs_min the right format? {day_obs_min} should be YYYY-MM-DD")
    t_start = None
try:
    t_end = Time(f"{day_obs_max}T12:00:00", format='isot', scale='utc') + TimeDelta(1, format='jd')
except ValueError:
    print(f"Is day_obs_max the right format? {day_obs_max} should be YYYY-MM-DD")
    t_start = None

if t_start is None or t_end is None:
    print("Did not get valid inputs for time period.")


print(f"Querying for messages from {t_start.iso} to {t_end.iso}")
print(f"Notebook executed at {Time.now().utc.iso}")
efd_and_messages, cols = await get_consolidated_messages(t_start, t_end)

# Could add these to parameters
save_log = False
make_link = False

if save_log:
    log_filename = f"log_{day_obs_min}_{day_obs_max}.h5"
    # We will always get a performance warning here, because the dataframe includes string objects
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        efd_and_messages[cols].to_hdf(log_filename, key='messages')
        print(f"Wrote to {log_filename}")
if make_link:
    import base64
    html_table = efd_and_messages[cols].to_xml(index=False)
    b64 = base64.b64encode(html_table.encode())
    payload = b64.decode()
    log_xml =  f"log_{day_obs_min}_{day_obs_max}.xml"
    html_link = f'<a download="{log_xml}" href="data:text/csv;base64,{payload}" target="_blank">Download XML table of log messages</a>'
    display(HTML(html_link))
    print(" read download with pandas.read_xml, convert times using .astype('datetime64[ns]')")
        
pretty_print_messages(efd_and_messages, cols, time_order)