Skip to content

Commit

Permalink
Merge pull request pytroll#137 from adybbroe/feature-scisys-receiver-…
Browse files Browse the repository at this point in the history
…create-publish-topic-from-config-pattern-and-message

Create publish topic in scisys-receiver from config-pattern and message
  • Loading branch information
mraspaud committed Jan 24, 2024
2 parents 9a4482b + b752d38 commit 4fb2813
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 115 deletions.
32 changes: 9 additions & 23 deletions bin/scisys_receiver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 - 2021 Pytroll developers
# Copyright (c) 2012 - 2023 Pytroll developers
#
# Author(s):
#
# Martin Raspaud <martin.raspaud@smhi.se>
# Janne Kotro <janne.kotro@fmi.fi>
# Panu Lahtinen <panu.lahtinen@fmi.fi>
# Adam Dybbroe <Firstname.Lastname at smhi.se>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -52,24 +53,13 @@ def parse_args():
local_ips.remove('127.0.0.1')

parser = argparse.ArgumentParser()
parser.add_argument("host", help="GMC host")
parser.add_argument("port", help="Port to listen to", type=int)
parser.add_argument("-c", "--config",
help="YAML config file to use.")
parser.add_argument("-P", "--publish-port", type=int, default=0,
dest="publish_port", help="Publish port")
parser.add_argument("-n", "--nameserver", nargs='+', default=[],
dest="nameservers",
help="Nameserver(s) to connect to")
parser.add_argument("-s", "--station", help="Name of the station",
dest="station",
default="unknown")
parser.add_argument("-x", "--excluded_satellites", nargs='*',
help="List of platform names to exclude",
dest="excluded_satellites",
default=[])
parser.add_argument("-e", "--environment",
help="Name of the environment (e.g. dev, test, oper)",
dest="environment",
default="dev")
parser.add_argument("-l", "--log", help="File to log to",
dest="log", default=None)
parser.add_argument("-f", "--ftp_prefix", dest="ftp_prefix",
Expand All @@ -82,11 +72,6 @@ def parse_args():
help="IP of the target server."
"In case of multiple dispatches in GMC."
"Defaults to the local host.")
parser.add_argument("-T", "--topic_postfix",
dest="topic_postfix",
type=str,
help="Publish topic postfix. "
"Prefix will be /format/data_processing_level/")

return parser.parse_args()

Expand Down Expand Up @@ -115,15 +100,16 @@ def main():
"""Run scisys receiver."""
opts = parse_args()

no_sats = opts.excluded_satellites
configfile = opts.config

# no_sats = opts.excluded_satellites

setup_logging(log_file=opts.log)

