Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from .make_pgpass import make_pgpass
from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface
from .raw import Snap
from .visit import Visit
from .visit import FannedOutVisit

PROJECT_ID = "prompt-proto"

Expand Down Expand Up @@ -131,7 +131,7 @@ def parse_next_visit(http_request):

Returns
-------
next_visit : `activator.visit.Visit`
next_visit : `activator.visit.FannedOutVisit`
The next_visit message contained in the request.

Raises
Expand All @@ -147,7 +147,7 @@ def parse_next_visit(http_request):

# Message format is determined by the nextvisit-start deployment.
data = json.loads(event.data)
return Visit(**data)
return FannedOutVisit(**data)


def _filter_messages(messages):
Expand Down
61 changes: 31 additions & 30 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import lsst.obs.base
import lsst.pipe.base

from .visit import Visit
from .visit import FannedOutVisit

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -292,14 +292,16 @@ def _init_visit_definer(self):
define_visits_config = lsst.obs.base.DefineVisitsConfig()
self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler)

def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> lsst.afw.geom.SkyWcs:
def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector,
visit: FannedOutVisit
) -> lsst.afw.geom.SkyWcs:
"""Calculate the expected detector WCS for an incoming observation.

Parameters
----------
detector : `lsst.afw.cameraGeom.Detector`
The detector for which to generate a WCS.
visit : `Visit`
visit : `FannedOutVisit`
Predicted observation metadata for the detector.

Returns
Expand All @@ -313,13 +315,13 @@ def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) ->
Raised if ``visit`` does not have equatorial coordinates and sky
rotation angle.
"""
if visit.coordinateSystem != Visit.CoordSys.ICRS:
raise ValueError("Only ICRS coordinates are supported in Visit, "
if visit.coordinateSystem != FannedOutVisit.CoordSys.ICRS:
raise ValueError("Only ICRS coordinates are supported in FannedOutVisit, "
f"got {visit.coordinateSystem!r} instead.")
boresight_center = lsst.geom.SpherePoint(visit.position[0], visit.position[1], lsst.geom.degrees)

if visit.rotationSystem != Visit.RotSys.SKY:
raise ValueError("Only sky camera rotations are supported in Visit, "
if visit.rotationSystem != FannedOutVisit.RotSys.SKY:
raise ValueError("Only sky camera rotations are supported in FannedOutVisit, "
f"got {visit.rotationSystem!r} instead.")
orientation = visit.cameraAngle * lsst.geom.degrees

Expand Down Expand Up @@ -356,7 +358,7 @@ def _detector_bounding_circle(self, detector: lsst.afw.cameraGeom.Detector,
radii.append(wcs.pixelToSky(corner).separation(center))
return center, max(radii)

def prep_butler(self, visit: Visit) -> None:
def prep_butler(self, visit: FannedOutVisit) -> None:
"""Prepare a temporary butler repo for processing the incoming data.

After this method returns, the internal butler is guaranteed to contain
Expand All @@ -367,7 +369,7 @@ def prep_butler(self, visit: Visit) -> None:

Parameters
----------
visit : `Visit`
visit : `FannedOutVisit`
Group of snaps from one detector to prepare the butler for.

Raises
Expand All @@ -391,7 +393,6 @@ def prep_butler(self, visit: Visit) -> None:
with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
with self.central_butler.export(filename=export_file.name, format="yaml") as export:
self._export_refcats(export, center, radius)
# TODO: Summit filter names may not match Butler names, especially for composite filters.
self._export_skymap_and_templates(export, center, detector, wcs, visit.filters)
self._export_calibs(export, visit.detector, visit.filters)
self._export_collections(export, self.instrument.makeUmbrellaCollectionName())
Expand Down Expand Up @@ -574,14 +575,14 @@ def get_key(ref):
yield k, len(list(g))

def _get_init_output_run(self,
visit: Visit,
visit: FannedOutVisit,
date: datetime.date = datetime.datetime.now(datetime.timezone.utc)) -> str:
"""Generate a deterministic init-output collection name that avoids
configuration conflicts.

Parameters
----------
visit : Visit
visit : FannedOutVisit
Group of snaps whose processing goes into the run.
date : `datetime.date`
Date of the processing run (not observation!)
Expand All @@ -596,14 +597,14 @@ def _get_init_output_run(self,
return self._get_output_run(visit, date)

def _get_output_run(self,
visit: Visit,
visit: FannedOutVisit,
date: datetime.date = datetime.datetime.now(datetime.timezone.utc)) -> str:
"""Generate a deterministic collection name that avoids version or
provenance conflicts.

Parameters
----------
visit : Visit
visit : FannedOutVisit
Group of snaps whose processing goes into the run.
date : `datetime.date`
Date of the processing run (not observation!)
Expand All @@ -618,12 +619,12 @@ def _get_output_run(self,
return self.instrument.makeCollectionName(
"prompt", f"output-{date:%Y-%m-%d}", pipeline_name, self._deployment)

def _prep_collections(self, visit: Visit):
def _prep_collections(self, visit: FannedOutVisit):
"""Pre-register output collections in advance of running the pipeline.

Parameters
----------
visit : Visit
visit : FannedOutVisit
Group of snaps needing an output run.

Returns
Expand All @@ -640,13 +641,13 @@ def _prep_collections(self, visit: Visit):
_prepend_collection(self.butler, self.output_collection, [output_run])
return output_run

def _get_pipeline_file(self, visit: Visit) -> str:
def _get_pipeline_file(self, visit: FannedOutVisit) -> str:
"""Identify the pipeline to be run, based on the configured instrument
and details of the visit.

Parameters
----------
visit : Visit
visit : FannedOutVisit
Group of snaps from one detector to prepare the pipeline for.

Returns
Expand All @@ -662,13 +663,13 @@ def _get_pipeline_file(self, visit: Visit) -> str:
visit.instrument,
"ApPipe.yaml")

def _prep_pipeline(self, visit: Visit) -> lsst.pipe.base.Pipeline:
def _prep_pipeline(self, visit: FannedOutVisit) -> lsst.pipe.base.Pipeline:
"""Setup the pipeline to be run, based on the configured instrument and
details of the incoming visit.

Parameters
----------
visit : Visit
visit : FannedOutVisit
Group of snaps from one detector to prepare the pipeline for.

Returns
Expand Down Expand Up @@ -716,7 +717,7 @@ def _download(self, remote):
local.transfer_from(remote, "copy")
return local

def ingest_image(self, visit: Visit, oid: str) -> None:
def ingest_image(self, visit: FannedOutVisit, oid: str) -> None:
"""Ingest an image into the temporary butler.

The temporary butler must not already contain a ``raw`` dataset
Expand All @@ -726,7 +727,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None:

Parameters
----------
visit : Visit
visit : FannedOutVisit
The visit for which the image was taken.
oid : `str`
Identifier for incoming image, relative to the image bucket.
Expand All @@ -745,7 +746,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None:
assert len(result) == 1, "Should have ingested exactly one image."
_log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId)

def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
def run_pipeline(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None:
"""Process the received image(s).

The internal butler must contain all data and all dimensions needed to
Expand All @@ -754,7 +755,7 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:

Parameters
----------
visit : Visit
visit : FannedOutVisit
Group of snaps from one detector to be processed.
exposure_ids : `set` [`int`]
Identifiers of the exposures that were received.
Expand Down Expand Up @@ -790,13 +791,13 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
_log.info(f"Pipeline successfully run on "
f"detector {visit.detector} of {exposure_ids}.")

def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None:
def export_outputs(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None:
"""Copy raws and pipeline outputs from processing a set of images back
to the central Butler.

Parameters
----------
visit : Visit
visit : FannedOutVisit
The visit whose outputs need to be exported.
exposure_ids : `set` [`int`]
Identifiers of the exposures that were processed.
Expand Down Expand Up @@ -834,14 +835,14 @@ def _get_safe_dataset_types(butler):
return [dstype for dstype in butler.registry.queryDatasetTypes(...)
if "detector" in dstype.dimensions]

def _export_subset(self, visit: Visit, exposure_ids: set[int],
def _export_subset(self, visit: FannedOutVisit, exposure_ids: set[int],
dataset_types: typing.Any, in_collections: typing.Any) -> None:
"""Copy datasets associated with a processing run back to the
central Butler.

Parameters
----------
visit : Visit
visit : FannedOutVisit
The visit whose outputs need to be exported.
exposure_ids : `set` [`int`]
Identifiers of the exposures that were processed.
Expand Down Expand Up @@ -891,14 +892,14 @@ def _export_subset(self, visit: Visit, exposure_ids: set[int],
"skymap", "tract", "patch"},
transfer="copy")

def clean_local_repo(self, visit: Visit, exposure_ids: set[int]) -> None:
def clean_local_repo(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None:
"""Remove local repo content that is only needed for a single visit.

This includes raws and pipeline outputs.

Parameter
---------
visit : Visit
visit : FannedOutVisit
The visit to be removed.
exposure_ids : `set` [`int`]
Identifiers of the exposures to be removed.
Expand Down
6 changes: 3 additions & 3 deletions python/activator/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from dataclasses import dataclass
import re

from .visit import Visit
from .visit import FannedOutVisit


@dataclass(frozen=True)
Expand Down Expand Up @@ -68,12 +68,12 @@ def from_oid(cls, oid: str):
else:
raise ValueError(f"{oid} could not be parsed into a Snap")

def is_consistent(self, visit: Visit):
def is_consistent(self, visit: FannedOutVisit):
"""Test if this snap could have come from a particular visit.

Parameters
----------
visit : `activator.visit.Visit`
visit : `activator.visit.FannedOutVisit`
The visit from which snaps were expected.
"""
return (self.instrument == visit.instrument
Expand Down
45 changes: 38 additions & 7 deletions python/activator/visit.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
__all__ = ["Visit"]
__all__ = ["FannedOutVisit", "SummitVisit", "BareVisit"]

from dataclasses import dataclass, field
from dataclasses import dataclass, field, asdict
import enum


@dataclass(frozen=True, kw_only=True)
class Visit:
class BareVisit:
# Elements must be hashable and JSON-persistable; built-in types
# recommended. list is not hashable, but gets special treatment because
# neither Kafka nor JSON deserialize sequences as tuples.
Expand All @@ -14,7 +14,7 @@ class Visit:
# https://ts-xml.lsst.io/sal_interfaces/ScriptQueue.html#nextvisit
class CoordSys(enum.IntEnum):
# This is a redeclaration of lsst.ts.idl.enums.Script.MetadataCoordSys,
# but we need Visit to work in code that can't import lsst.ts.
# but we need BareVisit to work in code that can't import lsst.ts.
NONE = 1
ICRS = 2
OBSERVED = 3
Expand All @@ -33,22 +33,33 @@ class Dome(enum.IntEnum):
OPEN = 2
EITHER = 3

salIndex: int
salIndex: int # this maps to an instrument
scriptSalIndex: int
groupId: str # observatory-specific ID; not the same as visit number
coordinateSystem: CoordSys # coordinate system of position
# (ra, dec) or (az, alt) in degrees. Use compare=False to exclude from hash.
position: list[float] = field(compare=False)
rotationSystem: RotSys # coordinate system of cameraAngle
cameraAngle: float # in degrees
filters: str # physical filter(s)
# physical filter(s) name as used in Middleware. It is a combination of filter and
# grating joined by a "~". For example, "SDSSi_65mm~empty".
filters: str
dome: Dome
duration: float # script execution, not exposure
nimages: int # number of snaps expected, 0 if unknown
survey: str # survey name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also update the documentation of filters to say that these filter names are in the Middleware format, including ~ for composite filters? Following up on https://lsstc.slack.com/archives/C0345PUU4N9/p1679429952829539.

totalCheckpoints: int

# Added by the Kafka consumer at USDF.
def __str__(self):
"""Return a short string that represents the visit but does not
include complete metadata.
"""
return f"(groupId={self.groupId}, salIndex={self.salIndex})"


@dataclass(frozen=True, kw_only=True)
class FannedOutVisit(BareVisit):
# Extra information is added by the fan-out service at USDF.
instrument: str # short name
detector: int

Expand All @@ -57,3 +68,23 @@ def __str__(self):
include "metadata" fields.
"""
return f"(instrument={self.instrument}, groupId={self.groupId}, detector={self.detector})"

def get_bare_visit(self):
"""Return visit-level info as a dict"""
info = asdict(self)
info.pop("instrument")
info.pop("detector")
return info


@dataclass(frozen=True, kw_only=True)
class SummitVisit(BareVisit):
    """A visit message as published by the summit, before fan-out.

    Extends `BareVisit` with the extra fields present in the NextVisit
    messages from the summit. All extras are defaulted so a `SummitVisit`
    can be built from a bare payload that omits them.
    """

    # Extra fields are in the NextVisit messages from the summit.
    # NOTE(review): the private_* names look like SAL/DDS message metadata
    # (timestamps, origin, sequence number) — confirm against the
    # ts_xml NextVisit schema before relying on their semantics.
    private_efdStamp: float = 0.0
    private_kafkaStamp: float = 0.0
    private_identity: str = "ScriptQueue"
    private_revCode: str = "c9aab3df"
    private_origin: int = 0
    private_seqNum: int = 0
    private_rcvStamp: float = 0.0
    private_sndStamp: float = 0.0
Loading