Skip to content

Commit

Permalink
Allow logging configuration.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed May 28, 2024
1 parent eb043fe commit 8f9a6b9
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions python/lsst/consdb/hinfo.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import asyncio
import logging
import os
import random
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, Sequence
Expand All @@ -21,7 +19,7 @@
from lsst.resources import ResourcePath
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects.postgresql import insert
from utils import setup_postgres
from utils import setup_logging, setup_postgres

if TYPE_CHECKING:
import lsst.afw.cameraGeom # type: ignore
Expand Down Expand Up @@ -332,7 +330,7 @@ def process_resource(resource: ResourcePath, instrument_dict: dict, update: bool
resource : `ResourcePath`
Path to the Header Service header resource.
"""
global KW_MAPPING, OI_MAPPING
global KW_MAPPING, OI_MAPPING, logger
global engine

exposure_rec = dict()
Expand Down Expand Up @@ -362,7 +360,7 @@ def process_resource(resource: ResourcePath, instrument_dict: dict, update: bool
stmt = stmt.on_conflict_do_update(index_elements=["exposure_id"], set_=exposure_rec)
else:
stmt.on_conflict_do_nothing()
logging.debug(str(stmt))
logger.debug(str(stmt))
with engine.begin() as conn:
conn.execute(stmt)

Expand All @@ -388,7 +386,7 @@ def process_resource(resource: ResourcePath, instrument_dict: dict, update: bool
)
else:
det_stmt = det_stmt.on_conflict_do_nothing()
logging.debug(str(det_stmt))
logger.debug(str(det_stmt))
conn.execute(det_stmt)

conn.commit()
Expand Down Expand Up @@ -437,10 +435,10 @@ def get_kafka_config() -> KafkaConfig:
)


logging.basicConfig(stream=sys.stderr, level=logging.INFO)
logger = setup_logging(__name__)

instrument = os.environ["INSTRUMENT"]
logging.info(f"Instrument = {instrument}")
logger.info(f"Instrument = {instrument}")
bucket_prefix = os.environ.get("BUCKET_PREFIX", "")
if bucket_prefix:
os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1"
Expand Down Expand Up @@ -479,7 +477,7 @@ def __init__(self, instrument_name, translator, instrument_mapping, det_mapping,

async def main() -> None:
"""Handle Header Service largeFileObjectAvailable messages."""
global instrument, bucket_prefix, TOPIC_MAPPING
global logger, instrument, bucket_prefix, TOPIC_MAPPING

if instrument == "LATISS":
instrument_dict = {
Expand Down Expand Up @@ -540,22 +538,22 @@ async def main() -> None:
)

await consumer.start()
logging.info("Consumer started")
logger.info("Consumer started")
try:
async for msg in consumer:
message = (await deserializer.deserialize(msg.value))["message"]
logging.debug(f"Received message {message}")
logger.debug(f"Received message {message}")
url = message["url"]
# Replace local HTTP access URL with generic S3 access URL.
url = re.sub(r"https://s3\.\w+\.lsst\.org/", "s3://", url)
if bucket_prefix:
url = re.sub(r"s3://", "s3://" + bucket_prefix, url)
resource = ResourcePath(url)
logging.info(f"Waiting for {url}")
logger.info(f"Waiting for {url}")
while not resource.exists():
await asyncio.sleep(random.uniform(0.1, 2.0))
process_resource(resource, instrument_dict)
logging.info(f"Processed {url}")
logger.info(f"Processed {url}")
finally:
await consumer.stop()

Expand Down

0 comments on commit 8f9a6b9

Please sign in to comment.