diff --git a/doc/playbook.rst b/doc/playbook.rst
index 1598f6d6..b8094027 100644
--- a/doc/playbook.rst
+++ b/doc/playbook.rst
@@ -102,16 +102,22 @@ To create or edit the Cloud Run service in the Google Cloud Console:
 * Select the container image URL from "Artifact Registry > prompt-proto-service"
 * In the Variables & Secrets tab, set the following required parameters:
 
-  * RUBIN_INSTRUMENT: full instrument class name, including module path
+  * RUBIN_INSTRUMENT: the "short" instrument name
   * PUBSUB_VERIFICATION_TOKEN: choose an arbitrary string matching the Pub/Sub endpoint URL below
   * IMAGE_BUCKET: bucket containing raw images (``rubin-prompt-proto-main``)
-  * CALIB_REPO: repo containing calibrations (and templates)
+  * CALIB_REPO: URI to repo containing calibrations (and templates)
   * IP_APDB: IP address and port of the APDB (see `Databases`_, below)
   * IP_REGISTRY: IP address and port of the registry database (see `Databases`_)
+  * DB_APDB: PostgreSQL database name for the APDB
+  * DB_REGISTRY: PostgreSQL database name for the registry database
 
-* There is also one optional parameter:
+* There are also five optional parameters:
 
   * IMAGE_TIMEOUT: timeout in seconds to wait for raw image, default 50 sec.
+  * LOCAL_REPOS: absolute path (in the container) where local repos are created, default ``/tmp``.
+  * USER_APDB: database user for the APDB, default "postgres"
+  * USER_REGISTRY: database user for the registry database, default "postgres"
+  * NAMESPACE_APDB: the database namespace for the APDB, defaults to the DB's default namespace
 
 * One variable is set by Cloud Run and should not be overridden:
diff --git a/python/activator/activator.py b/python/activator/activator.py
index 9480d499..1964d3b1 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -25,40 +25,37 @@
 import json
 import logging
 import os
-import re
 import time
 from typing import Optional, Tuple
 
 from flask import Flask, request
 from google.cloud import pubsub_v1, storage
-from lsst.daf.butler import Butler
-from lsst.obs.base import Instrument
 
-from .logger import GCloudStructuredLogFormatter
+from .logger import setup_google_logger
 from .make_pgpass import make_pgpass
-from .middleware_interface import MiddlewareInterface
-from .raw import RAW_REGEXP
+from .middleware_interface import get_central_butler, MiddlewareInterface
+from .raw import Snap
 from .visit import Visit
 
 PROJECT_ID = "prompt-proto"
 
 verification_token = os.environ["PUBSUB_VERIFICATION_TOKEN"]
-# The full instrument class name, including module path.
-config_instrument = os.environ["RUBIN_INSTRUMENT"]
-active_instrument = Instrument.from_string(config_instrument)
+# The short name for the instrument.
+instrument_name = os.environ["RUBIN_INSTRUMENT"]
+# URI to the main repository containing calibs and templates
 calib_repo = os.environ["CALIB_REPO"]
+# Bucket name (not URI) containing raw images
 image_bucket = os.environ["IMAGE_BUCKET"]
+# Time to wait for raw image upload, in seconds
 timeout = os.environ.get("IMAGE_TIMEOUT", 50)
+# Absolute path on this worker's system where local repos may be created
+local_repos = os.environ.get("LOCAL_REPOS", "/tmp")
 
-# Set up logging for all modules used by this worker.
-log_handler = logging.StreamHandler()
-log_handler.setFormatter(GCloudStructuredLogFormatter(
-    labels={"instrument": active_instrument.getName()},
-))
-logging.basicConfig(handlers=[log_handler])
+setup_google_logger(
+    labels={"instrument": instrument_name},
+)
 _log = logging.getLogger("lsst." + __name__)
 _log.setLevel(logging.DEBUG)
-logging.captureWarnings(True)
 
 
 # Write PostgreSQL credentials.
@@ -71,25 +68,17 @@
 subscriber = pubsub_v1.SubscriberClient()
 topic_path = subscriber.topic_path(
     PROJECT_ID,
-    f"{active_instrument.getName()}-image",
+    f"{instrument_name}-image",
 )
 subscription = None
 
 storage_client = storage.Client()
 
 # Initialize middleware interface.
-# TODO: this should not be done in activator.py, which is supposed to have only
-# framework/messaging support (ideally, it should not contain any LSST imports).
-# However, we don't want MiddlewareInterface to need to know details like where
-# the central repo is located, either, so perhaps we need a new module.
-central_butler = Butler(calib_repo,
-                        collections=[active_instrument.makeCollectionName("defaults")],
-                        writeable=True,
-                        inferDefaults=False)
-repo = f"/tmp/butler-{os.getpid()}"
-butler = Butler(Butler.makeRepo(repo), writeable=True)
-_log.info("Created local Butler repo at %s.", repo)
-mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, butler)
+mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
+                          image_bucket,
+                          instrument_name,
+                          local_repos)
 
 
 def check_for_snap(
@@ -123,6 +112,35 @@ def check_for_snap(
     return blobs[0].name
 
 
+def parse_next_visit(http_request):
+    """Parse a next_visit event and extract its data.
+
+    Parameters
+    ----------
+    http_request : `flask.Request`
+        The request to be parsed.
+
+    Returns
+    -------
+    next_visit : `activator.visit.Visit`
+        The next_visit message contained in the request.
+
+    Raises
+    ------
+    ValueError
+        Raised if ``http_request`` is not a valid message.
+    """
+    envelope = http_request.get_json()
+    if not envelope:
+        raise ValueError("no Pub/Sub message received")
+    if not isinstance(envelope, dict) or "message" not in envelope:
+        raise ValueError("invalid Pub/Sub message format")
+
+    payload = base64.b64decode(envelope["message"]["data"])
+    data = json.loads(payload)
+    return Visit(**data)
+
+
 @app.route("/next-visit", methods=["POST"])
 def next_visit_handler() -> Tuple[str, int]:
     """A Flask view function for handling next-visit events.
@@ -145,22 +163,13 @@
     )
     _log.debug(f"Created subscription '{subscription.name}'")
     try:
-        envelope = request.get_json()
-        if not envelope:
-            msg = "no Pub/Sub message received"
+        try:
+            expected_visit = parse_next_visit(request)
+        except ValueError as msg:
             _log.warn(f"error: '{msg}'")
             return f"Bad Request: {msg}", 400
-
-        if not isinstance(envelope, dict) or "message" not in envelope:
-            msg = "invalid Pub/Sub message format"
-            _log.warn(f"error: '{msg}'")
-            return f"Bad Request: {msg}", 400
-
-        payload = base64.b64decode(envelope["message"]["data"])
-        data = json.loads(payload)
-        expected_visit = Visit(**data)
-        assert expected_visit.instrument == active_instrument.getName(), \
-            f"Expected {active_instrument.getName()}, received {expected_visit.instrument}."
+        assert expected_visit.instrument == instrument_name, \
+            f"Expected {instrument_name}, received {expected_visit.instrument}."
 
         expid_set = set()
 
         # Copy calibrations for this detector/visit
@@ -175,9 +184,10 @@
             expected_visit.detector,
         )
         if oid:
-            m = re.match(RAW_REGEXP, oid)
+            raw_info = Snap.from_oid(oid)
+            _log.debug("Found %r already present", raw_info)
             mwi.ingest_image(expected_visit, oid)
-            expid_set.add(m.group('expid'))
+            expid_set.add(raw_info.exp_id)
 
         _log.debug(f"Waiting for snaps from {expected_visit}.")
         start = time.time()
@@ -202,20 +212,14 @@
             for received in response.received_messages:
                 ack_list.append(received.ack_id)
                 oid = received.message.attributes["objectId"]
-                m = re.match(RAW_REGEXP, oid)
-                if m:
-                    instrument, detector, group, snap, expid = m.groups()
-                    _log.debug("instrument, detector, group, snap, expid = %s", m.groups())
-                    if (
-                        instrument == expected_visit.instrument
-                        and int(detector) == int(expected_visit.detector)
-                        and group == str(expected_visit.group)
-                        and int(snap) < int(expected_visit.snaps)
-                    ):
+                try:
+                    raw_info = Snap.from_oid(oid)
+                    _log.debug("Received %r", raw_info)
+                    if raw_info.is_consistent(expected_visit):
                         # Ingest the snap
                         mwi.ingest_image(oid)
-                    expid_set.add(expid)
-                else:
+                        expid_set.add(raw_info.exp_id)
+                except ValueError:
                     _log.error(f"Failed to match object id '{oid}'")
             subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list)
diff --git a/python/activator/logger.py b/python/activator/logger.py
index 348ed073..142f12a4 100644
--- a/python/activator/logger.py
+++ b/python/activator/logger.py
@@ -19,12 +19,38 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
-__all__ = ["GCloudStructuredLogFormatter"]
+__all__ = ["GCloudStructuredLogFormatter", "setup_google_logger"]
 
 import json
 import logging
 
 
+# TODO: replace with something more extensible, once we know what needs to
+# vary besides the formatter (handler type?).
+def setup_google_logger(labels=None):
+    """Set global logging settings for prompt_prototype.
+
+    Calling this function installs a root-logger handler that uses
+    `GCloudStructuredLogFormatter` and redirects all warnings to go through it.
+
+    Parameters
+    ----------
+    labels : `dict` [`str`, `str`]
+        Any metadata that should be attached to all logs. See
+        ``LogEntry.labels`` in Google Cloud REST API documentation.
+
+    Returns
+    -------
+    handler : `logging.Handler`
+        The handler used by the root logger.
+    """
+    log_handler = logging.StreamHandler()
+    log_handler.setFormatter(GCloudStructuredLogFormatter(labels))
+    logging.basicConfig(handlers=[log_handler])
+    logging.captureWarnings(True)
+    return log_handler
+
+
 class GCloudStructuredLogFormatter(logging.Formatter):
     """A formatter that can be parsed by the Google Cloud logging agent.
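
For reference, a minimal sketch of how a worker is expected to call the new helper, assuming the activator package is importable; the label value and logger name are illustrative:

import logging

from activator.logger import setup_google_logger

# One startup call replaces the per-worker handler/formatter boilerplate and
# also routes Python warnings through the structured formatter.
setup_google_logger(labels={"instrument": "DECam"})

logging.getLogger("lsst.example").info("worker ready")  # emitted as structured JSON
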
diff --git a/python/activator/make_pgpass.py b/python/activator/make_pgpass.py
index ce135200..301c7edd 100755
--- a/python/activator/make_pgpass.py
+++ b/python/activator/make_pgpass.py
@@ -27,7 +27,6 @@
 import stat
 
 
-PSQL_DB = "postgres"
 PSQL_USER = "postgres"
 
 
@@ -44,15 +43,19 @@ def make_pgpass():
     """
     try:
         ip_apdb = os.environ["IP_APDB"]
+        db_apdb = os.environ["DB_APDB"]
+        user_apdb = os.environ.get("USER_APDB", PSQL_USER)
         pass_apdb = os.environ["PSQL_APDB_PASS"]
         ip_registry = os.environ["IP_REGISTRY"]
+        db_registry = os.environ["DB_REGISTRY"]
+        user_registry = os.environ.get("USER_REGISTRY", PSQL_USER)
         pass_registry = os.environ["PSQL_REGISTRY_PASS"]
     except KeyError as e:
         raise RuntimeError("Addresses and passwords have not been configured") from e
 
     filename = os.path.join(os.environ["HOME"], ".pgpass")
     with open(filename, mode="wt") as file:
-        file.write(f"{ip_apdb}:{PSQL_DB}:{PSQL_USER}:{pass_apdb}\n")
-        file.write(f"{ip_registry}:{PSQL_DB}:{PSQL_USER}:{pass_registry}\n")
+        file.write(f"{ip_apdb}:{db_apdb}:{user_apdb}:{pass_apdb}\n")
+        file.write(f"{ip_registry}:{db_registry}:{user_registry}:{pass_registry}\n")
     # Only user may access the file
     os.chmod(filename, stat.S_IRUSR)
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 921c365f..376cfc85 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -19,7 +19,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
-__all__ = ["MiddlewareInterface"]
+__all__ = ["get_central_butler", "MiddlewareInterface"]
 
 import collections.abc
 import itertools
@@ -45,6 +45,35 @@
 _log.setLevel(logging.DEBUG)
 
 
+def get_central_butler(central_repo: str, instrument_class: str):
+    """Provide a Butler that can access the given repository and read and write
+    data for the given instrument.
+
+    Parameters
+    ----------
+    central_repo : `str`
+        The path or URI to the central repository.
+    instrument_class : `str`
+        The name of the instrument whose data will be retrieved or written. May
+        be either the fully qualified class name or the short name.
+
+    Returns
+    -------
+    butler : `lsst.daf.butler.Butler`
+        A Butler for ``central_repo`` pre-configured to load and store
+        ``instrument_class`` data.
+    """
+    # TODO: how much overhead do we take on from asking the central repo about
+    # the instrument instead of handling it internally?
+    registry = Butler(central_repo).registry
+    instrument = lsst.obs.base.Instrument.from_string(instrument_class, registry)
+    return Butler(central_repo,
+                  collections=[instrument.makeCollectionName("defaults")],
+                  writeable=True,
+                  inferDefaults=False,
+                  )
+
+
 class MiddlewareInterface:
     """Interface layer between the Butler middleware and the prompt processing
     data handling system, to handle processing individual images.
@@ -75,14 +104,14 @@
         Storage bucket where images will be written to as they arrive.
         See also ``prefix``.
     instrument : `str`
-        Full class name of the instrument taking the data, for populating
-        butler collections and dataIds. Example: "lsst.obs.lsst.LsstCam"
+        Name of the instrument taking the data, for populating
+        butler collections and dataIds. May be either the fully qualified class
+        name or the short name. Examples: "LsstCam", "lsst.obs.lsst.LsstCam".
         TODO: this arg can probably be removed and replaced with internal use
         of the butler.
-    butler : `lsst.daf.butler.Butler`
-        Local butler to process data in and hold calibrations, etc.; must be
-        writeable. Must not be shared with any other ``MiddlewareInterface``
-        object.
+    local_storage : `str`
+        An absolute path to a space where this object can create a local
+        Butler repo. The repo is guaranteed to be unique to this object.
     prefix : `str`, optional
         URI scheme followed by ``://``; prepended to ``image_bucket`` when
         constructing URIs to retrieve incoming files. The default is
@@ -97,9 +126,11 @@
     """
 
     # Class invariants:
+    # self._apdb_uri is a valid URI that unambiguously identifies the APDB
    # self.image_host is a valid URI with non-empty path and no query or fragment.
     # self._download_store is None if and only if self.image_host is a local URI.
     # self.instrument, self.camera, and self.skymap do not change after __init__.
+    # self._repo is the only reference to its TemporaryDirectory object.
     # self.butler defaults to a chained collection named
     # self.output_collection, which contains zero or more output runs,
     # pre-made inputs, and raws, in that order. However, self.butler is not
@@ -107,9 +138,10 @@
     # corresponding to self.camera and self.skymap.
 
     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
-                 butler: Butler,
+                 local_storage: str,
                  prefix: str = "gs://"):
-        self.ip_apdb = os.environ["IP_APDB"]
+        self._apdb_uri = self._make_apdb_uri()
+        self._apdb_namespace = os.environ.get("NAMESPACE_APDB", None)
         self.central_butler = central_butler
         self.image_host = prefix + image_bucket
         # TODO: _download_store turns MWI into a tagged class; clean this up later
@@ -117,11 +149,12 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
             self._download_store = tempfile.TemporaryDirectory(prefix="holding-")
         else:
             self._download_store = None
-        self.instrument = lsst.obs.base.Instrument.from_string(instrument)
+        # TODO: how much overhead do we pick up from going through the registry?
+        self.instrument = lsst.obs.base.Instrument.from_string(instrument, central_butler.registry)
 
         self.output_collection = self.instrument.makeCollectionName("prompt")
 
-        self._init_local_butler(butler)
+        self._init_local_butler(local_storage)
         self._init_ingester()
         self._init_visit_definer()
@@ -141,18 +174,32 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
         # How much to pad the refcat region we will copy over.
         self.padding = 30*lsst.geom.arcseconds
 
-    def _init_local_butler(self, butler: Butler):
+    def _make_apdb_uri(self):
+        """Generate a URI for accessing the APDB.
+        """
+        # TODO: merge this code with make_pgpass.py
+        ip_apdb = os.environ["IP_APDB"]  # Also includes port
+        db_apdb = os.environ["DB_APDB"]
+        user_apdb = os.environ.get("USER_APDB", "postgres")
+        return f"postgresql://{user_apdb}@{ip_apdb}/{db_apdb}"
+
+    def _init_local_butler(self, base_path: str):
         """Prepare the local butler to ingest into and process from.
 
         ``self.instrument`` must already exist. ``self.butler`` is correctly
-        initialized after this method returns.
+        initialized after this method returns, and is guaranteed to be unique
+        to this object.
 
         Parameters
        ----------
-        butler : `lsst.daf.butler.Butler`
-            Local butler to process data in and hold calibrations, etc.; must
-            be writeable.
+        base_path : `str`
+            An absolute path to a space where the repo can be created.
         """
+        # Directory has same lifetime as this object.
+        self._repo = tempfile.TemporaryDirectory(dir=base_path, prefix="butler-")
+        butler = Butler(Butler.makeRepo(self._repo.name), writeable=True)
+        _log.info("Created local Butler repo at %s.", self._repo.name)
+
         self.instrument.register(butler.registry)
 
         # Will be populated in prep_butler.
@@ -500,10 +547,8 @@ def _prep_pipeline(self, visit: Visit) -> None:
             pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file)
         except FileNotFoundError:
             raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}")
-        # TODO: Can we write to a configurable apdb schema, rather than
-        # "postgres"?
-        pipeline.addConfigOverride("diaPipe", "apdb.db_url",
-                                   f"postgresql://postgres@{self.ip_apdb}/postgres")
+        pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri)
+        pipeline.addConfigOverride("diaPipe", "apdb.namespace", self._apdb_namespace)
         return pipeline
 
     def _download(self, remote):
diff --git a/python/activator/raw.py b/python/activator/raw.py
index 4a931f18..80ddeb5a 100644
--- a/python/activator/raw.py
+++ b/python/activator/raw.py
@@ -25,10 +25,64 @@
 vice versa.
 """
 
-__all__ = ["RAW_REGEXP", "get_raw_path"]
+__all__ = ["Snap", "RAW_REGEXP", "get_raw_path"]
 
+from dataclasses import dataclass
 import re
 
+from .visit import Visit
+
+
+@dataclass(frozen=True)
+class Snap:
+    instrument: str  # short name
+    detector: int
+    group: str  # observatory-specific ID; not the same as visit number
+    snap: int  # exposure number within a group
+    exp_id: int  # Butler-compatible unique exposure ID
+    filter: str  # physical filter
+
+    def __str__(self):
+        """Return a short string that disambiguates the image.
+        """
+        return f"(exposure {self.exp_id}, group {self.group}/{self.snap})"
+
+    @classmethod
+    def from_oid(cls, oid: str):
+        """Construct a Snap from an image bucket's filename.
+
+        Parameters
+        ----------
+        oid : `str`
+            A pathname from which to extract snap information.
+        """
+        m = re.match(RAW_REGEXP, oid)
+        if m:
+            return Snap(instrument=m["instrument"],
+                        detector=int(m["detector"]),
+                        group=m["group"],
+                        snap=int(m["snap"]),
+                        exp_id=int(m["expid"]),
+                        filter=m["filter"],
+                        )
+        else:
+            raise ValueError(f"{oid} could not be parsed into a Snap")
+
+    def is_consistent(self, visit: Visit):
+        """Test if this snap could have come from a particular visit.
+
+        Parameters
+        ----------
+        visit : `activator.visit.Visit`
+            The visit from which snaps were expected.
+        """
+        return (self.instrument == visit.instrument
+                and self.detector == visit.detector
+                and self.group == visit.group
+                and self.snap < visit.snaps
+                )
+
+
 # Format for filenames of raws uploaded to image bucket:
 # instrument/detector/group/snap/expid/filter/*.(fits, fz, fits.gz)
 RAW_REGEXP = re.compile(
diff --git a/tests/test_logger.py b/tests/test_logger.py
index c859ab3a..7e4f82b0 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -31,13 +31,6 @@ class GoogleFormatterTest(unittest.TestCase):
     """Test GCloudStructuredLogFormatter with fake log messages.
""" - @classmethod - def setUpClass(cls): - super().setUpClass() - - # Each test has its own handler - # logging.basicConfig(handlers=[logging.NullHandler()]) - def setUp(self): super().setUp() diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 114b1364..8bf2c91c 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -38,7 +38,8 @@ import lsst.resources from activator.visit import Visit -from activator.middleware_interface import MiddlewareInterface, _query_missing_datasets, _prepend_collection +from activator.middleware_interface import get_central_butler, MiddlewareInterface, \ + _query_missing_datasets, _prepend_collection # The short name of the instrument used in the test repo. instname = "DECam" @@ -98,7 +99,10 @@ class MiddlewareInterfaceTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.env_patcher = unittest.mock.patch.dict(os.environ, - {"IP_APDB": "localhost"}) + {"IP_APDB": "localhost", + "DB_APDB": "postgres", + "USER_APDB": "postgres", + }) cls.env_patcher.start() super().setUpClass() @@ -111,18 +115,17 @@ def tearDownClass(cls): def setUp(self): data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") - central_repo = os.path.join(data_dir, "central_repo") - central_butler = Butler(central_repo, + self.central_repo = os.path.join(data_dir, "central_repo") + central_butler = Butler(self.central_repo, collections=[f"{instname}/defaults"], writeable=False, inferDefaults=False) instrument = "lsst.obs.decam.DarkEnergyCamera" - instrument_name = "DECam" self.input_data = os.path.join(data_dir, "input_data") # Have to preserve the tempdir, so that it doesn't get cleaned up. - self.repo = tempfile.TemporaryDirectory() - self.butler = Butler(Butler.makeRepo(self.repo.name), writeable=True) - self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, self.butler, + self.workspace = tempfile.TemporaryDirectory() + self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, + self.workspace.name, prefix="file://") # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 @@ -130,7 +133,7 @@ def setUp(self): dec = -4.950050405424033 # DECam has no rotator; instrument angle is 90 degrees in our system. rot = 90. - self.next_visit = Visit(instrument_name, + self.next_visit = Visit(instname, detector=56, group="1", snaps=1, @@ -144,7 +147,19 @@ def setUp(self): def tearDown(self): super().tearDown() # TemporaryDirectory warns on leaks - self.repo.cleanup() + self.interface._repo.cleanup() # TODO: should MiddlewareInterface have a cleanup method? + self.workspace.cleanup() + + def test_get_butler(self): + for butler in [get_central_butler(self.central_repo, "lsst.obs.decam.DarkEnergyCamera"), + get_central_butler(self.central_repo, instname), + ]: + # TODO: better way to test repo location? + self.assertTrue( + butler.getURI("camera", instrument=instname, run="foo", predict=True).ospath + .startswith(self.central_repo)) + self.assertEqual(list(butler.collections), [f"{instname}/defaults"]) + self.assertTrue(butler.isWriteable()) def test_init(self): """Basic tests of the initialized interface object. @@ -155,7 +170,7 @@ def test_init(self): # * On init, is the local butler repo purely in memory? # Check that the butler instance is properly configured. 
-        instruments = list(self.butler.registry.queryDimensionRecords("instrument"))
+        instruments = list(self.interface.butler.registry.queryDimensionRecords("instrument"))
         self.assertEqual(instname, instruments[0].name)
 
         self.assertEqual(set(self.interface.butler.collections), {self.interface.output_collection})
@@ -224,7 +239,7 @@ def test_prep_butler(self):
         # TODO DM-34112: check these shards again with some plots, once I've
         # determined whether ci_hits2015 actually has enough shards.
         expected_shards = {157394, 157401, 157405}
-        self._check_imports(self.butler, detector=56, expected_shards=expected_shards)
+        self._check_imports(self.interface.butler, detector=56, expected_shards=expected_shards)
 
     def test_prep_butler_twice(self):
         """prep_butler should have the correct calibs (and not raise an
@@ -239,7 +254,7 @@
         self.next_visit = dataclasses.replace(self.next_visit,
                                               group=str(int(self.next_visit.group) + 1))
         self.interface.prep_butler(self.next_visit)
         expected_shards = {157394, 157401, 157405}
-        self._check_imports(self.butler, detector=56, expected_shards=expected_shards)
+        self._check_imports(self.interface.butler, detector=56, expected_shards=expected_shards)
 
         # Third visit with different detector and coordinates.
         # Only 5, 10, 56, 60 have valid calibs.
@@ -252,7 +267,7 @@
         )
         self.interface.prep_butler(self.next_visit)
         expected_shards.update({157218, 157229})
-        self._check_imports(self.butler, detector=5, expected_shards=expected_shards)
+        self._check_imports(self.interface.butler, detector=5, expected_shards=expected_shards)
 
     # TODO: regression test for prep_butler having a stale cache for the butler it's updating.
     # This may be impossible to unit test, since it seems to depend on Google-side parallelism.
@@ -262,15 +277,15 @@ def test_ingest_image(self):
         filename = "fakeRawImage.fits"
         filepath = os.path.join(self.input_data, filename)
         data_id, file_data = fake_file_data(filepath,
-                                            self.butler.dimensions,
+                                            self.interface.butler.dimensions,
                                             self.interface.instrument,
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
             self.interface.ingest_image(self.next_visit, filename)
 
-            datasets = list(self.butler.registry.queryDatasets('raw',
-                                                               collections=[f'{instname}/raw/all']))
+            datasets = list(self.interface.butler.registry.queryDatasets('raw',
+                                                                         collections=[f'{instname}/raw/all']))
         self.assertEqual(datasets[0].dataId, data_id)
         # TODO: After raw ingest, we can define exposure dimension records
         # and check that the visits are defined
@@ -290,7 +305,7 @@ def test_ingest_image_fails_missing_file(self):
         filename = "nonexistentImage.fits"
         filepath = os.path.join(self.input_data, filename)
         data_id, file_data = fake_file_data(filepath,
-                                            self.butler.dimensions,
+                                            self.interface.butler.dimensions,
                                             self.interface.instrument,
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock, \
                 self.assertRaises(FileNotFoundError):
             mock.return_value = file_data
             self.interface.ingest_image(self.next_visit, filename)
             # There should not be any raw files in the registry.
-            datasets = list(self.butler.registry.queryDatasets('raw',
-                                                               collections=[f'{instname}/raw/all']))
+            datasets = list(self.interface.butler.registry.queryDatasets('raw',
+                                                                         collections=[f'{instname}/raw/all']))
             self.assertEqual(datasets, [])
 
     def test_run_pipeline(self):
@@ -312,7 +327,7 @@
         filename = "fakeRawImage.fits"
         filepath = os.path.join(self.input_data, filename)
         data_id, file_data = fake_file_data(filepath,
-                                            self.butler.dimensions,
+                                            self.interface.butler.dimensions,
                                             self.interface.instrument,
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
@@ -336,7 +351,7 @@
         filename = "fakeRawImage.fits"
         filepath = os.path.join(self.input_data, filename)
         data_id, file_data = fake_file_data(filepath,
-                                            self.butler.dimensions,
+                                            self.interface.butler.dimensions,
                                             self.interface.instrument,
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
@@ -391,20 +406,21 @@
         self.assertEqual(result, {data1})
 
     def test_prepend_collection(self):
-        self.butler.registry.registerCollection("_prepend1", CollectionType.TAGGED)
-        self.butler.registry.registerCollection("_prepend2", CollectionType.TAGGED)
-        self.butler.registry.registerCollection("_prepend3", CollectionType.TAGGED)
-        self.butler.registry.registerCollection("_prepend_base", CollectionType.CHAINED)
+        butler = self.interface.butler
+        butler.registry.registerCollection("_prepend1", CollectionType.TAGGED)
+        butler.registry.registerCollection("_prepend2", CollectionType.TAGGED)
+        butler.registry.registerCollection("_prepend3", CollectionType.TAGGED)
+        butler.registry.registerCollection("_prepend_base", CollectionType.CHAINED)
 
         # Empty chain.
-        self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), [])
-        _prepend_collection(self.butler, "_prepend_base", ["_prepend1"])
-        self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), ["_prepend1"])
+        self.assertEqual(list(butler.registry.getCollectionChain("_prepend_base")), [])
+        _prepend_collection(butler, "_prepend_base", ["_prepend1"])
+        self.assertEqual(list(butler.registry.getCollectionChain("_prepend_base")), ["_prepend1"])
 
         # Non-empty chain.
-        self.butler.registry.setCollectionChain("_prepend_base", ["_prepend1", "_prepend2"])
-        _prepend_collection(self.butler, "_prepend_base", ["_prepend3"])
-        self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")),
+        butler.registry.setCollectionChain("_prepend_base", ["_prepend1", "_prepend2"])
+        _prepend_collection(butler, "_prepend_base", ["_prepend3"])
+        self.assertEqual(list(butler.registry.getCollectionChain("_prepend_base")),
                          ["_prepend3", "_prepend1", "_prepend2"])
@@ -420,7 +436,10 @@ def setUpClass(cls):
         super().setUpClass()
 
         cls.env_patcher = unittest.mock.patch.dict(os.environ,
-                                                   {"IP_APDB": "localhost"})
+                                                   {"IP_APDB": "localhost",
+                                                    "DB_APDB": "postgres",
+                                                    "USER_APDB": "postgres",
+                                                    })
         cls.env_patcher.start()
 
     @classmethod
@@ -466,11 +485,10 @@ def setUp(self):
         instrument = "lsst.obs.decam.DarkEnergyCamera"
         data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
         self.input_data = os.path.join(data_dir, "input_data")
-        repo = tempfile.TemporaryDirectory()
+        workspace = tempfile.TemporaryDirectory()
         # TemporaryDirectory warns on leaks; addCleanup also keeps the TD from
         # getting garbage-collected.
-        self.addCleanup(tempfile.TemporaryDirectory.cleanup, repo)
-        self.butler = Butler(Butler.makeRepo(repo.name), writeable=True)
+        self.addCleanup(tempfile.TemporaryDirectory.cleanup, workspace)
 
         # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
         ra = 155.4702849608958
@@ -489,13 +507,15 @@
         self.logger_name = "lsst.activator.middleware_interface"
 
         # Populate repository.
-        self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, self.butler,
+        self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, workspace.name,
                                              prefix="file://")
+        # TODO: should MiddlewareInterface have a cleanup method?
+        self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.interface._repo)
         self.interface.prep_butler(self.next_visit)
 
         filename = "fakeRawImage.fits"
         filepath = os.path.join(self.input_data, filename)
         self.raw_data_id, file_data = fake_file_data(filepath,
-                                                     self.butler.dimensions,
+                                                     self.interface.butler.dimensions,
                                                      self.interface.instrument,
                                                      self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
diff --git a/tests/test_raw.py b/tests/test_raw.py
index 0fdf1d27..e1d973f8 100644
--- a/tests/test_raw.py
+++ b/tests/test_raw.py
@@ -22,7 +22,8 @@
 import re
 import unittest
 
-from activator.raw import RAW_REGEXP, get_raw_path
+from activator.raw import Snap, RAW_REGEXP, get_raw_path
+from activator.visit import Visit
 
 
 class RawTest(unittest.TestCase):
@@ -43,6 +44,17 @@ def setUp(self):
         self.snap = 1
         self.exposure = 404
 
+        self.visit = Visit(instrument=self.instrument,
+                           detector=self.detector,
+                           group=self.group,
+                           snaps=self.snaps,
+                           filter=self.filter,
+                           ra=self.ra,
+                           dec=self.dec,
+                           rot=self.rot,
+                           kind=self.kind,
+                           )
+
     def test_writeread(self):
         """Test that raw module can parse the paths it creates.
         """
@@ -55,3 +67,47 @@ def test_writeread(self):
         self.assertEqual(parsed['snap'], str(self.snap))
         self.assertEqual(parsed['expid'], str(self.exposure))
         self.assertEqual(parsed['filter'], str(self.filter))
+
+    def test_snap(self):
+        """Test that Snap objects can be constructed from parseable paths.
+        """
+        path = get_raw_path(self.instrument, self.detector, self.group, self.snap, self.exposure, self.filter)
+        parsed = Snap.from_oid(path)
+        self.assertIsNotNone(parsed)
+        # These tests automatically include type-checking.
+        self.assertEqual(parsed.instrument, self.instrument)
+        self.assertEqual(parsed.detector, self.detector)
+        self.assertEqual(parsed.group, self.group)
+        self.assertEqual(parsed.snap, self.snap)
+        self.assertEqual(parsed.exp_id, self.exposure)
+        self.assertEqual(parsed.filter, self.filter)
+
+    def test_bad_snap(self):
+        path = get_raw_path(self.instrument, f"{self.detector}b", self.group,
+                            self.snap, self.exposure, self.filter)
+        with self.assertRaisesRegex(ValueError, "not .* parsed"):
+            Snap.from_oid(path)
+
+    def test_snap_consistent(self):
+        path = get_raw_path(self.instrument, self.detector, self.group, self.snap, self.exposure, self.filter)
+        snap = Snap.from_oid(path)
+        self.assertTrue(snap.is_consistent(self.visit))
+
+        path = get_raw_path("Foo", self.detector, self.group, self.snap, self.exposure, self.filter)
+        snap = Snap.from_oid(path)
+        self.assertFalse(snap.is_consistent(self.visit))
+
+        path = get_raw_path(self.instrument, self.visit.detector+1, self.group, self.snap,
+                            self.exposure, self.filter)
+        snap = Snap.from_oid(path)
+        self.assertFalse(snap.is_consistent(self.visit))
+
+        path = get_raw_path(self.instrument, self.detector, self.group + "b", self.snap,
+                            self.exposure, self.filter)
+        snap = Snap.from_oid(path)
+        self.assertFalse(snap.is_consistent(self.visit))
+
+        path = get_raw_path(self.instrument, self.detector, self.group, self.visit.snaps,
+                            self.exposure, self.filter)
+        snap = Snap.from_oid(path)
+        self.assertFalse(snap.is_consistent(self.visit))
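
For reference, a minimal sketch of the Snap round trip these tests exercise, assuming the activator package is importable; the DECam values and the kind string are placeholders:

from activator.raw import Snap, get_raw_path
from activator.visit import Visit

# Build a bucket path, parse it back into a Snap, and check it against the
# visit it should belong to.
oid = get_raw_path("DECam", 56, "1", 0, 404, "g")
info = Snap.from_oid(oid)  # raises ValueError for an unparseable path
visit = Visit(instrument="DECam", detector=56, group="1", snaps=2, filter="g",
              ra=155.47, dec=-4.95, rot=90., kind="SURVEY")
assert info.is_consistent(visit)  # same instrument/detector/group and snap < snaps
print(info)  # "(exposure 404, group 1/0)"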