try:
receive_from_zmq(opts.host, opts.port,
opts.station, opts.environment, no_sats,
receive_from_zmq(configfile,
opts.target_server, opts.ftp_prefix,
opts.topic_postfix, publish_port=opts.publish_port,
publish_port=opts.publish_port,
nameservers=opts.nameservers, days=1)
except KeyboardInterrupt:
pass
Expand Down
13 changes: 13 additions & 0 deletions examples/scisys_receiver.yaml_template
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Publish topic
publish_topic_pattern: "/{sensor}/{format}/{data_processing_level}/{platform_name}"

# GMC Server host
host: my-reception-server-name
# Port to listen to
port: portnumber

station: nrk
environment: dev

excluded_satellites:
- fy3d
33 changes: 33 additions & 0 deletions pytroll_collectors/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2023 Pytroll Developers

# Author(s):

# Adam Dybbroe <Firstname.Lastname at smhi.se>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Handling the yaml configurations."""

import yaml


def read_config(config_filepath):
"""Read and extract config information."""
with open(config_filepath, 'r') as fp_:
config = yaml.safe_load(fp_)

return config
41 changes: 15 additions & 26 deletions pytroll_collectors/scisys.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
from time import sleep
from urllib.parse import SplitResult, urlsplit, urlunsplit

from trollsift import compose

from pytroll_collectors.config import read_config
from posttroll.message import Message
from posttroll.publisher import Publish
from pytroll_collectors.helper_functions import is_uri_on_server
Expand Down Expand Up @@ -493,37 +496,23 @@ def stop(self):
self.loop = False


def get_subject_from_msg2send(to_send, station, env, topic_postfix):
"""Get the publish topic from the message to be send."""
if isinstance(to_send['sensor'], str):
prefix = '/' + to_send['sensor'].replace('/', '-')
else:
prefix = ''
if topic_postfix is not None:
subject = "/".join((prefix, to_send['format'],
to_send['data_processing_level'],
topic_postfix))
else:
subject = "/".join((prefix, to_send['format'],
to_send['data_processing_level'],
station, env,
"polar", "direct_readout"))
return subject


def receive_from_zmq(host, port, station, environment, excluded_platforms,
target_server, ftp_prefix, topic_postfix,
def get_subject_from_message_and_config(to_send, config):
"""Get the publish topic from the message and the yaml configuration settings."""
return compose(config['publish_topic_pattern'], to_send)


def receive_from_zmq(config_filename,
target_server, ftp_prefix,
publish_port=0, nameservers=None, days=1):
"""Receive 2met! messages from zeromq."""
logger.debug("target_server: %s", str(target_server))
logger.debug("ftp_prefix: %s", str(ftp_prefix))
logger.debug("topic_postfix: %s", str(topic_postfix))
logger.debug("station %s", str(station))
logger.debug(type(target_server))

# socket = Subscriber(["tcp://localhost:9331"], ["2met!"])
sock = GMCSubscriber(host, port)
msg_rec = MessageReceiver(host, excluded_platforms,
config = read_config(config_filename)

sock = GMCSubscriber(config['host'], config['port'])
msg_rec = MessageReceiver(config['host'], config['excluded_platforms'],
target_server, ftp_prefix)

with Publish("receiver", port=publish_port,
Expand All @@ -539,7 +528,7 @@ def receive_from_zmq(host, port, station, environment, excluded_platforms,
if to_send is None:
continue

subject = get_subject_from_msg2send(to_send, station, environment, topic_postfix)
subject = get_subject_from_message_and_config(to_send, config)
logger.debug("Subject: %s", str(subject))
msg = Message(subject, "file", to_send).encode()
logger.debug("publishing %s", str(msg))
Expand Down
53 changes: 53 additions & 0 deletions pytroll_collectors/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2023 Adam.Dybbroe

# Author(s):

# Adam.Dybbroe <a000680@c21856.ad.smhi.se>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Fixtures for unittests."""

import pytest


TEST_YAML_CONFIG_CONTENT_SCISYS_RECEIVER = """
# Publish topic
publish_topic_pattern: '/{sensor}/{format}/{data_processing_level}/{platform_name}'
# It is possible to here add a static postfix topic if needed:
topic_postfix: "my/cool/postfix/topic"
host: merlin
port: 10600
station: nrk
environment: dev
excluded_satellites:
- fy3d
"""


@pytest.fixture
def fake_yamlconfig_file_for_scisys_receiver(tmp_path):
"""Write fake yaml config file for the SCISYS receiver."""
file_path = tmp_path / 'test_scisys_receiver_config.yaml'
with open(file_path, 'w') as fpt:
fpt.write(TEST_YAML_CONFIG_CONTENT_SCISYS_RECEIVER)

yield file_path
40 changes: 40 additions & 0 deletions pytroll_collectors/tests/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2023 Adam.Dybbroe

# Author(s):

# Adam.Dybbroe <a000680@c21856.ad.smhi.se>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Test getting the yaml configurations from file."""

from pytroll_collectors.config import read_config


def test_get_yaml_configuration(fake_yamlconfig_file_for_scisys_receiver):
"""Test read and get the yaml configuration for the scisys receiver from file."""
config = read_config(fake_yamlconfig_file_for_scisys_receiver)

assert config['publish_topic_pattern'] == '/{sensor}/{format}/{data_processing_level}/{platform_name}'
assert config['topic_postfix'] == 'my/cool/postfix/topic'
assert config['host'] == 'merlin'
assert isinstance(config['port'], int)
assert config['port'] == 10600
assert config['station'] == 'nrk'
assert config['environment'] == 'dev'
assert len(config['excluded_satellites']) == 1
assert config['excluded_satellites'][0] == 'fy3d'
92 changes: 26 additions & 66 deletions pytroll_collectors/tests/test_scisys.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from copy import deepcopy

from pytroll_collectors.scisys import MessageReceiver, TwoMetMessage
from pytroll_collectors.scisys import get_subject_from_msg2send
from pytroll_collectors.scisys import get_subject_from_message_and_config

hostname = 'localhost'

Expand Down Expand Up @@ -257,10 +257,28 @@ def test_twomet_messages_with_stop_reception_message(platform_name, tmp_path):
TestCase().assertDictEqual(to_send, CREATE_MESSAGES[platform_name](tmp_path))


@pytest.mark.parametrize("sensor, sensor_name", [(VIIRS, 'viirs'),
(ATMS, 'atms')])
def test_get_subject_from_msg2send_postfix_topic_is_none(sensor, sensor_name, tmp_path):
"""Test the get the subject from the message being send."""
@pytest.mark.parametrize("sensor, sensor_name, topic_pattern, topic_result",
[(VIIRS, 'viirs',
'/{sensor}/{format}/{data_processing_level}/{platform_name}',
'/viirs/RDR/0/Suomi-NPP'),
(VIIRS, 'viirs',
'/RDR/nrk/dev',
'/RDR/nrk/dev'),
(ATMS, 'atms',
'/{sensor}/{data_processing_level}/{format}/{platform_name}/{type}',
'/atms/0/RDR/Suomi-NPP/HDF5'),
(ATMS, 'atms',
'/{sensor}/{data_processing_level}/{format}/{platform_name}/{type}/nrk/dev',
'/atms/0/RDR/Suomi-NPP/HDF5/nrk/dev'),
])
def test_create_message_topic_from_message_and_config_pattern(sensor, sensor_name,
topic_pattern, topic_result, tmp_path):
"""Test create a message topic from the config pattern and the message."""
config = {'publish_topic_pattern': 'some-pattern-for-a-publish-topic',
'topic_postfix': 'my/cool/postfix/topic',
'host': 'merlin', 'port': 10600,
'station': 'nrk', 'environment': 'dev',
'excluded_satellites': ['fy3d']}
msg_rec = MessageReceiver("nimbus")

filename = tmp_path / sensor['uid']
Expand All @@ -269,65 +287,7 @@ def test_get_subject_from_msg2send_postfix_topic_is_none(sensor, sensor_name, tm
string = TwoMetMessage(INPUT_DISPATCH[sensor_name](str(filename.parent)))
to_send = msg_rec.receive(string)

topic_postfix = None
station = 'nrk'
environment = 'dev'
subject = get_subject_from_msg2send(to_send, station, environment, topic_postfix)

assert subject == "/{sensor}/RDR/0/nrk/dev/polar/direct_readout".format(sensor=sensor_name)


@pytest.mark.parametrize("sensor, sensor_name", [(VIIRS, 'viirs'),
(ATMS, 'atms')])
def test_get_subject_from_msg2send_with_postfix_topic(sensor, sensor_name, tmp_path):
"""Test the get the subject from the message being send."""
msg_rec = MessageReceiver("nimbus")

filename = tmp_path / sensor['uid']
create_empty_file(filename)

string = TwoMetMessage(INPUT_DISPATCH[sensor_name](str(filename.parent)))
to_send = msg_rec.receive(string)

station = 'nrk'
environment = 'dev'
topic_postfix = 'my_topic'
subject = get_subject_from_msg2send(to_send, station, environment, topic_postfix)

assert subject == "/{sensor}/RDR/0/my_topic".format(sensor=sensor_name)


def test_get_subject_from_msg2send_empty_postfix_topic(tmp_path):
"""Test get the subject from the message being send - empty postfix topic."""
msg_rec = MessageReceiver("nimbus")

filename = tmp_path / ATMS['uid']
create_empty_file(filename)

string = TwoMetMessage(INPUT_DISPATCH['atms'](str(filename.parent)))
to_send = msg_rec.receive(string)

station = 'nrk'
environment = 'dev'
topic_postfix = ''
subject = get_subject_from_msg2send(to_send, station, environment, topic_postfix)

assert subject == "/atms/RDR/0/"


def test_get_subject_from_msg2send_avhrr3(tmp_path):
"""Test get the subject from the message being send - avhrr data."""
msg_rec = MessageReceiver("nimbus")

filename = tmp_path / create_msg_n19(tmp_path)['uid']
create_empty_file(filename)

string = TwoMetMessage(create_fildis_n19(tmp_path))
to_send = msg_rec.receive(string)

station = 'nrk'
environment = 'dev'
topic_postfix = ''
config.update({'publish_topic_pattern': topic_pattern})
subject = get_subject_from_message_and_config(to_send, config)

subject = get_subject_from_msg2send(to_send, station, environment, topic_postfix)
assert subject == "/HRPT/0/"
assert subject == topic_result

0 comments on commit 4fb2813

Please sign in to comment.