Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to configure publisher with a dictionary of settings #35

Merged
merged 17 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 87 additions & 34 deletions posttroll/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@


def get_own_ip():
"""Get the host's ip number.
"""
"""Get the host's ip number."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect(("8.8.8.8", 80))
Expand Down Expand Up @@ -92,9 +91,7 @@ class Publisher(object):
"""

def __init__(self, address, name="", min_port=None, max_port=None):
"""Bind the publisher class to a port.
"""
# pylint: disable=E1103
"""Bind the publisher class to a port."""
self.name = name
self.destination = address
self.publish = get_context().socket(zmq.PUB)
Expand Down Expand Up @@ -129,33 +126,33 @@ def __init__(self, address, name="", min_port=None, max_port=None):
self._heartbeat = None
self._pub_lock = Lock()

def send(self, msg):
"""Send the given message.
def start(self):
"""Start the publisher.

Actually just returns *self*, but needed for consistent use from context manager.
"""
return self

def send(self, msg):
"""Send the given message."""
with self._pub_lock:
self.publish.send_string(msg)
return self

def stop(self):
"""Stop the publisher.
"""
"""Stop the publisher."""
self.publish.setsockopt(zmq.LINGER, 1)
self.publish.close()
return self

def heartbeat(self, min_interval=0):
"""Send a heartbeat ... but only if *min_interval* seconds has passed
since last beat.
"""
"""Send a heartbeat ... but only if *min_interval* seconds has passed since last beat."""
if not self._heartbeat:
self._heartbeat = _PublisherHeartbeat(self)
self._heartbeat(min_interval)


class _PublisherHeartbeat(object):

"""Publisher for heartbeat.
"""
"""Publisher for heartbeat."""

def __init__(self, publisher):
self.publisher = publisher
Expand Down Expand Up @@ -210,29 +207,25 @@ def __init__(self, name, port=0, aliases=None, broadcast_interval=2,
self.max_port = max_port

def start(self):
"""Start the publisher.
"""
pub_addr = "tcp://*:" + str(self._port)
"""Start the publisher."""
pub_addr = _get_publish_address(self._port)
self._publisher = self._publisher_class(pub_addr, self._name,
min_port=self.min_port,
max_port=self.max_port)
LOGGER.debug("entering publish %s", str(self._publisher.destination))
addr = ("tcp://" + str(get_own_ip()) + ":" +
str(self._publisher.port_number))
addr = _get_publish_address(self._publisher.port_number, str(get_own_ip()))
self._broadcaster = sendaddressservice(self._name, addr,
self._aliases,
self._broadcast_interval,
self._nameservers).start()
return self._publisher

def send(self, msg):
"""Send a *msg*.
"""
"""Send a *msg*."""
return self._publisher.send(msg)

def stop(self):
"""Stop the publisher.
"""
"""Stop the publisher."""
LOGGER.debug("exiting publish")
if self._publisher is not None:
self._publisher.stop()
Expand All @@ -242,14 +235,18 @@ def stop(self):
self._broadcaster = None


class Publish(NoisyPublisher):
def _get_publish_address(port, ip_address="*"):
return "tcp://" + ip_address + ":" + str(port)


class Publish(object):

"""The publishing context.

Broadcasts also the *name*, *port*, and optional *aliases* (using
:class:`posttroll.message_broadcaster.MessageBroadcaster`).
See :class:`Publisher` and :class:`NoisyPublisher` for more information on the arguments.

See :class:`NoisyPublisher` for more information on the arguments.
The publisher is selected based on the arguments, see :function:`dict_config` for
information how the selection is done.

Example on how to use the :class:`Publish` context::

Expand All @@ -258,7 +255,7 @@ class Publish(NoisyPublisher):
import time

try:
with Publish("my_service", 9000) as pub:
with Publish("my_service", port=9000) as pub:
counter = 0
while True:
counter += 1
Expand All @@ -270,11 +267,67 @@ class Publish(NoisyPublisher):
print("terminating publisher...")

"""
# Make this one subclassable with another publisher.
_publisher_class = Publisher

def __init__(self, name, port=0, aliases=None, broadcast_interval=2, nameservers=None,
min_port=None, max_port=None):
"""Initialize the class."""
settings = {'name': name, 'port': port, 'min_port': min_port, 'max_port': max_port,
'aliases': aliases, 'broadcast_interval': broadcast_interval,
'nameservers': nameservers}
self.publisher = dict_config(settings)

def __enter__(self):
return self.start()
return self.publisher.start()

def __exit__(self, exc_type, exc_val, exc_tb):
return self.stop()
self.publisher.stop()


def dict_config(settings):
"""Create a publisher based on dictionary of configuration items.

The publisher is created based on the given options in the following way:

- setting *settings['port']* to non-zero integer AND *settings['nameservers']* to *False*
will disable nameserver connections and address broadcasting, and publish the
messages only on the localhost on the given port

- setting *settings['nameservers']* to a list of hostnames will connect to nameservers
running on those servers, and in addition publish the messages on a random port on the
localhost

- setting *settings['port']* to zero and *settings['namservers']* to *None* will broadcast
the publisher address and port with multicast, and publish the messages on a random port.

