Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

create Taps::Chunksize model to be able to feed it arbitrary extra ti…

…me to ignore
  • Loading branch information...
commit c7debaaa7558ec6ca6c3f50a95c2cba77fa0f480 1 parent bba76ac
@ricardochimal authored
View
52 lib/taps/chunksize.rb
@@ -0,0 +1,52 @@
+require 'taps/errors'
+
+class Taps::Chunksize
+ attr_accessor :idle_secs, :time_in_db, :start_time, :end_time, :retries
+ attr_reader :chunksize
+
+ def initialize(chunksize)
+ @chunksize = chunksize
+ @idle_secs = 0.0
+ @retries = 0
+ end
+
+ def to_i
+ chunksize
+ end
+
+ def reset_chunksize
+ @chunksize = (retries <= 1) ? 10 : 1
+ end
+
+ def diff
+ end_time - start_time - time_in_db - idle_secs
+ end
+
+ def time_in_db=(t)
+ @time_in_db = t
+ @time_in_db = @time_in_db.to_f rescue 0.0
+ end
+
+ def time_delta
+ t1 = Time.now
+ yield if block_given?
+ t2 = Time.now
+ t2 - t1
+ end
+
+ def calc_new_chunksize
+ new_chunksize = if retries > 0
+ chunksize
+ elsif diff > 3.0
+ (chunksize / 3).ceil
+ elsif diff > 1.1
+ chunksize - 100
+ elsif diff < 0.8
+ chunksize * 2
+ else
+ chunksize + 100
+ end
+ new_chunksize = 1 if new_chunksize < 1
+ new_chunksize
+ end
+end
View
2  lib/taps/data_stream.rb
@@ -166,7 +166,7 @@ def fetch_from_resource(resource, headers)
res = nil
log.debug "DataStream#fetch_from_resource state -> #{state.inspect}"
state[:chunksize] = Taps::Utils.calculate_chunksize(state[:chunksize]) do |c|
- state[:chunksize] = c
+ state[:chunksize] = c.to_i
res = resource.post({:state => self.to_json}, headers)
end
View
37 lib/taps/operation.rb
@@ -485,23 +485,32 @@ def push_data_from_table(stream, progress)
begin
chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
- stream.state[:chunksize] = c
- encoded_data, row_size, elapsed_time = stream.fetch
+ stream.state[:chunksize] = c.to_i
+ encoded_data, row_size, elapsed_time = nil
+ d1 = c.time_delta do
+ encoded_data, row_size, elapsed_time = stream.fetch
+ end
break if stream.complete?
- data = {
- :state => stream.to_hash,
- :checksum => Taps::Utils.checksum(encoded_data).to_s
- }
+ data = nil
+ d2 = c.time_delta do
+ data = {
+ :state => stream.to_hash,
+ :checksum => Taps::Utils.checksum(encoded_data).to_s
+ }
+ end
begin
- content, content_type = Taps::Multipart.create do |r|
- r.attach :name => :encoded_data,
- :payload => encoded_data,
- :content_type => 'application/octet-stream'
- r.attach :name => :json,
- :payload => data.to_json,
- :content_type => 'application/json'
+ content, content_type = nil
+ d3 = c.time_delta do
+ content, content_type = Taps::Multipart.create do |r|
+ r.attach :name => :encoded_data,
+ :payload => encoded_data,
+ :content_type => 'application/octet-stream'
+ r.attach :name => :json,
+ :payload => data.to_json,
+ :content_type => 'application/json'
+ end
end
session_resource['push/table'].post(content, http_headers(:content_type => content_type))
self.stream_state = stream.to_hash
@@ -509,6 +518,8 @@ def push_data_from_table(stream, progress)
Taps::Utils.reraise_server_exception(e)
end
+ c.idle_secs = (d1 + d2 + d3)
+
elapsed_time
end
rescue Taps::CorruptedData => e
View
39 lib/taps/utils.rb
@@ -2,8 +2,10 @@
require 'stringio'
require 'time'
require 'tempfile'
+require 'rest_client'
require 'taps/errors'
+require 'taps/chunksize'
module Taps
module Utils
@@ -93,42 +95,23 @@ def blobs_to_string(row, columns)
end
def calculate_chunksize(old_chunksize)
- chunksize = old_chunksize
+ c = Taps::Chunksize.new(old_chunksize)
- retries = 0
- time_in_db = 0
begin
- t1 = Time.now
- time_in_db = yield chunksize
- time_in_db = time_in_db.to_f rescue 0
+ c.start_time = Time.now
+ c.time_in_db = yield c
rescue Errno::EPIPE, RestClient::RequestFailed, RestClient::RequestTimeout
- retries += 1
- raise if retries > 2
+ c.retries += 1
+ raise if c.retries > 2
# we got disconnected, the chunksize could be too large
- # on first retry change to 10, on successive retries go down to 1
- chunksize = (retries == 1) ? 10 : 1
-
+ # reset the chunksize based on the number of retries
+ c.reset_chunksize
retry
end
- t2 = Time.now
-
- diff = t2 - t1 - time_in_db
-
- new_chunksize = if retries > 0
- chunksize
- elsif diff > 3.0
- (chunksize / 3).ceil
- elsif diff > 1.1
- chunksize - 100
- elsif diff < 0.8
- chunksize * 2
- else
- chunksize + 100
- end
- new_chunksize = 1 if new_chunksize < 1
- new_chunksize
+ c.end_time = Time.now
+ c.calc_new_chunksize
end
def load_schema(database_url, schema_data)
View
41 spec/chunksize_spec.rb
@@ -0,0 +1,41 @@
+require File.dirname(__FILE__) + '/base'
+require 'taps/utils'
+
+describe Taps::Chunksize do
+ it "scales chunksize down slowly when the time delta of the block is just over a second" do
+ Time.stubs(:now).returns(10.0).returns(11.5)
+ Taps::Utils.calculate_chunksize(1000) { |c| }.should == 900
+ end
+
+ it "scales chunksize down fast when the time delta of the block is over 3 seconds" do
+ Time.stubs(:now).returns(10.0).returns(15.0)
+ Taps::Utils.calculate_chunksize(3000) { |c| }.should == 1000
+ end
+
+ it "scales up chunksize fast when the time delta of the block is under 0.8 seconds" do
+ Time.stubs(:now).returns(10.0).returns(10.7)
+ Taps::Utils.calculate_chunksize(1000) { |c| }.should == 2000
+ end
+
+ it "scales up chunksize slow when the time delta of the block is between 0.8 and 1.1 seconds" do
+ Time.stubs(:now).returns(10.0).returns(10.8)
+ Taps::Utils.calculate_chunksize(1000) { |c| }.should == 1100
+
+ Time.stubs(:now).returns(10.0).returns(11.1)
+ Taps::Utils.calculate_chunksize(1000) { |c| }.should == 1100
+ end
+
+ it "will reset the chunksize to a small value if we got a broken pipe exception" do
+ Taps::Utils.calculate_chunksize(1000) do |c|
+ raise Errno::EPIPE if c.chunksize == 1000
+ c.chunksize.should == 10
+ end.should == 10
+ end
+
+ it "will reset the chunksize to a small value if we got a broken pipe exception a second time" do
+ Taps::Utils.calculate_chunksize(1000) do |c|
+ raise Errno::EPIPE if c.chunksize == 1000 || c.chunksize == 10
+ c.chunksize.should == 1
+ end.should == 1
+ end
+end
View
31 spec/utils_spec.rb
@@ -18,37 +18,6 @@
lambda { Taps::Utils.format_data(data, :schema => schema) }.should.raise(Taps::InvalidData)
end
- it "scales chunksize down slowly when the time delta of the block is just over a second" do
- Time.stubs(:now).returns(10.0).returns(11.5)
- Taps::Utils.calculate_chunksize(1000) { }.should == 900
- end
-
- it "scales chunksize down fast when the time delta of the block is over 3 seconds" do
- Time.stubs(:now).returns(10.0).returns(15.0)
- Taps::Utils.calculate_chunksize(3000) { }.should == 1000
- end
-
- it "scales up chunksize fast when the time delta of the block is under 0.8 seconds" do
- Time.stubs(:now).returns(10.0).returns(10.7)
- Taps::Utils.calculate_chunksize(1000) { }.should == 2000
- end
-
- it "scales up chunksize slow when the time delta of the block is between 0.8 and 1.1 seconds" do
- Time.stubs(:now).returns(10.0).returns(10.8)
- Taps::Utils.calculate_chunksize(1000) { }.should == 1100
-
- Time.stubs(:now).returns(10.0).returns(11.1)
- Taps::Utils.calculate_chunksize(1000) { }.should == 1100
- end
-
- it "will reset the chunksize to a small value if we got a broken pipe exception" do
- Taps::Utils.calculate_chunksize(1000) { |c| raise Errno::EPIPE if c == 1000; c.should == 10 }.should == 10
- end
-
- it "will reset the chunksize to a small value if we got a broken pipe exception a second time" do
- Taps::Utils.calculate_chunksize(1000) { |c| raise Errno::EPIPE if c == 1000 || c == 10; c.should == 1 }.should == 1
- end
-
it "returns a list of columns that are text fields if the database is mysql" do
@db = mock("db", :url => "mysql://localhost/mydb")
@db.stubs(:schema).with(:mytable).returns([
Please sign in to comment.
Something went wrong with that request. Please try again.