29 changes: 22 additions & 7 deletions doc/playbook.rst
@@ -252,7 +252,7 @@ A few useful commands for managing the service:
tester
======

``python/tester/upload.py`` is a script that simulates the CCS image writer.
``python/tester/upload.py`` and ``python/tester/upload_hsc_rc2.py`` are scripts that simulate the CCS image writer.
They can be run from ``rubin-devl``, but require the user to install the ``confluent_kafka`` package in their environment.

You must have a profile set up for the ``rubin-pp`` bucket (see `Buckets`_, above), and must set the ``KAFKA_CLUSTER`` environment variable.
@@ -271,24 +271,39 @@ Install the prototype code:

git clone https://github.com/lsst-dm/prompt_prototype

Command line arguments are the instrument name (currently HSC only) and the number of groups of images to send.
The tester scripts send ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic.
They then upload a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket, simulating incoming raw images.

Eventually a set of parallel processes running on multiple nodes will be needed to upload the images sufficiently rapidly.

``python/tester/upload.py``: The command line arguments are the instrument name (currently HSC only) and the number of groups of images to send.

Sample command line:

.. code-block:: sh

python upload.py HSC 3

It sends ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic.
It then uploads a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket.
It draws from a pool of 4 groups, 10 raw files in total, stored in the ``rubin-pp-users`` bucket.
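
Each ``next_visit`` event is the JSON serialization of an ``activator.visit.Visit``. A minimal sketch of how one event is produced, using the field names shown in the tester's debug output (the values are purely illustrative and the field list is not necessarily complete):

.. code-block:: python

   import json
   import os

   from confluent_kafka import Producer

   producer = Producer({"bootstrap.servers": os.environ["KAFKA_CLUSTER"]})
   # Illustrative payload only; the authoritative field list is activator.visit.Visit.
   payload = {
       "instrument": "HSC",
       "detector": 42,
       "group": "2022091500001",
       "filter": "HSC-G",
       "ra": 150.0,
       "dec": 2.5,
       "kind": "SURVEY",
   }
   producer.produce("next-visit-topic", json.dumps(payload).encode("utf-8"))
   producer.flush(30.0)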

``python/tester/upload_hsc_rc2.py``: The only command line argument is the number of groups of images to send.

Sample command line:

.. code-block:: sh

python upload_hsc_rc2.py 3

This script draws images from the curated ``HSC/RC2/defaults`` collection in USDF's ``/repo/main`` butler repository.
The source collection includes 432 visits, each with 103 detector images.
Visits are randomly selected, and each selected visit is uploaded as a new group.
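
A rough sketch of how such a random selection could be made against the butler repository; this illustrates the idea only and is not necessarily the script's actual implementation:

.. code-block:: python

   import random

   from lsst.daf.butler import Butler

   # Assumes read access to the USDF /repo/main repository.
   butler = Butler("/repo/main", collections="HSC/RC2/defaults")
   refs = butler.registry.queryDatasets("raw", collections="HSC/RC2/defaults")
   # For HSC raws the exposure ID doubles as the visit ID.
   visits = sorted({ref.dataId["exposure"] for ref in refs})
   chosen = random.sample(visits, k=3)  # one new group per selected visit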

Eventually a set of parallel processes running on multiple nodes will be needed to upload the images sufficiently rapidly.

.. note::

``upload.py`` uploads from the same small pool of raws every time it is run, while the APDB assumes that every visit has unique timestamps.
Both tester scripts draw from a limited pool of raws every time they are run, while the APDB assumes that every visit has unique timestamps.
This causes collisions in the APDB that crash the pipeline.
To prevent this, follow the reset instructions under `Databases`_ before calling ``upload.py`` again.
To prevent this, follow the reset instructions under `Databases`_ before calling ``upload.py`` or ``upload_hsc_rc2.py`` again.


Databases
163 changes: 4 additions & 159 deletions python/tester/upload.py
@@ -2,7 +2,6 @@

import dataclasses
import itertools
import json
import logging
import os
import random
@@ -13,10 +12,10 @@

import boto3
from confluent_kafka import Producer
from astropy.io import fits

from activator.raw import Snap, get_raw_path
from activator.visit import Visit
from .utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit


@dataclasses.dataclass
@@ -84,96 +83,6 @@ def process_group(producer, visit_infos, uploader):
f"detector: {info.detector}")


def send_next_visit(producer, group, visit_infos):
"""Simulate the transmission of a ``next_visit`` message.

Parameters
----------
producer : `confluent_kafka.Producer`
The client that posts ``next_visit`` messages.
group : `str`
The group ID for the message to send.
visit_infos : `set` [`activator.Visit`]
The visit-detector combinations to be sent; each object may
represent multiple snaps.
"""
_log.info(f"Sending next_visit for group: {group}")
topic = "next-visit-topic"
for info in visit_infos:
_log.debug(f"Sending next_visit for group: {info.group} detector: {info.detector} "
f"filter: {info.filter} ra: {info.ra} dec: {info.dec} kind: {info.kind}")
data = json.dumps(info.__dict__).encode("utf-8")
producer.produce(topic, data)


def make_hsc_id(group_num, snap):
"""Generate an exposure ID that the Butler can parse as a valid HSC ID.

This function returns a value in the "HSCE########" format (introduced June
2016) for all exposures, even if the source image is older.

Parameters
----------
group_num : `int`
The integer used to generate a group ID.
snap : `int`
A snap ID.

Returns
-------
exposure_header : `str`
An exposure ID that is likely to be unique for each combination of
``group`` and ``snap``, in the form it appears in HSC headers.
exposure_num : `int`
The exposure ID generated by Middleware from ``exposure_header``.

Notes
-----
The current implementation may overflow if more than ~60 calls to upload.py
are done on the same day.
"""
# This is a bit too dependent on how group_num is generated, but I want the
# group number to be discernible even after compressing to 8 digits.
night_id = (group_num // 100_000) % 2020_00_00 # Always 5 digits
run_id = group_num % 100_000 # Up to 5 digits, but usually 2-3
exposure_id = (night_id * 1000) + (run_id % 1000) # Always 8 digits
return f"HSCE{exposure_id:08d}", exposure_id


def make_exposure_id(instrument, group_num, snap):
"""Generate an exposure ID from an exposure's other metadata.

The exposure ID is designed for insertion into an image header, and is
therefore a string in the instrument's native format.

Parameters
----------
instrument : `str`
The short name of the instrument.
group_num : `int`
The integer used to generate a group ID.
snap : `int`
A snap ID.

Returns
-------
exposure_key : `str`
The header key under which ``instrument`` stores the exposure ID.
exposure_header : `str`
An exposure ID that is likely to be unique for each combination of
``group_num`` and ``snap``, for a given ``instrument``, in the format
for ``instrument``'s header.
exposure_num : `int`
An exposure ID equivalent to ``exposure_header`` in the format expected
by Gen 3 Middleware.
"""
match instrument:
case "HSC":
return "EXP-ID", *make_hsc_id(group_num, snap)
case _:
raise NotImplementedError(f"Exposure ID generation not supported for {instrument}.")


def main():
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} INSTRUMENT N_GROUPS")
@@ -208,42 +117,6 @@ def main():
producer.flush(30.0)


def get_last_group(bucket, instrument, date):
"""Identify the largest group number or a new group number.

This number helps decide the next group number so it will not
collide with any previous groups.

Parameters
----------
bucket : `s3.Bucket`
A S3 storage bucket
instrument : `str`
The short name of the active instrument.
date : `str`
The current date in YYYYMMDD format.

Returns
-------
group : `int`
The largest existing group for ``instrument``, or a newly generated
group if none exist.
"""
preblobs = bucket.objects.filter(
Prefix=f"{instrument}/",
)
detector = min((int(preblob.key.split("/")[1]) for preblob in preblobs), default=0)

blobs = preblobs.filter(
Prefix=f"{instrument}/{detector}/{date}"
)
prefixes = [int(blob.key.split("/")[2]) for blob in blobs]
if len(prefixes) == 0:
return int(date) * 100_000
else:
return max(prefixes)


def get_samples(bucket, instrument):
"""Return any predefined raw exposures for a given instrument.

Expand Down Expand Up @@ -346,7 +219,7 @@ def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_
"""
if n_groups > len(raw_pool):
raise ValueError(f"Requested {n_groups} groups, but only {len(raw_pool)} "
"unobserved raws are available.")
"unobserved raw groups are available.")

for i, true_group in enumerate(itertools.islice(raw_pool, n_groups)):
group = str(group_base + i)
@@ -371,10 +244,10 @@ def upload_from_pool(visit, snap_id):
make_exposure_id(visit.instrument, int(visit.group), snap_id)
filename = get_raw_path(visit.instrument, visit.detector, visit.group, snap_id,
exposure_num, visit.filter)
# r+b required by _replace_header_key.
# r+b required by replace_header_key.
with tempfile.TemporaryFile(mode="r+b") as buffer:
src_bucket.download_fileobj(src_blob.key, buffer)
_replace_header_key(buffer, exposure_key, exposure_header)
replace_header_key(buffer, exposure_key, exposure_header)
buffer.seek(0) # Assumed by upload_fileobj.
dest_bucket.upload_fileobj(buffer, filename)

@@ -383,33 +256,5 @@
time.sleep(SLEW_INTERVAL)


def _replace_header_key(file, key, value):
"""Replace a header key in a FITS file with a new key-value pair.

The file is updated in place, and left open when the function returns,
making this function safe to use with temporary files.

Parameters
----------
file : file-like object
The file to update. Must already be open in "rb+" mode.
key : `str`
The header key to update.
value : `str`
The value to assign to ``key``.
"""
# Can't use astropy.io.fits.update, because that closes the underlying file.
hdus = fits.open(file, mode="update")
try:
# Don't know which header is supposed to contain the key.
for header in (hdu.header for hdu in hdus):
if key in header:
_log.debug("Setting %s to %s.", key, value)
header[key] = value
finally:
# Clean up HDUList object *without* closing ``file``.
hdus.close(closed=False)


if __name__ == "__main__":
main()