Permalink
Browse files

Merge branch '211-merge'

  • Loading branch information...
2 parents da20f70 + 02cac9f commit d5b666b01187dfe664f36d566f09019bd993a981 @thobbs thobbs committed Jul 31, 2013
Showing with 56 additions and 7 deletions.
  1. +36 −6 pycassa/columnfamily.py
  2. +19 −0 tests/test_columnfamily.py
  3. +1 −1 tests/util.py
View
@@ -878,7 +878,8 @@ def get_range(self, start="", finish="", columns=None, column_start="",
column_finish="", column_reversed=False, column_count=100,
row_count=None, include_timestamp=False,
super_column=None, read_consistency_level=None,
- buffer_size=None, filter_empty=True, include_ttl=False):
+ buffer_size=None, filter_empty=True, include_ttl=False,
+ start_token=None, finish_token=None):
"""
Get an iterator over rows in a specified key range.
@@ -888,6 +889,14 @@ def get_range(self, start="", finish="", columns=None, column_start="",
order of the MD5 hash of their keys, so getting a lexicographical range
of keys is not feasible.
+ In place of `start` and `finish`, you may use `start_token` and
+ `finish_token` or a combination of `start` and `finish_token`. In this
+ case, you are specifying a token range to fetch instead of a key
+ range. This can be useful for fetching all data owned
+ by a node or for parallelizing a full data set scan. Otherwise,
+ you should typically just use `start` and `finish`. Both `start_token`
+ and `finish_token` must be specified as hex-encoded strings.
+
The `row_count` parameter limits the total number of rows that may be
returned. If left as ``None``, the number of rows that may be returned
is unlimited (this is the default).
@@ -914,10 +923,29 @@ def get_range(self, start="", finish="", columns=None, column_start="",
sp = self._slice_predicate(columns, column_start, column_finish,
column_reversed, column_count, super_column)
+ kr_args = {}
count = 0
i = 0
- last_key = self._pack_key(start)
- finish = self._pack_key(finish)
+
+ if start_token is not None and (start not in ("", None) or finish not in ("", None)):
+ raise ValueError(
+ "ColumnFamily.get_range() received incompatible arguments: "
+ "'start_token' may not be used with 'start' or 'finish'")
+
+ if finish_token is not None and finish not in ("", None):
+ raise ValueError(
+ "ColumnFamily.get_range() received incompatible arguments: "
+ "'finish_token' may not be used with 'finish'")
+
+ if start_token is not None:
+ kr_args['start_token'] = start_token
+ kr_args['end_token'] = "" if finish_token is None else finish_token
+ elif finish_token is not None:
+ kr_args['start_key'] = self._pack_key(start)
+ kr_args['end_token'] = finish_token
+ else:
+ kr_args['start_key'] = self._pack_key(start)
+ kr_args['end_key'] = self._pack_key(finish)
if buffer_size is None:
buffer_size = self.buffer_size
@@ -928,8 +956,8 @@ def get_range(self, start="", finish="", columns=None, column_start="",
buffer_size = row_count
else:
buffer_size = min(row_count - count + 1, buffer_size)
-
- key_range = KeyRange(start_key=last_key, end_key=finish, count=buffer_size)
+ kr_args['count'] = buffer_size
+ key_range = KeyRange(**kr_args)
key_slices = self.pool.execute('get_range_slices', cp, sp, key_range, cl)
# This may happen if nothing was ever inserted
if key_slices is None:
@@ -949,7 +977,9 @@ def get_range(self, start="", finish="", columns=None, column_start="",
if len(key_slices) != buffer_size:
return
- last_key = key_slices[-1].key
+ if 'start_token' in kr_args:
+ del kr_args['start_token']
+ kr_args['start_key'] = key_slices[-1].key
i += 1
def insert(self, key, columns, timestamp=None, ttl=None,
View
@@ -245,6 +245,25 @@ def test_get_range_batching(self):
cf.truncate()
+ @requireOPP
+ def test_get_range_tokens(self):
+ cf.truncate()
+ columns = {'c': 'v'}
+ for i in range(100, 201):
+ cf.insert('key%d' % i, columns)
+
+ results = list(cf.get_range(start_token="key100".encode('hex'), finish_token="key200".encode('hex')))
+ assert_equal(100, len(results))
+
+ results = list(cf.get_range(start_token="key100".encode('hex'), finish_token="key200".encode('hex'), buffer_size=10))
+ assert_equal(100, len(results))
+
+ results = list(cf.get_range(start_token="key100".encode('hex'), buffer_size=10))
+ assert_equal(100, len(results))
+
+ results = list(cf.get_range(finish_token="key201".encode('hex'), buffer_size=10))
+ assert_equal(101, len(results))
+
def insert_insert_get_indexed_slices(self):
indexed_cf = ColumnFamily(pool, 'Indexed1')
View
@@ -10,6 +10,6 @@ def wrapper(self, *args, **kwargs):
partitioner = self.sys_man.describe_partitioner()
if partitioner in ('RandomPartitioner', 'Murmur3Partitioner'):
raise SkipTest('Must use order preserving partitioner for this test')
- return f(*args, **kwargs)
+ return f(self, *args, **kwargs)
return wrapper

0 comments on commit d5b666b

Please sign in to comment.