From d81072fdc51bbfab92699b97e3c787feb5985185 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Nov 2022 15:02:07 -0800 Subject: [PATCH 1/7] Use explicit S3 endpoint in activator. This appears to be a limit of the boto3 API; see #35 for discussion. --- python/activator/activator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 6e3e2e5b..b86aa093 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -45,6 +45,8 @@ instrument_name = os.environ["RUBIN_INSTRUMENT"] # URI to the main repository containing calibs and templates calib_repo = os.environ["CALIB_REPO"] +# S3 Endpoint for Buckets; needed for direct Boto access but not Butler +s3_endpoint = os.environ["S3_ENDPOINT_URL"] # Bucket name (not URI) containing raw images image_bucket = os.environ["IMAGE_BUCKET"] # Time to wait for raw image upload, in seconds @@ -78,7 +80,7 @@ "group.id": kafka_group_id, }) -storage_client = boto3.client('s3') +storage_client = boto3.client('s3', endpoint_url=s3_endpoint) # Initialize middleware interface. mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name), From be1a7426b361a470cb9665fab727ec89d6e3c014 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Nov 2022 15:21:10 -0800 Subject: [PATCH 2/7] Update instructions for upload.py to reflect USDF. --- doc/playbook.rst | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/doc/playbook.rst b/doc/playbook.rst index 33c661f6..6ace6d57 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -250,19 +250,23 @@ tester ====== ``python/tester/upload.py`` is a script that simulates the CCS image writer. -On a local machine, it requires a JSON token in ``./prompt-proto-upload.json``. 
-To obtain a token, see the GCP documentation on `service account keys`_; the relevant service account is ``prompt-image-upload@prompt-proto.iam.gserviceaccount.com``. +It can be run from ``rubin-devl``, but requires the user to install the ``confluent_kafka`` package in their environment. -.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys +You must have a profile set up for the ``rubin-pp`` bucket (see `Buckets`_, above), and must set the ``KAFKA_CLUSTER`` environment variable. +Run: -Run the tester either on a local machine, or in Cloud Shell. -In Cloud Shell, install the prototype code and the GCP PubSub client: +.. code-block:: sh + + kubectl get service -n kafka prompt-processing-kafka-external-bootstrap + +and look up the ``EXTERNAL-IP``; set ``KAFKA_CLUSTER=<EXTERNAL-IP>:9094``. +The IP address is fixed, so you should only need to look it up once. + +Install the prototype code: .. code-block:: sh - gcloud config set project prompt-proto - git clone https://github.com/lsst-dm/prompt_prototype.git - pip3 install google-cloud-pubsub + git clone https://github.com/lsst-dm/prompt_prototype Command line arguments are the instrument name (currently HSC only; other values will upload dummy raws that the pipeline can't process) and the number of groups of images to send. @@ -272,8 +276,8 @@ Sample command line: python upload.py HSC 3 -It sends ``next_visit`` events for each detector via Google Pub/Sub on the ``nextVisit`` topic. -It then uploads a batch of files representing the snaps of the visit to the ``rubin-prompt-proto-main`` GCS bucket. +It sends ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic. +It then uploads a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket. Eventually a set of parallel processes running on multiple nodes will be needed to upload the images sufficiently rapidly. 
From d87df995769e4aa44e9d26349a7aa11f44fa091e Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Nov 2022 15:35:30 -0800 Subject: [PATCH 3/7] Document log streaming in playbook. --- doc/playbook.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/playbook.rst b/doc/playbook.rst index 6ace6d57..405e175a 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -245,6 +245,7 @@ A few useful commands for managing the service: * ``kubectl get pods`` lists the Kubernetes pods that are currently running, how long they have been active, and how recently they crashed. * ``kubectl logs <pod name>`` outputs the entire log associated with a particular pod. This can be a long file, so consider piping to ``less`` or ``grep``. + ``kubectl logs`` also offers the ``-f`` flag for streaming output. tester ====== From 8a580998cb019d297e18d03506c0aa3180fe6a05 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Nov 2022 16:11:25 -0800 Subject: [PATCH 4/7] Fix fatal bug in check_for_snap. The Contents field does not exist if no files are found, so the old code would crash with a KeyError. --- python/activator/activator.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index b86aa093..c668f0bb 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -110,14 +110,15 @@ def check_for_snap( """ prefix = f"{instrument}/{detector}/{group}/{snap}/" _log.debug(f"Checking for '{prefix}'") - blobs = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)['Contents'] - if not blobs: + response = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix) + if response["KeyCount"] == 0: return None - elif len(blobs) > 1: + elif response["KeyCount"] > 1: _log.error( f"Multiple files detected for a single detector/group/snap: '{prefix}'" ) - return blobs[0]['Key'] + # Contents only exists if >0 objects found. 
+ return response["Contents"][0]['Key'] def parse_next_visit(http_request): From 52981abf195defc63e15c46e792a8ad85e7856cc Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Nov 2022 16:34:25 -0800 Subject: [PATCH 5/7] Remove Google dependencies from MiddlewareInterface. --- python/activator/middleware_interface.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 376cfc85..301b631a 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -115,7 +115,7 @@ class MiddlewareInterface: prefix : `str`, optional URI scheme followed by ``://``; prepended to ``image_bucket`` when constructing URIs to retrieve incoming files. The default is - appropriate for use in the Google Cloud environment; typically only + appropriate for use in the USDF environment; typically only change this when running local tests. """ _COLLECTION_TEMPLATE = "templates" @@ -139,7 +139,7 @@ class MiddlewareInterface: def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, local_storage: str, - prefix: str = "gs://"): + prefix: str = "s3://"): self._apdb_uri = self._make_apdb_uri() self._apdb_namespace = os.environ.get("NAMESPACE_APDB", None) self.central_butler = central_butler @@ -585,8 +585,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None: visit : Visit The visit for which the image was taken. oid : `str` - Google storage identifier for incoming image, relative to the - image bucket. + Identifier for incoming image, relative to the image bucket. """ # TODO: consider allowing pre-existing raws, as may happen when a # pipeline is rerun (see DM-34141). From de3458f65bdbe61f85b59192aa34149c7fd22c95 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Nov 2022 16:49:04 -0800 Subject: [PATCH 6/7] Fix preliminary bucket notification topic. 
The topic is still hard-coded into the file, until we can decide how we want topics to be organized. --- python/activator/activator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index c668f0bb..d1429e52 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -59,7 +59,7 @@ kafka_group_id = str(uuid.uuid4()) # The topic on which to listen to updates to image_bucket # bucket_topic = f"{instrument_name}-image" -bucket_topic = "pp-bucket-notify-topic" +bucket_topic = "rubin-prompt-processing" setup_usdf_logger( labels={"instrument": instrument_name}, From f52b86b199ad81a1e2139340fa018a84f1521442 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Nov 2022 17:12:27 -0800 Subject: [PATCH 7/7] Fix use of old ingestion API in raw-notification branch. --- python/activator/activator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index d1429e52..948d7bdb 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -271,7 +271,7 @@ def next_visit_handler() -> Tuple[str, int]: _log.debug("Received %r", raw_info) if raw_info.is_consistent(expected_visit): # Ingest the snap - mwi.ingest_image(oid) + mwi.ingest_image(expected_visit, oid) expid_set.add(raw_info.exp_id) except ValueError: _log.error(f"Failed to match object id '{oid}'")