Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Make the Ruby AnalyticsLogger class more fault tolerant

  • Loading branch information...
commit c38707ef6d5e16422d8b6dafac1ba9043732c2e8 1 parent 777c29f
@FooBarWidget FooBarWidget authored
View
291 lib/phusion_passenger/analytics_logger.rb
@@ -24,7 +24,7 @@
require 'thread'
require 'phusion_passenger/utils'
require 'phusion_passenger/debug_logging'
-require 'phusion_passenger/message_client'
+require 'phusion_passenger/message_channel'
module PhusionPassenger
@@ -38,24 +38,33 @@ class AnalyticsLogger
class Log
attr_reader :txn_id
- def initialize(shared_data = nil, txn_id = nil)
- if shared_data
- @shared_data = shared_data
+ def initialize(connection = nil, txn_id = nil)
+ if connection
+ @connection = connection
@txn_id = txn_id
- shared_data.ref
+ connection.ref
end
end
def null?
- return !@shared_data
+ return !@connection
end
def message(text)
- @shared_data.synchronize do
- @shared_data.client.write("log", @txn_id,
- AnalyticsLogger.timestamp_string)
- @shared_data.client.write_scalar(text)
- end if @shared_data
+ @connection.synchronize do
+ return if !@connection.connected?
+ begin
+ @connection.channel.write("log", @txn_id,
+ AnalyticsLogger.timestamp_string)
+ @connection.channel.write_scalar(text)
+ rescue SystemCallError, IOError => e
+ @connection.disconnect
+ DebugLogging.warn("Error communicating with the logging agent: #{e.message}")
+ rescue Exception => e
+ @connection.disconnect
+ raise e
+ end
+ end if @connection
end
def begin_measure(name, extra_info = nil)
@@ -107,30 +116,39 @@ def measured_time_points(name, begin_time, end_time, extra_info = nil)
end
def close(flush_to_disk = false)
- @shared_data.synchronize do
- # We need an ACK here. See abstract_request_handler.rb finalize_request.
- @shared_data.client.write("closeTransaction", @txn_id,
- AnalyticsLogger.timestamp_string, true)
- result = @shared_data.client.read
- if result != ["ok"]
- raise "Expected logging server to respond with 'ok', but got #{result.inspect} instead"
- end
- if flush_to_disk
- @shared_data.client.write("flush")
- result = @shared_data.client.read
+ @connection.synchronize do
+ begin
+ # We need an ACK here. See abstract_request_handler.rb finalize_request.
+ @connection.channel.write("closeTransaction", @txn_id,
+ AnalyticsLogger.timestamp_string, true)
+ result = @connection.channel.read
if result != ["ok"]
- raise "Invalid logging server response #{result.inspect} to the 'flush' command"
+ raise "Expected logging agent to respond with 'ok', but got #{result.inspect} instead"
+ end
+ if flush_to_disk
+ @connection.channel.write("flush")
+ result = @connection.channel.read
+ if result != ["ok"]
+ raise "Invalid logging agent response #{result.inspect} to the 'flush' command"
+ end
end
+ rescue SystemCallError, IOError => e
+ @connection.disconnect
+ DebugLogging.warn("Error communicating with the logging agent: #{e.message}")
+ rescue Exception => e
+ @connection.disconnect
+ raise e
+ ensure
+ @connection.unref
+ @connection = nil
end
- @shared_data.unref
- @shared_data = nil
- end if @shared_data
+ end if @connection
end
def closed?
- if @shared_data
- @shared_data.synchronize do
- return !@shared_data.client.connected?
+ if @connection
+ @connection.synchronize do
+ return !@connection.connected?
end
else
return nil
@@ -170,10 +188,10 @@ def initialize(logging_agent_address, username, password, node_name)
@random_dev = File.open("/dev/urandom")
# This mutex protects the following instance variables, but
- # not the contents of @shared_data.
+ # not the contents of @connection.
@mutex = Mutex.new
- @shared_data = SharedData.new
+ @connection = Connection.new(nil)
if @server_address && local_socket_address?(@server_address)
@max_connect_tries = 10
else
@@ -185,20 +203,20 @@ def initialize(logging_agent_address, username, password, node_name)
def clear_connection
@mutex.synchronize do
- @shared_data.synchronize do
+ @connection.synchronize do
@random_dev = File.open("/dev/urandom") if @random_dev.closed?
- @shared_data.unref
- @shared_data = SharedData.new
+ @connection.unref
+ @connection = Connection.new(nil)
end
end
end
def close
@mutex.synchronize do
- @shared_data.synchronize do
+ @connection.synchronize do
@random_dev.close
- @shared_data.unref
- @shared_data = nil
+ @connection.unref
+ @connection = nil
end
end
end
@@ -212,44 +230,53 @@ def new_transaction(group_name, category = :requests, union_station_key = nil)
txn_id = (AnalyticsLogger.current_time.to_i / 60).to_s(36)
txn_id << "-#{random_token(11)}"
+
Lock.new(@mutex).synchronize do |lock|
- Lock.new(@shared_data.mutex).synchronize do |shared_data_lock|
- try_count = 0
- if current_time >= @next_reconnect_time
- while try_count < @max_connect_tries
+ if current_time < @next_reconnect_time
+ return Log.new
+ end
+
+ Lock.new(@connection.mutex).synchronize do |connection_lock|
+ if !@connection.connected?
begin
- connect if !connected?
- @shared_data.client.write("openTransaction",
- txn_id, group_name, "", category,
- AnalyticsLogger.timestamp_string,
- union_station_key,
- true,
- true)
- result = @shared_data.client.read
- if result != ["ok"]
- raise "Expected logging server to respond with 'ok', but got #{result.inspect} instead"
- end
- return Log.new(@shared_data, txn_id)
- rescue Errno::ENOENT, *NETWORK_ERRORS
- try_count += 1
- disconnect(true)
- shared_data_lock.reset(@shared_data.mutex, false)
- lock.unlock
- sleep RETRY_SLEEP if try_count < @max_connect_tries
- lock.lock
- shared_data_lock.lock
+ connect
+ connection_lock.reset(@connection.mutex)
+ rescue SystemCallError, IOError
+ @connection.disconnect
+ DebugLogging.warn("Cannot connect to the logging agent at #{@server_address}; " +
+ "retrying in #{@reconnect_timeout} second(s).")
+ @next_reconnect_time = current_time + @reconnect_timeout
+ return Log.new
rescue Exception => e
- disconnect
+ @connection.disconnect
raise e
end
end
- # Failed to connect.
- DebugLogging.warn("Cannot connect to the logging agent (#{@server_address}); " +
- "retrying in #{@reconnect_timeout} second(s).")
- @next_reconnect_time = current_time + @reconnect_timeout
+
+ begin
+ @connection.channel.write("openTransaction",
+ txn_id, group_name, "", category,
+ AnalyticsLogger.timestamp_string,
+ union_station_key,
+ true,
+ true)
+ result = @connection.channel.read
+ if result != ["ok"]
+ raise "Expected logging server to respond with 'ok', but got #{result.inspect} instead"
+ end
+ return Log.new(@connection, txn_id)
+ rescue SystemCallError, IOError
+ @connection.disconnect
+ DebugLogging.warn("The logging agent at #{@server_address}" <<
+ " closed the connection; will reconnect in " <<
+ "#{@reconnect_timeout} second(s).")
+ @next_reconnect_time = current_time + @reconnect_timeout
+ return Log.new
+ rescue Exception => e
+ @connection.disconnect
+ raise e
+ end
end
- return Log.new
- end
end
end
@@ -261,38 +288,46 @@ def continue_transaction(txn_id, group_name, category = :requests, union_station
end
Lock.new(@mutex).synchronize do |lock|
- Lock.new(@shared_data.mutex).synchronize do |shared_data_lock|
- try_count = 0
- if current_time >= @next_reconnect_time
- while try_count < @max_connect_tries
+ if current_time < @next_reconnect_time
+ return Log.new
+ end
+
+ Lock.new(@connection.mutex).synchronize do |connection_lock|
+ if !@connection.connected?
begin
- connect if !connected?
- @shared_data.client.write("openTransaction",
- txn_id, group_name, "", category,
- AnalyticsLogger.timestamp_string,
- union_station_key,
- true)
- return Log.new(@shared_data, txn_id)
- rescue Errno::ENOENT, *NETWORK_ERRORS
- try_count += 1
- disconnect(true)
- shared_data_lock.reset(@shared_data.mutex, false)
- lock.unlock
- sleep RETRY_SLEEP if try_count < @max_connect_tries
- lock.lock
- shared_data_lock.lock
+ connect
+ connection_lock.reset(@connection.mutex)
+ rescue SystemCallError, IOError
+ @connection.disconnect
+ DebugLogging.warn("Cannot connect to the logging agent at #{@server_address}; " +
+ "retrying in #{@reconnect_timeout} second(s).")
+ @next_reconnect_time = current_time + @reconnect_timeout
+ return Log.new
rescue Exception => e
- disconnect
+ @connection.disconnect
raise e
end
end
- # Failed to connect.
- DebugLogging.warn("Cannot connect to the logging agent (#{@server_address}); " +
- "retrying in #{@reconnect_timeout} second(s).")
- @next_reconnect_time = current_time + @reconnect_timeout
+
+ begin
+ @connection.channel.write("openTransaction",
+ txn_id, group_name, "", category,
+ AnalyticsLogger.timestamp_string,
+ union_station_key,
+ true)
+ return Log.new(@connection, txn_id)
+ rescue SystemCallError, IOError
+ @connection.disconnect
+ DebugLogging.warn("The logging agent at #{@server_address}" <<
+ " closed the connection; will reconnect in " <<
+ "#{@reconnect_timeout} second(s).")
+ @next_reconnect_time = current_time + @reconnect_timeout
+ return Log.new
+ rescue Exception => e
+ @connection.disconnect
+ raise e
+ end
end
- return Log.new
- end
end
end
@@ -337,18 +372,23 @@ def unlock
end
end
- class SharedData
+ class Connection
attr_reader :mutex
- attr_accessor :client
+ attr_accessor :channel
- def initialize
+ def initialize(io)
@mutex = Mutex.new
@refcount = 1
+ @channel = MessageChannel.new(io) if io
+ end
+
+ def connected?
+ return !!@channel
end
- def disconnect(check_error_response = false)
- # TODO: implement check_error_response support
- @client.close if @client
+ def disconnect
+ @channel.close if @channel
+ @channel = nil
end
def ref
@@ -369,29 +409,46 @@ def synchronize
end
end
- def connected?
- return @shared_data.client && @shared_data.client.connected?
- end
-
def connect
- @shared_data.client = MessageClient.new(@username, @password, @server_address)
- @shared_data.client.write("init", @node_name)
- args = @shared_data.client.read
+ socket = connect_to_server(@server_address)
+ channel = MessageChannel.new(socket)
+
+ result = channel.read
+ if result.nil?
+ raise EOFError
+ elsif result.size != 2 || result[0] != "version"
+ raise IOError, "The logging agent didn't sent a valid version identifier"
+ elsif result[1] != "1"
+ raise IOError, "Unsupported logging agent protocol version #{result[1]}"
+ end
+
+ channel.write_scalar(@username)
+ channel.write_scalar(@password)
+
+ result = channel.read
+ if result.nil?
+ raise EOFError
+ elsif result[0] != "ok"
+ raise SecurityError, result[0]
+ end
+
+ channel.write("init", @node_name)
+ args = channel.read
if !args
- raise Errno::ECONNREFUSED, "Cannot connect to logging server"
+ raise Errno::ECONNREFUSED, "Cannot connect to logging agent"
elsif args.size != 1
- raise IOError, "Logging server returned an invalid reply for the 'init' command"
+ raise IOError, "Logging agent returned an invalid reply for the 'init' command"
elsif args[0] == "server shutting down"
- raise Errno::ECONNREFUSED, "Cannot connect to logging server"
+ raise Errno::ECONNREFUSED, "Cannot connect to logging agent"
elsif args[0] != "ok"
- raise IOError, "Logging server returned an invalid reply for the 'init' command"
+ raise IOError, "Logging agent returned an invalid reply for the 'init' command"
end
- end
-
- def disconnect(check_error_response = false)
- @shared_data.disconnect(check_error_response)
- @shared_data.unref
- @shared_data = SharedData.new
+
+ @connection.unref
+ @connection = Connection.new(socket)
+ rescue Exception => e
+ socket.close if socket && !socket.closed?
+ raise e
end
def random_token(length)
View
48 test/ruby/analytics_logger_spec.rb
@@ -76,9 +76,10 @@ def kill_agent
mock_time(TODAY)
@logger.new_transaction("foobar").close(true)
- shared_data = @logger.instance_variable_get(:"@shared_data")
- shared_data.synchronize do
- shared_data.client.close
+ connection = @logger.instance_variable_get(:"@connection")
+ connection.synchronize do
+ connection.channel.close
+ connection.channel = nil
end
log = @logger.new_transaction("foobar")
@@ -92,24 +93,6 @@ def kill_agent
File.read(log_file).should =~ /hello/
end
- specify "#new_transaction reestablishes the connection to the logging server if the logging server crashed and was restarted" do
- mock_time(TODAY)
-
- @logger.new_transaction("foobar").close
- kill_agent
- start_agent
-
- log = @logger.new_transaction("foobar")
- begin
- log.message("hello")
- ensure
- log.close(true)
- end
-
- log_file = "#{@log_dir}/1/#{FOOBAR_MD5}/#{LOCALHOST_MD5}/requests/2010/04/11/12/log.txt"
- File.read(log_file).should =~ /hello/
- end
-
specify "#new_transaction does not reconnect to the server for a short period of time if connecting failed" do
@logger.reconnect_timeout = 60
@logger.max_connect_tries = 1
@@ -157,9 +140,10 @@ def kill_agent
log2 = @logger2.continue_transaction(log.txn_id, "foobar")
log2.close(true)
- shared_data = @logger2.instance_variable_get(:"@shared_data")
- shared_data.synchronize do
- shared_data.client.close
+ connection = @logger2.instance_variable_get(:"@connection")
+ connection.synchronize do
+ connection.channel.close
+ connection.channel = nil
end
log2 = @logger2.continue_transaction(log.txn_id, "foobar")
@@ -173,7 +157,7 @@ def kill_agent
File.read(log_file).should =~ /hello/
end
- specify "#continue_transaction reestablishes the connection to the logging server if the logging server crashed and was restarted" do
+ specify "#new_transaction and #continue_transaction eventually reestablish the connection to the logging server if the logging server crashed and was restarted" do
mock_time(TODAY)
log = @logger.new_transaction("foobar")
@@ -181,12 +165,20 @@ def kill_agent
kill_agent
start_agent
+ log = @logger.new_transaction("foobar")
+ log.should be_null
+ log2 = @logger2.continue_transaction("1234-abcd", "foobar")
+ log2.should be_null
+
+ mock_time(TODAY + 60)
+ log = @logger.new_transaction("foobar")
log2 = @logger2.continue_transaction(log.txn_id, "foobar")
begin
log2.message("hello")
ensure
log2.close(true)
end
+ log.close(true)
log_file = "#{@log_dir}/1/#{FOOBAR_MD5}/#{LOCALHOST_MD5}/requests/2010/04/11/12/log.txt"
File.read(log_file).should =~ /hello/
@@ -243,9 +235,9 @@ def kill_agent
specify "#clear_connection closes the connection" do
@logger.new_transaction("foobar").close
@logger.clear_connection
- shared_data = @logger.instance_variable_get(:"@shared_data")
- shared_data.synchronize do
- shared_data.client.should be_nil
+ connection = @logger.instance_variable_get(:"@connection")
+ connection.synchronize do
+ connection.channel.should be_nil
end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.