Merged
9 changes: 8 additions & 1 deletion python/activator/activator.py
@@ -70,8 +70,15 @@

# Initialize middleware interface; TODO: we'll need one of these per detector.
repo = f"/tmp/butler-{os.getpid()}"
central_butler = Butler(calib_repo,
# TODO: How do we get the appropriate instrument name
Contributor:

Why is config_instrument incorrect? Or if there needs to be a lookup table, that's OK, as there is a well-defined list of supported instruments.

Member:

This has to do with some UIs requiring a class name and others an instrument short name. As mentioned on Slack, I would just create a global Instrument object and then use that for the translation.

Contributor (author):

I'm going to leave this question to the "reintegrate activator and interface" ticket (that I guess doesn't exist?), if that's ok? We could create an Instrument here, but we'd still have to change the dict in uploader.py to have full class names, I think? And we'd also have to expand the eups table to have all the correct obs packages. I feel like that's enough moving parts that it should be a separate ticket.

Member (@kfindeisen, Mar 17, 2022):

I don't see any reason why we'd have to change uploader.py; we'd just need to make sure this file always keeps straight when it wants the class name and when it wants the short name, and doesn't try to mix the two.

# here and for what we pass to MiddlewareInterface?
instrument=config_instrument,
skymap="deepCoadd_skyMap",
collections=[f"{config_instrument}/defaults"],
writeable=False)
butler = Butler(Butler.makeRepo(repo.name), writeable=True)
mwi = MiddlewareInterface(calib_repo, image_bucket, config_instrument, butler)
mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, butler)


def check_for_snap(
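The translation suggested in the thread above might look like the following sketch, assuming a single module-level Instrument object. The value of config_instrument here is illustrative and assumed to be a fully qualified class name; resolving a short name instead would additionally require passing a butler registry to getInstrument for lookup.

import lsst.obs.base

config_instrument = "lsst.obs.subaru.HyperSuprimeCam"  # illustrative value
instrument = lsst.obs.base.utils.getInstrument(config_instrument)

# One global object provides both spellings, so callers never mix raw strings:
short_name = instrument.getName()  # e.g. "HSC"; what data IDs and collection names want
class_name = f"{type(instrument).__module__}.{type(instrument).__qualname__}"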
232 changes: 134 additions & 98 deletions python/activator/middleware_interface.py
@@ -22,13 +22,13 @@
__all__ = ["MiddlewareInterface"]

import logging
import os
import shutil
import tempfile

from astropy.time import Time

from lsst.daf.butler import Butler
import lsst.afw.cameraGeom
from lsst.daf.butler import Butler, CollectionType
from lsst.meas.algorithms.htmIndexer import HtmIndexer
import lsst.obs.base
import lsst.geom

from .visit import Visit

@@ -54,9 +54,10 @@ class MiddlewareInterface:

Parameters
----------
input_repo : `str`
Path to a butler repo containing the calibration and other data needed
for processing images as they are received.
central_butler : `lsst.daf.butler.Butler`
Butler repo containing the calibration and other data needed for
processing images as they are received. This butler must be created
with the default instrument, skymap, and collections assigned.
image_bucket : `str`
Storage bucket where images will be written to as they arrive.
See also ``prefix``.
@@ -74,70 +74,25 @@
appropriate for use in the Google Cloud environment; typically only
change this when running local tests.
"""
def __init__(self, input_repo: str, image_bucket: str, instrument: str,
def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
butler: Butler,
prefix: str = "gs://"):
self.prefix = prefix
# self.src = Butler(input_repo, writeable=False)
_log.debug(f"Butler({input_repo}, writeable=False)")
self.central_butler = central_butler
self.image_bucket = image_bucket
self.instrument = lsst.obs.base.utils.getInstrument(instrument)

self._init_local_butler(butler)
self._init_ingester()
# TODO DM-34098: note that we currently need to supply instrument here.
self.camera = self.central_butler.get("camera", instrument=self.instrument.getName())
self.skymap = self.central_butler.get("skyMap")

# self.r = self.src.registry
self.calibration_collection = f"{instrument}/calib"
refcat_collection = "refcats/DM-28636"

export_collections = set()
export_collections.add(self.calibration_collection)
_log.debug("Finding secondary collections")
# calib_collections = list(
# self.r.queryCollections(
# self.calibration_collection,
# flattenChains=True,
# includeChains=True,
# collectionTypes={CollectionType.CALIBRATION, CollectionType.CHAINED},
# )
# )
# for collection in calib_collections:
# export_collections.add(collection)
export_collections.add(refcat_collection)
# NOTE: I don't think we need this at all: we can just use ingest_files
# for the refcats. See jointcalTestBase.importRepository().

_log.debug("Finding refcats")
# for dataset in self.r.queryDatasets(
# "gaia_dr2_20200414",
# where=f"htm7 IN ({htm7})",
# collections=refcat_collection,
# ):
# export_datasets.add(dataset)
# for dataset in self.r.queryDatasets(
# "ps1_pv3_3pi_20170110",
# where=f"htm7 IN ({htm7})",
# collections=refcat_collection,
# ):
# export_datasets.add(dataset)

prep_dir = "/tmp/butler-export"
os.makedirs(prep_dir)
_log.debug(f"Exporting to {prep_dir}")
# with self.src.export(directory=prep_dir, format="yaml", transfer="copy") as e:
# for collection in export_collections:
# e.saveCollection(collection)
# e.saveDatasets(export_datasets)
_log.debug(f"Importing from {prep_dir}")
# self.butler.import_(directory=prep_dir, format="yaml", transfer="hardlink")
shutil.rmtree(prep_dir, ignore_errors=True)

# self.calib_types = [
# dataset_type
# for dataset_type in self.src.registry.queryDatasetTypes(...)
# if dataset_type.isCalibration()
# ]
self.calib_types = ["bias", "dark", "defects", "flat", "fringe", ]

# How much to pad the refcat region we will copy over.
self.padding = 30*lsst.geom.arcseconds

def _init_local_butler(self, butler: Butler):
"""Prepare the local butler to ingest into and process from.
@@ -150,7 +106,10 @@ def _init_local_butler(self, butler: Butler):
"""
# TODO: Replace registering the instrument with importing a "base" repo
# structure from an export.yaml file.
self.instrument.register(butler.registry)
# TODO: Cannot do this until we have a way to only extract calibs we
# don't already have, otherwise we get a unique constraint error when
# importing the export in prep_butler().
# self.instrument.register(butler.registry)
# Refresh butler after configuring it, to ensure all required
# dimensions are available.
butler.registry.refresh()
@@ -176,42 +135,119 @@ def prep_butler(self, visit: Visit) -> None:
Group of snaps from one detector to prepare the butler for.
"""
_log.info(f"Preparing Butler for visit '{visit}'")
visit_info = visit.__dict__
for calib_type in self.calib_types:
_log.debug(f"Finding {calib_type} datasets dataId={visit_info}"
f" collections={self.calibration_collection}"
f" timespan={Time.now()}")
# dataset = self.r.findDataset(
# calib_type,
# dataId=visit_info,
# collections=self.calibration_collection,
# timespan=Timespan(Time.now(), Time.now()),
# )
# if dataset is not None:
# export_datasets.add(dataset)
# Optimization: look for datasets in destination repo to avoid copy.

for calib_type in self.calib_types:
_log.debug(f"Finding {calib_type} associations")
# for association in r.queryDatasetAssociations(
# calib_type,
# collections=self.calibration_collection,
# collectionTypes=[CollectionType.CALIBRATION],
# flattenChains=True,
# ):
# if filter_calibs(association.ref, visit_info):
# export_collections.add(association.ref.run)

visit_dir = f"/tmp/visit-{visit.group}-export"
_log.debug(f"Exporting to {visit_dir}")
os.makedirs(visit_dir)
# with self.src.export(directory=visit_dir, format="yaml", transfer="copy") as e:
# for collection in export_collections:
# e.saveCollection(collection)
# e.saveDatasets(export_datasets)
_log.debug(f"Importing from {visit_dir}")
# self.butler.import_(directory=visit_dir, format="yaml", transfer="hardlink")
shutil.rmtree(visit_dir, ignore_errors=True)

with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
with self.central_butler.export(filename=export_file.name, format="yaml") as export:
detector = self.camera[visit.detector]
# TODO: where do we get flipX from? See RFC-605
wcs = lsst.obs.base.createInitialSkyWcsFromBoresight(visit.boresight_center,
visit.orientation,
detector,
flipX=False)
radii = []
center = wcs.pixelToSky(detector.getCenter(lsst.afw.cameraGeom.PIXELS))
for corner in detector.getCorners(lsst.afw.cameraGeom.PIXELS):
radii.append(wcs.pixelToSky(corner).separation(center))
radius = max(radii)

self._export_refcats(export, center, radius)
self._export_skymap_and_templates(export, center, detector, wcs)
self._export_calibs(export, visit.detector, visit.filter)

# CHAINED collections
export.saveCollection("refcats")

self.butler.import_(filename=export_file.name,
directory=self.central_butler.datastore.root,
transfer="copy")

def _export_refcats(self, export, center, radius):
"""Export the refcats for this visit from the central butler.

Parameters
----------
export : `RepoExportContext`
Export context manager.
center : `lsst.geom.SpherePoint`
Center of the region to find refcat shards in.
radius : `lsst.geom.Angle`
Radius to search for refcat shards in.
"""
indexer = HtmIndexer(depth=7)
shards = indexer.getShardIds(center, radius+self.padding)
# getShardIds returns a tuple; the first item is the list of shard IDs.
htm_where = f"htm7 in ({','.join(str(x) for x in shards[0])})"
# Get shards from all refcats that overlap this detector.
# TODO: `...` doesn't work for this queryDatasets call
# currently, and we can't queryDatasetTypes in just the refcats
# collection, so we have to specify a list here. Replace this
# with another solution ASAP.
possible_refcats = ["gaia", "panstarrs", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110"]
export.saveDatasets(self.central_butler.registry.queryDatasets(possible_refcats,
collections="refcats",
where=htm_where,
findFirst=True))

def _export_skymap_and_templates(self, export, center, detector, wcs):
"""Export the skymap and templates for this visit from the central
butler.

Parameters
----------
export : `RepoExportContext`
Export context manager.
center : `lsst.geom.SpherePoint`
Center of the region to load the skymap tract/patches for.
detector : `lsst.afw.cameraGeom.Detector`
Detector we are loading data for.
wcs : `lsst.afw.geom.SkyWcs`
Rough WCS for the upcoming visit, used to help find patches.
"""
export.saveDatasets(self.central_butler.registry.queryDatasets("skyMap",
collections="skymaps",
findFirst=True))
# Getting only one tract should be safe: we're getting the
# tract closest to this detector, so we should be well within
# the tract bbox.
tract = self.skymap.findTract(center)
points = [center]
for corner in detector.getCorners(lsst.afw.cameraGeom.PIXELS):
points.append(wcs.pixelToSky(corner))
patches = tract.findPatchList(points)
patches_str = ','.join(str(p.sequential_index) for p in patches)
template_where = f"patch in ({patches_str}) and tract={tract.tract_id}"
# TODO: do we need to have the coadd name used in the pipeline
# specified as a class kwarg, so that we only load one here?
# TODO: alternately, we need to extract it from the pipeline? (best?)
# TODO: alternately, can we just assume that there is exactly
# one coadd type in the central butler?
export.saveDatasets(self.central_butler.registry.queryDatasets("*Coadd",
collections="templates",
where=template_where))

def _export_calibs(self, export, detector_id, filter):
"""Export the calibs for this visit from the central butler.

Parameters
----------
export : `RepoExportContext`
Export context manager.
detector_id : `int`
Identifier of the detector to load calibs for.
filter : `str`
Physical filter name of the upcoming visit.
"""
# TODO: we can't filter by validity range because it's not
# supported in queryDatasets yet.
calib_where = f"detector={detector_id} and physical_filter='{filter}'"
export.saveDatasets(self.central_butler.registry.queryDatasets(
...,
collections=self.instrument.makeCalibrationCollectionName(),
where=calib_where))
target_types = {CollectionType.CALIBRATION}
for collection in self.central_butler.registry.queryCollections(...,
collectionTypes=target_types):
export.saveCollection(collection)

def ingest_image(self, oid: str) -> None:
"""Ingest an image into the temporary butler.
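For orientation, the central-to-local transfer in prep_butler reduces to the following round trip, shown here as a minimal sketch with illustrative repo paths and only the skymap query from the diff:

import tempfile
from lsst.daf.butler import Butler

central_butler = Butler("/path/to/central/repo", writeable=False)  # illustrative path
local_butler = Butler("/path/to/local/repo", writeable=True)  # illustrative path

with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
    # Write the selected datasets and collections to a YAML export file...
    with central_butler.export(filename=export_file.name, format="yaml") as export:
        export.saveDatasets(central_butler.registry.queryDatasets(
            "skyMap", collections="skymaps", findFirst=True))
        export.saveCollection("refcats")
    # ...then ingest that file into the local repo, copying the underlying
    # files out of the central datastore.
    local_butler.import_(filename=export_file.name,
                         directory=central_butler.datastore.root,
                         transfer="copy")

The NamedTemporaryFile context keeps the export YAML on local disk only as long as the import needs it.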
5 changes: 3 additions & 2 deletions python/activator/visit.py
@@ -1,6 +1,7 @@
__all__ = ["Visit"]

from dataclasses import dataclass
import lsst.geom


@dataclass(frozen=True)
@@ -10,6 +11,6 @@ class Visit:
group: str
snaps: int
filter: str
ra: float
dec: float
boresight_center: lsst.geom.SpherePoint
orientation: lsst.geom.Angle
kind: str
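With this change a Visit carries lsst.geom objects instead of raw floats. A hypothetical instance, following the positional argument order used in upload.py (the import path and all field values are illustrative):

import lsst.geom
from activator.visit import Visit  # module path assumed

visit = Visit("HSC",            # instrument
              42,               # detector
              "2022031700001",  # group
              2,                # snaps
              "HSC-R",          # filter
              lsst.geom.SpherePoint(134.7, -5.2, lsst.geom.degrees),  # boresight_center
              270.0 * lsst.geom.degrees,  # orientation
              "SURVEY")         # kind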
20 changes: 13 additions & 7 deletions python/tester/upload.py
@@ -10,6 +10,8 @@
import time
from visit import Visit

from lsst.geom import SpherePoint, degrees


@dataclass
class Instrument:
@@ -241,10 +243,12 @@ def make_random_visits(instrument, group):
"""
kind = KINDS[group % len(KINDS)]
filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))]
ra = random.uniform(0.0, 360.0)
dec = random.uniform(-90.0, 90.0)
ra = random.uniform(0.0, 360.0) * degrees
dec = random.uniform(-90.0, 90.0) * degrees
rot = random.uniform(0.0, 360.0) * degrees
return {
Visit(instrument, detector, group, INSTRUMENTS[instrument].n_snaps, filter, ra, dec, kind)
Visit(instrument, detector, group, INSTRUMENTS[instrument].n_snaps, filter,
SpherePoint(ra, dec), rot, kind)
for detector in range(INSTRUMENTS[instrument].n_detectors)
}

@@ -295,8 +299,10 @@ def get_samples(bucket, instrument):
group=group,
snaps=INSTRUMENTS[instrument].n_snaps,
filter=parsed.group('filter'),
ra=hsc_metadata[exposure_id]["ra"],
dec=hsc_metadata[exposure_id]["dec"],
boresight_center=SpherePoint(hsc_metadata[exposure_id]["ra"],
hsc_metadata[exposure_id]["dec"],
degrees),
orientation=hsc_metadata[exposure_id]["rot"] * degrees,
kind="SURVEY",
)
_log.debug(f"File {blob.name} parsed as snap {snap_id} of visit {visit}.")
@@ -421,8 +427,8 @@ def splice_group(visit, group):
group=group,
snaps=visit.snaps,
filter=visit.filter,
ra=visit.ra,
dec=visit.dec,
boresight_center=visit.boresight_center,
orientation=visit.orientation,
kind=visit.kind,
)
