From 89ca223eeac808a4ba41c227815443868b70a8c2 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 3 Oct 2022 16:29:09 -0700 Subject: [PATCH 01/19] Clean up redundant GoogleFormatterTest method. --- tests/test_logger.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/test_logger.py b/tests/test_logger.py index c859ab3a..7e4f82b0 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -31,13 +31,6 @@ class GoogleFormatterTest(unittest.TestCase): """Test GCloudStructuredLogFormatter with fake log messages. """ - @classmethod - def setUpClass(cls): - super().setUpClass() - - # Each test has its own handler - # logging.basicConfig(handlers=[logging.NullHandler()]) - def setUp(self): super().setUp() From 232e53bfff99c33485c2c28f71bbfc57cf5df047 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 4 Oct 2022 12:10:54 -0700 Subject: [PATCH 02/19] Factor logger setup into factory function. This avoids leaving any global variables dangling in `activator.py`. Unfortunately, I don't see a way to unit test `setup_google_logger`: its reliance on global state and standard error means that neither assertLogs nor a custom stream will work. --- python/activator/activator.py | 10 +++------- python/activator/logger.py | 28 +++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 9480d499..0ae1780f 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -34,7 +34,7 @@ from lsst.daf.butler import Butler from lsst.obs.base import Instrument -from .logger import GCloudStructuredLogFormatter +from .logger import setup_google_logger from .make_pgpass import make_pgpass from .middleware_interface import MiddlewareInterface from .raw import RAW_REGEXP @@ -50,15 +50,11 @@ image_bucket = os.environ["IMAGE_BUCKET"] timeout = os.environ.get("IMAGE_TIMEOUT", 50) -# Set up logging for all modules used by this worker. -log_handler = logging.StreamHandler() -log_handler.setFormatter(GCloudStructuredLogFormatter( +setup_google_logger( labels={"instrument": active_instrument.getName()}, -)) -logging.basicConfig(handlers=[log_handler]) +) _log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) -logging.captureWarnings(True) # Write PostgreSQL credentials. diff --git a/python/activator/logger.py b/python/activator/logger.py index 348ed073..142f12a4 100644 --- a/python/activator/logger.py +++ b/python/activator/logger.py @@ -19,12 +19,38 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -__all__ = ["GCloudStructuredLogFormatter"] +__all__ = ["GCloudStructuredLogFormatter", "setup_google_logger"] import json import logging +# TODO: replace with something more extensible, once we know what needs to +# vary besides the formatter (handler type?). +def setup_google_logger(labels=None): + """Set global logging settings for prompt_prototype. + + Calling this function makes `GCloudStructuredLogFormatter` the root + formatter and redirects all warnings to go through it. + + Parameters + ---------- + labels : `dict` [`str`, `str`] + Any metadata that should be attached to all logs. See + ``LogEntry.labels`` in Google Cloud REST API documentation. + + Returns + ------- + handler : `logging.Handler` + The handler used by the root logger. + """ + log_handler = logging.StreamHandler() + log_handler.setFormatter(GCloudStructuredLogFormatter(labels)) + logging.basicConfig(handlers=[log_handler]) + logging.captureWarnings(True) + return log_handler + + class GCloudStructuredLogFormatter(logging.Formatter): """A formatter that can be parsed by the Google Cloud logging agent. From 6b03d91267586060e1a2c43983b62b6042f0e253 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 5 Oct 2022 10:30:58 -0700 Subject: [PATCH 03/19] Move local repo path into envvar. Putting the base path in an environment variable lets us control the (environment-dependent) path inside the activator, while letting MiddlewareInterface take responsibility for the repo without needing to know about the environment. --- doc/playbook.rst | 3 ++- python/activator/activator.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/doc/playbook.rst b/doc/playbook.rst index 1598f6d6..fd9aab88 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -109,9 +109,10 @@ To create or edit the Cloud Run service in the Google Cloud Console: * IP_APDB: IP address and port of the APDB (see `Databases`_, below) * IP_REGISTRY: IP address and port of the registry database (see `Databases`_) -* There is also one optional parameter: +* There are also two optional parameters: * IMAGE_TIMEOUT: timeout in seconds to wait for raw image, default 50 sec. + * LOCAL_REPOS: absolute path (in the container) where local repos are created, default ``/tmp``. * One variable is set by Cloud Run and should not be overridden: diff --git a/python/activator/activator.py b/python/activator/activator.py index 0ae1780f..9db21c6d 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -49,6 +49,7 @@ calib_repo = os.environ["CALIB_REPO"] image_bucket = os.environ["IMAGE_BUCKET"] timeout = os.environ.get("IMAGE_TIMEOUT", 50) +local_repos = os.environ.get("LOCAL_REPOS", "/tmp") setup_google_logger( labels={"instrument": active_instrument.getName()}, @@ -82,7 +83,7 @@ collections=[active_instrument.makeCollectionName("defaults")], writeable=True, inferDefaults=False) -repo = f"/tmp/butler-{os.getpid()}" +repo = os.path.join(local_repos, f"butler-{os.getpid()}") butler = Butler(Butler.makeRepo(repo), writeable=True) _log.info("Created local Butler repo at %s.", repo) mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, butler) From 039ece81fe98a46c1f35f30f0337e5f7bc56a553 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 10:56:13 -0700 Subject: [PATCH 04/19] Avoid assuming external Butler in test code. All tests have been rewritten to use the MiddlewareInterface's internal Butler, allowing future changes to how MiddlewareInterface is constructed. --- tests/test_middleware_interface.py | 47 +++++++++++++++--------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 114b1364..a7adf301 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -155,7 +155,7 @@ def test_init(self): # * On init, is the local butler repo purely in memory? # Check that the butler instance is properly configured. - instruments = list(self.butler.registry.queryDimensionRecords("instrument")) + instruments = list(self.interface.butler.registry.queryDimensionRecords("instrument")) self.assertEqual(instname, instruments[0].name) self.assertEqual(set(self.interface.butler.collections), {self.interface.output_collection}) @@ -224,7 +224,7 @@ def test_prep_butler(self): # TODO DM-34112: check these shards again with some plots, once I've # determined whether ci_hits2015 actually has enough shards. expected_shards = {157394, 157401, 157405} - self._check_imports(self.butler, detector=56, expected_shards=expected_shards) + self._check_imports(self.interface.butler, detector=56, expected_shards=expected_shards) def test_prep_butler_twice(self): """prep_butler should have the correct calibs (and not raise an @@ -239,7 +239,7 @@ def test_prep_butler_twice(self): self.next_visit = dataclasses.replace(self.next_visit, group=str(int(self.next_visit.group) + 1)) self.interface.prep_butler(self.next_visit) expected_shards = {157394, 157401, 157405} - self._check_imports(self.butler, detector=56, expected_shards=expected_shards) + self._check_imports(self.interface.butler, detector=56, expected_shards=expected_shards) # Third visit with different detector and coordinates. # Only 5, 10, 56, 60 have valid calibs. @@ -252,7 +252,7 @@ def test_prep_butler_twice(self): ) self.interface.prep_butler(self.next_visit) expected_shards.update({157218, 157229}) - self._check_imports(self.butler, detector=5, expected_shards=expected_shards) + self._check_imports(self.interface.butler, detector=5, expected_shards=expected_shards) # TODO: regression test for prep_butler having a stale cache for the butler it's updating. # This may be impossible to unit test, since it seems to depend on Google-side parallelism. @@ -262,15 +262,15 @@ def test_ingest_image(self): filename = "fakeRawImage.fits" filepath = os.path.join(self.input_data, filename) data_id, file_data = fake_file_data(filepath, - self.butler.dimensions, + self.interface.butler.dimensions, self.interface.instrument, self.next_visit) with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock: mock.return_value = file_data self.interface.ingest_image(self.next_visit, filename) - datasets = list(self.butler.registry.queryDatasets('raw', - collections=[f'{instname}/raw/all'])) + datasets = list(self.interface.butler.registry.queryDatasets('raw', + collections=[f'{instname}/raw/all'])) self.assertEqual(datasets[0].dataId, data_id) # TODO: After raw ingest, we can define exposure dimension records # and check that the visits are defined @@ -290,7 +290,7 @@ def test_ingest_image_fails_missing_file(self): filename = "nonexistentImage.fits" filepath = os.path.join(self.input_data, filename) data_id, file_data = fake_file_data(filepath, - self.butler.dimensions, + self.interface.butler.dimensions, self.interface.instrument, self.next_visit) with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock, \ @@ -298,8 +298,8 @@ def test_ingest_image_fails_missing_file(self): mock.return_value = file_data self.interface.ingest_image(self.next_visit, filename) # There should not be any raw files in the registry. - datasets = list(self.butler.registry.queryDatasets('raw', - collections=[f'{instname}/raw/all'])) + datasets = list(self.interface.butler.registry.queryDatasets('raw', + collections=[f'{instname}/raw/all'])) self.assertEqual(datasets, []) def test_run_pipeline(self): @@ -312,7 +312,7 @@ def test_run_pipeline(self): filename = "fakeRawImage.fits" filepath = os.path.join(self.input_data, filename) data_id, file_data = fake_file_data(filepath, - self.butler.dimensions, + self.interface.butler.dimensions, self.interface.instrument, self.next_visit) with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock: @@ -336,7 +336,7 @@ def test_run_pipeline_empty_quantum_graph(self): filename = "fakeRawImage.fits" filepath = os.path.join(self.input_data, filename) data_id, file_data = fake_file_data(filepath, - self.butler.dimensions, + self.interface.butler.dimensions, self.interface.instrument, self.next_visit) with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock: @@ -391,20 +391,21 @@ def test_query_missing_datasets_nodim(self): self.assertEqual(result, {data1}) def test_prepend_collection(self): - self.butler.registry.registerCollection("_prepend1", CollectionType.TAGGED) - self.butler.registry.registerCollection("_prepend2", CollectionType.TAGGED) - self.butler.registry.registerCollection("_prepend3", CollectionType.TAGGED) - self.butler.registry.registerCollection("_prepend_base", CollectionType.CHAINED) + butler = self.interface.butler + butler.registry.registerCollection("_prepend1", CollectionType.TAGGED) + butler.registry.registerCollection("_prepend2", CollectionType.TAGGED) + butler.registry.registerCollection("_prepend3", CollectionType.TAGGED) + butler.registry.registerCollection("_prepend_base", CollectionType.CHAINED) # Empty chain. - self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), []) - _prepend_collection(self.butler, "_prepend_base", ["_prepend1"]) - self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), ["_prepend1"]) + self.assertEqual(list(butler.registry.getCollectionChain("_prepend_base")), []) + _prepend_collection(butler, "_prepend_base", ["_prepend1"]) + self.assertEqual(list(butler.registry.getCollectionChain("_prepend_base")), ["_prepend1"]) # Non-empty chain. - self.butler.registry.setCollectionChain("_prepend_base", ["_prepend1", "_prepend2"]) - _prepend_collection(self.butler, "_prepend_base", ["_prepend3"]) - self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), + butler.registry.setCollectionChain("_prepend_base", ["_prepend1", "_prepend2"]) + _prepend_collection(butler, "_prepend_base", ["_prepend3"]) + self.assertEqual(list(butler.registry.getCollectionChain("_prepend_base")), ["_prepend3", "_prepend1", "_prepend2"]) @@ -495,7 +496,7 @@ def setUp(self): filename = "fakeRawImage.fits" filepath = os.path.join(self.input_data, filename) self.raw_data_id, file_data = fake_file_data(filepath, - self.butler.dimensions, + self.interface.butler.dimensions, self.interface.instrument, self.next_visit) with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock: From 0d9ffae794e59392f936dcf69db1a7e8c12f40df Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 11:00:28 -0700 Subject: [PATCH 05/19] Factor local Butler construction out of activator. This change not only partially decouples the activator from LSST/Butler code, it also allows MiddlewareInterface to guarantee that the Butler is unique and isolated from all other MiddlewareInterface instances. Previously, this had to be enforced by the creator as a precondition. --- python/activator/activator.py | 5 +---- python/activator/middleware_interface.py | 25 +++++++++++++----------- tests/test_middleware_interface.py | 15 +++++++------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 9db21c6d..8d1dbca0 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -83,10 +83,7 @@ collections=[active_instrument.makeCollectionName("defaults")], writeable=True, inferDefaults=False) -repo = os.path.join(local_repos, f"butler-{os.getpid()}") -butler = Butler(Butler.makeRepo(repo), writeable=True) -_log.info("Created local Butler repo at %s.", repo) -mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, butler) +mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, local_repos) def check_for_snap( diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 921c365f..33b1ff7a 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -79,10 +79,9 @@ class MiddlewareInterface: butler collections and dataIds. Example: "lsst.obs.lsst.LsstCam" TODO: this arg can probably be removed and replaced with internal use of the butler. - butler : `lsst.daf.butler.Butler` - Local butler to process data in and hold calibrations, etc.; must be - writeable. Must not be shared with any other ``MiddlewareInterface`` - object. + local_storage : `str` + An absolute path to a space where this object can create a local + Butler repo. The repo is guaranteed to be unique to this object. prefix : `str`, optional URI scheme followed by ``://``; prepended to ``image_bucket`` when constructing URIs to retrieve incoming files. The default is @@ -107,7 +106,7 @@ class MiddlewareInterface: # corresponding to self.camera and self.skymap. def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, - butler: Butler, + local_storage: str, prefix: str = "gs://"): self.ip_apdb = os.environ["IP_APDB"] self.central_butler = central_butler @@ -121,7 +120,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, self.output_collection = self.instrument.makeCollectionName("prompt") - self._init_local_butler(butler) + self._init_local_butler(local_storage) self._init_ingester() self._init_visit_definer() @@ -141,18 +140,22 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, # How much to pad the refcat region we will copy over. self.padding = 30*lsst.geom.arcseconds - def _init_local_butler(self, butler: Butler): + def _init_local_butler(self, base_path: str): """Prepare the local butler to ingest into and process from. ``self.instrument`` must already exist. ``self.butler`` is correctly - initialized after this method returns. + initialized after this method returns, and is guaranteed to be unique + to this object. Parameters ---------- - butler : `lsst.daf.butler.Butler` - Local butler to process data in and hold calibrations, etc.; must - be writeable. + base_path : `str` + An absolute path to a space where the repo can be created. """ + repo = os.path.join(base_path, f"butler-{os.getpid()}") + butler = Butler(Butler.makeRepo(repo), writeable=True) + _log.info("Created local Butler repo at %s.", repo) + self.instrument.register(butler.registry) # Will be populated in prep_butler. diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index a7adf301..b050edbe 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -120,9 +120,9 @@ def setUp(self): instrument_name = "DECam" self.input_data = os.path.join(data_dir, "input_data") # Have to preserve the tempdir, so that it doesn't get cleaned up. - self.repo = tempfile.TemporaryDirectory() - self.butler = Butler(Butler.makeRepo(self.repo.name), writeable=True) - self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, self.butler, + self.workspace = tempfile.TemporaryDirectory() + self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, + self.workspace.name, prefix="file://") # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 @@ -144,7 +144,7 @@ def setUp(self): def tearDown(self): super().tearDown() # TemporaryDirectory warns on leaks - self.repo.cleanup() + self.workspace.cleanup() def test_init(self): """Basic tests of the initialized interface object. @@ -467,11 +467,10 @@ def setUp(self): instrument = "lsst.obs.decam.DarkEnergyCamera" data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") self.input_data = os.path.join(data_dir, "input_data") - repo = tempfile.TemporaryDirectory() + workspace = tempfile.TemporaryDirectory() # TemporaryDirectory warns on leaks; addCleanup also keeps the TD from # getting garbage-collected. - self.addCleanup(tempfile.TemporaryDirectory.cleanup, repo) - self.butler = Butler(Butler.makeRepo(repo.name), writeable=True) + self.addCleanup(tempfile.TemporaryDirectory.cleanup, workspace) # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 ra = 155.4702849608958 @@ -490,7 +489,7 @@ def setUp(self): self.logger_name = "lsst.activator.middleware_interface" # Populate repository. - self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, self.butler, + self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, workspace.name, prefix="file://") self.interface.prep_butler(self.next_visit) filename = "fakeRawImage.fits" From 61b8292c59b44502fe698848f70e27d08d89c75d Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 11:17:55 -0700 Subject: [PATCH 06/19] Use tempdir instead of PID for local repo path. There is no guarantee that each MiddlewareInterface object will be associated with a unique PID in all future architectures; for example, on GCP each worker always has PID 503, and the repos are disambiguated by being on different (virtual) filesystems. --- python/activator/middleware_interface.py | 8 +++++--- tests/test_middleware_interface.py | 3 +++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 33b1ff7a..7fe4b623 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -99,6 +99,7 @@ class MiddlewareInterface: # self.image_host is a valid URI with non-empty path and no query or fragment. # self._download_store is None if and only if self.image_host is a local URI. # self.instrument, self.camera, and self.skymap do not change after __init__. + # self._repo is the only reference to its TemporaryDirectory object. # self.butler defaults to a chained collection named # self.output_collection, which contains zero or more output runs, # pre-made inputs, and raws, in that order. However, self.butler is not @@ -152,9 +153,10 @@ def _init_local_butler(self, base_path: str): base_path : `str` An absolute path to a space where the repo can be created. """ - repo = os.path.join(base_path, f"butler-{os.getpid()}") - butler = Butler(Butler.makeRepo(repo), writeable=True) - _log.info("Created local Butler repo at %s.", repo) + # Directory has same lifetime as this object. + self._repo = tempfile.TemporaryDirectory(dir=base_path, prefix="butler-") + butler = Butler(Butler.makeRepo(self._repo.name), writeable=True) + _log.info("Created local Butler repo at %s.", self._repo.name) self.instrument.register(butler.registry) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index b050edbe..dbd98244 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -144,6 +144,7 @@ def setUp(self): def tearDown(self): super().tearDown() # TemporaryDirectory warns on leaks + self.interface._repo.cleanup() # TODO: should MiddlewareInterface have a cleanup method? self.workspace.cleanup() def test_init(self): @@ -491,6 +492,8 @@ def setUp(self): # Populate repository. self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, workspace.name, prefix="file://") + # TODO: should MiddlewareInterface have a cleanup method? + self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.interface._repo) self.interface.prep_butler(self.next_visit) filename = "fakeRawImage.fits" filepath = os.path.join(self.input_data, filename) From e8b8264ac7f8f72b38839027ac93d1dafcbc7ff2 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 12:22:01 -0700 Subject: [PATCH 07/19] Remove redundant variable from test setup. --- tests/test_middleware_interface.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index dbd98244..2cb0bbb1 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -117,7 +117,6 @@ def setUp(self): writeable=False, inferDefaults=False) instrument = "lsst.obs.decam.DarkEnergyCamera" - instrument_name = "DECam" self.input_data = os.path.join(data_dir, "input_data") # Have to preserve the tempdir, so that it doesn't get cleaned up. self.workspace = tempfile.TemporaryDirectory() @@ -130,7 +129,7 @@ def setUp(self): dec = -4.950050405424033 # DECam has no rotator; instrument angle is 90 degrees in our system. rot = 90. - self.next_visit = Visit(instrument_name, + self.next_visit = Visit(instname, detector=56, group="1", snaps=1, From 3fe23b51db9fc57cab452eb095449ef7b41c6872 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 12:34:49 -0700 Subject: [PATCH 08/19] Factor central Butler construction out of activator. This change removes any need for activator.py to know about the Butler, although a Butler object is still returned from one function call in the activator. This is a lesser evil than having the details of the central Butler definition be coded into the MiddlewareInterface class. --- python/activator/activator.py | 16 +++++--------- python/activator/middleware_interface.py | 28 +++++++++++++++++++++++- tests/test_middleware_interface.py | 16 +++++++++++--- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 8d1dbca0..93ee0e13 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -32,11 +32,10 @@ from flask import Flask, request from google.cloud import pubsub_v1, storage -from lsst.daf.butler import Butler from lsst.obs.base import Instrument from .logger import setup_google_logger from .make_pgpass import make_pgpass -from .middleware_interface import MiddlewareInterface +from .middleware_interface import get_central_butler, MiddlewareInterface from .raw import RAW_REGEXP from .visit import Visit @@ -75,15 +74,10 @@ storage_client = storage.Client() # Initialize middleware interface. -# TODO: this should not be done in activator.py, which is supposed to have only -# framework/messaging support (ideally, it should not contain any LSST imports). -# However, we don't want MiddlewareInterface to need to know details like where -# the central repo is located, either, so perhaps we need a new module. -central_butler = Butler(calib_repo, - collections=[active_instrument.makeCollectionName("defaults")], - writeable=True, - inferDefaults=False) -mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, local_repos) +mwi = MiddlewareInterface(get_central_butler(calib_repo, config_instrument), + image_bucket, + config_instrument, + local_repos) def check_for_snap( diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 7fe4b623..168b9c5c 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -19,7 +19,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -__all__ = ["MiddlewareInterface"] +__all__ = ["get_central_butler", "MiddlewareInterface"] import collections.abc import itertools @@ -45,6 +45,32 @@ _log.setLevel(logging.DEBUG) +def get_central_butler(central_repo: str, instrument_class: str): + """Provide a Butler that can access the given repository and read and write + data for the given instrument. + + Parameters + ---------- + central_repo : `str` + The path or URI to the central repository. + instrument_class : `str` + The fully-qualified class name of the instrument whose data will be + retrieved or written. + + Returns + ------- + butler : `lsst.daf.butler.Butler` + A Butler for ``central_repo`` pre-configured to load and store + ``instrument_name`` data. + """ + instrument = lsst.obs.base.Instrument.from_string(instrument_class) + return Butler(central_repo, + collections=[instrument.makeCollectionName("defaults")], + writeable=True, + inferDefaults=False, + ) + + class MiddlewareInterface: """Interface layer between the Butler middleware and the prompt processing data handling system, to handle processing individual images. diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 2cb0bbb1..cc2d5ffa 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -38,7 +38,8 @@ import lsst.resources from activator.visit import Visit -from activator.middleware_interface import MiddlewareInterface, _query_missing_datasets, _prepend_collection +from activator.middleware_interface import get_central_butler, MiddlewareInterface, \ + _query_missing_datasets, _prepend_collection # The short name of the instrument used in the test repo. instname = "DECam" @@ -111,8 +112,8 @@ def tearDownClass(cls): def setUp(self): data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") - central_repo = os.path.join(data_dir, "central_repo") - central_butler = Butler(central_repo, + self.central_repo = os.path.join(data_dir, "central_repo") + central_butler = Butler(self.central_repo, collections=[f"{instname}/defaults"], writeable=False, inferDefaults=False) @@ -146,6 +147,15 @@ def tearDown(self): self.interface._repo.cleanup() # TODO: should MiddlewareInterface have a cleanup method? self.workspace.cleanup() + def test_get_butler(self): + butler = get_central_butler(self.central_repo, "lsst.obs.decam.DarkEnergyCamera") + # TODO: better way to test repo location? + self.assertTrue( + butler.getURI("camera", instrument=instname, run="foo", predict=True).ospath + .startswith(self.central_repo)) + self.assertEqual(list(butler.collections), [f"{instname}/defaults"]) + self.assertTrue(butler.isWriteable()) + def test_init(self): """Basic tests of the initialized interface object. """ From 7f1508ebad4f3fe443fa81bdac214a261be02aee Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 12:40:00 -0700 Subject: [PATCH 09/19] Factor Instrument object out of activator. A temporary Instrument object is still needed to translate between class name and short name. --- python/activator/activator.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 93ee0e13..6888b116 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -44,14 +44,14 @@ verification_token = os.environ["PUBSUB_VERIFICATION_TOKEN"] # The full instrument class name, including module path. config_instrument = os.environ["RUBIN_INSTRUMENT"] -active_instrument = Instrument.from_string(config_instrument) +instrument_name = Instrument.from_string(config_instrument).getName() calib_repo = os.environ["CALIB_REPO"] image_bucket = os.environ["IMAGE_BUCKET"] timeout = os.environ.get("IMAGE_TIMEOUT", 50) local_repos = os.environ.get("LOCAL_REPOS", "/tmp") setup_google_logger( - labels={"instrument": active_instrument.getName()}, + labels={"instrument": instrument_name}, ) _log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) @@ -67,7 +67,7 @@ subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path( PROJECT_ID, - f"{active_instrument.getName()}-image", + f"{instrument_name}-image", ) subscription = None @@ -147,8 +147,8 @@ def next_visit_handler() -> Tuple[str, int]: payload = base64.b64decode(envelope["message"]["data"]) data = json.loads(payload) expected_visit = Visit(**data) - assert expected_visit.instrument == active_instrument.getName(), \ - f"Expected {active_instrument.getName()}, received {expected_visit.instrument}." + assert expected_visit.instrument == instrument_name, \ + f"Expected {instrument_name}, received {expected_visit.instrument}." expid_set = set() # Copy calibrations for this detector/visit From 5414c72a182391487aaac974a5c84581593ca2ad Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 13:35:44 -0700 Subject: [PATCH 10/19] Allow MiddlewareInterface code to take either instrument name. Making MWI more flexible on this matter gives us a lot more freedom in how we handle instrument information in activator.py. --- python/activator/middleware_interface.py | 17 +++++++++++------ tests/test_middleware_interface.py | 16 +++++++++------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 168b9c5c..cb17fd1e 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -54,8 +54,8 @@ def get_central_butler(central_repo: str, instrument_class: str): central_repo : `str` The path or URI to the central repository. instrument_class : `str` - The fully-qualified class name of the instrument whose data will be - retrieved or written. + The name of the instrument whose data will be retrieved or written. May + be either the fully qualified class name or the short name. Returns ------- @@ -63,7 +63,10 @@ def get_central_butler(central_repo: str, instrument_class: str): A Butler for ``central_repo`` pre-configured to load and store ``instrument_name`` data. """ - instrument = lsst.obs.base.Instrument.from_string(instrument_class) + # TODO: how much overhead do we take on from asking the central repo about + # the instrument instead of handling it internally? + registry = Butler(central_repo).registry + instrument = lsst.obs.base.Instrument.from_string(instrument_class, registry) return Butler(central_repo, collections=[instrument.makeCollectionName("defaults")], writeable=True, @@ -101,8 +104,9 @@ class MiddlewareInterface: Storage bucket where images will be written to as they arrive. See also ``prefix``. instrument : `str` - Full class name of the instrument taking the data, for populating - butler collections and dataIds. Example: "lsst.obs.lsst.LsstCam" + Name of the instrument taking the data, for populating + butler collections and dataIds. May be either the fully qualified class + name or the short name. Examples: "LsstCam", "lsst.obs.lsst.LsstCam". TODO: this arg can probably be removed and replaced with internal use of the butler. local_storage : `str` @@ -143,7 +147,8 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, self._download_store = tempfile.TemporaryDirectory(prefix="holding-") else: self._download_store = None - self.instrument = lsst.obs.base.Instrument.from_string(instrument) + # TODO: how much overhead do we pick up from going through the registry? + self.instrument = lsst.obs.base.Instrument.from_string(instrument, central_butler.registry) self.output_collection = self.instrument.makeCollectionName("prompt") diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index cc2d5ffa..29298b14 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -148,13 +148,15 @@ def tearDown(self): self.workspace.cleanup() def test_get_butler(self): - butler = get_central_butler(self.central_repo, "lsst.obs.decam.DarkEnergyCamera") - # TODO: better way to test repo location? - self.assertTrue( - butler.getURI("camera", instrument=instname, run="foo", predict=True).ospath - .startswith(self.central_repo)) - self.assertEqual(list(butler.collections), [f"{instname}/defaults"]) - self.assertTrue(butler.isWriteable()) + for butler in [get_central_butler(self.central_repo, "lsst.obs.decam.DarkEnergyCamera"), + get_central_butler(self.central_repo, instname), + ]: + # TODO: better way to test repo location? + self.assertTrue( + butler.getURI("camera", instrument=instname, run="foo", predict=True).ospath + .startswith(self.central_repo)) + self.assertEqual(list(butler.collections), [f"{instname}/defaults"]) + self.assertTrue(butler.isWriteable()) def test_init(self): """Basic tests of the initialized interface object. From 93e3a46b9cc39d4939d335e886b4fe244d9baeec Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 13:39:35 -0700 Subject: [PATCH 11/19] Use only short instrument name in activator. The short name is used in most contexts, including (by requirement) the next_visit protocol. Eliminating all uses of the class name allows us to remove conversion code between the two, thereby removing the last LSST imports from the activator. This is a breaking API change to the service's environment variables. --- doc/playbook.rst | 2 +- python/activator/activator.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/doc/playbook.rst b/doc/playbook.rst index fd9aab88..04c8fc4b 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -102,7 +102,7 @@ To create or edit the Cloud Run service in the Google Cloud Console: * Select the container image URL from "Artifact Registry > prompt-proto-service" * In the Variables & Secrets tab, set the following required parameters: - * RUBIN_INSTRUMENT: full instrument class name, including module path + * RUBIN_INSTRUMENT: the "short" instrument name * PUBSUB_VERIFICATION_TOKEN: choose an arbitrary string matching the Pub/Sub endpoint URL below * IMAGE_BUCKET: bucket containing raw images (``rubin-prompt-proto-main``) * CALIB_REPO: repo containing calibrations (and templates) diff --git a/python/activator/activator.py b/python/activator/activator.py index 6888b116..ce60bdfb 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -32,7 +32,6 @@ from flask import Flask, request from google.cloud import pubsub_v1, storage -from lsst.obs.base import Instrument from .logger import setup_google_logger from .make_pgpass import make_pgpass from .middleware_interface import get_central_butler, MiddlewareInterface @@ -42,9 +41,8 @@ PROJECT_ID = "prompt-proto" verification_token = os.environ["PUBSUB_VERIFICATION_TOKEN"] -# The full instrument class name, including module path. -config_instrument = os.environ["RUBIN_INSTRUMENT"] -instrument_name = Instrument.from_string(config_instrument).getName() +# The short name for the instrument. +instrument_name = os.environ["RUBIN_INSTRUMENT"] calib_repo = os.environ["CALIB_REPO"] image_bucket = os.environ["IMAGE_BUCKET"] timeout = os.environ.get("IMAGE_TIMEOUT", 50) @@ -74,9 +72,9 @@ storage_client = storage.Client() # Initialize middleware interface. -mwi = MiddlewareInterface(get_central_butler(calib_repo, config_instrument), +mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name), image_bucket, - config_instrument, + instrument_name, local_repos) From 18b81836c829b1574ece2ed77abe649490c7c1e0 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 13:46:57 -0700 Subject: [PATCH 12/19] Document environment variables. --- doc/playbook.rst | 2 +- python/activator/activator.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/doc/playbook.rst b/doc/playbook.rst index 04c8fc4b..d6e6e488 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -105,7 +105,7 @@ To create or edit the Cloud Run service in the Google Cloud Console: * RUBIN_INSTRUMENT: the "short" instrument name * PUBSUB_VERIFICATION_TOKEN: choose an arbitrary string matching the Pub/Sub endpoint URL below * IMAGE_BUCKET: bucket containing raw images (``rubin-prompt-proto-main``) - * CALIB_REPO: repo containing calibrations (and templates) + * CALIB_REPO: URI to repo containing calibrations (and templates) * IP_APDB: IP address and port of the APDB (see `Databases`_, below) * IP_REGISTRY: IP address and port of the registry database (see `Databases`_) diff --git a/python/activator/activator.py b/python/activator/activator.py index ce60bdfb..6bc459ad 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -43,9 +43,13 @@ verification_token = os.environ["PUBSUB_VERIFICATION_TOKEN"] # The short name for the instrument. instrument_name = os.environ["RUBIN_INSTRUMENT"] +# URI to the main repository containing calibs and templates calib_repo = os.environ["CALIB_REPO"] +# Bucket name (not URI) containing raw images image_bucket = os.environ["IMAGE_BUCKET"] +# Time to wait for raw image upload, in seconds timeout = os.environ.get("IMAGE_TIMEOUT", 50) +# Absolute path on this worker's system where local repos may be created local_repos = os.environ.get("LOCAL_REPOS", "/tmp") setup_google_logger( From 4cc06a2ce7941b961896953dcd02ad89f6125a87 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 17:35:18 -0700 Subject: [PATCH 13/19] Factor message parsing out of message handler. The parser is a fairly self-contained block of code, and this change makes the rest of the handler easier to read and more focused on the actual subscription handling. --- python/activator/activator.py | 44 +++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 6bc459ad..087a4588 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -113,6 +113,35 @@ def check_for_snap( return blobs[0].name +def parse_next_visit(http_request): + """Parse a next_visit event and extract its data. + + Parameters + ---------- + http_request : `flask.Request` + The request to be parsed. + + Returns + ------- + next_visit : `activator.visit.Visit` + The next_visit message contained in the request. + + Raises + ------ + ValueError + Raised if ``http_request`` is not a valid message. + """ + envelope = http_request.get_json() + if not envelope: + raise ValueError("no Pub/Sub message received") + if not isinstance(envelope, dict) or "message" not in envelope: + raise ValueError("invalid Pub/Sub message format") + + payload = base64.b64decode(envelope["message"]["data"]) + data = json.loads(payload) + return Visit(**data) + + @app.route("/next-visit", methods=["POST"]) def next_visit_handler() -> Tuple[str, int]: """A Flask view function for handling next-visit events. @@ -135,20 +164,11 @@ def next_visit_handler() -> Tuple[str, int]: ) _log.debug(f"Created subscription '{subscription.name}'") try: - envelope = request.get_json() - if not envelope: - msg = "no Pub/Sub message received" + try: + expected_visit = parse_next_visit(request) + except ValueError as msg: _log.warn(f"error: '{msg}'") return f"Bad Request: {msg}", 400 - - if not isinstance(envelope, dict) or "message" not in envelope: - msg = "invalid Pub/Sub message format" - _log.warn(f"error: '{msg}'") - return f"Bad Request: {msg}", 400 - - payload = base64.b64decode(envelope["message"]["data"]) - data = json.loads(payload) - expected_visit = Visit(**data) assert expected_visit.instrument == instrument_name, \ f"Expected {instrument_name}, received {expected_visit.instrument}." expid_set = set() From 5a3e2b1f22b206b3413b29ef0a0482e49219f0ff Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 18:34:41 -0700 Subject: [PATCH 14/19] Factor filename parsing out of message handler. This change keeps the activator code from depending on the details of the raw filename convention, particularly the (optimized) directory order, and prevents drift between the two places where the filename is parsed. --- python/activator/activator.py | 26 +++++++++++------------ python/activator/raw.py | 40 ++++++++++++++++++++++++++++++++++- tests/test_raw.py | 22 ++++++++++++++++++- 3 files changed, 72 insertions(+), 16 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 087a4588..82ac3f35 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -25,7 +25,6 @@ import json import logging import os -import re import time from typing import Optional, Tuple @@ -35,7 +34,7 @@ from .logger import setup_google_logger from .make_pgpass import make_pgpass from .middleware_interface import get_central_butler, MiddlewareInterface -from .raw import RAW_REGEXP +from .raw import Snap from .visit import Visit PROJECT_ID = "prompt-proto" @@ -185,9 +184,9 @@ def next_visit_handler() -> Tuple[str, int]: expected_visit.detector, ) if oid: - m = re.match(RAW_REGEXP, oid) + raw_info = Snap.from_oid(oid) mwi.ingest_image(expected_visit, oid) - expid_set.add(m.group('expid')) + expid_set.add(raw_info.exp_id) _log.debug(f"Waiting for snaps from {expected_visit}.") start = time.time() @@ -212,20 +211,19 @@ def next_visit_handler() -> Tuple[str, int]: for received in response.received_messages: ack_list.append(received.ack_id) oid = received.message.attributes["objectId"] - m = re.match(RAW_REGEXP, oid) - if m: - instrument, detector, group, snap, expid = m.groups() - _log.debug("instrument, detector, group, snap, expid = %s", m.groups()) + try: + raw_info = Snap.from_oid(oid) + _log.debug("Received %r", raw_info) if ( - instrument == expected_visit.instrument - and int(detector) == int(expected_visit.detector) - and group == str(expected_visit.group) - and int(snap) < int(expected_visit.snaps) + raw_info.instrument == expected_visit.instrument + and raw_info.detector == int(expected_visit.detector) + and raw_info.group == str(expected_visit.group) + and raw_info.snap < int(expected_visit.snaps) ): # Ingest the snap mwi.ingest_image(oid) - expid_set.add(expid) - else: + expid_set.add(raw_info.exp_id) + except ValueError: _log.error(f"Failed to match object id '{oid}'") subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list) diff --git a/python/activator/raw.py b/python/activator/raw.py index 4a931f18..c89f4bd6 100644 --- a/python/activator/raw.py +++ b/python/activator/raw.py @@ -25,10 +25,48 @@ vice versa. """ -__all__ = ["RAW_REGEXP", "get_raw_path"] +__all__ = ["Snap", "RAW_REGEXP", "get_raw_path"] +from dataclasses import dataclass import re + +@dataclass(frozen=True) +class Snap: + instrument: str # short name + detector: int + group: str # observatory-specific ID; not the same as visit number + snap: int # exposure number within a group + exp_id: int # Butler-compatible unique exposure ID + filter: str # physical filter + + def __str__(self): + """Return a short string that disambiguates the image. + """ + return f"(exposure {self.exp_id}, group {self.group}/{self.snap})" + + @classmethod + def from_oid(cls, oid: str): + """Construct a Snap from an image bucket's filename. + + Parameters + ---------- + oid : `str` + A pathname from which to extract snap information. + """ + m = re.match(RAW_REGEXP, oid) + if m: + return Snap(instrument=m["instrument"], + detector=int(m["detector"]), + group=m["group"], + snap=int(m["snap"]), + exp_id=int(m["expid"]), + filter=m["filter"], + ) + else: + raise ValueError(f"{oid} could not be parsed into a Snap") + + # Format for filenames of raws uploaded to image bucket: # instrument/detector/group/snap/expid/filter/*.(fits, fz, fits.gz) RAW_REGEXP = re.compile( diff --git a/tests/test_raw.py b/tests/test_raw.py index 0fdf1d27..c2afc418 100644 --- a/tests/test_raw.py +++ b/tests/test_raw.py @@ -22,7 +22,7 @@ import re import unittest -from activator.raw import RAW_REGEXP, get_raw_path +from activator.raw import Snap, RAW_REGEXP, get_raw_path class RawTest(unittest.TestCase): @@ -55,3 +55,23 @@ def test_writeread(self): self.assertEqual(parsed['snap'], str(self.snap)) self.assertEqual(parsed['expid'], str(self.exposure)) self.assertEqual(parsed['filter'], str(self.filter)) + + def test_snap(self): + """Test that Snap objects can be constructed from parseable paths. + """ + path = get_raw_path(self.instrument, self.detector, self.group, self.snap, self.exposure, self.filter) + parsed = Snap.from_oid(path) + self.assertIsNotNone(parsed) + # These tests automatically include type-checking. + self.assertEqual(parsed.instrument, self.instrument) + self.assertEqual(parsed.detector, self.detector) + self.assertEqual(parsed.group, self.group) + self.assertEqual(parsed.snap, self.snap) + self.assertEqual(parsed.exp_id, self.exposure) + self.assertEqual(parsed.filter, self.filter) + + def test_bad_snap(self): + path = get_raw_path(self.instrument, f"{self.detector}b", self.group, + self.snap, self.exposure, self.filter) + with self.assertRaisesRegex(ValueError, "not .* parsed"): + Snap.from_oid(path) From 2d6f96bb24acaf0a183d35777ad4411161b8ea58 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Oct 2022 18:48:48 -0700 Subject: [PATCH 15/19] Factor snap-visit comparisons out of activator. This makes the activator code simpler and easier to read, and avoids distracting from the main subscription loop. --- python/activator/activator.py | 7 +------ python/activator/raw.py | 16 ++++++++++++++++ tests/test_raw.py | 36 +++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 6 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 82ac3f35..780bf208 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -214,12 +214,7 @@ def next_visit_handler() -> Tuple[str, int]: try: raw_info = Snap.from_oid(oid) _log.debug("Received %r", raw_info) - if ( - raw_info.instrument == expected_visit.instrument - and raw_info.detector == int(expected_visit.detector) - and raw_info.group == str(expected_visit.group) - and raw_info.snap < int(expected_visit.snaps) - ): + if raw_info.is_consistent(expected_visit): # Ingest the snap mwi.ingest_image(oid) expid_set.add(raw_info.exp_id) diff --git a/python/activator/raw.py b/python/activator/raw.py index c89f4bd6..80ddeb5a 100644 --- a/python/activator/raw.py +++ b/python/activator/raw.py @@ -30,6 +30,8 @@ from dataclasses import dataclass import re +from .visit import Visit + @dataclass(frozen=True) class Snap: @@ -66,6 +68,20 @@ def from_oid(cls, oid: str): else: raise ValueError(f"{oid} could not be parsed into a Snap") + def is_consistent(self, visit: Visit): + """Test if this snap could have come from a particular visit. + + Parameters + ---------- + visit : `activator.visit.Visit` + The visit from which snaps were expected. + """ + return (self.instrument == visit.instrument + and self.detector == visit.detector + and self.group == visit.group + and self.snap < visit.snaps + ) + # Format for filenames of raws uploaded to image bucket: # instrument/detector/group/snap/expid/filter/*.(fits, fz, fits.gz) diff --git a/tests/test_raw.py b/tests/test_raw.py index c2afc418..e1d973f8 100644 --- a/tests/test_raw.py +++ b/tests/test_raw.py @@ -23,6 +23,7 @@ import unittest from activator.raw import Snap, RAW_REGEXP, get_raw_path +from activator.visit import Visit class RawTest(unittest.TestCase): @@ -43,6 +44,17 @@ def setUp(self): self.snap = 1 self.exposure = 404 + self.visit = Visit(instrument=self.instrument, + detector=self.detector, + group=self.group, + snaps=self.snaps, + filter=self.filter, + ra=self.ra, + dec=self.dec, + rot=self.rot, + kind=self.kind, + ) + def test_writeread(self): """Test that raw module can parse the paths it creates. """ @@ -75,3 +87,27 @@ def test_bad_snap(self): self.snap, self.exposure, self.filter) with self.assertRaisesRegex(ValueError, "not .* parsed"): Snap.from_oid(path) + + def test_snap_consistent(self): + path = get_raw_path(self.instrument, self.detector, self.group, self.snap, self.exposure, self.filter) + snap = Snap.from_oid(path) + self.assertTrue(snap.is_consistent(self.visit)) + + path = get_raw_path("Foo", self.detector, self.group, self.snap, self.exposure, self.filter) + snap = Snap.from_oid(path) + self.assertFalse(snap.is_consistent(self.visit)) + + path = get_raw_path(self.instrument, self.visit.detector+1, self.group, self.snap, + self.exposure, self.filter) + snap = Snap.from_oid(path) + self.assertFalse(snap.is_consistent(self.visit)) + + path = get_raw_path(self.instrument, self.detector, self.group + "b", self.snap, + self.exposure, self.filter) + snap = Snap.from_oid(path) + self.assertFalse(snap.is_consistent(self.visit)) + + path = get_raw_path(self.instrument, self.detector, self.group, self.visit.snaps, + self.exposure, self.filter) + snap = Snap.from_oid(path) + self.assertFalse(snap.is_consistent(self.visit)) From d562425bffbd7ca8dc38a59f5b76c0a3b1186440 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 7 Oct 2022 09:19:03 -0700 Subject: [PATCH 16/19] Add debug log for already-existing raws. There is already a debug log for raws reported through the subscription system, but already present is by far the more common case in testing. --- python/activator/activator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/activator/activator.py b/python/activator/activator.py index 780bf208..1964d3b1 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -185,6 +185,7 @@ def next_visit_handler() -> Tuple[str, int]: ) if oid: raw_info = Snap.from_oid(oid) + _log.debug("Found %r already present", raw_info) mwi.ingest_image(expected_visit, oid) expid_set.add(raw_info.exp_id) From 5eaefd371a6b18c1350bfbbbbba8f5b2b4ffb744 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 7 Oct 2022 11:30:40 -0700 Subject: [PATCH 17/19] Move all DB configuration to environment variables. This allows more flexibility in how the APDB and registry databases are set up, such as having them be different databases on the same PostgreSQL server. --- doc/playbook.rst | 6 +++++- python/activator/make_pgpass.py | 9 ++++++--- python/activator/middleware_interface.py | 8 +++++--- tests/test_middleware_interface.py | 10 ++++++++-- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/doc/playbook.rst b/doc/playbook.rst index d6e6e488..523f87af 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -108,11 +108,15 @@ To create or edit the Cloud Run service in the Google Cloud Console: * CALIB_REPO: URI to repo containing calibrations (and templates) * IP_APDB: IP address and port of the APDB (see `Databases`_, below) * IP_REGISTRY: IP address and port of the registry database (see `Databases`_) + * DB_APDB: PostgreSQL database name for the APDB + * DB_REGISTRY: PostgreSQL database name for the registry database -* There are also two optional parameters: +* There are also four optional parameters: * IMAGE_TIMEOUT: timeout in seconds to wait for raw image, default 50 sec. * LOCAL_REPOS: absolute path (in the container) where local repos are created, default ``/tmp``. + * USER_APDB: database user for the APDB, default "postgres" + * USER_REGISTRY: database user for the registry database, default "postgres" * One variable is set by Cloud Run and should not be overridden: diff --git a/python/activator/make_pgpass.py b/python/activator/make_pgpass.py index ce135200..301c7edd 100755 --- a/python/activator/make_pgpass.py +++ b/python/activator/make_pgpass.py @@ -27,7 +27,6 @@ import stat -PSQL_DB = "postgres" PSQL_USER = "postgres" @@ -44,15 +43,19 @@ def make_pgpass(): """ try: ip_apdb = os.environ["IP_APDB"] + db_apdb = os.environ["DB_APDB"] + user_apdb = os.environ.get("USER_APDB", PSQL_USER) pass_apdb = os.environ["PSQL_APDB_PASS"] ip_registry = os.environ["IP_REGISTRY"] + db_registry = os.environ["DB_REGISTRY"] + user_registry = os.environ.get("USER_REGISTRY", PSQL_USER) pass_registry = os.environ["PSQL_REGISTRY_PASS"] except KeyError as e: raise RuntimeError("Addresses and passwords have not been configured") from e filename = os.path.join(os.environ["HOME"], ".pgpass") with open(filename, mode="wt") as file: - file.write(f"{ip_apdb}:{PSQL_DB}:{PSQL_USER}:{pass_apdb}\n") - file.write(f"{ip_registry}:{PSQL_DB}:{PSQL_USER}:{pass_registry}\n") + file.write(f"{ip_apdb}:{db_apdb}:{user_apdb}:{pass_apdb}\n") + file.write(f"{ip_registry}:{db_registry}:{user_registry}:{pass_registry}\n") # Only user may access the file os.chmod(filename, stat.S_IRUSR) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index cb17fd1e..9d91d039 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -139,7 +139,10 @@ class MiddlewareInterface: def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, local_storage: str, prefix: str = "gs://"): + # TODO: merge this code with make_pgpass.py self.ip_apdb = os.environ["IP_APDB"] + self.db_apdb = os.environ["DB_APDB"] + self.user_apdb = os.environ.get("USER_APDB", "postgres") self.central_butler = central_butler self.image_host = prefix + image_bucket # TODO: _download_store turns MWI into a tagged class; clean this up later @@ -536,10 +539,9 @@ def _prep_pipeline(self, visit: Visit) -> None: pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file) except FileNotFoundError: raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}") - # TODO: Can we write to a configurable apdb schema, rather than - # "postgres"? + # TODO: Can we write to a configurable apdb schema? pipeline.addConfigOverride("diaPipe", "apdb.db_url", - f"postgresql://postgres@{self.ip_apdb}/postgres") + f"postgresql://{self.user_apdb}@{self.ip_apdb}/{self.db_apdb}") return pipeline def _download(self, remote): diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 29298b14..8bf2c91c 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -99,7 +99,10 @@ class MiddlewareInterfaceTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.env_patcher = unittest.mock.patch.dict(os.environ, - {"IP_APDB": "localhost"}) + {"IP_APDB": "localhost", + "DB_APDB": "postgres", + "USER_APDB": "postgres", + }) cls.env_patcher.start() super().setUpClass() @@ -433,7 +436,10 @@ def setUpClass(cls): super().setUpClass() cls.env_patcher = unittest.mock.patch.dict(os.environ, - {"IP_APDB": "localhost"}) + {"IP_APDB": "localhost", + "DB_APDB": "postgres", + "USER_APDB": "postgres", + }) cls.env_patcher.start() @classmethod From 885bd2a3b47ad13c19d820876c8d8941725ba88b Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 7 Oct 2022 11:40:41 -0700 Subject: [PATCH 18/19] Refactor APDB URI handling in MiddlewareInterface. The previous implementation stored the URI component(s) in object fields and assembled them on the fly. Now there's a private method for computing the URI. This is currently run once and stored on __init__ for convenience, but in the future the APDB may be accessed using a situational namespace. --- python/activator/middleware_interface.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 9d91d039..4ff77e45 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -126,6 +126,7 @@ class MiddlewareInterface: """ # Class invariants: + # self._apdb_uri is a valid URI that unambiguously identifies the APDB # self.image_host is a valid URI with non-empty path and no query or fragment. # self._download_store is None if and only if self.image_host is a local URI. # self.instrument, self.camera, and self.skymap do not change after __init__. @@ -139,10 +140,7 @@ class MiddlewareInterface: def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, local_storage: str, prefix: str = "gs://"): - # TODO: merge this code with make_pgpass.py - self.ip_apdb = os.environ["IP_APDB"] - self.db_apdb = os.environ["DB_APDB"] - self.user_apdb = os.environ.get("USER_APDB", "postgres") + self._apdb_uri = self._make_apdb_uri() self.central_butler = central_butler self.image_host = prefix + image_bucket # TODO: _download_store turns MWI into a tagged class; clean this up later @@ -175,6 +173,15 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, # How much to pad the refcat region we will copy over. self.padding = 30*lsst.geom.arcseconds + def _make_apdb_uri(self): + """Generate a URI for accessing the APDB. + """ + # TODO: merge this code with make_pgpass.py + ip_apdb = os.environ["IP_APDB"] # Also includes port + db_apdb = os.environ["DB_APDB"] + user_apdb = os.environ.get("USER_APDB", "postgres") + return f"postgresql://{user_apdb}@{ip_apdb}/{db_apdb}" + def _init_local_butler(self, base_path: str): """Prepare the local butler to ingest into and process from. @@ -539,9 +546,8 @@ def _prep_pipeline(self, visit: Visit) -> None: pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file) except FileNotFoundError: raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}") - # TODO: Can we write to a configurable apdb schema? - pipeline.addConfigOverride("diaPipe", "apdb.db_url", - f"postgresql://{self.user_apdb}@{self.ip_apdb}/{self.db_apdb}") + # TODO: Can we write to a configurable apdb schema (see DM-36497)? + pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri) return pipeline def _download(self, remote): From 9bd8f58600fff41e21b90830d0a3b66217babfbe Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 13 Oct 2022 12:24:23 -0700 Subject: [PATCH 19/19] Add support for custom APDB namespace. This feature is essential to being able to run on the USDF development APDB, which is shared by multiple users. It's not logically related to DM-36080, but it's not worth its own ticket and we shouldn't make DM-36505 a blocker on the USDF migration. --- doc/playbook.rst | 1 + python/activator/middleware_interface.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/doc/playbook.rst b/doc/playbook.rst index 523f87af..b8094027 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -117,6 +117,7 @@ To create or edit the Cloud Run service in the Google Cloud Console: * LOCAL_REPOS: absolute path (in the container) where local repos are created, default ``/tmp``. * USER_APDB: database user for the APDB, default "postgres" * USER_REGISTRY: database user for the registry database, default "postgres" + * NAMESPACE_APDB: the database namespace for the APDB, defaults to the DB's default namespace * One variable is set by Cloud Run and should not be overridden: diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 4ff77e45..376cfc85 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -141,6 +141,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, local_storage: str, prefix: str = "gs://"): self._apdb_uri = self._make_apdb_uri() + self._apdb_namespace = os.environ.get("NAMESPACE_APDB", None) self.central_butler = central_butler self.image_host = prefix + image_bucket # TODO: _download_store turns MWI into a tagged class; clean this up later @@ -546,8 +547,8 @@ def _prep_pipeline(self, visit: Visit) -> None: pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file) except FileNotFoundError: raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}") - # TODO: Can we write to a configurable apdb schema (see DM-36497)? pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri) + pipeline.addConfigOverride("diaPipe", "apdb.namespace", self._apdb_namespace) return pipeline def _download(self, remote):