Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
# This file is part of prompt_prototype.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["check_for_snap", "next_visit_handler"]

import base64
import json
import logging
Expand All @@ -9,8 +32,8 @@
from flask import Flask, request
from google.cloud import pubsub_v1, storage

from middleware_interface import MiddlewareInterface
from visit import Visit
from .middleware_interface import MiddlewareInterface
from .visit import Visit

PROJECT_ID = "prompt-proto"

Expand Down
76 changes: 73 additions & 3 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
# This file is part of prompt_prototype.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["MiddlewareInterface"]

import logging
import os
import shutil

from astropy.time import Time
from lsst.daf.butler import Butler

from visit import Visit
from .visit import Visit

PIPELINE_MAP = dict(
BIAS="bias.yaml",
Expand All @@ -18,14 +41,34 @@


class MiddlewareInterface:
"""Interface layer between the Butler middleware and the prompt processing
data handling system.

An instance of this class will accept an incoming group of snaps to
process, using an instance-local butler repo. The instance can pre-load
the necessary calibrations to process an incoming visit, ingest the data
when it is available, and run the difference imaging pipeline, all in that
local butler.

Parameters
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you include an explanation of what a MiddlewareInterface object actually represents? I find it very confusing that __init__ performs Butler operations, some of them ones that change state.

The fact that some (but not all) of the code assumes multiple concurrent MiddlewareInterface objects is probably a clue...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a paragraph, but I think we don't want to assume that any of the stuff in __init__ will stick around (I suspect most of that should move to prep_butler, but don't know yet).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, and we shouldn't document implementations anyway. I just meant that the class seems to be intended to be used in particular ways, which are not at all obvious from the name.

----------
input_repo : `str`
Path to a butler repo containing the calibration and other data needed
for processing images as they are received.
image_bucket : `str`
Google storage bucket where images will be written to as they arrive.
instrument : `str`
Name of the instrument taking the data, for populating butler
collections and dataIds.
"""
def __init__(self, input_repo: str, image_bucket: str, instrument: str):

# self.src = Butler(input_repo, writeable=False)
_log.info(f"Butler({input_repo}, writeable=False)")
self.image_bucket = image_bucket
self.instrument = instrument

self.repo = "/tmp/butler-{os.getpid()}"
self.repo = f"/tmp/butler-{os.getpid()}"
if not os.path.exists(self.repo):
_log.info(f"Making local Butler {self.repo}")
Butler.makeRepo(self.repo)
Expand Down Expand Up @@ -83,6 +126,13 @@ def __init__(self, input_repo: str, image_bucket: str, instrument: str):
self.calib_types = ["bias", "dark", "defects", "flat", "fringe", ]

def prep_butler(self, visit: Visit) -> None:
"""Prepare a temporary butler repo for processing the incoming data.

Parameters
----------
visit : Visit
Group of snaps from one detector to prepare the butler for.
"""
_log.info(f"Preparing Butler for visit '{visit}'")
visit_info = visit.__dict__
for calib_type in self.calib_types:
Expand Down Expand Up @@ -122,6 +172,14 @@ def prep_butler(self, visit: Visit) -> None:
shutil.rmtree(visit_dir, ignore_errors=True)

def ingest_image(self, oid: str) -> None:
"""Ingest an image into the temporary butler.

Parameters
----------
oid : `str`
Google storage identifier for incoming image, relative to the
image bucket.
"""
_log.info(f"Ingesting image '{oid}'")
run = f"{self.instrument}/raw/all"
cmd = [
Expand All @@ -137,7 +195,19 @@ def ingest_image(self, oid: str) -> None:
_log.debug(str(cmd))
# subprocess.run(cmd, check=True)

def run_pipeline(self, visit: Visit, snaps) -> None:
def run_pipeline(self, visit: Visit, snaps: set) -> None:
"""Process the received image.

Parameters
----------
visit : Visit
Group of snaps from one detector to be processed.
snaps : `set`
Identifiers of the snaps that were received.
TODO: I believe this is unnecessary because it should be encoded
in the `visit` object, but we'll have to test how that works once
we have implemented this with actual data.
"""
pipeline = PIPELINE_MAP[visit.kind]
_log.info(f"Running pipeline {pipeline} on visit '{visit}', snaps {snaps}")
cmd = [
Expand Down
2 changes: 2 additions & 0 deletions python/activator/visit.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
__all__ = ["Visit"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No license statement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to muck with the visit.py files, because we may want to consolidate those somehow (there are two copies in different places).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real one; the other is a symlink. I'm pretty sure we can delete the latter if we change the import in upload.py. I guess I was waiting for test coverage of upload before trying. 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point, but I'm nervous about writing tests of upload (see KT's comment on DM-33938: upload.py is a test itself)

Copy link
Member

@kfindeisen kfindeisen Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree -- for now, at least, upload.py is the server for which activator.py is the client; that makes it part of the system we're developing. In particular, we probably will need to make changes to upload.py as we start doing more substantial integration testing. (And some of those changes, especially anything involving the next_visit message, might well be reflected in the production system.)


from dataclasses import dataclass


Expand Down
2 changes: 1 addition & 1 deletion python/tester/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def process_group(publisher, bucket, instrument, group, filter, kind):
logging.info(f"Taking group {group} snap {snap}")
time.sleep(EXPOSURE_INTERVAL)
for detector in range(INSTRUMENTS[instrument].n_detectors):
logging.info(f"Uploading {group} {snap} {filter} {detector}")
logging.info(f"Uploading group: {group} snap: {snap} filter: {filter} detector: {detector}")
exposure_id = (group // 100000) * 100000
exposure_id += (group % 100000) * INSTRUMENTS[instrument].n_snaps
exposure_id += snap
Expand Down
72 changes: 72 additions & 0 deletions tests/test_middleware_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# This file is part of prompt_prototype.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import os.path
import tempfile
import unittest

from activator.middleware_interface import MiddlewareInterface
from activator.visit import Visit


class MiddlewareInterfaceTest(unittest.TestCase):
    """Unit tests for `MiddlewareInterface`.

    Every test gets a freshly constructed interface from `setUp`. The
    method-level tests only check the INFO-level messages emitted on the
    ``activator.middleware_interface`` logger.
    """

    def setUp(self):
        """Build the interface under test and a sample `Visit`."""
        input_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
        instrument = "TestCam"
        self.output_dir = tempfile.TemporaryDirectory()
        # Remove the temporary directory deterministically, even if the rest
        # of setUp or the test itself fails; otherwise it lingers until the
        # object is garbage collected and a ResourceWarning is emitted.
        self.addCleanup(self.output_dir.cleanup)
        # The constructor declares ``image_bucket`` as a `str`, so hand it
        # the directory path rather than the TemporaryDirectory object.
        self.interface = MiddlewareInterface(input_dir, self.output_dir.name, instrument)
        self.next_visit = Visit(instrument,
                                detector=1,
                                group=1,
                                snaps=1,
                                filter="r",
                                ra=10,
                                dec=20,
                                kind="BIAS")
        self.logger_name = "activator.middleware_interface"

    def test_init(self):
        """Basic tests of initializing an interface object.
        """
        # did we get a usable butler instance?
        self.assertIn("/tmp/butler-", self.interface.dest.datastore.root.path)
        # Ideas for things to test:
        # * On init, does the right kind of butler get created, with the right
        #   collections, etc?
        # * On init, is the local butler repo purely in memory?

    def test_prep_butler(self):
        """Check that `prep_butler` logs exactly one INFO message naming the visit.
        """
        with self.assertLogs(self.logger_name, level="INFO") as cm:
            self.interface.prep_butler(self.next_visit)
        msg = f"INFO:{self.logger_name}:Preparing Butler for visit '{self.next_visit}'"
        self.assertEqual(cm.output, [msg])

    def test_ingest_image(self):
        """Check that `ingest_image` logs exactly one INFO message naming its argument.
        """
        # NOTE(review): `ingest_image` documents ``oid`` as a `str`, but a
        # Visit is passed here; only the log text is checked, so this works
        # for now -- confirm once real ingestion is implemented.
        with self.assertLogs(self.logger_name, level="INFO") as cm:
            self.interface.ingest_image(self.next_visit)
        msg = f"INFO:{self.logger_name}:Ingesting image '{self.next_visit}'"
        self.assertEqual(cm.output, [msg])

    def test_run_pipeline(self):
        """Check that `run_pipeline` logs the pipeline, visit, and snaps it was given.
        """
        # NOTE(review): ``snaps`` is annotated as a `set` on `run_pipeline`,
        # but a bare int is passed here -- confirm the intended type.
        with self.assertLogs(self.logger_name, level="INFO") as cm:
            self.interface.run_pipeline(self.next_visit, 1)
        msg = f"INFO:{self.logger_name}:Running pipeline bias.yaml on visit '{self.next_visit}', snaps 1"
        self.assertEqual(cm.output, [msg])