diff --git a/python/tester/upload_from_repo.py b/python/tester/upload_from_repo.py index b5402eed..773ae05a 100644 --- a/python/tester/upload_from_repo.py +++ b/python/tester/upload_from_repo.py @@ -36,6 +36,7 @@ from lsst.utils.timer import time_this from lsst.daf.butler import Butler +from lsst.resources import ResourcePath from shared.raw import get_raw_path, _LSST_CAMERA_LIST from shared.visit import SummitVisit @@ -148,12 +149,13 @@ def main(): for visit in visit_list: group = increment_group(instrument, group, 1) refs = prepare_one_visit(kafka_url, group, butler, instrument, visit) + ref_dict = butler.get_many_uris(refs) _log.info(f"Slewing to group {group}, with {instrument} visit {visit}") time.sleep(SLEW_INTERVAL) _log.info(f"Taking exposure for group {group}") time.sleep(EXPOSURE_INTERVAL) _log.info(f"Uploading detector images for group {group}") - upload_images(pool, temp_dir, group, butler, refs) + upload_images(pool, temp_dir, group, ref_dict) pool.close() _log.info("Waiting for uploads to finish...") pool.join() @@ -249,7 +251,7 @@ def prepare_one_visit(kafka_url, group_id, butler, instrument, visit_id): return refs -def upload_images(pool, temp_dir, group_id, butler, refs): +def upload_images(pool, temp_dir, group_id, ref_dict): """Upload one group of raw images to the central repo Parameters @@ -261,19 +263,18 @@ def upload_images(pool, temp_dir, group_id, butler, refs): metadata can be modified. group_id : `str` The group ID under which to store the images. - butler : `lsst.daf.butler.Butler` - The source Butler with the raw data. - refs : iterable of `lsst.daf.butler.DatasetRef` - The datasets to upload + ref_dict : `dict` [ `lsst.daf.butler.DatasetRef`, `lsst.daf.butler.datastore.DatasetRefURIs` ] + A dict of the datasetRefs to upload and their corresponding URIs. """ # Non-blocking assignment lets us upload during the next exposure. # Can't time these tasks directly, but the blocking equivalent took # 12-20 s depending on tuning, or less than a single exposure. - pool.starmap_async(_upload_one_image, - [(temp_dir, group_id, butler, ref) for ref in refs], - error_callback=_log.exception, - chunksize=5 # Works well across a broad range of # processes - ) + pool.starmap_async( + _upload_one_image, + [(temp_dir, group_id, r, ref_dict[r].primaryURI) for r in ref_dict], + error_callback=_log.exception, + chunksize=5 # Works well across a broad range of # processes + ) def _get_max_processes(): @@ -291,7 +292,7 @@ def _get_max_processes(): return 4 -def _upload_one_image(temp_dir, group_id, butler, ref): +def _upload_one_image(temp_dir, group_id, ref, uri): """Upload a raw image to the central repo. Parameters @@ -301,10 +302,10 @@ def _upload_one_image(temp_dir, group_id, butler, ref): metadata can be modified. group_id : `str` The group ID under which to store the images. - butler : `lsst.daf.butler.Butler` - The source Butler with the raw data. ref : `lsst.daf.butler.DatasetRef` The dataset to upload. + uri : `lsst.resources.ResourcePath` + URI to the image to upload. """ instrument = ref.dataId["instrument"] with time_this(log=_log, msg="Single-image processing", prefix=None): @@ -321,7 +322,7 @@ def _upload_one_image(temp_dir, group_id, butler, ref): sidecar_uploaded = False if instrument in _LSST_CAMERA_LIST: # Upload a corresponding sidecar json file - sidecar = butler.getURI(ref).updatedExtension("json") + sidecar = uri.updatedExtension("json") if sidecar.exists(): with sidecar.open("r") as f: md = json.load(f) @@ -331,22 +332,8 @@ def _upload_one_image(temp_dir, group_id, butler, ref): ) sidecar_uploaded = True - # Each ref is done separately because butler.retrieveArtifacts does not preserve the order. - transferred = butler.retrieveArtifacts( - [ref], - transfer="copy", - preserve_path=False, - destination=temp_dir, - ) - if len(transferred) != 1: - _log.error( - f"{ref} has multiple artifacts and cannot be handled by current implementation" - ) - for transfer in transferred: - os.remove(transfer.path) - return - - path = transferred[0].path + path = os.path.join(temp_dir, uri.basename()) + ResourcePath(path).transfer_from(uri, transfer="copy") _log.debug( f"Raw file for {ref.dataId} was copied from Butler to {path}" )