From f267a767a5170a8f511ba814cdc14da0dea53a11 Mon Sep 17 00:00:00 2001 From: John Parejko Date: Tue, 8 Mar 2022 10:47:38 -0800 Subject: [PATCH 1/6] Update logging with field descriptions. --- python/activator/activator.py | 10 +++++----- python/activator/middleware_interface.py | 6 +++--- python/tester/upload.py | 4 ++-- tests/test_middleware_interface.py | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index f6488b43..71dc3bbe 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -80,7 +80,7 @@ def check_for_snap( return None elif len(blobs) > 1: logger.error( - f"Multiple files detected for a single group/snap/detector: '{prefix}'" + f"Multiple files detected for a single detector/group/snap: '{prefix}'" ) return blobs[0] @@ -142,13 +142,13 @@ def next_visit_handler() -> Tuple[str, int]: if len(response.received_messages) == 0: if end - start < timeout: logger.debug( - f"Empty pull after {end - start}" - f" for '{expected_visit.group}'" + f"Empty pull after {end - start}s" + f" for group '{expected_visit.group}'" ) continue logger.warning( "Timed out waiting for image in" - f" '{expected_visit.group}' after receiving snaps {snap_set}" + f" group '{expected_visit.group}' after receiving snaps {snap_set}" ) break @@ -159,7 +159,7 @@ def next_visit_handler() -> Tuple[str, int]: m = re.match(oid_regexp, oid) if m: instrument, detector, group, snap = m.groups() - logger.debug(m.groups()) + logger.debug("instrument, detector, group, snap = %s", m.groups()) if ( instrument == expected_visit.instrument and int(detector) == int(expected_visit.detector) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index c995ab83..7e72f311 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -180,7 +180,7 @@ def ingest_image(self, oid: str) -> None: Google storage identifier for incoming image, relative to the image bucket. """ - _log.info(f"Ingesting image '{oid}'") + _log.info(f"Ingesting image id '{oid}'") run = f"{self.instrument}/raw/all" cmd = [ "butler", @@ -192,7 +192,7 @@ def ingest_image(self, oid: str) -> None: self.repo, f"gs://{self.image_bucket}/{oid}", ] - _log.debug(str(cmd)) + _log.debug("ingest command line: %s", cmd) # subprocess.run(cmd, check=True) def run_pipeline(self, visit: Visit, snaps: set) -> None: @@ -221,7 +221,7 @@ def run_pipeline(self, visit: Visit, snaps: set) -> None: "-i", f"{self.instrument}/raw/all", ] - _log.debug(str(cmd)) + _log.debug("pipetask command line: %s", cmd) # subprocess.run(cmd, check=True) diff --git a/python/tester/upload.py b/python/tester/upload.py index 607a23af..f3cb3005 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -41,7 +41,7 @@ def process_group(publisher, bucket, instrument, group, filter, kind): n_snaps = INSTRUMENTS[instrument].n_snaps send_next_visit(publisher, instrument, group, n_snaps, filter, kind) for snap in range(n_snaps): - logging.info(f"Taking group {group} snap {snap}") + logging.info(f"Taking group: {group} snap: {snap}") time.sleep(EXPOSURE_INTERVAL) for detector in range(INSTRUMENTS[instrument].n_detectors): logging.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}") @@ -54,7 +54,7 @@ def process_group(publisher, bucket, instrument, group, filter, kind): f"-{exposure_id}-{filter}-{detector}.fz" ) bucket.blob(fname).upload_from_string("Test") - logging.info(f"Uploaded {group} {snap} {filter} {detector}") + logging.info(f"Uploaded group: {group} snap: {snap} filter: {filter} detector: {detector}") def send_next_visit(publisher, instrument, group, snaps, filter, kind): diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 4463012b..cd1bc250 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -62,7 +62,7 @@ def test_prep_butler(self): def test_ingest_image(self): with self.assertLogs(self.logger_name, level="INFO") as cm: self.interface.ingest_image(self.next_visit) - msg = f"INFO:{self.logger_name}:Ingesting image '{self.next_visit}'" + msg = f"INFO:{self.logger_name}:Ingesting image id '{self.next_visit}'" self.assertEqual(cm.output, [msg]) def test_run_pipeline(self): From 00bbcb53687ebd95e635ec8baa1c15433a76bef0 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 11 Mar 2022 15:28:53 -0600 Subject: [PATCH 2/6] Standardize local logger names. --- python/activator/activator.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 71dc3bbe..250aaab2 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -53,8 +53,8 @@ ), style="{", ) -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) +_log = logging.getLogger(__name__) +_log.setLevel(logging.DEBUG) app = Flask(__name__) subscriber = pubsub_v1.SubscriberClient() @@ -74,12 +74,12 @@ def check_for_snap( instrument: str, group: int, snap: int, detector: int ) -> Optional[str]: prefix = f"{instrument}/{detector}/{group}/{snap}/{instrument}-{group}-{snap}-" - logger.debug(f"Checking for '{prefix}'") + _log.debug(f"Checking for '{prefix}'") blobs = list(storage_client.list_blobs(image_bucket, prefix=prefix)) if not blobs: return None elif len(blobs) > 1: - logger.error( + _log.error( f"Multiple files detected for a single detector/group/snap: '{prefix}'" ) return blobs[0] @@ -93,7 +93,7 @@ def next_visit_handler() -> Tuple[str, int]: topic=topic_path, ack_deadline_seconds=60, ) - logger.debug(f"Created subscription '{subscription.name}'") + _log.debug(f"Created subscription '{subscription.name}'") try: envelope = request.get_json() if not envelope: @@ -127,7 +127,7 @@ def next_visit_handler() -> Tuple[str, int]: mwi.ingest_image(oid) snap_set.add(snap) - logger.debug( + _log.debug( "Waiting for snaps from group" f" '{expected_visit.group}' detector {expected_visit.detector}" ) @@ -141,12 +141,12 @@ def next_visit_handler() -> Tuple[str, int]: end = time.time() if len(response.received_messages) == 0: if end - start < timeout: - logger.debug( + _log.debug( f"Empty pull after {end - start}s" f" for group '{expected_visit.group}'" ) continue - logger.warning( + _log.warning( "Timed out waiting for image in" f" group '{expected_visit.group}' after receiving snaps {snap_set}" ) @@ -159,7 +159,7 @@ def next_visit_handler() -> Tuple[str, int]: m = re.match(oid_regexp, oid) if m: instrument, detector, group, snap = m.groups() - logger.debug("instrument, detector, group, snap = %s", m.groups()) + _log.debug("instrument, detector, group, snap = %s", m.groups()) if ( instrument == expected_visit.instrument and int(detector) == int(expected_visit.detector) @@ -170,7 +170,7 @@ def next_visit_handler() -> Tuple[str, int]: mwi.ingest_image(oid) snap_set.add(snap) else: - logger.error(f"Failed to match object id '{oid}'") + _log.error(f"Failed to match object id '{oid}'") subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list) # Got all the snaps; run the pipeline @@ -182,7 +182,7 @@ def next_visit_handler() -> Tuple[str, int]: @app.errorhandler(500) def server_error(e) -> Tuple[str, int]: - logger.exception("An error occurred during a request.") + _log.exception("An error occurred during a request.") return ( f""" An internal error occurred:
{e}
From e3d19d35e2c31b617e05242d08b9fd40343202c8 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 8 Mar 2022 16:37:30 -0600 Subject: [PATCH 3/6] Use Logger object everywhere. --- python/activator/activator.py | 4 ++-- python/tester/upload.py | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 250aaab2..48f690f1 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -98,12 +98,12 @@ def next_visit_handler() -> Tuple[str, int]: envelope = request.get_json() if not envelope: msg = "no Pub/Sub message received" - logging.warn(f"error: '{msg}'") + _log.warn(f"error: '{msg}'") return f"Bad Request: {msg}", 400 if not isinstance(envelope, dict) or "message" not in envelope: msg = "invalid Pub/Sub message format" - logging.warn(f"error: '{msg}'") + _log.warn(f"error: '{msg}'") return f"Bad Request: {msg}", 400 payload = base64.b64decode(envelope["message"]["data"]) diff --git a/python/tester/upload.py b/python/tester/upload.py index f3cb3005..edd0b8fb 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -33,18 +33,19 @@ class Instrument: logging.basicConfig( format="{levelname} {asctime} {name} - {message}", style="{", - level=logging.DEBUG, ) +_log = logging.getLogger(__name__) +_log.setLevel(logging.DEBUG) def process_group(publisher, bucket, instrument, group, filter, kind): n_snaps = INSTRUMENTS[instrument].n_snaps send_next_visit(publisher, instrument, group, n_snaps, filter, kind) for snap in range(n_snaps): - logging.info(f"Taking group: {group} snap: {snap}") + _log.info(f"Taking group: {group} snap: {snap}") time.sleep(EXPOSURE_INTERVAL) for detector in range(INSTRUMENTS[instrument].n_detectors): - logging.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}") + _log.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}") exposure_id = (group // 100000) * 100000 exposure_id += (group % 100000) * INSTRUMENTS[instrument].n_snaps exposure_id += snap @@ -54,7 +55,7 @@ def process_group(publisher, bucket, instrument, group, filter, kind): f"-{exposure_id}-{filter}-{detector}.fz" ) bucket.blob(fname).upload_from_string("Test") - logging.info(f"Uploaded group: {group} snap: {snap} filter: {filter} detector: {detector}") + _log.info(f"Uploaded group: {group} snap: {snap} filter: {filter} detector: {detector}") def send_next_visit(publisher, instrument, group, snaps, filter, kind): @@ -99,14 +100,14 @@ def main(): last_group = int(date) * 100000 else: last_group = max(prefixes) + random.randrange(10, 19) - logging.info(f"Last group {last_group}") + _log.info(f"Last group {last_group}") for i in range(n_groups): kind = KINDS[i % len(KINDS)] group = last_group + i + 1 filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))] process_group(publisher, bucket, instrument, group, filter, kind) - logging.info("Slewing to next group") + _log.info("Slewing to next group") time.sleep(SLEW_INTERVAL) From 3c06fc6e1aa9a5c65afa2fc5619659eb94c831a7 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 8 Mar 2022 18:28:34 -0600 Subject: [PATCH 4/6] Add logging to more operations. --- python/activator/activator.py | 1 + python/activator/middleware_interface.py | 4 +++- python/tester/upload.py | 2 ++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 48f690f1..22f3e582 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -174,6 +174,7 @@ def next_visit_handler() -> Tuple[str, int]: subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list) # Got all the snaps; run the pipeline + _log.info(f"Running pipeline on group: {expected_visit.group} detector: {expected_visit.detector}") mwi.run_pipeline(expected_visit, snap_set) return "Pipeline executed", 200 finally: diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 7e72f311..aab7641e 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -64,7 +64,7 @@ class MiddlewareInterface: def __init__(self, input_repo: str, image_bucket: str, instrument: str): # self.src = Butler(input_repo, writeable=False) - _log.info(f"Butler({input_repo}, writeable=False)") + _log.debug(f"Butler({input_repo}, writeable=False)") self.image_bucket = image_bucket self.instrument = instrument @@ -82,6 +82,7 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str): export_collections = set() export_collections.add(self.calibration_collection) + _log.debug("Finding secondary collections") # calib_collections = list( # self.r.queryCollections( # self.calibration_collection, @@ -94,6 +95,7 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str): # export_collections.add(collection) export_collections.add(refcat_collection) + _log.debug("Finding refcats") # for dataset in self.r.queryDatasets( # "gaia_dr2_20200414", # where=f"htm7 IN ({htm7})", diff --git a/python/tester/upload.py b/python/tester/upload.py index edd0b8fb..7a10fd5b 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -59,10 +59,12 @@ def process_group(publisher, bucket, instrument, group, filter, kind): def send_next_visit(publisher, instrument, group, snaps, filter, kind): + _log.info(f"Sending next_visit for group: {group} snaps: {snaps} filter: {filter} kind: {kind}") topic_path = publisher.topic_path(PROJECT_ID, "nextVisit") ra = random.uniform(0.0, 360.0) dec = random.uniform(-90.0, 90.0) for detector in range(INSTRUMENTS[instrument].n_detectors): + _log.debug(f"Sending next_visit for group: {group} detector: {detector} ra: {ra} dec: {dec}") visit = Visit(instrument, detector, group, snaps, filter, ra, dec, kind) data = json.dumps(visit.__dict__).encode("utf-8") publisher.publish(topic_path, data=data) From 8d87373792f1d77b2796caf27e71718ffa8caa53 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 8 Mar 2022 18:30:45 -0600 Subject: [PATCH 5/6] Disambiguate user and flake8 logs. --- python/activator/activator.py | 2 +- python/activator/middleware_interface.py | 2 +- python/tester/upload.py | 2 +- tests/test_middleware_interface.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 22f3e582..878b716b 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -53,7 +53,7 @@ ), style="{", ) -_log = logging.getLogger(__name__) +_log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) app = Flask(__name__) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index aab7641e..c2b126a8 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -36,7 +36,7 @@ FLAT="flat.yaml", ) -_log = logging.getLogger(__name__) +_log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) diff --git a/python/tester/upload.py b/python/tester/upload.py index 7a10fd5b..a7e9d42a 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -34,7 +34,7 @@ class Instrument: format="{levelname} {asctime} {name} - {message}", style="{", ) -_log = logging.getLogger(__name__) +_log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index cd1bc250..40a450af 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -41,7 +41,7 @@ def setUp(self): ra=10, dec=20, kind="BIAS") - self.logger_name = "activator.middleware_interface" + self.logger_name = "lsst.activator.middleware_interface" def test_init(self): """Basic tests of initializing an interface object. From e3e2106d3a279671b056bb5ddd6e02fba100a571 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 10 Mar 2022 18:17:07 -0600 Subject: [PATCH 6/6] Fix misleading link. --- doc/playbook.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/playbook.rst b/doc/playbook.rst index 7b0ec502..c4e9c7f7 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -131,7 +131,7 @@ tester On a local machine, it requires a JSON token in ``./prompt-proto-upload.json``. To obtain a token, see the GCP documentation on `service account keys`_; the relevant service account is ``prompt-image-upload@prompt-proto.iam.gserviceaccount.com``. -.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys#getting_a_service_account_key +.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys Run the tester either on a local machine, or in Cloud Shell. In Cloud Shell, install the prototype code and the GCP PubSub client: