diff --git a/doc/playbook.rst b/doc/playbook.rst
index 7b0ec502..c4e9c7f7 100644
--- a/doc/playbook.rst
+++ b/doc/playbook.rst
@@ -131,7 +131,7 @@ tester
 On a local machine, it requires a JSON token in ``./prompt-proto-upload.json``.
 To obtain a token, see the GCP documentation on `service account keys`_; the relevant service account is ``prompt-image-upload@prompt-proto.iam.gserviceaccount.com``.
 
-.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys#getting_a_service_account_key
+.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys
 
 Run the tester either on a local machine, or in Cloud Shell.
 In Cloud Shell, install the prototype code and the GCP PubSub client:
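For context: the tester reads this token through the standard google-cloud-storage client. A minimal sketch of that handshake, assuming only the token path from the playbook; the bucket and blob names are hypothetical stand-ins:

    # Sketch: authenticate with the service-account token described above.
    # "./prompt-proto-upload.json" comes from the playbook; the bucket name
    # "some-image-bucket" is a hypothetical placeholder.
    from google.cloud import storage

    client = storage.Client.from_service_account_json("./prompt-proto-upload.json")
    bucket = client.bucket("some-image-bucket")
    bucket.blob("connectivity-check.txt").upload_from_string("hello")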
diff --git a/python/activator/activator.py b/python/activator/activator.py
index f6488b43..878b716b 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -53,8 +53,8 @@
     ),
     style="{",
 )
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
+_log = logging.getLogger("lsst." + __name__)
+_log.setLevel(logging.DEBUG)
 
 app = Flask(__name__)
 subscriber = pubsub_v1.SubscriberClient()
@@ -74,13 +74,13 @@ def check_for_snap(
     instrument: str, group: int, snap: int, detector: int
 ) -> Optional[str]:
     prefix = f"{instrument}/{detector}/{group}/{snap}/{instrument}-{group}-{snap}-"
-    logger.debug(f"Checking for '{prefix}'")
+    _log.debug(f"Checking for '{prefix}'")
     blobs = list(storage_client.list_blobs(image_bucket, prefix=prefix))
     if not blobs:
         return None
     elif len(blobs) > 1:
-        logger.error(
-            f"Multiple files detected for a single group/snap/detector: '{prefix}'"
+        _log.error(
+            f"Multiple files detected for a single detector/group/snap: '{prefix}'"
         )
     return blobs[0]
 
@@ -93,17 +93,17 @@ def next_visit_handler() -> Tuple[str, int]:
         topic=topic_path,
         ack_deadline_seconds=60,
     )
-    logger.debug(f"Created subscription '{subscription.name}'")
+    _log.debug(f"Created subscription '{subscription.name}'")
     try:
         envelope = request.get_json()
         if not envelope:
             msg = "no Pub/Sub message received"
-            logging.warn(f"error: '{msg}'")
+            _log.warning(f"error: '{msg}'")
             return f"Bad Request: {msg}", 400
 
         if not isinstance(envelope, dict) or "message" not in envelope:
             msg = "invalid Pub/Sub message format"
-            logging.warn(f"error: '{msg}'")
+            _log.warning(f"error: '{msg}'")
             return f"Bad Request: {msg}", 400
 
         payload = base64.b64decode(envelope["message"]["data"])
@@ -127,7 +127,7 @@ def next_visit_handler() -> Tuple[str, int]:
                 mwi.ingest_image(oid)
                 snap_set.add(snap)
 
-    logger.debug(
+    _log.debug(
         "Waiting for snaps from group"
         f" '{expected_visit.group}' detector {expected_visit.detector}"
     )
@@ -141,14 +141,14 @@ def next_visit_handler() -> Tuple[str, int]:
             end = time.time()
             if len(response.received_messages) == 0:
                 if end - start < timeout:
-                    logger.debug(
-                        f"Empty pull after {end - start}"
-                        f" for '{expected_visit.group}'"
+                    _log.debug(
+                        f"Empty pull after {end - start}s"
+                        f" for group '{expected_visit.group}'"
                     )
                     continue
-                logger.warning(
+                _log.warning(
                     "Timed out waiting for image in"
-                    f" '{expected_visit.group}' after receiving snaps {snap_set}"
+                    f" group '{expected_visit.group}' after receiving snaps {snap_set}"
                 )
                 break
 
@@ -159,7 +159,7 @@ def next_visit_handler() -> Tuple[str, int]:
                 m = re.match(oid_regexp, oid)
                 if m:
                     instrument, detector, group, snap = m.groups()
-                    logger.debug(m.groups())
+                    _log.debug("instrument, detector, group, snap = %s", m.groups())
                     if (
                         instrument == expected_visit.instrument
                         and int(detector) == int(expected_visit.detector)
@@ -170,10 +170,11 @@ def next_visit_handler() -> Tuple[str, int]:
                     mwi.ingest_image(oid)
                     snap_set.add(snap)
                 else:
-                    logger.error(f"Failed to match object id '{oid}'")
+                    _log.error(f"Failed to match object id '{oid}'")
             subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list)
 
         # Got all the snaps; run the pipeline
+        _log.info(f"Running pipeline on group: {expected_visit.group} detector: {expected_visit.detector}")
         mwi.run_pipeline(expected_visit, snap_set)
         return "Pipeline executed", 200
     finally:
@@ -182,7 +183,7 @@
 
 @app.errorhandler(500)
 def server_error(e) -> Tuple[str, int]:
-    logger.exception("An error occurred during a request.")
+    _log.exception("An error occurred during a request.")
     return (
         f"""
    An internal error occurred: {e}
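The logger rename above is not cosmetic: "lsst." + __name__ places these messages under the shared "lsst" logger hierarchy, so configuration applied once to the parent logger reaches every module that follows the pattern. A stdlib-only sketch of the behavior being relied on (the literal module name stands in for __name__):

    import logging

    logging.basicConfig(format="{levelname} {asctime} {name} - {message}", style="{")

    # Configure the shared parent once...
    logging.getLogger("lsst").setLevel(logging.INFO)

    # ...and each module attaches itself beneath it, as this patch does:
    _log = logging.getLogger("lsst." + "activator.activator")
    _log.setLevel(logging.DEBUG)  # a child may still opt into more verbosity
    _log.debug("emitted: the child's DEBUG setting wins over the parent's INFO")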
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index c995ab83..c2b126a8 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -36,7 +36,7 @@
     FLAT="flat.yaml",
 )
 
-_log = logging.getLogger(__name__)
+_log = logging.getLogger("lsst." + __name__)
 _log.setLevel(logging.DEBUG)
 
 
@@ -64,7 +64,7 @@ class MiddlewareInterface:
 
     def __init__(self, input_repo: str, image_bucket: str, instrument: str):
         # self.src = Butler(input_repo, writeable=False)
-        _log.info(f"Butler({input_repo}, writeable=False)")
+        _log.debug(f"Butler({input_repo}, writeable=False)")
 
         self.image_bucket = image_bucket
         self.instrument = instrument
@@ -82,6 +82,7 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str):
 
         export_collections = set()
         export_collections.add(self.calibration_collection)
+        _log.debug("Finding secondary collections")
         # calib_collections = list(
         #     self.r.queryCollections(
         #         self.calibration_collection,
@@ -94,6 +95,7 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str):
         #         export_collections.add(collection)
 
         export_collections.add(refcat_collection)
+        _log.debug("Finding refcats")
         # for dataset in self.r.queryDatasets(
         #     "gaia_dr2_20200414",
         #     where=f"htm7 IN ({htm7})",
@@ -180,7 +182,7 @@ def ingest_image(self, oid: str) -> None:
             Google storage identifier for incoming image, relative to the
             image bucket.
         """
-        _log.info(f"Ingesting image '{oid}'")
+        _log.info(f"Ingesting image id '{oid}'")
         run = f"{self.instrument}/raw/all"
         cmd = [
             "butler",
@@ -192,7 +194,7 @@ def ingest_image(self, oid: str) -> None:
             self.repo,
             f"gs://{self.image_bucket}/{oid}",
         ]
-        _log.debug(str(cmd))
+        _log.debug("ingest command line: %s", cmd)
         # subprocess.run(cmd, check=True)
 
     def run_pipeline(self, visit: Visit, snaps: set) -> None:
@@ -221,7 +223,7 @@ def run_pipeline(self, visit: Visit, snaps: set) -> None:
             "-i",
             f"{self.instrument}/raw/all",
         ]
-        _log.debug(str(cmd))
+        _log.debug("pipetask command line: %s", cmd)
         # subprocess.run(cmd, check=True)
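Two of the _log.debug calls above also move from str(cmd) interpolation to printf-style arguments. The difference is when formatting happens: %s arguments are only rendered if a handler actually emits the record, which matters for paths that log at DEBUG. A small sketch (the cmd list is abbreviated from the patch):

    import logging

    _log = logging.getLogger("lsst.activator.middleware_interface")

    cmd = ["butler", "ingest-raws", "/repo", "gs://some-image-bucket/some-oid"]
    # Eager: the f-string is always built, even when DEBUG is disabled.
    _log.debug(f"ingest command line: {cmd}")
    # Lazy: formatting is deferred until the record is actually handled.
    _log.debug("ingest command line: %s", cmd)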
diff --git a/python/tester/upload.py b/python/tester/upload.py
index 607a23af..a7e9d42a 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -33,18 +33,19 @@ class Instrument:
 logging.basicConfig(
     format="{levelname} {asctime} {name} - {message}",
     style="{",
-    level=logging.DEBUG,
 )
+_log = logging.getLogger("lsst." + __name__)
+_log.setLevel(logging.DEBUG)
 
 
 def process_group(publisher, bucket, instrument, group, filter, kind):
     n_snaps = INSTRUMENTS[instrument].n_snaps
     send_next_visit(publisher, instrument, group, n_snaps, filter, kind)
     for snap in range(n_snaps):
-        logging.info(f"Taking group {group} snap {snap}")
+        _log.info(f"Taking group: {group} snap: {snap}")
         time.sleep(EXPOSURE_INTERVAL)
         for detector in range(INSTRUMENTS[instrument].n_detectors):
-            logging.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}")
+            _log.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}")
             exposure_id = (group // 100000) * 100000
             exposure_id += (group % 100000) * INSTRUMENTS[instrument].n_snaps
             exposure_id += snap
@@ -54,14 +55,16 @@ def process_group(publisher, bucket, instrument, group, filter, kind):
             f"-{exposure_id}-{filter}-{detector}.fz"
         )
         bucket.blob(fname).upload_from_string("Test")
-        logging.info(f"Uploaded {group} {snap} {filter} {detector}")
+        _log.info(f"Uploaded group: {group} snap: {snap} filter: {filter} detector: {detector}")
 
 
 def send_next_visit(publisher, instrument, group, snaps, filter, kind):
+    _log.info(f"Sending next_visit for group: {group} snaps: {snaps} filter: {filter} kind: {kind}")
     topic_path = publisher.topic_path(PROJECT_ID, "nextVisit")
     ra = random.uniform(0.0, 360.0)
     dec = random.uniform(-90.0, 90.0)
     for detector in range(INSTRUMENTS[instrument].n_detectors):
+        _log.debug(f"Sending next_visit for group: {group} detector: {detector} ra: {ra} dec: {dec}")
         visit = Visit(instrument, detector, group, snaps, filter, ra, dec, kind)
         data = json.dumps(visit.__dict__).encode("utf-8")
         publisher.publish(topic_path, data=data)
@@ -99,14 +102,14 @@ def main():
         last_group = int(date) * 100000
     else:
         last_group = max(prefixes) + random.randrange(10, 19)
-    logging.info(f"Last group {last_group}")
+    _log.info(f"Last group {last_group}")
 
     for i in range(n_groups):
         kind = KINDS[i % len(KINDS)]
         group = last_group + i + 1
         filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))]
         process_group(publisher, bucket, instrument, group, filter, kind)
-        logging.info("Slewing to next group")
+        _log.info("Slewing to next group")
         time.sleep(SLEW_INTERVAL)
 
 
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 4463012b..40a450af 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -41,7 +41,7 @@ def setUp(self):
                            ra=10,
                            dec=20,
                            kind="BIAS")
-        self.logger_name = "activator.middleware_interface"
+        self.logger_name = "lsst.activator.middleware_interface"
 
     def test_init(self):
         """Basic tests of initializing an interface object.
@@ -62,7 +62,7 @@ def test_prep_butler(self):
     def test_ingest_image(self):
         with self.assertLogs(self.logger_name, level="INFO") as cm:
             self.interface.ingest_image(self.next_visit)
-        msg = f"INFO:{self.logger_name}:Ingesting image '{self.next_visit}'"
+        msg = f"INFO:{self.logger_name}:Ingesting image id '{self.next_visit}'"
         self.assertEqual(cm.output, [msg])
 
     def test_run_pipeline(self):
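The test changes track the logger rename because unittest's assertLogs renders each captured record as "LEVEL:logger_name:message", so the expected string embeds the full "lsst."-prefixed name. A self-contained illustration of that convention (logger name from this patch; the message text is illustrative):

    import logging
    import unittest


    class LoggerNameFormatTest(unittest.TestCase):
        def test_assert_logs_output(self):
            name = "lsst.activator.middleware_interface"
            with self.assertLogs(name, level="INFO") as cm:
                logging.getLogger(name).info("Ingesting image id 'some-oid'")
            # cm.output entries follow the "LEVEL:logger_name:message" convention.
            self.assertEqual(cm.output,
                             [f"INFO:{name}:Ingesting image id 'some-oid'"])


    if __name__ == "__main__":
        unittest.main()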