diff --git a/doc/playbook.rst b/doc/playbook.rst
index 33c661f6..405e175a 100644
--- a/doc/playbook.rst
+++ b/doc/playbook.rst
@@ -245,24 +245,29 @@ A few useful commands for managing the service:
 * ``kubectl get pods`` lists the Kubernetes pods that are currently running, how long they have been active, and how recently they crashed.
 * ``kubectl logs <pod>`` outputs the entire log associated with a particular pod.
   This can be a long file, so consider piping to ``less`` or ``grep``.
+  ``kubectl logs`` also offers the ``-f`` flag for streaming output.
 
 
 tester
 ======
 
 ``python/tester/upload.py`` is a script that simulates the CCS image writer.
-On a local machine, it requires a JSON token in ``./prompt-proto-upload.json``.
-To obtain a token, see the GCP documentation on `service account keys`_; the relevant service account is ``prompt-image-upload@prompt-proto.iam.gserviceaccount.com``.
+It can be run from ``rubin-devl``, but requires the user to install the ``confluent_kafka`` package in their environment.
 
-.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys
+You must have a profile set up for the ``rubin-pp`` bucket (see `Buckets`_, above), and must set the ``KAFKA_CLUSTER`` environment variable.
+Run:
 
-Run the tester either on a local machine, or in Cloud Shell.
-In Cloud Shell, install the prototype code and the GCP PubSub client:
+.. code-block:: sh
+
+   kubectl get service -n kafka prompt-processing-kafka-external-bootstrap
+
+and look up the ``EXTERNAL-IP``; set ``KAFKA_CLUSTER=<ip>:9094``.
+The IP address is fixed, so you should only need to look it up once.
+
+Install the prototype code:
 
 .. code-block:: sh
 
-   gcloud config set project prompt-proto
-   git clone https://github.com/lsst-dm/prompt_prototype.git
-   pip3 install google-cloud-pubsub
+   git clone https://github.com/lsst-dm/prompt_prototype
 
 Command line arguments are the instrument name (currently HSC only; other values will upload dummy raws that the pipeline can't process) and the number of groups of images to send.
@@ -272,8 +277,8 @@ Sample command line:
 
    python upload.py HSC 3
 
-It sends ``next_visit`` events for each detector via Google Pub/Sub on the ``nextVisit`` topic.
-It then uploads a batch of files representing the snaps of the visit to the ``rubin-prompt-proto-main`` GCS bucket.
+It sends ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic.
+It then uploads a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket.
 Eventually a set of parallel processes running on multiple nodes will be needed to upload the images sufficiently rapidly.
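To make the updated tester workflow concrete, here is a minimal sketch of sending a ``next_visit`` event over Kafka and uploading a raw to the ``rubin-pp`` bucket. The ``next-visit-topic`` topic, the ``rubin-pp`` bucket, and the ``KAFKA_CLUSTER`` variable come from the diff above; the message payload, object key layout, and use of ``S3_ENDPOINT_URL`` here are illustrative assumptions, not the tester's actual interface.

# Sketch only: the payload schema and object key layout are assumptions, not
# the actual format used by python/tester/upload.py.
import json
import os

import boto3
from confluent_kafka import Producer

# KAFKA_CLUSTER is the "<ip>:9094" address from the external bootstrap service.
producer = Producer({"bootstrap.servers": os.environ["KAFKA_CLUSTER"]})
s3 = boto3.client("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL"))

# Publish a (hypothetical) next_visit message for one detector.
message = {"instrument": "HSC", "detector": 0, "group": "2023-01-01T00:00:00"}
producer.produce("next-visit-topic", json.dumps(message).encode("utf-8"))
producer.flush()

# Upload one snap to the rubin-pp bucket under an illustrative key.
s3.upload_file("raw.fits", "rubin-pp", "HSC/0/2023-01-01T00:00:00/0/raw.fits")

This assumes the default boto3 credential chain resolves to the ``rubin-pp`` profile described in the playbook's Buckets section.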
diff --git a/python/activator/activator.py b/python/activator/activator.py
index 6e3e2e5b..948d7bdb 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -45,6 +45,8 @@ instrument_name = os.environ["RUBIN_INSTRUMENT"]
 # URI to the main repository containing calibs and templates
 calib_repo = os.environ["CALIB_REPO"]
+# S3 Endpoint for Buckets; needed for direct Boto access but not Butler
+s3_endpoint = os.environ["S3_ENDPOINT_URL"]
 # Bucket name (not URI) containing raw images
 image_bucket = os.environ["IMAGE_BUCKET"]
 # Time to wait for raw image upload, in seconds
@@ -57,7 +59,7 @@
 kafka_group_id = str(uuid.uuid4())
 # The topic on which to listen to updates to image_bucket
 # bucket_topic = f"{instrument_name}-image"
-bucket_topic = "pp-bucket-notify-topic"
+bucket_topic = "rubin-prompt-processing"
 
 setup_usdf_logger(
     labels={"instrument": instrument_name},
@@ -78,7 +80,7 @@
     "group.id": kafka_group_id,
 })
 
-storage_client = boto3.client('s3')
+storage_client = boto3.client('s3', endpoint_url=s3_endpoint)
 
 # Initialize middleware interface.
 mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
@@ -108,14 +110,15 @@ def check_for_snap(
     """
     prefix = f"{instrument}/{detector}/{group}/{snap}/"
     _log.debug(f"Checking for '{prefix}'")
-    blobs = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)['Contents']
-    if not blobs:
+    response = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)
+    if response["KeyCount"] == 0:
         return None
-    elif len(blobs) > 1:
+    elif response["KeyCount"] > 1:
         _log.error(
             f"Multiple files detected for a single detector/group/snap: '{prefix}'"
         )
-    return blobs[0]['Key']
+    # Contents only exists if >0 objects found.
+    return response["Contents"][0]['Key']
 
 
 def parse_next_visit(http_request):
@@ -268,7 +271,7 @@ def next_visit_handler() -> Tuple[str, int]:
             _log.debug("Received %r", raw_info)
             if raw_info.is_consistent(expected_visit):
                 # Ingest the snap
-                mwi.ingest_image(oid)
+                mwi.ingest_image(expected_visit, oid)
                 expid_set.add(raw_info.exp_id)
         except ValueError:
             _log.error(f"Failed to match object id '{oid}'")
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 376cfc85..301b631a 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -115,7 +115,7 @@ class MiddlewareInterface:
     prefix : `str`, optional
         URI scheme followed by ``://``; prepended to ``image_bucket`` when
         constructing URIs to retrieve incoming files. The default is
-        appropriate for use in the Google Cloud environment; typically only
+        appropriate for use in the USDF environment; typically only
         change this when running local tests.
     """
     _COLLECTION_TEMPLATE = "templates"
@@ -139,7 +139,7 @@
 
     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
                  local_storage: str,
-                 prefix: str = "gs://"):
+                 prefix: str = "s3://"):
         self._apdb_uri = self._make_apdb_uri()
         self._apdb_namespace = os.environ.get("NAMESPACE_APDB", None)
         self.central_butler = central_butler
@@ -585,8 +585,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None:
         visit : Visit
             The visit for which the image was taken.
         oid : `str`
-            Google storage identifier for incoming image, relative to the
-            image bucket.
+            Identifier for incoming image, relative to the image bucket.
         """
         # TODO: consider allowing pre-existing raws, as may happen when a
         # pipeline is rerun (see DM-34141).
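The ``check_for_snap`` rewrite above guards against a real quirk of the S3 API: ``list_objects_v2`` omits the ``Contents`` key entirely when nothing matches the prefix, so indexing it unconditionally raises ``KeyError``. A standalone sketch of the same pattern, with placeholder bucket, prefix, and endpoint values, not the activator's actual configuration:

# Minimal illustration of the KeyCount-before-Contents pattern used in
# check_for_snap(); bucket, prefix, and endpoint values are placeholders.
import os
from typing import Optional

import boto3

s3 = boto3.client("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL"))


def first_key(bucket: str, prefix: str) -> Optional[str]:
    """Return the first object key under ``prefix``, or None if none exist."""
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    if response["KeyCount"] == 0:
        # "Contents" is absent from the response when no objects match.
        return None
    if response["KeyCount"] > 1:
        print(f"Multiple objects found under '{prefix}'; returning the first.")
    return response["Contents"][0]["Key"]


# Example usage (placeholder names):
# first_key("rubin-pp", "HSC/0/2023-01-01T00:00:00/0/")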