Skip to content

Commit

Permalink
Merge pull request #155 from powerapi-ng/refactor/puller-ipc-timeout
Browse files Browse the repository at this point in the history
Refactor puller ipc timeout
  • Loading branch information
gfieni committed Apr 12, 2023
2 parents 4d95f75 + a3ec169 commit 2b1b70d
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 97 deletions.
3 changes: 0 additions & 3 deletions powerapi/actor/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ def receive_control(self, timeout=None):
"""
Receive a message from this actor on the control canal
"""
if timeout is None:
timeout = self.socket_interface.timeout

msg = self.socket_interface.receive_control(timeout)
self.logger.debug('Actor ' + self.name + ' receive control : [' + str(msg) + ']')
return msg
Expand Down
2 changes: 1 addition & 1 deletion powerapi/puller/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def pull_db(self):
db_puller_thread.start()

while self.state.alive:
msg = self.state.actor.receive_control(0.1)
msg = self.state.actor.receive_control(self.timeout)
if msg is not None:
self.handle_internal_msg(msg)

Expand Down
4 changes: 2 additions & 2 deletions powerapi/puller/puller_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class PullerActor(Actor):

def __init__(self, name, database, report_filter, report_model, stream_mode=False, report_modifier_list=[],
level_logger=logging.WARNING,
timeout=0, timeout_puller=100):
timeout=5000, timeout_puller=100):
"""
:param str name: Actor name.
:param BaseDB database: Allow to interact with a Database.
Expand All @@ -120,4 +120,4 @@ def setup(self):
Define StartMessage handler and PoisonPillMessage handler
"""
self.add_handler(PoisonPillMessage, PullerPoisonPillMessageHandler(self.state))
self.add_handler(StartMessage, PullerStartHandler(self.state, 0.1))
self.add_handler(StartMessage, PullerStartHandler(self.state, self.socket_interface.timeout))
192 changes: 101 additions & 91 deletions tests/unit/puller/test_puller_actor.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
# Copyright (c) 2022, INRIA
# Copyright (c) 2022, University of Lille
# Copyright (c) 2023, INRIA
# Copyright (c) 2023, University of Lille
# All rights reserved.

#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:

#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.

#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.

#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.

#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
Expand All @@ -27,116 +27,126 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# pylint: disable=arguments-differ,unused-argument,no-self-use

import logging
from multiprocessing import Queue

import pytest
from mock import Mock
from datetime import datetime

from powerapi.database import BaseDB
from powerapi.filter import Filter
from powerapi.message import StartMessage, ErrorMessage
from powerapi.message import StartMessage, ErrorMessage, OKMessage
from powerapi.puller import PullerActor
from powerapi.report import Report
from tests.utils.db.db import REPORT1, REPORT2, define_database_content
from tests.unit.actor.abstract_test_actor import AbstractTestActorWithDB, pytest_generate_tests_abstract
from tests.utils.db import SilentFakeDB
from tests.utils.dispatcher import FakeDispatcher


def define_filter(filt):
def _setup_fake_database(num_reports: int = 5):
"""
Decorator to set the _filter
attribute for individual tests.
Set up a fake database for tests.
"""
return SilentFakeDB([Report(datetime.utcfromtimestamp(i), 'pytest', f'report-{i}') for i in range(num_reports)])

def wrap(func):
setattr(func, '_filter', filt)
return func

return wrap
def _setup_puller_actor(name: str, database: BaseDB, report_filter: Filter, stream_mode: bool = False):
"""
Set up a PullerActor for tests.
"""
puller = PullerActor(name, database, report_filter, None, stream_mode, level_logger=logging.DEBUG, timeout=2000)
puller.start()
puller.connect_control()
puller.connect_data()
return puller


def pytest_generate_tests(metafunc):
def test_puller_start_message_empty_filter():
"""
Function called by pytest when collecting a test_XXX function
Test that the puller send an ErrorMessage when it receives a StartMessage with an empty filter.
"""
report_filter = Filter()
puller = _setup_puller_actor('puller_test_start_message_empty_filter', _setup_fake_database(), report_filter)
assert puller.is_alive() is True

puller.send_control(StartMessage('pytest'))
message = puller.receive_control()
assert isinstance(message, ErrorMessage)

define the content fixtures in test environment with collected the
value _content if it exists or with an empty content

:param metafunc: the test context given by pytest
def test_puller_send_reports_to_dispatcher():
"""
Test that the puller send reports to the dispatcher when it receives a StartMessage.
"""
pytest_generate_tests_abstract(metafunc)
dispatcher = FakeDispatcher('dispatcher_test_send_reports_to_dispatcher')

if 'filt' in metafunc.fixturenames:
filt = getattr(metafunc.function, '_filter', None)
metafunc.parametrize('filt', [filt])
report_filter = Filter()
report_filter.filter(lambda msg: True, dispatcher)

