diff --git a/python/tester/upload.py b/python/tester/upload.py
index 5c414195..ca501e9b 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -1,14 +1,17 @@
+__all__ = ["get_last_group", ]
+
+import boto3
+from confluent_kafka import Producer
 import dataclasses
-from google.cloud import pubsub_v1, storage
-from google.oauth2 import service_account
 import itertools
 import json
 import logging
+import os
 import random
-import re
+import socket
 import sys
 import time
-from activator.raw import RAW_REGEXP, get_raw_path
+from activator.raw import Snap, get_raw_path
 from activator.visit import Visit
 
 
@@ -28,10 +31,10 @@ class Instrument:
 EXPOSURE_INTERVAL = 18
 SLEW_INTERVAL = 2
 FILTER_LIST = "ugrizy"
-PUBSUB_TOKEN = "abc123"
 KINDS = ("BIAS", "DARK", "FLAT")
 
-PROJECT_ID = "prompt-proto"
+# Kafka server
+kafka_cluster = os.environ["KAFKA_CLUSTER"]
 
 
 logging.basicConfig(
@@ -42,12 +45,12 @@ class Instrument:
 _log.setLevel(logging.DEBUG)
 
 
-def process_group(publisher, visit_infos, uploader):
+def process_group(producer, visit_infos, uploader):
     """Simulate the observation of a single on-sky pointing.
 
     Parameters
     ----------
-    publisher : `google.cloud.pubsub_v1.PublisherClient`
+    producer : `confluent_kafka.Producer`
         The client that posts ``next_visit`` messages.
     visit_infos : `set` [`activator.Visit`]
         The visit-detector combinations to be observed; each object may
@@ -66,7 +69,7 @@ def process_group(publisher, visit_infos, uploader):
         _log.info("No observations to make; aborting.")
         return
 
-    send_next_visit(publisher, group, visit_infos)
+    send_next_visit(producer, group, visit_infos)
     # TODO: need asynchronous code to handle next_visit delay correctly
     for snap in range(n_snaps):
         _log.info(f"Taking group: {group} snap: {snap}")
@@ -79,11 +82,13 @@ def process_group(publisher, visit_infos, uploader):
                       f"detector: {info.detector}")
 
 
-def send_next_visit(publisher, group, visit_infos):
+def send_next_visit(producer, group, visit_infos):
     """Simulate the transmission of a ``next_visit`` message.
 
     Parameters
     ----------
+    producer : `confluent_kafka.Producer`
+        The client that posts ``next_visit`` messages.
     group : `str`
         The group ID for the message to send.
     visit_infos : `set` [`activator.Visit`]
@@ -91,12 +96,12 @@ def send_next_visit(publisher, group, visit_infos):
         represent multiple snaps.
     """
     _log.info(f"Sending next_visit for group: {group}")
-    topic_path = publisher.topic_path(PROJECT_ID, "nextVisit")
+    topic = "next-visit-topic"
     for info in visit_infos:
         _log.debug(f"Sending next_visit for group: {info.group} detector: {info.detector} "
                    f"filter: {info.filter} ra: {info.ra} dec: {info.dec} kind: {info.kind}")
         data = json.dumps(info.__dict__).encode("utf-8")
-        publisher.publish(topic_path, data=data)
+        producer.produce(topic, data)
 
 
 def make_exposure_id(instrument, group, snap):
@@ -135,38 +140,38 @@ def main():
 
     date = time.strftime("%Y%m%d")
 
-    credentials = service_account.Credentials.from_service_account_file(
-        "./prompt-proto-upload.json"
-    )
-    storage_client = storage.Client(PROJECT_ID, credentials=credentials)
-    dest_bucket = storage_client.bucket("rubin-prompt-proto-main")
-    batch_settings = pubsub_v1.types.BatchSettings(
-        max_messages=INSTRUMENTS[instrument].n_detectors,
+    endpoint_url = "https://s3dfrgw.slac.stanford.edu"
+    s3 = boto3.resource("s3", endpoint_url=endpoint_url)
+    dest_bucket = s3.Bucket("rubin-pp")
+    producer = Producer(
+        {"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()}
     )
-    publisher = pubsub_v1.PublisherClient(credentials=credentials,
-                                          batch_settings=batch_settings)
 
-    last_group = get_last_group(storage_client, instrument, date)
+    last_group = get_last_group(dest_bucket, instrument, date)
     _log.info(f"Last group {last_group}")
 
-    src_bucket = storage_client.bucket("rubin-prompt-proto-unobserved")
+    src_bucket = s3.Bucket("rubin-pp-users")
     raw_pool = get_samples(src_bucket, instrument)
+    new_group_base = last_group + random.randrange(10, 19)
 
     if raw_pool:
         _log.info(f"Observing real raw files from {instrument}.")
-        upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n_groups, last_group + 1)
+        upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_groups, new_group_base)
     else:
         _log.info(f"No raw files found for {instrument}, generating dummy files instead.")
-        upload_from_random(publisher, instrument, dest_bucket, n_groups, last_group + 1)
+        upload_from_random(producer, instrument, dest_bucket, n_groups, new_group_base)
 
 
-def get_last_group(storage_client, instrument, date):
-    """Identify a group number that will not collide with any previous groups.
+def get_last_group(bucket, instrument, date):
+    """Identify the largest group number or a new group number.
+
+    This number helps decide the next group number so it will not
+    collide with any previous groups.
 
     Parameters
     ----------
-    storage_client : `google.cloud.storage.Client`
-        A Google Cloud Storage object pointing to the active project.
+    bucket : `s3.Bucket`
+        An S3 storage bucket.
     instrument : `str`
         The short name of the active instrument.
     date : `str`
@@ -178,32 +183,19 @@ def get_last_group(storage_client, instrument, date):
         The largest existing group for ``instrument``, or a newly generated
         group if none exist.
     """
-    preblobs = storage_client.list_blobs(
-        "rubin-prompt-proto-main",
-        prefix=f"{instrument}/",
-        delimiter="/",
+    preblobs = bucket.objects.filter(
+        Prefix=f"{instrument}/",
     )
-    # See https://cloud.google.com/storage/docs/samples/storage-list-files-with-prefix
-    for blob in preblobs:
-        # Iterate over blobs to get past `list_blobs`'s pagination and
-        # fill .prefixes.
-        pass
-    detector = min(int(prefix.split("/")[1]) for prefix in preblobs.prefixes)
-
-    blobs = storage_client.list_blobs(
-        "rubin-prompt-proto-main",
-        prefix=f"{instrument}/{detector}/{date}",
-        delimiter="/",
+    detector = min((int(preblob.key.split("/")[1]) for preblob in preblobs), default=0)
+
+    blobs = preblobs.filter(
+        Prefix=f"{instrument}/{detector}/{date}"
     )
-    for blob in blobs:
-        # Iterate over blobs to get past `list_blobs`'s pagination and
-        # fill .prefixes.
-        pass
-    prefixes = [int(prefix.split("/")[2]) for prefix in blobs.prefixes]
+    prefixes = [int(blob.key.split("/")[2]) for blob in blobs]
     if len(prefixes) == 0:
         return int(date) * 100_000
     else:
-        return max(prefixes) + random.randrange(10, 19)
+        return max(prefixes)
 
 
 def make_random_visits(instrument, group):
@@ -221,7 +213,7 @@ def make_random_visits(instrument, group):
     visits : `set` [`activator.Visit`]
         Visits generated for ``group`` for all ``instrument``'s detectors.
     """
-    kind = KINDS[group % len(KINDS)]
+    kind = KINDS[int(group) % len(KINDS)]
     filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))]
     ra = random.uniform(0.0, 360.0)
     dec = random.uniform(-90.0, 90.0)
@@ -238,14 +230,14 @@ def get_samples(bucket, instrument):
 
     Parameters
     ----------
-    bucket : `google.cloud.storage.Bucket`
+    bucket : `S3.Bucket`
         The bucket in which to search for predefined raws.
     instrument : `str`
         The short name of the instrument to sample.
 
     Returns
     -------
-    raws : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `google.cloud.storage.Blob`]]]
+    raws : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `s3.ObjectSummary`]]]
         A mapping from group IDs to a mapping of snap ID. The value of the
         innermost mapping is the observation metadata for each detector,
         and a Blob representing the image taken in that detector-snap.
@@ -269,63 +261,57 @@ def get_samples(bucket, instrument):
         59160: {"ra": 150.18157499999998, "dec": 2.2800083333333334, "rot": 270.0},
     }
 
-    blobs = bucket.client.list_blobs(bucket.name, prefix=instrument)
+    # The pre-made raw files are stored with the "unobserved" prefix
+    blobs = bucket.objects.filter(Prefix=f"unobserved/{instrument}/")
 
     result = {}
     for blob in blobs:
         # Assume that the unobserved bucket uses the same filename scheme as
         # the observed bucket.
-        parsed = re.match(RAW_REGEXP, blob.name)
-        if not parsed:
-            _log.warning(f"Could not parse {blob.name}; ignoring file.")
-            continue
-
-        group = parsed.group('group')
-        snap_id = int(parsed.group('snap'))
-        exposure_id = int(parsed.group('expid'))
+        snap = Snap.from_oid(blob.key)
         visit = Visit(instrument=instrument,
-                      detector=int(parsed.group('detector')),
-                      group=group,
+                      detector=snap.detector,
+                      group=snap.group,
                       snaps=INSTRUMENTS[instrument].n_snaps,
-                      filter=parsed.group('filter'),
-                      ra=hsc_metadata[exposure_id]["ra"],
-                      dec=hsc_metadata[exposure_id]["dec"],
-                      rot=hsc_metadata[exposure_id]["rot"],
+                      filter=snap.filter,
+                      ra=hsc_metadata[snap.exp_id]["ra"],
+                      dec=hsc_metadata[snap.exp_id]["dec"],
+                      rot=hsc_metadata[snap.exp_id]["rot"],
                       kind="SURVEY",
                       )
-        _log.debug(f"File {blob.name} parsed as snap {snap_id} of visit {visit}.")
-        if group in result:
-            snap_dict = result[group]
-            if snap_id in snap_dict:
-                _log.debug(f"New detector {visit.detector} added to snap {snap_id} of group {group}.")
-                detector_dict = snap_dict[snap_id]
+        _log.debug(f"File {blob.key} parsed as snap {snap.snap} of visit {visit}.")
+        if snap.group in result:
+            snap_dict = result[snap.group]
+            if snap.snap in snap_dict:
+                _log.debug(f"New detector {visit.detector} added to snap {snap.snap} of group {snap.group}.")
+                detector_dict = snap_dict[snap.snap]
                 detector_dict[visit] = blob
             else:
-                _log.debug(f"New snap {snap_id} added to group {group}.")
-                snap_dict[snap_id] = {visit: blob}
+                _log.debug(f"New snap {snap.snap} added to group {snap.group}.")
+                snap_dict[snap.snap] = {visit: blob}
         else:
-            _log.debug(f"New group {group} registered.")
-            result[group] = {snap_id: {visit: blob}}
+            _log.debug(f"New group {snap.group} registered.")
+            result[snap.group] = {snap.snap: {visit: blob}}
     return result
 
 
-def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base):
+def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base):
     """Upload visits and files using real raws.
 
     Parameters
    ----------
-    publisher : `google.cloud.pubsub_v1.PublisherClient`
+    producer : `confluent_kafka.Producer`
         The client that posts ``next_visit`` messages.
     instrument : `str`
         The short name of the instrument carrying out the observation.
-    raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `google.cloud.storage.Blob`]]]
+    raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `s3.ObjectSummary`]]]
         Available raws as a mapping from group IDs to a mapping of
         snap ID. The value of the innermost mapping is the observation
         metadata for each detector, and a Blob representing the
         image taken in that detector-snap.
