Merged
171 changes: 79 additions & 92 deletions python/tester/upload.py
@@ -1,14 +1,17 @@
__all__ = ["get_last_group", ]

import boto3
Member: What functionality is missing from lsst.resources that would make it work for this? Flexibility of endpoint (GCS vs S3 vs WebDAV) is what lsst.resources was designed to help with, so it would help me if I knew why you had decided against using it. Thanks.

Member (@kfindeisen, Nov 7, 2022): This particular script is not supposed to depend on anything LSST. It was previously only useful in an environment that didn't have the Stack installed (and where installation would have been logistically difficult), and may be so again in the future.

Member: But you are already installing boto3, so you are installing external software. Why can't you install lsst-resources from PyPI as well? Or, if you are using conda-forge, would you like me to add lsst-resources to conda-forge instead? lsst-resources is completely standalone, PyPI-installable, and BSD-licensed.

Collaborator Author: I simply wasn't aware of lsst.resources. Reading more, it makes sense to me to use it instead. That said, I would prefer this ticket to cover just the IDF->USDF port of upload.py. I can create a new ticket for switching to lsst.resources here and in other places in the prompt processing codebase, and for handling the dependency.

Member: lsst.resources is how the butler datastore can work on S3, WebDAV, GCS, or a local file system without the butler having to care where the files end up.
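As a rough sketch of what that abstraction could look like here (not code from this PR), the snippet below copies a raw between buckets with lsst.resources instead of boto3. It assumes lsst-resources is installed from PyPI with its S3 extra, and the URIs are placeholders rather than paths this script actually uses.

```python
# Illustrative only: an lsst.resources-based copy between buckets.
# The URIs below are placeholders, not paths used by this script.
from lsst.resources import ResourcePath

src = ResourcePath("s3://rubin-pp-users/unobserved/HSC/some-raw.fits")
dest = ResourcePath("s3://rubin-pp/HSC/some-raw.fits")

# ResourcePath resolves the scheme (s3://, gs://, webdav, file://) itself,
# so the same call would work unchanged against GCS or a local directory.
dest.transfer_from(src, transfer="copy")
```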

from confluent_kafka import Producer
import dataclasses
from google.cloud import pubsub_v1, storage
from google.oauth2 import service_account
import itertools
import json
import logging
import os
import random
import re
import socket
import sys
import time
from activator.raw import RAW_REGEXP, get_raw_path
from activator.raw import Snap, get_raw_path
from activator.visit import Visit


@@ -28,10 +31,10 @@ class Instrument:
EXPOSURE_INTERVAL = 18
SLEW_INTERVAL = 2
FILTER_LIST = "ugrizy"
PUBSUB_TOKEN = "abc123"
KINDS = ("BIAS", "DARK", "FLAT")

PROJECT_ID = "prompt-proto"
# Kafka server
kafka_cluster = os.environ["KAFKA_CLUSTER"]


logging.basicConfig(
@@ -42,12 +45,12 @@ class Instrument:
_log.setLevel(logging.DEBUG)


def process_group(publisher, visit_infos, uploader):
def process_group(producer, visit_infos, uploader):
"""Simulate the observation of a single on-sky pointing.

Parameters
----------
publisher : `google.cloud.pubsub_v1.PublisherClient`
producer : `confluent_kafka.Producer`
The client that posts ``next_visit`` messages.
visit_infos : `set` [`activator.Visit`]
The visit-detector combinations to be observed; each object may
@@ -66,7 +69,7 @@ def process_group(publisher, visit_infos, uploader):
_log.info("No observations to make; aborting.")
return

send_next_visit(publisher, group, visit_infos)
send_next_visit(producer, group, visit_infos)
# TODO: need asynchronous code to handle next_visit delay correctly
for snap in range(n_snaps):
_log.info(f"Taking group: {group} snap: {snap}")
@@ -79,24 +82,26 @@ def process_group(publisher, visit_infos, uploader):
f"detector: {info.detector}")


def send_next_visit(publisher, group, visit_infos):
def send_next_visit(producer, group, visit_infos):
"""Simulate the transmission of a ``next_visit`` message.

Parameters
----------
producer : `confluent_kafka.Producer`
The client that posts ``next_visit`` messages.
group : `str`
The group ID for the message to send.
visit_infos : `set` [`activator.Visit`]
The visit-detector combinations to be sent; each object may
represent multiple snaps.
"""
_log.info(f"Sending next_visit for group: {group}")
topic_path = publisher.topic_path(PROJECT_ID, "nextVisit")
topic = "next-visit-topic"
for info in visit_infos:
_log.debug(f"Sending next_visit for group: {info.group} detector: {info.detector} "
f"filter: {info.filter} ra: {info.ra} dec: {info.dec} kind: {info.kind}")
data = json.dumps(info.__dict__).encode("utf-8")
publisher.publish(topic_path, data=data)
producer.produce(topic, data)


def make_exposure_id(instrument, group, snap):
@@ -135,38 +140,38 @@ def main():

date = time.strftime("%Y%m%d")

credentials = service_account.Credentials.from_service_account_file(
"./prompt-proto-upload.json"
)
storage_client = storage.Client(PROJECT_ID, credentials=credentials)
dest_bucket = storage_client.bucket("rubin-prompt-proto-main")
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=INSTRUMENTS[instrument].n_detectors,
endpoint_url = "https://s3dfrgw.slac.stanford.edu"
Member: Does the endpoint URL need to be passed to boto3? I thought all environments already had S3_ENDPOINT_URL set.

Collaborator Author: It seems I need to pass in the endpoint URL even if the env var S3_ENDPOINT_URL is set.

Member (@kfindeisen, Nov 7, 2022): That's surprising, since activator.py didn't need to (it was using client instead of resource, but I don't think they're supposed to behave differently).

Collaborator Author: Interesting. For me, using client without an explicit endpoint but with the env var S3_ENDPOINT_URL set doesn't work either.

Collaborator Author (@hsinfang, Nov 8, 2022): To be clear, I can instantiate a client without an explicit endpoint, but it then fails to access anything, likely because the endpoint is not what's intended:
botocore.exceptions.ClientError: An error occurred (InvalidAccessKeyId) when calling the ListObjects operation: The AWS Access Key Id you provided does not exist in our records.
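As background to this thread: boto3 does not read S3_ENDPOINT_URL on its own; that variable is read by the Rubin tooling (e.g. lsst.resources), so the endpoint has to be forwarded to boto3 explicitly. Below is a minimal sketch of doing so, assuming the same bucket name as the diff and using the hard-coded USDF endpoint as a fallback.

```python
# Sketch: pass the endpoint explicitly, since boto3 does not read
# S3_ENDPOINT_URL on its own.  Falls back to the USDF endpoint from the diff.
import os

import boto3

endpoint_url = os.environ.get("S3_ENDPOINT_URL", "https://s3dfrgw.slac.stanford.edu")
s3 = boto3.resource("s3", endpoint_url=endpoint_url)
dest_bucket = s3.Bucket("rubin-pp")
```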

s3 = boto3.resource("s3", endpoint_url=endpoint_url)
dest_bucket = s3.Bucket("rubin-pp")
producer = Producer(
{"bootstrap.servers": kafka_cluster, "client.id": socket.gethostname()}
)
publisher = pubsub_v1.PublisherClient(credentials=credentials,
batch_settings=batch_settings)

last_group = get_last_group(storage_client, instrument, date)
last_group = get_last_group(dest_bucket, instrument, date)
_log.info(f"Last group {last_group}")

src_bucket = storage_client.bucket("rubin-prompt-proto-unobserved")
src_bucket = s3.Bucket("rubin-pp-users")
raw_pool = get_samples(src_bucket, instrument)

new_group_base = last_group + random.randrange(10, 19)
if raw_pool:
_log.info(f"Observing real raw files from {instrument}.")
upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n_groups, last_group + 1)
upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_groups, new_group_base)
else:
_log.info(f"No raw files found for {instrument}, generating dummy files instead.")
upload_from_random(publisher, instrument, dest_bucket, n_groups, last_group + 1)
upload_from_random(producer, instrument, dest_bucket, n_groups, new_group_base)


def get_last_group(storage_client, instrument, date):
"""Identify a group number that will not collide with any previous groups.
def get_last_group(bucket, instrument, date):
"""Identify the largest group number or a new group number.

This number helps decide the next group number so it will not
collide with any previous groups.

Parameters
----------
storage_client : `google.cloud.storage.Client`
A Google Cloud Storage object pointing to the active project.
bucket : `s3.Bucket`
A S3 storage bucket
instrument : `str`
The short name of the active instrument.
date : `str`
@@ -178,32 +183,19 @@ def get_last_group(storage_client, instrument, date):
The largest existing group for ``instrument``, or a newly generated
group if none exist.
"""
preblobs = storage_client.list_blobs(
"rubin-prompt-proto-main",
prefix=f"{instrument}/",
delimiter="/",
preblobs = bucket.objects.filter(
Prefix=f"{instrument}/",
)
# See https://cloud.google.com/storage/docs/samples/storage-list-files-with-prefix
for blob in preblobs:
# Iterate over blobs to get past `list_blobs`'s pagination and
# fill .prefixes.
pass
detector = min(int(prefix.split("/")[1]) for prefix in preblobs.prefixes)

blobs = storage_client.list_blobs(
"rubin-prompt-proto-main",
prefix=f"{instrument}/{detector}/{date}",
delimiter="/",
detector = min((int(preblob.key.split("/")[1]) for preblob in preblobs), default=0)

blobs = preblobs.filter(
Prefix=f"{instrument}/{detector}/{date}"
)
for blob in blobs:
# Iterate over blobs to get past `list_blobs`'s pagination and
# fill .prefixes.
pass
prefixes = [int(prefix.split("/")[2]) for prefix in blobs.prefixes]
prefixes = [int(blob.key.split("/")[2]) for blob in blobs]
if len(prefixes) == 0:
return int(date) * 100_000
else:
return max(prefixes) + random.randrange(10, 19)
return max(prefixes)


def make_random_visits(instrument, group):
@@ -221,7 +213,7 @@ def make_random_visits(instrument, group):
visits : `set` [`activator.Visit`]
Visits generated for ``group`` for all ``instrument``'s detectors.
"""
kind = KINDS[group % len(KINDS)]
kind = KINDS[int(group) % len(KINDS)]
filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))]
ra = random.uniform(0.0, 360.0)
dec = random.uniform(-90.0, 90.0)
@@ -238,14 +230,14 @@ def get_samples(bucket, instrument):

Parameters
----------
bucket : `google.cloud.storage.Bucket`
bucket : `S3.Bucket`
The bucket in which to search for predefined raws.
instrument : `str`
The short name of the instrument to sample.

Returns
-------
raws : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `google.cloud.storage.Blob`]]]
raws : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `s3.ObjectSummary`]]]
A mapping from group IDs to a mapping of snap ID. The value of the
innermost mapping is the observation metadata for each detector,
and a Blob representing the image taken in that detector-snap.
@@ -269,63 +261,57 @@ def get_samples(bucket, instrument):
59160: {"ra": 150.18157499999998, "dec": 2.2800083333333334, "rot": 270.0},
}

blobs = bucket.client.list_blobs(bucket.name, prefix=instrument)
# The pre-made raw files are stored with the "unobserved" prefix
blobs = bucket.objects.filter(Prefix=f"unobserved/{instrument}/")
result = {}
for blob in blobs:
# Assume that the unobserved bucket uses the same filename scheme as
# the observed bucket.
parsed = re.match(RAW_REGEXP, blob.name)
if not parsed:
_log.warning(f"Could not parse {blob.name}; ignoring file.")
continue

group = parsed.group('group')
snap_id = int(parsed.group('snap'))
exposure_id = int(parsed.group('expid'))
snap = Snap.from_oid(blob.key)
visit = Visit(instrument=instrument,
detector=int(parsed.group('detector')),
group=group,
detector=snap.detector,
group=snap.group,
snaps=INSTRUMENTS[instrument].n_snaps,
filter=parsed.group('filter'),
ra=hsc_metadata[exposure_id]["ra"],
dec=hsc_metadata[exposure_id]["dec"],
rot=hsc_metadata[exposure_id]["rot"],
filter=snap.filter,
ra=hsc_metadata[snap.exp_id]["ra"],
dec=hsc_metadata[snap.exp_id]["dec"],
rot=hsc_metadata[snap.exp_id]["rot"],
kind="SURVEY",
)
_log.debug(f"File {blob.name} parsed as snap {snap_id} of visit {visit}.")
if group in result:
snap_dict = result[group]
if snap_id in snap_dict:
_log.debug(f"New detector {visit.detector} added to snap {snap_id} of group {group}.")
detector_dict = snap_dict[snap_id]
_log.debug(f"File {blob.key} parsed as snap {snap.snap} of visit {visit}.")
if snap.group in result:
snap_dict = result[snap.group]
if snap.snap in snap_dict:
_log.debug(f"New detector {visit.detector} added to snap {snap.snap} of group {snap.group}.")
detector_dict = snap_dict[snap.snap]
detector_dict[visit] = blob
else:
_log.debug(f"New snap {snap_id} added to group {group}.")
snap_dict[snap_id] = {visit: blob}
_log.debug(f"New snap {snap.snap} added to group {snap.group}.")
snap_dict[snap.snap] = {visit: blob}
else:
_log.debug(f"New group {group} registered.")
result[group] = {snap_id: {visit: blob}}
_log.debug(f"New group {snap.group} registered.")
result[snap.group] = {snap.snap: {visit: blob}}

return result


def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base):
def upload_from_raws(producer, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base):
"""Upload visits and files using real raws.

Parameters
----------
publisher : `google.cloud.pubsub_v1.PublisherClient`
producer : `confluent_kafka.Producer`
The client that posts ``next_visit`` messages.
instrument : `str`
The short name of the instrument carrying out the observation.
raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `google.cloud.storage.Blob`]]]
raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `s3.ObjectSummary`]]]
Available raws as a mapping from group IDs to a mapping of snap ID.
The value of the innermost mapping is the observation metadata for
each detector, and a Blob representing the image taken in that
detector-snap.
src_bucket : `google.cloud.storage.Bucket`
src_bucket : `S3.Bucket`
The bucket containing the blobs in ``raw_pool``.
dest_bucket : `google.cloud.storage.Bucket`
dest_bucket : `S3.Bucket`
The bucket to which to upload the new images.
n_groups : `int`
The number of observation groups to simulate. If more than the number
@@ -364,25 +350,26 @@ def upload_from_pool(visit, snap_id):
src_blob = snap_dict[snap_id][visit]
# TODO: converting raw_pool from a nested mapping to an indexable
# custom class would make it easier to include such metadata as expId.
exposure_id = int(re.match(RAW_REGEXP, src_blob.name).group('expid'))
exposure_id = Snap.from_oid(src_blob.key).exp_id
filename = get_raw_path(visit.instrument, visit.detector, visit.group, snap_id,
exposure_id, visit.filter)
src_bucket.copy_blob(src_blob, dest_bucket, new_name=filename)
process_group(publisher, visit_infos, upload_from_pool)
src = {'Bucket': src_bucket.name, 'Key': src_blob.key}
dest_bucket.copy(src, filename)
process_group(producer, visit_infos, upload_from_pool)
_log.info("Slewing to next group")
time.sleep(SLEW_INTERVAL)


def upload_from_random(publisher, instrument, dest_bucket, n_groups, group_base):
def upload_from_random(producer, instrument, dest_bucket, n_groups, group_base):
"""Upload visits and files using randomly generated visits.

Parameters
----------
publisher : `google.cloud.pubsub_v1.PublisherClient`
producer : `confluent_kafka.Producer`
The client that posts ``next_visit`` messages.
instrument : `str`
The short name of the instrument carrying out the observation.
dest_bucket : `google.cloud.storage.Bucket`
dest_bucket : `S3.Bucket`
The bucket to which to upload the new images.
n_groups : `int`
The number of observation groups to simulate.
Expand All @@ -396,11 +383,11 @@ def upload_from_random(publisher, instrument, dest_bucket, n_groups, group_base)
# TODO: may be cleaner to use a functor object than to depend on
# closures for the bucket and data.
def upload_dummy(visit, snap_id):
exposure_id = make_exposure_id(visit.instrument, visit.group, snap_id)
exposure_id = make_exposure_id(visit.instrument, int(visit.group), snap_id)
filename = get_raw_path(visit.instrument, visit.detector, visit.group, snap_id,
exposure_id, visit.filter)
dest_bucket.blob(filename).upload_from_string("Test")
process_group(publisher, visit_infos, upload_dummy)
dest_bucket.put_object(Body=b"Test", Key=filename)
process_group(producer, visit_infos, upload_dummy)
_log.info("Slewing to next group")
time.sleep(SLEW_INTERVAL)
