Skip to content

Commit

Permalink
refactoring insert to use batch_mutate
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanking committed Mar 15, 2010
1 parent 4aae0cb commit 4570dc3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 7 deletions.
50 changes: 45 additions & 5 deletions lib/cassandra/cassandra.rb
Expand Up @@ -93,10 +93,50 @@ def insert(column_family, key, hash, options = {})
column_family, _, _, options = extract_and_validate_params(column_family, key, [options], WRITE_DEFAULTS)

timestamp = options[:timestamp] || Time.stamp
cfmap = hash_to_cfmap(column_family, hash, timestamp)
mutation = [:insert, [key, cfmap, options[:consistency]]]
mutation_map = if is_super(column_family)
{
key => {
column_family => hash.collect{|k,v| _super_insert_mutation(column_family, k, v, timestamp) }
}
}
else
{
key => {
column_family => hash.collect{|k,v| _standard_insert_mutation(column_family, k, v, timestamp)}
}
}
end

@batch ? @batch << [mutation_map, options[:consistency]] : _mutate(mutation_map, options[:consistency])
end

def _standard_insert_mutation(column_family, column_name, value, timestamp)
CassandraThrift::Mutation.new(
:column_or_supercolumn => CassandraThrift::ColumnOrSuperColumn.new(
:column => CassandraThrift::Column.new(
:name => column_name_class(column_family).new(column_name).to_s,
:value => value,
:timestamp => timestamp
)
)
)
end

@batch ? @batch << mutation : _insert(*mutation[1])
def _super_insert_mutation(column_family, super_column_name, sub_columns, timestamp)
CassandraThrift::Mutation.new(:column_or_supercolumn =>
CassandraThrift::ColumnOrSuperColumn.new(
:super_column => CassandraThrift::SuperColumn.new(
:name => column_name_class(column_family).new(super_column_name).to_s,
:columns => sub_columns.collect { |sub_column_name, sub_column_value|
CassandraThrift::Column.new(
:name => sub_column_name_class(column_family).new(sub_column_name).to_s,
:value => sub_column_value.to_s,
:timestamp => timestamp
)
}
)
)
)
end

## Delete
Expand Down Expand Up @@ -227,10 +267,10 @@ def batch(options = {})

@batch.each do |mutation|
case mutation.first
when :insert
_insert(*mutation[1])
when :remove
_remove(*mutation[1])
else
_mutate(*mutation)
end
end
ensure
Expand Down
1 change: 1 addition & 0 deletions lib/cassandra/columns.rb
Expand Up @@ -104,5 +104,6 @@ def hash_to_cfmap(column_family, hash, timestamp)
end
h
end

end
end
4 changes: 2 additions & 2 deletions lib/cassandra/protocol.rb
Expand Up @@ -4,8 +4,8 @@ class Cassandra
module Protocol #:nodoc:
private

def _insert(key, cfmap, consistency_level)
@client.batch_insert(@keyspace, key, cfmap, consistency_level)
def _mutate(mutation_map, consistency_level)
@client.batch_mutate(@keyspace, mutation_map, consistency_level)
end

def _remove(key, column_path, timestamp, consistency_level)
Expand Down

0 comments on commit 4570dc3

Please sign in to comment.