Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from google.cloud import pubsub_v1, storage

from lsst.daf.butler import Butler
from lsst.obs.base.utils import getInstrument
from .make_pgpass import make_pgpass
from .middleware_interface import MiddlewareInterface
from .visit import Visit

Expand All @@ -41,6 +43,7 @@
verification_token = os.environ["PUBSUB_VERIFICATION_TOKEN"]
# The full instrument class name, including module path.
config_instrument = os.environ["RUBIN_INSTRUMENT"]
active_instrument = getInstrument(config_instrument)
calib_repo = os.environ["CALIB_REPO"]
image_bucket = os.environ["IMAGE_BUCKET"]
oid_regexp = re.compile(r"(.*?)/(\d+)/(.*?)/(\d+)/\1-\3-\4-.*?-.*?-\2\.f")
Expand All @@ -50,34 +53,47 @@
# Use JSON format compatible with Google Cloud Logging
format=(
'{{"severity":"{levelname}", "labels":{{"instrument":"'
+ config_instrument
+ active_instrument.getName()
+ '"}}, "message":{message!r}}}'
),
style="{",
)
_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)


# Write PostgreSQL credentials.
# This MUST be done before creating a Butler or accessing the APDB.
make_pgpass()


app = Flask(__name__)

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(
PROJECT_ID,
f"{config_instrument}-image",
f"{active_instrument.getName()}-image",
)
subscription = None

storage_client = storage.Client()

# Initialize middleware interface; TODO: we'll need one of these per detector.
repo = f"/tmp/butler-{os.getpid()}"
# Initialize middleware interface.
# TODO: this should not be done in activator.py, which is supposed to have only
# framework/messaging support (ideally, it should not contain any LSST imports).
# However, we don't want MiddlewareInterface to need to know details like where
# the central repo is located, either, so perhaps we need a new module.
central_butler = Butler(calib_repo,
# TODO: How do we get the appropriate instrument name
# here and for what we pass to MiddlewareInterface?
instrument=config_instrument,
skymap="deepCoadd_skyMap",
collections=[f"{config_instrument}/defaults"],
writeable=False)
butler = Butler(Butler.makeRepo(repo.name), writeable=True)
# TODO: investigate whether these defaults, esp. skymap, slow down queries
instrument=active_instrument.getName(),
# NOTE: with inferDefaults=True, it's possible we don't need to hardcode this
# value from the real repository.
# skymap="hsc_rings_v1",
collections=[active_instrument.makeCollectionName("defaults")],
writeable=False,
inferDefaults=True)
repo = f"/tmp/butler-{os.getpid()}"
butler = Butler(Butler.makeRepo(repo), writeable=True)
mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, butler)


Expand All @@ -98,6 +114,18 @@ def check_for_snap(

@app.route("/next-visit", methods=["POST"])
def next_visit_handler() -> Tuple[str, int]:
"""A Flask view function for handling next-visit events.

Like all Flask handlers, this function accepts input through the
``request`` global rather than parameters.

Returns
-------
message : `str`
The HTTP response reason to return to the client.
status : `int`
The HTTP response status code to return to the client.
"""
if request.args.get("token", "") != verification_token:
return "Invalid request", 400
subscription = subscriber.create_subscription(
Expand All @@ -120,7 +148,8 @@ def next_visit_handler() -> Tuple[str, int]:
payload = base64.b64decode(envelope["message"]["data"])
data = json.loads(payload)
expected_visit = Visit(**data)
assert expected_visit.instrument == config_instrument
assert expected_visit.instrument == active_instrument.getName(), \
f"Expected {active_instrument.getName()}, received {expected_visit.instrument}."
snap_set = set()

# Copy calibrations for this detector/visit
Expand Down
58 changes: 58 additions & 0 deletions python/activator/make_pgpass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# This file is part of prompt_prototype.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


__all__ = ["make_pgpass"]


import os
import stat


PSQL_DB = "postgres"
PSQL_USER = "postgres"


def make_pgpass():
"""Create a .pgpass file that contains the service's database credentials.

This function is designed to work in the Prompt Processing Service's docker
container, and no other environment.

Raises
------
RuntimeError
Raised if the database passwords cannot be found.
"""
try:
ip_apdb = os.environ["IP_APDB"]
pass_apdb = os.environ["PSQL_APDB_PASS"]
ip_registry = os.environ["IP_REGISTRY"]
pass_registry = os.environ["PSQL_REGISTRY_PASS"]
except KeyError as e:
raise RuntimeError("Addresses and passwords have not been configured") from e

filename = os.path.join(os.environ["HOME"], ".pgpass")
with open(filename, mode="wt") as file:
file.write(f"{ip_apdb}:{PSQL_DB}:{PSQL_USER}:{pass_apdb}\n")
file.write(f"{ip_registry}:{PSQL_DB}:{PSQL_USER}:{pass_registry}\n")
# Only user may access the file
os.chmod(filename, stat.S_IRUSR)
8 changes: 7 additions & 1 deletion python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
self._init_local_butler(butler)
self._init_ingester()
# TODO DM-34098: note that we currently need to supply instrument here.
self.camera = self.central_butler.get("camera", instrument=self.instrument.getName())
# HACK: explicit collection gets around the fact that we don't have any
# timestamp/exposure information in a form we can pass to the Butler.
# This code will break once cameras start being versioned.
self.camera = self.central_butler.get(
"camera", instrument=self.instrument.getName(),
collections=self.instrument.makeCalibrationCollectionName("unbounded")
)
self.skymap = self.central_butler.get("skyMap")

# self.r = self.src.registry
Expand Down
2 changes: 1 addition & 1 deletion python/activator/visit.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

@dataclass(frozen=True)
class Visit:
# elements should use built-in types that are hashable and JSON-persistable
# elements must be hashable and JSON-persistable; built-in types recommended
instrument: str
detector: int
group: str
Expand Down
Binary file modified tests/data/central_repo/gen3.sqlite3
Binary file not shown.
1 change: 0 additions & 1 deletion tests/data/export.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,6 @@ data:
collection_type: CHAINED
name: DECam/defaults
children:
- DECam/calib/unbounded
- DECam/calib
- refcats
- templates
Expand Down
57 changes: 57 additions & 0 deletions tests/test_visit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This file is part of prompt_prototype.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import json
import unittest

from activator.visit import Visit


class VisitTest(unittest.TestCase):
"""Test the Visit class's functionality.
"""
def setUp(self):
super().setUp()

self.testbed = Visit(
instrument="NotACam",
detector=42,
group="2022032100001",
snaps=2,
filter="k2022",
ra=134.5454,
dec=-65.3261,
rot=135.0,
kind="IMAGINARY",
)

def test_hash(self):
# Strictly speaking should test whether Visit fulfills the hash
# contract, but it's not clear what kinds of differences the default
# __hash__ might be insensitive to. So just test that the object
# is hashable.
value = hash(self.testbed)
self.assertNotEqual(value, 0)

def test_json(self):
serialized = json.dumps(self.testbed.__dict__).encode("utf-8")
deserialized = Visit(**json.loads(serialized))
self.assertEqual(deserialized, self.testbed)