Permalink
Browse files

add (Tornado-based) asynchronous operation mode

  • Loading branch information...
mjrusso committed Dec 28, 2010
1 parent 5a272a7 commit 07ed39527bd6752158b1293e8d2df528d649a613
Showing with 302 additions and 94 deletions.
  1. +43 −3 README.md
  2. +101 −0 pyhbase/async.py
  3. +158 −91 pyhbase/connection.py
View
@@ -1,13 +1,53 @@
-A Python client for the HBase Avro interface, currently under development at [hbase-trunk-with-avro](http://github.com/hammer/hbase-trunk-with-avro).
+PyHBase
+=======
-Borrows from the [Mozilla client](http://code.google.com/p/socorro/source/browse/trunk/socorro/hbase/hbaseClient.py).
+Python client for the HBase Avro interface, supporting both synchronous and asynchronous ([Tornado](http://tornadoweb.org/)-based) modes of operation.
-You can find this project on PyPI as [PyHBase](http://pypi.python.org/pypi/PyHBase/). To install, run:
+Installation
+------------
+
+This project is [available on PyPI](http://pypi.python.org/pypi/PyHBase/).
+
+To install, run:
$ sudo pip install pyhbase
+Alternatively, to build directly from source, run:
+
+ $ sudo python setup.py install
+
+HBase Avro Gateway
+------------------
+
+The HBase Avro Gateway is available in HBase 0.9x, and in the HBase 0.2x [hbase-trunk-with-avro](http://github.com/hammer/hbase-trunk-with-avro) fork.
+
+To start the Avro Gateway:
+
+ $ $HBASE_HOME/bin/hbase-daemon.sh start avro
+
+Usage
+-----
+Synchronous usage example:
+ >>> from pyhbase.connection import HBaseConnection
+ >>> sc = HBaseConnection('localhost', 9090)
+ >>> sc.create_table('test_table', 'cf1', 'cf2')
+ >>> sc.put('test_table', 'key1', 'cf1:qualifier1', 'value1')
+ >>> sc.get('test_table', 'key1')
+ {u'entries': [{u'value': 'value1', u'qualifier': 'qualifier1', u'family': 'cf1', u'timestamp': 1293494506843}], u'row': 'key1'}
+Asynchronous usage example:
+ >>> from pyhbase.connection import AsyncHBaseConnection
+ >>> from tornado.ioloop import IOLoop
+ >>> ac = AsyncHBaseConnection('localhost', 9090)
+ >>> def on_response(response):
+ ... print response
+ ... IOLoop.instance().stop()
+ ...
+ >>> ac.get('test_table', 'key1', callback=on_response)
+ >>> IOLoop.instance().start()
+ {u'entries': [{u'value': 'value1', u'qualifier': 'qualifier1', u'family': 'cf1', u'timestamp': 1293494506843}], u'row': 'key1'}
+Note that administrative operations (`create_table`, `alter`, `truncate`, `flush`, etc.) are not available from the asynchronous client.
View
@@ -0,0 +1,101 @@
+# TODO(mjrusso): Discuss possibility of inclusion in canonical Avro package,
+# potentially with module name `avro.tornadoipc`. Note that
+# this module is coded against Avro 1.3.3, the latest Python release
+# available on PyPI as of this writing. Some code changes should be made
+# for integration against Avro trunk, and are indicated with TODO
+# comments in the source, below.
+
+import logging
+import uuid
+
+from avro import ipc
+from avro import io
+from functools import partial
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+try:
+ from tornado import httpclient
+except ImportError:
+ # async operations not supported unless Tornado is installed
+ httpclient = None
+
+
class TornadoRequestor(ipc.Requestor):
    """Callback-driven counterpart of Avro's ``ipc.Requestor``.

    Rather than returning the call response, ``request`` hands it to a
    caller-supplied ``callback`` once the transceiver has delivered the
    raw response bytes.
    """
    # TODO: Avro 1.4+: extend `ipc.BaseRequestor` instead of `ipc.Requestor`

    def request(self, message_name, request_datum, callback):
        """Encode a call to ``message_name`` and send it asynchronously.

        The handshake request and the call request are written into a
        single in-memory buffer, which is then passed to
        ``issue_request``.

        ``callback`` is invoked exactly once with the decoded call
        response, or with ``None`` when the transceiver reported a
        failure.
        """
        buffer_writer = StringIO()
        buffer_encoder = io.BinaryEncoder(buffer_writer)
        self.write_handshake_request(buffer_encoder)
        self.write_call_request(message_name, request_datum, buffer_encoder)

        call_request = buffer_writer.getvalue()
        self.issue_request(call_request, message_name, request_datum, callback)

    def issue_request(self, call_request, message_name, request_datum,
                      callback):
        """Send the encoded ``call_request`` through the transceiver.

        ``message_name`` and ``request_datum`` are bound into the response
        handler so the call can be re-encoded and sent again if needed.
        """
        on_response = partial(self._on_issue_request, message_name,
                              request_datum, callback)
        self.transceiver.transceive(call_request, on_response)

    def _on_issue_request(self, message_name, request_datum, callback,
                          call_response):
        """Decode ``call_response`` and deliver the result to ``callback``.

        ``call_response`` is the raw response payload handed over by the
        transceiver, or ``None`` if the transport failed.
        """
        if call_response is None:
            # TornadoHTTPTransceiver in this module signals a failed HTTP
            # request by passing None; there is nothing to decode, so
            # forward the failure instead of crashing inside the decoder.
            callback(None)
            return

        buffer_decoder = io.BinaryDecoder(StringIO(call_response))
        call_response_exists = self.read_handshake_response(buffer_decoder)

        if not call_response_exists:
            # No call response accompanied the handshake response: send the
            # request again.  The caller's callback will be invoked by that
            # retry, so it must NOT also be invoked here (doing so would
            # deliver a spurious None before the real result).
            self.request(message_name, request_datum, callback)
            return

        callback(self.read_call_response(message_name, buffer_decoder))
+
+
class TornadoHTTPTransceiver(object):
    """Non-blocking Avro transceiver that POSTs framed messages over HTTP.

    Requests are sent with Tornado's ``AsyncHTTPClient``; results are
    delivered to callbacks instead of being returned.
    """

    def __init__(self, host, port):
        """Remember the target URL built from ``host`` and ``port``."""
        self.url = "http://%s:%d/" % (host, port)
        # use uuid to identify client
        self._remote_name = uuid.uuid4()

    # Read-only access to the identifier generated in __init__.
    remote_name = property(lambda self: self._remote_name)

    def transceive(self, request, callback):
        """Send ``request`` and pass the unframed reply to ``callback``.

        ``callback`` receives ``None`` if the HTTP request failed.
        """
        on_response = partial(self._on_transceive, callback)
        self.write_framed_message(request, callback=on_response)

    def _on_transceive(self, callback, response):
        """Handle the HTTP ``response`` produced by ``write_framed_message``.

        A response whose ``error`` attribute is set is logged and reported
        to ``callback`` as ``None``; otherwise the framed message read from
        ``response.buffer`` is passed along.
        """
        if response.error:
            # Lazy %-style arguments: same message text, formatted only if
            # the record is actually emitted.
            logging.error("Avro HTTP Error: %s", response.error)
            callback(None)
            return
        callback(self.read_framed_message(response.buffer))

    def read_framed_message(self, response):
        """Read one framed message from the file-like ``response``."""
        response_reader = ipc.FramedReader(response)
        framed_message = response_reader.read_framed_message()
        response.read()  # ensure we're ready for subsequent requests
        return framed_message

    def write_framed_message(self, message, callback):
        """Frame ``message`` and POST it to ``self.url``.

        ``callback`` is handed to ``AsyncHTTPClient.fetch`` and receives
        the HTTP response.

        Raises:
            ImportError: if Tornado could not be imported, in which case
                this module sets ``httpclient`` to ``None``.
        """
        if httpclient is None:
            # Fail with an actionable message rather than an AttributeError
            # on None further down.
            raise ImportError(
                "Tornado is required for asynchronous operation")

        req_method = 'POST'
        req_headers = {'Content-Type': 'avro/binary'}

        req_body_buffer = ipc.FramedWriter(StringIO())
        req_body_buffer.write_framed_message(message)
        req_body = req_body_buffer.writer.getvalue()

        http_client = httpclient.AsyncHTTPClient()
        http_request = httpclient.HTTPRequest(
            self.url, method=req_method, headers=req_headers, body=req_body)
        http_client.fetch(http_request, callback)

    def close(self):
        """No-op; this transceiver keeps no connection state to release."""
        pass
Oops, something went wrong.

0 comments on commit 07ed395

Please sign in to comment.