4 changes: 2 additions & 2 deletions bin/prompt_prototype_upload_raws.sh
@@ -51,6 +51,6 @@ gsutil cp "${RAW_DIR}/HSCA05915116.fits" \
gs://${UPLOAD_BUCKET}/HSC/58/2016030700003/0/0059150/HSC-G/HSC-2016030700003-0-0059150-HSC-G-58.fits

gsutil cp "${RAW_DIR}/HSCA05916109.fits" \
gs://${UPLOAD_BUCKET}/HSC/43/2016030700004/0/0059150/HSC-G/HSC-2016030700004-0-0059160-HSC-G-43.fits
gs://${UPLOAD_BUCKET}/HSC/43/2016030700004/0/0059160/HSC-G/HSC-2016030700004-0-0059160-HSC-G-43.fits
gsutil cp "${RAW_DIR}/HSCA05916113.fits" \
gs://${UPLOAD_BUCKET}/HSC/51/2016030700004/0/0059150/HSC-G/HSC-2016030700004-0-0059160-HSC-G-51.fits
gs://${UPLOAD_BUCKET}/HSC/51/2016030700004/0/0059160/HSC-G/HSC-2016030700004-0-0059160-HSC-G-51.fits
32 changes: 17 additions & 15 deletions python/activator/activator.py
@@ -84,7 +84,6 @@
# However, we don't want MiddlewareInterface to need to know details like where
# the central repo is located, either, so perhaps we need a new module.
central_butler = Butler(calib_repo,
# TODO: investigate whether these defaults, esp. skymap, slow down queries
instrument=active_instrument.getName(),
# NOTE: with inferDefaults=True, it's possible we don't need to hardcode this
# value from the real repository.
@@ -94,6 +93,7 @@
inferDefaults=True)
repo = f"/tmp/butler-{os.getpid()}"
butler = Butler(Butler.makeRepo(repo), writeable=True)
_log.info("Created local Butler repo at %s.", repo)
mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, butler)


@@ -184,10 +184,7 @@ def next_visit_handler() -> Tuple[str, int]:
mwi.ingest_image(oid)
expid_set.add(m.group('expid'))

_log.debug(
"Waiting for snaps from group"
f" '{expected_visit.group}' detector {expected_visit.detector}"
)
_log.debug(f"Waiting for snaps from {expected_visit}.")
start = time.time()
while len(expid_set) < expected_visit.snaps:
response = subscriber.pull(
@@ -198,14 +195,11 @@ def next_visit_handler() -> Tuple[str, int]:
end = time.time()
if len(response.received_messages) == 0:
if end - start < timeout:
_log.debug(
f"Empty pull after {end - start}s"
f" for group '{expected_visit.group}'"
)
_log.debug(f"Empty pull after {end - start}s for {expected_visit}.")
continue
_log.warning(
"Timed out waiting for image in"
f" group '{expected_visit.group}' after receiving exposures {expid_set}"
f"Timed out waiting for image in {expected_visit} "
f"after receiving exposures {expid_set}"
)
break

@@ -230,10 +224,18 @@ def next_visit_handler() -> Tuple[str, int]:
_log.error(f"Failed to match object id '{oid}'")
subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list)

# Got all the snaps; run the pipeline
_log.info(f"Running pipeline on group: {expected_visit.group} detector: {expected_visit.detector}")
mwi.run_pipeline(expected_visit, expid_set)
return "Pipeline executed", 200
if expid_set:
# Got at least some snaps; run the pipeline.
# If this is only a partial set, the processed results may still be
# useful for quality purposes.
if len(expid_set) < expected_visit.snaps:
_log.warning(f"Processing {len(expid_set)} snaps, expected {expected_visit.snaps}.")
_log.info(f"Running pipeline on {expected_visit}.")
mwi.run_pipeline(expected_visit, expid_set)
return "Pipeline executed", 200
else:
_log.fatal(f"Timed out waiting for images for {expected_visit}.")
return "Timed out waiting for images", 500
finally:
subscriber.delete_subscription(subscription=subscription.name)

188 changes: 152 additions & 36 deletions python/activator/middleware_interface.py
@@ -21,6 +21,8 @@

__all__ = ["MiddlewareInterface"]

import collections.abc
import itertools
import logging
import os
import os.path
@@ -144,33 +146,75 @@ def _init_ingester(self):
self.rawIngestTask = lsst.obs.base.RawIngestTask(config=config,
butler=self.butler)

def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> lsst.afw.geom.SkyWcs:
"""Calculate the expected detector WCS for an incoming observation.

Parameters
----------
detector : `lsst.afw.cameraGeom.Detector`
The detector for which to generate a WCS.
visit : `Visit`
Predicted observation metadata for the detector.

Returns
-------
wcs : `lsst.afw.geom.SkyWcs`
An approximate WCS for ``visit``.
"""
boresight_center = lsst.geom.SpherePoint(visit.ra, visit.dec, lsst.geom.degrees)
orientation = lsst.geom.Angle(visit.rot, lsst.geom.degrees)
flip_x = True if self.instrument.getName() == "DECam" else False
return lsst.obs.base.createInitialSkyWcsFromBoresight(boresight_center,
orientation,
detector,
flipX=flip_x)

def _detector_bounding_circle(self, detector: lsst.afw.cameraGeom.Detector,
(Contributor review comment: This is another good one to have refactored out, but I wonder if it shouldn't live in afw? Maybe in afw.geom.wcsUtils.py?) A rough free-function sketch follows after this method.

wcs: lsst.afw.geom.SkyWcs
) -> (lsst.geom.SpherePoint, lsst.geom.Angle):
# Could return a sphgeom.Circle, but that would require a lot of
# sphgeom->geom conversions downstream. Even their Angles are different!
"""Compute a small sky circle that contains the detector.

Parameters
----------
detector : `lsst.afw.cameraGeom.Detector`
The detector for which to compute an on-sky bounding circle.
wcs : `lsst.afw.geom.SkyWcs`
The conversion from detector to sky coordinates.

Returns
-------
center : `lsst.geom.SpherePoint`
The center of the bounding circle.
radius : `lsst.geom.Angle`
The opening angle of the bounding circle.
"""
radii = []
center = wcs.pixelToSky(detector.getCenter(lsst.afw.cameraGeom.PIXELS))
for corner in detector.getCorners(lsst.afw.cameraGeom.PIXELS):
radii.append(wcs.pixelToSky(corner).separation(center))
return center, max(radii)
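
Following up on the review comment above, here is a minimal sketch of what a standalone, free-function version of this helper could look like if it were pulled out of MiddlewareInterface; the afw.geom.wcsUtils placement is only the reviewer's suggestion, and the body reuses only calls already shown in this diff.

import lsst.afw.cameraGeom
import lsst.afw.geom
import lsst.geom


def detector_bounding_circle(detector: lsst.afw.cameraGeom.Detector,
                             wcs: lsst.afw.geom.SkyWcs,
                             ) -> (lsst.geom.SpherePoint, lsst.geom.Angle):
    """Hypothetical standalone version of the method above."""
    # Sky position of the detector center, then the largest corner separation.
    center = wcs.pixelToSky(detector.getCenter(lsst.afw.cameraGeom.PIXELS))
    radii = [wcs.pixelToSky(corner).separation(center)
             for corner in detector.getCorners(lsst.afw.cameraGeom.PIXELS)]
    return center, max(radii)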

def prep_butler(self, visit: Visit) -> None:
"""Prepare a temporary butler repo for processing the incoming data.

Parameters
----------
visit : Visit
visit : `Visit`
Group of snaps from one detector to prepare the butler for.
"""
_log.info(f"Preparing Butler for visit '{visit}'")
_log.info(f"Preparing Butler for visit {visit!r}")

detector = self.camera[visit.detector]
wcs = self._predict_wcs(detector, visit)
center, radius = self._detector_bounding_circle(detector, wcs)

# Need up-to-date census of what's already present.
self.butler.registry.refresh()

with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
with self.central_butler.export(filename=export_file.name, format="yaml") as export:
boresight_center = lsst.geom.SpherePoint(visit.ra, visit.dec, lsst.geom.degrees)
orientation = lsst.geom.Angle(visit.rot, lsst.geom.degrees)
detector = self.camera[visit.detector]
flip_x = True if self.instrument.getName() == "DECam" else False
wcs = lsst.obs.base.createInitialSkyWcsFromBoresight(boresight_center,
orientation,
detector,
flipX=flip_x)
# Compute the maximum sky circle that contains the detector.
radii = []
center = wcs.pixelToSky(detector.getCenter(lsst.afw.cameraGeom.PIXELS))
for corner in detector.getCorners(lsst.afw.cameraGeom.PIXELS):
radii.append(wcs.pixelToSky(corner).separation(center))
radius = max(radii)

self._export_refcats(export, center, radius)
self._export_skymap_and_templates(export, center, detector, wcs)
self._export_calibs(export, visit.detector, visit.filter)
@@ -209,11 +253,14 @@ def _export_refcats(self, export, center, radius):
# collection, so we have to specify a list here. Replace this
# with another solution ASAP.
possible_refcats = ["gaia", "panstarrs", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110"]
export.saveDatasets(self.central_butler.registry.queryDatasets(
possible_refcats,
collections=self.instrument.makeRefCatCollectionName(),
where=htm_where,
findFirst=True))
refcats = set(_query_missing_datasets(
self.central_butler, self.butler,
possible_refcats,
collections=self.instrument.makeRefCatCollectionName(),
where=htm_where,
findFirst=True))
_log.debug("Found %d new refcat datasets.", len(refcats))
export.saveDatasets(refcats)

def _export_skymap_and_templates(self, export, center, detector, wcs):
"""Export the skymap and templates for this visit from the central
@@ -232,12 +279,12 @@ def _export_skymap_and_templates(self, export, center, detector, wcs):
"""
# TODO: This exports the whole skymap, but we want to only export the
# subset of the skymap that covers this data.
# TODO: We only want to import the skymap dimension once in init,
# otherwise we get a UNIQUE constraint error when prepping for the
# second visit.
export.saveDatasets(self.central_butler.registry.queryDatasets("skyMap",
collections=self._COLLECTION_SKYMAP,
findFirst=True))
skymaps = set(_query_missing_datasets(self.central_butler, self.butler,
"skyMap",
collections=self._COLLECTION_SKYMAP,
findFirst=True))
_log.debug("Found %d new skymap datasets.", len(skymaps))
export.saveDatasets(skymaps)
# Getting only one tract should be safe: we're getting the
# tract closest to this detector, so we should be well within
# the tract bbox.
@@ -253,9 +300,12 @@ def _export_skymap_and_templates(self, export, center, detector, wcs):
# TODO: alternately, we need to extract it from the pipeline? (best?)
# TODO: alternately, can we just assume that there is exactly
# one coadd type in the central butler?
export.saveDatasets(self.central_butler.registry.queryDatasets("*Coadd",
collections=self._COLLECTION_TEMPLATE,
where=template_where))
templates = set(_query_missing_datasets(self.central_butler, self.butler,
"*Coadd",
collections=self._COLLECTION_TEMPLATE,
where=template_where))
_log.debug("Found %d new template datasets.", len(templates))
export.saveDatasets(templates)

def _export_calibs(self, export, detector_id, filter):
"""Export the calibs for this visit from the central butler.
@@ -272,17 +322,47 @@ def _export_calibs(self, export, detector_id, filter):
# TODO: we can't filter by validity range because it's not
# supported in queryDatasets yet.
calib_where = f"detector={detector_id} and physical_filter='{filter}'"
calibs = set(_query_missing_datasets(
self.central_butler, self.butler,
...,
collections=self.instrument.makeCalibrationCollectionName(),
where=calib_where))
if calibs:
for dataset_type, n_datasets in self._count_by_type(calibs):
_log.debug("Found %d new calib datasets of type '%s'.", n_datasets, dataset_type)
else:
_log.debug("Found 0 new calib datasets.")
export.saveDatasets(
self.central_butler.registry.queryDatasets(
...,
collections=self.instrument.makeCalibrationCollectionName(),
where=calib_where),
calibs,
elements=[]) # elements=[] means do not export dimension records
target_types = {CollectionType.CALIBRATION}
for collection in self.central_butler.registry.queryCollections(...,
collectionTypes=target_types):
export.saveCollection(collection)

@staticmethod
def _count_by_type(refs):
"""Count the number of dataset references of each type.

Parameters
----------
refs : iterable [`lsst.daf.butler.DatasetRef`]
The references to classify.

Yields
------
type : `str`
The name of a dataset type in ``refs``.
count : `int`
The number of elements of type ``type`` in ``refs``.
"""
def get_key(ref):
return ref.datasetType.name

ordered = sorted(refs, key=get_key)
for k, g in itertools.groupby(ordered, key=get_key):
yield k, len(list(g))
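
As a quick illustration of the counting pattern in _count_by_type (hypothetical stand-in objects; only ref.datasetType.name matters), note that itertools.groupby merges only adjacent keys, which is why the refs are sorted first:

import itertools
from types import SimpleNamespace

# Fake DatasetRef-like objects; only the .datasetType.name attribute is used.
refs = [SimpleNamespace(datasetType=SimpleNamespace(name=n))
        for n in ["flat", "bias", "flat"]]

def get_key(ref):
    return ref.datasetType.name

# Sorting first lets groupby collect the two non-adjacent "flat" entries together.
for k, g in itertools.groupby(sorted(refs, key=get_key), key=get_key):
    print(k, len(list(g)))  # prints "bias 1", then "flat 2"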

def _prep_collections(self):
"""Pre-register output collections in advance of running the pipeline.
"""
@@ -418,4 +498,40 @@ def run_pipeline(self, visit: Visit, exposure_ids: set) -> None:
# If this is a fresh (local) repo, then types like calexp,
# *Diff_diaSrcTable, etc. have not been registered.
result = executor.run(register_dataset_types=True)
_log.info(f"Pipeline successfully run on {len(result)} quanta.")
_log.info(f"Pipeline successfully run on {len(result)} quanta for "
f"detector {visit.detector} of {exposure_ids}.")


def _query_missing_datasets(src_repo: Butler, dest_repo: Butler,
*args, **kwargs) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]:
"""Return datasets that are present in one repository but not another.

Parameters
----------
src_repo : `lsst.daf.butler.Butler`
The repository in which a dataset must be present.
dest_repo : `lsst.daf.butler.Butler`
The repository in which a dataset must not be present.
*args, **kwargs
Parameters for describing the dataset query. They have the same
meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`.

Returns
-------
datasets : iterable [`lsst.daf.butler.DatasetRef`]
The datasets that exist in ``src_repo`` but not ``dest_repo``.
"""
try:
known_datasets = set(dest_repo.registry.queryDatasets(*args, **kwargs))
except lsst.daf.butler.registry.DataIdValueError as e:
_log.debug("Pre-export query with args '%s, %s' failed with %s",
", ".join(repr(a) for a in args),
", ".join(f"{k}={v!r}" for k, v in kwargs.items()),
e)
# If dimensions are invalid, then *any* such datasets are missing.
known_datasets = set()

# Let exceptions from src_repo query raise: if it fails, that invalidates
# this operation.
return itertools.filterfalse(lambda ref: ref in known_datasets,
src_repo.registry.queryDatasets(*args, **kwargs))
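
A minimal usage sketch for this helper, mirroring how _export_refcats above calls it; central_butler and local_butler stand for existing Butler instances, and the dataset type and collection name are illustrative only:

# Hypothetical butlers and names, shown only to illustrate the calling pattern.
missing = set(_query_missing_datasets(central_butler, local_butler,
                                      "gaia_dr2_20200414",
                                      collections="refcats",
                                      findFirst=True))
with central_butler.export(filename="export.yaml", format="yaml") as export:
    export.saveDatasets(missing)
# The destination repo can then import the exported YAML file as usual.
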
6 changes: 6 additions & 0 deletions python/activator/visit.py
@@ -16,3 +16,9 @@ class Visit:
dec: float
rot: float
kind: str

def __str__(self):
"""Return a short string that disambiguates the visit but does not
include "metadata" fields.
"""
return f"(instrument={self.instrument}, group={self.group}, detector={self.detector})"