Skip to content

Commit

Permalink
Merge pull request #208 from powerapi-ng/feature/issue179/processor-a…
Browse files Browse the repository at this point in the history
…ctors

Feature/issue179/processor actors
  • Loading branch information
roda82 committed Nov 9, 2023
2 parents 40a5eb3 + f4a7765 commit 23e74eb
Show file tree
Hide file tree
Showing 63 changed files with 3,811 additions and 337 deletions.
7 changes: 3 additions & 4 deletions powerapi/actor/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@
import traceback
import setproctitle

from powerapi.exception import PowerAPIExceptionWithMessage
from powerapi.exception import PowerAPIExceptionWithMessage, UnknownMessageTypeException
from powerapi.message import PoisonPillMessage
from powerapi.message import UnknownMessageTypeException
from powerapi.handler import HandlerException

from .socket_interface import SocketInterface
Expand Down Expand Up @@ -93,7 +92,7 @@ def __init__(self, name, level_logger=logging.WARNING, timeout=None):
:param str name: unique name that will be used to indentify the actor
processus
:param int level_logger: Define the level of the logger
:param int timeout: if define, do something if no msg is recv every
:param int timeout: if defined, do something if no msg is recv every
timeout (in ms)
"""
multiprocessing.Process.__init__(self, name=name)
Expand Down Expand Up @@ -229,7 +228,7 @@ def _initial_behaviour(self):
handler = self.state.get_corresponding_handler(msg)
handler.handle_message(msg)
except UnknownMessageTypeException:
self.logger.warning("UnknowMessageTypeException: " + str(msg))
self.logger.warning("UnknownMessageTypeException: " + str(msg))
except HandlerException:
self.logger.warning("HandlerException")

Expand Down
2 changes: 1 addition & 1 deletion powerapi/actor/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from powerapi.message import UnknownMessageTypeException
from powerapi.exception import UnknownMessageTypeException
from powerapi.actor.supervisor import Supervisor


Expand Down
13 changes: 11 additions & 2 deletions powerapi/backend_supervisor/backend_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
from powerapi.actor import Supervisor
from powerapi.puller import PullerActor
from powerapi.dispatcher import DispatcherActor
from powerapi.pusher import PusherActor


class BackendSupervisor(Supervisor):

"""
Provide additional functionality to deal with actors: join
"""
Expand All @@ -55,6 +55,9 @@ def __init__(self, stream_mode):
#: (list): List of Pusher
self.pushers = []

#: (list): List of pre processors
self.pre_processors = []

def join(self):
"""
wait until all actor are terminated
Expand All @@ -65,8 +68,10 @@ def join(self):
self.pullers.append(actor)
elif isinstance(actor, DispatcherActor):
self.dispatchers.append(actor)
else:
elif isinstance(actor, PusherActor):
self.pushers.append(actor)
else:
self.pre_processors.append(actor)

if self.stream_mode:
self.join_stream_mode_on()
Expand Down Expand Up @@ -97,12 +102,16 @@ def join_stream_mode_off(self):
"""
Supervisor behaviour when stream mode is off.
- Supervisor wait the Puller death
- Supervisor wait the pre-processors death
- Supervisor wait for the dispatcher death
- Supervisor send a PoisonPill (by_data) to the Pusher
- Supervisor wait for the Pusher death
"""
for puller in self.pullers:
puller.join()
for pre_processor in self.pre_processors:
pre_processor.soft_kill()
pre_processor.join()
for dispatcher in self.dispatchers:
dispatcher.soft_kill()
dispatcher.join()
Expand Down
233 changes: 233 additions & 0 deletions powerapi/cli/binding_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
# Copyright (c) 2023, INRIA
# Copyright (c) 2023, University of Lille
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# pylint: disable=R1702
from powerapi.exception import UnsupportedActorTypeException, UnexistingActorException, TargetActorAlreadyUsed
from powerapi.processor.processor_actor import ProcessorActor
from powerapi.puller import PullerActor
from powerapi.pusher import PusherActor


class BindingManager:
"""
Class for management the binding between actors during their creation process
"""

def __init__(self, actors: dict = {}):
"""
:param dict actors: Dictionary of actors to create the bindings. The name of the actor is the key
"""
if not actors:
self.actors = {}
else:
self.actors = actors

def process_bindings(self):
"""
Define bindings between self.actors according to the processors' targets.
"""
raise NotImplementedError()


class ProcessorBindingManager(BindingManager):
"""
Class for management of bindings between processor actors and others actors
"""

def __init__(self, actors: dict, processors: dict):
"""
The ProcessorBindingManager defines bindings between actors and processors
:param dict actors: Dictionary of actors with structure {<actor1_key>:actor1,<actor2_key>:actor2...}
:param dict processors: Dictionary of processors with structure {<processor1_key>:processor1,
<processor2_key>:processor2...}
"""

BindingManager.__init__(self, actors=actors)
if not processors:
self.processors = {}
else:
self.processors = processors

