Skip to content

Commit

Permalink
Merge pull request #27 from nats-io/thread-per-async-sub
Browse files Browse the repository at this point in the history
Add delivery thread per async subscriber
  • Loading branch information
wallyqs committed Mar 15, 2018
2 parents d066651 + fb48c24 commit 580ea5a
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
nats-pure (0.3.0)
nats-pure (0.4.0)

GEM
remote: https://rubygems.org/
Expand Down
103 changes: 89 additions & 14 deletions lib/nats/io/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ module IO
DEFAULT_CONNECT_TIMEOUT = 2
DEFAULT_READ_WRITE_TIMEOUT = 2

# Default Pending Limits
DEFAULT_SUB_PENDING_MSGS_LIMIT = 65536
DEFAULT_SUB_PENDING_BYTES_LIMIT = 65536 * 1024

CR_LF = ("\r\n".freeze)
CR_LF_SIZE = (CR_LF.bytesize)

Expand Down Expand Up @@ -84,6 +88,9 @@ class SocketTimeoutError < Error; end
# When we use an invalid subject.
class BadSubject < Error; end

# When a subscription hits the pending messages limit.
class SlowConsumer < Error; end

class Client
include MonitorMixin

Expand Down Expand Up @@ -286,18 +293,54 @@ def subscribe(subject, opts={}, &callback)
sid = (@ssid += 1)
sub = @subs[sid] = Subscription.new
end
opts[:pending_msgs_limit] ||= DEFAULT_SUB_PENDING_MSGS_LIMIT
opts[:pending_bytes_limit] ||= DEFAULT_SUB_PENDING_BYTES_LIMIT

sub.subject = subject
sub.callback = callback
sub.received = 0
sub.queue = opts[:queue] if opts[:queue]
sub.max = opts[:max] if opts[:max]
sub.pending_msgs_limit = opts[:pending_msgs_limit]
sub.pending_bytes_limit = opts[:pending_bytes_limit]
sub.pending_queue = SizedQueue.new(sub.pending_msgs_limit)

send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
@flush_queue << :sub

# Setup server support for auto-unsubscribe when receiving enough messages
unsubscribe(sid, opts[:max]) if opts[:max]

# Async subscriptions each own a single thread for the
# delivery of messages.
# FIXME: Support shared thread pool with configurable limits
# to better support case of having a lot of subscriptions.
sub.wait_for_msgs_t = Thread.new do
loop do
msg = sub.pending_queue.pop

cb = nil
sub.synchronize do
# Decrease pending size since consumed already
sub.pending_size -= msg.data.size
cb = sub.callback
end

begin
case cb.arity
when 0 then cb.call
when 1 then cb.call(msg.data)
when 2 then cb.call(msg.data, msg.reply)
else cb.call(msg.data, msg.reply, msg.subject)
end
rescue => e
synchronize do
@err_cb.call(e) if @err_cb
end
end
end
end

sid
end

Expand Down Expand Up @@ -369,6 +412,12 @@ def unsubscribe(sid, opt_max=nil)
synchronize do
sub.max = opt_max
@subs.delete(sid) unless (sub.max && (sub.received < sub.max))

# Stop messages delivery thread for async subscribers
if sub.wait_for_msgs_t && sub.wait_for_msgs_t.alive?
sub.wait_for_msgs_t.exit
sub.pending_queue.clear
end
end
end

Expand Down Expand Up @@ -446,9 +495,11 @@ def process_msg(subject, sid, reply, data)
synchronize { sub = @subs[sid] }
return unless sub

# Check for auto_unsubscribe
sc = nil
sub.synchronize do
sub.received += 1

# Check for auto_unsubscribe
if sub.max
case
when sub.received > sub.max
Expand All @@ -469,20 +520,25 @@ def process_msg(subject, sid, reply, data)
future.signal

return
elsif sub.callback
# Async subscribers use a sized queue for processing
# and should be able to consume messages in parallel.
if sub.pending_queue.size >= sub.pending_msgs_limit \
or sub.pending_size >= sub.pending_bytes_limit then
sc = SlowConsumer.new("nats: slow consumer, messages dropped")
else
# Only dispatch message when sure that it would not block
# the main read loop from the parser.
sub.pending_queue << Msg.new(subject, reply, data)
sub.pending_size += data.size
end
end
end

# Distinguish between async subscriptions with callbacks
# and request subscriptions which expect a single response.
if sub.callback
cb = sub.callback
case cb.arity
when 0 then cb.call
when 1 then cb.call(data)
when 2 then cb.call(data, reply)
else cb.call(data, reply, subject)
end
end
synchronize do
@last_err = sc
@err_cb.call(sc) if @err_cb
end if sc
end

def process_info(line)
Expand Down Expand Up @@ -998,7 +1054,15 @@ def close_connection(conn_status, do_cbs=true)
@err_cb.call(e) if @err_cb
end if should_flush

# TODO: Destroy any remaining subscriptions
# Destroy any remaining subscriptions.
@subs.each do |_, sub|
if sub.wait_for_msgs_t && sub.wait_for_msgs_t.alive?
sub.wait_for_msgs_t.exit
sub.pending_queue.clear
end
end
@subs.clear

if do_cbs
@disconnect_cb.call(@last_err) if @disconnect_cb
@close_cb.call if @close_cb
Expand Down Expand Up @@ -1195,7 +1259,9 @@ def connect_addrinfo(ai, port, timeout)
class Subscription
include MonitorMixin

attr_accessor :subject, :queue, :future, :callback, :response, :received, :max
attr_accessor :subject, :queue, :future, :callback, :response, :received, :max, :pending
attr_accessor :pending_queue, :pending_size, :wait_for_msgs_t, :is_slow_consumer
attr_accessor :pending_msgs_limit, :pending_bytes_limit

def initialize
super # required to initialize monitor
Expand All @@ -1206,6 +1272,15 @@ def initialize
@response = nil
@received = 0
@max = nil
@pending = nil

# State from async subscriber messages delivery
@pending_queue = nil
@pending_size = 0
@pending_msgs_limit = nil
@pending_bytes_limit = nil
@wait_for_msgs_t = nil
@is_slow_consumer = false
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/nats/io/version.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module NATS
module IO
# NOTE: These are all announced to the server on CONNECT
VERSION = "0.3.0"
VERSION = "0.4.0"
LANG = "#{RUBY_ENGINE}2".freeze
PROTOCOL = 1
end
Expand Down
9 changes: 6 additions & 3 deletions spec/client_cluster_reconnect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@

nats = NATS::IO::Client.new
nats.connect({
servers: [@s1.uri, @s2.uri],
dont_randomize_servers: true
})
servers: [@s1.uri, @s2.uri],
dont_randomize_servers: true
})

disconnects = 0
nats.on_disconnect do |e|
Expand Down Expand Up @@ -179,6 +179,9 @@
case
when n == 100
nats.flush

# Wait a bit for all messages
sleep 0.5
expect(msgs.count).to eql(100)
@s1.kill_server
when (n % 100 == 0)
Expand Down
Loading

0 comments on commit 580ea5a

Please sign in to comment.