Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/playbook.rst
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ tester
On a local machine, it requires a JSON token in ``./prompt-proto-upload.json``.
To obtain a token, see the GCP documentation on `service account keys`_; the relevant service account is ``prompt-image-upload@prompt-proto.iam.gserviceaccount.com``.

.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys#getting_a_service_account_key
.. _service account keys: https://cloud.google.com/iam/docs/creating-managing-service-account-keys

Run the tester either on a local machine, or in Cloud Shell.
In Cloud Shell, install the prototype code and the GCP PubSub client:
Expand Down
35 changes: 18 additions & 17 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
),
style="{",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)
app = Flask(__name__)

subscriber = pubsub_v1.SubscriberClient()
Expand All @@ -74,13 +74,13 @@ def check_for_snap(
instrument: str, group: int, snap: int, detector: int
) -> Optional[str]:
prefix = f"{instrument}/{detector}/{group}/{snap}/{instrument}-{group}-{snap}-"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to make use of the MDC feature that we use in butler logging? That lets you set an MDC record and after that every log message will get that content included without you having to explicitly add "prefix" to every log message.

logger.debug(f"Checking for '{prefix}'")
_log.debug(f"Checking for '{prefix}'")
blobs = list(storage_client.list_blobs(image_bucket, prefix=prefix))
if not blobs:
return None
elif len(blobs) > 1:
logger.error(
f"Multiple files detected for a single group/snap/detector: '{prefix}'"
_log.error(
f"Multiple files detected for a single detector/group/snap: '{prefix}'"
)
return blobs[0]

Expand All @@ -93,17 +93,17 @@ def next_visit_handler() -> Tuple[str, int]:
topic=topic_path,
ack_deadline_seconds=60,
)
logger.debug(f"Created subscription '{subscription.name}'")
_log.debug(f"Created subscription '{subscription.name}'")
try:
envelope = request.get_json()
if not envelope:
msg = "no Pub/Sub message received"
logging.warn(f"error: '{msg}'")
_log.warn(f"error: '{msg}'")
return f"Bad Request: {msg}", 400

if not isinstance(envelope, dict) or "message" not in envelope:
msg = "invalid Pub/Sub message format"
logging.warn(f"error: '{msg}'")
_log.warn(f"error: '{msg}'")
return f"Bad Request: {msg}", 400

payload = base64.b64decode(envelope["message"]["data"])
Expand All @@ -127,7 +127,7 @@ def next_visit_handler() -> Tuple[str, int]:
mwi.ingest_image(oid)
snap_set.add(snap)