def check_processor_targets(self, processor: ProcessorActor):
"""
Check that targets of a processor exist in the dictionary of targets.
If it is not the case, it raises a UnexistingActorException
"""
for target_actor_name in processor.state.target_actors_names:
if target_actor_name not in self.actors:
raise UnexistingActorException(actor=target_actor_name)

def check_processors_targets_are_unique(self):
"""
Check that processors targets are unique, i.e., the same target is not related to
two different processors
"""
used_targets = []
for _, processor in self.processors.items():
for target_actor_name in processor.state.target_actors_names:
if target_actor_name in used_targets:
raise TargetActorAlreadyUsed(target_actor=target_actor_name)
else:
used_targets.append(target_actor_name)


class PreProcessorBindingManager(ProcessorBindingManager):
"""
Class for management the binding between pullers and pre-processor actors
"""

def __init__(self, pullers: dict, processors: dict):
"""
The PreProcessorBindingManager defines bindings between pullers and processors: puller->processor->dispatcher
:param dict pullers: Dictionary of actors with structure {<actor1_key>:actor1,<actor2_key>:actor2...}
:param dict processors: Dictionary of processors with structure {<processor1_key>:processor1,
<processor2_key>:processor2...}
"""

ProcessorBindingManager.__init__(self, actors=pullers, processors=processors)

def process_bindings(self):
"""
Define bindings between self.actors according to the pre-processors' targets.
"""

# Check that processors targets are unique
self.check_processors_targets_are_unique()

# For each processor, we get targets and create the binding:
# puller->processor->dispatcher
for _, processor in self.processors.items():

self.check_processor_targets(processor=processor)

for target_actor_name in processor.state.target_actors_names:

# The processor has to be between the puller and the dispatcher
# The dispatcher becomes a target of the processor

puller_actor = self.actors[target_actor_name]

# The dispatcher defines the relationship between the Formula and
# Puller
number_of_filters = len(puller_actor.state.report_filter.filters)

for index in range(number_of_filters):
# The filters define the relationship with the dispatcher
# The relationship has to be updated
current_filter = list(puller_actor.state.report_filter.filters[index])
current_filter_dispatcher = current_filter[1]
processor.add_target_actor(actor=current_filter_dispatcher)
current_filter[1] = processor
puller_actor.state.report_filter.filters[index] = tuple(current_filter)

def check_processor_targets(self, processor: ProcessorActor):
"""
Check that targets of a processor exist in the dictionary of targets.
If it is not the case, it raises a UnexistingActorException
It also checks that the actor is a PullerActor instance.
If it is not the case, it raises UnsupportedActorTypeException
"""
ProcessorBindingManager.check_processor_targets(self, processor=processor)

for target_actor_name in processor.state.target_actors_names:
actor = self.actors[target_actor_name]

if not isinstance(actor, PullerActor):
raise UnsupportedActorTypeException(actor_type=type(actor).__name__)


class PostProcessorBindingManager(ProcessorBindingManager):
"""
Class for management the binding between post-processor and pusher actors
"""

def __init__(self, pushers: dict, processors: dict, pullers: dict):
"""
The PostProcessorBindingManager defines bindings between processors and pushers: formula->processor->pushers
:param dict pushers: Dictionary of PusherActors with structure {<actor1_key>:actor1,<actor2_key>:actor2...}
:param dict processors: Dictionary of processors with structure {<processor1_key>:processor1,
<processor2_key>:processor2...}
"""
ProcessorBindingManager.__init__(self, actors=pushers, processors=processors)
self.pullers = pullers

def process_bindings(self):
"""
Define bindings between self.actors according to the post-processors' targets.
"""

# For each processor, we get targets and create the binding:
# formula->processor->pusher
for _, processor in self.processors.items():

self.check_processor_targets(processor=processor)

for target_actor_name in processor.state.target_actors_names:

# The processor has to be between the formula and the pusher
# The pusher becomes a target of the processor

pusher_actor = self.actors[target_actor_name]

processor.add_target_actor(actor=pusher_actor)

# We look for the pusher on each dispatcher in order to replace it by
# the processor
for _, puller in self.pullers:

for current_filter in puller.state.report_filter.filters:
dispatcher = current_filter[1]

number_of_pushers = len(dispatcher.pusher)
pusher_updated = False

for index in range(number_of_pushers):
if dispatcher.pusher[index] == pusher_actor:
dispatcher.pusher[index] = processor
pusher_updated = True
break

if pusher_updated:
dispatcher.update_state_formula_factory()

def check_processor_targets(self, processor: ProcessorActor):
"""
Check that targets of a processor exist in the dictionary of targets.
If it is not the case, it raises a UnexistingActorException
It also checks that the actor is a PusherActor instance.
If it is not the case, it raises UnsupportedActorTypeException
"""
ProcessorBindingManager.check_processor_targets(self, processor=processor)

for target_actor_name in processor.state.target_actors_names:
actor = self.actors[target_actor_name]

if not isinstance(actor, PusherActor):
raise UnsupportedActorTypeException(actor_type=type(actor).__name__)

0 comments on commit 23e74eb

Please sign in to comment.