doc/playbook.rst: 25 changes (15 additions, 10 deletions)
@@ -245,24 +245,29 @@ A few useful commands for managing the service:
* ``kubectl get pods`` lists the Kubernetes pods that are currently running, how long they have been active, and how recently they crashed.
* ``kubectl logs <pod>`` outputs the entire log associated with a particular pod.
This can be a long file, so consider piping to ``less`` or ``grep``.
``kubectl logs`` also offers the ``-f`` flag for streaming output.

tester
======

``python/tester/upload.py`` is a script that simulates the CCS image writer.
On a local machine, it requires a JSON token in ``./prompt-proto-upload.json``.
To obtain a token, see the GCP documentation on `service account keys`_; the relevant service account is ``prompt-image-upload@prompt-proto.iam.gserviceaccount.com``.
It can be run from ``rubin-devl``, but requires the user to install the ``confluent_kafka`` package in their environment.

.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys
You must have a profile set up for the ``rubin-pp`` bucket (see `Buckets`_, above), and must set the ``KAFKA_CLUSTER`` environment variable.
Run:

Run the tester either on a local machine, or in Cloud Shell.
In Cloud Shell, install the prototype code and the GCP PubSub client:
.. code-block:: sh

kubectl get service -n kafka prompt-processing-kafka-external-bootstrap

and look up the ``EXTERNAL-IP``; set ``KAFKA_CLUSTER=<ip>:9094``.
The IP address is fixed, so you should only need to look it up once.

Install the prototype code:

.. code-block:: sh

gcloud config set project prompt-proto
git clone https://github.com/lsst-dm/prompt_prototype.git
pip3 install google-cloud-pubsub
git clone https://github.com/lsst-dm/prompt_prototype

Command line arguments are the instrument name (currently HSC only; other values will upload dummy raws that the pipeline can't process) and the number of groups of images to send.

@@ -272,8 +277,8 @@ Sample command line:

python upload.py HSC 3

It sends ``next_visit`` events for each detector via Google Pub/Sub on the ``nextVisit`` topic.
It then uploads a batch of files representing the snaps of the visit to the ``rubin-prompt-proto-main`` GCS bucket.
It sends ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic.
It then uploads a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket.
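
In outline, each group reduces to a Kafka produce call followed by S3 uploads; the following is a minimal sketch only. The real message schema, file names, and object-key layout are defined by ``upload.py``, so the field names, group identifier, and key shown here are purely illustrative.

.. code-block:: python

   import json
   import os

   import boto3
   from confluent_kafka import Producer

   # Producer pointed at the cluster taken from KAFKA_CLUSTER (host:port).
   producer = Producer({"bootstrap.servers": os.environ["KAFKA_CLUSTER"]})

   # Illustrative payload; the real schema is whatever upload.py serializes.
   message = {"instrument": "HSC", "groupId": "2023-01-23T00:00:00.000", "detector": 0}
   producer.produce("next-visit-topic", json.dumps(message).encode("utf-8"))
   producer.flush()

   # Direct S3 access needs the endpoint passed explicitly (credentials come
   # from the profile set up for the rubin-pp bucket); the endpoint variable
   # name mirrors the activator's S3_ENDPOINT_URL and is illustrative.
   s3 = boto3.client("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL"))
   s3.upload_file("raw.fits", "rubin-pp",
                  "HSC/0/2023-01-23T00:00:00.000/0/raw.fits")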

Eventually, uploading the images quickly enough will require a set of parallel processes running on multiple nodes.

python/activator/activator.py: 17 changes (10 additions, 7 deletions)
@@ -45,6 +45,8 @@
instrument_name = os.environ["RUBIN_INSTRUMENT"]
# URI to the main repository containing calibs and templates
calib_repo = os.environ["CALIB_REPO"]
# S3 Endpoint for Buckets; needed for direct Boto access but not Butler
s3_endpoint = os.environ["S3_ENDPOINT_URL"]
# Bucket name (not URI) containing raw images
image_bucket = os.environ["IMAGE_BUCKET"]
# Time to wait for raw image upload, in seconds
@@ -57,7 +59,7 @@
kafka_group_id = str(uuid.uuid4())
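# A fresh UUID as the group.id keeps this consumer in its own consumer group,
# so (presumably) every replica of the service sees every notification rather
# than sharing a subscription with the other replicas.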
# The topic on which to listen to updates to image_bucket
# bucket_topic = f"{instrument_name}-image"
bucket_topic = "pp-bucket-notify-topic"
bucket_topic = "rubin-prompt-processing"

setup_usdf_logger(
labels={"instrument": instrument_name},
@@ -78,7 +80,7 @@
"group.id": kafka_group_id,
})

storage_client = boto3.client('s3')
storage_client = boto3.client('s3', endpoint_url=s3_endpoint)
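# The endpoint must be passed explicitly: boto3 would otherwise target AWS,
# whereas the Butler resolves its own storage endpoint (see the comment on
# S3_ENDPOINT_URL above).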

# Initialize middleware interface.
mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
@@ -108,14 +110,15 @@ def check_for_snap(
"""
prefix = f"{instrument}/{detector}/{group}/{snap}/"
_log.debug(f"Checking for '{prefix}'")
blobs = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)['Contents']
if not blobs:
response = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)
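# (Indexing ['Contents'] directly, as before, raises KeyError when the prefix
# matches nothing: S3 omits that key entirely for an empty listing.)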
if response["KeyCount"] == 0:
return None
elif len(blobs) > 1:
elif response["KeyCount"] > 1:
_log.error(
f"Multiple files detected for a single detector/group/snap: '{prefix}'"
)
return blobs[0]['Key']
# Contents only exists if >0 objects found.
return response["Contents"][0]['Key']


def parse_next_visit(http_request):
@@ -268,7 +271,7 @@ def next_visit_handler() -> Tuple[str, int]:
_log.debug("Received %r", raw_info)
if raw_info.is_consistent(expected_visit):
# Ingest the snap
mwi.ingest_image(oid)
mwi.ingest_image(expected_visit, oid)
expid_set.add(raw_info.exp_id)
except ValueError:
_log.error(f"Failed to match object id '{oid}'")
python/activator/middleware_interface.py: 7 changes (3 additions, 4 deletions)
@@ -115,7 +115,7 @@ class MiddlewareInterface:
prefix : `str`, optional
URI scheme followed by ``://``; prepended to ``image_bucket`` when
constructing URIs to retrieve incoming files. The default is
appropriate for use in the Google Cloud environment; typically only
appropriate for use in the USDF environment; typically only
change this when running local tests.
"""
_COLLECTION_TEMPLATE = "templates"
@@ -139,7 +139,7 @@ class MiddlewareInterface:

def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
local_storage: str,
prefix: str = "gs://"):
prefix: str = "s3://"):
self._apdb_uri = self._make_apdb_uri()
self._apdb_namespace = os.environ.get("NAMESPACE_APDB", None)
self.central_butler = central_butler
@@ -585,8 +585,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None:
visit : Visit
The visit for which the image was taken.
oid : `str`
Google storage identifier for incoming image, relative to the
image bucket.
Identifier for incoming image, relative to the image bucket.
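Typically of the form ``{instrument}/{detector}/{group}/{snap}/<file>``.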
"""
# TODO: consider allowing pre-existing raws, as may happen when a
# pipeline is rerun (see DM-34141).