Permalink
Browse files

Added support for batch mutations (including batch deletes). Support …

…for slice-ranges/multicolumn deletion mutations not implemented (since the batch mutation ca mix removes and mutation-deletes but removes only support single column deletes)
  • Loading branch information...
1 parent 0216872 commit 5d4634eeef9482dce2e948e88d0d8843b1e6e226 @blanquer blanquer committed Jan 25, 2011
@@ -31,5 +31,31 @@ def _super_insert_mutation(column_family, super_column_name, sub_columns, timest
)
)
end
+
+ # General info about a deletion object within a mutation
+ # timestamp - required. If this is the only param, it will cause deletion of the whole key at that TS
+ # supercolumn - opt. If passed, the deletes will only occur within that supercolumn (only subcolumns
+ # will be deleted). Otherwise the normal columns will be deleted.
+ # predicate - opt. Defines how to match the columns to delete. if supercolumn passed, the slice will
+ # be scoped to subcolumns of that supercolumn.
+
+ # Deletes a single column from the containing key/CF (and possibly supercolumn), at a given timestamp.
+ # Although mutations (as opposed to 'remove' calls) support deleting slices and lists of columns in one shot, this is not implemented here.
+ # The main reason being that the batch function takes removes, but removes don't have that capability...so we'd need to change the remove
+ # methods to use delete mutation calls...although that might have performance implications. We'll leave that refactoring for later.
+ def _delete_mutation(cf, column, subcolumn, timestamp, options={})
+
+ deletion_hash = {:timestamp => timestamp}
+ if is_super(cf)
+ deletion_hash[:super_column] = column if column
+ deletion_hash[:predicate] = CassandraThrift::SlicePredicate.new(:column_names => [subcolumn]) if subcolumn
+ else
+ deletion_hash[:predicate] = CassandraThrift::SlicePredicate.new(:column_names => [column]) if column
+ end
+ CassandraThrift::Mutation.new(
+ :deletion => CassandraThrift::Deletion.new(deletion_hash)
+ )
+ end
+
end
end
@@ -168,18 +168,18 @@ def batch(options = {})
_, _, _, options =
extract_and_validate_params(schema.cf_defs.first.name, "", [options], WRITE_DEFAULTS)
- @batch = []
- yield(self)
- compact_mutations!
-
- @batch.each do |mutation|
- case mutation.first
- when :remove
- _remove(*mutation[1])
- else
- _mutate(*mutation)
- end
- end
+ @batch = []
+ yield(self)
+ compacted_map,seen_clevels = compact_mutations!
+ clevel = if options[:consistency] != nil # Override any clevel from individual mutations if
+ options[:consistency]
+ elsif seen_clevels.length > 1 # Cannot choose which CLevel to use if there are several ones
+ raise "Multiple consistency levels used in the batch, and no override...cannot pick one"
+ else # if no consistency override has been provided but all the clevels in the batch are the same: use that one
+ seen_clevels.first
+ end
+
+ _mutate(compacted_map,clevel)
ensure
@batch = nil
end
@@ -55,5 +55,31 @@ def _super_insert_mutation(column_family, super_column_name, sub_columns, timest
)
)
end
+
+ # General info about a deletion object within a mutation
+ # timestamp - required. If this is the only param, it will cause deletion of the whole key at that TS
+ # supercolumn - opt. If passed, the deletes will only occur within that supercolumn (only subcolumns
+ # will be deleted). Otherwise the normal columns will be deleted.
+ # predicate - opt. Defines how to match the columns to delete. if supercolumn passed, the slice will
+ # be scoped to subcolumns of that supercolumn.
+
+ # Deletes a single column from the containing key/CF (and possibly supercolumn), at a given timestamp.
+ # Although mutations (as opposed to 'remove' calls) support deleting slices and lists of columns in one shot, this is not implemented here.
+ # The main reason being that the batch function takes removes, but removes don't have that capability...so we'd need to change the remove
+ # methods to use delete mutation calls...although that might have performance implications. We'll leave that refactoring for later.
+ def _delete_mutation(cf, column, subcolumn, timestamp, options={})
+
+ deletion_hash = {:timestamp => timestamp}
+ if is_super(cf)
+ deletion_hash[:super_column] = column if column
+ deletion_hash[:predicate] = CassandraThrift::SlicePredicate.new(:column_names => [subcolumn]) if subcolumn
+ else
+ deletion_hash[:predicate] = CassandraThrift::SlicePredicate.new(:column_names => [column]) if column
+ end
+ CassandraThrift::Mutation.new(
+ :deletion => CassandraThrift::Deletion.new(deletion_hash)
+ )
+ end
+
end
end
View
@@ -138,16 +138,26 @@ def insert(column_family, key, hash, options = {})
# _mutate the element at the column_family:key:[column]:[sub_column]
# path you request. Supports the <tt>:consistency</tt> and <tt>:timestamp</tt>
# options.
+ # TODO: we could change this function or add another that support multi-column removal (by list or predicate)
def remove(column_family, key, *columns_and_options)
column_family, column, sub_column, options = extract_and_validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)
- args = {:column_family => column_family}
- columns = is_super(column_family) ? {:super_column => column, :column => sub_column} : {:column => column}
- column_path = CassandraThrift::ColumnPath.new(args.merge(columns))
-
- mutation = [:remove, [key, column_path, options[:timestamp] || Time.stamp, options[:consistency]]]
-
- @batch ? @batch << mutation : _remove(*mutation[1])
+ if @batch
+ mutation_map =
+ {
+ key => {
+ column_family => [ _delete_mutation(column_family , is_super(column_family)? column : nil , sub_column , options[:timestamp]|| Time.stamp) ]
+ }
+ }
+ @batch << [mutation_map, options[:consistency]]
+ else
+ # Let's continue using the 'remove' thrift method...not sure about the implications/performance of using the mutate instead
+ # Otherwise we coul get use the mutation_map above, and do _mutate(mutation_map, options[:consistency])
+ args = {:column_family => column_family}
+ columns = is_super(column_family) ? {:super_column => column, :column => sub_column} : {:column => column}
+ column_path = CassandraThrift::ColumnPath.new(args.merge(columns))
+ _remove(key, column_path, options[:timestamp] || Time.stamp, options[:consistency])
+ end
end
### Read
@@ -243,16 +253,16 @@ def batch(options = {})
@batch = []
yield(self)
- compact_mutations!
-
- @batch.each do |mutation|
- case mutation.first
- when :remove
- _remove(*mutation[1])
- else
- _mutate(*mutation)
- end
- end
+ compacted_map,seen_clevels = compact_mutations!
+ clevel = if options[:consistency] != nil # Override any clevel from individual mutations if
+ options[:consistency]
+ elsif seen_clevels.length > 1 # Cannot choose which CLevel to use if there are several ones
+ raise "Multiple consistency levels used in the batch, and no override...cannot pick one"
+ else # if no consistency override has been provided but all the clevels in the batch are the same: use that one
+ seen_clevels.first
+ end
+
+ _mutate(compacted_map,clevel)
ensure
@batch = nil
end
@@ -263,9 +273,41 @@ def calling_method
"#{self.class}##{caller[0].split('`').last[0..-3]}"
end
- # Roll up queued mutations, to improve atomicity.
+ # Roll up queued mutations, to improve atomicity (and performance).
def compact_mutations!
- #TODO re-do this rollup
+ used_clevels = {} # hash that lists the consistency levels seen in the batch array. key is the clevel, value is true
+ by_key = {}
+ # @batch is an array of mutation_ops.
+ # A mutation op is a 2-item array containing [mutationmap, consistency_number]
+ # a mutation map is a hash, by key (string) that has a hash by CF name, containing a list of column_mutations)
+ @batch.each do |mutation_op|
+ # A single mutation op looks like:
+ # For an insert/update
+ #[ { key1 =>
+ # { CF1 => [several of CassThrift:Mutation(colname,value,TS,ttl)]
+ # CF2 => [several mutations]
+ # },
+ # key2 => {...} # Not sure if they can come batched like this...so there might only be a single key (and CF)
+ # }, # [0]
+ # consistency # [1]
+ #]
+ # For a remove:
+ # [ :remove, # [0]
+ # [key, CassThrift:ColPath, timestamp, consistency ] # [1]
+ # ]
+ mmap = mutation_op[0] # :remove OR a hash like {"key"=> {"CF"=>[mutationclass1,...] } }
+ used_clevels[mutation_op[1]]=true #save the clevel required for this operation
+
+ mmap.keys.each do |k|
+ by_key[k] = {} unless by_key.has_key? k #make sure the key exists
+ mmap[k].keys.each do |cf| # For each CF in that key
+ by_key[k][cf] = [] unless by_key[k][cf] != nil
+ by_key[k][cf].concat mmap[k][cf] # Append the list of mutations for that key and CF
+ end
+ end
+ end
+ # Returns the batch mutations map, and an array with the consistency levels 'seen' in the batch
+ [by_key, used_clevels.keys]
end
def new_client
View
@@ -133,7 +133,7 @@ def multi_get(column_family, keys, *columns_and_options)
def remove(column_family, key, *columns_and_options)
column_family, column, sub_column, options = extract_and_validate_params_for_real(column_family, key, columns_and_options, WRITE_DEFAULTS)
if @batch
- @batch << [:remove, column_family, key, column]
+ @batch << [:remove, column_family, key, column, sub_column]
else
if column
if sub_column
View
@@ -11,10 +11,10 @@ def setup
@twitter = Cassandra.new('Twitter', "127.0.0.1:9160", :retries => 2, :exception_classes => [])
@twitter.clear_keyspace!
- @blogs = Cassandra.new('Multiblog')
+ @blogs = Cassandra.new('Multiblog', "127.0.0.1:9160",:retries => 2, :exception_classes => [])
@blogs.clear_keyspace!
- @blogs_long = Cassandra.new('MultiblogLong')
+ @blogs_long = Cassandra.new('MultiblogLong', "127.0.0.1:9160",:retries => 2, :exception_classes => [])
@blogs_long.clear_keyspace!
@uuids = (0..6).map {|i| SimpleUUID::UUID.new(Time.at(2**(24+i))) }
@@ -256,11 +256,12 @@ def test_remove_super_sub_key
end
def test_remove_super_value
- columns = {@uuids[1] => 'v1'}
+ columns = {@uuids[1] => 'v1', @uuids[2] => 'v2'}
+ column_name_to_remove = @uuids[2]
@twitter.insert(:StatusRelationships, key, {'user_timelines' => columns})
- @twitter.remove(:StatusRelationships, key, 'user_timelines', columns.keys.first)
- assert_nil @twitter.get(:StatusRelationships, key, 'user_timelines', columns.keys.first)
- assert_nil @twitter.get(:StatusRelationships, key, 'user_timelines').timestamps[columns.keys.first]
+ @twitter.remove(:StatusRelationships, key, 'user_timelines', column_name_to_remove)
+ assert_equal( columns.reject{|k,v| k == column_name_to_remove}, @twitter.get(:StatusRelationships, key, 'user_timelines') )
+ assert_nil @twitter.get(:StatusRelationships, key, 'user_timelines').timestamps[column_name_to_remove]
end
def test_clear_column_family
@@ -360,6 +361,12 @@ def test_batch_mutate
k = key
@twitter.insert(:Users, k + '1', {'body' => 'v1', 'user' => 'v1'})
+ initial_subcolumns = {@uuids[1] => 'v1', @uuids[2] => 'v2'}
+ @twitter.insert(:StatusRelationships, k, {'user_timelines' => initial_subcolumns, 'dummy_supercolumn' => {@uuids[5] => 'value'}})
+ assert_equal(initial_subcolumns, @twitter.get(:StatusRelationships, k, 'user_timelines'))
+ assert_equal({@uuids[5] => 'value'}, @twitter.get(:StatusRelationships, k, 'dummy_supercolumn'))
+ new_subcolumns = {@uuids[3] => 'v3', @uuids[4] => 'v4'}
+ subcolumn_to_delete = initial_subcolumns.keys.first # the first column of the initial set
@twitter.batch do
@twitter.insert(:Users, k + '2', {'body' => 'v2', 'user' => 'v2'})
@@ -377,6 +384,15 @@ def test_batch_mutate
@twitter.remove(:Users, k + '4')
@twitter.insert(:Users, k + '4', {'body' => 'v4', 'user' => 'v4'})
assert_equal({}, @twitter.get(:Users, k + '4')) # Not yet written
+
+ # SuperColumns
+ # Add and delete new sub columns to the user timeline supercolumn
+ @twitter.insert(:StatusRelationships, k, {'user_timelines' => new_subcolumns })
+ @twitter.remove(:StatusRelationships, k, 'user_timelines' , subcolumn_to_delete ) # Delete the first of the initial_subcolumns from the user_timeline supercolumn
+ assert_equal(initial_subcolumns, @twitter.get(:StatusRelationships, k, 'user_timelines')) # No additions or deletes reflected yet
+ # Delete a complete supercolumn
+ @twitter.remove(:StatusRelationships, k, 'dummy_supercolumn' ) # Delete the full dummy supercolumn
+ assert_equal({@uuids[5] => 'value'}, @twitter.get(:StatusRelationships, k, 'dummy_supercolumn')) # dummy supercolumn not yet deleted
end
assert_equal({'body' => 'v2', 'user' => 'v2'}, @twitter.get(:Users, k + '2')) # Written
@@ -389,6 +405,12 @@ def test_batch_mutate
assert_equal({'body' => 'v3', 'user' => 'v3', 'location' => 'v3'}.keys.sort, @twitter.get(:Users, k + '3').timestamps.keys.sort) # Written and compacted
assert_equal({'body' => 'v4', 'user' => 'v4'}.keys.sort, @twitter.get(:Users, k + '4').timestamps.keys.sort) # Written
assert_equal({'body' => 'v'}.keys.sort, @twitter.get(:Statuses, k + '3').timestamps.keys.sort) # Written
+
+ # Final result: initial_subcolumns - initial_subcolumns.first + new_subcolumns
+ resulting_subcolumns = initial_subcolumns.merge(new_subcolumns).reject{|k,v| k == subcolumn_to_delete }
+ assert_equal(resulting_subcolumns, @twitter.get(:StatusRelationships, key, 'user_timelines'))
+ assert_equal({}, @twitter.get(:StatusRelationships, key, 'dummy_supercolumn')) # dummy supercolumn deleted
+
end
def test_complain_about_nil_key

0 comments on commit 5d4634e

Please sign in to comment.