Skip to content

Commit

Permalink
Add dispatching slow consumer async errors
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Mar 15, 2018
1 parent 9bafe48 commit 28151d4
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 4 deletions.
20 changes: 18 additions & 2 deletions lib/nats/io/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class SocketTimeoutError < Error; end
# When we use an invalid subject.
class BadSubject < Error; end

# When a subscription hits the pending messages limit.
class SlowConsumer < Error; end

class Client
include MonitorMixin

Expand Down Expand Up @@ -492,6 +495,7 @@ def process_msg(subject, sid, reply, data)
synchronize { sub = @subs[sid] }
return unless sub

sc = nil
sub.synchronize do
sub.received += 1

Expand Down Expand Up @@ -519,10 +523,22 @@ def process_msg(subject, sid, reply, data)
elsif sub.callback
# Async subscribers use a sized queue for processing
# and should be able to consume messages in parallel.
sub.pending_queue << Msg.new(subject, reply, data)
sub.pending_size += data.size
if sub.pending_queue.size == sub.pending_msgs_limit \
or sub.pending_size >= sub.pending_bytes_limit then
sc = SlowConsumer.new("nats: slow consumer, messages dropped")
else
# Only dispatch message when sure that it would not block
# the main read loop from the parser.
sub.pending_queue << Msg.new(subject, reply, data)
sub.pending_size += data.size
end
end
end

synchronize do
@last_err = sc
@err_cb.call(sc) if @err_cb
end if sc
end

def process_info(line)
Expand Down
114 changes: 112 additions & 2 deletions spec/client_errors_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class CustomError < StandardError; end
raise CustomError.new("NG!")
end

msgs << payload
msgs << payload
end

5.times do
Expand All @@ -146,7 +146,117 @@ class CustomError < StandardError; end
expect(disconnects.first).to be_nil
expect(closes).to eql(1)
expect(nats.closed?).to eql(true)
end
end

it 'should handle subscriptions with slow consumers as async errors when over pending msgs limit' do
nats = NATS::IO::Client.new
nats.connect(reconnect: false)

mon = Monitor.new
done = mon.new_cond

errors = []
nats.on_error do |e|
errors << e
end

disconnects = []
nats.on_disconnect do |e|
disconnects << e
end

closes = 0
nats.on_close do
closes += 1
mon.synchronize { done.signal }
end

msgs = []
nats.subscribe("hello", pending_msgs_limit: 5) do |payload|
msgs << payload
sleep 1 if msgs.count == 5
end

10.times do
nats.publish("hello")
end
nats.flush(1) rescue nil

# Wait a bit for subscriber to recover
sleep 2
3.times do |n|
nats.publish("hello", "ok-#{n}")
end
nats.flush(1) rescue nil

# Wait a bit to receive final messages
sleep 0.5

nats.close
mon.synchronize { done.wait(3) }
expect(msgs.count).to eql(8)
expect(errors.count).to eql(5)
expect(errors.first).to be_a(NATS::IO::SlowConsumer)
expect(disconnects.count).to eql(1)
expect(disconnects.first).to be_a(NATS::IO::SlowConsumer)
expect(closes).to eql(1)
expect(nats.closed?).to eql(true)
end

it 'should handle subscriptions with slow consumers as async errors when over pending size limit' do
nats = NATS::IO::Client.new
nats.connect(reconnect: false)

mon = Monitor.new
done = mon.new_cond

errors = []
nats.on_error do |e|
errors << e
end

disconnects = []
nats.on_disconnect do |e|
disconnects << e
end

closes = 0
nats.on_close do
closes += 1
mon.synchronize { done.signal }
end

data = ''
nats.subscribe("hello", pending_bytes_limit: 10) do |payload|
data += payload

sleep 1 if data.size == 10
end

20.times do
nats.publish("hello", 'A')
end
nats.flush(1) rescue nil

sleep 2
3.times do |n|
nats.publish("hello", 'B')
end
nats.flush(1) rescue nil

# Wait a bit to receive final messages
sleep 0.5

nats.close
mon.synchronize { done.wait(3) }
expect(data.size).to eql(13)
expect(errors.count).to eql(10)
expect(errors.first).to be_a(NATS::IO::SlowConsumer)
expect(disconnects.count).to eql(1)
expect(disconnects.first).to be_a(NATS::IO::SlowConsumer)
expect(closes).to eql(1)
expect(nats.closed?).to eql(true)
end

context 'against a server which is idle' do
before(:all) do
Expand Down

0 comments on commit 28151d4

Please sign in to comment.