diff --git a/doc/playbook.rst b/doc/playbook.rst
index 33c661f6..405e175a 100644
--- a/doc/playbook.rst
+++ b/doc/playbook.rst
@@ -245,24 +245,29 @@ A few useful commands for managing the service:
 
 * ``kubectl get pods`` lists the Kubernetes pods that are currently running, how long they have been active, and how recently they crashed.
 * ``kubectl logs <pod>`` outputs the entire log associated with a particular pod.
   This can be a long file, so consider piping to ``less`` or ``grep``.
+  ``kubectl logs`` also offers the ``-f`` flag for streaming output.
 
 tester
 ======
 
 ``python/tester/upload.py`` is a script that simulates the CCS image writer.
-On a local machine, it requires a JSON token in ``./prompt-proto-upload.json``.
-To obtain a token, see the GCP documentation on `service account keys`_; the relevant service account is ``prompt-image-upload@prompt-proto.iam.gserviceaccount.com``.
+It can be run from ``rubin-devl``, but requires the user to install the ``confluent_kafka`` package in their environment.
 
-.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys
+You must have a profile set up for the ``rubin-pp`` bucket (see `Buckets`_, above), and must set the ``KAFKA_CLUSTER`` environment variable.
+Run:
 
-Run the tester either on a local machine, or in Cloud Shell.
-In Cloud Shell, install the prototype code and the GCP PubSub client:
+.. code-block:: sh
+
+   kubectl get service -n kafka prompt-processing-kafka-external-bootstrap
+
+and look up the ``EXTERNAL-IP``; set ``KAFKA_CLUSTER=<EXTERNAL-IP>:9094``.
+The IP address is fixed, so you should only need to look it up once.
+
+Install the prototype code:
 
 .. code-block:: sh
 
-   gcloud config set project prompt-proto
-   git clone https://github.com/lsst-dm/prompt_prototype.git
-   pip3 install google-cloud-pubsub
+   git clone https://github.com/lsst-dm/prompt_prototype
 
 Command line arguments are the instrument name (currently HSC only; other values will upload dummy raws that the pipeline can't process) and the number of groups of images to send.
 
@@ -272,8 +277,8 @@ Sample command line:
 
    python upload.py HSC 3
 
-It sends ``next_visit`` events for each detector via Google Pub/Sub on the ``nextVisit`` topic.
-It then uploads a batch of files representing the snaps of the visit to the ``rubin-prompt-proto-main`` GCS bucket.
+It sends ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic.
+It then uploads a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket.
 
 Eventually a set of parallel processes running on multiple nodes will be needed to upload the images sufficiently rapidly.
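For orientation, the tester workflow documented above (publish a ``next_visit`` event to Kafka, then upload the corresponding raw to the ``rubin-pp`` bucket) can be sketched in a few lines of Python. This is an illustrative sketch only, not the real ``python/tester/upload.py``: the message fields and the object-key layout are assumptions.

    # Illustrative sketch of the tester's per-visit pattern; field names and
    # the S3 key layout are placeholders, not the real upload.py definitions.
    import json
    import os
    import socket

    import boto3
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": os.environ["KAFKA_CLUSTER"],
                         "client.id": socket.gethostname()})
    s3 = boto3.resource("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL"))
    bucket = s3.Bucket("rubin-pp")

    # Announce the upcoming visit on the next-visit topic.
    visit = {"instrument": "HSC", "detector": 0, "group": "42", "snaps": 1}
    producer.produce("next-visit-topic", json.dumps(visit).encode("utf-8"))
    producer.flush(30.0)

    # Hypothetical key; the real layout is defined in upload.py.
    bucket.upload_file("raw.fits", f"HSC/0/{visit['group']}/0/raw.fits")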
diff --git a/python/activator/activator.py b/python/activator/activator.py
index 6e3e2e5b..948d7bdb 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -45,6 +45,8 @@
 instrument_name = os.environ["RUBIN_INSTRUMENT"]
 # URI to the main repository containing calibs and templates
 calib_repo = os.environ["CALIB_REPO"]
+# S3 Endpoint for Buckets; needed for direct Boto access but not Butler
+s3_endpoint = os.environ["S3_ENDPOINT_URL"]
 # Bucket name (not URI) containing raw images
 image_bucket = os.environ["IMAGE_BUCKET"]
 # Time to wait for raw image upload, in seconds
@@ -57,7 +59,7 @@
 kafka_group_id = str(uuid.uuid4())
 # The topic on which to listen to updates to image_bucket
 # bucket_topic = f"{instrument_name}-image"
-bucket_topic = "pp-bucket-notify-topic"
+bucket_topic = "rubin-prompt-processing"
 
 setup_usdf_logger(
     labels={"instrument": instrument_name},
@@ -78,7 +80,7 @@
     "group.id": kafka_group_id,
 })
 
-storage_client = boto3.client('s3')
+storage_client = boto3.client('s3', endpoint_url=s3_endpoint)
 
 # Initialize middleware interface.
 mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
@@ -108,14 +110,15 @@ def check_for_snap(
     """
     prefix = f"{instrument}/{detector}/{group}/{snap}/"
     _log.debug(f"Checking for '{prefix}'")
-    blobs = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)['Contents']
-    if not blobs:
+    response = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)
+    if response["KeyCount"] == 0:
         return None
-    elif len(blobs) > 1:
+    elif response["KeyCount"] > 1:
         _log.error(
             f"Multiple files detected for a single detector/group/snap: '{prefix}'"
         )
-    return blobs[0]['Key']
+    # Contents only exists if >0 objects found.
+    return response["Contents"][0]['Key']
 
 
 def parse_next_visit(http_request):
@@ -268,7 +271,7 @@ def next_visit_handler() -> Tuple[str, int]:
                 _log.debug("Received %r", raw_info)
                 if raw_info.is_consistent(expected_visit):
                     # Ingest the snap
-                    mwi.ingest_image(oid)
+                    mwi.ingest_image(expected_visit, oid)
                     expid_set.add(raw_info.exp_id)
             except ValueError:
                 _log.error(f"Failed to match object id '{oid}'")
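The ``check_for_snap`` change above replaces direct indexing of ``Contents`` with a ``KeyCount`` test because ``list_objects_v2`` omits the ``Contents`` key entirely when nothing matches the prefix, so the old code could raise ``KeyError`` on an empty listing. The pattern in isolation (the endpoint, bucket, and prefix below are placeholders):

    import boto3

    # Placeholder endpoint; the activator reads it from S3_ENDPOINT_URL.
    s3 = boto3.client("s3", endpoint_url="https://s3.example.org")

    def first_key(bucket, prefix):
        """Return the first object key under ``prefix``, or None if there is none."""
        response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        if response["KeyCount"] == 0:
            # "Contents" is absent when no objects matched, so check KeyCount first.
            return None
        return response["Contents"][0]["Key"]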
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 376cfc85..b237da57 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -115,7 +115,7 @@ class MiddlewareInterface:
     prefix : `str`, optional
         URI scheme followed by ``://``; prepended to ``image_bucket`` when
         constructing URIs to retrieve incoming files. The default is
-        appropriate for use in the Google Cloud environment; typically only
+        appropriate for use in the USDF environment; typically only
         change this when running local tests.
     """
     _COLLECTION_TEMPLATE = "templates"
@@ -136,10 +136,12 @@ class MiddlewareInterface:
     # pre-made inputs, and raws, in that order. However, self.butler is not
     # guaranteed to contain concrete data, or even the dimensions
     # corresponding to self.camera and self.skymap.
+    # if it exists, the latest run in self.output_collection is always the first
+    # in the chain.
 
     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
                  local_storage: str,
-                 prefix: str = "gs://"):
+                 prefix: str = "s3://"):
         self._apdb_uri = self._make_apdb_uri()
         self._apdb_namespace = os.environ.get("NAMESPACE_APDB", None)
         self.central_butler = central_butler
@@ -585,8 +587,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None:
         visit : Visit
             The visit for which the image was taken.
         oid : `str`
-            Google storage identifier for incoming image, relative to the
-            image bucket.
+            Identifier for incoming image, relative to the image bucket.
         """
         # TODO: consider allowing pre-existing raws, as may happen when a
         # pipeline is rerun (see DM-34141).
@@ -664,16 +665,21 @@ def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None:
         self._export_subset(visit, exposure_ids, "raw",
                             in_collections=self._get_raw_run_name(visit),
                             out_collection=self.instrument.makeDefaultRawIngestRunName())
+        umbrella = self.instrument.makeCollectionName("prompt-results")
-        self._export_subset(visit, exposure_ids,
-                            # TODO: find a way to merge datasets like *_config
-                            # or *_schema that are duplicated across multiple
-                            # workers.
-                            self._get_safe_dataset_types(self.butler),
-                            self.output_collection + "/*",  # exclude inputs
-                            umbrella)
-        _log.info(f"Pipeline products saved to collection '{umbrella}' for "
-                  f"detector {visit.detector} of {exposure_ids}.")
+        latest_run = self.butler.registry.getCollectionChain(self.output_collection)[0]
+        if latest_run.startswith(self.output_collection + "/"):
+            self._export_subset(visit, exposure_ids,
+                                # TODO: find a way to merge datasets like *_config
+                                # or *_schema that are duplicated across multiple
+                                # workers.
+                                self._get_safe_dataset_types(self.butler),
+                                in_collections=latest_run,
+                                out_collection=umbrella)
+            _log.info(f"Pipeline products saved to collection '{umbrella}' for "
+                      f"detector {visit.detector} of {exposure_ids}.")
+        else:
+            _log.warning(f"No output runs to save! Called for {visit.detector} of {exposure_ids}.")
 
     @staticmethod
     def _get_safe_dataset_types(butler):
diff --git a/python/tester/upload.py b/python/tester/upload.py
index ca501e9b..51047a0e 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -147,19 +147,23 @@ def main():
         {"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()}
     )
 
-    last_group = get_last_group(dest_bucket, instrument, date)
-    _log.info(f"Last group {last_group}")
-
-    src_bucket = s3.Bucket("rubin-pp-users")
-    raw_pool = get_samples(src_bucket, instrument)
-
-    new_group_base = last_group + random.randrange(10, 19)
-    if raw_pool:
-        _log.info(f"Observing real raw files from {instrument}.")
-        upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_groups, new_group_base)
-    else:
-        _log.info(f"No raw files found for {instrument}, generating dummy files instead.")
-        upload_from_random(producer, instrument, dest_bucket, n_groups, new_group_base)
+    try:
+        last_group = get_last_group(dest_bucket, instrument, date)
+        _log.info(f"Last group {last_group}")
+
+        src_bucket = s3.Bucket("rubin-pp-users")
+        raw_pool = get_samples(src_bucket, instrument)
+
+        new_group_base = last_group + random.randrange(10, 19)
+        if raw_pool:
+            _log.info(f"Observing real raw files from {instrument}.")
+            upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket,
+                             n_groups, new_group_base)
+        else:
+            _log.info(f"No raw files found for {instrument}, generating dummy files instead.")
+            upload_from_random(producer, instrument, dest_bucket, n_groups, new_group_base)
+    finally:
+        producer.flush(30.0)
 
 
 def get_last_group(bucket, instrument, date):
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 8bf2c91c..192561bb 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -22,6 +22,7 @@
 import dataclasses
 import itertools
 import tempfile
+import time
 import os.path
 import unittest
 import unittest.mock
@@ -504,6 +505,7 @@ def setUp(self):
                                      dec=dec,
                                      rot=rot,
                                      kind="SURVEY")
+        self.second_visit = dataclasses.replace(self.next_visit, group="2")
         self.logger_name = "lsst.activator.middleware_interface"
 
         # Populate repository.
@@ -521,9 +523,19 @@ def setUp(self):
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
             self.interface.ingest_image(self.next_visit, filename)
+            self.interface.ingest_image(self.second_visit, filename)
         self.interface.define_visits.run([self.raw_data_id])
 
-        # Simulate pipeline execution.
+        self._simulate_run()
+
+    def _simulate_run(self):
+        """Create a mock pipeline execution that stores a calexp for self.raw_data_id.
+
+        Returns
+        -------
+        run : `str`
+            The output run containing the output.
+        """
         exp = lsst.afw.image.ExposureF(20, 20)
         run = self.interface._prep_collections()
         self.processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in self.raw_data_id.items()}
@@ -532,6 +544,7 @@ def setUp(self):
         butler_tests.addDatasetType(self.interface.butler, "calexp",
                                     {"instrument", "visit", "detector"}, "ExposureF")
         self.interface.butler.put(exp, "calexp", self.processed_data_id, run=run)
+        return run
 
     def _check_datasets(self, butler, types, collections, count, data_id):
         datasets = list(butler.registry.queryDatasets(types, collections=collections))
@@ -548,7 +561,7 @@ def test_export_outputs(self):
         self._check_datasets(central_butler,
                              "raw", raw_collection, 1, self.raw_data_id)
         # Did not export raws directly to raw/all.
-        self.assertNotEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.RUN)
+        self.assertEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.CHAINED)
         self._check_datasets(central_butler,
                              "calexp", export_collection, 1, self.processed_data_id)
         # Did not export calibs or other inputs.
@@ -567,3 +580,27 @@ def test_export_outputs_bad_visit(self):
     def test_export_outputs_bad_exposure(self):
         with self.assertRaises(ValueError):
             self.interface.export_outputs(self.next_visit, {88})
+
+    def test_export_outputs_retry(self):
+        self.interface.export_outputs(self.next_visit, {self.raw_data_id["exposure"]})
+
+        time.sleep(1.0)  # Force _simulate_run() to create a new timestamped run
+        self._simulate_run()
+        self.interface.export_outputs(self.second_visit, {self.raw_data_id["exposure"]})
+
+        central_butler = Butler(self.central_repo.name, writeable=False)
+        raw_collection = f"{instname}/raw/all"
+        export_collection = f"{instname}/prompt-results"
+        self._check_datasets(central_butler,
+                             "raw", raw_collection, 2, self.raw_data_id)
+        # Did not export raws directly to raw/all.
+        self.assertEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.CHAINED)
+        self._check_datasets(central_butler,
+                             "calexp", export_collection, 2, self.processed_data_id)
+        # Did not export calibs or other inputs.
+        self._check_datasets(central_butler,
+                             ["cpBias", "gaia", "skyMap", "*Coadd"], export_collection,
+                             0, {"error": "dnc"})
+        # Nothing placed in "input" collections.
+        self._check_datasets(central_butler,
+                             ["raw", "calexp"], f"{instname}/defaults", 0, {"error": "dnc"})
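The ``upload.py`` change above wraps the upload logic in ``try``/``finally`` so that any messages already queued on the Kafka producer are still delivered even if an upload step raises. A minimal sketch of that pattern (the broker address and payload are placeholders):

    import socket

    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9094",
                         "client.id": socket.gethostname()})
    try:
        # produce() only enqueues the message; delivery happens asynchronously.
        producer.produce("next-visit-topic", b"example payload")
    finally:
        # Give queued messages up to 30 seconds to be delivered.
        producer.flush(30.0)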