From 28151d46d98a19aa416e0c650fa8ab37fd4ba795 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 14 Mar 2018 18:20:48 -0700 Subject: [PATCH] Add dispatching slow consumer async errors --- lib/nats/io/client.rb | 20 ++++++- spec/client_errors_spec.rb | 114 ++++++++++++++++++++++++++++++++++++- 2 files changed, 130 insertions(+), 4 deletions(-) diff --git a/lib/nats/io/client.rb b/lib/nats/io/client.rb index f36adc4..d248e21 100644 --- a/lib/nats/io/client.rb +++ b/lib/nats/io/client.rb @@ -88,6 +88,9 @@ class SocketTimeoutError < Error; end # When we use an invalid subject. class BadSubject < Error; end + # When a subscription hits the pending messages limit. + class SlowConsumer < Error; end + class Client include MonitorMixin @@ -492,6 +495,7 @@ def process_msg(subject, sid, reply, data) synchronize { sub = @subs[sid] } return unless sub + sc = nil sub.synchronize do sub.received += 1 @@ -519,10 +523,22 @@ def process_msg(subject, sid, reply, data) elsif sub.callback # Async subscribers use a sized queue for processing # and should be able to consume messages in parallel. - sub.pending_queue << Msg.new(subject, reply, data) - sub.pending_size += data.size + if sub.pending_queue.size == sub.pending_msgs_limit \ + or sub.pending_size >= sub.pending_bytes_limit then + sc = SlowConsumer.new("nats: slow consumer, messages dropped") + else + # Only dispatch message when sure that it would not block + # the main read loop from the parser. + sub.pending_queue << Msg.new(subject, reply, data) + sub.pending_size += data.size + end end end + + synchronize do + @last_err = sc + @err_cb.call(sc) if @err_cb + end if sc end def process_info(line) diff --git a/spec/client_errors_spec.rb b/spec/client_errors_spec.rb index 18a3f21..50ba489 100644 --- a/spec/client_errors_spec.rb +++ b/spec/client_errors_spec.rb @@ -128,7 +128,7 @@ class CustomError < StandardError; end raise CustomError.new("NG!") end - msgs << payload + msgs << payload end 5.times do @@ -146,7 +146,117 @@ class CustomError < StandardError; end expect(disconnects.first).to be_nil expect(closes).to eql(1) expect(nats.closed?).to eql(true) - end + end + + it 'should handle subscriptions with slow consumers as async errors when over pending msgs limit' do + nats = NATS::IO::Client.new + nats.connect(reconnect: false) + + mon = Monitor.new + done = mon.new_cond + + errors = [] + nats.on_error do |e| + errors << e + end + + disconnects = [] + nats.on_disconnect do |e| + disconnects << e + end + + closes = 0 + nats.on_close do + closes += 1 + mon.synchronize { done.signal } + end + + msgs = [] + nats.subscribe("hello", pending_msgs_limit: 5) do |payload| + msgs << payload + sleep 1 if msgs.count == 5 + end + + 10.times do + nats.publish("hello") + end + nats.flush(1) rescue nil + + # Wait a bit for subscriber to recover + sleep 2 + 3.times do |n| + nats.publish("hello", "ok-#{n}") + end + nats.flush(1) rescue nil + + # Wait a bit to receive final messages + sleep 0.5 + + nats.close + mon.synchronize { done.wait(3) } + expect(msgs.count).to eql(8) + expect(errors.count).to eql(5) + expect(errors.first).to be_a(NATS::IO::SlowConsumer) + expect(disconnects.count).to eql(1) + expect(disconnects.first).to be_a(NATS::IO::SlowConsumer) + expect(closes).to eql(1) + expect(nats.closed?).to eql(true) + end + + it 'should handle subscriptions with slow consumers as async errors when over pending size limit' do + nats = NATS::IO::Client.new + nats.connect(reconnect: false) + + mon = Monitor.new + done = mon.new_cond + + errors = [] + nats.on_error do |e| + errors << e + end + + disconnects = [] + nats.on_disconnect do |e| + disconnects << e + end + + closes = 0 + nats.on_close do + closes += 1 + mon.synchronize { done.signal } + end + + data = '' + nats.subscribe("hello", pending_bytes_limit: 10) do |payload| + data += payload + + sleep 1 if data.size == 10 + end + + 20.times do + nats.publish("hello", 'A') + end + nats.flush(1) rescue nil + + sleep 2 + 3.times do |n| + nats.publish("hello", 'B') + end + nats.flush(1) rescue nil + + # Wait a bit to receive final messages + sleep 0.5 + + nats.close + mon.synchronize { done.wait(3) } + expect(data.size).to eql(13) + expect(errors.count).to eql(10) + expect(errors.first).to be_a(NATS::IO::SlowConsumer) + expect(disconnects.count).to eql(1) + expect(disconnects.first).to be_a(NATS::IO::SlowConsumer) + expect(closes).to eql(1) + expect(nats.closed?).to eql(true) + end context 'against a server which is idle' do before(:all) do