25 changes: 15 additions & 10 deletions doc/playbook.rst
@@ -245,24 +245,29 @@ A few useful commands for managing the service:
* ``kubectl get pods`` lists the Kubernetes pods that are currently running, how long they have been active, and how recently they crashed.
* ``kubectl logs <pod>`` outputs the entire log associated with a particular pod.
This can be a long file, so consider piping to ``less`` or ``grep``.
``kubectl logs`` also offers the ``-f`` flag for streaming output.

tester
======

``python/tester/upload.py`` is a script that simulates the CCS image writer.
On a local machine, it requires a JSON token in ``./prompt-proto-upload.json``.
To obtain a token, see the GCP documentation on `service account keys`_; the relevant service account is ``prompt-image-upload@prompt-proto.iam.gserviceaccount.com``.
It can be run from ``rubin-devl``, but requires the user to install the ``confluent_kafka`` package in their environment.

.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys
You must have a profile set up for the ``rubin-pp`` bucket (see `Buckets`_, above), and must set the ``KAFKA_CLUSTER`` environment variable.
Run:

Run the tester either on a local machine, or in Cloud Shell.
In Cloud Shell, install the prototype code and the GCP PubSub client:
.. code-block:: sh

kubectl get service -n kafka prompt-processing-kafka-external-bootstrap

and look up the ``EXTERNAL-IP``; set ``KAFKA_CLUSTER=<ip>:9094``.
The IP address is fixed, so you should only need to look it up once.

Install the prototype code:

.. code-block:: sh

gcloud config set project prompt-proto
git clone https://github.com/lsst-dm/prompt_prototype.git
pip3 install google-cloud-pubsub
git clone https://github.com/lsst-dm/prompt_prototype

Command line arguments are the instrument name (currently HSC only; other values will upload dummy raws that the pipeline can't process) and the number of groups of images to send.

@@ -272,8 +277,8 @@ Sample command line:

python upload.py HSC 3

It sends ``next_visit`` events for each detector via Google Pub/Sub on the ``nextVisit`` topic.
It then uploads a batch of files representing the snaps of the visit to the ``rubin-prompt-proto-main`` GCS bucket.
It sends ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic.
It then uploads a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket.
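
A minimal sketch of what one such group upload amounts to, assuming a ``confluent_kafka`` producer and a ``boto3`` bucket handle (the topic and bucket names follow the values above, but the message payload and object key are illustrative only, not the exact format used by ``upload.py``):

.. code-block:: py

   import json
   import os

   import boto3
   from confluent_kafka import Producer

   # Credentials and the S3 endpoint are assumed to come from the profile set up above.
   producer = Producer({"bootstrap.servers": os.environ["KAFKA_CLUSTER"]})
   bucket = boto3.resource("s3").Bucket("rubin-pp")

   # Announce the upcoming visit for one detector (payload fields are illustrative).
   producer.produce("next-visit-topic",
                    json.dumps({"instrument": "HSC", "group": "42", "detector": 0}))
   producer.flush()

   # Upload the corresponding raw file; the key layout is likewise illustrative.
   bucket.upload_file("raw_snap_0.fits", "HSC/0/42/0/raw_snap_0.fits")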

Eventually, a set of parallel processes running on multiple nodes will be needed to upload the images quickly enough.

17 changes: 10 additions & 7 deletions python/activator/activator.py
@@ -45,6 +45,8 @@
instrument_name = os.environ["RUBIN_INSTRUMENT"]
# URI to the main repository containing calibs and templates
calib_repo = os.environ["CALIB_REPO"]
# S3 Endpoint for Buckets; needed for direct Boto access but not Butler
s3_endpoint = os.environ["S3_ENDPOINT_URL"]
# Bucket name (not URI) containing raw images
image_bucket = os.environ["IMAGE_BUCKET"]
# Time to wait for raw image upload, in seconds
@@ -57,7 +59,7 @@
kafka_group_id = str(uuid.uuid4())
# The topic on which to listen to updates to image_bucket
# bucket_topic = f"{instrument_name}-image"
bucket_topic = "pp-bucket-notify-topic"
bucket_topic = "rubin-prompt-processing"

setup_usdf_logger(
labels={"instrument": instrument_name},
@@ -78,7 +80,7 @@
"group.id": kafka_group_id,
})

storage_client = boto3.client('s3')
storage_client = boto3.client('s3', endpoint_url=s3_endpoint)

# Initialize middleware interface.
mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
@@ -108,14 +110,15 @@ def check_for_snap(
"""
prefix = f"{instrument}/{detector}/{group}/{snap}/"
_log.debug(f"Checking for '{prefix}'")
blobs = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)['Contents']
if not blobs:
response = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)
if response["KeyCount"] == 0:
return None
elif len(blobs) > 1:
elif response["KeyCount"] > 1:
_log.error(
f"Multiple files detected for a single detector/group/snap: '{prefix}'"
)
return blobs[0]['Key']
# Contents only exists if >0 objects found.
return response["Contents"][0]['Key']


def parse_next_visit(http_request):
@@ -268,7 +271,7 @@ def next_visit_handler() -> Tuple[str, int]:
_log.debug("Received %r", raw_info)
if raw_info.is_consistent(expected_visit):
# Ingest the snap
mwi.ingest_image(oid)
mwi.ingest_image(expected_visit, oid)
expid_set.add(raw_info.exp_id)
except ValueError:
_log.error(f"Failed to match object id '{oid}'")
32 changes: 19 additions & 13 deletions python/activator/middleware_interface.py
@@ -115,7 +115,7 @@ class MiddlewareInterface:
prefix : `str`, optional
URI scheme followed by ``://``; prepended to ``image_bucket`` when
constructing URIs to retrieve incoming files. The default is
appropriate for use in the Google Cloud environment; typically only
appropriate for use in the USDF environment; typically only
change this when running local tests.
"""
_COLLECTION_TEMPLATE = "templates"
@@ -136,10 +136,12 @@ class MiddlewareInterface:
# pre-made inputs, and raws, in that order. However, self.butler is not
# guaranteed to contain concrete data, or even the dimensions
# corresponding to self.camera and self.skymap.
# if it exists, the latest run in self.output_collection is always the first
# in the chain.

def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
local_storage: str,
prefix: str = "gs://"):
prefix: str = "s3://"):
self._apdb_uri = self._make_apdb_uri()
self._apdb_namespace = os.environ.get("NAMESPACE_APDB", None)
self.central_butler = central_butler
@@ -585,8 +587,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None:
visit : Visit
The visit for which the image was taken.
oid : `str`
Google storage identifier for incoming image, relative to the
image bucket.
Identifier for incoming image, relative to the image bucket.
"""
# TODO: consider allowing pre-existing raws, as may happen when a
# pipeline is rerun (see DM-34141).
@@ -664,16 +665,21 @@ def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None:
self._export_subset(visit, exposure_ids, "raw",
in_collections=self._get_raw_run_name(visit),
out_collection=self.instrument.makeDefaultRawIngestRunName())

umbrella = self.instrument.makeCollectionName("prompt-results")
self._export_subset(visit, exposure_ids,
# TODO: find a way to merge datasets like *_config
# or *_schema that are duplicated across multiple
# workers.
self._get_safe_dataset_types(self.butler),
self.output_collection + "/*", # exclude inputs
umbrella)
_log.info(f"Pipeline products saved to collection '{umbrella}' for "
f"detector {visit.detector} of {exposure_ids}.")
latest_run = self.butler.registry.getCollectionChain(self.output_collection)[0]
if latest_run.startswith(self.output_collection + "/"):
self._export_subset(visit, exposure_ids,
# TODO: find a way to merge datasets like *_config
# or *_schema that are duplicated across multiple
# workers.
self._get_safe_dataset_types(self.butler),
in_collections=latest_run,
out_collection=umbrella)
_log.info(f"Pipeline products saved to collection '{umbrella}' for "
f"detector {visit.detector} of {exposure_ids}.")
else:
_log.warning(f"No output runs to save! Called for {visit.detector} of {exposure_ids}.")

@staticmethod
def _get_safe_dataset_types(butler):
30 changes: 17 additions & 13 deletions python/tester/upload.py
@@ -147,19 +147,23 @@ def main():
{"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()}
)

last_group = get_last_group(dest_bucket, instrument, date)
_log.info(f"Last group {last_group}")

src_bucket = s3.Bucket("rubin-pp-users")
raw_pool = get_samples(src_bucket, instrument)

new_group_base = last_group + random.randrange(10, 19)
if raw_pool:
_log.info(f"Observing real raw files from {instrument}.")
upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_groups, new_group_base)
else:
_log.info(f"No raw files found for {instrument}, generating dummy files instead.")
upload_from_random(producer, instrument, dest_bucket, n_groups, new_group_base)
try:
last_group = get_last_group(dest_bucket, instrument, date)
_log.info(f"Last group {last_group}")

src_bucket = s3.Bucket("rubin-pp-users")
raw_pool = get_samples(src_bucket, instrument)

new_group_base = last_group + random.randrange(10, 19)
if raw_pool:
_log.info(f"Observing real raw files from {instrument}.")
upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket,
n_groups, new_group_base)
else:
_log.info(f"No raw files found for {instrument}, generating dummy files instead.")
upload_from_random(producer, instrument, dest_bucket, n_groups, new_group_base)
finally:
producer.flush(30.0)
Collaborator:
I see the warning but I'm not sure if it matters to flush and wait here. Don't they get sent out still?

@kfindeisen (Member Author), Nov 18, 2022:
Yes, the messages get sent, but such warnings usually mean that a class finalizer is doing cleanup that it can't be trusted to do. Also, the correctness of not flushing depends on the lower-level code not using callbacks, and there's no need for main to assume that.

As for waiting, the delay when I tested it was too small to measure.

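For reference, a minimal sketch of the pattern being discussed, assuming a confluent_kafka producer (the broker address, topic, and payload below are placeholders): produce() only enqueues the message locally, and delivery callbacks run while poll() or flush() is serviced, so the explicit flush() in the finally block guarantees delivery and callback execution rather than leaving both to the producer's finalizer.

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker

def on_delivery(err, msg):
    # Runs only while poll() or flush() is being serviced.
    if err is not None:
        print(f"delivery failed for {msg.topic()}: {err}")

try:
    # produce() is asynchronous: it only enqueues the message locally.
    producer.produce("next-visit-topic", b"{}", callback=on_delivery)
finally:
    # An explicit flush sends anything still queued and fires the callbacks,
    # instead of relying on finalizer cleanup.
    producer.flush(30.0)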


def get_last_group(bucket, instrument, date):
41 changes: 39 additions & 2 deletions tests/test_middleware_interface.py
@@ -22,6 +22,7 @@
import dataclasses
import itertools
import tempfile
import time
import os.path
import unittest
import unittest.mock
@@ -504,6 +505,7 @@ def setUp(self):
dec=dec,
rot=rot,
kind="SURVEY")
self.second_visit = dataclasses.replace(self.next_visit, group="2")
self.logger_name = "lsst.activator.middleware_interface"

# Populate repository.
@@ -521,9 +523,19 @@
with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
mock.return_value = file_data
self.interface.ingest_image(self.next_visit, filename)
self.interface.ingest_image(self.second_visit, filename)
self.interface.define_visits.run([self.raw_data_id])

# Simulate pipeline execution.
self._simulate_run()

def _simulate_run(self):
"""Create a mock pipeline execution that stores a calexp for self.raw_data_id.

Returns
-------
run : `str`
The output run containing the output.
"""
exp = lsst.afw.image.ExposureF(20, 20)
run = self.interface._prep_collections()
self.processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in self.raw_data_id.items()}
@@ -532,6 +544,7 @@ def setUp(self):
butler_tests.addDatasetType(self.interface.butler, "calexp", {"instrument", "visit", "detector"},
"ExposureF")
self.interface.butler.put(exp, "calexp", self.processed_data_id, run=run)
return run

def _check_datasets(self, butler, types, collections, count, data_id):
datasets = list(butler.registry.queryDatasets(types, collections=collections))
@@ -548,7 +561,7 @@ def test_export_outputs(self):
self._check_datasets(central_butler,
"raw", raw_collection, 1, self.raw_data_id)
# Did not export raws directly to raw/all.
self.assertNotEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.RUN)
self.assertEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.CHAINED)
self._check_datasets(central_butler,
"calexp", export_collection, 1, self.processed_data_id)
# Did not export calibs or other inputs.
Expand All @@ -567,3 +580,27 @@ def test_export_outputs_bad_visit(self):
def test_export_outputs_bad_exposure(self):
with self.assertRaises(ValueError):
self.interface.export_outputs(self.next_visit, {88})

def test_export_outputs_retry(self):
self.interface.export_outputs(self.next_visit, {self.raw_data_id["exposure"]})

time.sleep(1.0) # Force _simulate_run() to create a new timestamped run
self._simulate_run()
self.interface.export_outputs(self.second_visit, {self.raw_data_id["exposure"]})

central_butler = Butler(self.central_repo.name, writeable=False)
raw_collection = f"{instname}/raw/all"
export_collection = f"{instname}/prompt-results"
self._check_datasets(central_butler,
"raw", raw_collection, 2, self.raw_data_id)
# Did not export raws directly to raw/all.
self.assertEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.CHAINED)
self._check_datasets(central_butler,
"calexp", export_collection, 2, self.processed_data_id)
# Did not export calibs or other inputs.
self._check_datasets(central_butler,
["cpBias", "gaia", "skyMap", "*Coadd"], export_collection,
0, {"error": "dnc"})
# Nothing placed in "input" collections.
self._check_datasets(central_butler,
["raw", "calexp"], f"{instname}/defaults", 0, {"error": "dnc"})