Merged
49 changes: 18 additions & 31 deletions python/tester/upload_from_repo.py
@@ -36,6 +36,7 @@

from lsst.utils.timer import time_this
from lsst.daf.butler import Butler
from lsst.resources import ResourcePath

from shared.raw import get_raw_path, _LSST_CAMERA_LIST
from shared.visit import SummitVisit
@@ -148,12 +149,13 @@ def main():
for visit in visit_list:
group = increment_group(instrument, group, 1)
refs = prepare_one_visit(kafka_url, group, butler, instrument, visit)
ref_dict = butler.get_many_uris(refs)
_log.info(f"Slewing to group {group}, with {instrument} visit {visit}")
time.sleep(SLEW_INTERVAL)
_log.info(f"Taking exposure for group {group}")
time.sleep(EXPOSURE_INTERVAL)
_log.info(f"Uploading detector images for group {group}")
upload_images(pool, temp_dir, group, butler, refs)
upload_images(pool, temp_dir, group, ref_dict)
pool.close()
_log.info("Waiting for uploads to finish...")
pool.join()
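For orientation, a minimal sketch of the new lookup step this hunk introduces: resolve all DatasetRef URIs for a visit in one call with `Butler.get_many_uris`, then read each entry's `primaryURI`. The repo path and dataset query below are placeholders, not taken from the script (where refs come from `prepare_one_visit`):

```python
from lsst.daf.butler import Butler

# Placeholder repo and query; in the script, refs come from prepare_one_visit().
butler = Butler("/path/to/repo")
refs = list(butler.registry.queryDatasets("raw", collections=...))

# One bulk URI lookup per visit instead of one Butler call per image.
ref_dict = butler.get_many_uris(refs)
for ref, uris in ref_dict.items():
    print(ref.dataId, uris.primaryURI)
```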
@@ -249,7 +251,7 @@ def prepare_one_visit(kafka_url, group_id, butler, instrument, visit_id):
return refs


def upload_images(pool, temp_dir, group_id, butler, refs):
def upload_images(pool, temp_dir, group_id, ref_dict):
"""Upload one group of raw images to the central repo

Parameters
@@ -261,19 +263,18 @@ def upload_images(pool, temp_dir, group_id, butler, refs):
metadata can be modified.
group_id : `str`
The group ID under which to store the images.
butler : `lsst.daf.butler.Butler`
The source Butler with the raw data.
refs : iterable of `lsst.daf.butler.DatasetRef`
The datasets to upload
ref_dict : `dict` [ `lsst.daf.butler.DatasetRef`, `lsst.daf.butler.datastore.DatasetRefURIs` ]
A dict of the datasetRefs to upload and their corresponding URIs.
"""
# Non-blocking assignment lets us upload during the next exposure.
# Can't time these tasks directly, but the blocking equivalent took
# 12-20 s depending on tuning, or less than a single exposure.
pool.starmap_async(_upload_one_image,
[(temp_dir, group_id, butler, ref) for ref in refs],
error_callback=_log.exception,
chunksize=5 # Works well across a broad range of # processes
)
pool.starmap_async(
_upload_one_image,
[(temp_dir, group_id, r, ref_dict[r].primaryURI) for r in ref_dict],
error_callback=_log.exception,
chunksize=5 # Works well across a broad range of # processes
)
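As an aside, a self-contained sketch of the fire-and-forget pool pattern used above, with a stand-in worker (all names below are illustrative, not the script's):

```python
import logging
from multiprocessing import Pool

log = logging.getLogger(__name__)


def fake_upload(group_id, name):
    # Stand-in for _upload_one_image.
    log.info("uploading %s for group %s", name, group_id)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    pool = Pool(processes=4)
    # starmap_async returns immediately, so the caller can start the next
    # exposure while these uploads run in worker processes.
    pool.starmap_async(
        fake_upload,
        [("group-1", f"det{i}.fits") for i in range(10)],
        error_callback=log.exception,  # log worker exceptions instead of losing them
        chunksize=5,
    )
    pool.close()  # no more tasks will be submitted
    pool.join()   # block until the queued uploads finish
```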


def _get_max_processes():
@@ -291,7 +292,7 @@ def _get_max_processes():
return 4


def _upload_one_image(temp_dir, group_id, butler, ref):
def _upload_one_image(temp_dir, group_id, ref, uri):
"""Upload a raw image to the central repo.

Parameters
@@ -301,10 +302,10 @@ def _upload_one_image(temp_dir, group_id, butler, ref):
metadata can be modified.
group_id : `str`
The group ID under which to store the images.
butler : `lsst.daf.butler.Butler`
The source Butler with the raw data.
ref : `lsst.daf.butler.DatasetRef`
The dataset to upload.
uri : `lsst.resources.ResourcePath`
URI to the image to upload.
"""
instrument = ref.dataId["instrument"]
with time_this(log=_log, msg="Single-image processing", prefix=None):
@@ -321,7 +322,7 @@ def _upload_one_image(temp_dir, group_id, butler, ref):
sidecar_uploaded = False
if instrument in _LSST_CAMERA_LIST:
# Upload a corresponding sidecar json file
sidecar = butler.getURI(ref).updatedExtension("json")
sidecar = uri.updatedExtension("json")
if sidecar.exists():
with sidecar.open("r") as f:
md = json.load(f)
@@ -331,22 +332,8 @@
)
sidecar_uploaded = True

# Each ref is done separately because butler.retrieveArtifacts does not preserve the order.
transferred = butler.retrieveArtifacts(
[ref],
transfer="copy",
preserve_path=False,
destination=temp_dir,
)
if len(transferred) != 1:
_log.error(
f"{ref} has multiple artifacts and cannot be handled by current implementation"
)
for transfer in transferred:
os.remove(transfer.path)
return

path = transferred[0].path
path = os.path.join(temp_dir, uri.basename())
ResourcePath(path).transfer_from(uri, transfer="copy")
@timj (Member) commented on Apr 8, 2025:
I'm not entirely sure what this code is trying to do, but it looks like it's downloading a file from a remote location to a POSIX path, optionally extracting a sidecar JSON, and then uploading the raw to a new location. Is that correct?

Why isn't this doing a direct transfer from one location to another using ResourcePath? And why isn't the JSON header extracted remotely using astropy.io.fits with ResourcePath.to_fsspec? That would simplify the code a lot (and could use ResourcePath.write without boto calls).
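For what it's worth, a rough sketch of the remote header read being suggested here. This is hedged: it assumes ResourcePath.to_fsspec() returns an (AbstractFileSystem, path) pair and that the relevant fsspec extras are installed for the URI scheme; the URI and keyword are placeholders:

```python
from astropy.io import fits
from lsst.resources import ResourcePath

raw = ResourcePath("s3://some-bucket/raw/exposure.fits")  # placeholder URI

# Assumption: to_fsspec() yields (filesystem, path) for schemes with fsspec support.
fs, fspath = raw.to_fsspec()
with fs.open(fspath, "rb") as f, fits.open(f) as hdul:
    header = hdul[0].header  # primary header only; avoids downloading the full file
print(header.get("OBSID"))  # placeholder keyword
```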

Collaborator replied:
Not a direct transfer because it needs to modify some headers in the files

Member replied:
Why are we modifying the headers of raw files?

@hsinfang (Collaborator) replied on Apr 8, 2025:
Because we are giving them a new group ID, a new exposure ID, etc., to fake new exposures. The script sends a next_visit event with that metadata, and the prompt service will attempt to process the matching exposure (matching via the headers).
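For illustration only, the header rewrite described above might look roughly like this (the keyword names and values are assumptions, not taken from the script):

```python
from astropy.io import fits

# Hypothetical: rewrite identifying headers of a copied raw so it presents
# as a brand-new exposure before it is uploaded.
with fits.open("/tmp/exposure.fits", mode="update") as hdul:
    hdul[0].header["GROUPID"] = "2025-04-08T00:00:00.001"  # assumed keyword
    hdul[0].header["OBSID"] = "FAKE_20250408_000001"       # assumed keyword
```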

_log.debug(
f"Raw file for {ref.dataId} was copied from Butler to {path}"
)