From b7d02037127607cbcbae3e785901e8e813275baa Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 22 Sep 2011 21:21:34 -0500 Subject: [PATCH] Track more connection metrics, connz sortable by metrics, break out connections --- lib/nats/server.rb | 1 + lib/nats/server/connection.rb | 208 ++++++++++++++++++++++++++++++++++ lib/nats/server/connz.rb | 14 ++- lib/nats/server/server.rb | 207 +-------------------------------- spec/monitor_spec.rb | 106 +++++++++++++++++ 5 files changed, 334 insertions(+), 202 deletions(-) create mode 100644 lib/nats/server/connection.rb diff --git a/lib/nats/server.rb b/lib/nats/server.rb index 1503c3eb..9b658575 100644 --- a/lib/nats/server.rb +++ b/lib/nats/server.rb @@ -10,6 +10,7 @@ require "#{ep}/ext/json" require "#{ep}/server/server" require "#{ep}/server/sublist" +require "#{ep}/server/connection" require "#{ep}/server/options" require "#{ep}/server/const" require "#{ep}/server/util" diff --git a/lib/nats/server/connection.rb b/lib/nats/server/connection.rb new file mode 100644 index 00000000..19ad035d --- /dev/null +++ b/lib/nats/server/connection.rb @@ -0,0 +1,208 @@ +module NATSD #:nodoc: all + + module Connection #:nodoc: all + + attr_accessor :in_msgs, :out_msgs, :in_bytes, :out_bytes + attr_reader :cid, :closing, :last_activity + alias :closing? :closing + + def client_info + @client_info ||= Socket.unpack_sockaddr_in(get_peername) + end + + def info + { + :cid => cid, + :ip => client_info[1], + :port => client_info[0], + :subscriptions => @subscriptions.size, + :pending_size => get_outbound_data_size, + :in_msgs => in_msgs, + :out_msgs => out_msgs, + :in_bytes => in_bytes, + :out_bytes => out_bytes + } + end + + def post_init + @cid = Server.cid + @subscriptions = {} + @verbose = @pedantic = true # suppressed by most clients, but allows friendly telnet + @in_msgs = @out_msgs = @in_bytes = @out_bytes = 0 + @parse_state = AWAITING_CONTROL_LINE + send_info + @auth_pending = EM.add_timer(NATSD::Server.auth_timeout) { connect_auth_timeout } if Server.auth_required? + @ping_timer = EM.add_periodic_timer(NATSD::Server.ping_interval) { send_ping } + @pings_outstanding = 0 + Server.num_connections += 1 + debug "Client connection created", client_info, cid + end + + def send_ping + return if @closing + if @pings_outstanding > NATSD::Server.ping_max + error_close UNRESPONSIVE + return + end + send_data(PING_RESPONSE) + @pings_outstanding += 1 + end + + def connect_auth_timeout + error_close AUTH_REQUIRED + debug "Connection timeout due to lack of auth credentials", cid + end + + def receive_data(data) + @buf = @buf ? @buf << data : data + return close_connection if @buf =~ /(\006|\004)/ # ctrl+c or ctrl+d for telnet friendly + + # while (@buf && !@buf.empty? && !@closing) + while (@buf && !@closing) + case @parse_state + when AWAITING_CONTROL_LINE + case @buf + when PUB_OP + ctrace('PUB OP', strip_op($&)) if NATSD::Server.trace_flag? + return connect_auth_timeout if @auth_pending + @buf = $' + @parse_state = AWAITING_MSG_PAYLOAD + @msg_sub, @msg_reply, @msg_size = $1, $3, $4.to_i + if (@msg_size > NATSD::Server.max_payload) + debug "Message payload size exceeded (#{@msg_size}/#{NATSD::Server.max_payload}), closing connection" + error_close PAYLOAD_TOO_BIG + end + send_data(INVALID_SUBJECT) if (@pedantic && !(@msg_sub =~ SUB_NO_WC)) + when SUB_OP + ctrace('SUB OP', strip_op($&)) if NATSD::Server.trace_flag? + return connect_auth_timeout if @auth_pending + @buf = $' + sub, qgroup, sid = $1, $3, $4 + return send_data(INVALID_SUBJECT) if !($1 =~ SUB) + return send_data(INVALID_SID_TAKEN) if @subscriptions[sid] + sub = Subscriber.new(self, sub, sid, qgroup, 0) + @subscriptions[sid] = sub + Server.subscribe(sub) + send_data(OK) if @verbose + when UNSUB_OP + ctrace('UNSUB OP', strip_op($&)) if NATSD::Server.trace_flag? + return connect_auth_timeout if @auth_pending + @buf = $' + sid, sub = $1, @subscriptions[$1] + if sub + # If we have set max_responses, we will unsubscribe once we have received + # the appropriate amount of responses. + sub.max_responses = ($2 && $3) ? $3.to_i : nil + delete_subscriber(sub) unless (sub.max_responses && (sub.num_responses < sub.max_responses)) + send_data(OK) if @verbose + else + send_data(INVALID_SID_NOEXIST) if @pedantic + end + when PING + ctrace('PING OP', strip_op($&)) if NATSD::Server.trace_flag? + @buf = $' + send_data(PONG_RESPONSE) + when PONG + ctrace('PONG OP', strip_op($&)) if NATSD::Server.trace_flag? + @buf = $' + @pings_outstanding -= 1 + when CONNECT + ctrace('CONNECT OP', strip_op($&)) if NATSD::Server.trace_flag? + @buf = $' + begin + config = JSON.parse($1) + process_connect_config(config) + rescue => e + send_data(INVALID_CONFIG) + log_error + end + when INFO + ctrace('INFO OP', strip_op($&)) if NATSD::Server.trace_flag? + return connect_auth_timeout if @auth_pending + @buf = $' + send_info + when UNKNOWN + ctrace('Unknown Op', strip_op($&)) if NATSD::Server.trace_flag? + return connect_auth_timeout if @auth_pending + @buf = $' + send_data(UNKNOWN_OP) + else + # If we are here we do not have a complete line yet that we understand. + # If too big, cut the connection off. + if @buf.bytesize > NATSD::Server.max_control_line + debug "Control line size exceeded (#{@buf.bytesize}/#{NATSD::Server.max_control_line}), closing connection.." + error_close PROTOCOL_OP_TOO_BIG + end + return + end + @buf = nil if (@buf && @buf.empty?) + + when AWAITING_MSG_PAYLOAD + return unless (@buf.bytesize >= (@msg_size + CR_LF_SIZE)) + msg = @buf.slice(0, @msg_size) + ctrace('Processing msg', @msg_sub, @msg_reply, msg) if NATSD::Server.trace_flag? + send_data(OK) if @verbose + Server.route_to_subscribers(@msg_sub, @msg_reply, msg) + @in_msgs += 1 + @in_bytes += @msg_size + @buf = @buf.slice((@msg_size + CR_LF_SIZE), @buf.bytesize) + @msg_sub = @msg_size = @reply = nil + @parse_state = AWAITING_CONTROL_LINE + @buf = nil if (@buf && @buf.empty?) + end + end + end + + def send_info + send_data("INFO #{Server.info_string}#{CR_LF}") + end + + def process_connect_config(config) + @verbose = config['verbose'] unless config['verbose'].nil? + @pedantic = config['pedantic'] unless config['pedantic'].nil? + return send_data(OK) unless Server.auth_required? + + EM.cancel_timer(@auth_pending) + if Server.auth_ok?(config['user'], config['pass']) + send_data(OK) if @verbose + @auth_pending = nil + else + error_close AUTH_FAILED + debug "Authorization failed for connection", cid + end + end + + def delete_subscriber(sub) + ctrace('DELSUB OP', sub.subject, sub.qgroup, sub.sid) if NATSD::Server.trace_flag? + Server.unsubscribe(sub) + @subscriptions.delete(sub.sid) + end + + def error_close(msg) + send_data(msg) + close_connection_after_writing + @closing = true + end + + def unbind + debug "Client connection closed", client_info, cid + Server.num_connections -= 1 + @subscriptions.each_value { |sub| Server.unsubscribe(sub) } + EM.cancel_timer(@auth_pending) if @auth_pending + @auth_pending = nil + EM.cancel_timer(@ping_timer) if @ping_timer + @ping_timer = nil + + @closing = true + end + + def ctrace(*args) + trace(args, "c: #{cid}") + end + + def strip_op(op='') + op.dup.sub(CR_LF, EMPTY) + end + end + +end diff --git a/lib/nats/server/connz.rb b/lib/nats/server/connz.rb index 1716b498..50668c75 100644 --- a/lib/nats/server/connz.rb +++ b/lib/nats/server/connz.rb @@ -5,8 +5,20 @@ def call(env) c_info = Server.dump_connections qs = env['QUERY_STRING'] if (qs =~ /n=(\d)/) + sort_key = :pending_size + n = $1.to_i + if (qs =~ /s=(\S+)/) + case $1 + when 'in_msgs'; sort_key = :in_msgs + when 'out_msgs'; sort_key = :out_msgs + when 'in_bytes'; sort_key = :in_bytes + when 'out_bytes'; sort_key = :out_bytes + when 'subs'; sort_key = :subscriptions + when 'subscriptions'; sort_key = :subscriptions + end + end conns = c_info[:connections] - c_info[:connections] = conns.sort { |a,b| b[:pending_size] <=> a[:pending_size] } [0, $1.to_i] + c_info[:connections] = conns.sort { |a,b| b[sort_key] <=> a[sort_key] } [0, n] end connz_json = JSON.pretty_generate(c_info) + "\n" hdrs = RACK_JSON_HDR.dup diff --git a/lib/nats/server/server.rb b/lib/nats/server/server.rb index c7553075..34a8e960 100644 --- a/lib/nats/server/server.rb +++ b/lib/nats/server/server.rb @@ -93,7 +93,12 @@ def deliver_to_subscriber(sub, subject, reply, msg) # Accounting @out_msgs += 1 - @out_bytes += msg.bytesize unless msg.nil? + conn.out_msgs += 1 + unless msg.nil? + mbs = msg.bytesize + @out_bytes += mbs + conn.out_bytes += mbs + end conn.send_data("MSG #{subject} #{sub.sid} #{reply}#{msg.bytesize}#{CR_LF}#{msg}#{CR_LF}") @@ -200,204 +205,4 @@ def start_http_server end end - module Connection #:nodoc: all - - attr_reader :cid, :closing - alias :closing? :closing - - def client_info - @client_info ||= Socket.unpack_sockaddr_in(get_peername) - end - - def info - { - :cid => cid, - :ip => client_info[1], - :port => client_info[0], - :subscriptions => @subscriptions.size, - :pending_size => get_outbound_data_size - } - end - - def post_init - @cid = Server.cid - @subscriptions = {} - @verbose = @pedantic = true # suppressed by most clients, but allows friendly telnet - # @receive_data_calls = 0 - @parse_state = AWAITING_CONTROL_LINE - send_info - @auth_pending = EM.add_timer(NATSD::Server.auth_timeout) { connect_auth_timeout } if Server.auth_required? - @ping_timer = EM.add_periodic_timer(NATSD::Server.ping_interval) { send_ping } - @pings_outstanding = 0 - Server.num_connections += 1 - debug "Client connection created", client_info, cid - end - - def send_ping - return if @closing - if @pings_outstanding > NATSD::Server.ping_max - error_close UNRESPONSIVE - return - end - send_data(PING_RESPONSE) - @pings_outstanding += 1 - end - - def connect_auth_timeout - error_close AUTH_REQUIRED - debug "Connection timeout due to lack of auth credentials", cid - end - - def receive_data(data) - # @receive_data_calls += 1 - @buf = @buf ? @buf << data : data - return close_connection if @buf =~ /(\006|\004)/ # ctrl+c or ctrl+d for telnet friendly - - # while (@buf && !@buf.empty? && !@closing) - while (@buf && !@closing) - case @parse_state - when AWAITING_CONTROL_LINE - case @buf - when PUB_OP - ctrace('PUB OP', strip_op($&)) if NATSD::Server.trace_flag? - return connect_auth_timeout if @auth_pending - @buf = $' - @parse_state = AWAITING_MSG_PAYLOAD - @msg_sub, @msg_reply, @msg_size = $1, $3, $4.to_i - if (@msg_size > NATSD::Server.max_payload) - debug "Message payload size exceeded (#{@msg_size}/#{NATSD::Server.max_payload}), closing connection" - error_close PAYLOAD_TOO_BIG - end - send_data(INVALID_SUBJECT) if (@pedantic && !(@msg_sub =~ SUB_NO_WC)) - when SUB_OP - ctrace('SUB OP', strip_op($&)) if NATSD::Server.trace_flag? - return connect_auth_timeout if @auth_pending - @buf = $' - sub, qgroup, sid = $1, $3, $4 - return send_data(INVALID_SUBJECT) if !($1 =~ SUB) - return send_data(INVALID_SID_TAKEN) if @subscriptions[sid] - sub = Subscriber.new(self, sub, sid, qgroup, 0) - @subscriptions[sid] = sub - Server.subscribe(sub) - send_data(OK) if @verbose - when UNSUB_OP - ctrace('UNSUB OP', strip_op($&)) if NATSD::Server.trace_flag? - return connect_auth_timeout if @auth_pending - @buf = $' - sid, sub = $1, @subscriptions[$1] - if sub - # If we have set max_responses, we will unsubscribe once we have received - # the appropriate amount of responses. - sub.max_responses = ($2 && $3) ? $3.to_i : nil - delete_subscriber(sub) unless (sub.max_responses && (sub.num_responses < sub.max_responses)) - send_data(OK) if @verbose - else - send_data(INVALID_SID_NOEXIST) if @pedantic - end - when PING - ctrace('PING OP', strip_op($&)) if NATSD::Server.trace_flag? - @buf = $' - send_data(PONG_RESPONSE) - when PONG - ctrace('PONG OP', strip_op($&)) if NATSD::Server.trace_flag? - @buf = $' - @pings_outstanding -= 1 - when CONNECT - ctrace('CONNECT OP', strip_op($&)) if NATSD::Server.trace_flag? - @buf = $' - begin - config = JSON.parse($1) - process_connect_config(config) - rescue => e - send_data(INVALID_CONFIG) - log_error - end - when INFO - ctrace('INFO OP', strip_op($&)) if NATSD::Server.trace_flag? - return connect_auth_timeout if @auth_pending - @buf = $' - send_info - when UNKNOWN - ctrace('Unknown Op', strip_op($&)) if NATSD::Server.trace_flag? - return connect_auth_timeout if @auth_pending - @buf = $' - send_data(UNKNOWN_OP) - else - # If we are here we do not have a complete line yet that we understand. - # If too big, cut the connection off. - if @buf.bytesize > NATSD::Server.max_control_line - debug "Control line size exceeded (#{@buf.bytesize}/#{NATSD::Server.max_control_line}), closing connection.." - error_close PROTOCOL_OP_TOO_BIG - end - return - end - @buf = nil if (@buf && @buf.empty?) - - when AWAITING_MSG_PAYLOAD - return unless (@buf.bytesize >= (@msg_size + CR_LF_SIZE)) - msg = @buf.slice(0, @msg_size) - ctrace('Processing msg', @msg_sub, @msg_reply, msg) if NATSD::Server.trace_flag? - send_data(OK) if @verbose - Server.route_to_subscribers(@msg_sub, @msg_reply, msg) - @buf = @buf.slice((@msg_size + CR_LF_SIZE), @buf.bytesize) - @msg_sub = @msg_size = @reply = nil - @parse_state = AWAITING_CONTROL_LINE - @buf = nil if (@buf && @buf.empty?) - end - end - end - - def send_info - send_data("INFO #{Server.info_string}#{CR_LF}") - end - - def process_connect_config(config) - @verbose = config['verbose'] unless config['verbose'].nil? - @pedantic = config['pedantic'] unless config['pedantic'].nil? - return send_data(OK) unless Server.auth_required? - - EM.cancel_timer(@auth_pending) - if Server.auth_ok?(config['user'], config['pass']) - send_data(OK) if @verbose - @auth_pending = nil - else - error_close AUTH_FAILED - debug "Authorization failed for connection", cid - end - end - - def delete_subscriber(sub) - ctrace('DELSUB OP', sub.subject, sub.qgroup, sub.sid) if NATSD::Server.trace_flag? - Server.unsubscribe(sub) - @subscriptions.delete(sub.sid) - end - - def error_close(msg) - send_data(msg) - close_connection_after_writing - @closing = true - end - - def unbind - debug "Client connection closed", client_info, cid - Server.num_connections -= 1 - # ctrace "Receive_Data called #{@receive_data_calls} times." if @receive_data_calls > 0 - @subscriptions.each_value { |sub| Server.unsubscribe(sub) } - EM.cancel_timer(@auth_pending) if @auth_pending - @auth_pending = nil - EM.cancel_timer(@ping_timer) if @ping_timer - @ping_timer = nil - - @closing = true - end - - def ctrace(*args) - trace(args, "c: #{cid}") - end - - def strip_op(op='') - op.dup.sub(CR_LF, EMPTY) - end - end - end diff --git a/spec/monitor_spec.rb b/spec/monitor_spec.rb index 7ec45402..9748daf4 100644 --- a/spec/monitor_spec.rb +++ b/spec/monitor_spec.rb @@ -141,6 +141,10 @@ c_info.should have_key :port c_info.should have_key :subscriptions c_info.should have_key :pending_size + c_info.should have_key :in_msgs + c_info.should have_key :out_msgs + c_info.should have_key :in_bytes + c_info.should have_key :out_bytes EM.stop end end @@ -165,10 +169,112 @@ c_info.should have_key :port c_info.should have_key :subscriptions c_info.should have_key :pending_size + c_info.should have_key :in_msgs + c_info.should have_key :out_msgs + c_info.should have_key :in_bytes + c_info.should have_key :out_bytes EM.stop end end + it 'should return connz with subset of connections sorted correctly if requested' do + EM.run do + (1..10).each { NATS.connect(:uri => HTTP_SERVER) } + (1..4).each do + NATS.connect(:uri => HTTP_SERVER) do |c| + c.subscribe('foo') + c.subscribe('foo') + end + end + # Wait for them to register and connz to allow updates + sleep(0.5) + NATS.connect(:uri => HTTP_SERVER) do |c| + (1..10).each { c.publish('foo', "hello world") } + c.flush do + host, port = NATSD::Server.host, HTTP_PORT + + # Test different sorts + + # out_msgs + connz_req = Net::HTTP::Get.new("/connz?n=4&s=out_msgs") + connz_resp = Net::HTTP.new(host, port).start { |http| http.request(connz_req) } + connz_resp.body.should_not be_nil + connz = JSON.parse(connz_resp.body, :symbolize_keys => true, :symbolize_names => true) + connz.should have_key :pending_size + connz.should have_key :num_connections + connz[:num_connections].should == 15 + connz[:connections].size.should == 4 + connz[:connections].each do |c_info| + c_info[:out_msgs].should == 20 + end + + # out_bytes + connz_req = Net::HTTP::Get.new("/connz?n=2&s=out_bytes") + connz_resp = Net::HTTP.new(host, port).start { |http| http.request(connz_req) } + connz_resp.body.should_not be_nil + connz = JSON.parse(connz_resp.body, :symbolize_keys => true, :symbolize_names => true) + connz.should have_key :pending_size + connz.should have_key :num_connections + connz[:num_connections].should == 15 + connz[:connections].size.should == 2 + connz[:connections].each do |c_info| + c_info[:out_bytes].should == 220 + end + + # in_msgs + connz_req = Net::HTTP::Get.new("/connz?n=1&s=in_msgs") + connz_resp = Net::HTTP.new(host, port).start { |http| http.request(connz_req) } + connz_resp.body.should_not be_nil + connz = JSON.parse(connz_resp.body, :symbolize_keys => true, :symbolize_names => true) + connz.should have_key :pending_size + connz.should have_key :num_connections + connz[:num_connections].should == 15 + connz[:connections].size.should == 1 + c_info = connz[:connections].first + c_info[:in_msgs].should == 10 + + # in_bytes + connz_req = Net::HTTP::Get.new("/connz?n=1&s=in_bytes") + connz_resp = Net::HTTP.new(host, port).start { |http| http.request(connz_req) } + connz_resp.body.should_not be_nil + connz = JSON.parse(connz_resp.body, :symbolize_keys => true, :symbolize_names => true) + connz.should have_key :pending_size + connz.should have_key :num_connections + connz[:num_connections].should == 15 + connz[:connections].size.should == 1 + c_info = connz[:connections].first + c_info[:in_bytes].should == 110 + + # subscriptions (short form) + connz_req = Net::HTTP::Get.new("/connz?n=1&s=subs") + connz_resp = Net::HTTP.new(host, port).start { |http| http.request(connz_req) } + connz_resp.body.should_not be_nil + connz = JSON.parse(connz_resp.body, :symbolize_keys => true, :symbolize_names => true) + connz.should have_key :pending_size + connz.should have_key :num_connections + connz[:num_connections].should == 15 + connz[:connections].size.should == 1 + c_info = connz[:connections].first + c_info[:subscriptions].should == 2 + + # subscriptions (long form) + connz_req = Net::HTTP::Get.new("/connz?n=1&s=subscriptions") + connz_resp = Net::HTTP.new(host, port).start { |http| http.request(connz_req) } + connz_resp.body.should_not be_nil + connz = JSON.parse(connz_resp.body, :symbolize_keys => true, :symbolize_names => true) + connz.should have_key :pending_size + connz.should have_key :num_connections + connz[:num_connections].should == 15 + connz[:connections].size.should == 1 + c_info = connz[:connections].first + c_info[:subscriptions].should == 2 + + EM.stop + end + end + end + end + it 'should require auth if configured to do so' do config_file = File.dirname(__FILE__) + '/resources/monitor.yml' config = File.open(config_file) { |f| YAML.load(f) }