Permalink
Browse files

Goodbye Python directory!

  • Loading branch information...
1 parent d3824d3 commit 9b0fe023af9200d86e770033ce1b5f4523eb256e @repeatedly repeatedly committed Feb 16, 2012
View
3 python/.gitignore
@@ -1,3 +0,0 @@
-*.pyc
-*.pyo
-/build/
View
1 python/README
@@ -0,0 +1 @@
+MessagePack RPC for Python was moved to https://github.com/msgpack/msgpack-rpc-python
View
0 python/msgpackrpc/__init__.py
No changes.
View
152 python/msgpackrpc/client/__init__.py
@@ -1,152 +0,0 @@
-# coding: utf-8
-
-from msgpack import packs, Unpacker
-
-__all__ = ['ProtocolError', 'TransportError', 'Client']
-
-class ProtocolError(Exception):
- pass
-
-class TransportError(Exception):
- pass
-
-def msgidgen():
- """Generator that generates msgid.
-
- NOTE: Don't use in multithread. If you want use this
- in multithreaded application, use lock.
- """
- counter = 0
- while True:
- yield counter
- counter += 1
- if counter > (1 << 30):
- counter = 0
-
-class Future(object):
- """Future object.
- """
-
- def __init__(self, client):
- self._done = False
- self._result = None
- self._error = None
- self._client = client
- self._done_callbacks = None
-
- def add_done_callback(fn):
- if self._done_callbacks is None:
- self._done_callbacks = []
- self._done_callbacks.append(fn)
-
- def remove_done_callback(fn):
- self._done_callbacks.remove(fn)
-
- def _wait(self):
- # TODO: implement timeout.
- while not self._done:
- self._client.try_recv()
-
- def result(self):
- self._wait()
- return self._result
-
- def error(self):
- self._wait()
- return self._error
-
- def done(self):
- return self._done
-
- def _exec_callbacks(self):
- if self._done_callbacks is not None:
- for fn in self._done_callbacks:
- fn(self)
- self._done_callbacks = None
-
- def set_result(self, result):
- self._result = result
- self._done = True
-
- def set_error(self, error):
- self._error = error
- self._done = True
-
-
-class Client(object):
- """Simple client.
- This class is not thread safe."""
- _transport = None
-
- def __init__(self, transport_factory, args, kwargs):
- self._transport_factory = transport_factory
- self._transport_args = args
- self._transport_kwargs = kwargs
- self._msgidgen = msgidgen()
- self._req_table = {}
- self._unpacker = Unpacker()
-
- def _get_transport(self):
- if not self._transport:
- self._transport = self._transport_factory(
- *self._transport_args,
- **self._transport_kwargs)
- return self._transport
-
- def _send_msg(self, msg):
- msg = packs(msg)
- transport = self._get_transport()
- transport.try_send(msg)
-
- def send_request(self, method, args, callback=None):
- msgid = self._msgidgen.next()
- future = Future(self)
- self._req_table[msgid] = future
- try:
- self._send_msg((0, msgid, method, args))
- if callback is not None:
- future.add_done_callback(callback)
- return future
- except:
- del self._req_table[msgid]
- raise
-
- def call_request(self, method, args):
- """Call request synchronous.
- Return (error, result) tuple."""
- future = self.send_request()
- return future.error(), future.result()
-
- def try_close(self):
- if self._transport:
- self._transport.try_close()
- self._transport = None
-
- def try_recv(self):
- data = self._get_transport().try_recv()
- self._unpacker.feed(data)
- for msg in self._unpacker:
- self._recv_msg(msg)
-
- def _recv_msg(self, msg):
- if len(msg) != 4:
- raise ProtocolError('Invalid msgpack-rpc protocol')
- msgtype, msgid, msgerr, msgret = msg
- if msgtype != 1:
- raise ProtocolError('Invalid msgpack-rpc protocol')
-
- try:
- future = self._req_table.pop(msgid)
- except KeyError:
- raise ProtocolError('Unknown msgid: %r' % (msgid,))
-
- if msgerr is not None:
- future.set_error(msgerr)
- else:
- future.set_result(msgret)
-
- def _connection_closed(self, reason):
- for msgid, future in self._req_table.iteritems():
- future.set_error(TransportError(reason))
- self._req_table.clear()
- self.try_close()
View
17 python/msgpackrpc/client/address.py
@@ -1,17 +0,0 @@
-class Address(object):
- """
- The class to represent the RPC address.
- Currently, only IPV4 is supported in this version.
-
- TODO(kzk): support IPV6, and UnixDomainSocket
- """
-
- def __init__(self, host, port):
- self._host = host
- self._port = port
-
- def get_host(self):
- return self._host
-
- def get_port(self):
- return self._port
View
40 python/msgpackrpc/client/twisted/__init__.py
@@ -1,40 +0,0 @@
-from msgpackrpc.client.address import Address
-from msgpackrpc.client.twisted import session, loop
-
-__all__ = ['Client']
-
-class Client(session.Session):
- """
- The RPC client class, which exposes the API to the users.
- The folowing code shows how to use the asynchronous API.
-
- from msgpackrpc.client.twisted import Client
- c = client.Client("127.0.0.1", 1985)
- f = c.send("hello1", [1])
- f.join()
- print f.get_result()
- c.close()
- """
-
- def __init__(self, host, port):
- session.Session.__init__(self, Address(host, port), loop.Loop())
-
- def send(self, method, args):
- """
- This is the asynchronous API, which returns Future class.
- To get the result from the Future, you need to future.join() like this:
-
- future = client.send("method", args)
- future.join()
- ret = future.get_result()
- """
- return self.send_request(method, args)
-
- def call(self, method, args):
- """Synchronous call"""
- future = self.send(method, args)
- future.join()
- return future.get_result()
-
- def close(self):
- self.try_close()
View
24 python/msgpackrpc/client/twisted/future.py
@@ -1,24 +0,0 @@
-class Future(object):
- """
- This class is used as the result of asynchronous call.
- By using join(), the caller is able to wait for the completion.
- """
-
- def __init__(self, loop):
- self._error = None
- self._result = None
- self._loop = loop
-
- def join(self):
- # TODO(kzk): implement client-side timeout
- while (self._error == None and self._result == None):
- self._loop.run()
-
- def get_result(self):
- return self._result
-
- def set_error(self, error):
- self._error = error
-
- def set_result(self, result):
- self._result = result
View
37 python/msgpackrpc/client/twisted/loop.py
@@ -1,37 +0,0 @@
-from twisted.internet import reactor, error
-
-class Loop(object):
- """
- An I/O loop class which wraps the Twisted reactor.
- """
- def __init__(self):
- self._reactor = reactor
-
- def run(self):
- """
- Enter the IO loop. The caller thread is blocked until someone calls
- the stop() function.
- """
- if not self._reactor.running:
- self._reactor.run()
-
- def stop(self):
- """
- Stopping the twisted reactor if it's running
- This is necessary to wakeup the process which calls reactor.run().
- That is called by Future::join()
- """
- if self._reactor.running:
- try:
- """
- We call crash() instead of stop() here. This is because
- stop() fires 'shutdown' event, and close the connection.
-
- TODO: The documents of Twisted says the crash() has some
- possibilities to lose the internal data. Is there any way to
- *just stop* the ioloop, by not using crash()?
- """
- self._reactor.crash()
- self._reactor._justStopped = False
- except:
- return
View
123 python/msgpackrpc/client/twisted/session.py
@@ -1,123 +0,0 @@
-from msgpackrpc.client.twisted import future, transport
-
-class Session(object):
- """
- Session processes send/recv request of the message, by using underlying
- transport layer.
-
- self._req_table stores the relationship between messageid and corresponding
- future. When the new requets are sent, the Session generates new message id
- and new future. Then the Session registers them to req_table.
-
- When it receives the message, the Session lookups the req_table and set the
- result to the corresponding future.
- """
-
- def __init__(self, addr, loop):
- """
- :param addr: address of the server.
- :param loop: context object.
- """
- self._addr = addr
- self._loop = loop
- self._req_table = {}
- self._transport = None
-
- @property
- def address(self):
- """address of the server."""
- return self._addr
-
- def send_request(self, method, args):
- """Sends the request to the remote MessagePack-RPC server. This takes
- the following steps.
- (1) Generates the new message id and the new future.
- (2) Registers them to the req_table.
- (3) Passes the message to the underlying transport layer
- """
- f = future.Future(self._loop)
- msgid = self._gen_msgid()
- self._req_table[msgid] = f
-
- transport = self._get_transport()
- transport.send_message([0, msgid, method, args])
- return f
-
- def try_close(self):
- if self._transport:
- self._transport.try_close()
- self._transport = None
-
- _msgid_counter = 0
-
- def _gen_msgid(self):
- """Generates new message id, from the global counter"""
- with self._lock:
- counter = msgid = self._msgid_counter
- counter += 1
- if counter > (1 << 30):
- counter = 0
- self._msgid_counter = counter
- return msgid
-
- def _get_transport(self):
- """Creates new transport when it's not available"""
- if self._transport is not None:
- return self._transport
- self._transport = transport.TCPTransport(self, self._loop)
- return self._transport
-
- def _cb_connect_failed(self, reason):
- """The callback called when the connection failed.
- Called by the transport layer.
- """
- # set error for all requests
- for msgid, future in self._req_table.iteritems():
- future.set_error(reason)
- self._req_table = {}
- self.try_close()
- self._loop.stop()
-
- def _cb_msg_received(self, msg):
- """The callback called when the message arrives.
- Called by the transport layer.
- """
- if len(msg) != 4:
- raise Exception("invalid mprpc protocol")
- msgtype, msgid, msgerr, msgret = msg
- if msgtype != 1:
- raise Exception("invalid mprpc protocol")
-
- # lookup msgid in req_table
- if not msgid in self._req_table:
- raise Exception("unknown msgid")
- future = self._req_table.pop(msgid)
-
- # set value to the future
- if msgerr != None:
- future.set_error(msgerr)
- else:
- future.set_result(msgret)
- self._loop.stop()
-
- def _cb_closed(self, reason):
- """The callback called when the connection closed.
- Called by the transport layer.
- """
- # set error for all requests
- for msgid, future in self._req_table.iteritems():
- future.set_error(reason)
- self._req_table = {}
- self.try_close()
- self._loop.stop()
-
- def _cb_failed(self):
- """The callback called when the error occurred.
- Called by the transport layer.
- """
- # set error for all requests
- for msgid, future in self._req_table.items():
- future.set_error("failed")
- self._req_table = {}
- self.try_close()
- self._loop.stop()
View
199 python/msgpackrpc/client/twisted/transport.py
@@ -1,199 +0,0 @@
-import sys
-import msgpack
-from twisted.internet import protocol, reactor
-
-__all__ = ['TCPTransport']
-
-
-class TCPTransport(object):
- """
- TCPTransport sends/receives the data, by using underlying socket layer.
- Before sending the data, it serializes the data into MessagePack format.
- As same, it deserializes the data when it receives the data.
-
- This class also hides the latency of establishing the connection. If the
- connection is not established. the sending messages are temporarily queued.
- Then, they are actually sent to the network when it's connected.
- """
-
- def __init__(self, session, loop):
- self._session = session
- self._loop = loop
-
- self._packer = msgpack.Packer()
- self._unpacker = msgpack.Unpacker()
- self._is_connecting = False
- self._is_connected = False
- self._socket = TCPSocket(self._session.get_addr(), loop, self)
- self._pending_msgs = []
-
- def send_message(self, message):
- packed_msg = self._packer.pack(message)
- if (self._is_connected):
- self._socket.try_send(packed_msg)
- else:
- if not self._is_connecting:
- self._socket.try_connect()
- self._socket.is_connecting = False
- self._pending_msgs.append(packed_msg)
-
- def try_send_pending(self):
- for msg in self._pending_msgs:
- self._socket.try_send(msg)
- self._pending_msgs = []
-
- def try_close(self):
- if (self._socket != None):
- self._socket.try_close()
- self._is_connecting = False
- self._is_connected = False
- self._socket = None
- self._pending_msgs = []
-
- def _cb_connected(self):
- """The callback called when the connection failed.
- Called by the socket layer.
- """
- self._is_connecting = False
- self._is_connected = True
- self.try_send_pending()
-
- def _cb_connect_failed(self, reason):
- """The callback called when the connection failed.
- Called by the socket layer.
- """
- self.try_close()
- self._session._cb_connect_failed(reason)
-
- def _cb_msg_received(self, buf):
- """The callback called when the message arrives.
- Called by the socket layer.
- """
- self._unpacker.feed(buf)
- for msg in self._unpacker:
- self._session._cb_msg_received(msg)
-
- def _cb_closed(self, reason):
- """The callback called when the connection closed.
- Called by the socket layer.
- """
-
- self.try_close()
- self._session._cb_closed(reason)
-
- def _cb_failed(self):
- """The callback called when the error occurred.
- Called by the socket layer.
- """
- self.try_close()
- self._session._cb_failed()
-
-
-class TCPSocket(object):
- """
- TCPSocket uses Twisted framework to actually establish the connection, and
- send/recv the data buffer.
- """
- def __init__(self, addr, loop, transport):
- self._addr = addr
- self._loop = loop
- self._transport = transport
- self._client_factory = None
-
- def try_connect(self):
- if self._client_factory != None:
- raise Exception("already connected")
- self._client_factory = _TwistedClientFactory(self)
- host = self._addr.get_host()
- port = self._addr.get_port()
- reactor.connectTCP(host, port, self._client_factory)
-
- def try_send(self, buf):
- self._client_factory.try_send(buf)
-
- def try_close(self):
- if (self._client_factory != None):
- self._client_factory.try_close()
- self._client_factory = None
-
- def _cb_connected(self):
- self._transport._cb_connected()
-
- def _cb_connect_failed(self, reason):
- self.try_close()
- self._transport._cb_connect_failed(reason)
-
- def _cb_msg_received(self, buf):
- self._transport._cb_msg_received(buf)
-
- def _cb_closed(self, reason):
- self.try_close()
- self._transport._cb_closed(reason)
-
- def _cb_failed(self, error):
- self.try_close()
- self._transport._cb_failed()
-
-
-class _TwistedClientFactory(protocol.ClientFactory):
- """Twisted ClientFactory implementation"""
- def __init__(self, sock):
- self._sock = sock
-
- def buildProtocol(self, addr):
- self._protocol = _TwistedProtocol(self._sock)
- return self._protocol
-
- def clientConnectionLost(self, connector, reason):
- try:
- self._sock._cb_closed(reason)
- except:
- print sys.exc_info()[0]
- self._sock._cb_failed(sys.exc_info()[0])
-
- def clientConnectionFailed(self, connector, reason):
- try:
- self._sock._cb_connect_failed(reason)
- except:
- print sys.exc_info()[0]
- self._sock._cb_failed(sys.exc_info()[0])
-
- def try_send(self, buf):
- try:
- self._protocol.transport.write(buf)
- except:
- print sys.exc_info()[0]
- self._sock._cb_failed(sys.exc_info()[0])
-
- def try_close(self):
- try:
- self._protocol.transport.loseConnection()
- except:
- print sys.exc_info()[0]
- self._sock._cb_failed(sys.exc_info()[0])
-
-class _TwistedProtocol(protocol.Protocol):
- """Twisted Protocol implementation"""
- def __init__(self, sock):
- self._sock = sock
-
- def connectionMade(self):
- try:
- self._sock._cb_connected()
- except:
- print sys.exc_info()[0]
- self._sock._cb_failed(sys.exc_info()[0])
-
- def connectionLost(self, reason):
- try:
- self._sock._cb_closed(reason)
- except:
- print sys.exc_info()[0]
- self._sock._cb_failed(sys.exc_info()[0])
-
- def dataReceived(self, buf):
- try:
- self._sock._cb_msg_received(buf)
- except:
- print sys.exc_info()[0]
- self._sock._cb_failed(sys.exc_info()[0])
View
0 python/msgpackrpc/transport/__init__.py
No changes.
View
15 python/msgpackrpc/transport/tcp.py
@@ -1,15 +0,0 @@
-import socket
-
-class TCPTransport(object):
- def __init__(self, host, port):
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.connect((host, port))
-
- def try_close(self):
- pass
-
- def try_recv(self):
- return self._socket.recv(4096)
-
- def try_send(self, data):
- self._socket.sendall(data)
View
14 python/setup.py
@@ -1,14 +0,0 @@
-#!/usr/bin/env python
-# coding: utf-8
-
-from setuptools import setup, find_packages
-
-setup(name='msgpackrpc',
- version='0.0.1dev',
- author='INADA Naoki',
- author_email='songofacandy@gmail.com',
- packages=find_packages(),
- install_requires=['msgpack-python'],
- test_suite='nose.collector',
- tests_require=['nose',],
- )
View
51 python/test/echoserver.py
@@ -1,51 +0,0 @@
-#!/usr/bin/env python
-# coding: utf-8
-
-"""Echo service.
-This server doesn't use msgpackrpc.server.
-"""
-
-import SocketServer
-from msgpack import packs, Unpacker
-
-class EchoHandler(SocketServer.BaseRequestHandler):
-
- def handle(self):
- unpacker = Unpacker()
-
- while 1:
- data = self.request.recv(4096)
- if len(data) == 0:
- break
- unpacker.feed(data)
- for msg in unpacker:
- print msg
- assert len(msg) == 4
- assert msg[0] == 0
- assert msg[2] == "echo"
- sdata = packs((1, msg[1], None, msg[-1]))
- self.request.sendall(sdata)
-
-def serve_background(server, daemon=False):
- import threading
- t = threading.Thread(target=server.serve_forever)
- t.setDaemon(daemon)
- t.start()
-
-def serve(daemon=False):
- """Serve echo server in background on localhost.
- This returns (server, port). port is number in integer.
-
- To stop, use ``server.shutdown()``
- """
- for port in xrange(9000, 10000):
- try:
- server = SocketServer.TCPServer(('localhost', port), EchoHandler)
- serve_background(server, daemon)
- return server, port
- except Exception:
- pass
-
-if __name__ == '__main__':
- port = serve(False)
- print "Serving on localhost:%d\n" % (port,)
View
34 python/test/test_client.py
@@ -1,34 +0,0 @@
-#!/usr/bin/env python
-# coding: utf-8
-
-from msgpackrpc.client import *
-from msgpackrpc.transport import tcp
-
-import echoserver
-
-SERVER = PORT = None
-
-def setup():
- global SERVER, PORT
- SERVER, PORT = echoserver.serve()
-
-def teardown():
- global SERVER, PORT
- SERVER.shutdown()
- SERVER = PORT = None
-
-def test_client():
- client = Client(tcp.TCPTransport, ('localhost', PORT), {})
-
- f1 = client.send_request('echo', 'foo')
- f2 = client.send_request('echo', 'bar')
- f3 = client.send_request('echo', 'baz')
-
- assert f2.result() == 'bar'
- assert f1.result() == 'foo'
- assert f3.result() == 'baz'
-
-if __name__ == '__main__':
- setup()
- test_client()
- teardown()

0 comments on commit 9b0fe02

Please sign in to comment.