Skip to content

Commit

Permalink
threading changes
Browse files Browse the repository at this point in the history
  • Loading branch information
r3w0p committed Sep 16, 2019
1 parent 36749ff commit 787140f
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 8 deletions.
2 changes: 1 addition & 1 deletion bobocep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

__author__ = """r3w0p"""
__email__ = ''
__version__ = '0.30.6'
__version__ = '0.30.7'
2 changes: 1 addition & 1 deletion bobocep/forwarder/bobo_forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def _handle_forwarder_event(self, event: BoboEvent) -> bool:
"""

def _loop(self) -> None:
while not self._event_queue.empty():
if not self._event_queue.empty():
event = self._event_queue.get_nowait()

if event is not None:
Expand Down
2 changes: 1 addition & 1 deletion bobocep/producer/bobo_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, max_queue_size: int = 0, active: bool = True) -> None:
self._subs = {}

def _loop(self) -> None:
while not self._event_queue.empty():
if not self._event_queue.empty():
event = self._event_queue.get_nowait()

if event is not None:
Expand Down
2 changes: 1 addition & 1 deletion bobocep/receiver/bobo_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self,
self._subs = []

def _loop(self) -> None:
while not self._data_queue.empty():
if not self._data_queue.empty():
data = self._data_queue.get_nowait()

if self._validator.validate(data):
Expand Down
10 changes: 10 additions & 0 deletions bobocep/setup/bobo_setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from threading import RLock
from time import time_ns
from typing import List
from uuid import uuid4

from bobocep.decider.bobo_decider import BoboDecider
Expand Down Expand Up @@ -230,6 +231,15 @@ def get_null_data_generator(self) -> BoboNullDataGenerator:

return self._null_data_generator

def get_complex_events(self) -> List[BoboComplexEvent]:
"""
:return: A list of complex event definitions currently in setup.
"""

with self._lock:
if self.is_inactive():
return self._event_defs[:]

def add_complex_event(self, event_def: BoboComplexEvent) -> None:
"""
Adds a complex event definition to the setup.
Expand Down
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
# built documents.
#
# The short X.Y version.
version = '0.30.6'
version = '0.30.7'
# The full version, including alpha/beta/rc tags.
release = '0.30.6'
release = '0.30.7'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.30.6
current_version = 0.30.7
tag = True

[bumpversion:file:setup.py]
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
author="r3w0p",
author_email='',
name='bobocep',
version='0.30.6',
version='0.30.7',
description="A fault-tolerant complex event processing engine designed "
"for edge computing in IoT systems.",
long_description=long_description,
Expand Down
32 changes: 32 additions & 0 deletions tests/test_bobocep/test_setup/test_bobo_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
HOST_NAME = "127.0.0.1"

NAME_NFA_A = "name_nfa_a"
NAME_NFA_B = "name_nfa_b"
NAME_NFA_C = "name_nfa_c"
RUN_ID_A = "run_id_a"

LABEL_A = "label_a"
Expand Down Expand Up @@ -779,6 +781,36 @@ def test_composite_from_decider_to_producer_not_recursive(self):
handler = decider.get_all_handlers()[0]
self.assertEqual(0, len(handler.get_all_recent()))

def test_multi_composite_from_decider_to_producer_recursive(self):
setup = BoboSetup(recursive=True, max_recent=2)

nfa_names = [NAME_NFA_A, NAME_NFA_B, NAME_NFA_C]

for nfa_name in nfa_names:
setup.add_complex_event(
event_def=BoboComplexEvent(
name=nfa_name,
pattern=stub_pattern_1,
action=NoAction()
))
setup.config_receiver(StrDictValidator())
setup.configure()

decider = setup.get_decider()
decider.setup()
handlers = decider.get_all_handlers()
self.assertEqual(3, len(handlers))

producer = setup.get_producer()
producer.setup()

decider.on_receiver_event(event_a)
decider.loop()
producer.loop()

for handler in handlers:
self.assertEqual(2, len(handler.get_all_recent()))

def test_action_from_producer_to_decider_recursive(self):
setup = BoboSetup(recursive=True)

Expand Down

0 comments on commit 787140f

Please sign in to comment.