Permalink
Browse files

added support for asynchronous writes using new put_cells API methods

  • Loading branch information...
1 parent 781e0e2 commit db85f9bfebc4dafcdfb5035e9703ecaa2a5096b7 @tylerkovacs committed May 5, 2010
Showing with 79 additions and 4 deletions.
  1. +1 −0 CHANGELOG
  2. +14 −2 lib/active_record/connection_adapters/hypertable_adapter.rb
  3. +11 −2 lib/hyper_record.rb
  4. +53 −0 spec/lib/hyper_record_spec.rb
View
@@ -3,6 +3,7 @@
- now compatible with Hypertable version 0.9.3.0
- no longer compatible with earlier Hypertable versions due to a Thrift
API change
+- added support for asynchronous writes through new put_cells ThriftBroker API
0.2.8 (2010/03/01)
- support friendly migration-syntax declared using blocks
@@ -529,16 +529,28 @@ def tables(name=nil)
# mutator as argument. Mutators can be created with the open_mutator
# method. In the near future (Summer 2009), Hypertable will provide
# a periodic mutator that automatically flushes at specific intervals.
- def write_cells(table_name, cells, mutator=nil, flags=nil, flush_interval=nil)
+ def write_cells(table_name, cells, options={})
return if cells.blank?
+ mutator = options[:mutator]
+ flags = options[:flags]
+ flush_interval = options[:flush_interval]
+
retry_on_connection_error {
local_mutator_created = !mutator
begin
t1 = Time.now
mutator ||= open_mutator(table_name, flags, flush_interval)
- @connection.set_cells_as_arrays(mutator, cells)
+ if options[:asynchronous_write]
+ mutate_spec = Hypertable::ThriftGen::MutateSpec.new
+ mutate_spec.appname = 'hyper_record'
+ mutate_spec.flush_interval = 1000
+ mutate_spec.flags = 2
+ @connection.put_cells_as_arrays(table_name, mutate_spec, cells)
+ else
+ @connection.set_cells_as_arrays(mutator, cells)
+ end
ensure
if local_mutator_created && mutator
close_mutator(mutator)
View
@@ -180,7 +180,12 @@ def write_cells(cells, table=self.class.table_name, mutator=self.class.mutator)
# puts msg
end
- connection.write_cells(table, cells, mutator, self.class.mutator_flags, self.class.mutator_flush_interval)
+ connection.write_cells(table, cells, {
+ :mutator => mutator,
+ :flags => self.class.mutator_flags,
+ :flush_interval => self.class.mutator_flush_interval,
+ :asynchronous_write => self.class.asynchronous_write
+ })
end
# Delete an array of cells from Hypertable
@@ -573,11 +578,15 @@ def self.assemble_row_key_from_attributes(attributes)
end
end
- attr_accessor :mutator, :mutator_flags, :mutator_flush_interval
+ attr_accessor :mutator, :mutator_flags, :mutator_flush_interval,
+ :asynchronous_write
+
def mutator_options(*attrs)
symbolized_attrs = attrs.first.symbolize_keys
@mutator_flags = symbolized_attrs[:flags].to_i
@mutator_flush_interval = symbolized_attrs[:flush_interval].to_i
+ @asynchronous_write = symbolized_attrs[:asynchronous_write]
+
if symbolized_attrs[:persistent]
@mutator = self.open_mutator(@mutator_flags, @mutator_flush_interval)
end
@@ -842,6 +842,59 @@ module HyperRecord
it "should support native each_row_as_arrays scanner method"
end
+ describe HyperBase, '.mutator_options' do
+ it 'should default to set_cells method' do
+ class Temp < ActiveRecord::HyperBase
+ def self.create_table
+ hql = "CREATE TABLE #{table_name} (
+ 'name',
+ 'url'
+ )"
+ connection.execute(hql)
+ end
+ end
+
+ Temp.drop_table
+ Temp.create_table
+ Temp.connection.instance_eval("@connection").should_receive(:set_cells_as_arrays).and_return(nil)
+ t = Temp.new({:ROW => 'test', :name => 'test'})
+ t.save!
+ Temp.drop_table
+ end
+
+ describe 'asynchronous_write' do
+ before(:each) do
+ class Temp < ActiveRecord::HyperBase
+ mutator_options :asynchronous_write => true
+
+ def self.create_table
+ hql = "CREATE TABLE #{table_name} (
+ 'name',
+ 'url'
+ )"
+ connection.execute(hql)
+ end
+
+ Temp.drop_table
+ Temp.create_table
+ end
+ end
+
+ it 'should use put_cells method when asynchronous_write enabled' do
+ Temp.connection.instance_eval("@connection").should_receive(:put_cells_as_arrays).and_return(nil)
+ t = Temp.new({:ROW => 'test', :name => 'test'})
+ t.save!
+ end
+
+ it 'should still result in cells being written to hypertable' do
+ t = Temp.new({:ROW => 'test', :name => 'test'})
+ t.save!
+ sleep 3
+ Temp.find('test').should == t
+ end
+ end
+ end
+
describe HyperBase, '.row_key_attributes' do
it "should assemble a row key in the order that matches row key attributes" do
Page.class_eval do

0 comments on commit db85f9b

Please sign in to comment.