Permalink
Browse files

negotiator protocol

  • Loading branch information...
1 parent c9aa84e commit ab017dbb468ed6d8b2864c77f0aeee452e070fec @tomerfiliba committed Sep 27, 2009
Showing with 73 additions and 24 deletions.
  1. +36 −24 mediums/rpc_v1.py
  2. +37 −0 negotiators/v1.txt
View
@@ -3,16 +3,14 @@
from .base import RPCMediumBase, RemotableException, GenericRemotableException
-
-
-
class RPCMedium(RPCMediumBase):
VERSION = "1.0"
_DISPATCHERS = {}
TAG_CALL = 1
TAG_METACALL = 2
TAG_REPLY = 3
+ TAG_ERROR = 4
REP_SYNC = 1
REP_ASYNC = 2
@@ -27,65 +25,79 @@ def __init__(self, channel, service):
self._seq = itertools.counter()
self._replies = {}
- def _send(self, tag, seq, *args):
- raw = brine.dump((tag, seq, args))
+ def _send(self, tag, *args):
+ raw = brine.dump((tag, args))
self.channel.send(raw)
def serve(self, timeout = None):
try:
- tag, seq, args = brine.load(self.channel.recv(timeout))
- dispatcher = self._DISPATCHERS[tag]
- except Exception:
- pass
- try:
- dispatcher(seq, args)
+ raw = self.channel.recv(timeout)
+ try:
+ tag, args = brine.load(raw)
+ except Exception, ex:
+ raise InvalidMessage(str(ex))
+ try:
+ dispatcher = self._DISPATCHERS[tag]
+ except KeyError:
+ raise InvalidTag(tag)
+ dispatcher(args)
except RpcProtocolError, ex:
- pass
+ self._send(self.TAG_ERROR, dump_exc())
@_register(TAG_CALL)
- def _handle_call(self, seq, args):
+ def _handle_call(self, args):
try:
- funcname, args, kwargs = args
+ seq, funcname, args, kwargs = args
except ValueError:
raise InvalidCall("wrong structure")
try:
res = self.serve.invoke(funcname, args, kwargs)
except Exception, ex:
- dumped = dump_exc()
- self._send(self.TAG_REPLY, seq, False, dumped)
+ self._send(self.TAG_REPLY, seq, False, dump_exc())
else:
self._send(self.TAG_REPLY, seq, True, res)
@_register(TAG_METACALL)
- def _handle_metacall(self, seq, args):
+ def _handle_metacall(self, args):
try:
- funcname, args, kwargs = args
+ seq, funcname, args, kwargs = args
except ValueError:
raise InvalidMetaCall("wrong structure")
try:
if funcname not in self.service.META_FUNCTIONS:
raise InvalidMetaCall("cannot access %r" % (funcname,))
res = getattr(self.serve, funcname)(*args, **kwargs)
except Exception, ex:
- dumped = dump_exc(sys.exc_info())
- self._send(self.TAG_REPLY, seq, False, dumped)
+ self._send(self.TAG_REPLY, seq, False, dump_exc())
else:
self._send(self.TAG_REPLY, seq, True, res)
@_register(TAG_REPLY)
- def _handle_reply(self, seq, args):
+ def _handle_reply(self, args):
try:
- mode, slot = self._replies[seq]
- succ, obj = args
+ seq, succ, obj = args
except Exception:
raise InvalidReply("wrong structure")
+ try:
+ mode, slot = self._replies[seq]
+ except KeyError:
+ raise InvalidReply("invalid sequence")
if mode == self.REP_SYNC:
+ if not succ:
+ obj = load_exc(obj)
slot.extend((succ, obj))
elif mode == self.REP_ASYNC:
+ if not succ:
+ obj = load_exc(obj)
slot(succ, obj)
else:
raise InvalidReply("invalid mode")
+ @_register(TAG_ERROR)
+ def _handle_error(self, args):
+ exc = load_exc(args[0])
+ raise exc
+
def invoke_sync(self, func, args, kwargs):
seq = self._seq.next()
self._send(self.TAG_CALL, seq, func, args, kwargs)
@@ -102,6 +114,6 @@ def invoke_async(self, callback, func, args, kwargs):
seq = self._seq.next()
self._send(self.TAG_CALL, seq, func, args, kwargs)
self._replies[seq] = (self.REP_ASYNC, callback)
-
+
View
@@ -38,4 +38,41 @@
is returned to the caller
+======================
+ Capabilities
+======================
+* compression - channel-level compression of frames; may heuristically disable
+ itself if compression ratio is too low (configurable), to save time
+* TLS - stream-level encryption and authentication
+* session - exchange cookies between both parties; allow the re-establishment
+ of dropped connections without the higher layers experiencing interruption
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

0 comments on commit ab017db

Please sign in to comment.