From 4b2fede97c7d0e71f36b446b241bc1650db39ee4 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Mar 2022 19:56:26 -0500 Subject: [PATCH 01/15] Factor exposure generation in upload.py. --- python/tester/upload.py | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index c22241eb..c39516d6 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -47,9 +47,7 @@ def process_group(publisher, bucket, instrument, group, filter, kind): time.sleep(EXPOSURE_INTERVAL) for detector in range(INSTRUMENTS[instrument].n_detectors): _log.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}") - exposure_id = (group // 100000) * 100000 - exposure_id += (group % 100000) * INSTRUMENTS[instrument].n_snaps - exposure_id += snap + exposure_id = make_exposure_id(instrument, group, snap) fname = ( f"{instrument}/{detector}/{group}/{snap}" f"/{instrument}-{group}-{snap}" @@ -71,6 +69,33 @@ def send_next_visit(publisher, instrument, group, snaps, filter, kind): publisher.publish(topic_path, data=data) +def make_exposure_id(instrument, group, snap): + """Generate an exposure ID from an exposure's other metadata. + + The exposure ID is purely a placeholder, and does not conform to any + instrument's rules for how exposure IDs should be generated. + + Parameters + ---------- + instrument : `str` + The short name of the instrument. + group : `int` + A group ID. + snap : `int` + A snap ID. + + Returns + ------- + exposure : `int` + An exposure ID that is likely to be unique for each combination of + ``group`` and ``snap``, for a given ``instrument``. + """ + exposure_id = (group // 100_000) * 100_000 + exposure_id += (group % 100_000) * INSTRUMENTS[instrument].n_snaps + exposure_id += snap + return exposure_id + + def main(): if len(sys.argv) < 3: print(f"Usage: {sys.argv[0]} INSTRUMENT N_GROUPS") @@ -100,7 +125,7 @@ def main(): pass prefixes = [int(prefix.split("/")[2]) for prefix in blobs.prefixes] if len(prefixes) == 0: - last_group = int(date) * 100000 + last_group = int(date) * 100_000 else: last_group = max(prefixes) + random.randrange(10, 19) _log.info(f"Last group {last_group}") From 8a3a11760d49575378909d15fcbb0a859f012dc3 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Mar 2022 19:56:47 -0500 Subject: [PATCH 02/15] Factor raw file format in upload.py. This format is assumed by several other files. --- python/tester/upload.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index c39516d6..cda52f64 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -31,6 +31,18 @@ class Instrument: PROJECT_ID = "prompt-proto" +def raw_path(instrument, detector, group, snap, exposure_id, filter): + """The path on which to store raws in the raw bucket. 
+
+    This format is also assumed by ``activator/activator.py``.
+    """
+    return (
+        f"{instrument}/{detector}/{group}/{snap}"
+        f"/{instrument}-{group}-{snap}"
+        f"-{exposure_id}-{filter}-{detector}.fz"
+    )
+
+
 logging.basicConfig(
     format="{levelname} {asctime} {name} - {message}",
     style="{",
@@ -48,11 +60,7 @@ def process_group(publisher, bucket, instrument, group, filter, kind):
         for detector in range(INSTRUMENTS[instrument].n_detectors):
             _log.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}")
             exposure_id = make_exposure_id(instrument, group, snap)
-            fname = (
-                f"{instrument}/{detector}/{group}/{snap}"
-                f"/{instrument}-{group}-{snap}"
-                f"-{exposure_id}-{filter}-{detector}.fz"
-            )
+            fname = raw_path(instrument, detector, group, snap, exposure_id, filter)
             bucket.blob(fname).upload_from_string("Test")
             _log.info(f"Uploaded group: {group} snap: {snap} filter: {filter} detector: {detector}")
 

From b0cc9592c5cab1bd5e719670de973e297a48680d Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Mon, 14 Mar 2022 20:22:13 -0500
Subject: [PATCH 03/15] Factor group lookup in upload.py.

---
 python/tester/upload.py | 46 +++++++++++++++++++++++++++++++----------
 1 file changed, 35 insertions(+), 11 deletions(-)

diff --git a/python/tester/upload.py b/python/tester/upload.py
index cda52f64..cae7b517 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -124,27 +124,51 @@ def main():
 
     publisher = pubsub_v1.PublisherClient(credentials=credentials, batch_settings=batch_settings)
 
+    last_group = get_last_group(storage_client, instrument, date)
+    _log.info(f"Last group {last_group}")
+
+    for i in range(n_groups):
+        kind = KINDS[i % len(KINDS)]
+        group = last_group + i + 1
+        filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))]
+        process_group(publisher, bucket, instrument, group, filter, kind)
+        _log.info("Slewing to next group")
+        time.sleep(SLEW_INTERVAL)
+
+
+def get_last_group(storage_client, instrument, date):
+    """Identify a group number that will not collide with any previous groups.
+
+    Parameters
+    ----------
+    storage_client : `google.cloud.storage.Client`
+        A Google Cloud Storage object pointing to the active project.
+    instrument : `str`
+        The short name of the active instrument.
+    date : `str`
+        The current date in YYYYMMDD format.
+
+    Returns
+    -------
+    group : `int`
+        The largest existing group for ``instrument``, or a newly generated
+        group if none exist.
+    """
     blobs = storage_client.list_blobs(
         "rubin-prompt-proto-main",
         prefix=f"{instrument}/0/{date}",
         delimiter="/",
     )
+    # Contrary to the docs, blobs is not an iterator, but an iterable with a .prefixes member.
     for blob in blobs:
+        # Iterate over blobs to get past `list_blobs`'s pagination and
+        # fill .prefixes.
pass prefixes = [int(prefix.split("/")[2]) for prefix in blobs.prefixes] if len(prefixes) == 0: - last_group = int(date) * 100_000 + return int(date) * 100_000 else: - last_group = max(prefixes) + random.randrange(10, 19) - _log.info(f"Last group {last_group}") - - for i in range(n_groups): - kind = KINDS[i % len(KINDS)] - group = last_group + i + 1 - filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))] - process_group(publisher, bucket, instrument, group, filter, kind) - _log.info("Slewing to next group") - time.sleep(SLEW_INTERVAL) + return max(prefixes) + random.randrange(10, 19) if __name__ == "__main__": From 51ff690e2fc3ca763c815bb00ff11b307d115191 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 15 Mar 2022 12:31:25 -0500 Subject: [PATCH 04/15] Move ra/dec generation to other visit metadata. --- python/tester/upload.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index cae7b517..5366ae8a 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -51,9 +51,9 @@ def raw_path(instrument, detector, group, snap, exposure_id, filter): _log.setLevel(logging.DEBUG) -def process_group(publisher, bucket, instrument, group, filter, kind): +def process_group(publisher, bucket, instrument, group, filter, ra, dec, kind): n_snaps = INSTRUMENTS[instrument].n_snaps - send_next_visit(publisher, instrument, group, n_snaps, filter, kind) + send_next_visit(publisher, instrument, group, n_snaps, filter, ra, dec, kind) for snap in range(n_snaps): _log.info(f"Taking group: {group} snap: {snap}") time.sleep(EXPOSURE_INTERVAL) @@ -65,11 +65,9 @@ def process_group(publisher, bucket, instrument, group, filter, kind): _log.info(f"Uploaded group: {group} snap: {snap} filter: {filter} detector: {detector}") -def send_next_visit(publisher, instrument, group, snaps, filter, kind): +def send_next_visit(publisher, instrument, group, snaps, filter, ra, dec, kind): _log.info(f"Sending next_visit for group: {group} snaps: {snaps} filter: {filter} kind: {kind}") topic_path = publisher.topic_path(PROJECT_ID, "nextVisit") - ra = random.uniform(0.0, 360.0) - dec = random.uniform(-90.0, 90.0) for detector in range(INSTRUMENTS[instrument].n_detectors): _log.debug(f"Sending next_visit for group: {group} detector: {detector} ra: {ra} dec: {dec}") visit = Visit(instrument, detector, group, snaps, filter, ra, dec, kind) @@ -131,7 +129,9 @@ def main(): kind = KINDS[i % len(KINDS)] group = last_group + i + 1 filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))] - process_group(publisher, bucket, instrument, group, filter, kind) + ra = random.uniform(0.0, 360.0) + dec = random.uniform(-90.0, 90.0) + process_group(publisher, bucket, instrument, group, filter, ra, dec, kind) _log.info("Slewing to next group") time.sleep(SLEW_INTERVAL) From f4b1769b95dba23aca38191e921d784ba907b5f1 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 15 Mar 2022 13:10:38 -0500 Subject: [PATCH 05/15] Make Visit hashable. 
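
Freezing the dataclass makes Visit immutable, so dataclasses also
generates a __hash__ method, and later commits can use Visit objects
as set members and dictionary keys.  A minimal illustration of the
behavior (not part of this patch):

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class Coord:
        ra: float
        dec: float

    # frozen=True plus the default eq=True makes instances hashable,
    # so equal values deduplicate in a set:
    assert len({Coord(149.9, 2.2), Coord(149.9, 2.2)}) == 1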
--- python/activator/visit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/activator/visit.py b/python/activator/visit.py index b2729ac7..2c6cceee 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -3,7 +3,7 @@ from dataclasses import dataclass -@dataclass +@dataclass(frozen=True) class Visit: instrument: str detector: int From 182800494418830b52c52e06eb1c34e65809dd0c Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 15 Mar 2022 13:10:56 -0500 Subject: [PATCH 06/15] Keep observation info in Visit. This change moves the detector-handling logic from process_group to main, where it will be easier to customize for real test datasets. --- python/tester/upload.py | 48 ++++++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index 5366ae8a..907b950c 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -51,18 +51,46 @@ def raw_path(instrument, detector, group, snap, exposure_id, filter): _log.setLevel(logging.DEBUG) -def process_group(publisher, bucket, instrument, group, filter, ra, dec, kind): - n_snaps = INSTRUMENTS[instrument].n_snaps +def process_group(publisher, bucket, visit_infos): + """Simulate the observation of a single on-sky pointing. + + Parameters + ---------- + publisher : `google.cloud.pubsub_v1.PublisherClient` + The client that posts ``next_visit`` messages. + bucket : `google.cloud.storage.Bucket` + The bucket to which to transfer the raws, once observed. + visit_infos : `set` [`activator.Visit`] + The visit-detector combinations to be observed; each object may + represent multiple snaps. Assumed to represent a single group, and to + share instrument, snaps, filter, and kind. 
+ """ + # Assume most metadata is shared among all visit_infos + for info in visit_infos: + instrument = info.instrument + group = info.group + n_snaps = info.snaps + filter = info.filter + ra = info.ra + dec = info.dec + kind = info.kind + break + else: + _log.info("No observations to make; aborting.") + return + send_next_visit(publisher, instrument, group, n_snaps, filter, ra, dec, kind) for snap in range(n_snaps): _log.info(f"Taking group: {group} snap: {snap}") time.sleep(EXPOSURE_INTERVAL) - for detector in range(INSTRUMENTS[instrument].n_detectors): - _log.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}") - exposure_id = make_exposure_id(instrument, group, snap) - fname = raw_path(instrument, detector, group, snap, exposure_id, filter) + for info in visit_infos: + _log.info(f"Uploading group: {info.group} snap: {snap} filter: {info.filter} " + f"detector: {info.detector}") + exposure_id = make_exposure_id(info.instrument, info.group, snap) + fname = raw_path(info.instrument, info.detector, info.group, snap, exposure_id, info.filter) bucket.blob(fname).upload_from_string("Test") - _log.info(f"Uploaded group: {group} snap: {snap} filter: {filter} detector: {detector}") + _log.info(f"Uploaded group: {info.group} snap: {snap} filter: {info.filter} " + f"detector: {info.detector}") def send_next_visit(publisher, instrument, group, snaps, filter, ra, dec, kind): @@ -131,7 +159,11 @@ def main(): filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))] ra = random.uniform(0.0, 360.0) dec = random.uniform(-90.0, 90.0) - process_group(publisher, bucket, instrument, group, filter, ra, dec, kind) + visit_infos = { + Visit(instrument, detector, group, INSTRUMENTS[instrument].n_snaps, filter, ra, dec, kind) + for detector in range(INSTRUMENTS[instrument].n_detectors) + } + process_group(publisher, bucket, visit_infos) _log.info("Slewing to next group") time.sleep(SLEW_INTERVAL) From 84b954bc2b51af6bcd7407ed6670aa4a9a312da0 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 15 Mar 2022 13:24:10 -0500 Subject: [PATCH 07/15] Pass Visit to send_next_visit. --- python/tester/upload.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index 907b950c..fe4fe597 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -65,21 +65,16 @@ def process_group(publisher, bucket, visit_infos): represent multiple snaps. Assumed to represent a single group, and to share instrument, snaps, filter, and kind. 
""" - # Assume most metadata is shared among all visit_infos + # Assume group/snaps is shared among all visit_infos for info in visit_infos: - instrument = info.instrument group = info.group n_snaps = info.snaps - filter = info.filter - ra = info.ra - dec = info.dec - kind = info.kind break else: _log.info("No observations to make; aborting.") return - send_next_visit(publisher, instrument, group, n_snaps, filter, ra, dec, kind) + send_next_visit(publisher, group, visit_infos) for snap in range(n_snaps): _log.info(f"Taking group: {group} snap: {snap}") time.sleep(EXPOSURE_INTERVAL) @@ -93,13 +88,23 @@ def process_group(publisher, bucket, visit_infos): f"detector: {info.detector}") -def send_next_visit(publisher, instrument, group, snaps, filter, ra, dec, kind): - _log.info(f"Sending next_visit for group: {group} snaps: {snaps} filter: {filter} kind: {kind}") +def send_next_visit(publisher, group, visit_infos): + """Simulate the transmission of a ``next_visit`` message. + + Parameters + ---------- + group : `str` + The group ID for the message to send. + visit_infos : `set` [`activator.Visit`] + The visit-detector combinations to be sent; each object may + represent multiple snaps. + """ + _log.info(f"Sending next_visit for group: {group}") topic_path = publisher.topic_path(PROJECT_ID, "nextVisit") - for detector in range(INSTRUMENTS[instrument].n_detectors): - _log.debug(f"Sending next_visit for group: {group} detector: {detector} ra: {ra} dec: {dec}") - visit = Visit(instrument, detector, group, snaps, filter, ra, dec, kind) - data = json.dumps(visit.__dict__).encode("utf-8") + for info in visit_infos: + _log.debug(f"Sending next_visit for group: {info.group} detector: {info.detector} " + f"filter: {info.filter} ra: {info.ra} dec: {info.dec} kind: {info.kind}") + data = json.dumps(info.__dict__).encode("utf-8") publisher.publish(topic_path, data=data) From 8a59e705d2a434860945fd9e11ca04bcb504b5a7 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 16 Mar 2022 18:16:14 -0500 Subject: [PATCH 08/15] Factor upload strategy out of process_group. --- python/tester/upload.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index fe4fe597..51f652d1 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -51,19 +51,20 @@ def raw_path(instrument, detector, group, snap, exposure_id, filter): _log.setLevel(logging.DEBUG) -def process_group(publisher, bucket, visit_infos): +def process_group(publisher, visit_infos, uploader): """Simulate the observation of a single on-sky pointing. Parameters ---------- publisher : `google.cloud.pubsub_v1.PublisherClient` The client that posts ``next_visit`` messages. - bucket : `google.cloud.storage.Bucket` - The bucket to which to transfer the raws, once observed. visit_infos : `set` [`activator.Visit`] The visit-detector combinations to be observed; each object may represent multiple snaps. Assumed to represent a single group, and to share instrument, snaps, filter, and kind. + uploader : callable [`activator.Visit`, int] + A callable that takes an exposure spec and a snap ID, and uploads the + visit's data. 
""" # Assume group/snaps is shared among all visit_infos for info in visit_infos: @@ -81,9 +82,7 @@ def process_group(publisher, bucket, visit_infos): for info in visit_infos: _log.info(f"Uploading group: {info.group} snap: {snap} filter: {info.filter} " f"detector: {info.detector}") - exposure_id = make_exposure_id(info.instrument, info.group, snap) - fname = raw_path(info.instrument, info.detector, info.group, snap, exposure_id, info.filter) - bucket.blob(fname).upload_from_string("Test") + uploader(info, snap) _log.info(f"Uploaded group: {info.group} snap: {snap} filter: {info.filter} " f"detector: {info.detector}") @@ -168,7 +167,15 @@ def main(): Visit(instrument, detector, group, INSTRUMENTS[instrument].n_snaps, filter, ra, dec, kind) for detector in range(INSTRUMENTS[instrument].n_detectors) } - process_group(publisher, bucket, visit_infos) + + # TODO: may be cleaner to use a functor object than to depend on + # closures for the bucket and data. + def upload_dummy(visit, snap_id): + exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id) + filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, + exposure_id, visit.filter) + bucket.blob(filename).upload_from_string("Test") + process_group(publisher, visit_infos, upload_dummy) _log.info("Slewing to next group") time.sleep(SLEW_INTERVAL) From de263462ff3353b996b0ec24f642e3f57d25e0b6 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 16 Mar 2022 18:41:57 -0500 Subject: [PATCH 09/15] Factor visit generation out of main. --- python/tester/upload.py | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index 51f652d1..4e9607bf 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -158,15 +158,8 @@ def main(): _log.info(f"Last group {last_group}") for i in range(n_groups): - kind = KINDS[i % len(KINDS)] group = last_group + i + 1 - filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))] - ra = random.uniform(0.0, 360.0) - dec = random.uniform(-90.0, 90.0) - visit_infos = { - Visit(instrument, detector, group, INSTRUMENTS[instrument].n_snaps, filter, ra, dec, kind) - for detector in range(INSTRUMENTS[instrument].n_detectors) - } + visit_infos = make_random_visits(instrument, group) # TODO: may be cleaner to use a functor object than to depend on # closures for the bucket and data. @@ -215,5 +208,30 @@ def get_last_group(storage_client, instrument, date): return max(prefixes) + random.randrange(10, 19) +def make_random_visits(instrument, group): + """Create placeholder visits without reference to any data. + + Parameters + ---------- + instrument : `str` + The short name of the instrument carrying out the observation. + group : `int` + The group number being observed. + + Returns + ------- + visits : `set` [`activator.Visit`] + Visits generated for ``group`` for all ``instrument``'s detectors. + """ + kind = KINDS[group % len(KINDS)] + filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))] + ra = random.uniform(0.0, 360.0) + dec = random.uniform(-90.0, 90.0) + return { + Visit(instrument, detector, group, INSTRUMENTS[instrument].n_snaps, filter, ra, dec, kind) + for detector in range(INSTRUMENTS[instrument].n_detectors) + } + + if __name__ == "__main__": main() From 74ad168b237b3520458040115c4ba6295e569b6d Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 16 Mar 2022 20:36:04 -0500 Subject: [PATCH 10/15] Add preliminary support for uploading existing raws. 
This version of the code has a hardcoded dependency on specific HSC files, which will need to be removed later. --- python/tester/upload.py | 132 +++++++++++++++++++++++++++++++++++----- 1 file changed, 117 insertions(+), 15 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index 4e9607bf..d321d129 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -1,9 +1,11 @@ from dataclasses import dataclass from google.cloud import pubsub_v1, storage from google.oauth2 import service_account +import itertools import json import logging import random +import re import sys import time from visit import Visit @@ -43,6 +45,13 @@ def raw_path(instrument, detector, group, snap, exposure_id, filter): ) +# TODO: unify the format code across prompt_prototype +RAW_REGEXP = re.compile( + r"(?P.*?)/(?P\d+)/(?P.*?)/(?P\d+)/" + r"(?P=instrument)-(?P=group)-(?P=snap)-(?P.*?)-(?P.*?)-(?P=detector)\.f" +) + + logging.basicConfig( format="{levelname} {asctime} {name} - {message}", style="{", @@ -147,7 +156,7 @@ def main(): "./prompt-proto-upload.json" ) storage_client = storage.Client(PROJECT_ID, credentials=credentials) - bucket = storage_client.bucket("rubin-prompt-proto-main") + dest_bucket = storage_client.bucket("rubin-prompt-proto-main") batch_settings = pubsub_v1.types.BatchSettings( max_messages=INSTRUMENTS[instrument].n_detectors, ) @@ -157,20 +166,46 @@ def main(): last_group = get_last_group(storage_client, instrument, date) _log.info(f"Last group {last_group}") - for i in range(n_groups): - group = last_group + i + 1 - visit_infos = make_random_visits(instrument, group) - - # TODO: may be cleaner to use a functor object than to depend on - # closures for the bucket and data. - def upload_dummy(visit, snap_id): - exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id) - filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, - exposure_id, visit.filter) - bucket.blob(filename).upload_from_string("Test") - process_group(publisher, visit_infos, upload_dummy) - _log.info("Slewing to next group") - time.sleep(SLEW_INTERVAL) + src_bucket = storage_client.bucket("rubin-prompt-proto-unobserved") + raw_pool = get_samples(src_bucket, instrument) + + if raw_pool: + _log.info(f"Observing real raw files from {instrument}.") + # TODO: allow generated groups for raws; otherwise new uploads just + # overwrite the old files. + for group in itertools.islice(itertools.cycle(raw_pool), n_groups): + _log.debug(f"Processing group {group}...") + # snap_dict maps snap_id to {visit: blob} + snap_dict = raw_pool[group] + visit_infos = {info for det_dict in snap_dict.values() for info in det_dict} + + # TODO: may be cleaner to use a functor object than to depend on + # closures for the bucket and data. + def upload_from_pool(visit, snap_id): + src_blob = snap_dict[snap_id][visit] + exposure_id = f"{visit.group}_{snap_id}" + filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, + exposure_id, visit.filter) + src_bucket.copy_blob(src_blob, dest_bucket, new_name=filename) + process_group(publisher, visit_infos, upload_from_pool) + _log.info("Slewing to next group") + time.sleep(SLEW_INTERVAL) + else: + _log.info(f"No raw files found for {instrument}, generating dummy files instead.") + for i in range(n_groups): + group = last_group + i + 1 + visit_infos = make_random_visits(instrument, group) + + # TODO: may be cleaner to use a functor object than to depend on + # closures for the bucket and data. 
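+            # upload_dummy stands in for the camera: it writes a small
+            # placeholder string where a real raw image would go.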
+ def upload_dummy(visit, snap_id): + exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id) + filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, + exposure_id, visit.filter) + dest_bucket.blob(filename).upload_from_string("Test") + process_group(publisher, visit_infos, upload_dummy) + _log.info("Slewing to next group") + time.sleep(SLEW_INTERVAL) def get_last_group(storage_client, instrument, date): @@ -233,5 +268,72 @@ def make_random_visits(instrument, group): } +def get_samples(bucket, instrument): + """Return any predefined raw exposures for a given instrument. + + Parameters + ---------- + bucket : `google.cloud.storage.Bucket` + The bucket in which to search for predefined raws. + instrument : `str` + The short name of the instrument to sample. + + Returns + ------- + raws : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `google.cloud.storage.Blob`]]] + A mapping from group IDs to a mapping of snap ID. The value of the + innermost mapping is the observation metadata for each detector, + and a Blob representing the image taken in that detector-snap. + """ + # TODO: set up a lookup-friendly class to represent the return value + + # TODO: replace this dict with something more scalable. + # One option is to attach metadata to the Google Storage objects at + # upload time, another is to download the blob and actually read + # its header. + hsc_metadata = { + 59150: {"ra": 149.96643749999996, "dec": 2.2202916666666668, "rot": 270.0}, + 59160: {"ra": 150.18157499999998, "dec": 2.2800083333333334, "rot": 270.0}, + } + + blobs = bucket.client.list_blobs(bucket.name, prefix=instrument) + result = {} + for blob in blobs: + # Assume that the unobserved bucket uses the same filename scheme as + # the observed bucket. + parsed = re.match(RAW_REGEXP, blob.name) + if not parsed: + _log.warning(f"Could not parse {blob.name}; ignoring file.") + continue + + group = parsed.group('group') + snap_id = int(parsed.group('snap')) + exposure_id = int(parsed.group('expid')) + visit = Visit(instrument=instrument, + detector=int(parsed.group('detector')), + group=group, + snaps=INSTRUMENTS[instrument].n_snaps, + filter=parsed.group('filter'), + ra=hsc_metadata[exposure_id]["ra"], + dec=hsc_metadata[exposure_id]["dec"], + kind="SURVEY", + ) + _log.debug(f"File {blob.name} parsed as snap {snap_id} of visit {visit}.") + if group in result: + snap_dict = result[group] + if snap_id in snap_dict: + _log.debug(f"New detector {visit.detector} added to snap {snap_id} of group {group}.") + detector_dict = snap_dict[snap_id] + detector_dict[visit] = blob + else: + _log.debug(f"New snap {snap_id} added to group {group}.") + snap_dict[snap_id] = {visit: blob} + else: + _log.debug(f"New group {group} registered.") + result[group] = {snap_id: {visit: blob}} + + return result + + if __name__ == "__main__": main() From 4e067710ec5f2f43ca5402244654279ee82a9843 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 17 Mar 2022 13:17:11 -0500 Subject: [PATCH 11/15] Fix bug in upload script. The uploaded files weren't conforming to the Prompt Processing path schema. 
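
The path template writes the detector ID without zero padding, and
RAW_REGEXP requires the detector field of the filename to match the
detector directory through its (?P=detector) back-reference.  A padded
name such as

    HSC/50/2016030700001/0/HSC-2016030700001-0-0059150-HSC-G-050.fits.gz

can therefore never parse ("050" != "50"), and get_samples would
reject the staged files with a "Could not parse" warning.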
--- bin/prompt_prototype_upload_raws.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/prompt_prototype_upload_raws.sh b/bin/prompt_prototype_upload_raws.sh index 17c9d875..752c09c6 100755 --- a/bin/prompt_prototype_upload_raws.sh +++ b/bin/prompt_prototype_upload_raws.sh @@ -33,6 +33,6 @@ UPLOAD_BUCKET=rubin-prompt-proto-unobserved # Filename format is defined in tester/upload.py and activator/activator.py: # instrument/detector/group/snap/instrument-group-snap-exposureId-filter-detector gsutil cp "${RAW_DIR}/HSC-0059150-050.fits.gz" \ - gs://${UPLOAD_BUCKET}/HSC/50/2016030700001/0/HSC-2016030700001-0-0059150-HSC-G-050.fits.gz + gs://${UPLOAD_BUCKET}/HSC/50/2016030700001/0/HSC-2016030700001-0-0059150-HSC-G-50.fits.gz gsutil cp "${RAW_DIR}/HSC-0059160-051.fits.gz" \ - gs://${UPLOAD_BUCKET}/HSC/51/2016030700002/0/HSC-2016030700002-0-0059160-HSC-G-051.fits.gz + gs://${UPLOAD_BUCKET}/HSC/51/2016030700002/0/HSC-2016030700002-0-0059160-HSC-G-51.fits.gz From e6ee76681bc0746dc43346946a7ec7e245663c1a Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 17 Mar 2022 13:19:07 -0500 Subject: [PATCH 12/15] Allow raw files to be uploaded to new groups. Without this fix, we would need to delete old files from the main bucket on every pass. --- python/tester/upload.py | 45 +++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index d321d129..f23fa45c 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -171,19 +171,26 @@ def main(): if raw_pool: _log.info(f"Observing real raw files from {instrument}.") - # TODO: allow generated groups for raws; otherwise new uploads just - # overwrite the old files. - for group in itertools.islice(itertools.cycle(raw_pool), n_groups): - _log.debug(f"Processing group {group}...") + for i, true_group in enumerate(itertools.islice(itertools.cycle(raw_pool), n_groups)): + group = last_group + i + 1 + _log.debug(f"Processing group {group} from unobserved {true_group}...") # snap_dict maps snap_id to {visit: blob} - snap_dict = raw_pool[group] + snap_dict = {} + # Copy all the visit-blob dictionaries under each snap_id, + # replacing the (immutable) Visit objects to point to group + # instead of true_group. + for snap_id, old_visits in raw_pool[true_group].items(): + snap_dict[snap_id] = {splice_group(true_visit, group): blob + for true_visit, blob in old_visits.items()} + # Gather all the Visit objects found in snap_dict, merging + # duplicates for different snaps of the same detector. visit_infos = {info for det_dict in snap_dict.values() for info in det_dict} # TODO: may be cleaner to use a functor object than to depend on # closures for the bucket and data. def upload_from_pool(visit, snap_id): src_blob = snap_dict[snap_id][visit] - exposure_id = f"{visit.group}_{snap_id}" + exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id) filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, exposure_id, visit.filter) src_bucket.copy_blob(src_blob, dest_bucket, new_name=filename) @@ -335,5 +342,31 @@ def get_samples(bucket, instrument): return result +def splice_group(visit, group): + """Replace the group ID in a Visit object. + + Parameters + ---------- + visit : `activator.Visit` + The object to update. + group : `str` + The new group ID to use. + + Returns + ------- + new_visit : `activator.Visit` + A visit with group ``group``, but otherwise identical to ``visit``. 
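+
+    Notes
+    -----
+    Equivalent to ``dataclasses.replace(visit, group=group)``, spelled
+    out field by field.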
+ """ + return Visit(instrument=visit.instrument, + detector=visit.detector, + group=group, + snaps=visit.snaps, + filter=visit.filter, + ra=visit.ra, + dec=visit.dec, + kind=visit.kind, + ) + + if __name__ == "__main__": main() From efb9c4ee67d20ee137422d0c5183b9f306611843 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 17 Mar 2022 13:20:00 -0500 Subject: [PATCH 13/15] Fix get_last_group assuming detector 0. Current HSC raws use detectors 50 and 51 only. --- python/tester/upload.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index f23fa45c..03478ed3 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -233,12 +233,23 @@ def get_last_group(storage_client, instrument, date): The largest existing group for ``instrument``, or a newly generated group if none exist. """ + preblobs = storage_client.list_blobs( + "rubin-prompt-proto-main", + prefix=f"{instrument}/", + delimiter="/", + ) + # See https://cloud.google.com/storage/docs/samples/storage-list-files-with-prefix + for blob in preblobs: + # Iterate over blobs to get past `list_blobs`'s pagination and + # fill .prefixes. + pass + detector = min(int(prefix.split("/")[1]) for prefix in preblobs.prefixes) + blobs = storage_client.list_blobs( "rubin-prompt-proto-main", - prefix=f"{instrument}/0/{date}", + prefix=f"{instrument}/{detector}/{date}", delimiter="/", ) - # Contrary to the docs, blobs is not an iterator, but an iterable with a .prefixes member. for blob in blobs: # Iterate over blobs to get past `list_blobs`'s pagination and # fill .prefixes. From 44273f9215318c599d356caff4069d4003d1e557 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 17 Mar 2022 16:00:22 -0500 Subject: [PATCH 14/15] Factor out the two upload algorithms from main. --- python/tester/upload.py | 127 +++++++++++++++++++++++++++------------- 1 file changed, 87 insertions(+), 40 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index 03478ed3..682e7690 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -171,48 +171,10 @@ def main(): if raw_pool: _log.info(f"Observing real raw files from {instrument}.") - for i, true_group in enumerate(itertools.islice(itertools.cycle(raw_pool), n_groups)): - group = last_group + i + 1 - _log.debug(f"Processing group {group} from unobserved {true_group}...") - # snap_dict maps snap_id to {visit: blob} - snap_dict = {} - # Copy all the visit-blob dictionaries under each snap_id, - # replacing the (immutable) Visit objects to point to group - # instead of true_group. - for snap_id, old_visits in raw_pool[true_group].items(): - snap_dict[snap_id] = {splice_group(true_visit, group): blob - for true_visit, blob in old_visits.items()} - # Gather all the Visit objects found in snap_dict, merging - # duplicates for different snaps of the same detector. - visit_infos = {info for det_dict in snap_dict.values() for info in det_dict} - - # TODO: may be cleaner to use a functor object than to depend on - # closures for the bucket and data. 
- def upload_from_pool(visit, snap_id): - src_blob = snap_dict[snap_id][visit] - exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id) - filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, - exposure_id, visit.filter) - src_bucket.copy_blob(src_blob, dest_bucket, new_name=filename) - process_group(publisher, visit_infos, upload_from_pool) - _log.info("Slewing to next group") - time.sleep(SLEW_INTERVAL) + upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n_groups, last_group + 1) else: _log.info(f"No raw files found for {instrument}, generating dummy files instead.") - for i in range(n_groups): - group = last_group + i + 1 - visit_infos = make_random_visits(instrument, group) - - # TODO: may be cleaner to use a functor object than to depend on - # closures for the bucket and data. - def upload_dummy(visit, snap_id): - exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id) - filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, - exposure_id, visit.filter) - dest_bucket.blob(filename).upload_from_string("Test") - process_group(publisher, visit_infos, upload_dummy) - _log.info("Slewing to next group") - time.sleep(SLEW_INTERVAL) + upload_from_random(publisher, instrument, dest_bucket, n_groups, last_group + 1) def get_last_group(storage_client, instrument, date): @@ -353,6 +315,91 @@ def get_samples(bucket, instrument): return result +def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base): + """Upload visits and files using real raws. + + Parameters + ---------- + publisher : `google.cloud.pubsub_v1.PublisherClient` + The client that posts ``next_visit`` messages. + instrument : `str` + The short name of the instrument carrying out the observation. + raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `google.cloud.storage.Blob`]]] + Available raws as a mapping from group IDs to a mapping of snap ID. + The value of the innermost mapping is the observation metadata for + each detector, and a Blob representing the image taken in that + detector-snap. + src_bucket : `google.cloud.storage.Bucket` + The bucket containing the blobs in ``raw_pool``. + dest_bucket : `google.cloud.storage.Bucket` + The bucket to which to upload the new images. + n_groups : `int` + The number of observation groups to simulate. If more than the number + of groups in ``raw_pool``, files will be re-uploaded under new + group IDs. + group_base : `int` + The base number from which to offset new group numbers. + """ + for i, true_group in enumerate(itertools.islice(itertools.cycle(raw_pool), n_groups)): + group = group_base + i + _log.debug(f"Processing group {group} from unobserved {true_group}...") + # snap_dict maps snap_id to {visit: blob} + snap_dict = {} + # Copy all the visit-blob dictionaries under each snap_id, + # replacing the (immutable) Visit objects to point to group + # instead of true_group. + for snap_id, old_visits in raw_pool[true_group].items(): + snap_dict[snap_id] = {splice_group(true_visit, group): blob + for true_visit, blob in old_visits.items()} + # Gather all the Visit objects found in snap_dict, merging + # duplicates for different snaps of the same detector. + visit_infos = {info for det_dict in snap_dict.values() for info in det_dict} + + # TODO: may be cleaner to use a functor object than to depend on + # closures for the bucket and data. 
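+        # upload_from_pool copies the staged blob server-side with
+        # copy_blob; the image bytes never pass through this client.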
+ def upload_from_pool(visit, snap_id): + src_blob = snap_dict[snap_id][visit] + exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id) + filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, + exposure_id, visit.filter) + src_bucket.copy_blob(src_blob, dest_bucket, new_name=filename) + process_group(publisher, visit_infos, upload_from_pool) + _log.info("Slewing to next group") + time.sleep(SLEW_INTERVAL) + + +def upload_from_random(publisher, instrument, dest_bucket, n_groups, group_base): + """Upload visits and files using randomly generated visits. + + Parameters + ---------- + publisher : `google.cloud.pubsub_v1.PublisherClient` + The client that posts ``next_visit`` messages. + instrument : `str` + The short name of the instrument carrying out the observation. + dest_bucket : `google.cloud.storage.Bucket` + The bucket to which to upload the new images. + n_groups : `int` + The number of observation groups to simulate. + group_base : `int` + The base number from which to offset new group numbers. + """ + for i in range(n_groups): + group = group_base + i + visit_infos = make_random_visits(instrument, group) + + # TODO: may be cleaner to use a functor object than to depend on + # closures for the bucket and data. + def upload_dummy(visit, snap_id): + exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id) + filename = raw_path(visit.instrument, visit.detector, visit.group, snap_id, + exposure_id, visit.filter) + dest_bucket.blob(filename).upload_from_string("Test") + process_group(publisher, visit_infos, upload_dummy) + _log.info("Slewing to next group") + time.sleep(SLEW_INTERVAL) + + def splice_group(visit, group): """Replace the group ID in a Visit object. From 9c9960588cf0c7ee66f094954ce76c7a6e86fff0 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 17 Mar 2022 17:12:47 -0500 Subject: [PATCH 15/15] Add todo on next_visit delay. --- python/tester/upload.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/tester/upload.py b/python/tester/upload.py index 682e7690..ac7f08ba 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -85,6 +85,7 @@ def process_group(publisher, visit_infos, uploader): return send_next_visit(publisher, group, visit_infos) + # TODO: need asynchronous code to handle next_visit delay correctly for snap in range(n_snaps): _log.info(f"Taking group: {group} snap: {snap}") time.sleep(EXPOSURE_INTERVAL)