Merged
6 changes: 1 addition & 5 deletions python/activator/activator.py
@@ -84,13 +84,9 @@
 # However, we don't want MiddlewareInterface to need to know details like where
 # the central repo is located, either, so perhaps we need a new module.
 central_butler = Butler(calib_repo,
-                        instrument=active_instrument.getName(),
-                        # NOTE: with inferDefaults=True, it's possible we don't need to hardcode this
-                        # value from the real repository.
-                        # skymap="hsc_rings_v1",
                         collections=[active_instrument.makeCollectionName("defaults")],
                         writeable=False,
-                        inferDefaults=True)
+                        inferDefaults=False)
 repo = f"/tmp/butler-{os.getpid()}"
 butler = Butler(Butler.makeRepo(repo), writeable=True)
 _log.info("Created local Butler repo at %s.", repo)
28 changes: 19 additions & 9 deletions python/activator/middleware_interface.py
@@ -106,15 +106,18 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
         define_visits_config = lsst.obs.base.DefineVisitsConfig()
         self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler)

         # TODO DM-34098: note that we currently need to supply instrument here.
         # HACK: explicit collection gets around the fact that we don't have any
         # timestamp/exposure information in a form we can pass to the Butler.
         # This code will break once cameras start being versioned.
         self.camera = self.central_butler.get(
             "camera", instrument=self.instrument.getName(),
             collections=self.instrument.makeUnboundedCalibrationRunName()
         )
-        self.skymap = self.central_butler.get("skyMap")
+        # TODO: is central_butler guaranteed to have only one skymap dimension?
[Inline review thread on the TODO above]

Contributor:
Given the assert below, I think you don't need this TODO. We're defining the central repo as having exactly one skymap as part of this, and I think that's something that we as the AP group can ensure.

@kfindeisen (Member, Author) replied on Sep 20, 2022:
I don't see why the assert makes the TODO irrelevant. The assert is computer-readable but easy for a human to overlook.

I'm not so confident that we will only have one skymap -- I can imagine a lot of experimentation over which skymap gives e.g. the best performance, especially during commissioning, and there might even be a policy some day that different datasets or data sources are based on different skymaps. Ultimately, it's not our call, but K-T's and the commissioning team's.

[End of review thread]
+        skymaps = list(self.central_butler.registry.queryDataIds("skymap"))
+        assert len(skymaps) == 1, "Ambiguous or missing skymap in central repo."
+        self.skymap_name = skymaps[0]["skymap"]
+        self.skymap = self.central_butler.get("skyMap", skymap=self.skymap_name)

         # How much to pad the refcat region we will copy over.
         self.padding = 30*lsst.geom.arcseconds
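
The one-skymap lookup added above leans on registry.queryDataIds returning data coordinates indexed by dimension name. A minimal sketch of that behavior (butler stands in for central_butler):

    # Each result is a DataCoordinate: a mapping from dimension name to value.
    skymaps = list(butler.registry.queryDataIds("skymap"))
    for data_id in skymaps:
        print(data_id["skymap"])  # the skymap's registered name

    # With exactly one skymap registered, skymaps[0]["skymap"] is the name the
    # assert guarantees; zero or several skymaps fail loudly instead of the
    # code silently picking one.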
@@ -216,7 +219,7 @@ def prep_butler(self, visit: Visit) -> None:
         with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
             with self.central_butler.export(filename=export_file.name, format="yaml") as export:
                 self._export_refcats(export, center, radius)
-                self._export_skymap_and_templates(export, center, detector, wcs)
+                self._export_skymap_and_templates(export, center, detector, wcs, visit.filter)
                 self._export_calibs(export, visit.detector, visit.filter)

         # CHAINED collections
@@ -244,9 +247,8 @@ def _export_refcats(self, export, center, radius):
             Radius to search for refcat shards in.
         """
         indexer = HtmIndexer(depth=7)
-        shards = indexer.getShardIds(center, radius+self.padding)
-        # getShardIds returns a tuple, the first item is the ids list.
-        htm_where = f"htm7 in ({','.join(str(x) for x in shards[0])})"
+        shard_ids, _ = indexer.getShardIds(center, radius+self.padding)
+        htm_where = f"htm7 in ({','.join(str(x) for x in shard_ids)})"
         # Get shards from all refcats that overlap this detector.
         # TODO: `...` doesn't work for this queryDatasets call
         # currently, and we can't queryDatasetTypes in just the refcats
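
As the deleted comment noted, getShardIds returns a pair rather than a bare list; the rewrite unpacks it and discards the second element (a boundary indicator) instead of indexing into the tuple. A short sketch with a hypothetical pointing:

    import lsst.geom
    from lsst.meas.algorithms import HtmIndexer

    indexer = HtmIndexer(depth=7)
    center = lsst.geom.SpherePoint(55.0, -5.0, lsst.geom.degrees)  # hypothetical
    shard_ids, _ = indexer.getShardIds(center, 1.0*lsst.geom.degrees)
    htm_where = f"htm7 in ({','.join(str(x) for x in shard_ids)})"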
@@ -262,7 +264,7 @@
_log.debug("Found %d new refcat datasets.", len(refcats))
export.saveDatasets(refcats)

def _export_skymap_and_templates(self, export, center, detector, wcs):
def _export_skymap_and_templates(self, export, center, detector, wcs, filter):
"""Export the skymap and templates for this visit from the central
butler.

@@ -276,11 +278,14 @@
             Detector we are loading data for.
         wcs : `lsst.afw.geom.SkyWcs`
             Rough WCS for the upcoming visit, to help finding patches.
+        filter : `str`
+            Physical filter for which to export templates.
         """
         # TODO: This exports the whole skymap, but we want to only export the
         # subset of the skymap that covers this data.
         skymaps = set(_query_missing_datasets(self.central_butler, self.butler,
                                               "skyMap",
+                                              skymap=self.skymap_name,
                                               collections=self._COLLECTION_SKYMAP,
                                               findFirst=True))
         _log.debug("Found %d new skymap datasets.", len(skymaps))
@@ -294,7 +299,7 @@
             points.append(wcs.pixelToSky(corner))
         patches = tract.findPatchList(points)
         patches_str = ','.join(str(p.sequential_index) for p in patches)
-        template_where = f"patch in ({patches_str}) and tract={tract.tract_id}"
+        template_where = f"patch in ({patches_str}) and tract={tract.tract_id} and physical_filter='{filter}'"
         # TODO: do we need to have the coadd name used in the pipeline
         # specified as a class kwarg, so that we only load one here?
         # TODO: alternately, we need to extract it from the pipeline? (best?)
@@ -303,6 +308,8 @@
         templates = set(_query_missing_datasets(self.central_butler, self.butler,
                                                 "*Coadd",
                                                 collections=self._COLLECTION_TEMPLATE,
+                                                instrument=self.instrument.getName(),
+                                                skymap=self.skymap_name,
                                                 where=template_where))
         _log.debug("Found %d new template datasets.", len(templates))
         export.saveDatasets(templates)
@@ -326,6 +333,7 @@ def _export_calibs(self, export, detector_id, filter):
             self.central_butler, self.butler,
             ...,
             collections=self.instrument.makeCalibrationCollectionName(),
+            instrument=self.instrument.getName(),
             where=calib_where))
         if calibs:
             for dataset_type, n_datasets in self._count_by_type(calibs):
@@ -485,7 +493,8 @@ def run_pipeline(self, visit: Visit, exposure_ids: set) -> None:
             raise RuntimeError("No data to process.") from e

         # TODO: can we move this from_pipeline call to prep_butler?
-        where = f"detector={visit.detector} and exposure in ({','.join(str(x) for x in exposure_ids)})"
+        where = f"instrument='{visit.instrument}' and detector={visit.detector} " \
+                f"and exposure in ({','.join(str(x) for x in exposure_ids)})"
         executor = SimplePipelineExecutor.from_pipeline(self.pipeline, where=where, butler=self.butler)
         if len(executor.quantum_graph) == 0:
             # TODO: a good place for a custom exception?
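
The rewritten where string relies on the quoting rules of Butler query expressions: string-valued dimensions such as instrument take single quotes, while integer-valued ones such as detector and exposure stay bare. A sketch with hypothetical values:

    exposure_ids = {2022092000123, 2022092000124}  # hypothetical exposure IDs
    where = (f"instrument='DECam' and detector=56 "
             f"and exposure in ({','.join(str(x) for x in exposure_ids)})")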
@@ -515,6 +524,7 @@ def _query_missing_datasets(src_repo: Butler, dest_repo: Butler,
     *args, **kwargs
         Parameters for describing the dataset query. They have the same
         meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`.
+        The query must be valid for both ``src_repo`` and ``dest_repo``.

     Returns
     -------
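
The body of _query_missing_datasets is outside this diff. A rough sketch only, as an assumption about its shape rather than the PR's actual implementation: a helper satisfying the docstring above could run the same query against both repositories and keep the difference, which is exactly why the query must be valid for both.

    from lsst.daf.butler import Butler

    def _query_missing_datasets(src_repo: Butler, dest_repo: Butler,
                                *args, **kwargs):
        # Compare by dataset type name and data ID; DatasetRef equality is
        # not meaningful across distinct repositories.
        present = {(ref.datasetType.name, ref.dataId)
                   for ref in dest_repo.registry.queryDatasets(*args, **kwargs)}
        for ref in src_repo.registry.queryDatasets(*args, **kwargs):
            if (ref.datasetType.name, ref.dataId) not in present:
                yield ref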
8 changes: 4 additions & 4 deletions python/activator/visit.py
@@ -6,11 +6,11 @@
 @dataclass(frozen=True)
 class Visit:
     # elements must be hashable and JSON-persistable; built-in types recommended
-    instrument: str
+    instrument: str  # short name
     detector: int
-    group: str
-    snaps: int
-    filter: str
+    group: str  # observatory-specific ID; not the same as visit number
+    snaps: int  # number of snaps expected
+    filter: str  # physical filter
     # all angles are in degrees
     ra: float
     dec: float
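
The leading comment requires fields to be hashable and JSON-persistable, which frozen=True plus built-in field types deliver. A sketch of both properties; the constructor call assumes the fields visible above are the complete set, which this truncated diff does not guarantee, and all values are hypothetical:

    import dataclasses
    import json

    visit = Visit(instrument="DECam", detector=56, group="20220920-001",
                  snaps=2, filter="g DECam SDSS c0001 4720.0 1520.0",
                  ra=55.6, dec=-4.95)
    key = hash(visit)                             # frozen dataclasses are hashable
    blob = json.dumps(dataclasses.asdict(visit))  # built-in types serialize cleanly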
10 changes: 4 additions & 6 deletions tests/test_middleware_interface.py
@@ -110,11 +110,11 @@ def setUp(self):
         data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
         central_repo = os.path.join(data_dir, "central_repo")
         central_butler = Butler(central_repo,
-                                instrument=instname,
-                                skymap="deepCoadd_skyMap",
                                 collections=[f"{instname}/defaults"],
-                                writeable=False)
+                                writeable=False,
+                                inferDefaults=False)
         instrument = "lsst.obs.decam.DarkEnergyCamera"
+        instrument_name = "DECam"
         self.input_data = os.path.join(data_dir, "input_data")
         # Have to preserve the tempdir, so that it doesn't get cleaned up.
         self.repo = tempfile.TemporaryDirectory()
@@ -127,7 +127,7 @@ def setUp(self):
         dec = -4.950050405424033
         # DECam has no rotator; instrument angle is 90 degrees in our system.
         rot = 90.
-        self.next_visit = Visit(instrument,
+        self.next_visit = Visit(instrument_name,
                                 detector=56,
                                 group=1,
                                 snaps=1,
Expand Down Expand Up @@ -169,7 +169,6 @@ def _check_imports(self, butler, detector, expected_shards):
         # Check that the right skymap is in the chained output collection.
         self.assertTrue(
             butler.datasetExists("skyMap",
-                                 # TODO: we shouldn't need skymap here?
                                  skymap="deepCoadd_skyMap",
                                  collections=self.interface.output_collection)
         )
@@ -208,7 +207,6 @@ def _check_imports(self, butler, detector, expected_shards):
         butler.registry.refresh()
         for patch in (464, 465, 509, 510):
             butler.datasetExists('deepCoadd', tract=0, patch=patch, band="g",
-                                 # TODO: we shouldn't need skymap here?
                                  skymap="deepCoadd_skyMap",
                                  collections=self.interface.output_collection)
