Make mutations directly in (batch_)insert
This yields a good performance increase over using pycassa.batch.Mutator.
thobbs committed Sep 5, 2011
1 parent 19630ea commit 9578227
Showing 1 changed file with 67 additions and 13 deletions.
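
For context, a minimal caller-side sketch of the two methods this commit touches; the keyspace and column family names are hypothetical. With this change, multi-column insert() calls and batch_insert() build the Thrift mutation map themselves and issue a single batch_mutate, instead of routing through pycassa.batch.Mutator.

import pycassa

# Hypothetical keyspace/column family, just to show the call sites.
pool = pycassa.ConnectionPool('MyKeyspace')
cf = pycassa.ColumnFamily(pool, 'Standard1')

# Multi-column insert: now packed into a single batch_mutate call internally.
cf.insert('row_key', {'col1': 'val1', 'col2': 'val2'})

# Several rows at once: likewise one batch_mutate.
cf.batch_insert({'row1': {'a': '1'}, 'row2': {'b': '2'}})
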
80 changes: 67 additions & 13 deletions pycassa/columnfamily.py
@@ -15,7 +15,7 @@
from pycassa.cassandra.ttypes import Column, ColumnOrSuperColumn,\
ColumnParent, ColumnPath, ConsistencyLevel, NotFoundException,\
SlicePredicate, SliceRange, SuperColumn, KeyRange,\
IndexExpression, IndexClause, CounterColumn
IndexExpression, IndexClause, CounterColumn, Mutation
import pycassa.util as util
import pycassa.marshal as marshal
import pycassa.types as types
@@ -159,10 +159,34 @@ def _set_default_validation_class(self, t):
            self._default_validation_class = t
            self._default_value_packer = t.pack
            self._default_value_unpacker = t.unpack
            have_counters = isinstance(t, types.CounterColumnType)
        else:
            self._default_validation_class = marshal.extract_type_name(t)
            self._default_value_packer = marshal.packer_for(t)
            self._default_value_unpacker = marshal.unpacker_for(t)
            have_counters = self._default_validation_class == "CounterColumnType"

        if not self.super:
            if have_counters:
                def _make_cosc(name, value, timestamp, ttl):
                    return ColumnOrSuperColumn(counter_column=CounterColumn(name, value))
            else:
                def _make_cosc(name, value, timestamp, ttl):
                    return ColumnOrSuperColumn(Column(name, value, timestamp, ttl))
            self._make_cosc = _make_cosc
        else:
            if have_counters:
                def _make_column(name, value, timestamp, ttl):
                    return CounterColumn(name, value)
                self._make_column = _make_column

                def _make_cosc(scol_name, subcols):
                    return ColumnOrSuperColumn(counter_super_column=(SuperColumn(scol_name, subcols)))
            else:
                self._make_column = Column
                def _make_cosc(scol_name, subcols):
                    return ColumnOrSuperColumn(super_column=(SuperColumn(scol_name, subcols)))
            self._make_cosc = _make_cosc

    def _get_default_validation_class(self):
        return self._default_validation_class
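
The block above precomputes the ColumnOrSuperColumn builders once, when the default validation class is set, so the per-mutation path avoids re-checking whether the family is a counter or super column family. A rough sketch of what the three variants produce, using the Thrift types imported at the top of the file (names, values, and the timestamp are illustrative):

from pycassa.cassandra.ttypes import (Column, ColumnOrSuperColumn,
                                      CounterColumn, SuperColumn)

ts = 1315200000000000  # microseconds, illustrative

# Standard column family: each column becomes a Column wrapped in a ColumnOrSuperColumn
std_cosc = ColumnOrSuperColumn(column=Column('name', 'value', ts, None))

# Counter column family: only a name and an integer delta, no timestamp or TTL
counter_cosc = ColumnOrSuperColumn(counter_column=CounterColumn('hits', 1))

# Super column family: subcolumns are grouped under a SuperColumn
super_cosc = ColumnOrSuperColumn(super_column=SuperColumn('scol', [Column('sub', 'value', ts, None)]))
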
@@ -375,7 +399,7 @@ def _pack_name(self, value, is_supercol_name=False, slice_start=None):
                            (value.__class__.__name__, d_type))

    def _unpack_name(self, b, is_supercol_name=False):
        if not self.autopack_names or b is None:
        if not self.autopack_names:
            return b

        try:
@@ -445,6 +469,19 @@ def _unpack_key(self, b):
            raise TypeError("%s cannot be converted to a type matching %s" %
                            (b, d_type))

    def _make_mutation_list(self, columns, timestamp, ttl):
        _pack_name = self._pack_name
        _pack_value = self._pack_value
        if not self.super:
            return map(lambda (c, v): Mutation(self._make_cosc(_pack_name(c), _pack_value(v, c), timestamp, ttl)),
                       columns.items())
        else:
            mut_list = []
            for super_col, subcs in columns.items():
                subcols = map(lambda (c, v): self._make_column(_pack_name(c), _pack_value(v, c), timestamp, ttl), subcs.items())
                mut_list.append(Mutation(self._make_cosc(_pack_name(super_col, True), subcols)))
            return mut_list

    def _obtain_connection(self):
        self._tlocal.client = self.pool.get()
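
_make_mutation_list builds the per-row list of Thrift Mutation objects; insert() and batch_insert() then nest those lists into the mapping that batch_mutate expects. A rough sketch of that structure for a standard column family (key, column family name, and values are illustrative):

from pycassa.cassandra.ttypes import Column, ColumnOrSuperColumn, Mutation

ts = 1315200000000000  # one timestamp shared by every column in the call
mut_list = [Mutation(ColumnOrSuperColumn(column=Column('a', '1', ts, None))),
            Mutation(ColumnOrSuperColumn(column=Column('b', '2', ts, None)))]

# batch_mutate takes {packed_row_key: {column_family_name: [Mutation, ...]}}
mutations = {'row_key': {'Standard1': mut_list}}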

@@ -846,13 +883,14 @@ def insert(self, key, columns, timestamp=None, ttl=None,
        The timestamp Cassandra reports as being used for insert is returned.
        """
        if timestamp is None:
            timestamp = self.timestamp()

        packed_key = self._pack_key(key)

        if ((not self.super) and len(columns) == 1) or \
           (self.super and len(columns) == 1 and len(columns.values()[0]) == 1):

            if timestamp is None:
                timestamp = self.timestamp()

            if self.super:
                super_col = columns.keys()[0]
                cp = self._column_path(super_col)
@@ -865,14 +903,19 @@
            column = Column(colname, colval, timestamp, ttl)
            try:
                self._obtain_connection()
                self._tlocal.client.insert(packed_key, cp, column,
                        write_consistency_level or self.write_consistency_level)
                return self._tlocal.client.insert(packed_key, cp, column,
                        write_consistency_level or self.write_consistency_level)
            finally:
                self._release_connection()
            return timestamp
        else:
            return self.batch_insert({key: columns}, timestamp=timestamp, ttl=ttl,
                    write_consistency_level=write_consistency_level)
            mut_list = self._make_mutation_list(columns, timestamp, ttl)
            mutations = {packed_key: {self.column_family: mut_list}}
            try:
                self._obtain_connection()
                return self._tlocal.client.batch_mutate(mutations,
                        write_consistency_level or self.write_consistency_level)
            finally:
                self._release_connection()
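
The rewritten insert() keeps a fast path that still issues a plain Thrift insert when only a single column (or a single subcolumn, for a super column family) is written; anything larger now goes straight to batch_mutate. Illustrative shapes for the columns argument:

fast_path_standard = {'col': 'val'}                    # standard CF, one column -> client.insert
fast_path_super    = {'scol': {'sub': 'val'}}          # super CF, one subcolumn -> client.insert
batched_standard   = {'a': '1', 'b': '2'}              # several columns -> client.batch_mutate
batched_super      = {'scol': {'s1': '1', 's2': '2'}}  # several subcolumns -> client.batch_mutate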

    def batch_insert(self, rows, timestamp=None, ttl=None, write_consistency_level = None):
        """
@@ -887,10 +930,21 @@ def batch_insert(self, rows, timestamp=None, ttl=None, write_consistency_level =

        if timestamp == None:
            timestamp = self.timestamp()
        batch = self.batch(write_consistency_level=write_consistency_level)

        cf = self.column_family
        mutations = {}
        for key, columns in rows.iteritems():
            batch.insert(key, columns, timestamp=timestamp, ttl=ttl)
        batch.send()
            packed_key = self._pack_key(key)
            mut_list = self._make_mutation_list(columns, timestamp, ttl)
            mutations[packed_key] = {cf: mut_list}

        if mutations:
            try:
                self._obtain_connection()
                res = self._tlocal.client.batch_mutate(mutations, write_consistency_level or self.write_consistency_level)
            finally:
                self._release_connection()

        return timestamp

    def add(self, key, column, value=1, super_column=None, write_consistency_level=None):
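batch_insert() stamps every column it writes with a single timestamp and returns it. A small caller-side sketch (keyspace, column family, and values are hypothetical, as in the sketch near the top):

import pycassa

pool = pycassa.ConnectionPool('MyKeyspace')
cf = pycassa.ColumnFamily(pool, 'Standard1')

rows = {'row1': {'a': '1', 'b': '2'},
        'row2': {'c': '3'}}
ts = cf.batch_insert(rows)  # one batch_mutate; ts is the timestamp applied to all three columns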
