diff --git a/bobocep/__init__.py b/bobocep/__init__.py index 25f8c00a..bfbc06f7 100755 --- a/bobocep/__init__.py +++ b/bobocep/__init__.py @@ -4,4 +4,4 @@ __author__ = """r3w0p""" __email__ = '' -__version__ = '0.30.5' +__version__ = '0.30.6' diff --git a/docs/source/_static/bobo.png b/docs/source/_static/bobo.png index 56b0f859..09ae6b34 100644 Binary files a/docs/source/_static/bobo.png and b/docs/source/_static/bobo.png differ diff --git a/docs/source/conf.py b/docs/source/conf.py index 314e7ed7..ab6f4746 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -65,9 +65,9 @@ # built documents. # # The short X.Y version. -version = '0.30.5' +version = '0.30.6' # The full version, including alpha/beta/rc tags. -release = '0.30.5' +release = '0.30.6' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/setup.cfg b/setup.cfg index 404a99ae..3905be6b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.30.5 +current_version = 0.30.6 tag = True [bumpversion:file:setup.py] diff --git a/setup.py b/setup.py index 03d341e4..364e0fd5 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ author="r3w0p", author_email='', name='bobocep', - version='0.30.5', + version='0.30.6', description="A fault-tolerant complex event processing engine designed " "for edge computing in IoT systems.", long_description=long_description, diff --git a/tests/test_bobocep/test_setup/test_bobo_setup.py b/tests/test_bobocep/test_setup/test_bobo_setup.py index 5813a971..9121b7d4 100644 --- a/tests/test_bobocep/test_setup/test_bobo_setup.py +++ b/tests/test_bobocep/test_setup/test_bobo_setup.py @@ -1,12 +1,18 @@ import unittest from time import sleep +from bobocep.decider.decider_subscriber import IDeciderSubscriber +from bobocep.forwarder.forwarder_subscriber import IForwarderSubscriber +from bobocep.producer.producer_subscriber import IProducerSubscriber from bobocep.receiver.clocks.epoch_ns_clock import EpochNSClock from bobocep.receiver.generators.data.bobo_null_data_static import \ BoboNullDataStatic +from bobocep.receiver.receiver_subscriber import IReceiverSubscriber from bobocep.receiver.validators.str_dict_validator import StrDictValidator from bobocep.rules.actions.no_action import NoAction from bobocep.rules.actions.rate_limit_action import RateLimitAction +from bobocep.rules.events.action_event import ActionEvent +from bobocep.rules.events.bobo_event import BoboEvent from bobocep.rules.events.composite_event import CompositeEvent from bobocep.rules.events.histories.bobo_history import BoboHistory from bobocep.rules.events.primitive_event import PrimitiveEvent @@ -21,6 +27,7 @@ HOST_NAME = "127.0.0.1" NAME_NFA_A = "name_nfa_a" +RUN_ID_A = "run_id_a" LABEL_A = "label_a" LABEL_B = "label_b" @@ -62,6 +69,48 @@ event_d = PrimitiveEvent(timestamp=EpochNSClock.generate_timestamp()) +class StubSubscriberSetup(IReceiverSubscriber, + IDeciderSubscriber, + IProducerSubscriber, + IForwarderSubscriber): + + def __init__(self) -> None: + super().__init__() + + self.receiver_event = [] + self.invalid_data = [] + self.decider_complex_event = [] + self.accepted_producer_event = [] + self.rejected_producer_event = [] + self.producer_action = [] + self.forwarder_success_event = [] + self.forwarder_failure_event = [] + + def on_receiver_event(self, event: PrimitiveEvent): + self.receiver_event.append(event) + + def on_invalid_data(self, data): + self.invalid_data.append(data) + + def on_decider_complex_event(self, event: CompositeEvent): + self.decider_complex_event.append(event) + + def on_accepted_producer_event(self, event: CompositeEvent): + self.accepted_producer_event.append(event) + + def on_rejected_producer_event(self, event: CompositeEvent): + self.rejected_producer_event.append(event) + + def on_producer_action(self, event: ActionEvent): + self.producer_action.append(event) + + def on_forwarder_success_event(self, event: BoboEvent): + self.forwarder_success_event.append(event) + + def on_forwarder_failure_event(self, event: BoboEvent): + self.forwarder_failure_event.append(event) + + class TestBoboSetup(unittest.TestCase): def test_setup_before_configure(self): @@ -70,7 +119,8 @@ def test_setup_before_configure(self): setup.add_complex_event( event_def=BoboComplexEvent( name=NAME_NFA_A, - pattern=stub_pattern_1 + pattern=stub_pattern_1, + action=NoAction() )) setup.config_null_data( @@ -89,7 +139,8 @@ def test_setup_after_configure(self): setup.add_complex_event( event_def=BoboComplexEvent( name=NAME_NFA_A, - pattern=stub_pattern_1 + pattern=stub_pattern_1, + action=NoAction() )) setup.config_null_data( @@ -116,7 +167,8 @@ def test_setup_after_start(self): setup.add_complex_event( event_def=BoboComplexEvent( name=NAME_NFA_A, - pattern=stub_pattern_1 + pattern=stub_pattern_1, + action=NoAction() )) setup.config_null_data( @@ -146,7 +198,8 @@ def test_setup_after_cancel(self): setup.add_complex_event( event_def=BoboComplexEvent( name=NAME_NFA_A, - pattern=stub_pattern_1 + pattern=stub_pattern_1, + action=NoAction() )) setup.config_null_data( @@ -231,6 +284,9 @@ def test_access_before_configuration(self): with self.assertRaises(RuntimeError): setup.get_forwarder() + with self.assertRaises(RuntimeError): + setup.get_distributed() + with self.assertRaises(RuntimeError): setup.get_null_data_generator() @@ -240,7 +296,8 @@ def test_configure_when_cancelled(self): setup.add_complex_event( event_def=BoboComplexEvent( name=NAME_NFA_A, - pattern=stub_pattern_1 + pattern=stub_pattern_1, + action=NoAction() )) setup.cancel() @@ -255,7 +312,8 @@ def test_configure_when_active(self): setup.add_complex_event( event_def=BoboComplexEvent( name=NAME_NFA_A, - pattern=stub_pattern_1 + pattern=stub_pattern_1, + action=NoAction() )) setup.start() @@ -272,7 +330,8 @@ def test_configure_when_configured(self): setup.add_complex_event( event_def=BoboComplexEvent( name=NAME_NFA_A, - pattern=stub_pattern_1 + pattern=stub_pattern_1, + action=NoAction() )) setup.configure() @@ -287,6 +346,358 @@ def test_configure_when_no_complex_event_definitions(self): self.assertFalse(setup.is_configured()) + def test_start_when_active(self): + setup = BoboSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + setup.start() + sleep(SLEEP_WAIT) + + with self.assertRaises(RuntimeError): + setup.start() + + setup.cancel() + + +class TestBoboSetupExtraSubscriber(unittest.TestCase): + + def test_subscribe_receiver_valid_data(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + setup.config_receiver(StrDictValidator()) + setup.subscribe_receiver(sub) + setup.configure() + + receiver = setup.get_receiver() + receiver.setup() + + receiver.add_data(DATA_DICT_A) + receiver.loop() + + self.assertIsInstance(sub.receiver_event[0], PrimitiveEvent) + + def test_subscribe_receiver_invalid_data(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + setup.config_receiver(StrDictValidator()) + setup.subscribe_receiver(sub) + setup.configure() + + receiver = setup.get_receiver() + receiver.setup() + + receiver.add_data(VALUE_A) + receiver.loop() + + self.assertEqual(VALUE_A, sub.invalid_data[0]) + + def test_subscribe_decider_composite(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + setup.subscribe_decider(NAME_NFA_A, sub) + setup.configure() + + decider = setup.get_decider() + decider.setup() + + c_event = CompositeEvent( + timestamp=EpochNSClock.generate_timestamp(), + name=NAME_NFA_A, + history=BoboHistory(), + data={}) + + decider.on_handler_final(NAME_NFA_A, RUN_ID_A, c_event) + + self.assertEqual(c_event, sub.decider_complex_event[0]) + + def test_subscribe_producer_composite_accepted(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + setup.subscribe_producer(NAME_NFA_A, sub) + setup.configure() + + producer = setup.get_producer() + producer.setup() + + c_event = CompositeEvent( + timestamp=EpochNSClock.generate_timestamp(), + name=NAME_NFA_A, + history=BoboHistory(), + data={}) + + producer.on_decider_complex_event(c_event) + producer.loop() + + self.assertEqual(c_event, sub.accepted_producer_event[0]) + + def test_subscribe_producer_composite_rejected(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + # this rejects all events passed to Producer + setup.config_producer(NoAction(bool_return=False)) + + setup.subscribe_producer(NAME_NFA_A, sub) + setup.configure() + + producer = setup.get_producer() + producer.setup() + + c_event = CompositeEvent( + timestamp=EpochNSClock.generate_timestamp(), + name=NAME_NFA_A, + history=BoboHistory(), + data={}) + + producer.on_decider_complex_event(c_event) + producer.loop() + + self.assertEqual(c_event, sub.rejected_producer_event[0]) + + def test_subscribe_producer_action(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + setup.subscribe_producer(NAME_NFA_A, sub) + setup.configure() + + producer = setup.get_producer() + producer.setup() + + a_event = NoAction().execute( + CompositeEvent( + timestamp=EpochNSClock.generate_timestamp(), + name=NAME_NFA_A, + history=BoboHistory(), + data={})) + + producer.on_action_attempt(a_event) + producer.loop() + + self.assertEqual(a_event, sub.producer_action[0]) + + def test_subscribe_forward_composite_success(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + setup.subscribe_forwarder(sub) + setup.configure() + + forwarder = setup.get_forwarder() + forwarder.setup() + + c_event = CompositeEvent( + timestamp=EpochNSClock.generate_timestamp(), + name=NAME_NFA_A, + history=BoboHistory(), + data={}) + + forwarder.on_accepted_producer_event(c_event) + forwarder.loop() + + self.assertEqual(c_event, sub.forwarder_success_event[0]) + + def test_subscribe_forward_composite_failure(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + # this rejects all events passed to Forwarder + setup.config_forwarder(NoAction(bool_return=False)) + + setup.subscribe_forwarder(sub) + setup.configure() + + forwarder = setup.get_forwarder() + forwarder.setup() + + c_event = CompositeEvent( + timestamp=EpochNSClock.generate_timestamp(), + name=NAME_NFA_A, + history=BoboHistory(), + data={}) + + forwarder.on_accepted_producer_event(c_event) + forwarder.loop() + + self.assertEqual(c_event, sub.forwarder_failure_event[0]) + + def test_subscribe_forward_action_success(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + setup.subscribe_forwarder(sub) + setup.configure() + + forwarder = setup.get_forwarder() + forwarder.setup() + + a_event = NoAction().execute( + CompositeEvent( + timestamp=EpochNSClock.generate_timestamp(), + name=NAME_NFA_A, + history=BoboHistory(), + data={})) + + forwarder.on_producer_action(a_event) + forwarder.loop() + + self.assertEqual(a_event, sub.forwarder_success_event[0]) + + def test_subscribe_forward_action_failure(self): + setup = BoboSetup() + sub = StubSubscriberSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + + # this rejects all events passed to Forwarder + setup.config_forwarder(NoAction(bool_return=False)) + + setup.subscribe_forwarder(sub) + setup.configure() + + forwarder = setup.get_forwarder() + forwarder.setup() + + a_event = NoAction().execute( + CompositeEvent( + timestamp=EpochNSClock.generate_timestamp(), + name=NAME_NFA_A, + history=BoboHistory(), + data={})) + + forwarder.on_producer_action(a_event) + forwarder.loop() + + self.assertEqual(a_event, sub.forwarder_failure_event[0]) + + +class TestBoboSetupDistributed(unittest.TestCase): + + def test_get_distributed_configured_but_not_distributed(self): + setup = BoboSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_1, + action=NoAction() + )) + setup.configure() + + with self.assertRaises(RuntimeError): + setup.get_distributed() + + def test_on_sync(self): + setup = BoboSetup() + + setup.add_complex_event( + event_def=BoboComplexEvent( + name=NAME_NFA_A, + pattern=stub_pattern_4, + action=NoAction() + )) + + setup.config_null_data( + delay_sec=NULL_DATA_DELAY, + null_data=BoboNullDataStatic(DATA_DICT_A)) + + setup.config_distributed( + exchange_name=EXCHANGE_NAME, + user_name=USER_NAME, + host_name=HOST_NAME) + + setup.configure() + self.assertFalse(setup.get_receiver().is_active()) + self.assertFalse(setup.get_decider().is_active()) + self.assertFalse(setup.get_producer().is_active()) + self.assertFalse(setup.get_forwarder().is_active()) + self.assertFalse(setup.get_null_data_generator().is_active()) + + setup.on_sync() + self.assertTrue(setup.get_receiver().is_active()) + self.assertTrue(setup.get_decider().is_active()) + self.assertTrue(setup.get_producer().is_active()) + self.assertTrue(setup.get_forwarder().is_active()) + self.assertTrue(setup.get_null_data_generator().is_active()) + class TestBoboSetupScenarios(unittest.TestCase):