num_reports = 10
database = _setup_fake_database(num_reports)

class FakeDispatcher:
puller = _setup_puller_actor('puller_test_send_reports_to_dispatcher', database, report_filter)
assert puller.is_alive() is True

puller.send_control(StartMessage('pytest'))
message = puller.receive_control()
assert isinstance(message, OKMessage)

puller.join(timeout=5.0)
assert puller.is_alive() is False
assert dispatcher.get_num_received_data() == num_reports


def test_puller_with_multiple_reports_filter():
"""
Fake dispatcher using a queue for sending the reports from the puller
Test that the puller follow defined filter when sending the reports to the dispatcher.
"""
dispatcher_even = FakeDispatcher('even_dispatcher_test_with_multiple_reports_filter')
dispatcher_odd = FakeDispatcher('odd_dispatcher_test_with_multiple_reports_filter')
dispatcher_catch_all = FakeDispatcher('catchall_dispatcher_test_with_multiple_reports_filter')

num_reports = 10
database = _setup_fake_database(num_reports)

report_filter = Filter()
report_filter.filter(lambda msg: (int(msg.target.lstrip('report-')) % 2) == 0, dispatcher_even)
report_filter.filter(lambda msg: (int(msg.target.lstrip('report-')) % 2) != 0, dispatcher_odd)
report_filter.filter(lambda msg: True, dispatcher_catch_all)

puller = _setup_puller_actor('puller_test_with_multiple_reports_filter', database, report_filter)
assert puller.is_alive() is True

def __init__(self):
self.q = Queue()
puller.send_control(StartMessage('pytest'))
message = puller.receive_control()
assert isinstance(message, OKMessage)

def send_data(self, report):
"""
Put the report in the queue
"""
self.q.put(report, block=False)
puller.join(timeout=5.0)
assert puller.is_alive() is False
assert dispatcher_even.get_num_received_data() == 5 # 0, 2, 4, 6, 8
assert dispatcher_odd.get_num_received_data() == 5 # 1, 3, 5, 7, 9
assert dispatcher_catch_all.get_num_received_data() == num_reports


class TestPuller(AbstractTestActorWithDB):
def test_puller_with_stream_mode():
"""
Class for testing the PullerActor
Test that the puller send reports to the dispatcher and stay alive waiting for new reports.
"""
@pytest.fixture
def fake_dispatcher(self):
"""
Return a FakeDispatcher
"""
return FakeDispatcher()

@pytest.fixture
def fake_filter(self, fake_dispatcher):
"""
Return a fake filter by using mocks
"""
fake_filter = Mock()
fake_filter.filters = [(Mock(return_value=True), Mock())]
fake_filter.route = Mock(return_value=[fake_dispatcher])
fake_filter.get_type = Mock(return_value=Report)
return fake_filter

@pytest.fixture
def actor(self, fake_db, filt, fake_filter):
filter = fake_filter if filt is None else filt
return PullerActor('puller_test', fake_db, filter, 0, level_logger=logging.DEBUG)

@define_database_content([REPORT1, REPORT2])
def test_start_actor_with_db_that_contains_2_report_make_actor_send_reports_to_dispatcher(self, started_actor,
fake_dispatcher,
content):
"""
Check that PullerActor sent a report to the dispatcher when the input database has at least 2 reports
"""
for report in content:
assert fake_dispatcher.q.get(timeout=2) == report

def test_starting_actor_in_stream_mode_make_it_terminate_itself_after_empty_db(self, started_actor):
"""
Check that started actor stopped with empty database in stream mode
"""
started_actor.join(2)
assert started_actor.is_alive() is False

@define_filter(Filter())
def test_send_start_message_to_puller_without_filter_answer_with_error_message(self, init_actor):
"""
Check that starting a PullerActor without filter produces an error message
"""
init_actor.send_control(StartMessage('test-case'))
msg = init_actor.receive_control(2000)
assert isinstance(msg, ErrorMessage)
dispatcher = FakeDispatcher('dispatcher_test_with_stream_mode')

report_filter = Filter()
report_filter.filter(lambda msg: True, dispatcher)

num_reports = 10
database = _setup_fake_database(num_reports)

puller = _setup_puller_actor('puller_test_with_stream_mode', database, report_filter, stream_mode=True)
assert puller.is_alive() is True

puller.send_control(StartMessage('pytest'))
message = puller.receive_control()
assert isinstance(message, OKMessage)

puller.join(timeout=5.0)
assert puller.is_alive() is True
assert dispatcher.get_num_received_data() == num_reports

puller.terminate()
puller.join(timeout=5.0)
assert puller.is_alive() is False
30 changes: 30 additions & 0 deletions tests/utils/dispatcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) 2023, INRIA
# Copyright (c) 2023, University of Lille
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from .dispatcher import FakeDispatcher

0 comments on commit 2b1b70d

Please sign in to comment.