Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: pycassa/pycassa
base: master
...
head fork: krystynak/pycassa
compare: master
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 3 files changed
  • 0 commit comments
  • 1 contributor
Showing with 53 additions and 46 deletions.
  1. +51 −40 pycassa/columnfamily.py
  2. +0 −5 pycassa/connection.py
  3. +2 −1  setup.py
View
91 pycassa/columnfamily.py
@@ -1,14 +1,13 @@
from cassandra.ttypes import Column, ColumnOrSuperColumn, ColumnParent, \
ColumnPath, ConsistencyLevel, NotFoundException, SlicePredicate, \
- SliceRange, SuperColumn, Clock, KeyRange, IndexExpression, IndexClause
+ SliceRange, SuperColumn, Mutation, Deletion, Clock, KeyRange, \
+ IndexExpression, IndexClause
import time
import sys
import uuid
import struct
-from batch import CfMutator
-
__all__ = ['gm_timestamp', 'ColumnFamily']
_TYPES = ['BytesType', 'LongType', 'IntegerType', 'UTF8Type', 'AsciiType',
'LexicalUUIDType', 'TimeUUIDType']
@@ -289,11 +288,7 @@ def _pack(self, value, data_type):
elif data_type == 'AsciiType':
return struct.pack(">%ds" % len(value), value)
elif data_type == 'UTF8Type':
- try:
- st = value.encode('utf-8')
- except UnicodeDecodeError:
- # value is already utf-8 encoded
- st = value
+ st = value.encode('utf-8')
return struct.pack(">%ds" % len(st), st)
elif data_type == 'TimeUUIDType' or data_type == 'LexicalUUIDType':
if not hasattr(value, 'bytes'):
@@ -612,8 +607,7 @@ def get_range(self, start="", finish="", columns=None, column_start="",
last_key = key_slices[-1].key
i += 1
- def insert(self, key, columns, clock=None, ttl=None,
- write_consistency_level=None):
+ def insert(self, key, columns, write_consistency_level=None):
"""
Insert or update columns for a key
@@ -633,10 +627,10 @@ def insert(self, key, columns, clock=None, ttl=None,
-------
int timestamp
"""
- return self.batch_insert({key: columns}, clock=clock, ttl=ttl,
- write_consistency_level=write_consistency_level)
+ return self.batch_insert({key: columns},
+ write_consistency_level = write_consistency_level)
- def batch_insert(self, rows, clock=None, ttl=None, write_consistency_level = None):
+ def batch_insert(self, rows, write_consistency_level = None):
"""
Insert or update columns for multiple keys
@@ -654,10 +648,29 @@ def batch_insert(self, rows, clock=None, ttl=None, write_consistency_level = Non
int timestamp
"""
clock = Clock(timestamp=self.timestamp())
- batch = self.batch(write_consistency_level=write_consistency_level)
- for key, columns in rows.iteritems():
- batch.insert(key, columns, clock=clock, ttl=ttl)
- batch.send()
+
+ mutation_map = {}
+
+ for row, cs in rows.iteritems():
+ cols = []
+
+ for c, v in cs.iteritems():
+ if self.super:
+ subc = [Column(name=self._pack_name(subname), \
+ value=self._pack_value(subvalue, subname), clock=clock) \
+ for subname, subvalue in v.iteritems()]
+ column = SuperColumn(name=self._pack_name(c, is_supercol_name=True), columns=subc)
+ cols.append(Mutation(column_or_supercolumn=ColumnOrSuperColumn(super_column=column)))
+ else:
+ column = Column(name=self._pack_name(c), value=self._pack_value(v, c), clock=clock)
+ cols.append(Mutation(column_or_supercolumn=ColumnOrSuperColumn(column=column)))
+
+ if cols:
+ mutation_map[row] = {self.column_family: cols}
+
+ self.client.batch_mutate(mutation_map,
+ self._wcl(write_consistency_level))
+
return clock.timestamp
def remove(self, key, columns=None, super_column=None, write_consistency_level = None):
@@ -680,32 +693,30 @@ def remove(self, key, columns=None, super_column=None, write_consistency_level =
-------
int timestamp
"""
- clock = Clock(timestamp=self.timestamp())
- batch = self.batch(write_consistency_level=write_consistency_level)
- batch.remove(key, columns, super_column, clock)
- batch.send()
- return clock.timestamp
- def batch(self, queue_size=100, write_consistency_level=None):
- """
- Create batch mutator for doing multiple insert,update,remove
- operations using as few roundtrips as possible.
+ packed_cols = None
+ if columns is not None:
+ packed_cols = []
+ for col in columns:
+ packed_cols.append(self._pack_name(col, is_supercol_name = self.super))
- Parameters
- ----------
- queue_size : int
- Max number of mutations per request
- write_consistency_level: ConsistencyLevel
- Consistency level used for mutations.
+ if super_column != '':
+ super_column = self._pack_name(super_column, is_supercol_name=True)
- Returns
- -------
- CfMutator mutator
- """
- if write_consistency_level is None:
- write_consistency_level = self.write_consistency_level
- return CfMutator(self, queue_size=queue_size,
- write_consistency_level=write_consistency_level)
+ clock = Clock(timestamp=self.timestamp())
+ if packed_cols is not None:
+ # Deletion doesn't support SliceRange predicates as of Cassandra 0.6.0,
+ # so we can't add column_start, column_finish, etc... yet
+ sp = SlicePredicate(column_names=packed_cols)
+ deletion = Deletion(clock=clock, super_column=super_column, predicate=sp)
+ mutation = Mutation(deletion=deletion)
+ self.client.batch_mutate({key: {self.column_family: [mutation]}},
+ self._wcl(write_consistency_level))
+ else:
+ cp = ColumnPath(column_family=self.column_family, super_column=super_column)
+ self.client.remove(key, cp, clock,
+ self._wcl(write_consistency_level))
+ return clock.timestamp
def truncate(self):
"""
View
5 pycassa/connection.py
@@ -13,8 +13,6 @@
from cassandra.constants import VERSION
from cassandra.ttypes import AuthenticationRequest
-from batch import Mutator
-
__all__ = ['connect', 'connect_thread_local', 'NoServerAvailable']
DEFAULT_SERVER = 'localhost:9160'
@@ -235,6 +233,3 @@ def get_keyspace_description(self, keyspace=None):
new_metadata[datum.name] = datum
cf_def.column_metadata = new_metadata
return cf_defs
-
- def batch(self, *args, **kwargs):
- return Mutator(self, *args, **kwargs)
View
3  setup.py
@@ -27,7 +27,8 @@
download_url = 'http://github.com/vomjom/pycassa',
license = 'MIT',
keywords = 'cassandra client db distributed thrift',
- packages = ['pycassa'],
+ package_dir = {'pycassa':'pycassa', 'cassandra':'pycassa/cassandra'},
+ packages = ['pycassa','cassandra'],
platforms = 'any',
install_requires = ['thrift'],
scripts=['pycassaShell'],

No commit comments for this range

Something went wrong with that request. Please try again.