-    src_bucket : `google.cloud.storage.Bucket`
+    src_bucket : `S3.Bucket`
         The bucket containing the blobs in ``raw_pool``.
-    dest_bucket : `google.cloud.storage.Bucket`
+    dest_bucket : `S3.Bucket`
         The bucket to which to upload the new images.
     n_groups : `int`
         The number of observation groups to simulate. If more than the number
@@ -364,25 +350,26 @@ def upload_from_pool(visit, snap_id):
             src_blob = snap_dict[snap_id][visit]
             # TODO: converting raw_pool from a nested mapping to an indexable
             # custom class would make it easier to include such metadata as expId.
-            exposure_id = int(re.match(RAW_REGEXP, src_blob.name).group('expid'))
+            exposure_id = Snap.from_oid(src_blob.key).exp_id
             filename = get_raw_path(visit.instrument, visit.detector, visit.group, snap_id, exposure_id,
                                     visit.filter)
-            src_bucket.copy_blob(src_blob, dest_bucket, new_name=filename)
-        process_group(publisher, visit_infos, upload_from_pool)
+            src = {'Bucket': src_bucket.name, 'Key': src_blob.key}
+            dest_bucket.copy(src, filename)
+        process_group(producer, visit_infos, upload_from_pool)
         _log.info("Slewing to next group")
         time.sleep(SLEW_INTERVAL)
 
 
