From c85b0192c2639c4b9ae1770c447818768154a0d0 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 15 Dec 2023 12:47:09 -0800 Subject: [PATCH 1/2] Use find_dataset instead of queryDatasetAssociations to filter calibs. queryDatasetAssociations is extremely slow (the loop over dataset types takes a minute to complete), so its use makes filtering a bottleneck. Use find_dataset instead to select by timestamp. We cannot yet use find_dataset to search for calibs directly, because it can only be run on one dataset type at a time and we don't yet have the sophistication to organically determine which types the pipelines need. --- python/activator/middleware_interface.py | 38 +++++++++++------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index b27ecdcc..d356af73 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -37,7 +37,7 @@ import lsst.afw.cameraGeom import lsst.ctrl.mpexec from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor -from lsst.daf.butler import Butler, CollectionType +from lsst.daf.butler import Butler, CollectionType, Timespan import lsst.dax.apdb import lsst.geom from lsst.meas.algorithms.htmIndexer import HtmIndexer @@ -1188,28 +1188,24 @@ def _filter_calibs_by_date(butler: Butler, Returns ------- filtered_calibs : iterable [`lsst.daf.butler.DatasetRef`] - The subset of ``unfiltered_calibs`` that is valid on ``date``. + The datasets in ``unfiltered_calibs`` that are valid on ``date``. Not + guaranteed to be the same `~lsst.daf.butler.DatasetRef` objects passed + to ``unfiltered_calibs``, but guaranteed to be fully expanded. 
""" - dataset_types = {ref.datasetType for ref in unfiltered_calibs} - associations = {} - for dataset_type in dataset_types: - associations.update( - (a.ref, a) for a in butler.registry.queryDatasetAssociations( - dataset_type, collections, collectionTypes={CollectionType.CALIBRATION}, flattenChains=True - ) - ) - - t = astropy.time.Time(date, scale='utc') + t = Timespan.fromInstant(astropy.time.Time(date, scale='utc')) _log_trace.debug("Looking up calibs for %s in %s.", t, collections) - # DatasetAssociation.timespan guaranteed not None - filtered_calibs = [] + filtered_calibs = set() for ref in unfiltered_calibs: - if ref in associations: - if associations[ref].timespan.contains(t): - filtered_calibs.append(ref) - _log_trace.debug("%s (valid over %s) matches %s.", ref, associations[ref].timespan, t) - else: - _log_trace.debug("%s (valid over %s) does not match %s.", ref, associations[ref].timespan, t) + # Use find_dataset to simultaneously filter by validity and chain order + found_ref = butler.find_dataset(ref.datasetType, + ref.dataId, + collections=collections, + timespan=t, + dimension_records=True, + ) + if found_ref: + filtered_calibs.add(found_ref) + _log_trace.debug("%s matches %s.", ref, t) else: - _log_trace.debug("No calib associations for %s.", ref) + _log_trace.debug("%s does not match %s.", ref, t) return filtered_calibs From 4d1d68848b295cb9f8e915f5924a818810f6b609 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 15 Dec 2023 13:48:49 -0800 Subject: [PATCH 2/2] Optimize calls to find_dataset. The unfiltered calibs has many sets of calibs with the same type and data ID, but different run and validity range. Calls to find_dataset on each such calib return the same result. Do the filtering locally so that we call find_dataset only once per each filtered calib. 
--- python/activator/middleware_interface.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index d356af73..e228f621 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -1192,20 +1192,20 @@ def _filter_calibs_by_date(butler: Butler, guaranteed to be the same `~lsst.daf.butler.DatasetRef` objects passed to ``unfiltered_calibs``, but guaranteed to be fully expanded. """ + # Unfiltered_calibs can have up to one copy of each calib per certify cycle. + # Minimize redundant queries to find_dataset. + unique_ids = {(ref.datasetType, ref.dataId) for ref in unfiltered_calibs} t = Timespan.fromInstant(astropy.time.Time(date, scale='utc')) _log_trace.debug("Looking up calibs for %s in %s.", t, collections) - filtered_calibs = set() - for ref in unfiltered_calibs: + filtered_calibs = [] + for dataset_type, data_id in unique_ids: # Use find_dataset to simultaneously filter by validity and chain order - found_ref = butler.find_dataset(ref.datasetType, - ref.dataId, + found_ref = butler.find_dataset(dataset_type, + data_id, collections=collections, timespan=t, dimension_records=True, ) if found_ref: - filtered_calibs.add(found_ref) - _log_trace.debug("%s matches %s.", ref, t) - else: - _log_trace.debug("%s does not match %s.", ref, t) + filtered_calibs.append(found_ref) return filtered_calibs