diff --git a/powerapi/actor/actor.py b/powerapi/actor/actor.py index 07e15516..74f96172 100644 --- a/powerapi/actor/actor.py +++ b/powerapi/actor/actor.py @@ -268,9 +268,6 @@ def receive_control(self, timeout=None): """ Receive a message from this actor on the control canal """ - if timeout is None: - timeout = self.socket_interface.timeout - msg = self.socket_interface.receive_control(timeout) self.logger.debug('Actor ' + self.name + ' receive control : [' + str(msg) + ']') return msg diff --git a/powerapi/puller/handlers.py b/powerapi/puller/handlers.py index 9913f180..24817a19 100644 --- a/powerapi/puller/handlers.py +++ b/powerapi/puller/handlers.py @@ -201,7 +201,7 @@ def pull_db(self): db_puller_thread.start() while self.state.alive: - msg = self.state.actor.receive_control(0.1) + msg = self.state.actor.receive_control(self.timeout) if msg is not None: self.handle_internal_msg(msg) diff --git a/powerapi/puller/puller_actor.py b/powerapi/puller/puller_actor.py index baf5ae41..0a724319 100644 --- a/powerapi/puller/puller_actor.py +++ b/powerapi/puller/puller_actor.py @@ -97,7 +97,7 @@ class PullerActor(Actor): def __init__(self, name, database, report_filter, report_model, stream_mode=False, report_modifier_list=[], level_logger=logging.WARNING, - timeout=0, timeout_puller=100): + timeout=5000, timeout_puller=100): """ :param str name: Actor name. :param BaseDB database: Allow to interact with a Database. @@ -120,4 +120,4 @@ def setup(self): Define StartMessage handler and PoisonPillMessage handler """ self.add_handler(PoisonPillMessage, PullerPoisonPillMessageHandler(self.state)) - self.add_handler(StartMessage, PullerStartHandler(self.state, 0.1)) + self.add_handler(StartMessage, PullerStartHandler(self.state, self.socket_interface.timeout)) diff --git a/tests/unit/puller/test_puller_actor.py b/tests/unit/puller/test_puller_actor.py index 7bc31162..95a0b229 100644 --- a/tests/unit/puller/test_puller_actor.py +++ b/tests/unit/puller/test_puller_actor.py @@ -1,21 +1,21 @@ -# Copyright (c) 2022, INRIA -# Copyright (c) 2022, University of Lille +# Copyright (c) 2023, INRIA +# Copyright (c) 2023, University of Lille # All rights reserved. - +# # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: - +# # * Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. - +# # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. - +# # * Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. - +# # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE @@ -27,116 +27,126 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# pylint: disable=arguments-differ,unused-argument,no-self-use - import logging -from multiprocessing import Queue - -import pytest -from mock import Mock +from datetime import datetime +from powerapi.database import BaseDB from powerapi.filter import Filter -from powerapi.message import StartMessage, ErrorMessage +from powerapi.message import StartMessage, ErrorMessage, OKMessage from powerapi.puller import PullerActor from powerapi.report import Report -from tests.utils.db.db import REPORT1, REPORT2, define_database_content -from tests.unit.actor.abstract_test_actor import AbstractTestActorWithDB, pytest_generate_tests_abstract +from tests.utils.db import SilentFakeDB +from tests.utils.dispatcher import FakeDispatcher -def define_filter(filt): +def _setup_fake_database(num_reports: int = 5): """ - Decorator to set the _filter - attribute for individual tests. + Set up a fake database for tests. """ + return SilentFakeDB([Report(datetime.utcfromtimestamp(i), 'pytest', f'report-{i}') for i in range(num_reports)]) - def wrap(func): - setattr(func, '_filter', filt) - return func - return wrap +def _setup_puller_actor(name: str, database: BaseDB, report_filter: Filter, stream_mode: bool = False): + """ + Set up a PullerActor for tests. + """ + puller = PullerActor(name, database, report_filter, None, stream_mode, level_logger=logging.DEBUG, timeout=2000) + puller.start() + puller.connect_control() + puller.connect_data() + return puller -def pytest_generate_tests(metafunc): +def test_puller_start_message_empty_filter(): """ - Function called by pytest when collecting a test_XXX function + Test that the puller send an ErrorMessage when it receives a StartMessage with an empty filter. + """ + report_filter = Filter() + puller = _setup_puller_actor('puller_test_start_message_empty_filter', _setup_fake_database(), report_filter) + assert puller.is_alive() is True + + puller.send_control(StartMessage('pytest')) + message = puller.receive_control() + assert isinstance(message, ErrorMessage) - define the content fixtures in test environment with collected the - value _content if it exists or with an empty content - :param metafunc: the test context given by pytest +def test_puller_send_reports_to_dispatcher(): + """ + Test that the puller send reports to the dispatcher when it receives a StartMessage. """ - pytest_generate_tests_abstract(metafunc) + dispatcher = FakeDispatcher('dispatcher_test_send_reports_to_dispatcher') - if 'filt' in metafunc.fixturenames: - filt = getattr(metafunc.function, '_filter', None) - metafunc.parametrize('filt', [filt]) + report_filter = Filter() + report_filter.filter(lambda msg: True, dispatcher) + num_reports = 10 + database = _setup_fake_database(num_reports) -class FakeDispatcher: + puller = _setup_puller_actor('puller_test_send_reports_to_dispatcher', database, report_filter) + assert puller.is_alive() is True + puller.send_control(StartMessage('pytest')) + message = puller.receive_control() + assert isinstance(message, OKMessage) + + puller.join(timeout=5.0) + assert puller.is_alive() is False + assert dispatcher.get_num_received_data() == num_reports + + +def test_puller_with_multiple_reports_filter(): """ - Fake dispatcher using a queue for sending the reports from the puller + Test that the puller follow defined filter when sending the reports to the dispatcher. """ + dispatcher_even = FakeDispatcher('even_dispatcher_test_with_multiple_reports_filter') + dispatcher_odd = FakeDispatcher('odd_dispatcher_test_with_multiple_reports_filter') + dispatcher_catch_all = FakeDispatcher('catchall_dispatcher_test_with_multiple_reports_filter') + + num_reports = 10 + database = _setup_fake_database(num_reports) + + report_filter = Filter() + report_filter.filter(lambda msg: (int(msg.target.lstrip('report-')) % 2) == 0, dispatcher_even) + report_filter.filter(lambda msg: (int(msg.target.lstrip('report-')) % 2) != 0, dispatcher_odd) + report_filter.filter(lambda msg: True, dispatcher_catch_all) + + puller = _setup_puller_actor('puller_test_with_multiple_reports_filter', database, report_filter) + assert puller.is_alive() is True - def __init__(self): - self.q = Queue() + puller.send_control(StartMessage('pytest')) + message = puller.receive_control() + assert isinstance(message, OKMessage) - def send_data(self, report): - """ - Put the report in the queue - """ - self.q.put(report, block=False) + puller.join(timeout=5.0) + assert puller.is_alive() is False + assert dispatcher_even.get_num_received_data() == 5 # 0, 2, 4, 6, 8 + assert dispatcher_odd.get_num_received_data() == 5 # 1, 3, 5, 7, 9 + assert dispatcher_catch_all.get_num_received_data() == num_reports -class TestPuller(AbstractTestActorWithDB): +def test_puller_with_stream_mode(): """ - Class for testing the PullerActor + Test that the puller send reports to the dispatcher and stay alive waiting for new reports. """ - @pytest.fixture - def fake_dispatcher(self): - """ - Return a FakeDispatcher - """ - return FakeDispatcher() - - @pytest.fixture - def fake_filter(self, fake_dispatcher): - """ - Return a fake filter by using mocks - """ - fake_filter = Mock() - fake_filter.filters = [(Mock(return_value=True), Mock())] - fake_filter.route = Mock(return_value=[fake_dispatcher]) - fake_filter.get_type = Mock(return_value=Report) - return fake_filter - - @pytest.fixture - def actor(self, fake_db, filt, fake_filter): - filter = fake_filter if filt is None else filt - return PullerActor('puller_test', fake_db, filter, 0, level_logger=logging.DEBUG) - - @define_database_content([REPORT1, REPORT2]) - def test_start_actor_with_db_that_contains_2_report_make_actor_send_reports_to_dispatcher(self, started_actor, - fake_dispatcher, - content): - """ - Check that PullerActor sent a report to the dispatcher when the input database has at least 2 reports - """ - for report in content: - assert fake_dispatcher.q.get(timeout=2) == report - - def test_starting_actor_in_stream_mode_make_it_terminate_itself_after_empty_db(self, started_actor): - """ - Check that started actor stopped with empty database in stream mode - """ - started_actor.join(2) - assert started_actor.is_alive() is False - - @define_filter(Filter()) - def test_send_start_message_to_puller_without_filter_answer_with_error_message(self, init_actor): - """ - Check that starting a PullerActor without filter produces an error message - """ - init_actor.send_control(StartMessage('test-case')) - msg = init_actor.receive_control(2000) - assert isinstance(msg, ErrorMessage) + dispatcher = FakeDispatcher('dispatcher_test_with_stream_mode') + + report_filter = Filter() + report_filter.filter(lambda msg: True, dispatcher) + + num_reports = 10 + database = _setup_fake_database(num_reports) + + puller = _setup_puller_actor('puller_test_with_stream_mode', database, report_filter, stream_mode=True) + assert puller.is_alive() is True + + puller.send_control(StartMessage('pytest')) + message = puller.receive_control() + assert isinstance(message, OKMessage) + + puller.join(timeout=5.0) + assert puller.is_alive() is True + assert dispatcher.get_num_received_data() == num_reports + + puller.terminate() + puller.join(timeout=5.0) + assert puller.is_alive() is False diff --git a/tests/utils/dispatcher/__init__.py b/tests/utils/dispatcher/__init__.py new file mode 100644 index 00000000..3b4dc6b4 --- /dev/null +++ b/tests/utils/dispatcher/__init__.py @@ -0,0 +1,30 @@ +# Copyright (c) 2023, INRIA +# Copyright (c) 2023, University of Lille +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from .dispatcher import FakeDispatcher diff --git a/tests/utils/dispatcher/dispatcher.py b/tests/utils/dispatcher/dispatcher.py new file mode 100644 index 00000000..8b49fe32 --- /dev/null +++ b/tests/utils/dispatcher/dispatcher.py @@ -0,0 +1,104 @@ +# Copyright (c) 2023, INRIA +# Copyright (c) 2023, University of Lille +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from multiprocessing import Queue + +from powerapi.dispatcher import DispatcherActor, RouteTable + + +class FakeDispatcher(DispatcherActor): + """ + Dispatcher that can only receive data and control messages. (for testing purpose) + """ + + def __init__(self, name: str): + super().__init__(name, lambda *args: None, [], RouteTable()) + self.control_mailbox = Queue() + self.data_mailbox = Queue() + + def run(self) -> None: + return + + def start(self) -> None: + return + + def terminate(self) -> None: + return + + def is_alive(self) -> bool: + return True + + def connect_data(self) -> None: + return + + def connect_control(self) -> None: + return + + def send_control(self, msg): + self.control_mailbox.put(msg) + + def receive_control(self, timeout=None): + """ + Remove and returns the last control message received by the dispatcher. + """ + if self.control_mailbox.empty(): + return self.control_mailbox.get(timeout=timeout) + return None + + def get_num_received_control(self): + """ + Returns the number of control messages received by the dispatcher. + """ + return self.control_mailbox.qsize() + + def send_data(self, msg): + self.data_mailbox.put(msg) + + def receive_data(self, timeout=None): + """ + Remove and returns the last data message received by the dispatcher. + """ + if self.data_mailbox.empty(): + return self.data_mailbox.get(timeout=timeout) + return None + + def get_num_received_data(self): + """ + Returns the number of data messages received by the dispatcher. + """ + return self.data_mailbox.qsize() + + def receive(self): + if self.control_mailbox.empty(): + return self.control_mailbox.get() + + if self.data_mailbox.empty(): + return self.data_mailbox.get() + + return None