Skip to content

Commit

Permalink
get rid of asyncio warnings in unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
julien6387 committed Aug 14, 2023
1 parent 24b9871 commit 8c36432
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 81 deletions.
8 changes: 4 additions & 4 deletions supvisors/internal_com/internal_com.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ def __init__(self, async_loop: asyncio.AbstractEventLoop, supvisors: Any) -> Non
self.supvisors = supvisors
# asyncio loop attributes
self.loop: asyncio.AbstractEventLoop = async_loop
self.stop_event: asyncio.Event = asyncio.Event(loop=async_loop)
self.stop_event: asyncio.Event = asyncio.Event()
# asyncio queues
self.requester_queue = asyncio.Queue(loop=async_loop)
self.subscriber_queue = asyncio.Queue(loop=async_loop)
self.discovery_queue = asyncio.Queue(loop=async_loop)
self.requester_queue = asyncio.Queue()
self.subscriber_queue = asyncio.Queue()
self.discovery_queue = asyncio.Queue()
# asyncio tasks
self.puller: RequestAsyncPuller = RequestAsyncPuller(self.requester_queue, self.stop_event, supvisors)
self.subscribers: InternalAsyncSubscribers = InternalAsyncSubscribers(self.subscriber_queue,
Expand Down
14 changes: 8 additions & 6 deletions supvisors/internal_com/mainloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,8 @@ def __init__(self, supvisors: Any) -> None:
threading.Thread.__init__(self, daemon=True)
# keep a reference to the Supvisors instance
self.supvisors = supvisors
# create the asyncio object that will receive all events
self.async_loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
self.receiver = SupvisorsInternalReceiver(self.async_loop, supvisors)
# the asyncio object that will receive all events
self.receiver: Optional[SupvisorsInternalReceiver] = None
# create an XML-RPC client to the local Supervisor instance
self.proxy: SupervisorProxy = SupervisorProxy(supvisors)

Expand All @@ -322,8 +321,11 @@ def run(self):
""" The SupvisorsMainLoop thread runs an asynchronous event loop for all I/O operations. """
self.logger.info('SupvisorsMainLoop.run: entering main loop')
self.proxy.start()
# assign the asynchronous event loop to this thread
asyncio.set_event_loop(self.async_loop)
# assign a new asynchronous event loop to this thread
async_loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
asyncio.set_event_loop(async_loop)
# create the asyncio object that will receive all events
self.receiver = SupvisorsInternalReceiver(async_loop, self.supvisors)
# get the receiver tasks
all_coro = self.receiver.get_tasks()
# add the reception tasks for this class
Expand All @@ -332,7 +334,7 @@ def run(self):
self.read_queue(self.receiver.discovery_queue, self.check_discovery_event)])
all_tasks = asyncio.gather(*all_coro)
# run the asynchronous event loop with the given tasks
self.async_loop.run_until_complete(all_tasks)
async_loop.run_until_complete(all_tasks)
# exiting the main loop
self.proxy.stop()
self.logger.info('SupvisorsMainLoop.run: exiting main loop')
Expand Down
11 changes: 6 additions & 5 deletions supvisors/internal_com/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,11 @@ def _remove_client(self, fd):
:param fd: the descriptor of the client socket
:return: None
"""
self.poller.unregister(fd)
client = self.clients.pop(fd)
client.socket.close()
self.logger.debug(f'PublisherServer._remove_client: client={str(client)} closed')
if fd in self.clients:
self.poller.unregister(fd)
client = self.clients.pop(fd)
client.socket.close()
self.logger.debug(f'PublisherServer._remove_client: client={str(client)} closed')

def _handle_events(self, events):
""" Extract the messages from the readable sockets.
Expand Down Expand Up @@ -287,7 +288,7 @@ def _publish_message(self, message: bytes) -> None:
for client in list(self.clients.values()):
try:
client.socket.sendall(buffer)
except error:
except OSError:
# upon message send exception, remove the client from the clients list
self._remove_client(client.fd)

Expand Down
25 changes: 18 additions & 7 deletions supvisors/tests/test_internalcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ======================================================================

from socket import socket
from threading import Timer

