Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from flask import Flask, request
from google.cloud import pubsub_v1, storage

import lsst.geom
from lsst.daf.butler import Butler
from .middleware_interface import MiddlewareInterface
from .visit import Visit
Expand Down Expand Up @@ -119,6 +120,9 @@ def next_visit_handler() -> Tuple[str, int]:

payload = base64.b64decode(envelope["message"]["data"])
data = json.loads(payload)
# adapt hashable on-the-wire next_visit format to activator.Visit
data["boresight_center"] = lsst.geom.SpherePoint(data["ra"], data["dec"], lsst.geom.degrees),
data["orientation"] = lsst.geom.Angle(data["rot"], lsst.geom.degrees),
expected_visit = Visit(**data)
assert expected_visit.instrument == config_instrument
snap_set = set()
Expand Down
76 changes: 37 additions & 39 deletions python/tester/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import re
import sys
import time
from visit import Visit

from lsst.geom import SpherePoint, degrees
from visitMessage import VisitMessage


@dataclass
Expand Down Expand Up @@ -69,11 +67,11 @@ def process_group(publisher, visit_infos, uploader):
----------
publisher : `google.cloud.pubsub_v1.PublisherClient`
The client that posts ``next_visit`` messages.
visit_infos : `set` [`activator.Visit`]
visit_infos : `set` [`tester.VisitMessage`]
The visit-detector combinations to be observed; each object may
represent multiple snaps. Assumed to represent a single group, and to
share instrument, snaps, filter, and kind.
uploader : callable [`activator.Visit`, int]
uploader : callable [`tester.VisitMessage`, int]
A callable that takes an exposure spec and a snap ID, and uploads the
visit's data.
"""
Expand Down Expand Up @@ -106,7 +104,7 @@ def send_next_visit(publisher, group, visit_infos):
----------
group : `str`
The group ID for the message to send.
visit_infos : `set` [`activator.Visit`]
visit_infos : `set` [`tester.VisitMessage`]
The visit-detector combinations to be sent; each object may
represent multiple snaps.
"""
Expand Down Expand Up @@ -238,17 +236,17 @@ def make_random_visits(instrument, group):

Returns
-------
visits : `set` [`activator.Visit`]
visits : `set` [`tester.VisitMessage`]
Visits generated for ``group`` for all ``instrument``'s detectors.
"""
kind = KINDS[group % len(KINDS)]
filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))]
ra = random.uniform(0.0, 360.0) * degrees
dec = random.uniform(-90.0, 90.0) * degrees
rot = random.uniform(0.0, 360.0) * degrees
ra = random.uniform(0.0, 360.0)
dec = random.uniform(-90.0, 90.0)
rot = random.uniform(0.0, 360.0)
return {
Visit(instrument, detector, group, INSTRUMENTS[instrument].n_snaps, filter,
SpherePoint(ra, dec), rot, kind)
VisitMessage(instrument, detector, group, INSTRUMENTS[instrument].n_snaps, filter,
ra, dec, rot, kind)
for detector in range(INSTRUMENTS[instrument].n_detectors)
}

Expand All @@ -265,7 +263,7 @@ def get_samples(bucket, instrument):

Returns
-------
raws : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `google.cloud.storage.Blob`]]]
raws : mapping [`str`, mapping [`int`, mapping [`tester.VisitMessage`, `google.cloud.storage.Blob`]]]
A mapping from group IDs to a mapping of snap ID. The value of the
innermost mapping is the observation metadata for each detector,
and a Blob representing the image taken in that detector-snap.
Expand Down Expand Up @@ -294,17 +292,16 @@ def get_samples(bucket, instrument):
group = parsed.group('group')
snap_id = int(parsed.group('snap'))
exposure_id = int(parsed.group('expid'))
visit = Visit(instrument=instrument,
detector=int(parsed.group('detector')),
group=group,
snaps=INSTRUMENTS[instrument].n_snaps,
filter=parsed.group('filter'),
boresight_center=SpherePoint(hsc_metadata[exposure_id]["ra"],
hsc_metadata[exposure_id]["dec"],
degrees),
orientation=hsc_metadata[exposure_id]["rot"] * degrees,
kind="SURVEY",
)
visit = VisitMessage(instrument=instrument,
detector=int(parsed.group('detector')),
group=group,
snaps=INSTRUMENTS[instrument].n_snaps,
filter=parsed.group('filter'),
ra=hsc_metadata[exposure_id]["ra"],
dec=hsc_metadata[exposure_id]["dec"],
rot=hsc_metadata[exposure_id]["rot"],
kind="SURVEY",
)
_log.debug(f"File {blob.name} parsed as snap {snap_id} of visit {visit}.")
if group in result:
snap_dict = result[group]
Expand All @@ -331,7 +328,7 @@ def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n
The client that posts ``next_visit`` messages.
instrument : `str`
The short name of the instrument carrying out the observation.
raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.Visit`, `google.cloud.storage.Blob`]]]
raw_pool : mapping [`str`, mapping [`int`, mapping [`tester.VisitMessage`, `google.cloud.storage.Blob`]]]
Available raws as a mapping from group IDs to a mapping of snap ID.
The value of the innermost mapping is the observation metadata for
each detector, and a Blob representing the image taken in that
Expand All @@ -353,12 +350,12 @@ def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n
# snap_dict maps snap_id to {visit: blob}
snap_dict = {}
# Copy all the visit-blob dictionaries under each snap_id,
# replacing the (immutable) Visit objects to point to group
# replacing the (immutable) VisitMessage objects to point to group
# instead of true_group.
for snap_id, old_visits in raw_pool[true_group].items():
snap_dict[snap_id] = {splice_group(true_visit, group): blob
for true_visit, blob in old_visits.items()}
# Gather all the Visit objects found in snap_dict, merging
# Gather all the VisitMessage objects found in snap_dict, merging
# duplicates for different snaps of the same detector.
visit_infos = {info for det_dict in snap_dict.values() for info in det_dict}

Expand Down Expand Up @@ -408,29 +405,30 @@ def upload_dummy(visit, snap_id):


def splice_group(visit, group):
"""Replace the group ID in a Visit object.
"""Replace the group ID in a VisitMessage object.

Parameters
----------
visit : `activator.Visit`
visit : `tester.VisitMessage`
The object to update.
group : `str`
The new group ID to use.

Returns
-------
new_visit : `activator.Visit`
new_visit : `tester.VisitMessage`
A visit with group ``group``, but otherwise identical to ``visit``.
"""
return Visit(instrument=visit.instrument,
detector=visit.detector,
group=group,
snaps=visit.snaps,
filter=visit.filter,
boresight_center=visit.boresight_center,
orientation=visit.orientation,
kind=visit.kind,
)
return VisitMessage(instrument=visit.instrument,
detector=visit.detector,
group=group,
snaps=visit.snaps,
filter=visit.filter,
ra=visit.ra,
dec=visit.dec,
rot=visit.rot,
kind=visit.kind,
)


if __name__ == "__main__":
Expand Down
1 change: 0 additions & 1 deletion python/tester/visit.py

This file was deleted.

16 changes: 16 additions & 0 deletions python/tester/visitMessage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
__all__ = ["VisitMessage"]

from dataclasses import dataclass


@dataclass(frozen=True)
class VisitMessage:
instrument: str
detector: int
group: str
snaps: int
filter: str
ra: float
dec: float
rot: float
kind: str