diff --git a/python/activator/activator.py b/python/activator/activator.py index 14a97160..7b0db789 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -33,6 +33,8 @@ from google.cloud import pubsub_v1, storage from lsst.daf.butler import Butler +from lsst.obs.base.utils import getInstrument +from .make_pgpass import make_pgpass from .middleware_interface import MiddlewareInterface from .visit import Visit @@ -41,6 +43,7 @@ verification_token = os.environ["PUBSUB_VERIFICATION_TOKEN"] # The full instrument class name, including module path. config_instrument = os.environ["RUBIN_INSTRUMENT"] +active_instrument = getInstrument(config_instrument) calib_repo = os.environ["CALIB_REPO"] image_bucket = os.environ["IMAGE_BUCKET"] oid_regexp = re.compile(r"(.*?)/(\d+)/(.*?)/(\d+)/\1-\3-\4-.*?-.*?-\2\.f") @@ -50,34 +53,47 @@ # Use JSON format compatible with Google Cloud Logging format=( '{{"severity":"{levelname}", "labels":{{"instrument":"' - + config_instrument + + active_instrument.getName() + '"}}, "message":{message!r}}}' ), style="{", ) _log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) + + +# Write PostgreSQL credentials. +# This MUST be done before creating a Butler or accessing the APDB. +make_pgpass() + + app = Flask(__name__) subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path( PROJECT_ID, - f"{config_instrument}-image", + f"{active_instrument.getName()}-image", ) subscription = None storage_client = storage.Client() -# Initialize middleware interface; TODO: we'll need one of these per detector. -repo = f"/tmp/butler-{os.getpid()}" +# Initialize middleware interface. +# TODO: this should not be done in activator.py, which is supposed to have only +# framework/messaging support (ideally, it should not contain any LSST imports). +# However, we don't want MiddlewareInterface to need to know details like where +# the central repo is located, either, so perhaps we need a new module. central_butler = Butler(calib_repo, - # TODO: How do we get the appropriate instrument name - # here and for what we pass to MiddlewareInterface? - instrument=config_instrument, - skymap="deepCoadd_skyMap", - collections=[f"{config_instrument}/defaults"], - writeable=False) -butler = Butler(Butler.makeRepo(repo.name), writeable=True) + # TODO: investigate whether these defaults, esp. skymap, slow down queries + instrument=active_instrument.getName(), + # NOTE: with inferDefaults=True, it's possible we don't need to hardcode this + # value from the real repository. + # skymap="hsc_rings_v1", + collections=[active_instrument.makeCollectionName("defaults")], + writeable=False, + inferDefaults=True) +repo = f"/tmp/butler-{os.getpid()}" +butler = Butler(Butler.makeRepo(repo), writeable=True) mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, butler) @@ -98,6 +114,18 @@ def check_for_snap( @app.route("/next-visit", methods=["POST"]) def next_visit_handler() -> Tuple[str, int]: + """A Flask view function for handling next-visit events. + + Like all Flask handlers, this function accepts input through the + ``request`` global rather than parameters. + + Returns + ------- + message : `str` + The HTTP response reason to return to the client. + status : `int` + The HTTP response status code to return to the client. + """ if request.args.get("token", "") != verification_token: return "Invalid request", 400 subscription = subscriber.create_subscription( @@ -120,7 +148,8 @@ def next_visit_handler() -> Tuple[str, int]: payload = base64.b64decode(envelope["message"]["data"]) data = json.loads(payload) expected_visit = Visit(**data) - assert expected_visit.instrument == config_instrument + assert expected_visit.instrument == active_instrument.getName(), \ + f"Expected {active_instrument.getName()}, received {expected_visit.instrument}." snap_set = set() # Copy calibrations for this detector/visit diff --git a/python/activator/make_pgpass.py b/python/activator/make_pgpass.py new file mode 100755 index 00000000..ce135200 --- /dev/null +++ b/python/activator/make_pgpass.py @@ -0,0 +1,58 @@ +# This file is part of prompt_prototype. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +__all__ = ["make_pgpass"] + + +import os +import stat + + +PSQL_DB = "postgres" +PSQL_USER = "postgres" + + +def make_pgpass(): + """Create a .pgpass file that contains the service's database credentials. + + This function is designed to work in the Prompt Processing Service's docker + container, and no other environment. + + Raises + ------ + RuntimeError + Raised if the database passwords cannot be found. + """ + try: + ip_apdb = os.environ["IP_APDB"] + pass_apdb = os.environ["PSQL_APDB_PASS"] + ip_registry = os.environ["IP_REGISTRY"] + pass_registry = os.environ["PSQL_REGISTRY_PASS"] + except KeyError as e: + raise RuntimeError("Addresses and passwords have not been configured") from e + + filename = os.path.join(os.environ["HOME"], ".pgpass") + with open(filename, mode="wt") as file: + file.write(f"{ip_apdb}:{PSQL_DB}:{PSQL_USER}:{pass_apdb}\n") + file.write(f"{ip_registry}:{PSQL_DB}:{PSQL_USER}:{pass_registry}\n") + # Only user may access the file + os.chmod(filename, stat.S_IRUSR) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 22c0deb1..077c7dba 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -86,7 +86,13 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, self._init_local_butler(butler) self._init_ingester() # TODO DM-34098: note that we currently need to supply instrument here. - self.camera = self.central_butler.get("camera", instrument=self.instrument.getName()) + # HACK: explicit collection gets around the fact that we don't have any + # timestamp/exposure information in a form we can pass to the Butler. + # This code will break once cameras start being versioned. + self.camera = self.central_butler.get( + "camera", instrument=self.instrument.getName(), + collections=self.instrument.makeCalibrationCollectionName("unbounded") + ) self.skymap = self.central_butler.get("skyMap") # self.r = self.src.registry diff --git a/python/activator/visit.py b/python/activator/visit.py index b994b8d5..b6065fac 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -5,7 +5,7 @@ @dataclass(frozen=True) class Visit: - # elements should use built-in types that are hashable and JSON-persistable + # elements must be hashable and JSON-persistable; built-in types recommended instrument: str detector: int group: str diff --git a/tests/data/central_repo/gen3.sqlite3 b/tests/data/central_repo/gen3.sqlite3 index 4a2e5bc8..6b9f3950 100644 Binary files a/tests/data/central_repo/gen3.sqlite3 and b/tests/data/central_repo/gen3.sqlite3 differ diff --git a/tests/data/export.yaml b/tests/data/export.yaml index 6a7fa57b..5c8f73cf 100644 --- a/tests/data/export.yaml +++ b/tests/data/export.yaml @@ -555,7 +555,6 @@ data: collection_type: CHAINED name: DECam/defaults children: - - DECam/calib/unbounded - DECam/calib - refcats - templates diff --git a/tests/test_visit.py b/tests/test_visit.py new file mode 100644 index 00000000..2daa5688 --- /dev/null +++ b/tests/test_visit.py @@ -0,0 +1,57 @@ +# This file is part of prompt_prototype. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import json +import unittest + +from activator.visit import Visit + + +class VisitTest(unittest.TestCase): + """Test the Visit class's functionality. + """ + def setUp(self): + super().setUp() + + self.testbed = Visit( + instrument="NotACam", + detector=42, + group="2022032100001", + snaps=2, + filter="k2022", + ra=134.5454, + dec=-65.3261, + rot=135.0, + kind="IMAGINARY", + ) + + def test_hash(self): + # Strictly speaking should test whether Visit fulfills the hash + # contract, but it's not clear what kinds of differences the default + # __hash__ might be insensitive to. So just test that the object + # is hashable. + value = hash(self.testbed) + self.assertNotEqual(value, 0) + + def test_json(self): + serialized = json.dumps(self.testbed.__dict__).encode("utf-8") + deserialized = Visit(**json.loads(serialized)) + self.assertEqual(deserialized, self.testbed)