From 4155c915156dd2fb38b5301f24fce84859145e60 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Tue, 28 May 2024 02:40:23 -0700 Subject: [PATCH] Allow logging configuration. --- python/lsst/consdb/hinfo.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 5079f83..bfcf73c 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -1,9 +1,7 @@ import asyncio -import logging import os import random import re -import sys from dataclasses import dataclass from datetime import datetime from typing import TYPE_CHECKING, Any, Sequence @@ -21,7 +19,7 @@ from lsst.resources import ResourcePath from sqlalchemy import MetaData, Table from sqlalchemy.dialects.postgresql import insert -from utils import setup_postgres +from utils import setup_logging, setup_postgres if TYPE_CHECKING: import lsst.afw.cameraGeom # type: ignore @@ -332,7 +330,7 @@ def process_resource(resource: ResourcePath, instrument_dict: dict, update: bool resource : `ResourcePath` Path to the Header Service header resource. """ - global KW_MAPPING, OI_MAPPING + global KW_MAPPING, OI_MAPPING, logger global engine exposure_rec = dict() @@ -362,7 +360,7 @@ def process_resource(resource: ResourcePath, instrument_dict: dict, update: bool stmt = stmt.on_conflict_do_update(index_elements=["exposure_id"], set_=exposure_rec) else: stmt.on_conflict_do_nothing() - logging.debug(str(stmt)) + logger.debug(str(stmt)) with engine.begin() as conn: conn.execute(stmt) @@ -388,7 +386,7 @@ def process_resource(resource: ResourcePath, instrument_dict: dict, update: bool ) else: det_stmt = det_stmt.on_conflict_do_nothing() - logging.debug(str(det_stmt)) + logger.debug(str(det_stmt)) conn.execute(det_stmt) conn.commit() @@ -437,10 +435,10 @@ def get_kafka_config() -> KafkaConfig: ) -logging.basicConfig(stream=sys.stderr, level=logging.INFO) +logger = setup_logging(__name__) instrument = os.environ["INSTRUMENT"] -logging.info(f"Instrument = {instrument}") +logger.info(f"Instrument = {instrument}") bucket_prefix = os.environ.get("BUCKET_PREFIX", "") if bucket_prefix: os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" @@ -479,7 +477,7 @@ def __init__(self, instrument_name, translator, instrument_mapping, det_mapping, async def main() -> None: """Handle Header Service largeFileObjectAvailable messages.""" - global instrument, bucket_prefix, TOPIC_MAPPING + global logger, instrument, bucket_prefix, TOPIC_MAPPING if instrument == "LATISS": instrument_dict = { @@ -540,22 +538,22 @@ async def main() -> None: ) await consumer.start() - logging.info("Consumer started") + logger.info("Consumer started") try: async for msg in consumer: message = (await deserializer.deserialize(msg.value))["message"] - logging.debug(f"Received message {message}") + logger.debug(f"Received message {message}") url = message["url"] # Replace local HTTP access URL with generic S3 access URL. url = re.sub(r"https://s3\.\w+\.lsst\.org/", "s3://", url) if bucket_prefix: url = re.sub(r"s3://", "s3://" + bucket_prefix, url) resource = ResourcePath(url) - logging.info(f"Waiting for {url}") + logger.info(f"Waiting for {url}") while not resource.exists(): await asyncio.sleep(random.uniform(0.1, 2.0)) process_resource(resource, instrument_dict) - logging.info(f"Processed {url}") + logger.info(f"Processed {url}") finally: await consumer.stop()