Permalink
Browse files

death to RetryingThriftClient

  • Loading branch information...
ryanking committed Sep 13, 2010
1 parent d3a5548 commit c5e1d1921416ef07747a52598f8dcc91e3f021b2
Showing with 118 additions and 157 deletions.
  1. +0 −3 lib/thrift_client.rb
  2. +118 −152 lib/thrift_client/abstract_thrift_client.rb
  3. +0 −2 lib/thrift_client/event_machine.rb
View
@@ -13,10 +13,7 @@
require 'thrift_client/abstract_thrift_client'
class ThriftClient < AbstractThriftClient
- # This error is for backwards compatibility only. If defined in
- # RetryingThriftClient instead, causes the test suite will break.
class NoServersAvailable < StandardError; end
- include RetryingThriftClient
=begin rdoc
Create a new ThriftClient instance. Accepts an internal Thrift client class (such as CassandraRb::Client), a list of servers with ports, and optional parameters.
@@ -1,13 +1,40 @@
class AbstractThriftClient
-
DEFAULTS = {
:protocol => Thrift::BinaryProtocol,
:protocol_extra_params => [],
:transport => Thrift::Socket,
:transport_wrapper => Thrift::FramedTransport,
:raise => true,
:defaults => {}
- }.freeze
+ }
+
+ DISCONNECT_ERRORS = [
+ IOError,
+ Thrift::Exception,
+ Thrift::ProtocolException,
+ Thrift::ApplicationException,
+ Thrift::TransportException
+ ]
+
+ DEFAULT_WRAPPED_ERRORS = [
+ Thrift::ApplicationException,
+ Thrift::TransportException,
+ ]
+
+ RETRYING_DEFAULTS = {
+ :exception_classes => DISCONNECT_ERRORS,
+ :randomize_server_list => true,
+ :retries => 0,
+ :server_retry_period => 1,
+ :server_max_requests => nil,
+ :retry_overrides => {},
+ :wrapped_exception_classes => DEFAULT_WRAPPED_ERRORS
+ }
+
+ TIMINGOUT_DEFAULTS = {
+ :timeout => 1,
+ :timeout_overrides => {}
+ }
attr_reader :client, :client_class, :current_server, :server_list, :options, :client_methods
@@ -24,6 +51,21 @@ def initialize(client_class, servers, options = {})
@client_methods << $1
end
end
+ @options = RETRYING_DEFAULTS.merge(@options) # @options is set by super
+ @retries = @options[:retries]
+ @request_count = 0
+ @max_requests = @options[:server_max_requests]
+ @retry_period = @options[:server_retry_period]
+ @options[:wrapped_exception_classes].each do |exception_klass|
+ name = exception_klass.to_s.split('::').last
+ klass = begin
+ @client_class.const_get(name)
+ rescue NameError
+ @client_class.const_set(name, Class.new(exception_klass))
+ end
+ end
+ rebuild_live_server_list!
+ @options = TIMINGOUT_DEFAULTS.merge(@options)
end
def inspect
@@ -34,198 +76,122 @@ def inspect
# called as the connection will be made on the first RPC method
# call.
def connect!
+ @current_server = next_server
@connection = Connection::Factory.create(@options[:transport], @options[:transport_wrapper], @current_server, @options[:timeout])
@connection.connect!
@client = @client_class.new(@options[:protocol].new(@connection.transport, *@options[:protocol_extra_params]))
- rescue Thrift::TransportException, Errno::ECONNREFUSED => e
+ rescue Thrift::TransportException, Errno::ECONNREFUSED
@transport.close rescue nil
- raise e
+ retry
end
def disconnect!
+ # Keep live servers in the list if we have a retry period. Otherwise,
+ # always eject, because we will always re-add them.
+ if @retry_period && @current_server
+ @live_server_list.unshift(@current_server)
+ end
+
@connection.close rescue nil
@client = nil
@current_server = nil
+ @request_count = 0
end
private
+
+ def next_server
+ if @retry_period
+ rebuild_live_server_list! if Time.now > @last_rebuild + @retry_period
+ raise ThriftClient::NoServersAvailable, "No live servers in #{@server_list.inspect} since #{@last_rebuild.inspect}." if @live_server_list.empty?
+ elsif @live_server_list.empty?
+ rebuild_live_server_list!
+ end
+ @live_server_list.pop
+ end
+
+ def rebuild_live_server_list!
+ @last_rebuild = Time.now
+ if @options[:randomize_server_list]
+ @live_server_list = @server_list.sort_by { rand }
+ else
+ @live_server_list = @server_list.dup
+ end
+ end
+
def handled_proxy(method_name, *args)
- proxy(method_name, *args)
- rescue Exception => e
- handle_exception(e, method_name, args)
+ disconnect_on_max! if @max_requests and @request_count >= @max_requests
+ begin
+ proxy(method_name, *args)
+ rescue Exception => e
+ handle_exception(e, method_name, args)
+ end
end
- def post_connect(method_name); end
+ def handle_exception(e, method_name, args=nil)
+ raise e if @options[:raise]
+ @options[:defaults][method_name.to_sym]
+ end
def proxy(method_name, *args)
connect! unless @client
post_connect(method_name)
send_rpc(method_name, *args)
+ rescue *@options[:exception_classes] => e
+ disconnect_on_error!
+ tries ||= (@options[:retry_overrides][method_name.to_sym] || @retries) + 1
+ tries -= 1
+ if tries > 0
+ retry
+ else
+ raise_wrapped_error(e)
+ end
+ end
+
+ def raise_wrapped_error(e)
+ if @options[:wrapped_exception_classes].include?(e.class)
+ raise @client_class.const_get(e.class.to_s.split('::').last), e.message, e.backtrace
+ else
+ raise e
+ end
end
def send_rpc(method_name, *args)
+ @request_count += 1
@client.send(method_name, *args)
end
+ def disconnect_on_max!
+ @live_server_list.push(@current_server)
+ disconnect_on_error!
+ end
+
def disconnect_on_error!
@connection.close rescue nil
@client = nil
@current_server = nil
+ @request_count = 0
end
- def handle_exception(e, method_name, args=nil)
- raise e if @options[:raise]
- @options[:defaults][method_name.to_sym]
+ def post_connect(method_name)
+ return unless has_timeouts?
+ @client.timeout = @options[:timeout_overrides][method_name.to_sym] || @options[:timeout]
end
- module RetryingThriftClient
- DISCONNECT_ERRORS = [
- IOError,
- Thrift::Exception,
- Thrift::ProtocolException,
- Thrift::ApplicationException,
- Thrift::TransportException
- ]
-
- DEFAULT_WRAPPED_ERRORS = [
- Thrift::ApplicationException,
- Thrift::TransportException,
- ]
-
- RETRYING_DEFAULTS = {
- :exception_classes => DISCONNECT_ERRORS,
- :randomize_server_list => true,
- :retries => 0,
- :server_retry_period => 1,
- :server_max_requests => nil,
- :retry_overrides => {},
- :wrapped_exception_classes => DEFAULT_WRAPPED_ERRORS
- }
-
- TIMINGOUT_DEFAULTS = {
- :timeout => 1,
- :timeout_overrides => {}
- }
-
- def initialize(client_class, servers, options = {})
- super
- @options = RETRYING_DEFAULTS.merge(@options) # @options is set by super
- @retries = @options[:retries]
- @request_count = 0
- @max_requests = @options[:server_max_requests]
- @retry_period = @options[:server_retry_period]
- @options[:wrapped_exception_classes].each do |exception_klass|
- name = exception_klass.to_s.split('::').last
- klass = begin
- @client_class.const_get(name)
- rescue NameError
- @client_class.const_set(name, Class.new(exception_klass))
- end
- end
- rebuild_live_server_list!
- @options = TIMINGOUT_DEFAULTS.merge(@options)
- end
-
- def connect!
- @current_server = next_server
- super
- rescue Thrift::TransportException, Errno::ECONNREFUSED
- retry
- end
-
- def disconnect!
- # Keep live servers in the list if we have a retry period. Otherwise,
- # always eject, because we will always re-add them.
- if @retry_period && @current_server
- @live_server_list.unshift(@current_server)
- end
-
- super()
- @request_count = 0
- end
-
- private
-
- def next_server
- if @retry_period
- rebuild_live_server_list! if Time.now > @last_rebuild + @retry_period
- raise ThriftClient::NoServersAvailable, "No live servers in #{@server_list.inspect} since #{@last_rebuild.inspect}." if @live_server_list.empty?
- elsif @live_server_list.empty?
- rebuild_live_server_list!
- end
- @live_server_list.pop
- end
-
- def rebuild_live_server_list!
- @last_rebuild = Time.now
- if @options[:randomize_server_list]
- @live_server_list = @server_list.sort_by { rand }
- else
- @live_server_list = @server_list.dup
- end
- end
-
- def handled_proxy(method_name, *args)
- disconnect_on_max! if @max_requests and @request_count >= @max_requests
- super
- end
-
- def proxy(method_name, *args)
- super
- rescue *@options[:exception_classes] => e
- disconnect_on_error!
- tries ||= (@options[:retry_overrides][method_name.to_sym] || @retries) + 1
- tries -= 1
- if tries > 0
- retry
- else
- raise_wrapped_error(e)
- end
- end
-
- def raise_wrapped_error(e)
- if @options[:wrapped_exception_classes].include?(e.class)
- raise @client_class.const_get(e.class.to_s.split('::').last), e.message, e.backtrace
- else
- raise e
- end
- end
-
- def send_rpc(method_name, *args)
- @request_count += 1
- super
- end
-
- def disconnect_on_max!
- @live_server_list.push(@current_server)
- disconnect_on_error!
- end
-
- def disconnect_on_error!
- super
- @request_count = 0
- end
-
- def post_connect(method_name)
- return unless has_timeouts?
- @client.timeout = @options[:timeout_overrides][method_name.to_sym] || @options[:timeout]
- end
-
- def has_timeouts?
- @has_timeouts ||= has_timeouts!
- end
+ def has_timeouts?
+ @has_timeouts ||= has_timeouts!
+ end
- def has_timeouts!
- @options[:timeout_overrides].any? && transport_can_timeout?
- end
+ def has_timeouts!
+ @options[:timeout_overrides].any? && transport_can_timeout?
+ end
- def transport_can_timeout?
+ def transport_can_timeout?
if (@options[:transport_wrapper] || @options[:transport]).method_defined?(:timeout=)
true
else
warn "ThriftClient: Timeout overrides have no effect with with transport type #{(@options[:transport_wrapper] || @options[:transport])}"
false
end
end
- end
end
@@ -27,8 +27,6 @@ def open
end
Fiber.yield
- # Use Thrift::TransportException so the RetryingThriftClient knows to try the next
- # server instead of raising the error.
raise Thrift::TransportException, "Unable to connect to #{@host}:#{@port}" unless @connection.connected?
@connection
end

0 comments on commit c5e1d19

Please sign in to comment.