Skip to content

Commit

Permalink
Merge pull request #44 from marier-nico/feature/invoked-processors
Browse files Browse the repository at this point in the history
Keep track of invoked processors
  • Loading branch information
marier-nico committed Jun 27, 2021
2 parents df53db2 + 35d71ad commit 74b0810
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/content/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- v2.6.0: Support getting a list of invoked processors after an invocation
- v2.5.0: Support adding multiple subprocessors at once and also adding all processors from all modules in a package
- v2.4.1: Fix scalar dependency resolution without pydantic (only raise on actual missing values and not none values)
- v2.4.0: Support scalar value dependencies in processor parameters
Expand Down
38 changes: 38 additions & 0 deletions docs/content/processors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,44 @@ To use a non-default invocation strategy, use the provided ``InvocationStrategie
Processor a!
Processor b!

Processor Names
---------------

Sometimes, it might be useful to gather invoked processor names after they have been invoked, either to do something
with the returned results depending on the processor that was invoked, or perhaps for logging purposes.

You can access the invoked processor names like so :

.. testcode::

from event_processor import EventProcessor, InvocationStrategies
from event_processor.filters import Exists, Eq

event_processor = EventProcessor(invocation_strategy=InvocationStrategies.ALL_MATCHES)


@event_processor.processor(Exists("a"))
def processor_a():
pass


@event_processor.processor(Eq("a", "b"))
def processor_b():
pass


event_processor.invoke({"a": "b"})

print(event_processor.invoked_processor_names)

.. testoutput::

['processor_a', 'processor_b']

.. note::
If, for any reason, the name of a processor is not available, it will be replaced by ``"unavailable"``. Also, if you
use the same name for multiple processors, you will find duplicate values for the invoked processor names.

Caveats
-------

Expand Down
14 changes: 12 additions & 2 deletions src/event_processor/event_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Contains the EventProcessor class."""
import inspect
from types import ModuleType
from typing import Dict, Callable, Any, Tuple
from typing import Dict, Callable, Any, Tuple, List

from .dependencies import (
get_required_dependencies,
Expand All @@ -27,6 +27,7 @@ def __init__(self, invocation_strategy: InvocationStrategies = InvocationStrateg
self.processors: Dict[Tuple[Filter, int], Callable] = {}
self.dependency_cache: Dict[Depends, Any] = {}
self.invocation_strategy = invocation_strategy
self.invoked_processor_names: List[str] = []

def add_subprocessors_in_package(self, package: ModuleType):
"""Add all the processors found in all modules of a package as subprocessors.
Expand Down Expand Up @@ -99,7 +100,16 @@ def invoke(self, event: Dict) -> Any:
matching.append(processor)

if matching:
return self.invocation_strategy.value.invoke(matching, event=Event(event), cache=self.dependency_cache)
results = self.invocation_strategy.value.invoke(matching, event=Event(event), cache=self.dependency_cache)

invoked_names = []
returned_values = []
for invoked_name, returned_value in results:
invoked_names.append(invoked_name)
returned_values.append(returned_value)

self.invoked_processor_names = invoked_names
return returned_values[0] if len(returned_values) == 1 else tuple(returned_values)
else:
raise InvocationError(f"No matching processor for the event '{event}'")

Expand Down
39 changes: 28 additions & 11 deletions src/event_processor/invocation_strategies.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,62 @@
"""Contains the different invocation strategies for calling processors."""
from abc import ABC
from enum import Enum
from typing import Dict, Optional, List, Callable, Any
from typing import Dict, Optional, List, Callable, Any, Tuple

from .exceptions import InvocationError
from .dependencies import call_with_injection, Event
from .exceptions import InvocationError


def _get_processor_name(processor: Any) -> str:
try:
return processor.__name__
except AttributeError:
return "unavailable"


class InvocationStrategy(ABC):
"""Class defining an abstract invocation strategy."""

@staticmethod
def invoke(matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None) -> Any:
def invoke(
matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None
) -> List[Tuple[str, Any]]:
"""Invoke one or multiple matching processors."""


class FirstMatch(InvocationStrategy):
"""Strategy calling the first matching processor."""

@staticmethod
def invoke(matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None) -> Any:
return call_with_injection(matching[0], event=event, cache=cache)
def invoke(
matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None
) -> List[Tuple[str, Any]]:
return [(_get_processor_name(matching[0]), call_with_injection(matching[0], event=event, cache=cache))]


class AllMatches(InvocationStrategy):
"""Strategy calling all matching processors."""

@staticmethod
def invoke(matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None) -> Any:
def invoke(
matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None
) -> List[Tuple[str, Any]]:
results = []
for match in matching:
results.append(call_with_injection(match, event=event, cache=cache))
results.append((_get_processor_name(match), call_with_injection(match, event=event, cache=cache)))

return tuple(results)
return results


