Permalink
Browse files

Add support for retrieving sorted columns in HBase >= 0.96

This is possible with the HBase 0.96 Thrift API. This feature uses a new
'sorted_columns' argument to Table.scan(). Fixes issue #39.
  • Loading branch information...
wbolster committed Nov 8, 2013
1 parent d972cab commit aea6980ea181912c0c8cdd61763bc5110c28a445
Showing with 59 additions and 6 deletions.
  1. +40 −6 happybase/table.py
  2. +19 −0 tests/test_api.py
View
@@ -8,7 +8,7 @@
from struct import Struct
from .hbase.ttypes import TScan
-from .util import thrift_type_to_dict, str_increment
+from .util import thrift_type_to_dict, str_increment, OrderedDict
from .batch import Batch
logger = logging.getLogger(__name__)
@@ -24,6 +24,14 @@ def make_row(cell_map, include_timestamp):
return dict((cn, cellfn(cell)) for cn, cell in cell_map.iteritems())
def make_ordered_row(sorted_columns, include_timestamp):
    """Make a row dict for sorted column results from scans.

    The result is an `OrderedDict`, so the order in which the columns
    appear in `sorted_columns` is preserved.

    :param sorted_columns: iterable of items exposing `columnName` and
        `cell` attributes (the `sortedColumns` of a scan result)
    :param bool include_timestamp: whether to convert cells using
        `make_cell_timestamp` instead of `make_cell`

    :return: mapping of column names to converted cells
    :rtype: OrderedDict
    """
    # A conditional expression instead of ``cond and a or b``: the latter
    # silently picks the wrong branch whenever ``a`` happens to be falsy.
    cellfn = make_cell_timestamp if include_timestamp else make_cell
    return OrderedDict(
        (column.columnName, cellfn(column.cell))
        for column in sorted_columns)
+
+
class Table(object):
"""HBase table abstraction class.
@@ -206,7 +214,8 @@ def cells(self, row, column, versions=None, timestamp=None,
def scan(self, row_start=None, row_stop=None, row_prefix=None,
columns=None, filter=None, timestamp=None,
- include_timestamp=False, batch_size=1000, limit=None):
+ include_timestamp=False, batch_size=1000, limit=None,
+ sorted_columns=False):
"""Create a scanner for data in the table.
This method returns an iterable that can be used for looping over the
@@ -236,14 +245,26 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
If `limit` is given, at most `limit` results will be returned.
+ If `sorted_columns` is `True`, the columns in the rows returned
+ by this scanner will be retrieved in sorted order, and
+ the data will be stored in `OrderedDict` instances.
+
The `batch_size` argument specifies how many results should be
retrieved per batch when retrieving results from the scanner. Only set
this to a low value (or even 1) if your data is large, since a low
batch size results in added round-trips to the server.
- **Compatibility note:** The `filter` argument is only available when
- using HBase 0.92 (or up). In HBase 0.90 compatibility mode, specifying
- a `filter` raises an exception.
+ **Compatibility notes:**
+
+ * The `filter` argument is only available when using HBase 0.92
+ (or up). In HBase 0.90 compatibility mode, specifying
+ a `filter` raises an exception.
+
+ * The `sorted_columns` argument is only available when using
+ HBase 0.96 (or up).
+
+ .. versionadded:: 0.7
+ `sorted_columns` parameter
:param str row_start: the row key to start at (inclusive)
:param str row_stop: the row key to stop at (exclusive)
@@ -263,6 +284,10 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
if limit is not None and limit < 1:
raise ValueError("'limit' must be >= 1")
if sorted_columns and self.connection.compat < '0.96':
    # Sorted column retrieval relies on the HBase 0.96 Thrift API, so
    # refuse early when running in an older compatibility mode.
    raise NotImplementedError(
        "'sorted_columns' is only supported in HBase >= 0.96")
+
if row_prefix is not None:
if row_start is not None or row_stop is not None:
raise TypeError(
@@ -312,6 +337,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
caching=batch_size,
filterString=filter,
batchSize=batch_size,
+ sortColumns=sorted_columns,
)
scan_id = self.connection.client.scannerOpenWithScan(
self.name, scan, {})
@@ -335,7 +361,15 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
n_fetched += len(items)
for n_returned, item in enumerate(items, n_returned + 1):
- yield item.row, make_row(item.columns, include_timestamp)
+ row_key = item.row
+ if sorted_columns:
+ row = make_ordered_row(item.sortedColumns,
+ include_timestamp)
+ else:
+ row = make_row(item.columns, include_timestamp)
+
+ yield row_key, row
+
if limit is not None and n_returned == limit:
return
View
@@ -14,6 +14,7 @@
assert_in,
assert_is_instance,
assert_is_not_none,
+ assert_list_equal,
assert_not_in,
assert_raises,
assert_true,
@@ -427,6 +428,24 @@ def calc_len(scanner):
next(scanner)
def test_scan_sorting():
    # Scanning with sorted_columns=True should hand back the columns of a
    # row in sorted order; this needs the 0.96 compatibility mode.
    if connection.compat < '0.96':
        return  # not supported

    # Store one row holding 100 columns with empty values.
    row_key = 'row-scan-sorted'
    columns = dict(('cf1:col-%03d' % n, '') for n in xrange(100))
    table.put(row_key, columns)

    # The first scanned row must be the one just written, and its items
    # must already be in sorted order.
    scanner = table.scan(row_start=row_key, sorted_columns=True)
    found_key, found_row = next(scanner)
    assert_equal(found_key, row_key)
    assert_list_equal(sorted(columns.items()), found_row.items())
+
+
def test_delete():
row_key = 'row-test-delete'
data = {'cf1:col1': 'v1',

0 comments on commit aea6980

Please sign in to comment.