Skip to content

Commit

Permalink
Merge 65674a1 into a2ced1a
Browse files Browse the repository at this point in the history
  • Loading branch information
julien6387 committed Mar 12, 2023
2 parents a2ced1a + 65674a1 commit a5149d2
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 17 deletions.
26 changes: 18 additions & 8 deletions supvisors/tests/test_supvisorswebsockets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import threading

import pytest

# ======================================================================
# Copyright 2023 Julien LE CLEACH
Expand All @@ -16,8 +19,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ======================================================================

import pytest
pytest.importorskip('websockets', reason='cannot test as optional websockets is not installed')

import json
Expand Down Expand Up @@ -77,14 +78,24 @@ def publish_all(publisher, close=False):
publisher.close()


def wait_thread_alive(thr: threading.Thread) -> bool:
""" Wait for thread to be alive (5 seconds max). """
cpt = 10
while cpt > 0 or not thr.is_alive():
time.sleep(0.5)
cpt -= 1
return thr.is_alive()


def check_subscription(subscriber, publisher,
supvisors_subscribed=False, instance_subscribed=False,
application_subscribed=False, event_subscribed=False, process_subscribed=False,
hstats_subscribed=False, pstats_subscribed=False):
""" The method tests the emission and reception of all status, depending on their subscription status. """
subscriber.start()
# give time to the websocket client to connect the server
time.sleep(2)
assert wait_thread_alive(publisher.thread)
assert wait_thread_alive(subscriber.thread)
# publish and receive
publish_all(publisher)
# give time to the subscriber to receive data
Expand Down Expand Up @@ -122,16 +133,15 @@ def test_external_publish_subscribe(supvisors):
publisher = WsEventPublisher(supvisors.supvisors_mapper.local_instance, supvisors.logger)
subscriber = SupvisorsWsEventInterface('localhost', port, supvisors.logger)
subscriber.start()
time.sleep(1)
assert wait_thread_alive(publisher.thread)
assert wait_thread_alive(subscriber.thread)
# check the Server side
assert publisher.thread.is_alive()
assert publisher.thread.loop.is_running()
assert not publisher.thread.stop_event.is_set()
# sleep a bit to give time to hit the reception timeout
time.sleep(WsEventSubscriber.RecvTimeout)
# check the Client side
assert subscriber.headers == set()
assert subscriber.thread.is_alive()
assert subscriber.thread.loop.is_running()
assert not subscriber.thread.stop_event.is_set()
# close the sockets
Expand Down Expand Up @@ -272,7 +282,7 @@ def test_unknown_message(mocker, publisher, real_subscriber):
""" Test the reception of a message with unknown header. """
# give time to the websocket client to connect the server
real_subscriber.start()
time.sleep(2)
assert wait_thread_alive(real_subscriber.thread)
# mock the on_xxx methods
mocked_ons = [mocker.patch.object(real_subscriber, method_name)
for method_name in ['on_supvisors_status', 'on_instance_status', 'on_application_status',
Expand All @@ -296,7 +306,7 @@ def test_close_server(real_subscriber, publisher):
""" Test the server closure while a client is connected. """
# give time to the websocket client to connect the server
real_subscriber.start()
time.sleep(2)
assert wait_thread_alive(real_subscriber.thread)
# the publisher will be stopped just after all the publications
# default websocket ping is 20 seconds
publish_all(publisher, True)
Expand Down
26 changes: 17 additions & 9 deletions supvisors/tests/test_supvisorszmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
pytest.importorskip('zmq', reason='cannot test as optional pyzmq is not installed')

import time
import threading

from time import sleep
from unittest.mock import call

from supvisors.client.zmqsubscriber import SupvisorsZmqEventInterface
Expand All @@ -34,7 +34,7 @@ def publisher(supvisors):
test_publisher = ZmqEventPublisher(supvisors.supvisors_mapper.local_instance, supvisors.logger)
yield test_publisher
test_publisher.close()
sleep(0.5)
time.sleep(0.5)


@pytest.fixture
Expand All @@ -44,7 +44,7 @@ def subscriber(mocker, supvisors):
mocker.patch.object(test_subscriber, 'on_receive')
yield test_subscriber
test_subscriber.stop()
sleep(0.5)
time.sleep(0.5)


@pytest.fixture
Expand All @@ -57,19 +57,27 @@ def real_subscriber(supvisors):
test_subscriber.stop()


def wait_thread_alive(thr: threading.Thread) -> bool:
""" Wait for thread to be alive (5 seconds max). """
cpt = 10
while cpt > 0 or not thr.is_alive():
time.sleep(0.5)
cpt -= 1
return thr.is_alive()


def test_external_publish_subscribe(supvisors):
""" Test the ZeroMQ publish-subscribe sockets used in the event interface of Supvisors. """
# create publisher and subscriber
publisher = ZmqEventPublisher(supvisors.supvisors_mapper.local_instance, supvisors.logger)
subscriber = SupvisorsZmqEventInterface(zmq.asyncio.Context.instance(), 'localhost', supvisors.options.event_port,
supvisors.logger)
subscriber.start()
time.sleep(1)
assert wait_thread_alive(subscriber.thread)
# check that the ZMQ sockets are ready
assert not publisher.socket.closed
# check the Client side
assert subscriber.headers == set()
assert subscriber.thread.is_alive()
assert subscriber.thread.loop.is_running()
assert not subscriber.thread.stop_event.is_set()
# close the sockets and stop the reception thread
Expand Down Expand Up @@ -109,7 +117,7 @@ def check_subscription(subscriber, publisher, supvisors_subscribed=False, instan
""" The method tests the emission and reception of all status, depending on their subscription status. """
subscriber.start()
# give time to the websocket client to connect the server
time.sleep(2)
assert wait_thread_alive(subscriber.thread)
# publish and receive
publish_all(publisher)
# give time to the subscriber to receive data
Expand Down Expand Up @@ -259,7 +267,7 @@ def test_unknown_message(mocker, publisher, real_subscriber):
""" Test the reception of a message with unknown header. """
# give time to the websocket client to connect the server
real_subscriber.start()
time.sleep(2)
assert wait_thread_alive(real_subscriber.thread)
# mock the on_xxx methods
mocked_ons = [mocker.patch.object(real_subscriber, method_name)
for method_name in ['on_supvisors_status', 'on_instance_status', 'on_application_status',
Expand All @@ -284,7 +292,7 @@ def test_erroneous_message(mocker, publisher, real_subscriber):
""" Test the reception of a message with unknown header. """
# give time to the websocket client to connect the server
real_subscriber.start()
time.sleep(2)
assert wait_thread_alive(real_subscriber.thread)
# mock the on_xxx methods
mocked_ons = [mocker.patch.object(real_subscriber, method_name)
for method_name in ['on_supvisors_status', 'on_instance_status', 'on_application_status',
Expand All @@ -303,7 +311,7 @@ def test_erroneous_socket(mocker, publisher, real_subscriber):
mocker.patch('zmq.sugar.socket.Socket.recv_json', side_effect=zmq.ZMQError)
# give time to the websocket client to connect the server
real_subscriber.start()
time.sleep(2)
assert wait_thread_alive(real_subscriber.thread)
# mock the on_xxx methods
mocked_ons = [mocker.patch.object(real_subscriber, method_name)
for method_name in ['on_supvisors_status', 'on_instance_status', 'on_application_status',
Expand Down

0 comments on commit a5149d2

Please sign in to comment.