The last two cases will require *settings['name']* to be set. Additional options are
described in the docstrings of the respective classes, namely :class:`~posttroll.publisher.Publisher` and
:class:`~posttroll.publisher.NoisyPublisher`.
"""
if settings.get('port') and settings.get('nameservers') is False:
return _get_publisher_instance(settings)
return _get_noisypublisher_instance(settings)


def _get_publisher_instance(settings):
publisher_address = _get_publish_address(settings['port'])
publisher_name = settings.get("name", "")
min_port = settings.get("min_port")
max_port = settings.get("max_port")

return Publisher(publisher_address, name=publisher_name, min_port=min_port, max_port=max_port)


def _get_noisypublisher_instance(settings):
try:
publisher_name = settings["name"]
except KeyError:
raise KeyError("NoisyPublisher requires a name")
port = settings.get("port", 0)
aliases = settings.get("aliases")
broadcast_interval = settings.get("broadcast_interval", 2)
nameservers = settings.get("nameservers")
min_port = settings.get("min_port")
max_port = settings.get("max_port")

return NoisyPublisher(publisher_name, port=port, aliases=aliases, broadcast_interval=broadcast_interval,
nameservers=nameservers, min_port=min_port, max_port=max_port)
112 changes: 112 additions & 0 deletions posttroll/tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,116 @@ def test_localhost_restriction(self, mcrec, pub, msg):
adr.stop()


class TestPublisherDictConfig(unittest.TestCase):
"""Test configuring publishers with a dictionary."""

@mock.patch('posttroll.publisher.Publisher')
def test_publisher_is_selected(self, Publisher):
"""Test that Publisher is selected as publisher class."""
from posttroll.publisher import dict_config

settings = {'port': 12345, 'nameservers': False}

pub = dict_config(settings)
Publisher.assert_called_once()
assert pub is not None

@mock.patch('posttroll.publisher.Publisher')
def test_publisher_all_arguments(self, Publisher):
"""Test that only valid arguments are passed to Publisher."""
from posttroll.publisher import dict_config

settings = {'port': 12345, 'nameservers': False, 'name': 'foo',
'min_port': 40000, 'max_port': 41000, 'invalid_arg': 'bar'}
_ = dict_config(settings)
_check_valid_settings_in_call(settings, Publisher, ignore=['port', 'nameservers'])
assert Publisher.call_args[0][0].startswith("tcp://*:")
assert Publisher.call_args[0][0].endswith(str(settings['port']))

def test_no_name_raises_keyerror(self):
"""Trying to create a NoisyPublisher without a given name will raise KeyError."""
from posttroll.publisher import dict_config

with self.assertRaises(KeyError):
_ = dict_config(dict())

@mock.patch('posttroll.publisher.NoisyPublisher')
def test_noisypublisher_is_selected_only_name(self, NoisyPublisher):
"""Test that NoisyPublisher is selected as publisher class."""
from posttroll.publisher import dict_config

settings = {'name': 'publisher_name'}

pub = dict_config(settings)
NoisyPublisher.assert_called_once()
assert pub is not None

@mock.patch('posttroll.publisher.NoisyPublisher')
def test_noisypublisher_is_selected_name_and_port(self, NoisyPublisher):
"""Test that NoisyPublisher is selected as publisher class."""
from posttroll.publisher import dict_config

settings = {'name': 'publisher_name', 'port': 40000}

_ = dict_config(settings)
NoisyPublisher.assert_called_once()

@mock.patch('posttroll.publisher.NoisyPublisher')
def test_noisypublisher_all_arguments(self, NoisyPublisher):
"""Test that only valid arguments are passed to NoisyPublisher."""
from posttroll.publisher import dict_config

settings = {'port': 12345, 'nameservers': ['foo'], 'name': 'foo',
'min_port': 40000, 'max_port': 41000, 'invalid_arg': 'bar',
'aliases': ['alias1', 'alias2'], 'broadcast_interval': 42}
_ = dict_config(settings)
_check_valid_settings_in_call(settings, NoisyPublisher, ignore=['name'])
assert NoisyPublisher.call_args[0][0] == settings["name"]

@mock.patch('posttroll.publisher.Publisher')
def test_publish_is_not_noisy(self, Publisher):
"""Test that Publisher is selected with the context manager when it should be."""
from posttroll.publisher import Publish

with Publish("service_name", port=40000, nameservers=False):
Publisher.assert_called_once()

@mock.patch('posttroll.publisher.NoisyPublisher')
def test_publish_is_noisy_only_name(self, NoisyPublisher):
"""Test that NoisyPublisher is selected with the context manager when only name is given."""
from posttroll.publisher import Publish

with Publish("service_name"):
NoisyPublisher.assert_called_once()

@mock.patch('posttroll.publisher.NoisyPublisher')
def test_publish_is_noisy_with_port(self, NoisyPublisher):
"""Test that NoisyPublisher is selected with the context manager when port is given."""
from posttroll.publisher import Publish

with Publish("service_name", port=40000):
NoisyPublisher.assert_called_once()

@mock.patch('posttroll.publisher.NoisyPublisher')
def test_publish_is_noisy_with_nameservers(self, NoisyPublisher):
"""Test that NoisyPublisher is selected with the context manager when nameservers are given."""
from posttroll.publisher import Publish

with Publish("service_name", nameservers=['a', 'b']):
NoisyPublisher.assert_called_once()


def _check_valid_settings_in_call(settings, pub_class, ignore=None):
ignore = ignore or []
for key in settings:
if key == 'invalid_arg':
assert 'invalid_arg' not in pub_class.call_args[1]
continue
if key in ignore:
continue
assert pub_class.call_args[1][key] == settings[key]


def suite():
"""The suite for test_bbmcast.
"""
Expand All @@ -399,4 +509,6 @@ def suite():
mysuite.addTest(loader.loadTestsFromTestCase(TestListenerContainer))
mysuite.addTest(loader.loadTestsFromTestCase(TestPub))
mysuite.addTest(loader.loadTestsFromTestCase(TestAddressReceiver))
mysuite.addTest(loader.loadTestsFromTestCase(TestPublisherDictConfig))

return mysuite