diff --git a/python/activator/activator.py b/python/activator/activator.py index 50b03630..e15ab345 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -37,7 +37,7 @@ from .make_pgpass import make_pgpass from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface from .raw import Snap -from .visit import Visit +from .visit import FannedOutVisit PROJECT_ID = "prompt-proto" @@ -131,7 +131,7 @@ def parse_next_visit(http_request): Returns ------- - next_visit : `activator.visit.Visit` + next_visit : `activator.visit.FannedOutVisit` The next_visit message contained in the request. Raises @@ -147,7 +147,7 @@ def parse_next_visit(http_request): # Message format is determined by the nextvisit-start deployment. data = json.loads(event.data) - return Visit(**data) + return FannedOutVisit(**data) def _filter_messages(messages): diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 1c858996..a960e0fc 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -41,7 +41,7 @@ import lsst.obs.base import lsst.pipe.base -from .visit import Visit +from .visit import FannedOutVisit _log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) @@ -292,14 +292,16 @@ def _init_visit_definer(self): define_visits_config = lsst.obs.base.DefineVisitsConfig() self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler) - def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> lsst.afw.geom.SkyWcs: + def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, + visit: FannedOutVisit + ) -> lsst.afw.geom.SkyWcs: """Calculate the expected detector WCS for an incoming observation. Parameters ---------- detector : `lsst.afw.cameraGeom.Detector` The detector for which to generate a WCS. - visit : `Visit` + visit : `FannedOutVisit` Predicted observation metadata for the detector. Returns @@ -313,13 +315,13 @@ def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> Raised if ``visit`` does not have equatorial coordinates and sky rotation angle. """ - if visit.coordinateSystem != Visit.CoordSys.ICRS: - raise ValueError("Only ICRS coordinates are supported in Visit, " + if visit.coordinateSystem != FannedOutVisit.CoordSys.ICRS: + raise ValueError("Only ICRS coordinates are supported in FannedOutVisit, " f"got {visit.coordinateSystem!r} instead.") boresight_center = lsst.geom.SpherePoint(visit.position[0], visit.position[1], lsst.geom.degrees) - if visit.rotationSystem != Visit.RotSys.SKY: - raise ValueError("Only sky camera rotations are supported in Visit, " + if visit.rotationSystem != FannedOutVisit.RotSys.SKY: + raise ValueError("Only sky camera rotations are supported in FannedOutVisit, " f"got {visit.rotationSystem!r} instead.") orientation = visit.cameraAngle * lsst.geom.degrees @@ -356,7 +358,7 @@ def _detector_bounding_circle(self, detector: lsst.afw.cameraGeom.Detector, radii.append(wcs.pixelToSky(corner).separation(center)) return center, max(radii) - def prep_butler(self, visit: Visit) -> None: + def prep_butler(self, visit: FannedOutVisit) -> None: """Prepare a temporary butler repo for processing the incoming data. After this method returns, the internal butler is guaranteed to contain @@ -367,7 +369,7 @@ def prep_butler(self, visit: Visit) -> None: Parameters ---------- - visit : `Visit` + visit : `FannedOutVisit` Group of snaps from one detector to prepare the butler for. Raises @@ -391,7 +393,6 @@ def prep_butler(self, visit: Visit) -> None: with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: with self.central_butler.export(filename=export_file.name, format="yaml") as export: self._export_refcats(export, center, radius) - # TODO: Summit filter names may not match Butler names, especially for composite filters. self._export_skymap_and_templates(export, center, detector, wcs, visit.filters) self._export_calibs(export, visit.detector, visit.filters) self._export_collections(export, self.instrument.makeUmbrellaCollectionName()) @@ -574,14 +575,14 @@ def get_key(ref): yield k, len(list(g)) def _get_init_output_run(self, - visit: Visit, + visit: FannedOutVisit, date: datetime.date = datetime.datetime.now(datetime.timezone.utc)) -> str: """Generate a deterministic init-output collection name that avoids configuration conflicts. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps whose processing goes into the run. date : `datetime.date` Date of the processing run (not observation!) @@ -596,14 +597,14 @@ def _get_init_output_run(self, return self._get_output_run(visit, date) def _get_output_run(self, - visit: Visit, + visit: FannedOutVisit, date: datetime.date = datetime.datetime.now(datetime.timezone.utc)) -> str: """Generate a deterministic collection name that avoids version or provenance conflicts. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps whose processing goes into the run. date : `datetime.date` Date of the processing run (not observation!) @@ -618,12 +619,12 @@ def _get_output_run(self, return self.instrument.makeCollectionName( "prompt", f"output-{date:%Y-%m-%d}", pipeline_name, self._deployment) - def _prep_collections(self, visit: Visit): + def _prep_collections(self, visit: FannedOutVisit): """Pre-register output collections in advance of running the pipeline. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps needing an output run. Returns @@ -640,13 +641,13 @@ def _prep_collections(self, visit: Visit): _prepend_collection(self.butler, self.output_collection, [output_run]) return output_run - def _get_pipeline_file(self, visit: Visit) -> str: + def _get_pipeline_file(self, visit: FannedOutVisit) -> str: """Identify the pipeline to be run, based on the configured instrument and details of the visit. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps from one detector to prepare the pipeline for. Returns @@ -662,13 +663,13 @@ def _get_pipeline_file(self, visit: Visit) -> str: visit.instrument, "ApPipe.yaml") - def _prep_pipeline(self, visit: Visit) -> lsst.pipe.base.Pipeline: + def _prep_pipeline(self, visit: FannedOutVisit) -> lsst.pipe.base.Pipeline: """Setup the pipeline to be run, based on the configured instrument and details of the incoming visit. Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps from one detector to prepare the pipeline for. Returns @@ -716,7 +717,7 @@ def _download(self, remote): local.transfer_from(remote, "copy") return local - def ingest_image(self, visit: Visit, oid: str) -> None: + def ingest_image(self, visit: FannedOutVisit, oid: str) -> None: """Ingest an image into the temporary butler. The temporary butler must not already contain a ``raw`` dataset @@ -726,7 +727,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None: Parameters ---------- - visit : Visit + visit : FannedOutVisit The visit for which the image was taken. oid : `str` Identifier for incoming image, relative to the image bucket. @@ -745,7 +746,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None: assert len(result) == 1, "Should have ingested exactly one image." _log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId) - def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: + def run_pipeline(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None: """Process the received image(s). The internal butler must contain all data and all dimensions needed to @@ -754,7 +755,7 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: Parameters ---------- - visit : Visit + visit : FannedOutVisit Group of snaps from one detector to be processed. exposure_ids : `set` [`int`] Identifiers of the exposures that were received. @@ -790,13 +791,13 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: _log.info(f"Pipeline successfully run on " f"detector {visit.detector} of {exposure_ids}.") - def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None: + def export_outputs(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None: """Copy raws and pipeline outputs from processing a set of images back to the central Butler. Parameters ---------- - visit : Visit + visit : FannedOutVisit The visit whose outputs need to be exported. exposure_ids : `set` [`int`] Identifiers of the exposures that were processed. @@ -834,14 +835,14 @@ def _get_safe_dataset_types(butler): return [dstype for dstype in butler.registry.queryDatasetTypes(...) if "detector" in dstype.dimensions] - def _export_subset(self, visit: Visit, exposure_ids: set[int], + def _export_subset(self, visit: FannedOutVisit, exposure_ids: set[int], dataset_types: typing.Any, in_collections: typing.Any) -> None: """Copy datasets associated with a processing run back to the central Butler. Parameters ---------- - visit : Visit + visit : FannedOutVisit The visit whose outputs need to be exported. exposure_ids : `set` [`int`] Identifiers of the exposures that were processed. @@ -891,14 +892,14 @@ def _export_subset(self, visit: Visit, exposure_ids: set[int], "skymap", "tract", "patch"}, transfer="copy") - def clean_local_repo(self, visit: Visit, exposure_ids: set[int]) -> None: + def clean_local_repo(self, visit: FannedOutVisit, exposure_ids: set[int]) -> None: """Remove local repo content that is only needed for a single visit. This includes raws and pipeline outputs. Parameter --------- - visit : Visit + visit : FannedOutVisit The visit to be removed. exposure_ids : `set` [`int`] Identifiers of the exposures to be removed. diff --git a/python/activator/raw.py b/python/activator/raw.py index d410fd7e..e0661f32 100644 --- a/python/activator/raw.py +++ b/python/activator/raw.py @@ -30,7 +30,7 @@ from dataclasses import dataclass import re -from .visit import Visit +from .visit import FannedOutVisit @dataclass(frozen=True) @@ -68,12 +68,12 @@ def from_oid(cls, oid: str): else: raise ValueError(f"{oid} could not be parsed into a Snap") - def is_consistent(self, visit: Visit): + def is_consistent(self, visit: FannedOutVisit): """Test if this snap could have come from a particular visit. Parameters ---------- - visit : `activator.visit.Visit` + visit : `activator.visit.FannedOutVisit` The visit from which snaps were expected. """ return (self.instrument == visit.instrument diff --git a/python/activator/visit.py b/python/activator/visit.py index cd59b25c..3ab1cd14 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -1,11 +1,11 @@ -__all__ = ["Visit"] +__all__ = ["FannedOutVisit", "SummitVisit", "BareVisit"] -from dataclasses import dataclass, field +from dataclasses import dataclass, field, asdict import enum @dataclass(frozen=True, kw_only=True) -class Visit: +class BareVisit: # Elements must be hashable and JSON-persistable; built-in types # recommended. list is not hashable, but gets special treatment because # neither Kafka nor JSON deserialize sequences as tuples. @@ -14,7 +14,7 @@ class Visit: # https://ts-xml.lsst.io/sal_interfaces/ScriptQueue.html#nextvisit class CoordSys(enum.IntEnum): # This is a redeclaration of lsst.ts.idl.enums.Script.MetadataCoordSys, - # but we need Visit to work in code that can't import lsst.ts. + # but we need BareVisit to work in code that can't import lsst.ts. NONE = 1 ICRS = 2 OBSERVED = 3 @@ -33,7 +33,7 @@ class Dome(enum.IntEnum): OPEN = 2 EITHER = 3 - salIndex: int + salIndex: int # this maps to an instrument scriptSalIndex: int groupId: str # observatory-specific ID; not the same as visit number coordinateSystem: CoordSys # coordinate system of position @@ -41,14 +41,25 @@ class Dome(enum.IntEnum): position: list[float] = field(compare=False) rotationSystem: RotSys # coordinate system of cameraAngle cameraAngle: float # in degrees - filters: str # physical filter(s) + # physical filter(s) name as used in Middleware. It is a combination of filter and + # grating joined by a "~". For example, "SDSSi_65mm~empty". + filters: str dome: Dome duration: float # script execution, not exposure nimages: int # number of snaps expected, 0 if unknown survey: str # survey name totalCheckpoints: int - # Added by the Kafka consumer at USDF. + def __str__(self): + """Return a short string that represents the visit but does not + include complete metadata. + """ + return f"(groupId={self.groupId}, salIndex={self.salIndex})" + + +@dataclass(frozen=True, kw_only=True) +class FannedOutVisit(BareVisit): + # Extra information is added by the fan-out service at USDF. instrument: str # short name detector: int @@ -57,3 +68,23 @@ def __str__(self): include "metadata" fields. """ return f"(instrument={self.instrument}, groupId={self.groupId}, detector={self.detector})" + + def get_bare_visit(self): + """Return visit-level info as a dict""" + info = asdict(self) + info.pop("instrument") + info.pop("detector") + return info + + +@dataclass(frozen=True, kw_only=True) +class SummitVisit(BareVisit): + # Extra fields are in the NextVisit messages from the summit + private_efdStamp: float = 0.0 + private_kafkaStamp: float = 0.0 + private_identity: str = "ScriptQueue" + private_revCode: str = "c9aab3df" + private_origin: int = 0 + private_seqNum: int = 0 + private_rcvStamp: float = 0.0 + private_sndStamp: float = 0.0 diff --git a/python/tester/upload.py b/python/tester/upload.py index 93e7b6b3..d1467946 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -1,20 +1,16 @@ -__all__ = ["get_last_group", ] - import dataclasses import itertools import logging import os import random -import socket import sys import tempfile import time import boto3 -from confluent_kafka import Producer from activator.raw import Snap, get_raw_path -from activator.visit import Visit +from activator.visit import FannedOutVisit, SummitVisit from tester.utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit @@ -46,18 +42,18 @@ class Instrument: _log.setLevel(logging.INFO) -def process_group(producer, visit_infos, uploader): +def process_group(kafka_url, visit_infos, uploader): """Simulate the observation of a single on-sky pointing. Parameters ---------- - producer : `confluent_kafka.Producer` - The client that posts ``next_visit`` messages. - visit_infos : `set` [`activator.Visit`] + kafka_url : `str` + The URL of the Kafka REST Proxy to send ``next_visit`` messages to. + visit_infos : `set` [`activator.FannedOutVisit`] The visit-detector combinations to be observed; each object may represent multiple snaps. Assumed to represent a single group, and to share instrument, nimages, filters, and survey. - uploader : callable [`activator.Visit`, int] + uploader : callable [`activator.FannedOutVisit`, int] A callable that takes an exposure spec and a snap ID, and uploads the visit's data. """ @@ -65,12 +61,13 @@ def process_group(producer, visit_infos, uploader): for info in visit_infos: group = info.groupId n_snaps = info.nimages + visit = SummitVisit(**info.get_bare_visit()) + send_next_visit(kafka_url, group, {visit}) break else: _log.info("No observations to make; aborting.") return - send_next_visit(producer, group, visit_infos) # TODO: need asynchronous code to handle next_visit delay correctly for snap in range(n_snaps): _log.info(f"Taking group: {group} snap: {snap}") @@ -92,29 +89,24 @@ def main(): date = time.strftime("%Y%m%d") + kafka_url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/test.next-visit" endpoint_url = "https://s3dfrgw.slac.stanford.edu" s3 = boto3.resource("s3", endpoint_url=endpoint_url) dest_bucket = s3.Bucket("rubin-pp") - producer = Producer( - {"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()} - ) - - try: - last_group = get_last_group(dest_bucket, instrument, date) - _log.info(f"Last group {last_group}") - - src_bucket = s3.Bucket("rubin-pp-users") - raw_pool = get_samples(src_bucket, instrument) - - new_group_base = last_group + random.randrange(10, 19) - if raw_pool: - _log.info(f"Observing real raw files from {instrument}.") - upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, - n_groups, new_group_base) - else: - _log.error(f"No raw files found for {instrument}, aborting.") - finally: - producer.flush(30.0) + + last_group = get_last_group(dest_bucket, instrument, date) + _log.info(f"Last group {last_group}") + + src_bucket = s3.Bucket("rubin-pp-users") + raw_pool = get_samples(src_bucket, instrument) + + new_group_base = last_group + random.randrange(10, 19) + if raw_pool: + _log.info(f"Observing real raw files from {instrument}.") + upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, + n_groups, new_group_base) + else: + _log.error(f"No raw files found for {instrument}, aborting.") def get_samples(bucket, instrument): @@ -129,7 +121,7 @@ def get_samples(bucket, instrument): Returns ------- - raws : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `s3.ObjectSummary`]]] + raws : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]] A mapping from group IDs to a mapping of snap ID. The value of the innermost mapping is the observation metadata for each detector, and a Blob representing the image taken in that detector-snap. @@ -160,22 +152,25 @@ def get_samples(bucket, instrument): # Assume that the unobserved bucket uses the same filename scheme as # the observed bucket. snap = Snap.from_oid(blob.key) - visit = Visit(instrument=instrument, - detector=snap.detector, - groupId=snap.group, - nimages=INSTRUMENTS[instrument].n_snaps, - filters=snap.filter, - coordinateSystem=Visit.CoordSys.ICRS, - position=[hsc_metadata[snap.exp_id]["ra"], hsc_metadata[snap.exp_id]["dec"]], - rotationSystem=Visit.RotSys.SKY, - cameraAngle=hsc_metadata[snap.exp_id]["rot"], - survey="SURVEY", - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, - duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), - totalCheckpoints=1, - ) + visit = FannedOutVisit( + instrument=instrument, + detector=snap.detector, + groupId=snap.group, + nimages=INSTRUMENTS[instrument].n_snaps, + filters=snap.filter, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, + position=[hsc_metadata[snap.exp_id]["ra"], hsc_metadata[snap.exp_id]["dec"]], + rotationSystem=FannedOutVisit.RotSys.SKY, + cameraAngle=hsc_metadata[snap.exp_id]["rot"], + survey="SURVEY", + # Fan-out uses salIndex to know which instrument and detector config to use. + # The exp_id of this test dataset is coded into fan-out's pattern matching. + salIndex=snap.exp_id, + scriptSalIndex=42, + dome=FannedOutVisit.Dome.OPEN, + duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), + totalCheckpoints=1, + ) _log.debug(f"File {blob.key} parsed as snap {snap.snap} of visit {visit}.") if snap.group in result: snap_dict = result[snap.group] @@ -193,16 +188,16 @@ def get_samples(bucket, instrument): return result -def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base): +def upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base): """Upload visits and files using real raws. Parameters ---------- - producer : `confluent_kafka.Producer` - The client that posts ``next_visit`` messages. + kafka_url : `str` + The URL of the Kafka REST Proxy to send ``next_visit`` messages to. instrument : `str` The short name of the instrument carrying out the observation. - raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `s3.ObjectSummary`]]] + raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]] Available raws as a mapping from group IDs to a mapping of snap ID. The value of the innermost mapping is the observation metadata for each detector, and a Blob representing the image taken in that @@ -233,12 +228,12 @@ def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_ # snap_dict maps snap_id to {visit: blob} snap_dict = {} # Copy all the visit-blob dictionaries under each snap_id, - # replacing the (immutable) Visit objects to point to group + # replacing the (immutable) FannedOutVisit objects to point to group # instead of true_group. for snap_id, old_visits in raw_pool[true_group].items(): snap_dict[snap_id] = {dataclasses.replace(true_visit, groupId=group): blob for true_visit, blob in old_visits.items()} - # Gather all the Visit objects found in snap_dict, merging + # Gather all the FannedOutVisit objects found in snap_dict, merging # duplicates for different snaps of the same detector. visit_infos = {info for det_dict in snap_dict.values() for info in det_dict} @@ -257,7 +252,7 @@ def upload_from_pool(visit, snap_id): buffer.seek(0) # Assumed by upload_fileobj. dest_bucket.upload_fileobj(buffer, filename) - process_group(producer, visit_infos, upload_from_pool) + process_group(kafka_url, visit_infos, upload_from_pool) _log.info("Slewing to next group") time.sleep(SLEW_INTERVAL) diff --git a/python/tester/upload_hsc_rc2.py b/python/tester/upload_hsc_rc2.py index dab02e90..dbbff8e8 100644 --- a/python/tester/upload_hsc_rc2.py +++ b/python/tester/upload_hsc_rc2.py @@ -22,18 +22,16 @@ import logging import os import random -import socket import sys import tempfile import time import boto3 -from confluent_kafka import Producer from lsst.daf.butler import Butler from activator.raw import get_raw_path -from activator.visit import Visit +from activator.visit import SummitVisit from tester.utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit @@ -59,32 +57,26 @@ def main(): date = time.strftime("%Y%m%d") + kafka_url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/test.next-visit" endpoint_url = "https://s3dfrgw.slac.stanford.edu" s3 = boto3.resource("s3", endpoint_url=endpoint_url) dest_bucket = s3.Bucket("rubin-pp") - producer = Producer( - {"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()} - ) - last_group = get_last_group(dest_bucket, "HSC", date) group_num = last_group + random.randrange(10, 19) _log.debug(f"Last group {last_group}; new group base {group_num}") butler = Butler("/repo/main") visit_list = get_hsc_visit_list(butler, n_groups) - try: - for visit in visit_list: - group_num += 1 - _log.info(f"Slewing to group {group_num}, with HSC visit {visit}") - time.sleep(SLEW_INTERVAL) - refs = prepare_one_visit(producer, str(group_num), butler, visit) - _log.info(f"Taking exposure for group {group_num}") - time.sleep(EXPOSURE_INTERVAL) - _log.info(f"Uploading detector images for group {group_num}") - upload_hsc_images(dest_bucket, str(group_num), butler, refs) - finally: - producer.flush(30.0) + for visit in visit_list: + group_num += 1 + _log.info(f"Slewing to group {group_num}, with HSC visit {visit}") + time.sleep(SLEW_INTERVAL) + refs = prepare_one_visit(kafka_url, str(group_num), butler, visit) + _log.info(f"Taking exposure for group {group_num}") + time.sleep(EXPOSURE_INTERVAL) + _log.info(f"Uploading detector images for group {group_num}") + upload_hsc_images(dest_bucket, str(group_num), butler, refs) def get_hsc_visit_list(butler, n_sample): @@ -115,7 +107,7 @@ def get_hsc_visit_list(butler, n_sample): return visits -def prepare_one_visit(producer, group_id, butler, visit_id): +def prepare_one_visit(kafka_url, group_id, butler, visit_id): """Extract metadata and send next_visit events for one HSC-RC2 visit One ``next_visit`` message is sent for each detector, to mimic the @@ -125,8 +117,8 @@ def prepare_one_visit(producer, group_id, butler, visit_id): Parameters ---------- - producer : `confluent_kafka.Producer` - The client that posts ``next_visit`` messages. + kafka_url : `str` + The URL of the Kafka REST Proxy to send ``next_visit`` messages to. group_id : `str` The group ID for the message to send. butler : `lsst.daf.butler.Butler` @@ -145,28 +137,24 @@ def prepare_one_visit(producer, group_id, butler, visit_id): dataId={"exposure": visit_id, "instrument": "HSC"}, ) - visits = set() - for data_id in refs.dataIds.expanded(): - visit = Visit( - instrument="HSC", - detector=data_id.records["detector"].id, + # all items in refs share the same visit info and one event is to be sent + for data_id in refs.dataIds.limit(1).expanded(): + visit = SummitVisit( groupId=group_id, nimages=1, filters=data_id.records["physical_filter"].name, - coordinateSystem=Visit.CoordSys.ICRS, + coordinateSystem=SummitVisit.CoordSys.ICRS, position=[data_id.records["exposure"].tracking_ra, data_id.records["exposure"].tracking_dec], - rotationSystem=Visit.RotSys.SKY, + rotationSystem=SummitVisit.RotSys.SKY, cameraAngle=data_id.records["exposure"].sky_angle, survey="SURVEY", - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, + salIndex=999, + scriptSalIndex=999, + dome=SummitVisit.Dome.OPEN, duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), totalCheckpoints=1, ) - visits.add(visit) - - send_next_visit(producer, group_id, visits) + send_next_visit(kafka_url, group_id, {visit}) return refs diff --git a/python/tester/utils.py b/python/tester/utils.py index fdec1eb2..6353bc9e 100644 --- a/python/tester/utils.py +++ b/python/tester/utils.py @@ -21,8 +21,10 @@ __all__ = ["get_last_group", "make_exposure_id", "replace_header_key", "send_next_visit"] +from dataclasses import asdict import json import logging +import requests from astropy.io import fits @@ -148,27 +150,30 @@ def make_hsc_id(group_num, snap): return f"HSCE{exposure_id:08d}", exposure_id -def send_next_visit(producer, group, visit_infos): - """Simulate the transmission of a ``next_visit`` message. +def send_next_visit(url, group, visit_infos): + """Simulate the transmission of a ``next_visit`` message to Sasquatch. Parameters ---------- - producer : `confluent_kafka.Producer` - The client that posts ``next_visit`` messages. + url : `str` + The URL of the Kafka REST Proxy to send ``next_visit`` messages to. group : `str` The group ID for the message to send. - visit_infos : `set` [`activator.Visit`] - The visit-detector combinations to be sent; each object may + visit_infos : `set` [`activator.SummitVisit`] + The ``next_visit`` message to be sent; each object may represent multiple snaps. """ - _log.info(f"Sending next_visit for group: {group}") - topic = "next-visit-topic" + _log.info(f"Sending next_visit for group to kafka http proxy: {group}") + header = {"Content-Type": "application/vnd.kafka.avro.v2+json"} for info in visit_infos: - _log.debug(f"Sending next_visit for group: {info.groupId} detector: {info.detector} " + _log.debug(f"Sending next_visit for group: {info.groupId} " f"filters: {info.filters} ra: {info.position[0]} dec: {info.position[1]} " f"survey: {info.survey}") - data = json.dumps(info.__dict__).encode("utf-8") - producer.produce(topic, data) + records_level = dict(value=asdict(info)) + value_schema_level = dict(value_schema_id=1, records=[records_level]) + + r = requests.post(url, data=json.dumps(value_schema_level), headers=header) + _log.debug(r.content) def replace_header_key(file, key, value): diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index a059f24d..8fc858dc 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -40,7 +40,7 @@ from lsst.obs.base.ingest import RawFileDatasetInfo, RawFileData import lsst.resources -from activator.visit import Visit +from activator.visit import FannedOutVisit from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface, \ _filter_datasets, _prepend_collection, _remove_from_chain, _MissingDatasetError @@ -63,7 +63,7 @@ def fake_file_data(filename, dimensions, instrument, visit): The full set of dimensions for this butler. instrument : `lsst.obs.base.Instrument` The instrument the file is supposed to be from. - visit : `Visit` + visit : `FannedOutVisit` Group of snaps from one detector to be processed. Returns @@ -143,22 +143,22 @@ def setUp(self): dec = -4.950050405424033 # DECam has no rotator; instrument angle is 90 degrees in our system. rot = 90. - self.next_visit = Visit(instrument=instname, - detector=56, - groupId="1", - nimages=1, - filters=filter, - coordinateSystem=Visit.CoordSys.ICRS, - position=[ra, dec], - rotationSystem=Visit.RotSys.SKY, - cameraAngle=rot, - survey="SURVEY", - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, - duration=35.0, - totalCheckpoints=1, - ) + self.next_visit = FannedOutVisit(instrument=instname, + detector=56, + groupId="1", + nimages=1, + filters=filter, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, + position=[ra, dec], + rotationSystem=FannedOutVisit.RotSys.SKY, + cameraAngle=rot, + survey="SURVEY", + salIndex=42, + scriptSalIndex=42, + dome=FannedOutVisit.Dome.OPEN, + duration=35.0, + totalCheckpoints=1, + ) self.logger_name = "lsst.activator.middleware_interface" def tearDown(self): @@ -628,22 +628,22 @@ def setUp(self): dec = -4.950050405424033 # DECam has no rotator; instrument angle is 90 degrees in our system. rot = 90. - self.next_visit = Visit(instrument=instname, - detector=56, - groupId="1", - nimages=1, - filters=filter, - coordinateSystem=Visit.CoordSys.ICRS, - position=[ra, dec], - rotationSystem=Visit.RotSys.SKY, - cameraAngle=rot, - survey="SURVEY", - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, - duration=35.0, - totalCheckpoints=1, - ) + self.next_visit = FannedOutVisit(instrument=instname, + detector=56, + groupId="1", + nimages=1, + filters=filter, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, + position=[ra, dec], + rotationSystem=FannedOutVisit.RotSys.SKY, + cameraAngle=rot, + survey="SURVEY", + salIndex=42, + scriptSalIndex=42, + dome=FannedOutVisit.Dome.OPEN, + duration=35.0, + totalCheckpoints=1, + ) self.second_visit = dataclasses.replace(self.next_visit, groupId="2") self.logger_name = "lsst.activator.middleware_interface" diff --git a/tests/test_raw.py b/tests/test_raw.py index 4f661d00..146d517a 100644 --- a/tests/test_raw.py +++ b/tests/test_raw.py @@ -23,7 +23,7 @@ import unittest from activator.raw import Snap, RAW_REGEXP, get_raw_path -from activator.visit import Visit +from activator.visit import FannedOutVisit class RawTest(unittest.TestCase): @@ -44,22 +44,22 @@ def setUp(self): self.snap = 1 self.exposure = 404 - self.visit = Visit(instrument=self.instrument, - detector=self.detector, - groupId=self.group, - nimages=self.snaps, - filters=self.filter, - coordinateSystem=Visit.CoordSys.ICRS, - position=[self.ra, self.dec], - rotationSystem=Visit.RotSys.SKY, - cameraAngle=self.rot, - survey=self.survey, - salIndex=42, - scriptSalIndex=42, - dome=Visit.Dome.OPEN, - duration=35.0, - totalCheckpoints=1, - ) + self.visit = FannedOutVisit(instrument=self.instrument, + detector=self.detector, + groupId=self.group, + nimages=self.snaps, + filters=self.filter, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, + position=[self.ra, self.dec], + rotationSystem=FannedOutVisit.RotSys.SKY, + cameraAngle=self.rot, + survey=self.survey, + salIndex=42, + scriptSalIndex=42, + dome=FannedOutVisit.Dome.OPEN, + duration=35.0, + totalCheckpoints=1, + ) def test_writeread(self): """Test that raw module can parse the paths it creates. diff --git a/tests/test_visit.py b/tests/test_visit.py index 96f48b2b..16713f77 100644 --- a/tests/test_visit.py +++ b/tests/test_visit.py @@ -19,38 +19,39 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import dataclasses import json import unittest -from activator.visit import Visit +from activator.visit import FannedOutVisit, BareVisit -class VisitTest(unittest.TestCase): - """Test the Visit class's functionality. +class FannedOutVisitTest(unittest.TestCase): + """Test the FannedOutVisit class's functionality. """ def setUp(self): super().setUp() - self.testbed = Visit( + self.testbed = FannedOutVisit( instrument="NotACam", detector=42, groupId="2023-01-23T23:33:14.762", nimages=2, filters="k2022", - coordinateSystem=Visit.CoordSys.ICRS, + coordinateSystem=FannedOutVisit.CoordSys.ICRS, position=[134.5454, -65.3261], - rotationSystem=Visit.RotSys.SKY, + rotationSystem=FannedOutVisit.RotSys.SKY, cameraAngle=135.0, survey="IMAGINARY", salIndex=42, scriptSalIndex=42, - dome=Visit.Dome.OPEN, + dome=FannedOutVisit.Dome.OPEN, duration=35.0, totalCheckpoints=1, ) def test_hash(self): - # Strictly speaking should test whether Visit fulfills the hash + # Strictly speaking should test whether FannedOutVisit fulfills the hash # contract, but it's not clear what kinds of differences the default # __hash__ might be insensitive to. So just test that the object # is hashable. @@ -59,15 +60,50 @@ def test_hash(self): def test_json(self): serialized = json.dumps(self.testbed.__dict__).encode("utf-8") - deserialized = Visit(**json.loads(serialized)) + deserialized = FannedOutVisit(**json.loads(serialized)) self.assertEqual(deserialized, self.testbed) # Test that enums are handled correctly despite being serialized as shorts. # isinstance checks are ambigious because IntEnum is-an int. - self.assertIs(type(self.testbed.coordinateSystem), Visit.CoordSys) + self.assertIs(type(self.testbed.coordinateSystem), FannedOutVisit.CoordSys) self.assertIs(type(deserialized.coordinateSystem), int) - self.assertIsNot(type(deserialized.coordinateSystem), Visit.CoordSys) + self.assertIsNot(type(deserialized.coordinateSystem), FannedOutVisit.CoordSys) def test_str(self): self.assertNotEqual(str(self.testbed), repr(self.testbed)) self.assertIn(str(self.testbed.detector), str(self.testbed)) self.assertIn(str(self.testbed.groupId), str(self.testbed)) + + +class BareVisitTest(unittest.TestCase): + """Test the BareVisit class's functionality. + """ + def setUp(self): + super().setUp() + + visit_info = dict( + groupId="2023-01-23T23:33:14.762", + nimages=2, + filters="k2022", + coordinateSystem=BareVisit.CoordSys.ICRS, + position=[134.5454, -65.3261], + rotationSystem=BareVisit.RotSys.SKY, + cameraAngle=135.0, + survey="IMAGINARY", + salIndex=42, + scriptSalIndex=42, + dome=BareVisit.Dome.OPEN, + duration=35.0, + totalCheckpoints=1, + ) + self.visit = BareVisit(**visit_info) + self.fannedOutVisit = FannedOutVisit( + instrument="NotACam", + detector=42, + **visit_info + ) + + def test_get_bare(self): + self.assertEqual( + self.fannedOutVisit.get_bare_visit(), + dataclasses.asdict(self.visit) + )