diff --git a/python/activator/activator.py b/python/activator/activator.py index a080a429..f6488b43 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -1,3 +1,26 @@ +# This file is part of prompt_prototype. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +__all__ = ["check_for_snap", "next_visit_handler"] + import base64 import json import logging @@ -9,8 +32,8 @@ from flask import Flask, request from google.cloud import pubsub_v1, storage -from middleware_interface import MiddlewareInterface -from visit import Visit +from .middleware_interface import MiddlewareInterface +from .visit import Visit PROJECT_ID = "prompt-proto" diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 94de4913..c995ab83 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -1,3 +1,26 @@ +# This file is part of prompt_prototype. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +__all__ = ["MiddlewareInterface"] + import logging import os import shutil @@ -5,7 +28,7 @@ from astropy.time import Time from lsst.daf.butler import Butler -from visit import Visit +from .visit import Visit PIPELINE_MAP = dict( BIAS="bias.yaml", @@ -18,6 +41,26 @@ class MiddlewareInterface: + """Interface layer between the Butler middleware and the prompt processing + data handling system. + + An instance of this class will accept an incoming group of snaps to + process, using an instance-local butler repo. The instance can pre-load + the necessary calibrations to process an incoming visit, ingest the data + when it is available, and run the difference imaging pipeline, all in that + local butler. + + Parameters + ---------- + input_repo : `str` + Path to a butler repo containing the calibration and other data needed + for processing images as they are received. + image_bucket : `str` + Google storage bucket where images will be written to as they arrive. + instrument : `str` + Name of the instrument taking the data, for populating butler + collections and dataIds. + """ def __init__(self, input_repo: str, image_bucket: str, instrument: str): # self.src = Butler(input_repo, writeable=False) @@ -25,7 +68,7 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str): self.image_bucket = image_bucket self.instrument = instrument - self.repo = "/tmp/butler-{os.getpid()}" + self.repo = f"/tmp/butler-{os.getpid()}" if not os.path.exists(self.repo): _log.info(f"Making local Butler {self.repo}") Butler.makeRepo(self.repo) @@ -83,6 +126,13 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str): self.calib_types = ["bias", "dark", "defects", "flat", "fringe", ] def prep_butler(self, visit: Visit) -> None: + """Prepare a temporary butler repo for processing the incoming data. + + Parameters + ---------- + visit : Visit + Group of snaps from one detector to prepare the butler for. + """ _log.info(f"Preparing Butler for visit '{visit}'") visit_info = visit.__dict__ for calib_type in self.calib_types: @@ -122,6 +172,14 @@ def prep_butler(self, visit: Visit) -> None: shutil.rmtree(visit_dir, ignore_errors=True) def ingest_image(self, oid: str) -> None: + """Ingest an image into the temporary butler. + + Parameters + ---------- + oid : `str` + Google storage identifier for incoming image, relative to the + image bucket. + """ _log.info(f"Ingesting image '{oid}'") run = f"{self.instrument}/raw/all" cmd = [ @@ -137,7 +195,19 @@ def ingest_image(self, oid: str) -> None: _log.debug(str(cmd)) # subprocess.run(cmd, check=True) - def run_pipeline(self, visit: Visit, snaps) -> None: + def run_pipeline(self, visit: Visit, snaps: set) -> None: + """Process the received image. + + Parameters + ---------- + visit : Visit + Group of snaps from one detector to be processed. + snaps : `set` + Identifiers of the snaps that were received. + TODO: I believe this is unnecessary because it should be encoded + in the `visit` object, but we'll have to test how that works once + we implemented this with actual data. + """ pipeline = PIPELINE_MAP[visit.kind] _log.info(f"Running pipeline {pipeline} on visit '{visit}', snaps {snaps}") cmd = [ diff --git a/python/activator/visit.py b/python/activator/visit.py index b28fac99..b2729ac7 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -1,3 +1,5 @@ +__all__ = ["Visit"] + from dataclasses import dataclass diff --git a/python/tester/upload.py b/python/tester/upload.py index a15680d8..607a23af 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -44,7 +44,7 @@ def process_group(publisher, bucket, instrument, group, filter, kind): logging.info(f"Taking group {group} snap {snap}") time.sleep(EXPOSURE_INTERVAL) for detector in range(INSTRUMENTS[instrument].n_detectors): - logging.info(f"Uploading {group} {snap} {filter} {detector}") + logging.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}") exposure_id = (group // 100000) * 100000 exposure_id += (group % 100000) * INSTRUMENTS[instrument].n_snaps exposure_id += snap diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py new file mode 100644 index 00000000..4463012b --- /dev/null +++ b/tests/test_middleware_interface.py @@ -0,0 +1,72 @@ +# This file is part of prompt_prototype. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os.path +import tempfile +import unittest + +from activator.middleware_interface import MiddlewareInterface +from activator.visit import Visit + + +class MiddlewareInterfaceTest(unittest.TestCase): + def setUp(self): + input_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") + instrument = "TestCam" + self.output_dir = tempfile.TemporaryDirectory() + self.interface = MiddlewareInterface(input_dir, self.output_dir, instrument) + self.next_visit = Visit(instrument, + detector=1, + group=1, + snaps=1, + filter="r", + ra=10, + dec=20, + kind="BIAS") + self.logger_name = "activator.middleware_interface" + + def test_init(self): + """Basic tests of initializing an interface object. + """ + # did we get a useable butler instance? + self.assertIn("/tmp/butler-", self.interface.dest.datastore.root.path) + # Ideas for things to test: + # * On init, does the right kind of butler get created, with the right + # collections, etc? + # * On init, is the local butler repo purely in memory? + + def test_prep_butler(self): + with self.assertLogs(self.logger_name, level="INFO") as cm: + self.interface.prep_butler(self.next_visit) + msg = f"INFO:{self.logger_name}:Preparing Butler for visit '{self.next_visit}'" + self.assertEqual(cm.output, [msg]) + + def test_ingest_image(self): + with self.assertLogs(self.logger_name, level="INFO") as cm: + self.interface.ingest_image(self.next_visit) + msg = f"INFO:{self.logger_name}:Ingesting image '{self.next_visit}'" + self.assertEqual(cm.output, [msg]) + + def test_run_pipeline(self): + with self.assertLogs(self.logger_name, level="INFO") as cm: + self.interface.run_pipeline(self.next_visit, 1) + msg = f"INFO:{self.logger_name}:Running pipeline bias.yaml on visit '{self.next_visit}', snaps 1" + self.assertEqual(cm.output, [msg])