Skip to content

Commit

Permalink
distributed fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
r3w0p committed Oct 28, 2019
1 parent 71f5c5b commit 909215a
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 15 deletions.
2 changes: 1 addition & 1 deletion bobocep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

__author__ = """r3w0p"""
__email__ = ''
__version__ = '0.34.0'
__version__ = '0.34.1'
2 changes: 1 addition & 1 deletion bobocep/decider/handlers/bobo_nfa_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self,
self._recent = []
self._max_recent = max(1, max_recent)
self._subs = []
self._synced = False
self._is_synced = False
self._lock = RLock()

def process(self, event: BoboEvent) -> None:
Expand Down
10 changes: 6 additions & 4 deletions bobocep/setup/bobo_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from time import time_ns
from typing import List
from uuid import uuid4

from pika import ConnectionParameters

from bobocep.decider.bobo_decider import BoboDecider
from bobocep.decider.buffers.shared_versioned_match_buffer import \
SharedVersionedMatchBuffer
Expand Down Expand Up @@ -87,7 +89,7 @@ def __init__(self,
self._user_name = None
self._parameters = None
self._user_id = None
self._synced = False
self._is_synced = False

self._receiver = None
self._decider = None
Expand All @@ -113,7 +115,7 @@ def is_ready(self) -> bool:
with self._lock:
if self.is_active():
if self._distributed:
return self._synced
return self._is_synced
else:
return True

Expand Down Expand Up @@ -343,7 +345,7 @@ def on_sync(self) -> None:
with self._lock:
if self._distributed:
self._activate_tasks()
self._synced = True
self._is_synced = True

def start(self) -> None:
"""Start the setup.
Expand All @@ -363,7 +365,7 @@ def start(self) -> None:

# tasks active by default if not distributed, sync immediately
if not self._distributed:
self._synced = True
self._is_synced = True

def configure(self) -> None:
with self._lock:
Expand Down
3 changes: 2 additions & 1 deletion bobocep/setup/distributed/bobo_dist_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from pika import ConnectionParameters

from bobocep.decider.bobo_decider import BoboDecider
from bobocep.setup.distributed.incoming.bobo_dist_incoming import \
BoboDistIncoming
from bobocep.setup.distributed.outgoing.bobo_dist_outgoing import \
BoboDistOutgoing
from bobocep.setup.task.bobo_task_thread import \
BoboTaskThread
from pika import ConnectionParameters


class BoboDistManager:
Expand Down
2 changes: 1 addition & 1 deletion bobocep/setup/distributed/incoming/bobo_dist_incoming.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def _callback(self, ch, method, properties, body):
elif method.routing_key == bdc.SYNC_REQ:
self._handle_sync_request(ch, method, properties, body)

elif not self._is_synced and method.routing_key == bdc.SYNC_RES:
elif (not self._is_synced) and method.routing_key == bdc.SYNC_RES:
self._handle_sync_response(ch, method, properties, body)

def _handle_transition(self, data: str) -> None:
Expand Down
6 changes: 3 additions & 3 deletions bobocep/setup/distributed/outgoing/bobo_dist_outgoing.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def unsubscribe(self, unsubscriber: IDistOutgoingSubscriber) -> None:
def _sync(self) -> None:
sync_attempt = 0

while not self._is_synced and sync_attempt < self.max_sync_attempts:
while (not self._is_synced) and sync_attempt < self.max_sync_attempts:
sync_attempt += 1
self._is_synced = self._sync_request()

Expand Down Expand Up @@ -157,7 +157,7 @@ def _sync_request(self, timeout: int = 3) -> bool:
return True

def _put_current_state(self, decider_dict: dict) -> None:
if self._is_cancelled or self._synced:
if self._is_cancelled or self._is_synced:
return

for handler_dict in decider_dict[bdc.HANDLERS]:
Expand All @@ -177,7 +177,7 @@ def _put_current_state(self, decider_dict: dict) -> None:
run = BoboDeciderBuilder.run(run_dict, buffer, handler.nfa)
handler.add_run(run)

self._synced = True
self._is_synced = True

def _send_events(self, queue: Queue, routing_key: str):
if not queue.empty():
Expand Down
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
# built documents.
#
# The short X.Y version.
version = '0.34.0'
version = '0.34.1'
# The full version, including alpha/beta/rc tags.
release = '0.34.0'
release = '0.34.1'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.34.0
current_version = 0.34.1
tag = True

[bumpversion:file:setup.py]
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
author="r3w0p",
author_email='',
name='bobocep',
version='0.34.0',
version='0.34.1',
description="A fault-tolerant complex event processing engine designed "
"for edge computing in IoT systems.",
long_description=long_description,
Expand Down

0 comments on commit 909215a

Please sign in to comment.