Expand Down Expand Up @@ -109,6 +108,8 @@ def receiver(supvisors, request) -> SupvisorsInternalReceiver:
""" Fixture for the instance to test. """
if request.param == 'discovery':
supvisors.options.multicast_group = '239.0.0.1', 7777
# store a real unix socket in emitter
supvisors.internal_com.puller_sock, _ = socketpair()
loop = asyncio.get_event_loop()
internal_receiver = SupvisorsInternalReceiver(loop, supvisors)
return internal_receiver
Expand All @@ -128,9 +129,14 @@ def test_receiver(supvisors, receiver):
assert receiver.discovery_coro is None
# test the number of tasks (one per Supvisors instance, local instance excepted, + stop, + puller)
assert len(supvisors.supvisors_mapper.instances) == 7
tasks = receiver.get_tasks()
assert all(asyncio.iscoroutine(x) for x in tasks)
assert len(tasks) == len(supvisors.supvisors_mapper.instances) + 1
try:
tasks = receiver.get_tasks()
assert all(asyncio.iscoroutine(x) for x in tasks)
assert len(tasks) == len(supvisors.supvisors_mapper.instances) + 1
finally:
# avoid warnings about coroutines never awaited
receiver.stop_event.set()
asyncio.get_event_loop().run_until_complete(asyncio.gather(*tasks))


@pytest.mark.parametrize('receiver', ['discovery'], indirect=True)
Expand All @@ -147,9 +153,14 @@ def test_receiver_discovery(supvisors, receiver):
assert asyncio.iscoroutine(receiver.discovery_coro)
# test the number of tasks (one per Supvisors instance, local instance excepted, + stop, + puller, + discovery)
assert len(supvisors.supvisors_mapper.instances) == 7
tasks = receiver.get_tasks()
assert all(asyncio.iscoroutine(x) for x in tasks)
assert len(tasks) == len(supvisors.supvisors_mapper.instances) + 2
try:
tasks = receiver.get_tasks()
assert all(asyncio.iscoroutine(x) for x in tasks)
assert len(tasks) == len(supvisors.supvisors_mapper.instances) + 2
finally:
# avoid warnings about coroutines never awaited
receiver.stop_event.set()
asyncio.get_event_loop().run_until_complete(asyncio.gather(*tasks))


@pytest.mark.parametrize('receiver', [''], indirect=True)
Expand Down
16 changes: 6 additions & 10 deletions supvisors/tests/test_mainloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,24 +417,23 @@ def test_mainloop_creation(supvisors, main_loop):
""" Test the values set at construction. """
assert isinstance(main_loop, threading.Thread)
assert main_loop.supvisors is supvisors
assert main_loop.async_loop != asyncio.get_event_loop()
assert type(main_loop.receiver) is SupvisorsInternalReceiver
assert main_loop.receiver is None
assert type(main_loop.proxy) is SupervisorProxy
# start and stop


def test_mainloop_stop(mocker, main_loop):
""" Test the stopping of the main loop thread. """
mocked_join = mocker.patch.object(main_loop, 'join')
mocked_recv = mocker.patch.object(main_loop.receiver, 'stop')
mocked_recv = mocker.patch.object(main_loop, 'receiver')
# try to stop main loop before it is started
main_loop.stop()
assert not mocked_recv.called
assert not mocked_recv.stop.called
assert not mocked_join.called
# stop main loop when alive
mocker.patch.object(main_loop, 'is_alive', return_value=True)
main_loop.stop()
assert mocked_recv.called
assert mocked_recv.stop.called
assert mocked_join.called


Expand All @@ -446,11 +445,8 @@ def test_mainloop_run(mocker, main_loop):
# disable the SupervisorProxy thread
mocked_proxy_start = mocker.patch.object(main_loop.proxy, 'start')
mocked_proxy_stop = mocker.patch.object(main_loop.proxy, 'stop')
# patch the get_coroutines method to return a subscriber on the local Supvisors instance
subscribers = main_loop.receiver.subscribers
mocker.patch.object(subscribers, 'get_coroutines',
return_value=[subscribers.create_coroutine(local_identifier),
subscribers.check_stop()])
# add a Supvisors instance that has the same parameters as the local Supvisors instance, but with a different name
main_loop.supvisors.supvisors_mapper.instances['async_test'] = local_instance_id
# WARN: handle_puller is blocking as long as there is no RequestPusher active,
# so make sure it has been started before starting the main loop
assert main_loop.supvisors.internal_com.pusher is not None
Expand Down

0 comments on commit 8c36432

Please sign in to comment.