diff --git a/doc/playbook.rst b/doc/playbook.rst
index 7505896e..d94446c0 100644
--- a/doc/playbook.rst
+++ b/doc/playbook.rst
@@ -252,7 +252,7 @@ A few useful commands for managing the service:
 tester
 ======
 
-``python/tester/upload.py`` is a script that simulates the CCS image writer.
-It can be run from ``rubin-devl``, but requires the user to install the ``confluent_kafka`` package in their environment.
+``python/tester/upload.py`` and ``python/tester/upload_hsc_rc2.py`` are scripts that simulate the CCS image writer.
+They can be run from ``rubin-devl``, but require the user to install the ``confluent_kafka`` package in their environment.
 You must have a profile set up for the ``rubin-pp`` bucket (see `Buckets`_, above), and must set the ``KAFKA_CLUSTER`` environment variable.
 
@@ -271,7 +271,12 @@ Install the prototype code:
 
    git clone https://github.com/lsst-dm/prompt_prototype
 
-Command line arguments are the instrument name (currently HSC only) and the number of groups of images to send.
+The tester scripts send ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic.
+They then upload a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket, simulating incoming raw images (a sketch of the event payload appears at the end of this section).
+
+Eventually, a set of parallel processes running on multiple nodes will be needed to upload the images sufficiently rapidly.
+
+``python/tester/upload.py``: the command-line arguments are the instrument name (currently HSC only) and the number of groups of images to send.
 
 Sample command line:
 
@@ -279,16 +284,26 @@ Sample command line:
 
    python upload.py HSC 3
 
-It sends ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic.
-It then uploads a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket.
+This draws images from a pool of 4 groups (10 raw files in total) stored in the ``rubin-pp-users`` bucket.
+
+``python/tester/upload_hsc_rc2.py``: the only command-line argument is the number of groups of images to send.
+
+Sample command line:
+
+.. code-block:: sh
+
+   python upload_hsc_rc2.py 3
+
+This script draws images from the curated ``HSC/RC2/defaults`` collection in USDF's ``/repo/main`` butler repository.
+The source collection includes 432 visits, each with 103 detector images.
+The visits are selected at random, and each selected visit is uploaded as one new group.
 
-Eventually a set of parallel processes running on multiple nodes will be needed to upload the images sufficiently rapidly.
 
 .. note::
 
-   ``upload.py`` uploads from the same small pool of raws every time it is run, while the APDB assumes that every visit has unique timestamps.
+   Both tester scripts draw from a limited pool of raws each time they are run, while the APDB assumes that every visit has unique timestamps.
    This causes collisions in the APDB that crash the pipeline.
-   To prevent this, follow the reset instructions under `Databases`_ before calling ``upload.py`` again.
+   To prevent this, follow the reset instructions under `Databases`_ before calling ``upload.py`` or ``upload_hsc_rc2.py`` again.
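+
+The payload of each ``next_visit`` event is the JSON-serialized ``activator.visit.Visit`` dataclass.
+A minimal sketch of what the tester scripts publish, with illustrative field values (not taken from real observations):
+
+.. code-block:: python
+
+   import json
+   import os
+   import socket
+
+   from confluent_kafka import Producer
+
+   from activator.visit import Visit
+
+   producer = Producer({"bootstrap.servers": os.environ["KAFKA_CLUSTER"],
+                        "client.id": socket.gethostname()})
+   # Field values below are made up; the real scripts fill them in from the
+   # sample raws (upload.py) or the HSC-RC2 butler records (upload_hsc_rc2.py).
+   info = Visit(instrument="HSC", detector=42, group="2022110200013", snaps=1,
+                filter="HSC-G", ra=149.5, dec=2.2, rot=270.0, kind="SURVEY")
+   producer.produce("next-visit-topic", json.dumps(info.__dict__).encode("utf-8"))
+   producer.flush(30.0)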
 
 Databases
diff --git a/python/tester/upload.py b/python/tester/upload.py
index 64ba8483..b7a1b245 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -2,7 +2,6 @@
 import dataclasses
 import itertools
-import json
 import logging
 import os
 import random
 
@@ -13,10 +12,10 @@
 import boto3
 
 from confluent_kafka import Producer
 
-from astropy.io import fits
 from activator.raw import Snap, get_raw_path
 from activator.visit import Visit
+from .utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit
 
 
 @dataclasses.dataclass
@@ -84,96 +83,6 @@ def process_group(producer, visit_infos, uploader):
                   f"detector: {info.detector}")
 
 
-def send_next_visit(producer, group, visit_infos):
-    """Simulate the transmission of a ``next_visit`` message.
-
-    Parameters
-    ----------
-    producer : `confluent_kafka.Producer`
-        The client that posts ``next_visit`` messages.
-    group : `str`
-        The group ID for the message to send.
-    visit_infos : `set` [`activator.Visit`]
-        The visit-detector combinations to be sent; each object may
-        represent multiple snaps.
-    """
-    _log.info(f"Sending next_visit for group: {group}")
-    topic = "next-visit-topic"
-    for info in visit_infos:
-        _log.debug(f"Sending next_visit for group: {info.group} detector: {info.detector} "
-                   f"filter: {info.filter} ra: {info.ra} dec: {info.dec} kind: {info.kind}")
-        data = json.dumps(info.__dict__).encode("utf-8")
-        producer.produce(topic, data)
-
-
-def make_hsc_id(group_num, snap):
-    """Generate an exposure ID that the Butler can parse as a valid HSC ID.
-
-    This function returns a value in the "HSCE########" format (introduced June
-    2016) for all exposures, even if the source image is older.
-
-    Parameters
-    ----------
-    group_num : `int`
-        The integer used to generate a group ID.
-    snap : `int`
-        A snap ID.
-
-    Returns
-    -------
-    exposure_header : `str`
-        An exposure ID that is likely to be unique for each combination of
-        ``group`` and ``snap``, in the form it appears in HSC headers.
-    exposure_num : `int`
-        The exposure ID genereated by Middleware from ``exposure_header``.
-
-    Notes
-    -----
-    The current implementation may overflow if more than ~60 calls to upload.py
-    are done on the same day.
-    """
-    # This is a bit too dependent on how group_num is generated, but I want the
-    # group number to be discernible even after compressing to 8 digits.
-    night_id = (group_num // 100_000) % 2020_00_00  # Always 5 digits
-    run_id = group_num % 100_000  # Up to 5 digits, but usually 2-3
-    exposure_id = (night_id * 1000) + (run_id % 1000)  # Always 8 digits
-    return f"HSCE{exposure_id:08d}", exposure_id
-
-
-def make_exposure_id(instrument, group_num, snap):
-    """Generate an exposure ID from an exposure's other metadata.
-
-    The exposure ID is designed for insertion into an image header, and is
-    therefore a string in the instrument's native format.
-
-    Parameters
-    ----------
-    instrument : `str`
-        The short name of the instrument.
-    group_num : `int`
-        The integer used to generate a group ID.
-    snap : `int`
-        A snap ID.
-
-    Returns
-    -------
-    exposure_key : `str`
-        The header key under which ``instrument`` stores the exposure ID.
-    exposure_header : `str`
-        An exposure ID that is likely to be unique for each combination of
-        ``group_num`` and ``snap``, for a given ``instrument``, in the format
-        for ``instrument``'s header.
-    exposure_num : `int`
-        An exposure ID equivalent to ``exposure_header`` in the format expected
-        by Gen 3 Middleware.
- """ - match instrument: - case "HSC": - return "EXP-ID", *make_hsc_id(group_num, snap) - case _: - raise NotImplementedError(f"Exposure ID generation not supported for {instrument}.") - - def main(): if len(sys.argv) < 3: print(f"Usage: {sys.argv[0]} INSTRUMENT N_GROUPS") @@ -208,42 +117,6 @@ def main(): producer.flush(30.0) -def get_last_group(bucket, instrument, date): - """Identify the largest group number or a new group number. - - This number helps decide the next group number so it will not - collide with any previous groups. - - Parameters - ---------- - bucket : `s3.Bucket` - A S3 storage bucket - instrument : `str` - The short name of the active instrument. - date : `str` - The current date in YYYYMMDD format. - - Returns - ------- - group : `int` - The largest existing group for ``instrument``, or a newly generated - group if none exist. - """ - preblobs = bucket.objects.filter( - Prefix=f"{instrument}/", - ) - detector = min((int(preblob.key.split("/")[1]) for preblob in preblobs), default=0) - - blobs = preblobs.filter( - Prefix=f"{instrument}/{detector}/{date}" - ) - prefixes = [int(blob.key.split("/")[2]) for blob in blobs] - if len(prefixes) == 0: - return int(date) * 100_000 - else: - return max(prefixes) - - def get_samples(bucket, instrument): """Return any predefined raw exposures for a given instrument. @@ -346,7 +219,7 @@ def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_ """ if n_groups > len(raw_pool): raise ValueError(f"Requested {n_groups} groups, but only {len(raw_pool)} " - "unobserved raws are available.") + "unobserved raw groups are available.") for i, true_group in enumerate(itertools.islice(raw_pool, n_groups)): group = str(group_base + i) @@ -371,10 +244,10 @@ def upload_from_pool(visit, snap_id): make_exposure_id(visit.instrument, int(visit.group), snap_id) filename = get_raw_path(visit.instrument, visit.detector, visit.group, snap_id, exposure_num, visit.filter) - # r+b required by _replace_header_key. + # r+b required by replace_header_key. with tempfile.TemporaryFile(mode="r+b") as buffer: src_bucket.download_fileobj(src_blob.key, buffer) - _replace_header_key(buffer, exposure_key, exposure_header) + replace_header_key(buffer, exposure_key, exposure_header) buffer.seek(0) # Assumed by upload_fileobj. dest_bucket.upload_fileobj(buffer, filename) @@ -383,33 +256,5 @@ def upload_from_pool(visit, snap_id): time.sleep(SLEW_INTERVAL) -def _replace_header_key(file, key, value): - """Replace a header key in a FITS file with a new key-value pair. - - The file is updated in place, and left open when the function returns, - making this function safe to use with temporary files. - - Parameters - ---------- - file : file-like object - The file to update. Must already be open in "rb+" mode. - key : `str` - The header key to update. - value : `str` - The value to assign to ``key``. - """ - # Can't use astropy.io.fits.update, because that closes the underlying file. - hdus = fits.open(file, mode="update") - try: - # Don't know which header is supposed to contain the key. - for header in (hdu.header for hdu in hdus): - if key in header: - _log.debug("Setting %s to %s.", key, value) - header[key] = value - finally: - # Clean up HDUList object *without* closing ``file``. 
-        hdus.close(closed=False)
-
-
 if __name__ == "__main__":
     main()
diff --git a/python/tester/upload_hsc_rc2.py b/python/tester/upload_hsc_rc2.py
new file mode 100644
index 00000000..40dd2dba
--- /dev/null
+++ b/python/tester/upload_hsc_rc2.py
@@ -0,0 +1,215 @@
+# This file is part of prompt_prototype.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+import os
+import random
+import socket
+import sys
+import tempfile
+import time
+
+import boto3
+from confluent_kafka import Producer
+
+from lsst.daf.butler import Butler
+
+from activator.raw import get_raw_path
+from activator.visit import Visit
+from .utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit
+
+
+EXPOSURE_INTERVAL = 18
+SLEW_INTERVAL = 2
+
+# Kafka server
+kafka_cluster = os.environ["KAFKA_CLUSTER"]
+
+logging.basicConfig(
+    format="{levelname} {asctime} {name} - {message}",
+    style="{",
+)
+_log = logging.getLogger("lsst." + __name__)
+_log.setLevel(logging.DEBUG)
+
+
+def main():
+    if len(sys.argv) < 2:
+        print(f"Usage: {sys.argv[0]} N_GROUPS")
+        sys.exit(1)
+    n_groups = int(sys.argv[1])
+
+    date = time.strftime("%Y%m%d")
+
+    endpoint_url = "https://s3dfrgw.slac.stanford.edu"
+    s3 = boto3.resource("s3", endpoint_url=endpoint_url)
+    dest_bucket = s3.Bucket("rubin-pp")
+
+    producer = Producer(
+        {"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()}
+    )
+
+    last_group = get_last_group(dest_bucket, "HSC", date)
+    group_num = last_group + random.randrange(10, 19)
+    _log.debug(f"Last group {last_group}; new group base {group_num}")
+
+    butler = Butler("/repo/main")
+    visit_list = get_hsc_visit_list(butler, n_groups)
+    try:
+        for visit in visit_list:
+            group_num += 1
+            _log.info(f"Slewing to group {group_num}, with HSC visit {visit}")
+            time.sleep(SLEW_INTERVAL)
+            refs = prepare_one_visit(producer, str(group_num), butler, visit)
+            _log.info(f"Taking exposure for group {group_num}")
+            time.sleep(EXPOSURE_INTERVAL)
+            _log.info(f"Uploading detector images for group {group_num}")
+            upload_hsc_images(dest_bucket, str(group_num), butler, refs)
+    finally:
+        producer.flush(30.0)
+
+
+def get_hsc_visit_list(butler, n_sample):
+    """Return a list of randomly selected raw visits from HSC-RC2 in the butler repo.
+
+    Parameters
+    ----------
+    butler : `lsst.daf.butler.Butler`
+        The Butler in which to search for records of raw data.
+    n_sample : `int`
+        The number of visits to select.
+
+    Returns
+    -------
+    visits : `list` [`int`]
+        A list of randomly selected visit IDs from the HSC-RC2 dataset.
+    """
+    results = butler.registry.queryDimensionRecords(
+        "visit",
+        datasets="raw",
+        collections="HSC/RC2/defaults",
+        where="instrument='HSC' and exposure.observation_type='science'",
+    )
+    rc2 = [record.id for record in set(results)]
+    visits = random.choices(rc2, k=n_sample)
+    return visits
+
+
+def prepare_one_visit(producer, group_id, butler, visit_id):
+    """Extract metadata and send next_visit events for one HSC-RC2 visit.
+
+    One ``next_visit`` message is sent for each detector, to mimic the
+    current prototype design in which a single message is sent from the
+    Summit to the USDF and then a USDF-based server translates it into
+    multiple messages.
+
+    Parameters
+    ----------
+    producer : `confluent_kafka.Producer`
+        The client that posts ``next_visit`` messages.
+    group_id : `str`
+        The group ID for the message to send.
+    butler : `lsst.daf.butler.Butler`
+        The Butler with the raw data.
+    visit_id : `int`
+        The ID of a visit in the HSC-RC2 dataset.
+
+    Returns
+    -------
+    refs : iterable of `lsst.daf.butler.DatasetRef`
+        The datasets for which the events are sent.
+    """
+    refs = butler.registry.queryDatasets(
+        datasetType="raw",
+        collections="HSC/RC2/defaults",
+        dataId={"exposure": visit_id, "instrument": "HSC"},
+    )
+
+    visits = set()
+    for dataId in refs.dataIds.expanded():
+        visit = Visit(
+            instrument="HSC",
+            detector=dataId.records["detector"].id,
+            group=group_id,
+            snaps=1,
+            filter=dataId.records["physical_filter"].name,
+            ra=dataId.records["exposure"].tracking_ra,
+            dec=dataId.records["exposure"].tracking_dec,
+            rot=dataId.records["exposure"].sky_angle,
+            kind="SURVEY",
+        )
+        visits.add(visit)
+
+    send_next_visit(producer, group_id, visits)
+
+    return refs
+
+
+def upload_hsc_images(dest_bucket, group_id, butler, refs):
+    """Upload one group of raw HSC images to the central repo.
+
+    Parameters
+    ----------
+    dest_bucket : `S3.Bucket`
+        The bucket to which to upload the images.
+    group_id : `str`
+        The group ID under which to store the images.
+    butler : `lsst.daf.butler.Butler`
+        The source Butler with the raw data.
+    refs : iterable of `lsst.daf.butler.DatasetRef`
+        The datasets to upload.
+    """
+    exposure_key, exposure_header, exposure_num = make_exposure_id("HSC", int(group_id), 0)
+    with tempfile.TemporaryDirectory() as temp_dir:
+        # Each ref is done separately because butler.retrieveArtifacts does not preserve the order.
+        for ref in refs:
+            dest_key = get_raw_path(
+                "HSC",
+                ref.dataId["detector"],
+                group_id,
+                0,
+                ref.dataId["exposure"],
+                ref.dataId["physical_filter"],
+            )
+            transferred = butler.retrieveArtifacts(
+                [ref],
+                transfer="copy",
+                preserve_path=False,
+                destination=temp_dir,
+            )
+            if len(transferred) != 1:
+                _log.error(
+                    f"{ref} has multiple artifacts and cannot be handled by the current implementation"
+                )
+                continue
+
+            path = transferred[0].path
+            _log.debug(
+                f"Raw file for {ref.dataId} was copied from Butler to {path}"
+            )
+            with open(path, "r+b") as temp_file:
+                replace_header_key(temp_file, exposure_key, exposure_header)
+            dest_bucket.upload_file(path, dest_key)
+            _log.debug(f"{dest_key} was written to {dest_bucket}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/python/tester/utils.py b/python/tester/utils.py
new file mode 100644
index 00000000..475e862a
--- /dev/null
+++ b/python/tester/utils.py
@@ -0,0 +1,186 @@
+# This file is part of prompt_prototype.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+__all__ = ["get_last_group", "make_exposure_id", "replace_header_key", "send_next_visit"]
+
+import json
+import logging
+
+from astropy.io import fits
+
+
+_log = logging.getLogger("lsst." + __name__)
+_log.setLevel(logging.DEBUG)
+
+
+def get_last_group(bucket, instrument, date):
+    """Identify the largest group number or a new group number.
+
+    This number helps decide the next group number so it will not
+    collide with any previous groups.
+
+    Parameters
+    ----------
+    bucket : `s3.Bucket`
+        An S3 storage bucket.
+    instrument : `str`
+        The short name of the active instrument.
+    date : `str`
+        The current date in YYYYMMDD format.
+
+    Returns
+    -------
+    group : `int`
+        The largest existing group for ``instrument``, or a newly generated
+        group if none exist.
+    """
+    preblobs = bucket.objects.filter(
+        Prefix=f"{instrument}/",
+    )
+    detector = min((int(preblob.key.split("/")[1]) for preblob in preblobs), default=0)
+
+    blobs = preblobs.filter(
+        Prefix=f"{instrument}/{detector}/{date}"
+    )
+    prefixes = [int(blob.key.split("/")[2]) for blob in blobs]
+    if len(prefixes) == 0:
+        return int(date) * 100_000
+    else:
+        return max(prefixes)
+
+
+def make_exposure_id(instrument, group_num, snap):
+    """Generate an exposure ID from an exposure's other metadata.
+
+    The exposure ID is designed for insertion into an image header, and is
+    therefore a string in the instrument's native format.
+
+    Parameters
+    ----------
+    instrument : `str`
+        The short name of the instrument.
+    group_num : `int`
+        The integer used to generate a group ID.
+    snap : `int`
+        A snap ID.
+
+    Returns
+    -------
+    exposure_key : `str`
+        The header key under which ``instrument`` stores the exposure ID.
+    exposure_header : `str`
+        An exposure ID that is likely to be unique for each combination of
+        ``group_num`` and ``snap``, for a given ``instrument``, in the format
+        for ``instrument``'s header.
+    exposure_num : `int`
+        An exposure ID equivalent to ``exposure_header`` in the format expected
+        by Gen 3 Middleware.
+    """
+    match instrument:
+        case "HSC":
+            return "EXP-ID", *make_hsc_id(group_num, snap)
+        case _:
+            raise NotImplementedError(f"Exposure ID generation not supported for {instrument}.")
+
+
+def make_hsc_id(group_num, snap):
+    """Generate an exposure ID that the Butler can parse as a valid HSC ID.
+
+    This function returns a value in the "HSCE########" format (introduced June
+    2016) for all exposures, even if the source image is older.
+
+    Parameters
+    ----------
+    group_num : `int`
+        The integer used to generate a group ID.
+    snap : `int`
+        A snap ID.
+
+    Returns
+    -------
+    exposure_header : `str`
+        An exposure ID that is likely to be unique for each combination of
+        ``group`` and ``snap``, in the form it appears in HSC headers.
+    exposure_num : `int`
+        The exposure ID generated by Middleware from ``exposure_header``.
+
+    Notes
+    -----
+    The current implementation allows up to 1000 group numbers per day.
+    It can overflow if upload.py is called more than ~60 times on the same day,
+    or if upload_hsc_rc2.py is run with a large N_GROUPS.
+    """
+    # This is a bit too dependent on how group_num is generated, but I want the
+    # group number to be discernible even after compressing to 8 digits.
+    night_id = (group_num // 100_000) % 2020_00_00  # Always 5 digits
+    run_id = group_num % 100_000  # Up to 5 digits, but usually 2-3
+    exposure_id = (night_id * 1000) + (run_id % 1000)  # Always 8 digits
+    return f"HSCE{exposure_id:08d}", exposure_id
+
+
+def send_next_visit(producer, group, visit_infos):
+    """Simulate the transmission of a ``next_visit`` message.
+
+    Parameters
+    ----------
+    producer : `confluent_kafka.Producer`
+        The client that posts ``next_visit`` messages.
+    group : `str`
+        The group ID for the message to send.
+    visit_infos : `set` [`activator.Visit`]
+        The visit-detector combinations to be sent; each object may
+        represent multiple snaps.
+    """
+    _log.info(f"Sending next_visit for group: {group}")
+    topic = "next-visit-topic"
+    for info in visit_infos:
+        _log.debug(f"Sending next_visit for group: {info.group} detector: {info.detector} "
+                   f"filter: {info.filter} ra: {info.ra} dec: {info.dec} kind: {info.kind}")
+        data = json.dumps(info.__dict__).encode("utf-8")
+        producer.produce(topic, data)
+
+
+def replace_header_key(file, key, value):
+    """Replace a header key in a FITS file with a new key-value pair.
+
+    The file is updated in place, and left open when the function returns,
+    making this function safe to use with temporary files.
+
+    Parameters
+    ----------
+    file : file-like object
+        The file to update. Must already be open in "rb+" mode.
+    key : `str`
+        The header key to update.
+    value : `str`
+        The value to assign to ``key``.
+    """
+    # Can't use astropy.io.fits.update, because that closes the underlying file.
+    hdus = fits.open(file, mode="update")
+    try:
+        # Don't know which header is supposed to contain the key.
+        for header in (hdu.header for hdu in hdus):
+            if key in header:
+                _log.debug("Setting %s to %s.", key, value)
+                header[key] = value
+    finally:
+        # Clean up HDUList object *without* closing ``file``.
+        hdus.close(closed=False)
diff --git a/tests/test_tester_utils.py b/tests/test_tester_utils.py
new file mode 100644
index 00000000..4fff0cd5
--- /dev/null
+++ b/tests/test_tester_utils.py
@@ -0,0 +1,78 @@
+# This file is part of prompt_prototype.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import unittest
+
+import boto3
+import botocore
+from moto import mock_s3
+
+from activator.raw import get_raw_path
+from tester.utils import get_last_group
+
+
+class TesterUtilsTest(unittest.TestCase):
+    """Test components in tester.
+    """
+    mock_s3 = mock_s3()
+    bucket_name = "testBucketName"
+
+    def setUp(self):
+        self.mock_s3.start()
+        s3 = boto3.resource("s3")
+        s3.create_bucket(Bucket=self.bucket_name)
+
+        path = get_raw_path("TestCam", 123, "2022110200001", 2, 30, "TestFilter")
+        obj = s3.Object(self.bucket_name, path)
+        obj.put(Body=b'test1')
+        path = get_raw_path("TestCam", 123, "2022110200002", 2, 30, "TestFilter")
+        obj = s3.Object(self.bucket_name, path)
+        obj.put(Body=b'test2')
+
+    def tearDown(self):
+        s3 = boto3.resource("s3")
+        bucket = s3.Bucket(self.bucket_name)
+        try:
+            try:
+                bucket.objects.all().delete()
+            except botocore.exceptions.ClientError as e:
+                if e.response["Error"]["Code"] == "404":
+                    # the key was not reachable - pass
+                    pass
+                else:
+                    raise
+            finally:
+                bucket = s3.Bucket(self.bucket_name)
+                bucket.delete()
+        finally:
+            # Stop the S3 mock.
+            self.mock_s3.stop()
+
+    def test_get_last_group(self):
+        s3 = boto3.resource("s3")
+        bucket = s3.Bucket(self.bucket_name)
+
+        last_group = get_last_group(bucket, "TestCam", "20221102")
+        self.assertEqual(last_group, 2022110200002)
+
+        # Test the case of no match
+        last_group = get_last_group(bucket, "TestCam", "20110101")
+        self.assertEqual(last_group, int(20110101) * 100_000)
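
The 8-digit exposure IDs produced by ``make_hsc_id`` fold the observing night and a per-day run counter into the "HSCE########" format, which can be hard to follow from the code alone. A minimal sketch of how the scheme can be sanity-checked, assuming ``tester.utils`` is importable as in the test above (the group number below is illustrative, not a real observation):

    from tester.utils import make_exposure_id

    # A made-up group number of the kind get_last_group() returns for
    # 2022-11-02, plus a small offset added by the tester scripts.
    group_num = 20221102 * 100_000 + 13
    exposure_key, exposure_header, exposure_num = make_exposure_id("HSC", group_num, 0)
    print(exposure_key, exposure_header, exposure_num)
    # Expected output: EXP-ID HSCE21102013 21102013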