From 9578227299f6d742b082e1f211744a33a5773661 Mon Sep 17 00:00:00 2001
From: Tyler Hobbs
Date: Mon, 5 Sep 2011 00:31:58 -0500
Subject: [PATCH] Make mutations directly in (batch_)insert

Build the Thrift Mutation lists inside insert() and batch_insert() and
send them with batch_mutate directly. This yields a good performance
increase over going through pycassa.batch.Mutator.
---
Review notes (ignored by git-am):
 - Spell types.CounterColumnType correctly in
   _set_default_validation_class (was types.CountercolumnType).
 - insert() keeps returning the timestamp, as its docstring states,
   rather than the result of the Thrift call.
 - batch_insert() no longer binds the unused batch_mutate result.

 pycassa/columnfamily.py | 76 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 66 insertions(+), 10 deletions(-)

diff --git a/pycassa/columnfamily.py b/pycassa/columnfamily.py
index 1da44583..48a865f9 100644
--- a/pycassa/columnfamily.py
+++ b/pycassa/columnfamily.py
@@ -15,7 +15,7 @@
 from pycassa.cassandra.ttypes import Column, ColumnOrSuperColumn,\
     ColumnParent, ColumnPath, ConsistencyLevel, NotFoundException,\
     SlicePredicate, SliceRange, SuperColumn, KeyRange,\
-    IndexExpression, IndexClause, CounterColumn
+    IndexExpression, IndexClause, CounterColumn, Mutation
 import pycassa.util as util
 import pycassa.marshal as marshal
 import pycassa.types as types
@@ -159,10 +159,34 @@ def _set_default_validation_class(self, t):
             self._default_validation_class = t
             self._default_value_packer = t.pack
             self._default_value_unpacker = t.unpack
+            have_counters = isinstance(t, types.CounterColumnType)
         else:
             self._default_validation_class = marshal.extract_type_name(t)
             self._default_value_packer = marshal.packer_for(t)
             self._default_value_unpacker = marshal.unpacker_for(t)
+            have_counters = self._default_validation_class == "CounterColumnType"
+
+        if not self.super:
+            if have_counters:
+                def _make_cosc(name, value, timestamp, ttl):
+                    return ColumnOrSuperColumn(counter_column=CounterColumn(name, value))
+            else:
+                def _make_cosc(name, value, timestamp, ttl):
+                    return ColumnOrSuperColumn(Column(name, value, timestamp, ttl))
+            self._make_cosc = _make_cosc
+        else:
+            if have_counters:
+                def _make_column(name, value, timestamp, ttl):
+                    return CounterColumn(name, value)
+                self._make_column = _make_column
+
+                def _make_cosc(scol_name, subcols):
+                    return ColumnOrSuperColumn(counter_super_column=(SuperColumn(scol_name, subcols)))
+            else:
+                self._make_column = Column
+                def _make_cosc(scol_name, subcols):
+                    return ColumnOrSuperColumn(super_column=(SuperColumn(scol_name, subcols)))
+            self._make_cosc = _make_cosc
 
     def _get_default_validation_class(self):
         return self._default_validation_class
@@ -375,7 +399,7 @@ def _pack_name(self, value, is_supercol_name=False, slice_start=None):
                             (value.__class__.__name__, d_type))
 
     def _unpack_name(self, b, is_supercol_name=False):
-        if not self.autopack_names or b is None:
+        if not self.autopack_names:
             return b
 
         try:
@@ -445,6 +469,19 @@ def _unpack_key(self, b):
             raise TypeError("%s cannot be converted to a type matching %s" %
                             (b, d_type))
 
+    def _make_mutation_list(self, columns, timestamp, ttl):
+        _pack_name = self._pack_name
+        _pack_value = self._pack_value
+        if not self.super:
+            return map(lambda (c, v): Mutation(self._make_cosc(_pack_name(c), _pack_value(v, c), timestamp, ttl)),
+                       columns.items())
+        else:
+            mut_list = []
+            for super_col, subcs in columns.items():
+                subcols = map(lambda (c, v): self._make_column(_pack_name(c), _pack_value(v, c), timestamp, ttl), subcs.items())
+                mut_list.append(Mutation(self._make_cosc(_pack_name(super_col, True), subcols)))
+            return mut_list
+
     def _obtain_connection(self):
         self._tlocal.client = self.pool.get()
 
@@ -846,13 +883,14 @@ def insert(self, key, columns, timestamp=None, ttl=None,
         The timestamp Cassandra reports as being used for insert is returned.
 
         """
+        if timestamp is None:
+            timestamp = self.timestamp()
+
         packed_key = self._pack_key(key)
+
         if ((not self.super) and len(columns) == 1) or \
            (self.super and len(columns) == 1 and len(columns.values()[0]) == 1):
 
-            if timestamp is None:
-                timestamp = self.timestamp()
-
             if self.super:
                 super_col = columns.keys()[0]
                 cp = self._column_path(super_col)
@@ -871,8 +909,15 @@ def insert(self, key, columns, timestamp=None, ttl=None,
                 self._release_connection()
             return timestamp
         else:
-            return self.batch_insert({key: columns}, timestamp=timestamp, ttl=ttl,
-                                     write_consistency_level=write_consistency_level)
+            mut_list = self._make_mutation_list(columns, timestamp, ttl)
+            mutations = {packed_key: {self.column_family: mut_list}}
+            try:
+                self._obtain_connection()
+                self._tlocal.client.batch_mutate(mutations,
+                    write_consistency_level or self.write_consistency_level)
+            finally:
+                self._release_connection()
+            return timestamp
 
     def batch_insert(self, rows, timestamp=None, ttl=None, write_consistency_level = None):
         """
@@ -887,10 +932,21 @@ def batch_insert(self, rows, timestamp=None, ttl=None, write_consistency_level =
 
         if timestamp == None:
             timestamp = self.timestamp()
-        batch = self.batch(write_consistency_level=write_consistency_level)
+
+        cf = self.column_family
+        mutations = {}
         for key, columns in rows.iteritems():
-            batch.insert(key, columns, timestamp=timestamp, ttl=ttl)
-        batch.send()
+            packed_key = self._pack_key(key)
+            mut_list = self._make_mutation_list(columns, timestamp, ttl)
+            mutations[packed_key] = {cf: mut_list}
+
+        if mutations:
+            try:
+                self._obtain_connection()
+                self._tlocal.client.batch_mutate(mutations, write_consistency_level or self.write_consistency_level)
+            finally:
+                self._release_connection()
+
         return timestamp
 
     def add(self, key, column, value=1, super_column=None, write_consistency_level=None):