-def upload_from_random(publisher, instrument, dest_bucket, n_groups, group_base):
+def upload_from_random(producer, instrument, dest_bucket, n_groups, group_base):
     """Upload visits and files using randomly generated visits.
 
     Parameters
     ----------
-    publisher : `google.cloud.pubsub_v1.PublisherClient`
+    producer : `confluent_kafka.Producer`
         The client that posts ``next_visit`` messages.
     instrument : `str`
         The short name of the instrument carrying out the observation.
-    dest_bucket : `google.cloud.storage.Bucket`
+    dest_bucket : `S3.Bucket`
         The bucket to which to upload the new images.
     n_groups : `int`
         The number of observation groups to simulate.
@@ -396,11 +383,11 @@ def upload_from_random(publisher, instrument, dest_bucket, n_groups, group_base)
         # TODO: may be cleaner to use a functor object than to depend on
         # closures for the bucket and data.
         def upload_dummy(visit, snap_id):
-            exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id)
+            exposure_id = make_exposure_id(visit.instrument, int(visit.group), snap_id)
             filename = get_raw_path(visit.instrument, visit.detector, visit.group, snap_id, exposure_id,
                                     visit.filter)
-            dest_bucket.blob(filename).upload_from_string("Test")
-        process_group(publisher, visit_infos, upload_dummy)
+            dest_bucket.put_object(Body=b"Test", Key=filename)
+        process_group(producer, visit_infos, upload_dummy)
         _log.info("Slewing to next group")
         time.sleep(SLEW_INTERVAL)
 
diff --git a/tests/test_upload.py b/tests/test_upload.py
new file mode 100644
index 00000000..581f68b4
--- /dev/null
+++ b/tests/test_upload.py
@@ -0,0 +1,77 @@
+# This file is part of prompt_prototype.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import unittest
+
+import boto3
+import botocore
+from moto import mock_s3
+
+from activator.raw import get_raw_path
+from tester.upload import get_last_group
+
+
+class UploadTest(unittest.TestCase):
+    """Test components in the upload tester.
+    """
+    mock_s3 = mock_s3()
+    bucket_name = "testBucketName"
+
+    def setUp(self):
+        self.mock_s3.start()
+        s3 = boto3.resource("s3")
+        s3.create_bucket(Bucket=self.bucket_name)
+
+        path = get_raw_path("TestCam", 123, "2022110200001", 2, 30, "TestFilter")
+        obj = s3.Object(self.bucket_name, path)
+        obj.put(Body=b'test1')
+        path = get_raw_path("TestCam", 123, "2022110200002", 2, 30, "TestFilter")
+        obj = s3.Object(self.bucket_name, path)
+        obj.put(Body=b'test2')
+
+    def tearDown(self):
+        s3 = boto3.resource("s3")
+        bucket = s3.Bucket(self.bucket_name)
+        try:
+            bucket.objects.all().delete()
+        except botocore.exceptions.ClientError as e:
+            if e.response["Error"]["Code"] == "404":
+                # the key was not reachable - pass
+                pass
+            else:
+                raise
+
+        bucket = s3.Bucket(self.bucket_name)
+        bucket.delete()
+
+        # Stop the S3 mock.
+        self.mock_s3.stop()
+
+    def test_upload(self):
+        s3 = boto3.resource("s3")
+        bucket = s3.Bucket(self.bucket_name)
+
+        last_group = get_last_group(bucket, "TestCam", "20221102")
+        self.assertEqual(last_group, 2022110200002)
+
+        # Test the case of no match
+        last_group = get_last_group(bucket, "TestCam", "20110101")
+        self.assertEqual(last_group, int(20110101) * 100_000)