Skip to content

Commit

Permalink
Merge pull request #14 from gerritholl/use-publisher-config
Browse files Browse the repository at this point in the history
Use publisher config.
  • Loading branch information
mraspaud committed May 24, 2022
2 parents 0201dc9 + 2cfa513 commit c87250d
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 9 deletions.
10 changes: 9 additions & 1 deletion cspp_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import subprocess
import tempfile
import time
import yaml
from glob import glob
from datetime import datetime, timedelta
from multiprocessing.pool import ThreadPool
Expand Down Expand Up @@ -580,6 +581,7 @@ def npp_rolling_runner(
viirs_sdr_options,
granule_time_tolerance=10,
ncpus=1,
publisher_config=None
):
"""The NPP/VIIRS runner. Listens and triggers processing on RDR granules."""

Expand Down Expand Up @@ -609,10 +611,16 @@ def npp_rolling_runner(
LOG.info("Will use %d CPUs when running CSPP instances" % ncpus)
viirs_proc = ViirsSdrProcessor(ncpus, level1_home)

if publisher_config is None:
pubconf = {"name": "viirs_dr_runner", "port": 0}
else:
with open(publisher_config, mode="rt", encoding="utf-8") as fp:
pubconf = yaml.safe_load(fp)

LOG.debug("Subscribe topics = %s", str(subscribe_topics))
with posttroll.subscriber.Subscribe('',
subscribe_topics, True) as subscr:
with Publish('viirs_dr_runner', 0) as publisher:
with Publish(**pubconf) as publisher:
while True:
viirs_proc.initialise()
for msg in subscr.recv(timeout=300):
Expand Down
15 changes: 14 additions & 1 deletion cspp_runner/tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ def test_spawn_cspp_failure(monkeypatch, tmp_path, caplog):
assert "CSPP probably failed!" in caplog.text


fake_publisher_config_contents = """name: test-publisher
port: 0
nameservers:
- localhost
"""


def test_rolling_runner(tmp_path, caplog, monkeypatch, fakemessage,
fake_results):
"""Test NPP rolling runner."""
Expand All @@ -299,7 +306,11 @@ def handler(signum, frame):
def fake_spawn_cspp(current_granule, *glist, viirs_sdr_call,
viirs_sdr_options):
fake_workdir.mkdir(exist_ok=True, parents=True)

return (os.fspath(fake_workdir), fake_results)
yaml_conf = tmp_path / "publisher.yaml"
with yaml_conf.open(mode="wt", encoding="ascii") as fp:
fp.write(fake_publisher_config_contents)

monkeypatch.setenv("CSPP_WORKDIR", os.fspath(fake_workdir))
with unittest.mock.patch("posttroll.subscriber.Subscribe") as psS, \
Expand All @@ -320,7 +331,9 @@ def fake_spawn_cspp(current_granule, *glist, viirs_sdr_call,
"true", "/file/available/rdr", "earth",
"test",
"/product/available/sdr", tmp_path / "sdr/results",
"true", [], 2)
"true", [],
ncpus=2,
publisher_config=os.fspath(yaml_conf))
except TimeOut:
pass # probably all is fine
else:
Expand Down
22 changes: 17 additions & 5 deletions cspp_runner/tests/test_viirs_dr_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,43 @@
level1_home = /nowhere/special
"""

fake_publisher_config_contents = """name: test-publisher
port: 0
nameservers:
- localhost
"""


@patch("argparse.ArgumentParser", autospec=True)
def test_get_parser(ap):
"""Test parsing argumentsr."""
import cspp_runner.viirs_dr_runner
cspp_runner.viirs_dr_runner.parse_args()
assert ap.return_value.add_argument.call_count == 3
assert ap.return_value.add_argument.call_count == 4


@patch("cspp_runner.viirs_dr_runner.parse_args")
@patch("cspp_runner.viirs_dr_runner.npp_rolling_runner")
def test_main(crn, cvp, tmp_path):
import cspp_runner.viirs_dr_runner

# create fake config file
conf = tmp_path / "conf"
# create fake config files
conf = tmp_path / "conf.ini"
with conf.open(mode="wt", encoding="ascii") as fp:
fp.write(fake_conf_contents)
yaml_conf = tmp_path / "publisher.yaml"
with yaml_conf.open(mode="wt", encoding="ascii") as fp:
fp.write(fake_publisher_config_contents)

# and fake log destination
# fake log destination
log = tmp_path / "log"

# fake

cvp.return_value = cspp_runner.viirs_dr_runner.get_parser().parse_args(
["-c", os.fspath(conf),
"-C", "test",
"-l", os.fspath(log)])
"-l", os.fspath(log),
"-p", os.fspath(yaml_conf)])

cspp_runner.viirs_dr_runner.main()
5 changes: 4 additions & 1 deletion cspp_runner/viirs_dr_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def get_parser():
type=str,
default=None,
help="The file to log to (stdout per default).")
parser.add_argument("-p", "--publisher", type=str,
help="File with publisher config (YAML).")
return parser


Expand Down Expand Up @@ -140,7 +142,8 @@ def main():
viirs_sdr_call,
viirs_sdr_options,
int(OPTIONS.get("granule_time_tolerance", 10)),
int(OPTIONS.get("ncpus", 1))
int(OPTIONS.get("ncpus", 1)),
publisher_config=args.publisher,
)


Expand Down
9 changes: 9 additions & 0 deletions examples/publisher-config.yaml_template
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This configuration file can be used to configure the posttroll publisher
# that viirs_dr_runner uses to announce SDR production is complete. The
# contents are converted to a dict using yaml.safe_load and then passed as
# options to posttroll.publisher.Publish using the dict config introduced in
# posttroll 1.7.
name: viirs_dr_runner
port: 0
nameservers:
- localhost
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
long_description=long_description,
packages=['cspp_runner', ],
data_files=[],
install_requires=['posttroll', 'trollsift'],
install_requires=['posttroll>1.7', 'trollsift'],
# test_requires=['mock'],
# test_suite='cspp_runner.tests.suite',
python_requires='>=3.8',
Expand Down

0 comments on commit c87250d

Please sign in to comment.