logger.debug(
_log.debug(
"Waiting for snaps from group"
f" '{expected_visit.group}' detector {expected_visit.detector}"
)
Expand All @@ -141,14 +141,14 @@ def next_visit_handler() -> Tuple[str, int]:
end = time.time()
if len(response.received_messages) == 0:
if end - start < timeout:
logger.debug(
f"Empty pull after {end - start}"
f" for '{expected_visit.group}'"
_log.debug(
f"Empty pull after {end - start}s"
f" for group '{expected_visit.group}'"
)
continue
logger.warning(
_log.warning(
"Timed out waiting for image in"
f" '{expected_visit.group}' after receiving snaps {snap_set}"
f" group '{expected_visit.group}' after receiving snaps {snap_set}"
)
break

Expand All @@ -159,7 +159,7 @@ def next_visit_handler() -> Tuple[str, int]:
m = re.match(oid_regexp, oid)
if m:
instrument, detector, group, snap = m.groups()
logger.debug(m.groups())
_log.debug("instrument, detector, group, snap = %s", m.groups())
if (
instrument == expected_visit.instrument
and int(detector) == int(expected_visit.detector)
Expand All @@ -170,10 +170,11 @@ def next_visit_handler() -> Tuple[str, int]:
mwi.ingest_image(oid)
snap_set.add(snap)
else:
logger.error(f"Failed to match object id '{oid}'")
_log.error(f"Failed to match object id '{oid}'")
subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list)

# Got all the snaps; run the pipeline
_log.info(f"Running pipeline on group: {expected_visit.group} detector: {expected_visit.detector}")
mwi.run_pipeline(expected_visit, snap_set)
return "Pipeline executed", 200
finally:
Expand All @@ -182,7 +183,7 @@ def next_visit_handler() -> Tuple[str, int]:

@app.errorhandler(500)
def server_error(e) -> Tuple[str, int]:
logger.exception("An error occurred during a request.")
_log.exception("An error occurred during a request.")
return (
f"""
An internal error occurred: <pre>{e}</pre>
Expand Down
12 changes: 7 additions & 5 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
FLAT="flat.yaml",
)

_log = logging.getLogger(__name__)
_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)


Expand Down Expand Up @@ -64,7 +64,7 @@ class MiddlewareInterface:
def __init__(self, input_repo: str, image_bucket: str, instrument: str):

# self.src = Butler(input_repo, writeable=False)
_log.info(f"Butler({input_repo}, writeable=False)")
_log.debug(f"Butler({input_repo}, writeable=False)")
self.image_bucket = image_bucket
self.instrument = instrument

Expand All @@ -82,6 +82,7 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str):

export_collections = set()
export_collections.add(self.calibration_collection)
_log.debug("Finding secondary collections")
# calib_collections = list(
# self.r.queryCollections(
# self.calibration_collection,
Expand All @@ -94,6 +95,7 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str):
# export_collections.add(collection)
export_collections.add(refcat_collection)

_log.debug("Finding refcats")
# for dataset in self.r.queryDatasets(
# "gaia_dr2_20200414",
# where=f"htm7 IN ({htm7})",
Expand Down Expand Up @@ -180,7 +182,7 @@ def ingest_image(self, oid: str) -> None:
Google storage identifier for incoming image, relative to the
image bucket.
"""
_log.info(f"Ingesting image '{oid}'")
_log.info(f"Ingesting image id '{oid}'")
run = f"{self.instrument}/raw/all"
cmd = [
"butler",
Expand All @@ -192,7 +194,7 @@ def ingest_image(self, oid: str) -> None:
self.repo,
f"gs://{self.image_bucket}/{oid}",
]
_log.debug(str(cmd))
_log.debug("ingest command line: %s", cmd)
# subprocess.run(cmd, check=True)

def run_pipeline(self, visit: Visit, snaps: set) -> None:
Expand Down Expand Up @@ -221,7 +223,7 @@ def run_pipeline(self, visit: Visit, snaps: set) -> None:
"-i",
f"{self.instrument}/raw/all",
]
_log.debug(str(cmd))
_log.debug("pipetask command line: %s", cmd)
# subprocess.run(cmd, check=True)


Expand Down
15 changes: 9 additions & 6 deletions python/tester/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,19 @@ class Instrument:
logging.basicConfig(
format="{levelname} {asctime} {name} - {message}",
style="{",
level=logging.DEBUG,
)
_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)


def process_group(publisher, bucket, instrument, group, filter, kind):
n_snaps = INSTRUMENTS[instrument].n_snaps
send_next_visit(publisher, instrument, group, n_snaps, filter, kind)
for snap in range(n_snaps):
logging.info(f"Taking group {group} snap {snap}")
_log.info(f"Taking group: {group} snap: {snap}")
time.sleep(EXPOSURE_INTERVAL)
for detector in range(INSTRUMENTS[instrument].n_detectors):
logging.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}")
_log.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}")
exposure_id = (group // 100000) * 100000
exposure_id += (group % 100000) * INSTRUMENTS[instrument].n_snaps
exposure_id += snap
Expand All @@ -54,14 +55,16 @@ def process_group(publisher, bucket, instrument, group, filter, kind):
f"-{exposure_id}-{filter}-{detector}.fz"
)
bucket.blob(fname).upload_from_string("Test")
logging.info(f"Uploaded {group} {snap} {filter} {detector}")
_log.info(f"Uploaded group: {group} snap: {snap} filter: {filter} detector: {detector}")


def send_next_visit(publisher, instrument, group, snaps, filter, kind):
_log.info(f"Sending next_visit for group: {group} snaps: {snaps} filter: {filter} kind: {kind}")
topic_path = publisher.topic_path(PROJECT_ID, "nextVisit")
ra = random.uniform(0.0, 360.0)
dec = random.uniform(-90.0, 90.0)
for detector in range(INSTRUMENTS[instrument].n_detectors):
_log.debug(f"Sending next_visit for group: {group} detector: {detector} ra: {ra} dec: {dec}")
visit = Visit(instrument, detector, group, snaps, filter, ra, dec, kind)
data = json.dumps(visit.__dict__).encode("utf-8")
publisher.publish(topic_path, data=data)
Expand Down Expand Up @@ -99,14 +102,14 @@ def main():
last_group = int(date) * 100000
else:
last_group = max(prefixes) + random.randrange(10, 19)
logging.info(f"Last group {last_group}")
_log.info(f"Last group {last_group}")

for i in range(n_groups):
kind = KINDS[i % len(KINDS)]
group = last_group + i + 1
filter = FILTER_LIST[random.randrange(0, len(FILTER_LIST))]
process_group(publisher, bucket, instrument, group, filter, kind)
logging.info("Slewing to next group")
_log.info("Slewing to next group")
time.sleep(SLEW_INTERVAL)


Expand Down
4 changes: 2 additions & 2 deletions tests/test_middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def setUp(self):
ra=10,
dec=20,
kind="BIAS")
self.logger_name = "activator.middleware_interface"
self.logger_name = "lsst.activator.middleware_interface"

def test_init(self):
"""Basic tests of initializing an interface object.
Expand All @@ -62,7 +62,7 @@ def test_prep_butler(self):
def test_ingest_image(self):
with self.assertLogs(self.logger_name, level="INFO") as cm:
self.interface.ingest_image(self.next_visit)
msg = f"INFO:{self.logger_name}:Ingesting image '{self.next_visit}'"
msg = f"INFO:{self.logger_name}:Ingesting image id '{self.next_visit}'"
self.assertEqual(cm.output, [msg])

def test_run_pipeline(self):
Expand Down