
Merge pull request #154 from lsst-dm/tickets/DM-43913
DM-43913: Make tester upload_hsc_rc2.py work with LATISS/LSSTComCamSim
hsinfang committed May 17, 2024
2 parents 4be1cf4 + 9f85050 commit 0ba5ec0
Showing 8 changed files with 151 additions and 76 deletions.
18 changes: 11 additions & 7 deletions doc/playbook.rst
@@ -375,7 +375,7 @@ If in doubt, check the logs first.
Testers
=======

``python/tester/upload.py`` and ``python/tester/upload_hsc_rc2.py`` are scripts that simulate the CCS image writer.
``python/tester/upload.py`` and ``python/tester/upload_from_repo.py`` are scripts that simulate the CCS image writer.
They can be run from ``rubin-devl``, but require the user to install the ``confluent_kafka`` package in their environment.
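If ``confluent_kafka`` is not already available, it can be installed into the user environment, for example:

.. code-block:: sh

   pip install --user confluent-kafka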

You must have a profile set up for the ``rubin-pp-dev`` bucket (see `Buckets`_, above).
@@ -410,17 +410,21 @@ This script draws images stored in the ``rubin-pp-dev-users`` bucket.
This visit can test pipeline fallback features.
* For LSSTComCamSim, 2 groups are curated, totaling 18 raw FITS files and their corresponding JSON metadata files.

``python/tester/upload_hsc_rc2.py``: Command line argument is the number of groups of images to send.
``python/tester/upload_from_repo.py``: Command line arguments are a configuration file and the number of groups of images to send.

Sample command line:

.. code-block:: sh

   python upload_hsc_rc2.py 3
   python upload_from_repo.py $PROMPT_PROCESSING_DIR/etc/tester/HSC.yaml 3
   python upload_from_repo.py $PROMPT_PROCESSING_DIR/etc/tester/LATISS.yaml 4
   python upload_from_repo.py $PROMPT_PROCESSING_DIR/etc/tester/LSSTComCamSim.yaml 2 --ordered

This script draws images from the curated ``HSC/RC2/defaults`` collection at USDF's ``/repo/main`` butler repository.
The source collection includes 432 visits, each with 103 detector images.
The visits are randomly selected and uploaded as one new group for each visit.
This script draws images from a butler repository as defined in the input configuration file.
A butler query constrains the data selection.
By default, visits are randomly selected and uploaded as one new group for each visit.
With the optional ``--ordered`` command line argument, images are uploaded following the order of the original exposure IDs.
Currently the upload script does not follow the actual relative timing of the input exposures.
Images can be uploaded in parallel processes.
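
The following is a hypothetical sketch (not the actual ``upload_from_repo.py`` implementation) of how such a configuration file might drive the data selection; the config keys and Butler calls shown are assumptions based on the example files below:

.. code-block:: py

   # Hypothetical sketch only; assumes config keys like those in etc/tester/HSC.yaml.
   import random

   import yaml
   from lsst.daf.butler import Butler

   with open("etc/tester/HSC.yaml") as f:
       config = yaml.safe_load(f)

   butler = Butler(config["repo"], collections=config["query"]["collections"])
   # Find all exposures satisfying the configured query constraint.
   exposures = list(butler.registry.queryDimensionRecords(
       "exposure",
       instrument=config["instrument"],
       where=config["query"]["where"],
   ))
   # By default, visits are chosen at random; --ordered would sort by exposure ID instead.
   chosen = random.sample(exposures, k=min(3, len(exposures)))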


@@ -432,7 +436,7 @@ The schema of the ``next_visit`` events from the summit can be found at `ScriptQ
To implement schema changes in the development environment:

* Update the ``*Visit`` classes in ``python/activator/visit.py`` accordingly (a hypothetical field-addition sketch follows this list).
* Update the upload tester scripts ``python/tester/upload.py`` and ``python/tester/upload_hsc_rc2.py`` where simulated ``next_visit`` events originate.
* Update the upload tester scripts ``python/tester/upload.py`` and ``python/tester/upload_from_repo.py`` where simulated ``next_visit`` events originate.
* Update relevant unit tests.
* Register the new schema with Sasquatch's schema registry for the ``test.next-visit`` topic.
  The `Sasquatch documentation <https://sasquatch.lsst.io/user-guide/avro.html>`_ describes schema evolution.
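
As a hypothetical illustration of the first step (the new field name below is invented, and the real ``*Visit`` classes carry more attributes), adding a field with a default value keeps the evolved Avro schema backward compatible:

.. code-block:: py

   import dataclasses


   @dataclasses.dataclass(frozen=True)
   class SummitVisit:
       # Illustrative subset of the real fields in python/activator/visit.py.
       instrument: str
       groupId: str
       filters: str
       # Hypothetical new field; the default lets consumers read events
       # produced with the older schema that lacks it.
       newField: int = 0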
6 changes: 6 additions & 0 deletions etc/tester/HSC.yaml
@@ -0,0 +1,6 @@
instrument: HSC
query:
  collections: HSC/RC2/defaults
  # Select only non-narrow bands
  where: "exposure.observation_type='science' and band in ('g', 'r', 'i', 'z', 'y')"
repo: /repo/main
5 changes: 5 additions & 0 deletions etc/tester/LATISS.yaml
@@ -0,0 +1,5 @@
instrument: LATISS
query:
  collections: LATISS/raw/all
  where: "exposure.science_program='AUXTEL_PHOTO_IMAGING' and exposure.day_obs=20240403"
repo: /repo/embargo
5 changes: 5 additions & 0 deletions etc/tester/LSSTComCamSim.yaml
@@ -0,0 +1,5 @@
instrument: LSSTComCamSim
query:
  collections: LSSTComCamSim/raw/all
  where: "exposure.science_program='ops-rehearsal-3' and exposure.day_obs=20240404"
repo: /repo/embargo
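
The query constraints in these configuration files can be previewed outside the tester, e.g. with the butler command line (a sketch; exact options depend on the installed daf_butler version):

   butler query-dimension-records /repo/embargo exposure \
       --where "instrument='LSSTComCamSim' and exposure.science_program='ops-rehearsal-3' and exposure.day_obs=20240404"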
4 changes: 4 additions & 0 deletions python/activator/middleware_interface.py
@@ -1425,6 +1425,9 @@ def clean_local_repo(self, exposure_ids: set[int]) -> None:
        # VALIDITY-HACK: remove cached calibs to avoid future conflicts
        if not cache_calibs:
            calib_chain = self.instrument.makeCalibrationCollectionName()
            calib_refs = self.butler.registry.queryDatasets(
                ...,
                collections=calib_chain)
            calib_taggeds = self.butler.registry.queryCollections(
                calib_chain,
                flattenChains=True,
@@ -1434,6 +1437,7 @@ def clean_local_repo(self, exposure_ids: set[int]) -> None:
                flattenChains=True,
                collectionTypes=CollectionType.RUN)
            self.butler.collection_chains.redefine_chain(calib_chain, [])
            self.butler.pruneDatasets(calib_refs, disassociate=True, unstore=True, purge=True)
            for member in calib_taggeds:
                self.butler.registry.removeCollection(member)
            self.butler.removeRuns(calib_runs, unstore=True)
36 changes: 11 additions & 25 deletions python/tester/upload.py
@@ -44,6 +44,7 @@
)
from activator.visit import FannedOutVisit, SummitVisit
from tester.utils import (
    INSTRUMENTS,
    get_last_group,
    increment_group,
    make_exposure_id,
@@ -52,21 +53,6 @@
)


@dataclasses.dataclass
class Instrument:
    n_snaps: int
    n_detectors: int
    sal_index: int = 0


INSTRUMENTS = {
    "LSSTCam": Instrument(2, 189 + 8 + 8, 1),
    "LSSTComCam": Instrument(2, 9, 1),
    "LSSTComCamSim": Instrument(1, 9, 3),
    "LATISS": Instrument(1, 1, 2),
    "DECam": Instrument(1, 62),
    "HSC": Instrument(1, 112),
}
EXPOSURE_INTERVAL = 18
SLEW_INTERVAL = 2

@@ -384,14 +370,6 @@ def upload_from_pool(visit, snap_id):
            make_exposure_id(visit.instrument, visit.groupId, snap_id)
        filename = get_raw_path(visit.instrument, visit.detector, visit.groupId, snap_id,
                                exposure_num, visit.filters)
        # r+b required by replace_header_key.
        with tempfile.TemporaryFile(mode="r+b") as buffer:
            src_bucket.download_fileobj(src_blob.key, buffer)
            for header_key in headers:
                replace_header_key(buffer, header_key, headers[header_key])
            buffer.seek(0)  # Assumed by upload_fileobj.
            dest_bucket.upload_fileobj(buffer, filename)
        _log.debug(f"{filename} is uploaded to {dest_bucket}")

        if instrument in _LSST_CAMERA_LIST:
            # Upload a corresponding sidecar json file
@@ -401,10 +379,18 @@ def upload_from_pool(visit, snap_id):
            filename_sidecar = filename.removesuffix("fits") + "json"
            with sidecar.open("r") as f:
                md = json.load(f)
            for header_key in headers:
                md[header_key] = headers[header_key]
            md.update(headers)
            dest_bucket.put_object(Body=json.dumps(md), Key=filename_sidecar)

        # r+b required by replace_header_key.
        with tempfile.TemporaryFile(mode="r+b") as buffer:
            src_bucket.download_fileobj(src_blob.key, buffer)
            for header_key in headers:
                replace_header_key(buffer, header_key, headers[header_key])
            buffer.seek(0)  # Assumed by upload_fileobj.
            dest_bucket.upload_fileobj(buffer, filename)
        _log.debug(f"{filename} is uploaded to {dest_bucket}")

process_group(kafka_url, visit_infos, upload_from_pool)


