From d696418bde18252cae0d755630ce00297958d278 Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Thu, 6 Apr 2023 11:54:48 -0700 Subject: [PATCH 1/5] Rename Visit to FannedOutVisit This is a preparation for the upcoming change that we will start to differentiate detector-level Visit versus visit-level Visit. --- python/activator/activator.py | 6 +-- python/activator/middleware_interface.py | 60 +++++++++++---------- python/activator/raw.py | 6 +-- python/activator/visit.py | 6 +-- python/tester/upload.py | 47 ++++++++-------- python/tester/upload_hsc_rc2.py | 10 ++-- tests/test_middleware_interface.py | 68 ++++++++++++------------ tests/test_raw.py | 34 ++++++------ tests/test_visit.py | 22 ++++---- 9 files changed, 131 insertions(+), 128 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 50b03630..e15ab345 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -37,7 +37,7 @@ from .make_pgpass import make_pgpass from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface from .raw import Snap -from .visit import Visit +from .visit import FannedOutVisit PROJECT_ID = "prompt-proto" @@ -131,7 +131,7 @@ def parse_next_visit(http_request): Returns ------- - next_visit : `activator.visit.Visit` + next_visit : `activator.visit.FannedOutVisit` The next_visit message contained in the request. Raises @@ -147,7 +147,7 @@ def parse_next_visit(http_request): # Message format is determined by the nextvisit-start deployment. 
data = json.loads(event.data) - return Visit(**data) + return FannedOutVisit(**data) def _filter_messages(messages): diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 1c858996..7c1db155 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -41,7 +41,7 @@ import lsst.obs.base import lsst.pipe.base -from .visit import Visit +from .visit import FannedOutVisit _log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) @@ -292,14 +292,16 @@ def _init_visit_definer(self): define_visits_config = lsst.obs.base.DefineVisitsConfig() self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler) - def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> lsst.afw.geom.SkyWcs: + def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, + visit: FannedOutVisit + ) -> lsst.afw.geom.SkyWcs: """Calculate the expected detector WCS for an incoming observation. Parameters ---------- detector : `lsst.afw.cameraGeom.Detector` The detector for which to generate a WCS. - visit : `Visit` + visit : `FannedOutVisit` Predicted observation metadata for the detector. Returns @@ -313,13 +315,13 @@ def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> Raised if ``visit`` does not have equatorial coordinates and sky rotation angle. 
""" - if visit.coordinateSystem != Visit.CoordSys.ICRS: - raise ValueError("Only ICRS coordinates are supported in Visit, " + if visit.coordinateSystem != FannedOutVisit.CoordSys.ICRS: + raise ValueError("Only ICRS coordinates are supported in FannedOutVisit, " f"got {visit.coordinateSystem!r} instead.") boresight_center = lsst.geom.SpherePoint(visit.position[0], visit.position[1], lsst.geom.degrees) - if visit.rotationSystem != Visit.RotSys.SKY: - raise ValueError("Only sky camera rotations are supported in Visit, " + if visit.rotationSystem != FannedOutVisit.RotSys.SKY: + raise ValueError("Only sky camera rotations are supported in FannedOutVisit, " f"got {visit.rotationSystem!r} instead.") orientation = visit.cameraAngle * lsst.geom.degrees @@ -356,7 +358,7 @@ def _detector_bounding_circle(self, detector: lsst.afw.cameraGeom.Detector, radii.append(wcs.pixelToSky(corner).separation(center)) return center, max(radii) - def prep_butler(self, visit: Visit) -> None: + def prep_butler(self, visit: FannedOutVisit) -> None: """Prepare a temporary butler repo for processing the incoming data. After this method returns, the internal butler is guaranteed to contain @@ -367,7 +369,7 @@ def prep_butler(self, visit: Visit) -> None: Parameters ---------- - visit : `Visit` + visit : `FannedOutVisit` Group of snaps from one detector to prepare the butler for. Raises @@ -574,14 +576,14 @@ def get_key(ref): yield k, len(list(g)) def _get_init_output_run(self, - visit: Visit, + visit: FannedOutVisit, date: datetime.date = datetime.datetime.now(datetime.timezone.utc)) -> str: """Generate a deterministic init-output collection name that avoids configuration conflicts. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps whose processing goes into the run. date : `datetime.date` Date of the processing run (not observation!) 
@@ -596,14 +598,14 @@ def _get_init_output_run(self, return self._get_output_run(visit, date) def _get_output_run(self, - visit: Visit, + visit: FannedOutVisit, date: datetime.date = datetime.datetime.now(datetime.timezone.utc)) -> str: """Generate a deterministic collection name that avoids version or provenance conflicts. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps whose processing goes into the run. date : `datetime.date` Date of the processing run (not observation!) @@ -618,12 +620,12 @@ def _get_output_run(self, return self.instrument.makeCollectionName( "prompt", f"output-{date:%Y-%m-%d}", pipeline_name, self._deployment) - def _prep_collections(self, visit: Visit): + def _prep_collections(self, visit: FannedOutVisit): """Pre-register output collections in advance of running the pipeline. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps needing an output run. Returns @@ -640,13 +642,13 @@ def _prep_collections(self, visit: Visit): _prepend_collection(self.butler, self.output_collection, [output_run]) return output_run - def _get_pipeline_file(self, visit: Visit) -> str: + def _get_pipeline_file(self, visit: FannedOutVisit) -> str: """Identify the pipeline to be run, based on the configured instrument and details of the visit. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps from one detector to prepare the pipeline for. Returns @@ -662,13 +664,13 @@ def _get_pipeline_file(self, visit: Visit) -> str: visit.instrument, "ApPipe.yaml") - def _prep_pipeline(self, visit: Visit) -> lsst.pipe.base.Pipeline: + def _prep_pipeline(self, visit: FannedOutVisit) -> lsst.pipe.base.Pipeline: """Setup the pipeline to be run, based on the configured instrument and details of the incoming visit. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps from one detector to prepare the pipeline for. 
Returns @@ -716,7 +718,7 @@ def _download(self, remote): local.transfer_from(remote, "copy") return local - def ingest_image(self, visit: Visit, oid: str) -> None: + def ingest_image(self, visit: FannedOutVisit, oid: str) -> None: """Ingest an image into the temporary butler. The temporary butler must not already contain a ``raw`` dataset @@ -726,7 +728,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None: Parameters ---------- - visit : Visit + visit : FannedOutVisit The visit for which the image was taken. oid : `str` Identifier for incoming image, relative to the image bucket. @@ -745,7 +747,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None: assert len(result) == 1, "Should have ingested exactly one image." _log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId) - def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: + def run_pipeline(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None: """Process the received image(s). The internal butler must contain all data and all dimensions needed to @@ -754,7 +756,7 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps from one detector to be processed. exposure_ids : `set` [`int`] Identifiers of the exposures that were received. @@ -790,13 +792,13 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: _log.info(f"Pipeline successfully run on " f"detector {visit.detector} of {exposure_ids}.") - def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None: + def export_outputs(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None: """Copy raws and pipeline outputs from processing a set of images back to the central Butler. Parameters ---------- - visit : Visit + visit : FannedOutVisit The visit whose outputs need to be exported. exposure_ids : `set` [`int`] Identifiers of the exposures that were processed. 
@@ -834,14 +836,14 @@ def _get_safe_dataset_types(butler): return [dstype for dstype in butler.registry.queryDatasetTypes(...) if "detector" in dstype.dimensions] - def _export_subset(self, visit: Visit, exposure_ids: set[int], + def _export_subset(self, visit: FannedOutVisit, exposure_ids: set[int], dataset_types: typing.Any, in_collections: typing.Any) -> None: """Copy datasets associated with a processing run back to the central Butler. Parameters ---------- - visit : Visit + visit : FannedOutVisit The visit whose outputs need to be exported. exposure_ids : `set` [`int`] Identifiers of the exposures that were processed. @@ -891,14 +893,14 @@ def _export_subset(self, visit: Visit, exposure_ids: set[int], "skymap", "tract", "patch"}, transfer="copy") - def clean_local_repo(self, visit: Visit, exposure_ids: set[int]) -> None: + def clean_local_repo(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None: """Remove local repo content that is only needed for a single visit. This includes raws and pipeline outputs. Parameter --------- - visit : Visit + visit : FannedOutVisit The visit to be removed. exposure_ids : `set` [`int`] Identifiers of the exposures to be removed. diff --git a/python/activator/raw.py b/python/activator/raw.py index d410fd7e..e0661f32 100644 --- a/python/activator/raw.py +++ b/python/activator/raw.py @@ -30,7 +30,7 @@ from dataclasses import dataclass import re -from .visit import Visit +from .visit import FannedOutVisit @dataclass(frozen=True) @@ -68,12 +68,12 @@ def from_oid(cls, oid: str): else: raise ValueError(f"{oid} could not be parsed into a Snap") - def is_consistent(self, visit: Visit): + def is_consistent(self, visit: FannedOutVisit): """Test if this snap could have come from a particular visit. Parameters ---------- - visit : `activator.visit.Visit` + visit : `activator.visit.FannedOutVisit` The visit from which snaps were expected. 
""" return (self.instrument == visit.instrument diff --git a/python/activator/visit.py b/python/activator/visit.py index cd59b25c..1d045a37 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -1,11 +1,11 @@ -__all__ = ["Visit"] +__all__ = ["FannedOutVisit"] from dataclasses import dataclass, field import enum @dataclass(frozen=True, kw_only=True) -class Visit: +class FannedOutVisit: # Elements must be hashable and JSON-persistable; built-in types # recommended. list is not hashable, but gets special treatment because # neither Kafka nor JSON deserialize sequences as tuples. @@ -14,7 +14,7 @@ class Visit: # https://ts-xml.lsst.io/sal_interfaces/ScriptQueue.html#nextvisit class CoordSys(enum.IntEnum): # This is a redeclaration of lsst.ts.idl.enums.Script.MetadataCoordSys, - # but we need Visit to work in code that can't import lsst.ts. + # but we need FannedOutVisit to work in code that can't import lsst.ts. NONE = 1 ICRS = 2 OBSERVED = 3 diff --git a/python/tester/upload.py b/python/tester/upload.py index 93e7b6b3..4b6c6701 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -14,7 +14,7 @@ from confluent_kafka import Producer from activator.raw import Snap, get_raw_path -from activator.visit import Visit +from activator.visit import FannedOutVisit from tester.utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit @@ -53,11 +53,11 @@ def process_group(producer, visit_infos, uploader): ---------- producer : `confluent_kafka.Producer` The client that posts ``next_visit`` messages. - visit_infos : `set` [`activator.Visit`] + visit_infos : `set` [`activator.FannedOutVisit`] The visit-detector combinations to be observed; each object may represent multiple snaps. Assumed to represent a single group, and to share instrument, nimages, filters, and survey. 
- uploader : callable [`activator.Visit`, int] + uploader : callable [`activator.FannedOutVisit`, int] A callable that takes an exposure spec and a snap ID, and uploads the visit's data. """ @@ -129,7 +129,7 @@ def get_samples(bucket, instrument): Returns ------- - raws : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `s3.ObjectSummary`]]] + raws : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]] A mapping from group IDs to a mapping of snap ID. The value of the innermost mapping is the observation metadata for each detector, and a Blob representing the image taken in that detector-snap. @@ -160,22 +160,23 @@ def get_samples(bucket, instrument): # Assume that the unobserved bucket uses the same filename scheme as # the observed bucket. snap = Snap.from_oid(blob.key) - visit = Visit(instrument=instrument, - detector=snap.detector, - groupId=snap.group, - nimages=INSTRUMENTS[instrument].n_snaps, - filters=snap.filter, - coordinateSystem=Visit.CoordSys.ICRS, - position=[hsc_metadata[snap.exp_id]["ra"], hsc_metadata[snap.exp_id]["dec"]], - rotationSystem=Visit.RotSys.SKY, - cameraAngle=hsc_metadata[snap.exp_id]["rot"], - survey="SURVEY", - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, - duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), - totalCheckpoints=1, - ) + visit = FannedOutVisit( + instrument=instrument, + detector=snap.detector, + groupId=snap.group, + nimages=INSTRUMENTS[instrument].n_snaps, + filters=snap.filter, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, + position=[hsc_metadata[snap.exp_id]["ra"], hsc_metadata[snap.exp_id]["dec"]], + rotationSystem=FannedOutVisit.RotSys.SKY, + cameraAngle=hsc_metadata[snap.exp_id]["rot"], + survey="SURVEY", + salIndex=42, + scriptSalIndex=42, + dome=FannedOutVisit.Dome.OPEN, + duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), + totalCheckpoints=1, + ) _log.debug(f"File {blob.key} parsed as snap {snap.snap} of visit {visit}.") if snap.group in result: 
snap_dict = result[snap.group] @@ -202,7 +203,7 @@ def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_ The client that posts ``next_visit`` messages. instrument : `str` The short name of the instrument carrying out the observation. - raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `s3.ObjectSummary`]]] + raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]] Available raws as a mapping from group IDs to a mapping of snap ID. The value of the innermost mapping is the observation metadata for each detector, and a Blob representing the image taken in that @@ -233,12 +234,12 @@ def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_ # snap_dict maps snap_id to {visit: blob} snap_dict = {} # Copy all the visit-blob dictionaries under each snap_id, - # replacing the (immutable) Visit objects to point to group + # replacing the (immutable) FannedOutVisit objects to point to group # instead of true_group. for snap_id, old_visits in raw_pool[true_group].items(): snap_dict[snap_id] = {dataclasses.replace(true_visit, groupId=group): blob for true_visit, blob in old_visits.items()} - # Gather all the Visit objects found in snap_dict, merging + # Gather all the FannedOutVisit objects found in snap_dict, merging # duplicates for different snaps of the same detector. 
visit_infos = {info for det_dict in snap_dict.values() for info in det_dict} diff --git a/python/tester/upload_hsc_rc2.py b/python/tester/upload_hsc_rc2.py index dab02e90..fc22e36c 100644 --- a/python/tester/upload_hsc_rc2.py +++ b/python/tester/upload_hsc_rc2.py @@ -33,7 +33,7 @@ from lsst.daf.butler import Butler from activator.raw import get_raw_path -from activator.visit import Visit +from activator.visit import FannedOutVisit from tester.utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit @@ -147,20 +147,20 @@ def prepare_one_visit(producer, group_id, butler, visit_id): visits = set() for data_id in refs.dataIds.expanded(): - visit = Visit( + visit = FannedOutVisit( instrument="HSC", detector=data_id.records["detector"].id, groupId=group_id, nimages=1, filters=data_id.records["physical_filter"].name, - coordinateSystem=Visit.CoordSys.ICRS, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, position=[data_id.records["exposure"].tracking_ra, data_id.records["exposure"].tracking_dec], - rotationSystem=Visit.RotSys.SKY, + rotationSystem=FannedOutVisit.RotSys.SKY, cameraAngle=data_id.records["exposure"].sky_angle, survey="SURVEY", salIndex=42, scriptSalIndex=42, - dome=Visit.Dome.OPEN, + dome=FannedOutVisit.Dome.OPEN, duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), totalCheckpoints=1, ) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index a059f24d..8fc858dc 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -40,7 +40,7 @@ from lsst.obs.base.ingest import RawFileDatasetInfo, RawFileData import lsst.resources -from activator.visit import Visit +from activator.visit import FannedOutVisit from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface, \ _filter_datasets, _prepend_collection, _remove_from_chain, _MissingDatasetError @@ -63,7 +63,7 @@ def fake_file_data(filename, dimensions, instrument, visit): The full set of 
dimensions for this butler. instrument : `lsst.obs.base.Instrument` The instrument the file is supposed to be from. - visit : `Visit` + visit : `FannedOutVisit` Group of snaps from one detector to be processed. Returns @@ -143,22 +143,22 @@ def setUp(self): dec = -4.950050405424033 # DECam has no rotator; instrument angle is 90 degrees in our system. rot = 90. - self.next_visit = Visit(instrument=instname, - detector=56, - groupId="1", - nimages=1, - filters=filter, - coordinateSystem=Visit.CoordSys.ICRS, - position=[ra, dec], - rotationSystem=Visit.RotSys.SKY, - cameraAngle=rot, - survey="SURVEY", - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, - duration=35.0, - totalCheckpoints=1, - ) + self.next_visit = FannedOutVisit(instrument=instname, + detector=56, + groupId="1", + nimages=1, + filters=filter, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, + position=[ra, dec], + rotationSystem=FannedOutVisit.RotSys.SKY, + cameraAngle=rot, + survey="SURVEY", + salIndex=42, + scriptSalIndex=42, + dome=FannedOutVisit.Dome.OPEN, + duration=35.0, + totalCheckpoints=1, + ) self.logger_name = "lsst.activator.middleware_interface" def tearDown(self): @@ -628,22 +628,22 @@ def setUp(self): dec = -4.950050405424033 # DECam has no rotator; instrument angle is 90 degrees in our system. rot = 90. 
- self.next_visit = Visit(instrument=instname, - detector=56, - groupId="1", - nimages=1, - filters=filter, - coordinateSystem=Visit.CoordSys.ICRS, - position=[ra, dec], - rotationSystem=Visit.RotSys.SKY, - cameraAngle=rot, - survey="SURVEY", - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, - duration=35.0, - totalCheckpoints=1, - ) + self.next_visit = FannedOutVisit(instrument=instname, + detector=56, + groupId="1", + nimages=1, + filters=filter, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, + position=[ra, dec], + rotationSystem=FannedOutVisit.RotSys.SKY, + cameraAngle=rot, + survey="SURVEY", + salIndex=42, + scriptSalIndex=42, + dome=FannedOutVisit.Dome.OPEN, + duration=35.0, + totalCheckpoints=1, + ) self.second_visit = dataclasses.replace(self.next_visit, groupId="2") self.logger_name = "lsst.activator.middleware_interface" diff --git a/tests/test_raw.py b/tests/test_raw.py index 4f661d00..146d517a 100644 --- a/tests/test_raw.py +++ b/tests/test_raw.py @@ -23,7 +23,7 @@ import unittest from activator.raw import Snap, RAW_REGEXP, get_raw_path -from activator.visit import Visit +from activator.visit import FannedOutVisit class RawTest(unittest.TestCase): @@ -44,22 +44,22 @@ def setUp(self): self.snap = 1 self.exposure = 404 - self.visit = Visit(instrument=self.instrument, - detector=self.detector, - groupId=self.group, - nimages=self.snaps, - filters=self.filter, - coordinateSystem=Visit.CoordSys.ICRS, - position=[self.ra, self.dec], - rotationSystem=Visit.RotSys.SKY, - cameraAngle=self.rot, - survey=self.survey, - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, - duration=35.0, - totalCheckpoints=1, - ) + self.visit = FannedOutVisit(instrument=self.instrument, + detector=self.detector, + groupId=self.group, + nimages=self.snaps, + filters=self.filter, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, + position=[self.ra, self.dec], + rotationSystem=FannedOutVisit.RotSys.SKY, + cameraAngle=self.rot, + survey=self.survey, + salIndex=42, + 
scriptSalIndex=42, + dome=FannedOutVisit.Dome.OPEN, + duration=35.0, + totalCheckpoints=1, + ) def test_writeread(self): """Test that raw module can parse the paths it creates. diff --git a/tests/test_visit.py b/tests/test_visit.py index 96f48b2b..86b029b6 100644 --- a/tests/test_visit.py +++ b/tests/test_visit.py @@ -22,35 +22,35 @@ import json import unittest -from activator.visit import Visit +from activator.visit import FannedOutVisit -class VisitTest(unittest.TestCase): - """Test the Visit class's functionality. +class FannedOutVisitTest(unittest.TestCase): + """Test the FannedOutVisit class's functionality. """ def setUp(self): super().setUp() - self.testbed = Visit( + self.testbed = FannedOutVisit( instrument="NotACam", detector=42, groupId="2023-01-23T23:33:14.762", nimages=2, filters="k2022", - coordinateSystem=Visit.CoordSys.ICRS, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, position=[134.5454, -65.3261], - rotationSystem=Visit.RotSys.SKY, + rotationSystem=FannedOutVisit.RotSys.SKY, cameraAngle=135.0, survey="IMAGINARY", salIndex=42, scriptSalIndex=42, - dome=Visit.Dome.OPEN, + dome=FannedOutVisit.Dome.OPEN, duration=35.0, totalCheckpoints=1, ) def test_hash(self): - # Strictly speaking should test whether Visit fulfills the hash + # Strictly speaking should test whether FannedOutVisit fulfills the hash # contract, but it's not clear what kinds of differences the default # __hash__ might be insensitive to. So just test that the object # is hashable. @@ -59,13 +59,13 @@ def test_hash(self): def test_json(self): serialized = json.dumps(self.testbed.__dict__).encode("utf-8") - deserialized = Visit(**json.loads(serialized)) + deserialized = FannedOutVisit(**json.loads(serialized)) self.assertEqual(deserialized, self.testbed) # Test that enums are handled correctly despite being serialized as shorts. # isinstance checks are ambigious because IntEnum is-an int. 
- self.assertIs(type(self.testbed.coordinateSystem), Visit.CoordSys) + self.assertIs(type(self.testbed.coordinateSystem), FannedOutVisit.CoordSys) self.assertIs(type(deserialized.coordinateSystem), int) - self.assertIsNot(type(deserialized.coordinateSystem), Visit.CoordSys) + self.assertIsNot(type(deserialized.coordinateSystem), FannedOutVisit.CoordSys) def test_str(self): self.assertNotEqual(str(self.testbed), repr(self.testbed)) From 1b6ff2969d1bbb895539c09cfdfecdad772408ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cdspeck1=E2=80=9D?= <“dspeck@burwood.com”> Date: Mon, 3 Apr 2023 08:29:46 -0700 Subject: [PATCH 2/5] Support message format and post next visit message to kafka http proxy This changes the schema to be the same as messages from the summit. It is an interim commit to review before removing kafka producer. --- python/tester/upload.py | 2 +- python/tester/upload_hsc_rc2.py | 2 +- python/tester/utils.py | 20 ++++++++++++-------- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index 4b6c6701..0f6bc1e7 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -70,7 +70,7 @@ def process_group(producer, visit_infos, uploader): _log.info("No observations to make; aborting.") return - send_next_visit(producer, group, visit_infos) + send_next_visit(group, visit_infos) # TODO: need asynchronous code to handle next_visit delay correctly for snap in range(n_snaps): _log.info(f"Taking group: {group} snap: {snap}") diff --git a/python/tester/upload_hsc_rc2.py b/python/tester/upload_hsc_rc2.py index fc22e36c..2600f2c9 100644 --- a/python/tester/upload_hsc_rc2.py +++ b/python/tester/upload_hsc_rc2.py @@ -166,7 +166,7 @@ def prepare_one_visit(producer, group_id, butler, visit_id): ) visits.add(visit) - send_next_visit(producer, group_id, visits) + send_next_visit(group_id, visits) return refs diff --git a/python/tester/utils.py b/python/tester/utils.py index fdec1eb2..6e9886ba 100644 ---
a/python/tester/utils.py +++ b/python/tester/utils.py @@ -23,6 +23,7 @@ import json import logging +import requests from astropy.io import fits @@ -148,27 +149,30 @@ def make_hsc_id(group_num, snap): return f"HSCE{exposure_id:08d}", exposure_id -def send_next_visit(producer, group, visit_infos): +def send_next_visit(group, visit_infos): """Simulate the transmission of a ``next_visit`` message. Parameters ---------- - producer : `confluent_kafka.Producer` - The client that posts ``next_visit`` messages. group : `str` The group ID for the message to send. visit_infos : `set` [`activator.Visit`] The visit-detector combinations to be sent; each object may represent multiple snaps. """ - _log.info(f"Sending next_visit for group: {group}") - topic = "next-visit-topic" + _log.info(f"Sending next_visit for group to kafka http proxy: {group}") + header = {"Content-Type": "application/vnd.kafka.avro.v2+json"} + url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/test.next-visit" for info in visit_infos: - _log.debug(f"Sending next_visit for group: {info.groupId} detector: {info.detector} " + _log.debug(f"Sending next_visit for group: {info.groupId} " f"filters: {info.filters} ra: {info.position[0]} dec: {info.position[1]} " f"survey: {info.survey}") - data = json.dumps(info.__dict__).encode("utf-8") - producer.produce(topic, data) + message_values = dict(private_efdStamp=0, private_kafkaStamp=0, salIndex=info.salIndex, private_revCode="c9aab3df", private_sndStamp=0.0, private_rcvStamp=0.0, private_seqNum=0, private_identity="ScriptQueue", private_origin=0, scriptSalIndex=info.scriptSalIndex, groupId=info.groupId, coordinateSystem=info.coordinateSystem.value, position=info.position, rotationSystem=info.rotationSystem.value, cameraAngle=info.cameraAngle, filters=info.filters, dome=info.dome.value, duration=info.duration, nimages=info.nimages, survey=info.survey, totalCheckpoints=info.totalCheckpoints) + records_level = dict(value=message_values) + 
value_schema_level = dict(value_schema_id=1, records=[records_level]) + + r = requests.post(url, data=json.dumps(value_schema_level), headers=header) + _log.debug(r.content) def replace_header_key(file, key, value): From b053f9c90788163371842d0b1c61e1252e505a42 Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Wed, 5 Apr 2023 15:42:25 -0700 Subject: [PATCH 3/5] Remove the direct use of the Confluent Kafka Producer This cleans up code that is no longer used. next_visit messages are now sent to Sasquatch via the HTTP-based Confluent REST proxy. --- python/tester/upload.py | 52 ++++++++++++------------------- python/tester/upload_hsc_rc2.py | 36 +++++++++-------------- python/tester/utils.py | 5 ++-- 3 files changed, 39 insertions(+), 54 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index 0f6bc1e7..1bbc6221 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -5,13 +5,11 @@ import logging import os import random -import socket import sys import tempfile import time import boto3 -from confluent_kafka import Producer from activator.raw import Snap, get_raw_path from activator.visit import FannedOutVisit @@ -46,13 +44,13 @@ class Instrument: _log.setLevel(logging.INFO) -def process_group(producer, visit_infos, uploader): +def process_group(kafka_url, visit_infos, uploader): """Simulate the observation of a single on-sky pointing. Parameters ---------- - producer : `confluent_kafka.Producer` - The client that posts ``next_visit`` messages. + kafka_url : `str` + The URL of the Kafka REST Proxy to send ``next_visit`` messages to. visit_infos : `set` [`activator.FannedOutVisit`] The visit-detector combinations to be observed; each object may represent multiple snaps.
Assumed to represent a single group, and to @@ -70,7 +68,7 @@ def process_group(producer, visit_infos, uploader): _log.info("No observations to make; aborting.") return - send_next_visit(group, visit_infos) + send_next_visit(kafka_url, group, visit_infos) # TODO: need asynchronous code to handle next_visit delay correctly for snap in range(n_snaps): _log.info(f"Taking group: {group} snap: {snap}") @@ -95,26 +93,20 @@ def main(): endpoint_url = "https://s3dfrgw.slac.stanford.edu" s3 = boto3.resource("s3", endpoint_url=endpoint_url) dest_bucket = s3.Bucket("rubin-pp") - producer = Producer( - {"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()} - ) - - try: - last_group = get_last_group(dest_bucket, instrument, date) - _log.info(f"Last group {last_group}") - - src_bucket = s3.Bucket("rubin-pp-users") - raw_pool = get_samples(src_bucket, instrument) - - new_group_base = last_group + random.randrange(10, 19) - if raw_pool: - _log.info(f"Observing real raw files from {instrument}.") - upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, - n_groups, new_group_base) - else: - _log.error(f"No raw files found for {instrument}, aborting.") - finally: - producer.flush(30.0) + + last_group = get_last_group(dest_bucket, instrument, date) + _log.info(f"Last group {last_group}") + + src_bucket = s3.Bucket("rubin-pp-users") + raw_pool = get_samples(src_bucket, instrument) + + new_group_base = last_group + random.randrange(10, 19) + if raw_pool: + _log.info(f"Observing real raw files from {instrument}.") + upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, + n_groups, new_group_base) + else: + _log.error(f"No raw files found for {instrument}, aborting.") def get_samples(bucket, instrument): @@ -194,13 +186,13 @@ def get_samples(bucket, instrument): return result -def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base): +def upload_from_raws(kafka_url, instrument, raw_pool, 
src_bucket, dest_bucket, n_groups, group_base): """Upload visits and files using real raws. Parameters ---------- - producer : `confluent_kafka.Producer` - The client that posts ``next_visit`` messages. + kafka_url : `str` + The URL of the Kafka REST Proxy to send ``next_visit`` messages to. instrument : `str` The short name of the instrument carrying out the observation. raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]] @@ -258,7 +250,7 @@ def upload_from_pool(visit, snap_id): buffer.seek(0) # Assumed by upload_fileobj. dest_bucket.upload_fileobj(buffer, filename) - process_group(producer, visit_infos, upload_from_pool) + process_group(kafka_url, visit_infos, upload_from_pool) _log.info("Slewing to next group") time.sleep(SLEW_INTERVAL) diff --git a/python/tester/upload_hsc_rc2.py b/python/tester/upload_hsc_rc2.py index 2600f2c9..a65e4bb5 100644 --- a/python/tester/upload_hsc_rc2.py +++ b/python/tester/upload_hsc_rc2.py @@ -22,13 +22,11 @@ import logging import os import random -import socket import sys import tempfile import time import boto3 -from confluent_kafka import Producer from lsst.daf.butler import Butler @@ -59,32 +57,26 @@ def main(): date = time.strftime("%Y%m%d") + kafka_url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/test.next-visit" endpoint_url = "https://s3dfrgw.slac.stanford.edu" s3 = boto3.resource("s3", endpoint_url=endpoint_url) dest_bucket = s3.Bucket("rubin-pp") - producer = Producer( - {"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()} - ) - last_group = get_last_group(dest_bucket, "HSC", date) group_num = last_group + random.randrange(10, 19) _log.debug(f"Last group {last_group}; new group base {group_num}") butler = Butler("/repo/main") visit_list = get_hsc_visit_list(butler, n_groups) - try: - for visit in visit_list: - group_num += 1 - _log.info(f"Slewing to group {group_num}, with HSC visit {visit}") - time.sleep(SLEW_INTERVAL) - refs = 
prepare_one_visit(producer, str(group_num), butler, visit) - _log.info(f"Taking exposure for group {group_num}") - time.sleep(EXPOSURE_INTERVAL) - _log.info(f"Uploading detector images for group {group_num}") - upload_hsc_images(dest_bucket, str(group_num), butler, refs) - finally: - producer.flush(30.0) + for visit in visit_list: + group_num += 1 + _log.info(f"Slewing to group {group_num}, with HSC visit {visit}") + time.sleep(SLEW_INTERVAL) + refs = prepare_one_visit(kafka_url, str(group_num), butler, visit) + _log.info(f"Taking exposure for group {group_num}") + time.sleep(EXPOSURE_INTERVAL) + _log.info(f"Uploading detector images for group {group_num}") + upload_hsc_images(dest_bucket, str(group_num), butler, refs) def get_hsc_visit_list(butler, n_sample): @@ -115,7 +107,7 @@ def get_hsc_visit_list(butler, n_sample): return visits -def prepare_one_visit(producer, group_id, butler, visit_id): +def prepare_one_visit(kafka_url, group_id, butler, visit_id): """Extract metadata and send next_visit events for one HSC-RC2 visit One ``next_visit`` message is sent for each detector, to mimic the @@ -125,8 +117,8 @@ def prepare_one_visit(producer, group_id, butler, visit_id): Parameters ---------- - producer : `confluent_kafka.Producer` - The client that posts ``next_visit`` messages. + kafka_url : `str` + The URL of the Kafka REST Proxy to send ``next_visit`` messages to. group_id : `str` The group ID for the message to send. 
butler : `lsst.daf.butler.Butler` @@ -166,7 +158,7 @@ def prepare_one_visit(producer, group_id, butler, visit_id): ) visits.add(visit) - send_next_visit(group_id, visits) + send_next_visit(kafka_url, group_id, visits) return refs diff --git a/python/tester/utils.py b/python/tester/utils.py index 6e9886ba..0545b6ca 100644 --- a/python/tester/utils.py +++ b/python/tester/utils.py @@ -149,11 +149,13 @@ def make_hsc_id(group_num, snap): return f"HSCE{exposure_id:08d}", exposure_id -def send_next_visit(group, visit_infos): +def send_next_visit(url, group, visit_infos): """Simulate the transmission of a ``next_visit`` message. Parameters ---------- + url : `str` + The URL of the Kafka REST Proxy to send ``next_visit`` messages to. group : `str` The group ID for the message to send. visit_infos : `set` [`activator.Visit`] @@ -162,7 +164,6 @@ def send_next_visit(group, visit_infos): """ _log.info(f"Sending next_visit for group to kafka http proxy: {group}") header = {"Content-Type": "application/vnd.kafka.avro.v2+json"} - url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/test.next-visit" for info in visit_infos: _log.debug(f"Sending next_visit for group: {info.groupId} " f"filters: {info.filters} ra: {info.position[0]} dec: {info.position[1]} " From 81da86bd0cf557a2c9ab59601ce86700bc78b754 Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Thu, 23 Mar 2023 10:39:06 -0700 Subject: [PATCH 4/5] Send visit-level next-visit message As the fan-out service is standing up, the next visit messages sent by the upload tester scripts should be at the visit level as those from the summit, and they are to be consumed by the fan-out service. A SummitVisit class is added for the summit-style messages, which have some extra private_ fields. 
--- python/activator/visit.py | 41 ++++++++++++++++++++++++++++----- python/tester/upload.py | 12 ++++++---- python/tester/upload_hsc_rc2.py | 24 ++++++++----------- python/tester/utils.py | 10 ++++---- tests/test_visit.py | 38 +++++++++++++++++++++++++++++- 5 files changed, 94 insertions(+), 31 deletions(-) diff --git a/python/activator/visit.py b/python/activator/visit.py index 1d045a37..584870db 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -1,11 +1,11 @@ -__all__ = ["FannedOutVisit"] +__all__ = ["FannedOutVisit", "SummitVisit", "BareVisit"] -from dataclasses import dataclass, field +from dataclasses import dataclass, field, asdict import enum @dataclass(frozen=True, kw_only=True) -class FannedOutVisit: +class BareVisit: # Elements must be hashable and JSON-persistable; built-in types # recommended. list is not hashable, but gets special treatment because # neither Kafka nor JSON deserialize sequences as tuples. @@ -14,7 +14,7 @@ class FannedOutVisit: # https://ts-xml.lsst.io/sal_interfaces/ScriptQueue.html#nextvisit class CoordSys(enum.IntEnum): # This is a redeclaration of lsst.ts.idl.enums.Script.MetadataCoordSys, - # but we need FannedOutVisit to work in code that can't import lsst.ts. + # but we need BareVisit to work in code that can't import lsst.ts. NONE = 1 ICRS = 2 OBSERVED = 3 @@ -33,7 +33,7 @@ class Dome(enum.IntEnum): OPEN = 2 EITHER = 3 - salIndex: int + salIndex: int # this maps to an instrument scriptSalIndex: int groupId: str # observatory-specific ID; not the same as visit number coordinateSystem: CoordSys # coordinate system of position @@ -48,7 +48,16 @@ class Dome(enum.IntEnum): survey: str # survey name totalCheckpoints: int - # Added by the Kafka consumer at USDF. + def __str__(self): + """Return a short string that represents the visit but does not + include complete metadata. 
+ """ + return f"(groupId={self.groupId}, salIndex={self.salIndex})" + + +@dataclass(frozen=True, kw_only=True) +class FannedOutVisit(BareVisit): + # Extra information is added by the fan-out service at USDF. instrument: str # short name detector: int @@ -57,3 +66,23 @@ def __str__(self): include "metadata" fields. """ return f"(instrument={self.instrument}, groupId={self.groupId}, detector={self.detector})" + + def get_bare_visit(self): + """Return visit-level info as a dict""" + info = asdict(self) + info.pop("instrument") + info.pop("detector") + return info + + +@dataclass(frozen=True, kw_only=True) +class SummitVisit(BareVisit): + # Extra fields are in the NextVisit messages from the summit + private_efdStamp: float = 0.0 + private_kafkaStamp: float = 0.0 + private_identity: str = "ScriptQueue" + private_revCode: str = "c9aab3df" + private_origin: int = 0 + private_seqNum: int = 0 + private_rcvStamp: float = 0.0 + private_sndStamp: float = 0.0 diff --git a/python/tester/upload.py b/python/tester/upload.py index 1bbc6221..d1467946 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -1,5 +1,3 @@ -__all__ = ["get_last_group", ] - import dataclasses import itertools import logging @@ -12,7 +10,7 @@ import boto3 from activator.raw import Snap, get_raw_path -from activator.visit import FannedOutVisit +from activator.visit import FannedOutVisit, SummitVisit from tester.utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit @@ -63,12 +61,13 @@ def process_group(kafka_url, visit_infos, uploader): for info in visit_infos: group = info.groupId n_snaps = info.nimages + visit = SummitVisit(**info.get_bare_visit()) + send_next_visit(kafka_url, group, {visit}) break else: _log.info("No observations to make; aborting.") return - send_next_visit(kafka_url, group, visit_infos) # TODO: need asynchronous code to handle next_visit delay correctly for snap in range(n_snaps): _log.info(f"Taking group: {group} snap: {snap}") @@ -90,6 
+89,7 @@ def main(): date = time.strftime("%Y%m%d") + kafka_url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/test.next-visit" endpoint_url = "https://s3dfrgw.slac.stanford.edu" s3 = boto3.resource("s3", endpoint_url=endpoint_url) dest_bucket = s3.Bucket("rubin-pp") @@ -163,7 +163,9 @@ def get_samples(bucket, instrument): rotationSystem=FannedOutVisit.RotSys.SKY, cameraAngle=hsc_metadata[snap.exp_id]["rot"], survey="SURVEY", - salIndex=42, + # Fan-out uses salIndex to know which instrument and detector config to use. + # The exp_id of this test dataset is coded into fan-out's pattern matching. + salIndex=snap.exp_id, scriptSalIndex=42, dome=FannedOutVisit.Dome.OPEN, duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), diff --git a/python/tester/upload_hsc_rc2.py b/python/tester/upload_hsc_rc2.py index a65e4bb5..dbbff8e8 100644 --- a/python/tester/upload_hsc_rc2.py +++ b/python/tester/upload_hsc_rc2.py @@ -31,7 +31,7 @@ from lsst.daf.butler import Butler from activator.raw import get_raw_path -from activator.visit import FannedOutVisit +from activator.visit import SummitVisit from tester.utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit @@ -137,28 +137,24 @@ def prepare_one_visit(kafka_url, group_id, butler, visit_id): dataId={"exposure": visit_id, "instrument": "HSC"}, ) - visits = set() - for data_id in refs.dataIds.expanded(): - visit = FannedOutVisit( - instrument="HSC", - detector=data_id.records["detector"].id, + # all items in refs share the same visit info and one event is to be sent + for data_id in refs.dataIds.limit(1).expanded(): + visit = SummitVisit( groupId=group_id, nimages=1, filters=data_id.records["physical_filter"].name, - coordinateSystem=FannedOutVisit.CoordSys.ICRS, + coordinateSystem=SummitVisit.CoordSys.ICRS, position=[data_id.records["exposure"].tracking_ra, data_id.records["exposure"].tracking_dec], - rotationSystem=FannedOutVisit.RotSys.SKY, + rotationSystem=SummitVisit.RotSys.SKY, 
cameraAngle=data_id.records["exposure"].sky_angle, survey="SURVEY", - salIndex=42, - scriptSalIndex=42, - dome=FannedOutVisit.Dome.OPEN, + salIndex=999, + scriptSalIndex=999, + dome=SummitVisit.Dome.OPEN, duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), totalCheckpoints=1, ) - visits.add(visit) - - send_next_visit(kafka_url, group_id, visits) + send_next_visit(kafka_url, group_id, {visit}) return refs diff --git a/python/tester/utils.py b/python/tester/utils.py index 0545b6ca..6353bc9e 100644 --- a/python/tester/utils.py +++ b/python/tester/utils.py @@ -21,6 +21,7 @@ __all__ = ["get_last_group", "make_exposure_id", "replace_header_key", "send_next_visit"] +from dataclasses import asdict import json import logging import requests @@ -150,7 +151,7 @@ def make_hsc_id(group_num, snap): def send_next_visit(url, group, visit_infos): - """Simulate the transmission of a ``next_visit`` message. + """Simulate the transmission of a ``next_visit`` message to Sasquatch. Parameters ---------- @@ -158,8 +159,8 @@ def send_next_visit(url, group, visit_infos): The URL of the Kafka REST Proxy to send ``next_visit`` messages to. group : `str` The group ID for the message to send. - visit_infos : `set` [`activator.Visit`] - The visit-detector combinations to be sent; each object may + visit_infos : `set` [`activator.SummitVisit`] + The ``next_visit`` message to be sent; each object may represent multiple snaps. 
""" _log.info(f"Sending next_visit for group to kafka http proxy: {group}") @@ -168,8 +169,7 @@ def send_next_visit(url, group, visit_infos): _log.debug(f"Sending next_visit for group: {info.groupId} " f"filters: {info.filters} ra: {info.position[0]} dec: {info.position[1]} " f"survey: {info.survey}") - message_values = dict(private_efdStamp=0, private_kafkaStamp=0, salIndex=info.salIndex, private_revCode="c9aab3df", private_sndStamp=0.0, private_rcvStamp=0.0, private_seqNum=0, private_identity="ScriptQueue", private_origin=0, scriptSalIndex=info.scriptSalIndex, groupId=info.groupId, coordinateSystem=info.coordinateSystem.value, position=info.position, rotationSystem=info.rotationSystem.value, cameraAngle=info.cameraAngle, filters=info.filters, dome=info.dome.value, duration=info.duration, nimages=info.nimages, survey=info.survey, totalCheckpoints=info.totalCheckpoints) - records_level = dict(value=message_values) + records_level = dict(value=asdict(info)) value_schema_level = dict(value_schema_id=1, records=[records_level]) r = requests.post(url, data=json.dumps(value_schema_level), headers=header) diff --git a/tests/test_visit.py b/tests/test_visit.py index 86b029b6..16713f77 100644 --- a/tests/test_visit.py +++ b/tests/test_visit.py @@ -19,10 +19,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import dataclasses import json import unittest -from activator.visit import FannedOutVisit +from activator.visit import FannedOutVisit, BareVisit class FannedOutVisitTest(unittest.TestCase): @@ -71,3 +72,38 @@ def test_str(self): self.assertNotEqual(str(self.testbed), repr(self.testbed)) self.assertIn(str(self.testbed.detector), str(self.testbed)) self.assertIn(str(self.testbed.groupId), str(self.testbed)) + + +class BareVisitTest(unittest.TestCase): + """Test the BareVisit class's functionality. 
+ """ + def setUp(self): + super().setUp() + + visit_info = dict( + groupId="2023-01-23T23:33:14.762", + nimages=2, + filters="k2022", + coordinateSystem=BareVisit.CoordSys.ICRS, + position=[134.5454, -65.3261], + rotationSystem=BareVisit.RotSys.SKY, + cameraAngle=135.0, + survey="IMAGINARY", + salIndex=42, + scriptSalIndex=42, + dome=BareVisit.Dome.OPEN, + duration=35.0, + totalCheckpoints=1, + ) + self.visit = BareVisit(**visit_info) + self.fannedOutVisit = FannedOutVisit( + instrument="NotACam", + detector=42, + **visit_info + ) + + def test_get_bare(self): + self.assertEqual( + self.fannedOutVisit.get_bare_visit(), + dataclasses.asdict(self.visit) + ) From 94669f872c2a3f87836a06fe479b816a26b48937 Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Fri, 14 Apr 2023 10:19:38 -0700 Subject: [PATCH 5/5] Update comments on physical filter names Currently the filter naming format in the summit messages are differnt from the format in the Middleware, but will be fixed in CAP-967. Even if the summit format is different, the fan-out service fixes it to be in the Middleware format. --- python/activator/middleware_interface.py | 1 - python/activator/visit.py | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 7c1db155..a960e0fc 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -393,7 +393,6 @@ def prep_butler(self, visit: FannedOutVisit) -> None: with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: with self.central_butler.export(filename=export_file.name, format="yaml") as export: self._export_refcats(export, center, radius) - # TODO: Summit filter names may not match Butler names, especially for composite filters. 
self._export_skymap_and_templates(export, center, detector, wcs, visit.filters) self._export_calibs(export, visit.detector, visit.filters) self._export_collections(export, self.instrument.makeUmbrellaCollectionName()) diff --git a/python/activator/visit.py b/python/activator/visit.py index 584870db..3ab1cd14 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -41,7 +41,9 @@ class Dome(enum.IntEnum): position: list[float] = field(compare=False) rotationSystem: RotSys # coordinate system of cameraAngle cameraAngle: float # in degrees - filters: str # physical filter(s) + # physical filter(s) name as used in Middleware. It is a combination of filter and + # grating joined by a "~". For example, "SDSSi_65mm~empty". + filters: str dome: Dome duration: float # script execution, not exposure nimages: int # number of snaps expected, 0 if unknown