Skip to content

Commit

Permalink
Moving from Thrift to Avro, part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
hammer committed May 18, 2010
1 parent 9b0f66c commit cfc0189
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 174 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
A Python client for HBase.
A Python client for the HBase Avro interface.

Borrows from the [Mozilla client](http://code.google.com/p/socorro/source/browse/trunk/socorro/hbase/hbaseClient.py).

Builds on the [Thrift client](http://pypi.python.org/pypi/python-hbase/0.20.4).

63 changes: 2 additions & 61 deletions examples/pyhbase-cli
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,12 @@ if __name__=="__main__":
Commands:
show_tables
describe_table table_name
get_table_regions table_name
is_table_enabled table_name
enable_table table_name
disable_table table_name
compact table_name
major_compact table_name
get table_name row_id
get_cell_versions table_name row_id column number_of_versions
put table_name row_id column_family column value
delete_row table_name row_id
delete_cells table_name row_id column
scan table_name start_row_id column_or_column_family number_of_rows
get table_name row family qualifier
""" % sys.argv[0]

if len(sys.argv) <= 1 or sys.argv[1] == '--help':
Expand Down Expand Up @@ -56,16 +44,6 @@ if __name__=="__main__":
usage()
sys.exit(1)
print connection.show_tables(*args)
elif cmd == 'describe_table':
if len(args) != 1:
usage()
sys.exit(1)
print connection.describe_table(*args)
elif cmd == 'get_table_regions':
if len(args) != 1:
usage()
sys.exit(1)
print connection.get_table_regions(*args)
elif cmd == 'is_table_enabled':
if len(args) != 1:
usage()
Expand All @@ -81,49 +59,12 @@ if __name__=="__main__":
usage()
sys.exit(1)
print connection.disable_table(*args)
elif cmd == 'compact':
if len(args) != 1:
usage()
sys.exit(1)
print connection.compact(*args)
elif cmd == 'major_compact':
if len(args) != 1:
usage()
sys.exit(1)
print connection.major_compact(*args)
elif cmd == 'get':
if len(args) != 2:
usage()
sys.exit(1)
print connection.get(*args)
elif cmd == 'get_cell_versions':
if len(args) != 4:
usage()
sys.exit(1)
print connection.get_cell_versions(*args)
elif cmd == 'put':
if len(args) != 5:
usage()
sys.exit(1)
print connection.put(*args)
elif cmd == 'delete_row':
if len(args) != 2:
usage()
sys.exit(1)
print connection.delete_row(*args)
elif cmd == 'delete_cells':
if len(args) != 3:
usage()
sys.exit(1)
print connection.delete_cellsn(*args)
elif cmd == 'scan':
if len(args) != 4:
usage()
sys.exit(1)
print connection.scan(*args)
print connection.get(*args)
else:
usage()
sys.exit(1)

connection.close()

127 changes: 22 additions & 105 deletions pyhbase/connection.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import os
import sys

from thrift import Thrift
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from hbase import ttypes
from hbase.Hbase import Client, ColumnDescriptor, Mutation
import avro.ipc as ipc
import avro.protocol as protocol

# TODO(hammer): Figure out the canonical place to put this file
PROTO_FILE = os.path.join(os.path.dirname(__file__), 'schema/hbase.avpr')
PROTOCOL = protocol.parse(open(PROTO_FILE).read())

def retry_wrapper(fn):
"""a decorator to add retry semantics to any method that uses hbase"""
def f(self, *args, **kwargs):
try:
return fn(self, *args, **kwargs)
except self.hbaseThriftExceptions:
except:
try:
self.close()
except self.hbaseThriftExceptions:
except:
pass
self.make_connection()
return fn(self, *args, **kwargs)
Expand All @@ -25,155 +27,70 @@ class HBaseConnection(object):
Base class for HBase connections. Supplies methods for a few basic
queries and methods for cleanup of thrift results.
"""
def __init__(self, host, port,
thrift=Thrift,
tsocket=TSocket,
ttrans=TTransport,
protocol=TBinaryProtocol,
ttp=ttypes,
client=Client,
column=ColumnDescriptor,
mutation=Mutation,
logger=None):
def __init__(self, host, port):
self.host = host
self.port = port
self.thriftModule = thrift
self.tsocketModule = tsocket
self.transportModule = ttrans
self.protocolModule = protocol
self.ttypesModule = ttp
self.clientClass = client
self.columnClass = column
self.mutationClass = mutation
self.logger = logger
self.hbaseThriftExceptions = (self.ttypesModule.IOError,
self.ttypesModule.IllegalArgument,
self.ttypesModule.AlreadyExists,
self.thriftModule.TException)
self.client = None
self.requestor = None
self.make_connection()

def make_connection(self, retry=2):
"""Establishes the underlying connection to HBase."""
while retry:
retry -= 1
try:
transport = self.tsocketModule.TSocket(self.host, self.port)
self.transport = self.transportModule.TBufferedTransport(transport)
self.protocol = self.protocolModule.TBinaryProtocol(self.transport)
self.client = self.clientClass(self.protocol)
self.transport.open()
self.client = ipc.HTTPTransceiver(self.host, self.port)
self.requestor = ipc.Requestor(PROTOCOL, self.client)
return
except self.hbaseThriftExceptions, x:
except:
pass
exceptionType, exception, tracebackInfo = sys.exc_info()
raise exception

def close(self):
"""
Close the HBase connection.
"""
self.transport.close()

def _make_rows_nice(self,client_result_object):
"""
Apply _make_row_nice to multiple rows.
"""
res = [self._make_row_nice(row) for row in client_result_object]
return res

def _make_row_nice(self,client_row_object):
"""
Pull out the contents of the Thrift TRowResult objects into a dict.
"""
return dict(((x,(y.value,y.timestamp))
for x, y in client_row_object.columns.items()))

#
# Metadata
#

@retry_wrapper
def show_tables(self):
"""Grab table names."""
return self.client.getTableNames()

@retry_wrapper
def describe_table(self, table_name):
"""Get information about the column families in a table."""
return self.client.getColumnDescriptors(table_name)

@retry_wrapper
def get_table_regions(self, table_name):
return self.client.getTableRegions(table_name)
return self.requestor.request("getTableNames", {})

@retry_wrapper
def is_table_enabled(self, table_name):
"""Determine if a table is enabled."""
return self.client.isTableEnabled(table_name)
return self.requestor.request("isTableEnabled", {"tableName": table_name})

#
# Administrative Operations
#

@retry_wrapper
def enable_table(self, table_name):
return self.client.enableTable(table_name)
return self.requestor.request("enableTable", {"tableName": table_name})

@retry_wrapper
def disable_table(self, table_name):
return self.client.disableTable(table_name)

@retry_wrapper
def compact(self, table_name):
return self.client.compact(table_name)

@retry_wrapper
def major_compact(self, table_name):
return self.client.majorCompact(table_name)
return self.requestor.request("disableTable", {"tableName": table_name})

#
# Get
#

@retry_wrapper
def get(self, table_name, row_id):
"""
Get back every column value for a specific row_id.
"""
return self._make_rows_nice(self.client.getRow(table_name, row_id))

@retry_wrapper
def get_cell_versions(self, table_name, row_id, column, n):
return self.client.getVer(table_name, row_id, column, int(n))
def get(self, table_name, row, family, qualifier):
params = {"tableName": table_name, "row": row, "family": family, "qualifier": qualifier}
return self.requestor.request("get", params)

#
# Put
#

@retry_wrapper
def put(self, table_name, r, cf, c, v):
m = self.mutationClass(0, ':'.join([cf,c]), v)
return self.client.mutateRow(table_name, r, [m])

#
# Delete
#

@retry_wrapper
def delete_row(self, table_name, r):
return self.client.deleteAllRow(table_name, r)

@retry_wrapper
def delete_cells(self, table_name, r, column):
return self.client.deleteAll(table_name, r, column)

#
# Scan
#

@retry_wrapper
def scan(self, table_name, r, column_or_cf, n):
s = self.client.scannerOpen(table_name, r, [column_or_cf])
result = self.client.scannerGetList(s, int(n))
self.client.scannerClose(s)
return result
13 changes: 7 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from setuptools import setup, find_packages

setup(name="PyHBase",
version='0.0.2',
version='0.0.3',
description="High-level Python interface to HBase",
url="http://github.com/hammer/pyhbase/tree/master",
packages=find_packages(),
include_package_data=True,
url="http://github.com/hammer/pyhbase/",
packages=['pyhbase'],
package_dir={'pyhbase': 'pyhbase'},
package_data={'pyhbase': ['schema/*.avpr']},
author="Jeff Hammerbacher",
author_email="hammer@cloudera.com",
keywords="database hbase",
keywords="database hbase avro",
scripts=['examples/pyhbase-cli'],
install_requires=['Thrift', 'python-hbase'])
install_requires=['avro'])

0 comments on commit cfc0189

Please sign in to comment.