From eaa5f92e141298990c5c565c02221214d682e28f Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Mon, 28 Aug 2023 16:31:01 -0700 Subject: [PATCH 01/26] Add initial code. --- README.rst | 2 +- SConstruct | 4 - bin.src/SConscript | 3 - python/lsst/consdb/header_proc.py | 85 +++++++++++++ python/lsst/consdb/summarize_efd.py | 185 ++++++++++++++++++++++++++++ ups/consdb.table | 9 -- 6 files changed, 271 insertions(+), 17 deletions(-) delete mode 100644 SConstruct delete mode 100644 bin.src/SConscript create mode 100644 python/lsst/consdb/header_proc.py create mode 100644 python/lsst/consdb/summarize_efd.py delete mode 100644 ups/consdb.table diff --git a/README.rst b/README.rst index 4cebb84..8930889 100644 --- a/README.rst +++ b/README.rst @@ -2,4 +2,4 @@ consdb ###### -This CSC listens for SAL events, executes EFD queries when they arrive, and writes results to columns in relational database tables in the Consolidated Database. +Scripts and services for generating the Summit Visit Database and Consolidated Database (ConsDB), including summarizing the Engineering and Facilities Database (EFD). \ No newline at end of file diff --git a/SConstruct b/SConstruct deleted file mode 100644 index 378aa93..0000000 --- a/SConstruct +++ /dev/null @@ -1,4 +0,0 @@ -# -*- python -*- -from lsst.sconsUtils import scripts -# Python-only package -scripts.BasicSConstruct("consdb", disableCc=True, noCfgFile=True) diff --git a/bin.src/SConscript b/bin.src/SConscript deleted file mode 100644 index e00724c..0000000 --- a/bin.src/SConscript +++ /dev/null @@ -1,3 +0,0 @@ -# -*- python -*- -from lsst.sconsUtils import scripts -scripts.BasicSConscript.shebang() diff --git a/python/lsst/consdb/header_proc.py b/python/lsst/consdb/header_proc.py new file mode 100644 index 0000000..ac032ba --- /dev/null +++ b/python/lsst/consdb/header_proc.py @@ -0,0 +1,85 @@ +import asyncio +import json +import os +import random +import time +from typing import TYPE_CHECKING + +import aiokafka +import httpx +import kafkit +from lsst.resources import ResourcePath +from sqlalchemy import create_engine, text + +if TYPE_CHECKING: + import lsst.resources + +# Environment variables from deployment + +kafka_cluster = os.environ["KAFKA_CLUSTER"] +schema_url = os.environ["SCHEMA_URL"] +db_url = os.environ["DB_URL"] +tenant = os.environ.get("BUCKET_TENANT", None) +kafka_group_id = 1 + +topic = "lsst.ATHeaderService.logevent_largeFileObjectAvailable" + +engine = create_engine(db_url) + + +def process_resource(resource: lsst.resources.ResourcePath) -> None: + content = json.loads(resource.read()) + with engine.begin() as conn: + # TODO get all fields and tables, do as a transaction + conn.execute( + text("INSERT INTO exposure (a, b, c, d, e)" " VALUES(:a, :b, :c, :d, :e)"), + [dict(a=content["something"], b=2, c=3, d=4, e=5)], + ) + # TODO check result + + +async def main() -> None: + async with httpx.AsyncClient() as client: + schema_registry = kafkit.registry.RegistryApi(client=client, url=schema_url) + deserializer = kafkit.registry.Deserializer(registry=schema_registry) + + # Alternative 2: + # Something like + # asyncio.run(queue_check()) + + consumer = aiokafka.AIOKafkaConsumer( + topic, + bootstrap_servers=kafka_cluster, + group_id=kafka_group_id, + ) + await consumer.start() + try: + async for msg in consumer: + message = (await deserializer.deserialize(msg.value)).message + resource = ResourcePath(message.url) + if tenant: + new_scheme = resource.scheme + new_netloc = f"{tenant}:{resource.netloc}" + new_path = resource.path + resource = 
ResourcePath(f"{new_scheme}://{new_netloc}/{new_path}") + # Alternative 1: block for file + while not resource.exists(): + time.sleep(random.uniform(0.1, 2.0)) + process_resource(resource) + + # Alternative 2: queue + # r.sadd("EXPOSURE_QUEUE", str(resource)) + + finally: + await consumer.stop() + + +# Alternative 2: +# async def queue_check(): +# resource_list = r.slist("EXPOSURE_QUEUE", 0, -1) +# for resource in resource_list: +# if r.exists(): +# process_resource(resource) +# r.sremove("EXPOSURE_QUEUE", resource) + +asyncio.run(main()) diff --git a/python/lsst/consdb/summarize_efd.py b/python/lsst/consdb/summarize_efd.py new file mode 100644 index 0000000..f8da6ae --- /dev/null +++ b/python/lsst/consdb/summarize_efd.py @@ -0,0 +1,185 @@ +import argparse +from typing import TYPE_CHECKING, Any, Callable + +import astropy.time +import lsst_efd_client +import yaml +from lsst.daf.butler import Butler +from sqlalchemy import create_engine + +if TYPE_CHECKING: + import lsst.daf.butler + import pandas + import sqlalchemy + + +class Summary: + # TODO define summary + pass + + +# TODO add all summarizing functions +def gen_mean( + config: dict[str, Any] +) -> Callable[[pandas.DataFrame, astropy.time.Time, astropy.time.Time], Summary]: + def do( + series: pandas.DataFrame, start: astropy.time.Time, end: astropy.time.Time + ) -> Summary: + return Summary() + + return do + + +FUNCTION_GENERATORS = dict(mean=gen_mean) + + +class EfdValues: + def __init__( + self, + config: dict[str, Any], + window: astropy.time.TimeDelta, + series: pandas.DataFrame, + ): + self._entries = series + self._sum_function = FUNCTION_GENERATORS[config["function"]](config) + self._window = window + + def summarize(self, start: astropy.time.Time, end: astropy.time.Time) -> Any: + return self._sum_function( + self._entries, start - self._window, end + self._window + ) + + +class Records: + def __init__(self, db: sqlalchemy.Engine): + self._db = db + + def add( + self, dim: lsst.daf.butler.DimensionRecord, topic: dict[str, Any], summary: Any + ) -> None: + pass + + def write(self, table: str) -> None: + pass + + +def read_config(config_name: str) -> dict[str, Any]: + with open(config_name, "r") as f: + return yaml.safe_load(f) + + +def get_efd_values( + efd: lsst_efd_client.EfdClient, + topic: dict[str, Any], + start: astropy.time.Time, + end: astropy.time.Time, +) -> pandas.DataFrame: + window = astropy.time.TimeDelta(topic.get("window", 0.0), format="sec") + series = efd.select_time_series( + topic["name"], + topic["fields"], + start - window, + end + window, + topic.get("index", None), + ) + return EfdValues(topic, window, series) + + +def process_interval( + butler: Butler, + db: sqlalchemy.Engine, + efd: lsst_efd_client.EfdClient, + config: dict[str, Any], + instrument: str, + start_time: str, + end_time: str, +) -> None: + start = astropy.time.Time(start_time, format="isot") + end = astropy.time.Time(end_time, format="isot") + + exposure_list = [] + visit_list = [] + min_topic_time = end + max_topic_time = start + + where_clause = "instrument=instr and timespan OVERLAPS (start, end)" + + for e in butler.queryDimensionRecords( + "exposure", + where=where_clause, + bind=dict(instr=instrument, start=start, end=end), + ): + if e.timespan.end < end: + exposure_list.append(e) + min_topic_time = min(e.timespan.begin, min_topic_time) + max_topic_time = max(e.timespan.begin, max_topic_time) + + for v in butler.queryDimensionRecords( + "visit", where=where_clause, bind=dict(instr=instrument, start=start, end=end) + ): + if 
v.timespan.end < end: + visit_list.append(v) + min_topic_time = min(v.timespan.begin, min_topic_time) + max_topic_time = max(v.timespan.begin, max_topic_time) + + exposure_records = Records(db) + visit_records = Records(db) + for topic in config["topics"]: + efd_values = get_efd_values(efd, topic, min_topic_time, max_topic_time) + for e in exposure_list: + summary = efd_values.summarize(e.timespan.begin, e.timespan.end) + exposure_records.add(e, topic, summary) + for v in visit_list: + summary = efd_values.summarize(v.timespan.begin, v.timespan.end) + visit_records.add(v, topic, summary) + + exposure_records.write(config["exposure_table"]) + visit_records.write(config["visit_table"]) + + +def build_argparser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Summarize EFD topics in a time range") + parser.add_argument( + "-c", "--config", dest="config_name", required=True, help="config YAML" + ) + parser.add_argument( + "-i", "--instrument", dest="instrument", required=True, help="instrument name" + ) + parser.add_argument( + "-s", + "--start", + dest="start_time", + required=True, + help="start time (ISO, YYYY-MM-DDTHH:MM:SS)", + ) + parser.add_argument( + "-e", + "--end", + dest="end_time", + required=True, + help="end time (ISO, YYYY-MM-DDTHH:MM:SS)", + ) + parser.add_argument("-r", "--repo", dest="repo", required=True, help="Butler repo") + parser.add_argument( + "-d", + "--db", + dest="db_conn_str", + required=True, + help="Consolidated Database connection string", + ) + parser.add_argument( + "-E", "--efd", dest="efd_conn_str", required=True, help="EFD connection string" + ) + return parser + + +def main() -> None: + parser = build_argparser() + args = parser.parse_args() + butler = Butler(args.repo) + db = create_engine(args.db_conn_str) + efd = lsst_efd_client.EfdClient(args.efd_conn_str) + config = read_config(args.config_name) + process_interval( + butler, db, efd, config, args.instrument, args.start_time, args.end_time + ) diff --git a/ups/consdb.table b/ups/consdb.table deleted file mode 100644 index e3b1923..0000000 --- a/ups/consdb.table +++ /dev/null @@ -1,9 +0,0 @@ -# List EUPS dependencies of this package here. -# - Any package whose API is used directly should be listed explicitly. -# - Common third-party packages can be assumed to be recursively included by -# the "sconsUtils" package. -setupRequired(sconsUtils) - -# The following is boilerplate for all packages. -# See https://dmtn-001.lsst.io for details on LSST_LIBRARY_PATH. -envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python) From d2c1be6af8c483fa917e709a5ad9d0dbbbf2789e Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Tue, 20 Feb 2024 04:56:41 -0800 Subject: [PATCH 02/26] Prototypes for JTM demo. --- python/lsst/consdb/__init__.py | 5 +- python/lsst/consdb/client.py | 58 ++++++++++++ python/lsst/consdb/header_proc.py | 10 +- python/lsst/consdb/hinfo-latiss.py | 144 +++++++++++++++++++++++++++++ python/lsst/consdb/server.py | 57 ++++++++++++ 5 files changed, 266 insertions(+), 8 deletions(-) create mode 100644 python/lsst/consdb/client.py create mode 100644 python/lsst/consdb/hinfo-latiss.py create mode 100644 python/lsst/consdb/server.py diff --git a/python/lsst/consdb/__init__.py b/python/lsst/consdb/__init__.py index 810dc34..073f265 100644 --- a/python/lsst/consdb/__init__.py +++ b/python/lsst/consdb/__init__.py @@ -19,7 +19,4 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
-try: - from .version import * # Generated by sconsUtils -except ImportError: - __version__ = "?" +# from .version import * # Generated by sconsUtils \ No newline at end of file diff --git a/python/lsst/consdb/client.py b/python/lsst/consdb/client.py new file mode 100644 index 0000000..0e1beb3 --- /dev/null +++ b/python/lsst/consdb/client.py @@ -0,0 +1,58 @@ +import json +import os +from pandas import DataFrame +import requests +from typing import Any, Iterable +from urllib.parse import urljoin + +session = requests.Session() +base_url = os.environ["CONSDB_URL"] + + +def insert(table: str, values: dict[str, Any], **kwargs): + values.update(kwargs) + # check values against schema for table + data = {"table": table, "values": values} + url = urljoin(base_url, "insert") + try: + response = requests.post(url, json=data) + except: + raise + response.raise_for_status() + + +def query( + tables: str | Iterable[str], + columns: str | Iterable[str], + *, + where: str | None = None, + join: str | None = None +) -> list[Any]: + if isinstance(tables, str): + tables = [tables] + if isinstance(columns, str): + columns = [columns] + url = urljoin(base_url, "query") + data = {"tables": tables, "columns": columns, "where": where, "join": join} + try: + response = requests.post(url, json=data) + except: + raise + try: + response.raise_for_status() + except: + print(response.content.decode()) + raise + arr = response.json() + return DataFrame(arr[1:], columns=arr[0]) + + +def schema(table: str): + url = urljoin(base_url, "schema/") + url = urljoin(url, table) + try: + response = requests.get(url) + except: + raise + response.raise_for_status() + return response.json() diff --git a/python/lsst/consdb/header_proc.py b/python/lsst/consdb/header_proc.py index ac032ba..551684e 100644 --- a/python/lsst/consdb/header_proc.py +++ b/python/lsst/consdb/header_proc.py @@ -31,10 +31,11 @@ def process_resource(resource: lsst.resources.ResourcePath) -> None: content = json.loads(resource.read()) with engine.begin() as conn: # TODO get all fields and tables, do as a transaction - conn.execute( - text("INSERT INTO exposure (a, b, c, d, e)" " VALUES(:a, :b, :c, :d, :e)"), - [dict(a=content["something"], b=2, c=3, d=4, e=5)], - ) + # conn.execute( + # text("INSERT INTO exposure (a, b, c, d, e)" " VALUES(:a, :b, :c, :d, :e)"), + # [dict(a=content["something"], b=2, c=3, d=4, e=5)], + # ) + print(f"Processing {resource}: {content[0:100]}") # TODO check result @@ -51,6 +52,7 @@ async def main() -> None: topic, bootstrap_servers=kafka_cluster, group_id=kafka_group_id, + auto_offset_reset="earliest", ) await consumer.start() try: diff --git a/python/lsst/consdb/hinfo-latiss.py b/python/lsst/consdb/hinfo-latiss.py new file mode 100644 index 0000000..91239d0 --- /dev/null +++ b/python/lsst/consdb/hinfo-latiss.py @@ -0,0 +1,144 @@ +from datetime import datetime +import os +import sys +from typing import Any, Iterable + +import yaml +from astropy.time import Time +from sqlalchemy import create_engine, MetaData, Table +from sqlalchemy.dialects.postgresql import insert + +from astro_metadata_translator import ObservationInfo +from lsst.resources import ResourcePath +from lsst.obs.lsst.translators import LatissTranslator + +os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" + +engine = create_engine("postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1") +metadata_obj = MetaData(schema="cdb_latiss") +exposure_table = Table("exposure", metadata_obj, autoload_with=engine) + +def ninety_minus(angle: float) -> float: + return 90.0 - angle 
+ +def tai_convert(t: str) -> datetime: + return Time(t, format="isot", scale="tai").datetime + +def tai_mean(start: str, end: str) -> datetime: + s = Time(start, format="isot", scale="tai") + e = Time(end, format="isot", scale="tai") + return (s + (e - s) / 2).datetime + +def mean(*iterable: Iterable[Any]) -> Any: + return sum(iterable) / len(iterable) + +def logical_or(*bools: Iterable[int | str | None]) -> bool: + return any([b == 1 or b == "1" for b in bools]) + + +KW_MAPPING = { + "controller": "CONTRLLR", + "seq_num": "SEQNUM", + "band": "FILTBAND", + "ra": "RA", + "decl": "DEC", + "skyrotation": "ROTPA", + "azimuth_start": "AZSTART", + "azimuth_end": "AZEND", + "altitude_start": (ninety_minus, "ELSTART"), + "altitude_end": (ninety_minus, "ELEND"), + "zenithdistance_start": "ELSTART", + "zenithdistance_end": "ELEND", + "expmidpt": (tai_mean, "DATE-BEG", "DATE-END"), + "expmidptmjd": (mean, "MJD-BEG", "MJD-END"), + "obsstart": (tai_convert, "DATE-BEG"), + "obsstartmjd": "MJD-BEG", + "obsend": (tai_convert, "DATE-END"), + "obsendmjd": "MJD-END", + "exptime": "EXPTIME", + "shuttime": "SHUTTIME", + "darktime": "DARKTIME", + "group_id": "GROUPID", + "curindex": "CURINDEX", + "maxindex": "MAXINDEX", + "imgtype": "IMGTYPE", + "emulated": (logical_or, "EMUIMAGE"), + "science_program": "PROGRAM", + "observation_reason": "REASON", + "target_name": "OBJECT", + "airtemp": "AIRTEMP", + "pressure": "PRESSURE", + "humidity": "HUMIDITY", + "wind_speed": "WINDSPD", + "wind_dir": "WINDDIR", + "dimm_seeing": "SEEING", +} + +LATISS_MAPPING = { + "focus_z": "FOCUSZ", + "dome_azimuth": "DOMEAZ", + "shut_lower": "SHUTLOWR", + "shut_upper": "SHUTUPPR", +# "temp_set": "TEMP_SET", + "simulated": (logical_or, "SIMULATE ATMCS", "SIMULATE ATHEXAPOD", "SIMULAT ATPNEUMATICS", "SIMULATE ATDOME", "SIMULATE ATSPECTROGRAPH"), +} + +# LATISS_DETECTOR_MAPPING = { +# "ccdtemp": "CCDTEMP", +# } + +OI_MAPPING = { + "exposure_id": "exposure_id", + "physical_filter": "physical_filter", + "airmass": "boresight_airmass", + "day_obs": "observing_day", +} + + +def process_keyword(keyword: str | tuple, info: dict) -> Any: + if type(keyword) == str: + if keyword in info: + return info[keyword] + elif type(keyword) == tuple: + fn = keyword[0] + args = keyword[1:] + if all([a in info for a in args]): + return fn(*[info[a] for a in args]) + +def process_resource(resource: ResourcePath) -> None: + content = yaml.safe_load(resource.read()) + exposure_rec = dict() + + info = dict() + for header in content["PRIMARY"]: + info[header["keyword"]] = header["value"] + for field, keyword in KW_MAPPING.items(): + exposure_rec[field] = process_keyword(keyword, info) + for field, keyword in LATISS_MAPPING.items(): + exposure_rec[field] = process_keyword(keyword, info) + +# det_info = dict() +# for header in content["R00S00_PRIMARY"]: +# det_info[header["keyword"]] = header["value"] +# for field, keyword in LATISS_DETECTOR_MAPPING.items(): +# det_exposure_rec[field] = process_keyword(keyword, det_info) + + obs_info_obj = ObservationInfo(info, translator_class=LatissTranslator) + obs_info = dict() + for keyword in OI_MAPPING.values(): + obs_info[keyword] = getattr(obs_info_obj, keyword) + for field, keyword in OI_MAPPING.items(): + exposure_rec[field] = process_keyword(keyword, obs_info) + + stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() + with engine.begin() as conn: + result = conn.execute(stmt) + + print(exposure_rec) + + +date = "/".join(sys.argv[1].split("-")) +d = 
ResourcePath(f"s3://rubin:rubinobs-lfa-cp/ATHeaderService/header/{date}/") +for dirpath, dirnames, filenames in d.walk(): + for fname in filenames: + process_resource(d.join(fname)) diff --git a/python/lsst/consdb/server.py b/python/lsst/consdb/server.py new file mode 100644 index 0000000..106c900 --- /dev/null +++ b/python/lsst/consdb/server.py @@ -0,0 +1,57 @@ +from flask import Flask, request +from sqlalchemy import create_engine, MetaData, Table +from sqlalchemy.dialects.postgresql import insert +import sqlalchemy.exc + + +app = Flask(__name__) +engine = create_engine("postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1") +metadata_obj = MetaData(schema="cdb_latiss") +metadata_obj.reflect(engine) + + +@app.post("/insert") +def insert(): + info = request.json + table = info["table"] + valdict = info["values"] + keylist = list(valdict.keys()) + valuelist = list(valdict.values()) + placeholders = ",".join(["?"] * len(valdict)) + # check schema + + with engine.begin() as conn: + conn.exec_driver_sql( + f"INSERT OR UPDATE INTO ? ({placeholders}) VALUES ({placeholders})", + [table] + keylist + valuelist, + ) + return ("OK", 200) + + +@app.post("/query") +def query(): + info = request.json + tables = ",".join(info["tables"]) + columns = ",".join(info["columns"]) + if "where" in info: + where = "WHERE " + info["where"] + if ";" in where: + return ("Cannot create query containing more than one statement", 403) + with engine.begin() as conn: + try: + cursor = conn.exec_driver_sql(f"SELECT {columns} FROM {tables} {where}") + first = True + result = [] + for row in cursor: + if first: + result.append(row._fields) + first = False + result.append(list(row)) + return result + except sqlalchemy.exc.DBAPIError as e: + return (str(e), 500) + + +@app.get("/schema/") +def schema(table: str): + return [(c.name, str(c.type), c.doc) for c in metadata_obj.tables[table.lower()].columns] From 60c07b44b3e2e43ecd5ebbfcc54570fd67f3b1fa Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 21 Feb 2024 17:27:27 -0800 Subject: [PATCH 03/26] Start sketching Kafka interface. 
--- python/lsst/consdb/hinfo-latiss.py | 41 +++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/python/lsst/consdb/hinfo-latiss.py b/python/lsst/consdb/hinfo-latiss.py index 91239d0..1f7623e 100644 --- a/python/lsst/consdb/hinfo-latiss.py +++ b/python/lsst/consdb/hinfo-latiss.py @@ -12,11 +12,9 @@ from lsst.resources import ResourcePath from lsst.obs.lsst.translators import LatissTranslator -os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" +# import Kafka interface + -engine = create_engine("postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1") -metadata_obj = MetaData(schema="cdb_latiss") -exposure_table = Table("exposure", metadata_obj, autoload_with=engine) def ninety_minus(angle: float) -> float: return 90.0 - angle @@ -94,6 +92,19 @@ def logical_or(*bools: Iterable[int | str | None]) -> bool: "day_obs": "observing_day", } +TOPIC_MAPPING = { + "LATISS": "ATHeaderService", + "LSSTComCam": "CCHeaderService", + "LSSTCam": "MTHeaderService", +} + + +url = os.environ.get("POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1") +engine = create_engine(url) +instrument = os.environ.get("INSTRUMENT", "LATISS") +metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") +exposure_table = Table("exposure", metadata_obj, autoload_with=engine) + def process_keyword(keyword: str | tuple, info: dict) -> Any: if type(keyword) == str: @@ -106,6 +117,8 @@ def process_keyword(keyword: str | tuple, info: dict) -> Any: return fn(*[info[a] for a in args]) def process_resource(resource: ResourcePath) -> None: + global engine, exposure_table + content = yaml.safe_load(resource.read()) exposure_rec = dict() @@ -134,11 +147,27 @@ def process_resource(resource: ResourcePath) -> None: with engine.begin() as conn: result = conn.execute(stmt) - print(exposure_rec) + # print(exposure_rec) + + +site = os.environ.get("SITE", "USDF") +if site == "USDF": + os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" + bucket_prefix = "rubin:" +else: + bucket_prefix = "" +# For Kafka: +# consumer = configure_kafka() +# while True: +# msgs = consumer.consume() +# for msg in msgs: +# re.sub(r"s3://", "s3://" + bucket_prefix, msg.data) +# process_resource(msg.data) +# To process all of a given date: date = "/".join(sys.argv[1].split("-")) -d = ResourcePath(f"s3://rubin:rubinobs-lfa-cp/ATHeaderService/header/{date}/") +d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/") for dirpath, dirnames, filenames in d.walk(): for fname in filenames: process_resource(d.join(fname)) From 8328d1446957ac06fbf1586d9a32c2a5885548c6 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 21 Feb 2024 17:27:50 -0800 Subject: [PATCH 04/26] Start building service container. --- .github/workflows/build.yaml | 42 ++++++++++++++++++++++++++++++++++++ Dockerfile.hinfo | 12 +++++++++++ 2 files changed, 54 insertions(+) create mode 100644 .github/workflows/build.yaml create mode 100644 Dockerfile.hinfo diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..c5ac25d --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,42 @@ +name: CI build of all containers +on: + push: + branches: + - main + tags: + - v* + pull_request: + +jobs: + push: + runs-on: ubuntu-latest + permissions: + packages: write + contents: read + + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Build hinfo image + run: | + docker build . 
\ + -f Dockerfile.hinfo \ + --tag consdb-hinfo \ + --label "runnumber=${GITHUB_RUN_ID}" + + - name: Log in to GitHub Container Registry + run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $ --password-stdin + + - name: Push image + run: | + ID=ghcr.io/${{ github.repository_owner }}/consdb-hinfo + if [[ "${{ github.ref }}" == "refs/pull/"* ]]; then + VERSION=$(echo "${{ github.head_ref }}" | sed -e 's|.*/||') + elif [[ "${{ github.ref }}" == "refs/tags/"* ]]; then + VERSION=$(echo "${{ github.ref_name }}" | sed -e 's|^v||') + else + VERSION=latest + fi + docker tag consdb-hinfo $ID:$VERSION + docker push $ID:$VERSION diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo new file mode 100644 index 0000000..ca29f35 --- /dev/null +++ b/Dockerfile.hinfo @@ -0,0 +1,12 @@ +ARG RUBINENV_VERSION=8.0.0 +FROM lsstsqre/newinstall:${RUBINENV_VERSION} +ARG OBS_LSST_VERSION +ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_06} +USER lsst +RUN source loadLSST.bash && mamba install confluent-kafka +RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst +COPY python/lsst/consdb/hinfo.py ./hinfo/ +# Environment variables that must be set: +# POSTGRES_URL INSTRUMENT SITE + +ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python hinfo/hinfo.py" ] From 4c70e82cb3d4f080bb7aac5ce1df7a31ed51939d Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 21 Feb 2024 17:30:32 -0800 Subject: [PATCH 05/26] Add server Dockerfile. --- Dockerfile.server | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 Dockerfile.server diff --git a/Dockerfile.server b/Dockerfile.server new file mode 100644 index 0000000..9c2dd9c --- /dev/null +++ b/Dockerfile.server @@ -0,0 +1,8 @@ +FROM python:3.11 +RUN pip install flask gunicorn sqlalchemy +WORKDIR /consdb-server +COPY src/server.py /consdb-server/ +# Environment variables that must be set: +# POSTGRES_URL +ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "server:app" ] + From 439e9d60fbc0dec70c726aee720c502d31229e4f Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 21 Feb 2024 17:30:32 -0800 Subject: [PATCH 06/26] Add server Dockerfile. 
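
The client module's error handling is reworked below. For orientation, a
short usage sketch of that client API; the local server URL is an assumption,
and the column values are illustrative only:

    import os

    # CONSDB_URL is read at import time by the client module.
    os.environ.setdefault("CONSDB_URL", "http://localhost:8000/")
    from lsst.consdb import client

    # Insert one row; extra keyword arguments are merged into the value dict.
    client.insert("exposure", {"exposure_id": 2024022100001, "exptime": 30.0})

    # query() returns a pandas DataFrame built from the server's row list.
    df = client.query("exposure", ["exposure_id", "exptime"], where="exptime > 10")

    # schema() returns (name, type, doc) for each column of the table.
    print(client.schema("exposure"))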
--- python/lsst/consdb/client.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/lsst/consdb/client.py b/python/lsst/consdb/client.py index 0e1beb3..b756a50 100644 --- a/python/lsst/consdb/client.py +++ b/python/lsst/consdb/client.py @@ -1,7 +1,7 @@ -import json import os from pandas import DataFrame import requests +from requests.exceptions import RequestException from typing import Any, Iterable from urllib.parse import urljoin @@ -16,8 +16,8 @@ def insert(table: str, values: dict[str, Any], **kwargs): url = urljoin(base_url, "insert") try: response = requests.post(url, json=data) - except: - raise + except (RequestException) as e: + raise e response.raise_for_status() @@ -36,8 +36,8 @@ def query( data = {"tables": tables, "columns": columns, "where": where, "join": join} try: response = requests.post(url, json=data) - except: - raise + except (RequestException) as e: + raise e try: response.raise_for_status() except: @@ -52,7 +52,7 @@ def schema(table: str): url = urljoin(url, table) try: response = requests.get(url) - except: - raise + except (RequestException) as e: + raise e response.raise_for_status() return response.json() From 2af2c370d18ad6eb89a73b58f8c2de70988c76be Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Tue, 27 Feb 2024 15:34:09 -0300 Subject: [PATCH 07/26] bump python and checkout --- .github/workflows/build.yaml | 2 +- .github/workflows/lint.yaml | 6 +++--- python/lsst/consdb/client.py | 12 ++++++------ python/lsst/consdb/header_proc.py | 8 +++++--- python/lsst/consdb/hinfo-latiss.py | 24 +++++++++++++++--------- python/lsst/consdb/server.py | 3 +-- 6 files changed, 31 insertions(+), 24 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index c5ac25d..728fb83 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -16,7 +16,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Build hinfo image run: | diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index ce34a7b..3733b42 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -8,12 +8,12 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: - python-version: 3.9 + python-version: '3.11' - name: Install run: pip install -r <(curl https://raw.githubusercontent.com/lsst/linting/main/requirements.txt) diff --git a/python/lsst/consdb/client.py b/python/lsst/consdb/client.py index b756a50..fa7d8fc 100644 --- a/python/lsst/consdb/client.py +++ b/python/lsst/consdb/client.py @@ -16,7 +16,7 @@ def insert(table: str, values: dict[str, Any], **kwargs): url = urljoin(base_url, "insert") try: response = requests.post(url, json=data) - except (RequestException) as e: + except RequestException as e: raise e response.raise_for_status() @@ -36,13 +36,13 @@ def query( data = {"tables": tables, "columns": columns, "where": where, "join": join} try: response = requests.post(url, json=data) - except (RequestException) as e: - raise e + except RequestException as e: + raise e try: response.raise_for_status() - except: + except Exception as ex: print(response.content.decode()) - raise + raise ex arr = response.json() return DataFrame(arr[1:], columns=arr[0]) @@ -52,7 +52,7 @@ def schema(table: str): url = urljoin(url, table) try: response = requests.get(url) - except (RequestException) as e: + except 
RequestException as e: raise e response.raise_for_status() return response.json() diff --git a/python/lsst/consdb/header_proc.py b/python/lsst/consdb/header_proc.py index 551684e..4c9ae20 100644 --- a/python/lsst/consdb/header_proc.py +++ b/python/lsst/consdb/header_proc.py @@ -9,7 +9,7 @@ import httpx import kafkit from lsst.resources import ResourcePath -from sqlalchemy import create_engine, text +from sqlalchemy import create_engine if TYPE_CHECKING: import lsst.resources @@ -30,9 +30,11 @@ def process_resource(resource: lsst.resources.ResourcePath) -> None: content = json.loads(resource.read()) with engine.begin() as conn: + print(conn) # TODO get all fields and tables, do as a transaction # conn.execute( - # text("INSERT INTO exposure (a, b, c, d, e)" " VALUES(:a, :b, :c, :d, :e)"), + # text("INSERT INTO exposure (a, b, c, d, e)" + # " VALUES(:a, :b, :c, :d, :e)"), # [dict(a=content["something"], b=2, c=3, d=4, e=5)], # ) print(f"Processing {resource}: {content[0:100]}") @@ -51,7 +53,7 @@ async def main() -> None: consumer = aiokafka.AIOKafkaConsumer( topic, bootstrap_servers=kafka_cluster, - group_id=kafka_group_id, + group_id=str(kafka_group_id), auto_offset_reset="earliest", ) await consumer.start() diff --git a/python/lsst/consdb/hinfo-latiss.py b/python/lsst/consdb/hinfo-latiss.py index 1f7623e..df30508 100644 --- a/python/lsst/consdb/hinfo-latiss.py +++ b/python/lsst/consdb/hinfo-latiss.py @@ -15,21 +15,24 @@ # import Kafka interface - def ninety_minus(angle: float) -> float: return 90.0 - angle + def tai_convert(t: str) -> datetime: return Time(t, format="isot", scale="tai").datetime + def tai_mean(start: str, end: str) -> datetime: s = Time(start, format="isot", scale="tai") e = Time(end, format="isot", scale="tai") return (s + (e - s) / 2).datetime + def mean(*iterable: Iterable[Any]) -> Any: return sum(iterable) / len(iterable) + def logical_or(*bools: Iterable[int | str | None]) -> bool: return any([b == 1 or b == "1" for b in bools]) @@ -77,8 +80,9 @@ def logical_or(*bools: Iterable[int | str | None]) -> bool: "dome_azimuth": "DOMEAZ", "shut_lower": "SHUTLOWR", "shut_upper": "SHUTUPPR", -# "temp_set": "TEMP_SET", - "simulated": (logical_or, "SIMULATE ATMCS", "SIMULATE ATHEXAPOD", "SIMULAT ATPNEUMATICS", "SIMULATE ATDOME", "SIMULATE ATSPECTROGRAPH"), + # "temp_set": "TEMP_SET", + "simulated": (logical_or, "SIMULATE ATMCS", "SIMULATE ATHEXAPOD", "SIMULAT ATPNEUMATICS", + "SIMULATE ATDOME", "SIMULATE ATSPECTROGRAPH"), } # LATISS_DETECTOR_MAPPING = { @@ -93,9 +97,9 @@ def logical_or(*bools: Iterable[int | str | None]) -> bool: } TOPIC_MAPPING = { - "LATISS": "ATHeaderService", - "LSSTComCam": "CCHeaderService", - "LSSTCam": "MTHeaderService", + "LATISS": "ATHeaderService", + "LSSTComCam": "CCHeaderService", + "LSSTCam": "MTHeaderService", } @@ -107,15 +111,16 @@ def logical_or(*bools: Iterable[int | str | None]) -> bool: def process_keyword(keyword: str | tuple, info: dict) -> Any: - if type(keyword) == str: + if type(keyword) is str: if keyword in info: return info[keyword] - elif type(keyword) == tuple: + elif type(keyword) is tuple: fn = keyword[0] args = keyword[1:] if all([a in info for a in args]): return fn(*[info[a] for a in args]) + def process_resource(resource: ResourcePath) -> None: global engine, exposure_table @@ -145,7 +150,8 @@ def process_resource(resource: ResourcePath) -> None: stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() with engine.begin() as conn: - result = conn.execute(stmt) + conn.execute(stmt) + # result = 
conn.execute(stmt) # print(exposure_rec) diff --git a/python/lsst/consdb/server.py b/python/lsst/consdb/server.py index 106c900..5344db6 100644 --- a/python/lsst/consdb/server.py +++ b/python/lsst/consdb/server.py @@ -1,6 +1,5 @@ from flask import Flask, request -from sqlalchemy import create_engine, MetaData, Table -from sqlalchemy.dialects.postgresql import insert +from sqlalchemy import create_engine, MetaData import sqlalchemy.exc From f16d6d06f24f021bcc3c61c3bb4b718890bddc35 Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Tue, 27 Feb 2024 16:48:32 -0300 Subject: [PATCH 08/26] container action --- .github/workflows/build.yaml | 2 +- Dockerfile.hinfo | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 728fb83..9dc1488 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -5,7 +5,7 @@ on: - main tags: - v* - pull_request: + pull_request: jobs: push: diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo index ca29f35..0717156 100644 --- a/Dockerfile.hinfo +++ b/Dockerfile.hinfo @@ -5,8 +5,8 @@ ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_06} USER lsst RUN source loadLSST.bash && mamba install confluent-kafka RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst -COPY python/lsst/consdb/hinfo.py ./hinfo/ +COPY python/lsst/consdb/hinfo*.py ./hinfo/ # Environment variables that must be set: # POSTGRES_URL INSTRUMENT SITE -ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python hinfo/hinfo.py" ] +ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python .hinfo/hinfo-latiss.py" ] From 66bd2d7d61312281656c4ac38b6221d2d2e58278 Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Tue, 27 Feb 2024 17:56:21 -0300 Subject: [PATCH 09/26] use pip not mamba --- Dockerfile.hinfo | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo index 0717156..e2a988f 100644 --- a/Dockerfile.hinfo +++ b/Dockerfile.hinfo @@ -3,7 +3,7 @@ FROM lsstsqre/newinstall:${RUBINENV_VERSION} ARG OBS_LSST_VERSION ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_06} USER lsst -RUN source loadLSST.bash && mamba install confluent-kafka +RUN source loadLSST.bash && pip install confluent-kafka RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst COPY python/lsst/consdb/hinfo*.py ./hinfo/ # Environment variables that must be set: From 3733713534db8b216f684fca30e58cb97965ebfe Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 28 Feb 2024 02:55:42 -0800 Subject: [PATCH 10/26] Incorporate Kafka and handle all instruments. 
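
The header-to-column mappings keep the convention from the earlier prototype:
a plain string copies a header value directly, while a tuple applies a
function to one or more header values. A small worked example (the header
values here are made up):

    info = {
        "ELSTART": 60.0,
        "DATE-BEG": "2024-02-21T01:00:00.000",
        "DATE-END": "2024-02-21T01:00:30.000",
    }
    process_column("ELSTART", info)                    # -> 60.0, straight copy
    process_column((ninety_minus, "ELSTART"), info)    # -> 30.0, one-argument function
    process_column((tai_mean, "DATE-BEG", "DATE-END"), info)
    # -> datetime midway between the two TAI timestamps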
--- Dockerfile.hinfo | 15 +- python/lsst/consdb/header_proc.py | 89 --------- python/lsst/consdb/hinfo-latiss.py | 179 ----------------- python/lsst/consdb/hinfo.py | 306 +++++++++++++++++++++++++++++ 4 files changed, 317 insertions(+), 272 deletions(-) delete mode 100644 python/lsst/consdb/header_proc.py delete mode 100644 python/lsst/consdb/hinfo-latiss.py create mode 100644 python/lsst/consdb/hinfo.py diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo index e2a988f..1a02b88 100644 --- a/Dockerfile.hinfo +++ b/Dockerfile.hinfo @@ -3,10 +3,17 @@ FROM lsstsqre/newinstall:${RUBINENV_VERSION} ARG OBS_LSST_VERSION ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_06} USER lsst -RUN source loadLSST.bash && pip install confluent-kafka +RUN source loadLSST.bash && mamba install aiokafka httpx +RUN source loadLSST.bash && pip install kafkit RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst -COPY python/lsst/consdb/hinfo*.py ./hinfo/ +COPY python/lsst/consdb/hinfo.py ./hinfo/ + # Environment variables that must be set: -# POSTGRES_URL INSTRUMENT SITE +# INSTRUMENT: LATISS, LSSTComCam, LSSTComCamSim, LSSTCam +# POSTGRES_URL: SQLAlchemy connection URL +# KAFKA_BOOTSTRAP: AIOKafkaConsumer bootstrap server specification +# SCHEMA_URL: Kafkit registry schema URL +# Optional environment variable: +# BUCKET_PREFIX: "rubin:" at USDF -ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python .hinfo/hinfo-latiss.py" ] +ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ./hinfo/hinfo.py" ] diff --git a/python/lsst/consdb/header_proc.py b/python/lsst/consdb/header_proc.py deleted file mode 100644 index 4c9ae20..0000000 --- a/python/lsst/consdb/header_proc.py +++ /dev/null @@ -1,89 +0,0 @@ -import asyncio -import json -import os -import random -import time -from typing import TYPE_CHECKING - -import aiokafka -import httpx -import kafkit -from lsst.resources import ResourcePath -from sqlalchemy import create_engine - -if TYPE_CHECKING: - import lsst.resources - -# Environment variables from deployment - -kafka_cluster = os.environ["KAFKA_CLUSTER"] -schema_url = os.environ["SCHEMA_URL"] -db_url = os.environ["DB_URL"] -tenant = os.environ.get("BUCKET_TENANT", None) -kafka_group_id = 1 - -topic = "lsst.ATHeaderService.logevent_largeFileObjectAvailable" - -engine = create_engine(db_url) - - -def process_resource(resource: lsst.resources.ResourcePath) -> None: - content = json.loads(resource.read()) - with engine.begin() as conn: - print(conn) - # TODO get all fields and tables, do as a transaction - # conn.execute( - # text("INSERT INTO exposure (a, b, c, d, e)" - # " VALUES(:a, :b, :c, :d, :e)"), - # [dict(a=content["something"], b=2, c=3, d=4, e=5)], - # ) - print(f"Processing {resource}: {content[0:100]}") - # TODO check result - - -async def main() -> None: - async with httpx.AsyncClient() as client: - schema_registry = kafkit.registry.RegistryApi(client=client, url=schema_url) - deserializer = kafkit.registry.Deserializer(registry=schema_registry) - - # Alternative 2: - # Something like - # asyncio.run(queue_check()) - - consumer = aiokafka.AIOKafkaConsumer( - topic, - bootstrap_servers=kafka_cluster, - group_id=str(kafka_group_id), - auto_offset_reset="earliest", - ) - await consumer.start() - try: - async for msg in consumer: - message = (await deserializer.deserialize(msg.value)).message - resource = ResourcePath(message.url) - if tenant: - new_scheme = resource.scheme - new_netloc = f"{tenant}:{resource.netloc}" - new_path = resource.path - 
resource = ResourcePath(f"{new_scheme}://{new_netloc}/{new_path}") - # Alternative 1: block for file - while not resource.exists(): - time.sleep(random.uniform(0.1, 2.0)) - process_resource(resource) - - # Alternative 2: queue - # r.sadd("EXPOSURE_QUEUE", str(resource)) - - finally: - await consumer.stop() - - -# Alternative 2: -# async def queue_check(): -# resource_list = r.slist("EXPOSURE_QUEUE", 0, -1) -# for resource in resource_list: -# if r.exists(): -# process_resource(resource) -# r.sremove("EXPOSURE_QUEUE", resource) - -asyncio.run(main()) diff --git a/python/lsst/consdb/hinfo-latiss.py b/python/lsst/consdb/hinfo-latiss.py deleted file mode 100644 index df30508..0000000 --- a/python/lsst/consdb/hinfo-latiss.py +++ /dev/null @@ -1,179 +0,0 @@ -from datetime import datetime -import os -import sys -from typing import Any, Iterable - -import yaml -from astropy.time import Time -from sqlalchemy import create_engine, MetaData, Table -from sqlalchemy.dialects.postgresql import insert - -from astro_metadata_translator import ObservationInfo -from lsst.resources import ResourcePath -from lsst.obs.lsst.translators import LatissTranslator - -# import Kafka interface - - -def ninety_minus(angle: float) -> float: - return 90.0 - angle - - -def tai_convert(t: str) -> datetime: - return Time(t, format="isot", scale="tai").datetime - - -def tai_mean(start: str, end: str) -> datetime: - s = Time(start, format="isot", scale="tai") - e = Time(end, format="isot", scale="tai") - return (s + (e - s) / 2).datetime - - -def mean(*iterable: Iterable[Any]) -> Any: - return sum(iterable) / len(iterable) - - -def logical_or(*bools: Iterable[int | str | None]) -> bool: - return any([b == 1 or b == "1" for b in bools]) - - -KW_MAPPING = { - "controller": "CONTRLLR", - "seq_num": "SEQNUM", - "band": "FILTBAND", - "ra": "RA", - "decl": "DEC", - "skyrotation": "ROTPA", - "azimuth_start": "AZSTART", - "azimuth_end": "AZEND", - "altitude_start": (ninety_minus, "ELSTART"), - "altitude_end": (ninety_minus, "ELEND"), - "zenithdistance_start": "ELSTART", - "zenithdistance_end": "ELEND", - "expmidpt": (tai_mean, "DATE-BEG", "DATE-END"), - "expmidptmjd": (mean, "MJD-BEG", "MJD-END"), - "obsstart": (tai_convert, "DATE-BEG"), - "obsstartmjd": "MJD-BEG", - "obsend": (tai_convert, "DATE-END"), - "obsendmjd": "MJD-END", - "exptime": "EXPTIME", - "shuttime": "SHUTTIME", - "darktime": "DARKTIME", - "group_id": "GROUPID", - "curindex": "CURINDEX", - "maxindex": "MAXINDEX", - "imgtype": "IMGTYPE", - "emulated": (logical_or, "EMUIMAGE"), - "science_program": "PROGRAM", - "observation_reason": "REASON", - "target_name": "OBJECT", - "airtemp": "AIRTEMP", - "pressure": "PRESSURE", - "humidity": "HUMIDITY", - "wind_speed": "WINDSPD", - "wind_dir": "WINDDIR", - "dimm_seeing": "SEEING", -} - -LATISS_MAPPING = { - "focus_z": "FOCUSZ", - "dome_azimuth": "DOMEAZ", - "shut_lower": "SHUTLOWR", - "shut_upper": "SHUTUPPR", - # "temp_set": "TEMP_SET", - "simulated": (logical_or, "SIMULATE ATMCS", "SIMULATE ATHEXAPOD", "SIMULAT ATPNEUMATICS", - "SIMULATE ATDOME", "SIMULATE ATSPECTROGRAPH"), -} - -# LATISS_DETECTOR_MAPPING = { -# "ccdtemp": "CCDTEMP", -# } - -OI_MAPPING = { - "exposure_id": "exposure_id", - "physical_filter": "physical_filter", - "airmass": "boresight_airmass", - "day_obs": "observing_day", -} - -TOPIC_MAPPING = { - "LATISS": "ATHeaderService", - "LSSTComCam": "CCHeaderService", - "LSSTCam": "MTHeaderService", -} - - -url = os.environ.get("POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1") -engine = 
create_engine(url) -instrument = os.environ.get("INSTRUMENT", "LATISS") -metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") -exposure_table = Table("exposure", metadata_obj, autoload_with=engine) - - -def process_keyword(keyword: str | tuple, info: dict) -> Any: - if type(keyword) is str: - if keyword in info: - return info[keyword] - elif type(keyword) is tuple: - fn = keyword[0] - args = keyword[1:] - if all([a in info for a in args]): - return fn(*[info[a] for a in args]) - - -def process_resource(resource: ResourcePath) -> None: - global engine, exposure_table - - content = yaml.safe_load(resource.read()) - exposure_rec = dict() - - info = dict() - for header in content["PRIMARY"]: - info[header["keyword"]] = header["value"] - for field, keyword in KW_MAPPING.items(): - exposure_rec[field] = process_keyword(keyword, info) - for field, keyword in LATISS_MAPPING.items(): - exposure_rec[field] = process_keyword(keyword, info) - -# det_info = dict() -# for header in content["R00S00_PRIMARY"]: -# det_info[header["keyword"]] = header["value"] -# for field, keyword in LATISS_DETECTOR_MAPPING.items(): -# det_exposure_rec[field] = process_keyword(keyword, det_info) - - obs_info_obj = ObservationInfo(info, translator_class=LatissTranslator) - obs_info = dict() - for keyword in OI_MAPPING.values(): - obs_info[keyword] = getattr(obs_info_obj, keyword) - for field, keyword in OI_MAPPING.items(): - exposure_rec[field] = process_keyword(keyword, obs_info) - - stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() - with engine.begin() as conn: - conn.execute(stmt) - # result = conn.execute(stmt) - - # print(exposure_rec) - - -site = os.environ.get("SITE", "USDF") -if site == "USDF": - os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" - bucket_prefix = "rubin:" -else: - bucket_prefix = "" - -# For Kafka: -# consumer = configure_kafka() -# while True: -# msgs = consumer.consume() -# for msg in msgs: -# re.sub(r"s3://", "s3://" + bucket_prefix, msg.data) -# process_resource(msg.data) - -# To process all of a given date: -date = "/".join(sys.argv[1].split("-")) -d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/") -for dirpath, dirnames, filenames in d.walk(): - for fname in filenames: - process_resource(d.join(fname)) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py new file mode 100644 index 0000000..2687c3a --- /dev/null +++ b/python/lsst/consdb/hinfo.py @@ -0,0 +1,306 @@ +import asyncio +import os +import random +import re +from datetime import datetime +from typing import Any, Sequence + +import aiokafka +import astropy.time +import httpx +import kafkit +import yaml +from astro_metadata_translator import ObservationInfo +from lsst.resources import ResourcePath +from sqlalchemy import MetaData, Table, create_engine +from sqlalchemy.dialects.postgresql import insert + +############################### +# Header Processing Functions # +############################### + + +def ninety_minus(angle: float) -> float: + return 90.0 - angle + + +def tai_convert(t: str) -> datetime: + return astropy.time.Time(t, format="isot", scale="tai").datetime + + +def tai_mean(start: str, end: str) -> datetime: + s = astropy.time.Time(start, format="isot", scale="tai") + e = astropy.time.Time(end, format="isot", scale="tai") + return (s + (e - s) / 2).datetime + + +def mean(*iterable: float) -> Any: + return sum(iterable) / len(iterable) + + +def logical_or(*bools: int | str | None) -> bool: + return any([b == 1 or b == "1" 
for b in bools]) + + +################################# +# Header Mapping Configurations # +################################# + +# Non-instrument-specific mapping to column name from Header Service keyword +KW_MAPPING: dict[str, str | Sequence] = { + "controller": "CONTRLLR", + "seq_num": "SEQNUM", + "band": "FILTBAND", + "ra": "RA", + "decl": "DEC", + "skyrotation": "ROTPA", + "azimuth_start": "AZSTART", + "azimuth_end": "AZEND", + "altitude_start": (ninety_minus, "ELSTART"), + "altitude_end": (ninety_minus, "ELEND"), + "zenithdistance_start": "ELSTART", + "zenithdistance_end": "ELEND", + "expmidpt": (tai_mean, "DATE-BEG", "DATE-END"), + "expmidptmjd": (mean, "MJD-BEG", "MJD-END"), + "obsstart": (tai_convert, "DATE-BEG"), + "obsstartmjd": "MJD-BEG", + "obsend": (tai_convert, "DATE-END"), + "obsendmjd": "MJD-END", + "exptime": "EXPTIME", + "shuttime": "SHUTTIME", + "darktime": "DARKTIME", + "group_id": "GROUPID", + "curindex": "CURINDEX", + "maxindex": "MAXINDEX", + "imgtype": "IMGTYPE", + "emulated": (logical_or, "EMUIMAGE"), + "science_program": "PROGRAM", + "observation_reason": "REASON", + "target_name": "OBJECT", + "airtemp": "AIRTEMP", + "pressure": "PRESSURE", + "humidity": "HUMIDITY", + "wind_speed": "WINDSPD", + "wind_dir": "WINDDIR", + "dimm_seeing": "SEEING", +} + +# Instrument-specific mapping to column name from Header Service keyword +LATISS_MAPPING: dict[str, str | Sequence] = { + "focus_z": "FOCUSZ", + "dome_azimuth": "DOMEAZ", + "shut_lower": "SHUTLOWR", + "shut_upper": "SHUTUPPR", + # "temp_set": "TEMP_SET", + "simulated": ( + logical_or, + "SIMULATE ATMCS", + "SIMULATE ATHEXAPOD", + "SIMULAT ATPNEUMATICS", + "SIMULATE ATDOME", + "SIMULATE ATSPECTROGRAPH", + ), +} + +LSSTCOMCAM_MAPPING: dict[str, str | Sequence] = {} +LSSTCOMCAMSIM_MAPPING: dict[str, str | Sequence] = {} +LSSTCAM_MAPPING: dict[str, str | Sequence] = {} + +# LATISS_DETECTOR_MAPPING = { +# "ccdtemp": "CCDTEMP", +# } + +# Mapping to column name from ObservationInfo keyword +OI_MAPPING = { + "exposure_id": "exposure_id", + "physical_filter": "physical_filter", + "airmass": "boresight_airmass", + "day_obs": "observing_day", +} + +# Mapping from instrument name to Header Service topic name +TOPIC_MAPPING = { + "LATISS": "ATHeaderService", + "LSSTComCam": "CCHeaderService", + "LSSTComCamSim": "CCHeaderService", + "LSSTCam": "MTHeaderService", +} + + +######################## +# Processing Functions # +######################## + + +def process_column(column_def: str | Sequence, info: dict) -> Any: + """Generate a column value from one or more keyword values in a dict. + + The dict may contain FITS headers or ObservationInfo. + + Parameters + ---------- + column_def: `str` + Definition of the column. Either a string specifying the info keyword + to use as the column value, or a tuple containing a function to apply + to the values of one or more info keywords. + info: `dict` + A dictionary containing keyword/value pairs. + + Returns + ------- + column_value: `Any` + The value to use for the column. + """ + if type(column_def) is str: + if column_def in info: + return info[column_def] + elif type(column_def) is tuple: + fn = column_def[0] + args = column_def[1:] + if all([a in info for a in args]): + return fn(*[info[a] for a in args]) + + +def process_resource(resource: ResourcePath) -> None: + """Process a header resource. + + Uses configured mappings and the ObservationInfo translator to generate + column values that are inserted into the exposure table. 
+ + Parameters + ---------- + resource: `ResourcePath` + Path to the Header Service header resource. + """ + global KW_MAPPING, OI_MAPPING, instrument_mapping, translator + global engine, exposure_table + + exposure_rec = dict() + + info = dict() + content = yaml.safe_load(resource.read()) + for header in content["PRIMARY"]: + info[header["keyword"]] = header["value"] + for column, column_def in KW_MAPPING.items(): + exposure_rec[column] = process_column(column_def, info) + for column, column_def in instrument_mapping.items(): + exposure_rec[column] = process_column(column_def, info) + + obs_info_obj = ObservationInfo(info, translator_class=translator) + obs_info = dict() + for keyword in OI_MAPPING.values(): + obs_info[keyword] = getattr(obs_info_obj, keyword) + for field, keyword in OI_MAPPING.items(): + exposure_rec[field] = process_column(keyword, obs_info) + + stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() + with engine.begin() as conn: + conn.execute(stmt) + + # TODO: exposure_detector table processing + # det_info = dict() + # for header in content["R00S00_PRIMARY"]: + # det_info[header["keyword"]] = header["value"] + # for field, keyword in LATISS_DETECTOR_MAPPING.items(): + # det_exposure_rec[field] = process_column(keyword, det_info) + + +def process_date(day_obs: str) -> None: + """Process all headers from a given observation day (as YYYY-MM-DD). + + Parameters + ---------- + day_obs: `str` + Observation day to process, as YYYY-MM-DD. + """ + global TOPIC_MAPPING, bucket_prefix, instrument + + date = "/".join(day_obs.split("-")) + d = ResourcePath( + f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/" + ) + for dirpath, dirnames, filenames in d.walk(): + for fname in filenames: + process_resource(d.join(fname)) + + +################## +# Initialization # +################## + +instrument = os.environ.get("INSTRUMENT", "LATISS") +match instrument: + case "LATISS": + from lsst.obs.lsst.translators import LatissTranslator + + translator = LatissTranslator + instrument_mapping = LATISS_MAPPING + case "LSSTComCam": + from lsst.obs.lsst.translators import LsstComCamTranslator + + translator = LsstComCamTranslator + instrument_mapping = LSSTCOMCAM_MAPPING + case "LSSTComCamSim": + from lsst.obs.lsst.translators import LsstComCamSimTranslator + + translator = LsstComCamSimTranslator + instrument_mapping = LSSTCOMCAMSIM_MAPPING + case "LSSTCam": + from lsst.obs.lsst.translators import LsstCamTranslator + + translator = LsstCamTranslator + instrument_mapping = LSSTCAM_MAPPING + +url = os.environ.get( + "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1" +) +engine = create_engine(url) +metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") +exposure_table = Table("exposure", metadata_obj, autoload_with=engine) + + +bucket_prefix = os.environ.get("BUCKET_PREFIX", "") +if bucket_prefix: + os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" + +kafka_bootstrap = os.environ["KAFKA_BOOTSTRAP"] +schema_url = os.environ["SCHEMA_URL"] +kafka_group_id = "1" + +topic = f"lsst.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable" + + +################# +# Main Function # +################# + + +async def main() -> None: + """Handle Header Service largeFileObjectAvailable messages.""" + global bucket_prefix, kafka_bootstrap, kafka_group_id, schema_url, topic + + async with httpx.AsyncClient() as client: + schema_registry = kafkit.registry.RegistryApi(client=client, url=schema_url) + deserializer = 
kafkit.registry.Deserializer(registry=schema_registry) + + consumer = aiokafka.AIOKafkaConsumer( + topic, + bootstrap_servers=kafka_bootstrap, + group_id=kafka_group_id, + auto_offset_reset="earliest", + ) + await consumer.start() + try: + async for msg in consumer: + message = (await deserializer.deserialize(msg.value)).message + if bucket_prefix: + url = re.sub(r"s3://", "s3://" + bucket_prefix, message.url) + resource = ResourcePath(url) + while not resource.exists(): + await asyncio.sleep(random.uniform(0.1, 2.0)) + process_resource(resource) + finally: + await consumer.stop() + + +asyncio.run(main()) From 6c9a1c6fdef2c30eec788ce9c575ab67a679c0f1 Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Wed, 28 Feb 2024 09:24:49 -0300 Subject: [PATCH 11/26] force out conflicts --- README.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 8930889..20c06bb 100644 --- a/README.rst +++ b/README.rst @@ -2,4 +2,6 @@ consdb ###### -Scripts and services for generating the Summit Visit Database and Consolidated Database (ConsDB), including summarizing the Engineering and Facilities Database (EFD). \ No newline at end of file +Scripts and services for generating the Summit Visit Database and Consolidated Database (ConsDB), including summarizing the Engineering and Facilities Database (EFD). + +See also DMTN-227.lsst.io From 961da5e0e211d401982b31fe08d2528efb26817a Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Wed, 28 Feb 2024 09:28:33 -0300 Subject: [PATCH 12/26] build on all push --- .github/workflows/build.yaml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 9dc1488..3a902c0 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -1,11 +1,7 @@ name: CI build of all containers on: - push: - branches: - - main - tags: - - v* - pull_request: + - push + - pull_request jobs: push: From b14ecf24a701a98f914b50568f3e1c87f0850351 Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Sat, 2 Mar 2024 16:53:20 -0300 Subject: [PATCH 13/26] Construcg DB URL from parts --- python/lsst/consdb/hinfo.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 2687c3a..7c7891c 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -251,9 +251,17 @@ def process_date(day_obs: str) -> None: translator = LsstCamTranslator instrument_mapping = LSSTCAM_MAPPING -url = os.environ.get( - "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1" -) +host = os.environ.get("DB_HOST") +passwd = os.environ.get("DB_PASS") +user = os.environ.get("DB_USER") +dbname = os.environ.get("DB_NAME") +url = "" +if host and passwd and user and dbname: + url = f"postgresql://{user}:{passwd}@{host}/{dbname}" +else: + url = os.environ.get( + "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1" + ) engine = create_engine(url) metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") exposure_table = Table("exposure", metadata_obj, autoload_with=engine) From 70186c2c2d2eac461f01cd7a02e4936a3e057664 Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Mon, 4 Mar 2024 22:05:42 -0300 Subject: [PATCH 14/26] Print about connection --- python/lsst/consdb/hinfo.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 7c7891c..a688624 100644 --- 
a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -257,11 +257,13 @@ def process_date(day_obs: str) -> None: dbname = os.environ.get("DB_NAME") url = "" if host and passwd and user and dbname: + print(f"Connecting to {host} as {user} to {dbname}") url = f"postgresql://{user}:{passwd}@{host}/{dbname}" else: url = os.environ.get( "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1" ) + print("Using POSTGRES_URL {user} {host} {dbname}") engine = create_engine(url) metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") exposure_table = Table("exposure", metadata_obj, autoload_with=engine) From 6d813ca2fe05e9029c7f5b36fc53830bb597aef2 Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Tue, 5 Mar 2024 16:03:32 -0300 Subject: [PATCH 15/26] Exposure - pg case sensitive --- python/lsst/consdb/hinfo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index a688624..8697482 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -266,7 +266,7 @@ def process_date(day_obs: str) -> None: print("Using POSTGRES_URL {user} {host} {dbname}") engine = create_engine(url) metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") -exposure_table = Table("exposure", metadata_obj, autoload_with=engine) +exposure_table = Table("Exposure", metadata_obj, autoload_with=engine) bucket_prefix = os.environ.get("BUCKET_PREFIX", "") From 068788bcbb307efdd174d103d25515d88abb38d1 Mon Sep 17 00:00:00 2001 From: William O'Mullane Date: Tue, 5 Mar 2024 16:06:09 -0300 Subject: [PATCH 16/26] Revert "Exposure - pg case sensitive" This reverts commit 6d813ca2fe05e9029c7f5b36fc53830bb597aef2. --- python/lsst/consdb/hinfo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 8697482..a688624 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -266,7 +266,7 @@ def process_date(day_obs: str) -> None: print("Using POSTGRES_URL {user} {host} {dbname}") engine = create_engine(url) metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") -exposure_table = Table("Exposure", metadata_obj, autoload_with=engine) +exposure_table = Table("exposure", metadata_obj, autoload_with=engine) bucket_prefix = os.environ.get("BUCKET_PREFIX", "") From f40de7ffcbcc892b7400fdcc07cd4f8002097151 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 6 Mar 2024 06:56:55 -0800 Subject: [PATCH 17/26] Improve use of kafkit. 
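
The create_ssl_context call below expects broker CA and client
certificate/key paths. One way they could be supplied, assuming environment
variables (these variable names are hypothetical, not part of this change):

    # Hypothetical variable names; the deployment must provide paths to the
    # cluster CA certificate and the client certificate/key pair.
    broker_ca_path = os.environ["KAFKA_BROKER_CA_PATH"]
    client_cert_path = os.environ["KAFKA_CLIENT_CERT_PATH"]
    client_key_path = os.environ["KAFKA_CLIENT_KEY_PATH"]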
--- python/lsst/consdb/hinfo.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index a688624..4c40758 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -8,7 +8,9 @@ import aiokafka import astropy.time import httpx -import kafkit +import kafkit.registry +import kafkit.registry.httpx +import kafkit.ssl import yaml from astro_metadata_translator import ObservationInfo from lsst.resources import ResourcePath @@ -290,15 +292,25 @@ async def main() -> None: global bucket_prefix, kafka_bootstrap, kafka_group_id, schema_url, topic async with httpx.AsyncClient() as client: - schema_registry = kafkit.registry.RegistryApi(client=client, url=schema_url) + schema_registry = kafkit.registry.httpx.RegistryApi( + http_client=client, url=schema_url + ) deserializer = kafkit.registry.Deserializer(registry=schema_registry) + ssl_context = kafkit.ssl.create_ssl_context( + cluster_ca_path=broker_ca_path, + client_cert_path=client_cert_path, + client_key_path=client_key_path, + ) consumer = aiokafka.AIOKafkaConsumer( topic, bootstrap_servers=kafka_bootstrap, + ssl_context=ssl_context, + security_protocol="SSL", group_id=kafka_group_id, auto_offset_reset="earliest", ) + await consumer.start() try: async for msg in consumer: From b8e2646c35ea97d06e8930b7d68bd32793ff8434 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Thu, 7 Mar 2024 04:16:39 -0800 Subject: [PATCH 18/26] Replace Kafka SSL with SASL. --- python/lsst/consdb/hinfo.py | 47 +++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 4c40758..49efcea 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -2,6 +2,7 @@ import os import random import re +from dataclasses import dataclass from datetime import datetime from typing import Any, Sequence @@ -10,7 +11,6 @@ import httpx import kafkit.registry import kafkit.registry.httpx -import kafkit.ssl import yaml from astro_metadata_translator import ObservationInfo from lsst.resources import ResourcePath @@ -230,6 +230,28 @@ def process_date(day_obs: str) -> None: # Initialization # ################## + +@dataclass +class KafkaConfig: + """Class for configuring Kafka-related items.""" + + bootstrap: str + group_id: str + username: str + password: str + schema_url: str + + +def get_kafka_config() -> KafkaConfig: + return KafkaConfig( + bootstrap=os.environ["KAFKA_BOOTSTRAP"], + group_id=os.environ.get("KAFKA_GROUP_ID", "consdb-consumer"), + username=os.environ["KAFKA_USERNAME"], + password=os.environ["KAFKA_PASSWORD"], + schema_url=os.environ["SCHEMA_URL"], + ) + + instrument = os.environ.get("INSTRUMENT", "LATISS") match instrument: case "LATISS": @@ -275,9 +297,6 @@ def process_date(day_obs: str) -> None: if bucket_prefix: os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" -kafka_bootstrap = os.environ["KAFKA_BOOTSTRAP"] -schema_url = os.environ["SCHEMA_URL"] -kafka_group_id = "1" topic = f"lsst.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable" @@ -289,26 +308,24 @@ def process_date(day_obs: str) -> None: async def main() -> None: """Handle Header Service largeFileObjectAvailable messages.""" - global bucket_prefix, kafka_bootstrap, kafka_group_id, schema_url, topic + global bucket_prefix + kafka_config = get_kafka_config() async with httpx.AsyncClient() as client: schema_registry = kafkit.registry.httpx.RegistryApi( - http_client=client, 
url=schema_url + http_client=client, url=kafka_config.schema_url ) deserializer = kafkit.registry.Deserializer(registry=schema_registry) - ssl_context = kafkit.ssl.create_ssl_context( - cluster_ca_path=broker_ca_path, - client_cert_path=client_cert_path, - client_key_path=client_key_path, - ) consumer = aiokafka.AIOKafkaConsumer( topic, - bootstrap_servers=kafka_bootstrap, - ssl_context=ssl_context, - security_protocol="SSL", - group_id=kafka_group_id, + bootstrap_servers=kafka_config.bootstrap, + group_id=kafka_config.group_id, auto_offset_reset="earliest", + isolation_level="read_committed", + security_protocol="SASL_PLAINTEXT", + sasl_plain_username=kafka_config.username, + sasl_plain_password=kafka_config.password, ) await consumer.start() From 0c5a87e1e4b68b03a3ad44e45c56bc52efe98d34 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Thu, 7 Mar 2024 04:35:20 -0800 Subject: [PATCH 19/26] Document additional env vars. --- Dockerfile.hinfo | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo index 1a02b88..359ec73 100644 --- a/Dockerfile.hinfo +++ b/Dockerfile.hinfo @@ -11,9 +11,12 @@ COPY python/lsst/consdb/hinfo.py ./hinfo/ # Environment variables that must be set: # INSTRUMENT: LATISS, LSSTComCam, LSSTComCamSim, LSSTCam # POSTGRES_URL: SQLAlchemy connection URL -# KAFKA_BOOTSTRAP: AIOKafkaConsumer bootstrap server specification +# KAFKA_BOOTSTRAP: host:port of bootstrap server +# KAFKA_USERNAME: username for SASL_PLAIN authentication +# KAFKA_PASSWORD: password for SASL_PLAIN authentication # SCHEMA_URL: Kafkit registry schema URL -# Optional environment variable: +# Optional environment variables: # BUCKET_PREFIX: "rubin:" at USDF +# KAFKA_GROUP_ID: name of consumer group, default is "consdb-consumer" ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ./hinfo/hinfo.py" ] From 5f3c051655fc66978ee54ed8ba2cc25c38c82c5e Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Thu, 7 Mar 2024 09:15:07 -0800 Subject: [PATCH 20/26] Default the username if not present. 
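A small sketch of the resulting behaviour, with made-up values for the bootstrap host, password, and schema URL: only KAFKA_BOOTSTRAP, KAFKA_PASSWORD, and SCHEMA_URL must be provided, while the username and group id fall back to their defaults.

    import os

    # Hypothetical environment: only the three required variables are set.
    os.environ["KAFKA_BOOTSTRAP"] = "sasquatch-kafka:9092"
    os.environ["KAFKA_PASSWORD"] = "not-a-real-password"
    os.environ["SCHEMA_URL"] = "http://schema-registry:8081"

    username = os.environ.get("KAFKA_USERNAME", "consdb")           # "consdb"
    group_id = os.environ.get("KAFKA_GROUP_ID", "consdb-consumer")  # "consdb-consumer"
    print(username, group_id)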
--- Dockerfile.hinfo | 4 ++-- python/lsst/consdb/hinfo.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo index 359ec73..e315587 100644 --- a/Dockerfile.hinfo +++ b/Dockerfile.hinfo @@ -12,11 +12,11 @@ COPY python/lsst/consdb/hinfo.py ./hinfo/ # INSTRUMENT: LATISS, LSSTComCam, LSSTComCamSim, LSSTCam # POSTGRES_URL: SQLAlchemy connection URL # KAFKA_BOOTSTRAP: host:port of bootstrap server -# KAFKA_USERNAME: username for SASL_PLAIN authentication # KAFKA_PASSWORD: password for SASL_PLAIN authentication # SCHEMA_URL: Kafkit registry schema URL # Optional environment variables: -# BUCKET_PREFIX: "rubin:" at USDF +# BUCKET_PREFIX: set to "rubin:" at USDF, default is "" # KAFKA_GROUP_ID: name of consumer group, default is "consdb-consumer" +# KAFKA_USERNAME: username for SASL_PLAIN authentication, default is "consdb" ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ./hinfo/hinfo.py" ] diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 49efcea..9e4b196 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -246,7 +246,7 @@ def get_kafka_config() -> KafkaConfig: return KafkaConfig( bootstrap=os.environ["KAFKA_BOOTSTRAP"], group_id=os.environ.get("KAFKA_GROUP_ID", "consdb-consumer"), - username=os.environ["KAFKA_USERNAME"], + username=os.environ.get("KAFKA_USERNAME", "consdb"), password=os.environ["KAFKA_PASSWORD"], schema_url=os.environ["SCHEMA_URL"], ) @@ -324,6 +324,7 @@ async def main() -> None: auto_offset_reset="earliest", isolation_level="read_committed", security_protocol="SASL_PLAINTEXT", + sasl_mechanism="SCRAM-SHA-512", sasl_plain_username=kafka_config.username, sasl_plain_password=kafka_config.password, ) From 2c9e4b9b94aada48b35a571a6d32a2375e44a2d6 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Tue, 12 Mar 2024 11:23:24 -0700 Subject: [PATCH 21/26] Fix topic name. --- python/lsst/consdb/hinfo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 9e4b196..ba80cc3 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -298,7 +298,7 @@ def get_kafka_config() -> KafkaConfig: os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" -topic = f"lsst.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable" +topic = f"lsst.sal.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable" ################# From bf087cedfbc120e99a020b35cad8942e484a84ba Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Tue, 12 Mar 2024 11:31:10 -0700 Subject: [PATCH 22/26] Try SQuaRE build/push action. --- .github/workflows/build.yaml | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 3a902c0..59770d0 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -1,7 +1,11 @@ name: CI build of all containers on: - - push - - pull_request + push: + branches: + - main + tags: + - "*" + pull_request: jobs: push: @@ -14,25 +18,9 @@ jobs: - name: Checkout uses: actions/checkout@v4 - - name: Build hinfo image - run: | - docker build . 
\ - -f Dockerfile.hinfo \ - --tag consdb-hinfo \ - --label "runnumber=${GITHUB_RUN_ID}" - - - name: Log in to GitHub Container Registry - run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $ --password-stdin - - - name: Push image - run: | - ID=ghcr.io/${{ github.repository_owner }}/consdb-hinfo - if [[ "${{ github.ref }}" == "refs/pull/"* ]]; then - VERSION=$(echo "${{ github.head_ref }}" | sed -e 's|.*/||') - elif [[ "${{ github.ref }}" == "refs/tags/"* ]]; then - VERSION=$(echo "${{ github.ref_name }}" | sed -e 's|^v||') - else - VERSION=latest - fi - docker tag consdb-hinfo $ID:$VERSION - docker push $ID:$VERSION + - name: Build hinfo + uses: lsst-sqre/build-and-push-to-ghcr@v1 + with: + image: ${{ github.repository }}-hinfo + github_token: ${{ secrets.GITHUB_TOKEN }} + dockerfile: Dockerfile.hinfo From 9fdb6b8fb5ca9f09d454c9554f146f6fe111bcc1 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 20 Mar 2024 20:18:32 -0700 Subject: [PATCH 23/26] Fix message handling. --- python/lsst/consdb/hinfo.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index ba80cc3..52ee779 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -332,9 +332,10 @@ async def main() -> None: await consumer.start() try: async for msg in consumer: - message = (await deserializer.deserialize(msg.value)).message + message = (await deserializer.deserialize(msg.value))["message"] + url = message["url"] if bucket_prefix: - url = re.sub(r"s3://", "s3://" + bucket_prefix, message.url) + url = re.sub(r"s3://", "s3://" + bucket_prefix, url) resource = ResourcePath(url) while not resource.exists(): await asyncio.sleep(random.uniform(0.1, 2.0)) From 4a699f6f911c235d56a57b154a618e81aa347534 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 20 Mar 2024 20:25:57 -0700 Subject: [PATCH 24/26] Add logging. 
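For reference, the effect of the BUCKET_PREFIX rewrite in the message-handling loop above, as a standalone sketch; the bucket and key below are made up.

    import re

    bucket_prefix = "rubin:"  # value documented for the USDF deployment
    url = "s3://some-bucket/LATISS/header/example.yaml"  # hypothetical message URL

    if bucket_prefix:
        url = re.sub(r"s3://", "s3://" + bucket_prefix, url)

    print(url)  # s3://rubin:some-bucket/LATISS/header/example.yaml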
--- python/lsst/consdb/hinfo.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 52ee779..c9efd15 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -1,7 +1,9 @@ import asyncio +import logging import os import random import re +import sys from dataclasses import dataclass from datetime import datetime from typing import Any, Sequence @@ -195,6 +197,7 @@ def process_resource(resource: ResourcePath) -> None: for field, keyword in OI_MAPPING.items(): exposure_rec[field] = process_column(keyword, obs_info) + logging.debug(f"Inserting {exposure_rec}") stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() with engine.begin() as conn: conn.execute(stmt) @@ -252,6 +255,8 @@ def get_kafka_config() -> KafkaConfig: ) +logging.basicConfig(stream=sys.stderr, level=logging.INFO) + instrument = os.environ.get("INSTRUMENT", "LATISS") match instrument: case "LATISS": @@ -274,21 +279,22 @@ def get_kafka_config() -> KafkaConfig: translator = LsstCamTranslator instrument_mapping = LSSTCAM_MAPPING +logging.info(f"Instrument = {instrument}") host = os.environ.get("DB_HOST") passwd = os.environ.get("DB_PASS") user = os.environ.get("DB_USER") dbname = os.environ.get("DB_NAME") -url = "" +pg_url = "" if host and passwd and user and dbname: - print(f"Connecting to {host} as {user} to {dbname}") - url = f"postgresql://{user}:{passwd}@{host}/{dbname}" + logging.info(f"Connecting to {host} as {user} to {dbname}") + pg_url = f"postgresql://{user}:{passwd}@{host}/{dbname}" else: - url = os.environ.get( + pg_url = os.environ.get( "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1" ) - print("Using POSTGRES_URL {user} {host} {dbname}") -engine = create_engine(url) + logging.info(f"Using POSTGRES_URL {user} {host} {dbname}") +engine = create_engine(pg_url) metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") exposure_table = Table("exposure", metadata_obj, autoload_with=engine) @@ -330,16 +336,20 @@ async def main() -> None: ) await consumer.start() + logging.info("Consumer started") try: async for msg in consumer: message = (await deserializer.deserialize(msg.value))["message"] + logging.debug(f"Received message {message}") url = message["url"] if bucket_prefix: url = re.sub(r"s3://", "s3://" + bucket_prefix, url) resource = ResourcePath(url) + logging.info(f"Waiting for {url}") while not resource.exists(): await asyncio.sleep(random.uniform(0.1, 2.0)) process_resource(resource) + logging.info(f"Processed {url}") finally: await consumer.stop() From d9e9cc94d0b37d6cbb27053cbd18aa2742428600 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Thu, 21 Mar 2024 05:25:16 -0700 Subject: [PATCH 25/26] Allow recursive function application. --- python/lsst/consdb/hinfo.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index c9efd15..259bba7 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -146,7 +146,8 @@ def process_column(column_def: str | Sequence, info: dict) -> Any: column_def: `str` Definition of the column. Either a string specifying the info keyword to use as the column value, or a tuple containing a function to apply - to the values of one or more info keywords. + to the values of one or more info keywords or function application + tuples. info: `dict` A dictionary containing keyword/value pairs. 
@@ -154,15 +155,16 @@ def process_column(column_def: str | Sequence, info: dict) -> Any: ------- column_value: `Any` The value to use for the column. + None if any input value is missing. """ if type(column_def) is str: if column_def in info: return info[column_def] elif type(column_def) is tuple: fn = column_def[0] - args = column_def[1:] - if all([a in info for a in args]): - return fn(*[info[a] for a in args]) + arg_values = [process_column(a, info) for a in column_def[1:]] + if all(arg_values): + return fn(*arg_values) def process_resource(resource: ResourcePath) -> None: From 8e2720651da4a38853e1ae2455ab501f7127ca19 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Thu, 21 Mar 2024 05:27:10 -0700 Subject: [PATCH 26/26] Add additional field definitions. --- python/lsst/consdb/hinfo.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 259bba7..9e06e81 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -52,6 +52,7 @@ def logical_or(*bools: int | str | None) -> bool: # Non-instrument-specific mapping to column name from Header Service keyword KW_MAPPING: dict[str, str | Sequence] = { + "exposure_name": "OBSID", "controller": "CONTRLLR", "seq_num": "SEQNUM", "band": "FILTBAND", @@ -60,10 +61,13 @@ def logical_or(*bools: int | str | None) -> bool: "skyrotation": "ROTPA", "azimuth_start": "AZSTART", "azimuth_end": "AZEND", + "azimuth": (mean, "AZSTART", "AZEND"), "altitude_start": (ninety_minus, "ELSTART"), "altitude_end": (ninety_minus, "ELEND"), + "altitude": (mean, (ninety_minus, "ELSTART"), (ninety_minus, "ELEND")), "zenithdistance_start": "ELSTART", "zenithdistance_end": "ELEND", + "zenithdistance": (mean, "ELSTART", "ELEND"), "expmidpt": (tai_mean, "DATE-BEG", "DATE-END"), "expmidptmjd": (mean, "MJD-BEG", "MJD-END"), "obsstart": (tai_convert, "DATE-BEG"),