Skip to content

Commit

Permalink
Merge branch 'master' of git://github.com/geromueller/rpyc into gerom…
Browse files Browse the repository at this point in the history
…ueller-master

Conflicts:
	rpyc/core/protocol.py
  • Loading branch information
tomer-weka committed Mar 25, 2015
2 parents cab4543 + 716423e commit 14d88ad
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions rpyc/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import socket
import time

from threading import Lock
from threading import Lock, RLock, Event
from rpyc.lib.compat import pickle, next, is_py3k, maxint, select_error
from rpyc.lib.colls import WeakValueDict, RefCountingColl
from rpyc.core import consts, brine, vinegar, netref
Expand Down Expand Up @@ -128,6 +128,9 @@ class Connection(object):
the connection. Default is True. If set to False, you will
need to call :func:`_init_service` manually later
"""

SYNC_REQUEST_TIMEOUT = 30

def __init__(self, service, channel, config = {}, _lazy = False):
self._closed = True
self._config = DEFAULT_CONFIG.copy()
Expand All @@ -140,6 +143,8 @@ def __init__(self, service, channel, config = {}, _lazy = False):
self._recvlock = Lock()
self._sendlock = Lock()
self._sync_replies = {}
self._sync_lock = RLock()
self._sync_event = Event()
self._async_callbacks = {}
self._local_objects = RefCountingColl()
self._last_traceback = None
Expand All @@ -150,6 +155,8 @@ def __init__(self, service, channel, config = {}, _lazy = False):
if not _lazy:
self._init_service()
self._closed = False


def _init_service(self):
self._local_root.on_connect()

Expand Down Expand Up @@ -224,8 +231,7 @@ def ping(self, data = None, timeout = 3):
raise PingError("echo mismatches sent data")

def _get_seq_id(self):
seq = next(self._seqcounter)
return seq
return next(self._seqcounter)

def _send(self, msg, seq, args):
data = brine.dump((msg, seq, args))
Expand All @@ -237,7 +243,6 @@ def _send(self, msg, seq, args):

def _send_request(self, seq, handler, args):
self._send(consts.MSG_REQUEST, seq, (handler, self._box(args)))
return seq

def _send_reply(self, seq, obj):
self._send(consts.MSG_REPLY, seq, self._box(obj))
Expand Down Expand Up @@ -442,8 +447,24 @@ def sync_request(self, handler, *args):
"""
seq = self._get_seq_id()
self._send_request(seq, handler, args)
start_time = time.time()

while seq not in self._sync_replies:
self.serve(0.1)
remaining_time = self.SYNC_REQUEST_TIMEOUT - (time.time() - start_time)
if remaining_time < 0:
raise socket.timeout

# lock or wait for signal
if self._sync_lock.acquire(False):
self._sync_event.clear()
try:
self.serve(remaining_time)
finally:
self._sync_lock.release()
self._sync_event.set()
else:
self._sync_event.wait(remaining_time)

isexc, obj = self._sync_replies.pop(seq)
if isexc:
raise obj
Expand Down

0 comments on commit 14d88ad

Please sign in to comment.