class NoMatches(InvocationStrategy):
"""Strategy not calling any matching processors."""

@staticmethod
def invoke(matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None) -> Any:
def invoke(
matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None
) -> List[Tuple[str, Any]]:
if len(matching) >= 2:
return None
return [(_get_processor_name(None), None)]

return FirstMatch.invoke(matching, event=event, cache=cache)

Expand All @@ -50,7 +65,9 @@ class NoMatchesStrict(InvocationStrategy):
"""Strategy failing when there are multiple matching."""

@staticmethod
def invoke(matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None) -> Any:
def invoke(
matching: List[Callable], event: Optional[Event] = None, cache: Optional[Dict] = None
) -> List[Tuple[str, Any]]:
if len(matching) >= 2:
raise InvocationError("Multiple matching processors of the same rank")

Expand Down
18 changes: 18 additions & 0 deletions src/tests/test_event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from pydantic import BaseModel

from src.event_processor.invocation_strategies import InvocationStrategies
from src.event_processor.dependencies import Depends, Event
from src.event_processor.event_processor import EventProcessor, processor_params_are_valid
from src.event_processor.exceptions import (
Expand Down Expand Up @@ -194,6 +195,23 @@ def fn_b():
assert called_b is False


def test_invoke_sets_invoked_processor_names():
event_processor = EventProcessor(InvocationStrategies.ALL_MATCHES)

@event_processor.processor(Exists("a"))
def fn_a():
pass

@event_processor.processor(Exists("b"))
def fn_b():
pass

event_processor.invoke({"a": 0, "b": 0})

assert "fn_a" in event_processor.invoked_processor_names
assert "fn_b" in event_processor.invoked_processor_names


def test_processor_params_are_valid_returns_true_for_valid_params():
def processor(_a: Event, _b: Event, _c: BaseModel, _d: str, _e=Depends(Mock())):
pass
Expand Down
63 changes: 62 additions & 1 deletion src/tests/test_invocation_strategies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from unittest.mock import Mock

import pytest

from src.event_processor.exceptions import InvocationError
from src.event_processor.invocation_strategies import FirstMatch, AllMatches, NoMatches, NoMatchesStrict

Expand All @@ -14,6 +15,27 @@ def test_first_match_invokes_first_matching_processor():
processor_b.assert_not_called()


def test_first_match_returns_invoked_processor_name():
def processor_a():
pass

invoked_name, _result = FirstMatch.invoke([processor_a])[0]

assert invoked_name == "processor_a"


def test_first_match_returns_unavailable_on_processor_without_name():
"""Test that processors without names return a value.
NOTE: If this test fails, make sure to update the docs for processors.
"""
processor_a = Mock()

invoked_name, _result = FirstMatch.invoke([processor_a])[0]

assert invoked_name == "unavailable"


def test_all_matches_invokes_all_matching_processors():
processor_a, processor_b = Mock(), Mock()

Expand All @@ -23,6 +45,19 @@ def test_all_matches_invokes_all_matching_processors():
processor_b.assert_called_once()


def test_all_matches_returns_invoked_processor_names():
def processor_a():
pass

def processor_b():
pass

results = AllMatches.invoke([processor_a, processor_b])

assert results[0][0] == "processor_a"
assert results[1][0] == "processor_b"


def test_no_matches_invokes_no_matching_processors_on_multiple_matches():
processor_a, processor_b = Mock(), Mock()

Expand All @@ -32,6 +67,14 @@ def test_no_matches_invokes_no_matching_processors_on_multiple_matches():
processor_b.assert_not_called()


def test_no_matches_returns_unavailable_processor_name_on_multiple_matches():
processor_a, processor_b = Mock(), Mock()

invoked_name, _result = NoMatches.invoke([processor_a, processor_b])[0]

assert invoked_name == "unavailable"


def test_no_matches_invokes_the_matching_processor_on_a_single_match():
processor_a = Mock()

Expand All @@ -40,6 +83,15 @@ def test_no_matches_invokes_the_matching_processor_on_a_single_match():
processor_a.assert_called_once()


def test_no_matches_returns_the_processor_name_on_a_single_match():
def processor_a():
pass

invoked_name, _result = NoMatches.invoke([processor_a])[0]

assert invoked_name == "processor_a"


def test_no_matches_strict_raises_when_multiple_processors_match():
processor_a, processor_b = Mock(), Mock()

Expand All @@ -53,3 +105,12 @@ def test_no_matches_strict_invokes_the_matching_processor_on_a_single_match():
NoMatchesStrict.invoke([processor_a])

processor_a.assert_called_once()


def test_no_matches_strict_returns_the_processor_name_on_a_single_match():
def processor_a():
pass

invoked_name, _result = NoMatchesStrict.invoke([processor_a])[0]

assert invoked_name == "processor_a"

0 comments on commit 74b0810

Please sign in to comment.