From a692eba6bb9ef9cc2a4af74699bfc3a2f93f8edd Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 28 Aug 2018 18:37:36 -0700 Subject: [PATCH 1/3] Add support for drain mode Signed-off-by: Waldemar Quevedo --- lib/nats/client.rb | 70 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 3 deletions(-) diff --git a/lib/nats/client.rb b/lib/nats/client.rb index fbb6e403..10a12acd 100644 --- a/lib/nats/client.rb +++ b/lib/nats/client.rb @@ -42,6 +42,9 @@ module NATS DEFAULT_PING_INTERVAL = 120 DEFAULT_PING_MAX = 2 + # Drain mode support + DEFAULT_DRAIN_TIMEOUT = 30 + # Protocol # @private MSG = /\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n/i #:nodoc: @@ -161,6 +164,7 @@ def connect(uri=nil, opts={}, &blk) opts[:reconnect_time_wait] = RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil? opts[:ping_interval] = DEFAULT_PING_INTERVAL if opts[:ping_interval].nil? opts[:max_outstanding_pings] = DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil? + opts[:drain_timeout] = DEFAULT_DRAIN_TIMEOUT if opts[:drain_timeout].nil? # Override with ENV opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI @@ -176,6 +180,7 @@ def connect(uri=nil, opts={}, &blk) opts[:no_echo] ||= ENV['NATS_NO_ECHO'] || false opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil? opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil? + opts[:drain_timeout] ||= ENV['NATS_DRAIN_TIMEOUT'].to_i unless ENV['NATS_DRAIN_TIMEOUT'].nil? uri = opts[:uris] || opts[:servers] || opts[:uri] @@ -254,6 +259,16 @@ def stop(&blk) @disconnect_cb = nil end + # Drain gracefully disconnects from the server, letting + # subscribers process pending messages already sent by server and + # optionally calls the associated block. + # @param [Block] &blk called when drain is done and connection is closed. + def drain(&blk) + if (client and !client.draining? and (client.connected? || client.reconnecting?)) + client.drain { blk.call if blk } + end + end + # @return [URI] Connected server def connected_server return nil unless client @@ -272,6 +287,12 @@ def reconnecting? client.reconnecting? end + # @return [Boolean] Draining state + def draining? + return false unless client + client.draining? + end + # @return [Hash] Options def options return {} unless client @@ -420,13 +441,14 @@ def process_uri(uris) end attr_reader :connected, :connect_cb, :err_cb, :err_cb_overridden, :pongs_received #:nodoc: - attr_reader :closing, :reconnecting, :server_pool, :options, :server_info #:nodoc + attr_reader :closing, :reconnecting, :draining, :server_pool, :options, :server_info #:nodoc attr_reader :msgs_received, :msgs_sent, :bytes_received, :bytes_sent, :pings attr_reader :disconnect_cb, :close_cb alias :connected? :connected alias :closing? :closing alias :reconnecting? :reconnecting + alias :draining? :draining def initialize(options) @options = options @@ -457,6 +479,8 @@ def initialize(options) @resp_sub_prefix = nil @nuid = NATS::NUID.new + # Drain mode + @draining = false send_connect_command end @@ -466,7 +490,7 @@ def initialize(options) # @param [String] opt_reply # @param [Block] blk, closure called when publish has been processed by the server. def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk) - return unless subject + return unless subject or draining? msg = msg.to_s # Accounting @@ -508,6 +532,42 @@ def unsubscribe(sid, opt_max=nil) @subs.delete(sid) unless (sub[:max] && (sub[:received] < sub[:max])) end + # Drain gracefully closes the connection. + # @param [Block] + def drain(&blk) + return if draining? or closing? + @draining = true + + # Remove interest in all subjects to stop receiving messages. + @subs.each do |sid, _| + send_command("UNSUB #{sid} #{CR_LF}") + end + + # Roundtrip to ensure no more messages are received. + flush do + drain_timeout_timer, draining_timer = nil, nil + drain_timeout_timer = EM.add_timer(options[:drain_timeout]) do + EM.cancel_timer(draining_timer) + + # Report the timeout via the error callback and just close + err_cb.call(NATS::ClientError.new("Drain Timeout")) + close unless closing? + blk.call if blk + end + + # Periodically check for the pending data to be empty. + draining_timer = EM.add_periodic_timer(0.1) do + next unless closing? or @buf.nil? or @buf.empty? + EM.cancel_timer(drain_timeout_timer) + EM.cancel_timer(draining_timer) + + # We're done draining and can close now. + close unless closing? + blk.call if blk + end + end + end + # Return the active subscription count. # @return [Number] def subscription_count @@ -776,6 +836,8 @@ def flush_pending #:nodoc: def receive_data(data) #:nodoc: @buf = @buf ? @buf << data : data + + # puts "#{@buf.bytesize} || #{@buf[0..20]}" if draining? while (@buf) case @parse_state when AWAITING_CONTROL_LINE @@ -823,7 +885,9 @@ def receive_data(data) #:nodoc: @parse_state = AWAITING_CONTROL_LINE @buf = nil if (@buf && @buf.empty?) end - + # if draining? and @buf + # puts "#{@buf.bytesize} |> #{@buf[0..20]}" + # end end end From a9ab211854ba7b87795da46f34b96afc20e82270 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 29 Aug 2018 16:21:31 -0700 Subject: [PATCH 2/3] Add spec for drain mode Signed-off-by: Waldemar Quevedo --- lib/nats/client.rb | 10 +- spec/client/client_connect_spec.rb | 1 - spec/client/client_drain_spec.rb | 183 +++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 7 deletions(-) create mode 100644 spec/client/client_drain_spec.rb diff --git a/lib/nats/client.rb b/lib/nats/client.rb index 10a12acd..3b365c4e 100644 --- a/lib/nats/client.rb +++ b/lib/nats/client.rb @@ -533,7 +533,7 @@ def unsubscribe(sid, opt_max=nil) end # Drain gracefully closes the connection. - # @param [Block] + # @param [Block] blk called when drain is done and connection is closed. def drain(&blk) return if draining? or closing? @draining = true @@ -551,6 +551,7 @@ def drain(&blk) # Report the timeout via the error callback and just close err_cb.call(NATS::ClientError.new("Drain Timeout")) + @draining = false close unless closing? blk.call if blk end @@ -562,12 +563,13 @@ def drain(&blk) EM.cancel_timer(draining_timer) # We're done draining and can close now. + @draining = false close unless closing? blk.call if blk end end end - + # Return the active subscription count. # @return [Number] def subscription_count @@ -837,7 +839,6 @@ def flush_pending #:nodoc: def receive_data(data) #:nodoc: @buf = @buf ? @buf << data : data - # puts "#{@buf.bytesize} || #{@buf[0..20]}" if draining? while (@buf) case @parse_state when AWAITING_CONTROL_LINE @@ -885,9 +886,6 @@ def receive_data(data) #:nodoc: @parse_state = AWAITING_CONTROL_LINE @buf = nil if (@buf && @buf.empty?) end - # if draining? and @buf - # puts "#{@buf.bytesize} |> #{@buf[0..20]}" - # end end end diff --git a/spec/client/client_connect_spec.rb b/spec/client/client_connect_spec.rb index 56196a93..1bd2de8b 100644 --- a/spec/client/client_connect_spec.rb +++ b/spec/client/client_connect_spec.rb @@ -18,7 +18,6 @@ msgs_b = [] with_em_timeout do NATS.on_error do |e| - p e errors << e end diff --git a/spec/client/client_drain_spec.rb b/spec/client/client_drain_spec.rb new file mode 100644 index 00000000..4fa4c3f3 --- /dev/null +++ b/spec/client/client_drain_spec.rb @@ -0,0 +1,183 @@ +require 'spec_helper' + +describe 'Client - Drain' do + + before(:each) do + @s = NatsServerControl.new + @s.start_server(true) + end + + after(:each) do + @s.kill_server + end + + it "should support draining a connection" do + msgs = [] + errors = [] + closed = false + drained = false + after_drain = nil + total_msgs_before_drain = nil + total_msgs_after_drain = nil + pending_data_before_draining = nil + pending_data_after_draining = nil + + with_em_timeout(10) do |future| + nc1 = NATS.connect(uri: @s.uri) do |nc| + expect(nc.options[:drain_timeout]).to eql(30) + nc.on_error do |err| + errors << err + end + nc.on_close do |err| + closed = true + future.resume + end + + nc.subscribe("foo", queue: "worker") do |msg, reply| + nc.publish(reply, "ACK:foo") + end + + nc.subscribe("bar") do |msg, reply| + nc.publish(reply, "ACK:bar") + end + + nc.subscribe("quux") do |msg, reply| + nc.publish(reply, "ACK:quux") + end + + EM.add_timer(1) do + # Before draining + subs = nc.instance_variable_get('@subs') + pending_data_before_draining = nc.instance_variable_get('@buf') + total_msgs_before_drain = subs.reduce(0) do |total, pair| + sid, sub = pair + total += sub[:received] + end + + nc.drain do + after_drain = nc.draining? + drained = true + + pending_data_after_draining = nc.instance_variable_get('@buf') + subs = nc.instance_variable_get('@subs') + total_msgs_after_drain = subs.reduce(0) do |total, pair| + sid, sub = pair + total += sub[:received] + end + end + end + end + + # Fast publisher + nc2 = NATS.connect(uri: @s.uri) do |nc| + inbox = NATS.create_inbox + nc.subscribe(inbox) do |msg| + msgs << msg + end + + timer = EM.add_periodic_timer(0.1) do + 10000.times do + nc.publish("foo", "hi", inbox) + nc.publish("bar", "hi", inbox) + nc.publish("quux", "hi", inbox) + end + end + EM.add_timer(1) do + EM.cancel_timer(timer) + end + end + end + expect(errors.count).to eql(0) + expect(closed).to eql(true) + expect(drained).to eql(true) + expect(after_drain).to eql(false) + expect(total_msgs_before_drain < total_msgs_after_drain).to eql(true) + expect(pending_data_after_draining).to eql(nil) + end + + it "should timeout draining if takes too long" do + msgs = [] + errors = [] + closed = false + drained = false + after_drain = nil + total_msgs_before_drain = nil + total_msgs_after_drain = nil + pending_data_before_draining = nil + pending_data_after_draining = nil + + with_em_timeout(10) do |future| + # Use a very short timeout for to timeout. + nc1 = NATS.connect(uri: @s.uri, drain_timeout: 0.01) do |nc| + nc.on_error do |err| + errors << err + end + nc.on_close do |err| + closed = true + future.resume + end + + nc.subscribe("foo", queue: "worker") do |msg, reply| + nc.publish(reply, "ACK:foo") + end + + nc.subscribe("bar") do |msg, reply| + nc.publish(reply, "ACK:bar") + end + + nc.subscribe("quux") do |msg, reply| + nc.publish(reply, "ACK:quux") + end + + EM.add_timer(1) do + # Before draining + subs = nc.instance_variable_get('@subs') + pending_data_before_draining = nc.instance_variable_get('@buf') + total_msgs_before_drain = subs.reduce(0) do |total, pair| + sid, sub = pair + total += sub[:received] + end + + nc.drain do + after_drain = nc.draining? + drained = true + + pending_data_after_draining = nc.instance_variable_get('@buf') + subs = nc.instance_variable_get('@subs') + total_msgs_after_drain = subs.reduce(0) do |total, pair| + sid, sub = pair + total += sub[:received] + end + end + end + end + + # Fast publisher + nc2 = NATS.connect(uri: @s.uri) do |nc| + inbox = NATS.create_inbox + nc.subscribe(inbox) do |msg| + msgs << msg + end + + timer = EM.add_periodic_timer(0.1) do + 10000.times do + nc.publish("foo", "hi", inbox) + nc.publish("bar", "hi", inbox) + nc.publish("quux", "hi", inbox) + end + end + EM.add_timer(1) do + EM.cancel_timer(timer) + end + end + end + expect(errors.count).to eql(1) + expect(errors.first).to be_a(NATS::ClientError) + expect(errors.first.to_s).to eql("Drain Timeout") + expect(closed).to eql(true) + expect(drained).to eql(true) + expect(after_drain).to eql(false) + expect(total_msgs_before_drain < total_msgs_after_drain).to eql(true) + expect(pending_data_after_draining).to eql(nil) + end +end From 5e111553ee43321bbf413b8fc5629983f3a2abc2 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 30 Aug 2018 16:26:43 -0700 Subject: [PATCH 3/3] Wait for pending outbound data to be flushed on drain Also disallow publishing once subs have been drained Signed-off-by: Waldemar Quevedo --- lib/nats/client.rb | 12 +- spec/client/client_drain_spec.rb | 335 ++++++++++++++++++++++++++-- spec/client/client_requests_spec.rb | 2 +- 3 files changed, 325 insertions(+), 24 deletions(-) diff --git a/lib/nats/client.rb b/lib/nats/client.rb index 3b365c4e..19df3a24 100644 --- a/lib/nats/client.rb +++ b/lib/nats/client.rb @@ -481,6 +481,7 @@ def initialize(options) # Drain mode @draining = false + @drained_subs = false send_connect_command end @@ -490,7 +491,7 @@ def initialize(options) # @param [String] opt_reply # @param [Block] blk, closure called when publish has been processed by the server. def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk) - return unless subject or draining? + return unless subject and not @drained_subs msg = msg.to_s # Accounting @@ -510,7 +511,7 @@ def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk) # @param [Block] callback, called when a message is delivered. # @return [Object] sid, Subject Identifier def subscribe(subject, opts={}, &callback) - return unless subject + return unless subject and not draining? sid = (@ssid += 1) sub = @subs[sid] = { :subject => subject, :callback => callback, :received => 0 } sub[:queue] = opts[:queue] if opts[:queue] @@ -525,6 +526,7 @@ def subscribe(subject, opts={}, &callback) # @param [Object] sid # @param [Number] opt_max, optional number of responses to receive before auto-unsubscribing def unsubscribe(sid, opt_max=nil) + return if draining? opt_max_str = " #{opt_max}" unless opt_max.nil? send_command("UNSUB #{sid}#{opt_max_str}#{CR_LF}") return unless sub = @subs[sid] @@ -559,8 +561,12 @@ def drain(&blk) # Periodically check for the pending data to be empty. draining_timer = EM.add_periodic_timer(0.1) do next unless closing? or @buf.nil? or @buf.empty? - EM.cancel_timer(drain_timeout_timer) + + # Subscriptions have been drained already so disallow publishing. + @drained_subs = true + next unless pending_data_size == 0 EM.cancel_timer(draining_timer) + EM.cancel_timer(drain_timeout_timer) # We're done draining and can close now. @draining = false diff --git a/spec/client/client_drain_spec.rb b/spec/client/client_drain_spec.rb index 4fa4c3f3..17b91d69 100644 --- a/spec/client/client_drain_spec.rb +++ b/spec/client/client_drain_spec.rb @@ -19,8 +19,11 @@ after_drain = nil total_msgs_before_drain = nil total_msgs_after_drain = nil + total_msgs_sent = nil pending_data_before_draining = nil pending_data_after_draining = nil + pending_outbound_data_before_draining = nil + pending_outbound_data_after_draining = nil with_em_timeout(10) do |future| nc1 = NATS.connect(uri: @s.uri) do |nc| @@ -47,23 +50,18 @@ EM.add_timer(1) do # Before draining - subs = nc.instance_variable_get('@subs') pending_data_before_draining = nc.instance_variable_get('@buf') - total_msgs_before_drain = subs.reduce(0) do |total, pair| - sid, sub = pair - total += sub[:received] - end + pending_outbound_data_before_draining = nc.pending_data_size + total_msgs_before_drain = nc.msgs_received nc.drain do after_drain = nc.draining? drained = true + total_msgs_sent = nc.msgs_sent + total_msgs_after_drain = nc.msgs_received pending_data_after_draining = nc.instance_variable_get('@buf') - subs = nc.instance_variable_get('@subs') - total_msgs_after_drain = subs.reduce(0) do |total, pair| - sid, sub = pair - total += sub[:received] - end + pending_outbound_data_after_draining = nc.pending_data_size end end end @@ -87,6 +85,12 @@ end end end + + # Should be the same as the messages received by the first client. + expect(msgs.count).to eql(total_msgs_sent) + expect(msgs.count).to eql(total_msgs_after_drain) + expect(pending_outbound_data_after_draining).to eql(0) + expect(pending_data_after_draining).to eql(nil) expect(errors.count).to eql(0) expect(closed).to eql(true) expect(drained).to eql(true) @@ -103,8 +107,11 @@ after_drain = nil total_msgs_before_drain = nil total_msgs_after_drain = nil + total_msgs_sent = nil pending_data_before_draining = nil pending_data_after_draining = nil + pending_outbound_data_before_draining = nil + pending_outbound_data_after_draining = nil with_em_timeout(10) do |future| # Use a very short timeout for to timeout. @@ -130,24 +137,17 @@ end EM.add_timer(1) do - # Before draining subs = nc.instance_variable_get('@subs') pending_data_before_draining = nc.instance_variable_get('@buf') - total_msgs_before_drain = subs.reduce(0) do |total, pair| - sid, sub = pair - total += sub[:received] - end + total_msgs_before_drain = nc.msgs_received nc.drain do after_drain = nc.draining? drained = true pending_data_after_draining = nc.instance_variable_get('@buf') - subs = nc.instance_variable_get('@subs') - total_msgs_after_drain = subs.reduce(0) do |total, pair| - sid, sub = pair - total += sub[:received] - end + total_msgs_after_drain = nc.msgs_received + total_msgs_sent = nc.msgs_sent end end end @@ -180,4 +180,299 @@ expect(total_msgs_before_drain < total_msgs_after_drain).to eql(true) expect(pending_data_after_draining).to eql(nil) end + + it "should disallow subscribe and unsubscribe while draining" do + msgs = [] + errors = [] + closed = false + drained = false + after_drain = nil + total_msgs_before_drain = nil + total_msgs_after_drain = nil + total_msgs_sent = nil + pending_data_before_draining = nil + pending_data_after_draining = nil + pending_outbound_data_before_draining = nil + pending_outbound_data_after_draining = nil + unsub_result = true + + no_more_subs = nil + with_em_timeout(10) do |future| + nc1 = NATS.connect(uri: @s.uri) do |nc| + expect(nc.options[:drain_timeout]).to eql(30) + nc.on_error do |err| + errors << err + end + nc.on_close do |err| + closed = true + future.resume + end + + nc.subscribe("foo", queue: "worker") do |msg, reply| + nc.publish(reply, "ACK:foo") + end + + nc.subscribe("bar") do |msg, reply| + nc.publish(reply, "ACK:bar") + end + + nc.subscribe("quux") do |msg, reply| + nc.publish(reply, "ACK:quux") + end + + sub_timer = EM.add_periodic_timer(0.1) do + sid = nc.subscribe("hello.#{rand(1_000_000)}") { } + + if sid.nil? + no_more_subs = true + EM.cancel_timer(sub_timer) + + # Any sid even if invalid should return right away + unsub_result = nc.unsubscribe(1) + end + end + + EM.add_timer(1) do + pending_data_before_draining = nc.instance_variable_get('@buf') + pending_outbound_data_before_draining = nc.pending_data_size + + total_msgs_before_drain = nc.msgs_received + nc.drain do + after_drain = nc.draining? + drained = true + + total_msgs_sent = nc.msgs_sent + total_msgs_after_drain = nc.msgs_received + pending_data_after_draining = nc.instance_variable_get('@buf') + pending_outbound_data_after_draining = nc.pending_data_size + end + end + end + + # Fast publisher + nc2 = NATS.connect(uri: @s.uri) do |nc| + inbox = NATS.create_inbox + nc.subscribe(inbox) do |msg| + msgs << msg + end + + timer = EM.add_periodic_timer(0.1) do + 10000.times do + nc.publish("foo", "hi", inbox) + nc.publish("bar", "hi", inbox) + nc.publish("quux", "hi", inbox) + end + end + EM.add_timer(1) do + EM.cancel_timer(timer) + end + end + end + + # Subscribe should have eventually failed + expect(no_more_subs).to eql(true) + expect(unsub_result).to eql(nil) + + # Should be the same as the messages received by the first client. + expect(msgs.count).to eql(total_msgs_sent) + expect(msgs.count).to eql(total_msgs_after_drain) + expect(pending_outbound_data_after_draining).to eql(0) + expect(pending_data_after_draining).to eql(nil) + expect(errors.count).to eql(0) + expect(closed).to eql(true) + expect(drained).to eql(true) + expect(after_drain).to eql(false) + expect(total_msgs_before_drain < total_msgs_after_drain).to eql(true) + expect(pending_data_after_draining).to eql(nil) + end + + it "should disallow publish and flush outbound pending data once subscriptions have been drained" do + msgs = [] + errors = [] + closed = false + drained = false + after_drain = nil + + total_msgs_received_before_drain = nil + total_msgs_received_after_drain = nil + total_msgs_sent = nil + + pending_data_before_draining = nil + pending_data_after_draining = nil + pending_outbound_data_before_draining = nil + pending_outbound_data_after_draining = nil + + before_publish = nil + after_publish = nil + extra_pubs = 0 + with_em_timeout(30) do |future| + nc1 = NATS.connect(uri: @s.uri) do |nc| + expect(nc.options[:drain_timeout]).to eql(30) + nc.on_error do |err| + errors << err + end + nc.on_close do |err| + closed = true + + # Give sometime to the other client to receive + # all the messages that were published by client + # that started to drain. + EM.add_timer(5) do + future.resume + end + end + + nc.subscribe("foo", queue: "worker") do |msg, reply| + 10.times { nc.publish(reply, "ACK:foo") } + end + + nc.subscribe("bar") do |msg, reply| + 10.times { nc.publish(reply, "ACK:bar") } + end + + nc.subscribe("quux") do |msg, reply| + 10.times { nc.publish(reply, "ACK:quux") } + end + + EM.add_timer(0.5) do + pub_timer = EM.add_periodic_timer(0.1) do + before_publish = nc.msgs_sent + nc.publish("hello", "world") + after_publish = nc.msgs_sent + if before_publish == after_publish + EM.cancel_timer(pub_timer) + else + extra_pubs += 1 + end + end + end + + EM.add_timer(1.5) do + pending_data_before_draining = nc.instance_variable_get('@buf') + pending_outbound_data_before_draining = nc.pending_data_size + + total_msgs_received_before_drain = nc.msgs_received + nc.drain do + after_drain = nc.draining? + drained = true + total_msgs_sent = nc.msgs_sent + total_msgs_received_after_drain = nc.msgs_received + pending_data_after_draining = nc.instance_variable_get('@buf') + pending_outbound_data_after_draining = nc.pending_data_size + end + end + end + + # Fast publisher + nc2 = NATS.connect(uri: @s.uri) do |nc| + inbox = NATS.create_inbox + nc.flush do + nc.subscribe(inbox) do |msg| + msgs << msg + end + end + + timer = EM.add_periodic_timer(0.2) do + 10000.times do + nc.publish("foo", "hi", inbox) + nc.publish("bar", "hi", inbox) + nc.publish("quux", "hi", inbox) + end + end + EM.add_timer(1) do + EM.cancel_timer(timer) + end + end + end + + # Should be the same as the messages received by the first client. + expect(msgs.count).to eql(total_msgs_sent-extra_pubs) + expect(msgs.count).to eql(total_msgs_received_after_drain*10) + expect(before_publish).to eql(after_publish) + + expect(pending_outbound_data_after_draining).to eql(0) if not pending_outbound_data_after_draining.nil? + expect(pending_data_after_draining).to eql(nil) + expect(errors.count).to eql(0) + expect(closed).to eql(true) + expect(drained).to eql(true) + expect(after_drain).to eql(false) + expect(pending_data_after_draining).to eql(nil) + end + + it "should support draining a connection with NATS.drain" do + msgs = [] + drained = false + after_drain = nil + total_msgs_before_drain = nil + total_msgs_after_drain = nil + total_msgs_sent = nil + pending_data_before_draining = nil + pending_data_after_draining = nil + pending_outbound_data_before_draining = nil + pending_outbound_data_after_draining = nil + + with_em_timeout(10) do + NATS.start(uri: @s.uri) do |nc| + expect(nc.options[:drain_timeout]).to eql(30) + + NATS.subscribe("foo", queue: "worker") do |msg, reply| + NATS.publish(reply, "ACK:foo") + end + + NATS.subscribe("bar") do |msg, reply| + NATS.publish(reply, "ACK:bar") + end + + NATS.subscribe("quux") do |msg, reply| + NATS.publish(reply, "ACK:quux") + end + + EM.add_timer(1) do + # Before draining + pending_data_before_draining = nc.instance_variable_get('@buf') + pending_outbound_data_before_draining = nc.pending_data_size + + total_msgs_before_drain = nc.msgs_received + NATS.drain do + after_drain = nc.draining? + drained = true + + total_msgs_sent = nc.msgs_sent + total_msgs_after_drain = nc.msgs_received + pending_data_after_draining = nc.instance_variable_get('@buf') + pending_outbound_data_after_draining = nc.pending_data_size + end + end + end + + # Fast publisher + NATS.connect(uri: @s.uri) do |nc| + inbox = NATS.create_inbox + nc.subscribe(inbox) do |msg| + msgs << msg + end + + timer = EM.add_periodic_timer(0.1) do + 10000.times do + nc.publish("foo", "hi", inbox) + nc.publish("bar", "hi", inbox) + nc.publish("quux", "hi", inbox) + end + end + EM.add_timer(1) do + EM.cancel_timer(timer) + end + end + end + + # Should be the same as the messages received by the first client. + expect(msgs.count).to eql(total_msgs_sent) + expect(msgs.count).to eql(total_msgs_after_drain) + expect(pending_outbound_data_after_draining).to eql(0) + expect(pending_data_after_draining).to eql(nil) + expect(drained).to eql(true) + expect(after_drain).to eql(false) + expect(total_msgs_before_drain < total_msgs_after_drain).to eql(true) + expect(pending_data_after_draining).to eql(nil) + end end diff --git a/spec/client/client_requests_spec.rb b/spec/client/client_requests_spec.rb index fd28aef0..1f4c08e5 100644 --- a/spec/client/client_requests_spec.rb +++ b/spec/client/client_requests_spec.rb @@ -11,7 +11,7 @@ @s.kill_server end - it 'should receive a responses using single subscription for requests' do + it 'should receive responses using single subscription for requests' do msgs = [] received = false nats = nil