# Extracting metrics and plots from Operations Rehearsal 3 (OR3) repositories

Last verified to run: 23 Apr 2024

LSST Science Pipelines version: weekly 2024_14

Contact authors: Jeff Carlin, Peter Ferguson


In [None]:
import copy
import importlib
import os
import re

import numpy as np
import pandas as pd
import tqdm
from astropy.table import Table

from lsst.daf.butler import Butler
# Show every DataFrame column when displaying tables in the notebook.
pd.options.display.max_columns = None

In [None]:
def get_config(metric_bundle_name, task_frame, butler, did_tract=None, did_visit_det=None):
    """Get the configuration corresponding to a MetricMeasurementBundle.

    Parameters
    ----------
    metric_bundle_name: `string`
        Name of the metric bundle to get config for (usually "TaskName"+"Metrics").
    task_frame: `DataFrame`
        DataFrame with columns "name", "type", "pipelines_name", and "pipelines_url".
    butler: `lsst.daf.Butler`
        Butler initialized for the repo/collection of interest.
    did_tract: `dict`, optional
        dataId used when the task's "type" is "tract". Defaults to tract 9813,
        band g, of the ops_rehersal_prep_2k_v1 skymap.
    did_visit_det: `dict`, optional
        dataId used for every other task type. Defaults to visit 7024040300001,
        detector 0.

    Returns
    -------
    config_string: `str`
        Contents of the "<pipelines_name>_config" dataset, decoded as UTF-8.

    Raises
    ------
    IndexError
        If no row of ``task_frame`` has a "name" equal to the derived task name.
    """
    if did_tract is None:
        did_tract = {'tract': 9813, 'skymap': 'ops_rehersal_prep_2k_v1', 'band': 'g', 'instrument': 'LSSTComCamSim'}
    if did_visit_det is None:
        did_visit_det = {'visit': 7024040300001, 'skymap': 'ops_rehersal_prep_2k_v1', 'instrument': 'LSSTComCamSim', 'detector': 0}

    # The task name is everything before the first "Metric" in the bundle name.
    task_name = metric_bundle_name.split('Metric')[0]
    task_row = task_frame[task_frame['name'] == task_name]
    pipeline_name = task_row['pipelines_name'].values[0]
    # Compare the row's *value*: `'tract' in <Series>` would test the Series'
    # index labels and never match.
    task_type = task_row['type'].values[0]

    config_name = pipeline_name + '_config'
    did = did_tract if task_type == 'tract' else did_visit_det
    uri = butler.getURI(config_name, dataId=did)
    return uri.read().decode('utf-8')

In [None]:
def print_metrics(meas_bundle):
    """Print every metric in a metric bundle to the screen, grouped by bundle key.

    For each key of ``meas_bundle.data`` the key is printed, then one line per
    measurement in ``meas_bundle[key]``, then a blank separator.

    Parameters
    ----------
    meas_bundle: `MetricsMeasurementBundle`
        metric bundle from the Butler repo
    """
    for bundle_key in meas_bundle.data.keys():
        print(bundle_key, '\n')
        measurements = list(meas_bundle[bundle_key])
        # One measurement per line; skip the call entirely for an empty bundle
        # so no extra blank line is emitted.
        if measurements:
            print(*measurements, sep='\n')
        print('\n')


def extract_metrics(meas_bundle):
    """Flatten a metric bundle into parallel lists of names, values, and units.

    Each metric is named "<bundle key>_<metric>", in bundle-key order and then
    in the order the measurements appear within each key.

    Parameters
    ----------
    meas_bundle: `MetricsMeasurementBundle`
        metric bundle from the Butler repo

    Returns
    -------
    names: `list` of `strings`
        Metric names
    values: `list` of `floats`
        Measured metric values
    units: `list` of `units`
        Units associated with each metric
    """
    triples = [
        (key + '_' + measurement.metric_name.metric,
         measurement.quantity.value,
         measurement.quantity.unit)
        for key in meas_bundle.data.keys()
        for measurement in meas_bundle[key]
    ]
    names = [name for name, _, _ in triples]
    values = [value for _, value, _ in triples]
    units = [unit for _, _, unit in triples]
    return names, values, units


def extract_metrics_table(meas_bundle):
    """Extract metrics from an input metric bundle and return as a Table.

    Uses ``extract_metrics`` so that the naming convention ("<bundle key>_<metric>")
    is defined in one place.

    Parameters
    ----------
    meas_bundle: `MetricsMeasurementBundle`
        metric bundle from the Butler repo

    Returns
    -------
    tab: `Astropy Table`
        Table with columns of "metric", "value", and "unit" for all metrics
        from the input bundle
    """
    names, values, units = extract_metrics(meas_bundle)
    return Table([names, values, units], names=['metric', 'value', 'unit'])

In [None]:
def get_docstrings_for_task(atools_pipeline_config, bundles_list, git_ref='w.2024.14'):
    """Extract the docstrings for the atools associated with the input bundles.

    Parameters
    ----------
    atools_pipeline_config: `str`
        Pipeline config text extracted from the Butler. (Could be extracted using "get_config")
    bundles_list: `list`
        Names of the MetricMeasurementBundles created by the task of interest.
    git_ref: `str`, optional
        Tag/branch of lsst/analysis_tools used to build the source links
        (default "w.2024.14").

    Returns
    -------
    docstring_dict: `dict`
        Keyed on bundle name; each value is ``{'docstring': str or None,
        'path': str}``. Bundles with no matching atool line in the config are
        absent from the dict.

    Raises
    ------
    ImportError, AttributeError
        If a module/class named in the config cannot be imported.
    """
    docstring_dict = {}
    for bun in bundles_list:
        # Each atool appears in a saved config on a line of the form
        #   config.atools.<bundle>=lsst.analysis.tools.atools.<module path>.<Class>()
        # Capture the dotted module path and the class name from that line only,
        # so trailing comments or neighbouring config lines cannot leak in.
        pattern = re.compile(
            r'config\.atools\.' + re.escape(bun)
            + r'=(lsst\.analysis\.tools\.atools(?:\.\w+)*)\.(\w+)\('
        )
        for match in pattern.finditer(atools_pipeline_config):
            module, task = match.groups()

            # Import the class and read its docstring (no exec needed).
            tool_class = getattr(importlib.import_module(module), task)
            doc = tool_class.__doc__
            if doc is not None:
                # Collapse all runs of whitespace (including newlines) to one space.
                doc = ' '.join(doc.split())

            # Link to the matching version of the source on GitHub.
            path = (f'https://github.com/lsst/analysis_tools/blob/{git_ref}/python/'
                    + module.replace('.', '/') + '.py')
            docstring_dict[bun] = {'docstring': doc, 'path': path}

    return docstring_dict

In [None]:
def print_metric_names_markdown_table(metric_bundle, metric_bundle_name, task_frame, butler, nodocs=False):
    """Print the metrics of a MetricMeasurementBundle as Markdown tables, one per bundle.

    For each bundle key (sorted), this prints a "metricBundle" heading. Unless
    ``nodocs`` is set, the heading includes a source link when one was found,
    followed by the atool's docstring. Then comes a Markdown table of metric
    name, value, and units.

    Parameters
    ----------
    metric_bundle: `MetricMeasurementBundle`
        metric bundle from the Butler repo
    metric_bundle_name: `string`
        name of the metric bundle (usually "TaskName"+"Metrics")
    task_frame: `DataFrame`
        DataFrame with columns "name", "type", "pipelines_name", and "pipelines_url"
    butler: `lsst.daf.Butler`
        Butler initialized for the repo/collection of interest
    nodocs: `boolean` (default: False)
        Set to True to skip looking up docstrings and source links (useful when
        it is known that there are no docstrings). Bundle headings are still
        printed so that consecutive tables stay distinguishable.
    """
    names, values, units = extract_metrics(metric_bundle)

    # extract_metrics names each metric "<bundle>_<metric>"; split at the first
    # underscore (assumes bundle keys themselves contain no underscore).
    metric_of = []
    rows_by_bundle = {}
    for i, full_name in enumerate(names):
        bundle, _, metric = full_name.partition('_')
        metric_of.append(metric)
        rows_by_bundle.setdefault(bundle, []).append(i)

    uniq_bundles = sorted(rows_by_bundle)

    docstrings_dict = {}
    if not nodocs:
        # Retrieve the config for the task and use it to get docstrings for each bundle.
        cfg = get_config(metric_bundle_name, task_frame, butler)
        docstrings_dict = get_docstrings_for_task(cfg, uniq_bundles)

    # Print a table formatted for a Markdown document (e.g., a TechNote).
    for bun in uniq_bundles:
        # A bundle may be absent from the docstring lookup (e.g. its atool is
        # not under lsst.analysis.tools.atools); print it without a link.
        info = docstrings_dict.get(bun)
        link = f" ([source]({info['path']}))" if info is not None else ''
        print(f'\nmetricBundle: **{bun}**{link}\n')
        if not nodocs:
            docstring = info['docstring'] if info is not None else None
            if docstring is None:
                print('_Docstring:_ No docstring\n')
            else:
                print('_Docstring:_ ' + docstring + '\n')
        print('| metric name | value | units |\n| ---  |--- |--- |')
        for i in rows_by_bundle[bun]:
            # Counts are shown with one decimal; everything else with six.
            value_format = '.1f' if units[i] == 'ct' else '.6f'
            print(f'| {metric_of[i]} | {values[i]:{value_format}} | {units[i]} |')

## Metrics from the nightly validation run

In [None]:
repo_nv = '/repo/embargo'

# One night of the "nightly validation" outputs plus the "defaults" collection
# (calibrations, etc.). The "quicklook" collection ('LSSTComCamSim/quickLook/24')
# may optionally be appended to this list.
collections_nv = [
    'LSSTComCamSim/runs/nightlyvalidation/20240403/d_2024_03_29/DM-43612',
    'LSSTComCamSim/defaults',
]

butler_nv = Butler(repo_nv, collections=collections_nv)
registry_nv = butler_nv.registry

In [None]:
# Determine which dataset types with storage class "MetricMeasurementBundle" exist in the collection.
# The storage-class test is a cheap attribute check, so apply it before the
# registry query rather than querying every dataset type.
metrics_datasets = []

for datasetType in registry_nv.queryDatasetTypes():
    if datasetType.storageClass_name != 'MetricMeasurementBundle':
        continue
    if registry_nv.queryDatasets(datasetType, collections=collections_nv).any(execute=False, exact=False):
        print(datasetType)
        metrics_datasets.append(datasetType)

### Make a DataFrame to map task names, types, URLs, and pipeline names.

(Ideally this could be done programmatically in the future, but for now it is manually created.)

In [None]:
# One row per task: (task name, dataId type, pipeline name, pipeline URL).
# Keeping each task's fields together prevents the four lists from drifting out
# of alignment; the lists are then unpacked from the rows for the DataFrame.
task_rows = [
    ('calexpSummary', "visit", 'calibrate',
     'https://github.com/lsst/drp_pipe/blob/w.2024.14/pipelines/LSSTComCamSim/nightly-validation-ops-rehearsal-3.yaml#L18-L26'),
    ('diaSourceTableTract', "tract", 'analyzeDiaSourceTableTract',
     'https://github.com/lsst/analysis_tools/blob/w.2024.14/pipelines/diaTractQualityCore.yaml'),
    ('matchedVisitCore', "tract", 'analyzeMatchedVisitCore',
     'https://github.com/lsst/analysis_tools/blob/w.2024.14/pipelines/matchedVisitQualityCore.yaml'),
    ('objectTableColumnValidate', "tract", 'validateObjectTableCore',
     'https://github.com/lsst/analysis_tools/blob/w.2024.14/pipelines/coaddColumnValidate.yaml'),
    ('objectTableCore', "tract", 'analyzeObjectTableCore',
     'https://github.com/lsst/analysis_tools/blob/w.2024.14/pipelines/coaddQualityCore.yaml'),
    ('objectTableExtended', "tract", 'analyzeObjectTableExtended',
     'https://github.com/lsst/analysis_tools/blob/w.2024.14/pipelines/coaddQualityExtended.yaml'),
    ('preSourceTableCore', "visit", 'analyzePreSourceTableCore',
     'https://github.com/lsst/drp_pipe/blob/w.2024.14/pipelines/_ingredients/LSSTComCamSim/DRP.yaml#L68-L78'),
    # TO DO: put in real links here
    ('propertyMapTract', "tract", 'propertyMapTract',
     'https://github.com/lsst/analysis_tools/blob/w.2024.14/pipelines/coaddQualityCore.yaml#L90-L123'),
    ('sourceTable_visit_gaia_dr3_20230707_match', "visit", 'sourceTable_visit_gaia_dr3_20230707_match',
     'NA'),
]
task_names, task_type, pipelines_names, pipelines_url = (list(column) for column in zip(*task_rows))

task_frame = pd.DataFrame({'name': task_names,
                           'type': task_type,
                           'pipelines_name': pipelines_names,
                           'pipelines_url': pipelines_url})

task_frame

### Create dataIds to use for data lookup

In [None]:
# dataIds for tract-level and visit-level lookups; the patch and detector
# variants extend the corresponding base dict with one extra key.
did_tract = dict(tract=9813, skymap='ops_rehersal_prep_2k_v1', band='g', instrument='LSSTComCamSim')
did_patch = {**did_tract, 'patch': 74}
did_visit = dict(visit=7024040300001, skymap='ops_rehersal_prep_2k_v1', instrument='LSSTComCamSim')
did_visit_det = {**did_visit, 'detector': 0}

## Extract the lists of metric names into tables that can be added to a Markdown TechNote

In [None]:
# Tasks whose bundles are known to have no atool docstrings; skip the config lookup for them.
tasks_without_docs = ('calexpSummary', 'preSourceTableCore')

for task, task_kind in zip(task_frame['name'], task_frame['type']):
    print('Extracting info for ', task)

    # Tract-level tasks use the tract dataId; everything else uses the visit dataId.
    did = did_tract if task_kind == 'tract' else did_visit

    # Fetching is expected to fail for some tasks (dataset absent for this
    # dataId); report the reason instead of silently swallowing it.
    try:
        metrics = butler_nv.get(task + '_metrics', collections=collections_nv, dataId=did)
    except Exception as err:
        print('No metric bundles found for ', task, f'({type(err).__name__}: {err})')
    else:
        # Bundle names always end with "Metrics".
        metric_bundle_name = task + 'Metrics'
        # Errors while formatting are reported separately so they are not
        # mistaken for a missing dataset.
        try:
            print_metric_names_markdown_table(metrics, metric_bundle_name, task_frame, butler_nv,
                                              nodocs=task in tasks_without_docs)
        except Exception as err:
            print('Failed to build the metric table for ', task, f'({type(err).__name__}: {err})')

    print('\n\n------------------------------------\n')

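#### To see an example of extracting Astropy tables instead, remove the triple quotes around the following cell and execute it

(Note that the `extract_metrics_table(metrics)` / `print(tab)` pair could be replaced with `print_metrics(metrics)`, which prints each metric directly instead of building a table.)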

In [None]:
# Disabled example: this cell is a bare string literal, so nothing below runs.
# Remove the enclosing triple quotes to print each task's metrics as an Astropy
# Table (via extract_metrics_table) instead of Markdown.
# NOTE(review): visit-level tasks here use did_visit_det, whereas the Markdown
# loop uses did_visit — confirm which dataId the visit-level metrics need.
'''
for task in task_frame.name:
    print('\nExtracting info for ', task, '\n')

    # Choose the correct dataId depending on the "type" entry in task_frame:
    if (task_frame[task_frame.name == task].type == 'tract').values[0]:
        did = did_tract
    else:
        did = did_visit_det

    # Wrap in a try/except so it doesn't fail when datasets don't exist
    try:
        # Extract the metric bundle
        metrics = butler_nv.get(task+'_metrics', collections=collections_nv, dataId=did)
        tab = extract_metrics_table(metrics)
        print(tab)
    except:
        print('No metric bundles found for ', task)
'''

## Extract task info and plots

In [None]:
# Materialize the dataset types once: they are filtered twice below (plots and
# metric bundles), and this guards against queryDatasetTypes returning a
# one-shot iterable that the second pass would find empty.
datasets_nv = list(registry_nv.queryDatasetTypes())

# Grab the plots that exist in our collections (cheap storage-class filter
# first, then the registry query).
plot_datasets = [ds for ds in datasets_nv if ds.storageClass_name == 'Plot']
plot_datasets = [ds for ds in plot_datasets
                 if registry_nv.queryDatasets(ds, collections=collections_nv).any(execute=False, exact=False)]

# Only use examples with g-band or no band: drop any plot whose name contains
# another band as an underscore-delimited token.
other_bands = ["u", "r", "i", "z", "y"]
example_datasets = [ds for ds in plot_datasets
                    if not any(f"_{band}_" in ds.name for band in other_bands)]
plot_datasets = example_datasets

metric_datasets = [ds for ds in datasets_nv if ds.storageClass_name == 'MetricMeasurementBundle']
metric_datasets = [ds for ds in metric_datasets
                   if registry_nv.queryDatasets(ds, collections=collections_nv).any(execute=False, exact=False)]

In [None]:
# Task prefixes: for plots, the part of the name before the first underscore;
# for metric bundles, the part before the last underscore. A name without an
# underscore is kept whole rather than losing its last character.
plot_tasks = {ds.name.partition('_')[0] for ds in plot_datasets}
metric_tasks = {ds.name.rpartition('_')[0] or ds.name for ds in metric_datasets}
# Sorted list of plain Python strings (no numpy scalar reprs when printed).
tasks = sorted(plot_tasks | metric_tasks)
print(tasks)

In [None]:
def write_plot(plot_name, dataId, collections, butler, base_directory='../_static/plots/'):
    """Copy a Plot dataset's image out of the Butler into the documentation tree.

    The output path is ``<base_directory>/<task>/<name>.png``:

    - ``<task>`` is the stored file's basename up to its first underscore.
    - ``<name>`` is the basename truncated after its "Plot" / "TwoHists" marker,
      lower-cased, with "_g_" replaced by "_{band}_" and the remaining
      underscores turned into hyphens.

    The ``<task>`` directory is created if needed.

    Parameters
    ----------
    plot_name: `str`
        Dataset type name of the plot.
    dataId: `dict`
        dataId used to look the plot up.
    collections: `list`
        Collections to search.
    butler: `lsst.daf.Butler`
        Butler initialized for the repo of interest.
    base_directory: `str`, optional
        Root directory for the plots (default '../_static/plots/').

    Returns
    -------
    file_path: `str` or `None`
        Path of the written file, or `None` if the plot could not be located
        for this dataId.

    Raises
    ------
    ValueError
        If the stored file name contains neither "Plot_" nor "TwoHists_".
    """
    # Best-effort lookup: callers try several dataIds per plot, so a lookup
    # that fails (or finds nothing) for this dataId is not an error.
    try:
        ref = butler.registry.findDataset(plot_name, dataId, collections=collections)
        if ref is None:
            return None
        uri = butler.getURI(ref)
    except Exception:
        return None

    file_name = uri.basename()
    task_name = file_name.partition('_')[0]
    for marker in ("Plot_", "TwoHists_"):
        if marker in file_name:
            # Keep everything through the marker word, dropping its trailing "_".
            file_name = file_name[:file_name.rfind(marker) + len(marker) - 1]
            break
    else:
        raise ValueError(f"bad filename {file_name}")
    file_name = file_name.lower().replace('_g_', '_{band}_').replace("_", "-")

    directory = os.path.join(base_directory, task_name)
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, file_name + '.png')
    with open(file_path, 'wb') as file:
        file.write(uri.read())
    return file_path

In [None]:
import os

# For each task in task_frame, collect its metric-bundle and plot datasets into
# a DataFrame with the columns below, and copy each plot's image into
# ../_static/plots/<task>/.
single_task_dict = {col: [] for col in ["name", "original_name", "storage_class", "link"]}
task_table_dict = {}
for task in task_frame['name']:
    os.makedirs('../_static/plots/' + task, exist_ok=True)
    # Fresh, independent empty columns for this task.
    rows = {col: [] for col in single_task_dict}
    for ds in metric_datasets:
        # NOTE(review): substring (not equality) match against the task name,
        # so a short prefix can match several tasks — confirm intended.
        if ds.name[:ds.name.rfind('_')] in task:
            rows["name"].append(ds.name)
            rows["original_name"].append(ds.name)
            rows["storage_class"].append(ds.storageClass_name)
            rows["link"].append(ds.name.lower())
    for ds in plot_datasets:
        if ds.name[:ds.name.find('_')] in task:
            rows["name"].append(ds.name.replace('_g_', '_{band}_'))
            rows["original_name"].append(ds.name)
            rows["storage_class"].append(ds.storageClass_name)
            rows["link"].append(ds.name.lower().replace('_g_', '_{band}_').replace("_", "-"))
            # The plot's dimensions are not tracked here, so try both the
            # tract and visit dataIds; write_plot returns without writing when
            # its lookup fails.
            for did in (did_tract, did_visit):
                write_plot(plot_name=ds.name,
                           dataId=did,
                           collections=collections_nv,
                           butler=butler_nv)
    task_table_dict[task] = pd.DataFrame(rows)


In [None]:
def create_initial_row(task, row):
    """Format the first Markdown table row for a task.

    The row links the task's pipeline yaml. It then references the dataset:
    a ``{ref}`` role for plots (targeting the figure named by ``row['link']``),
    or an in-page anchor link for metric bundles.

    Parameters
    ----------
    task: mapping (e.g. a `pandas.Series` row of task_frame)
        Must provide "pipelines_name" and "pipelines_url".
    row: mapping (e.g. a `pandas.Series` row of a task table)
        Must provide "storage_class", "link", and "original_name".

    Returns
    -------
    line: `str`
        One Markdown table row, newline-terminated.

    Raises
    ------
    ValueError
        If ``row['storage_class']`` is neither "Plot" nor "MetricMeasurementBundle".
    """
    task_cell = f"| [{task['pipelines_name']}]({task['pipelines_url']})|"
    if row['storage_class'] == "Plot":
        dataset_cell = "{ref}`" + f"{row['link']} <{row['link']}>`"
    elif row['storage_class'] == 'MetricMeasurementBundle':
        dataset_cell = f"[{row['original_name']}](#{row['link']})"
    else:
        raise ValueError(f"unsupported storage class {row['storage_class']!r}")
    return f"{task_cell}{dataset_cell}| {row['storage_class']}|\n"


def create_other_row(row):
    """Format a follow-on Markdown table row (empty task cell) for a task.

    Metric bundles get the same in-page anchor link as in ``create_initial_row``.
    Every other storage class gets a ``{ref}`` role targeting the figure named
    by ``row['link']``.

    Parameters
    ----------
    row: mapping (e.g. a `pandas.Series` row of a task table)
        Must provide "storage_class" and "link" (and "original_name" for
        metric bundles).

    Returns
    -------
    line: `str`
        One Markdown table row, newline-terminated.
    """
    if row['storage_class'] == 'MetricMeasurementBundle':
        # No figure exists for a metric bundle, so a {ref} would dangle.
        dataset_cell = f"[{row['original_name']}](#{row['link']})"
    else:
        dataset_cell = "{ref}`" + f"{row['link']} <{row['link']}>`"
    return f"||{dataset_cell}| {row['storage_class']}|\n"


def create_task_tables_markdown(task_frame, task_table_dict):
    """Print two Markdown tables of datasets: one for visit-level and one for tract-level tasks.

    Each task contributes one row per entry of its task table. The first row
    comes from ``create_initial_row``, the rest from ``create_other_row``, and
    a separator row follows each task.

    Parameters
    ----------
    task_frame: `DataFrame`
        DataFrame with columns "name", "type", "pipelines_name", and "pipelines_url".
    task_table_dict: `dict`
        Maps each task name to a DataFrame with columns "name", "original_name",
        "storage_class", and "link".

    Raises
    ------
    ValueError
        If a task's "type" is neither "visit" nor "tract".
    """
    header_row = "Task (link to pipeline yaml) | datasetType | StorageClass |\n|---   |---   |---   |\n"
    skip_row = "|||\n"

    tables = {'visit': header_row, 'tract': header_row}
    for _, task in task_frame.iterrows():
        if task['type'] not in tables:
            raise ValueError(f"bad task type {task['type']!r}")
        rows_frame = task_table_dict[task['name']]
        pieces = []
        for i in range(rows_frame.shape[0]):
            row = rows_frame.iloc[i]
            pieces.append(create_initial_row(task, row) if i == 0 else create_other_row(row))
        tables[task['type']] += ''.join(pieces) + skip_row

    # visit task table
    print("visit task table")
    print(tables['visit'])
    print("\n\n\n")

    print('tract task table')
    print(tables['tract'])
    print("\n\n\n")


def create_plot_references(task_frame, task_table_dict):
    """Print a ``{figure}`` directive block for every Plot dataset of every task.

    For each task, its name and a blank line are printed. Each Plot row then
    gets a block showing the image under /_static/plots/<task>/, named by the
    row's link, with the plot name and a link to the task's "_metrics" anchor.

    Parameters
    ----------
    task_frame: `DataFrame`
        DataFrame with at least a "name" column.
    task_table_dict: `dict`
        Maps each task name to a DataFrame with columns "storage_class",
        "link", and "original_name".
    """
    for _, task in task_frame.iterrows():
        name = task['name']
        print(name)
        print()
        for _, row in task_table_dict[name].iterrows():
            if row['storage_class'] != "Plot":
                continue
            link = row['link']
            directive = "".join([
                "```{figure} ",
                f"/_static/plots/{name}/{link}.png\n",
                f":name: {link}\n\n",
                f"**Plot Name**: {row['original_name']}\n\n",
                f"Associated metrics: [{name}_metrics](#{name.lower()}_metrics)\n",
                "```\n\n",
            ])
            print(directive)

In [None]:
# Print one figure directive per plot, for pasting into the TechNote.
create_plot_references(task_frame=task_frame, task_table_dict=task_table_dict)

In [None]:
# Print the visit-level and tract-level task tables in Markdown.
create_task_tables_markdown(task_frame=task_frame, task_table_dict=task_table_dict)