Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Drain mode #157

Merged
merged 3 commits into from
Aug 31, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions lib/nats/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ module NATS
DEFAULT_PING_INTERVAL = 120
DEFAULT_PING_MAX = 2

# Drain mode support
# Default for the :drain_timeout option. The option is handed to EM.add_timer
# as the delay after which a drain is abandoned with a "Drain Timeout" error
# (EventMachine timer delays are expressed in seconds).
DEFAULT_DRAIN_TIMEOUT = 30

# Protocol
# @private
MSG = /\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n/i #:nodoc:
Expand Down Expand Up @@ -161,6 +164,7 @@ def connect(uri=nil, opts={}, &blk)
opts[:reconnect_time_wait] = RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil?
opts[:ping_interval] = DEFAULT_PING_INTERVAL if opts[:ping_interval].nil?
opts[:max_outstanding_pings] = DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil?
opts[:drain_timeout] = DEFAULT_DRAIN_TIMEOUT if opts[:drain_timeout].nil?

# Override with ENV
opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI
Expand All @@ -176,6 +180,7 @@ def connect(uri=nil, opts={}, &blk)
opts[:no_echo] ||= ENV['NATS_NO_ECHO'] || false
opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil?
opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil?
# NATS_DRAIN_TIMEOUT overrides the option, exactly like the ping overrides
# above. `||=` could never take effect here because :drain_timeout has already
# been given its default, which left the environment variable dead.
opts[:drain_timeout] = ENV['NATS_DRAIN_TIMEOUT'].to_i unless ENV['NATS_DRAIN_TIMEOUT'].nil?

uri = opts[:uris] || opts[:servers] || opts[:uri]

Expand Down Expand Up @@ -254,6 +259,16 @@ def stop(&blk)
@disconnect_cb = nil
end

# Drain gracefully disconnects from the server, letting
# subscribers process pending messages already sent by server and
# optionally calls the associated block.
# @param [Block] &blk called when drain is done and connection is closed.
def drain(&blk)
  # Nothing happens (and the block is never invoked) when there is no client
  # or when the client is already draining.
  return unless client
  return if client.draining?
  # Only a connected or currently reconnecting client can be drained.
  return unless client.connected? || client.reconnecting?

  # Delegate to the client; the optional block runs once the drain is done.
  client.drain do
    blk.call unless blk.nil?
  end
end

# @return [URI] Connected server
def connected_server
return nil unless client
Expand All @@ -272,6 +287,12 @@ def reconnecting?
client.reconnecting?
end

# @return [Boolean] Draining state
def draining?
  # Without a client nothing can be draining; otherwise defer to the client.
  client ? client.draining? : false
end

# @return [Hash] Options
def options
return {} unless client
Expand Down Expand Up @@ -420,13 +441,14 @@ def process_uri(uris)
end

attr_reader :connected, :connect_cb, :err_cb, :err_cb_overridden, :pongs_received #:nodoc:
attr_reader :closing, :reconnecting, :server_pool, :options, :server_info #:nodoc
attr_reader :closing, :reconnecting, :draining, :server_pool, :options, :server_info #:nodoc
attr_reader :msgs_received, :msgs_sent, :bytes_received, :bytes_sent, :pings
attr_reader :disconnect_cb, :close_cb

alias :connected? :connected
alias :closing? :closing
alias :reconnecting? :reconnecting
alias :draining? :draining

def initialize(options)
@options = options
Expand Down Expand Up @@ -457,6 +479,8 @@ def initialize(options)
@resp_sub_prefix = nil
@nuid = NATS::NUID.new

# Drain mode
@draining = false
send_connect_command
end

Expand All @@ -466,7 +490,7 @@ def initialize(options)
# @param [String] opt_reply
# @param [Block] blk, closure called when publish has been processed by the server.
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
return unless subject
return unless subject or draining?
msg = msg.to_s

# Accounting
Expand Down Expand Up @@ -508,6 +532,44 @@ def unsubscribe(sid, opt_max=nil)
@subs.delete(sid) unless (sub[:max] && (sub[:received] < sub[:max]))
end

# Drain gracefully closes the connection.
# @param [Block] blk called when drain is done and connection is closed.
def drain(&blk)
  # A drain or close that is already under way must not be started again.
  return if draining? || closing?
  @draining = true

  # Shared tail of both outcomes: leave drain mode, close the connection
  # unless it is already closing, then notify the caller.
  finish_drain = lambda do
    @draining = false
    close unless closing?
    blk.call if blk
  end

  # Remove interest in all subjects to stop receiving messages.
  @subs.each_key { |sid| send_command("UNSUB #{sid} #{CR_LF}") }

  # Roundtrip to ensure no more messages are received.
  flush do
    # Both timers cancel each other, so both names must exist before
    # either timer block is created.
    deadline_timer = nil
    poll_timer = nil

    # Give up once the configured drain timeout elapses.
    deadline_timer = EM.add_timer(options[:drain_timeout]) do
      EM.cancel_timer(poll_timer)

      # Report the timeout via the error callback and just close.
      err_cb.call(NATS::ClientError.new("Drain Timeout"))
      finish_drain.call
    end

    # Periodically check for the pending data to be empty.
    poll_timer = EM.add_periodic_timer(0.1) do
      still_pending = !(closing? || @buf.nil? || @buf.empty?)
      next if still_pending

      EM.cancel_timer(deadline_timer)
      EM.cancel_timer(poll_timer)

      # We're done draining and can close now.
      finish_drain.call
    end
  end
end

# Return the active subscription count.
# @return [Number]
def subscription_count
Expand Down Expand Up @@ -776,6 +838,7 @@ def flush_pending #:nodoc:

def receive_data(data) #:nodoc:
@buf = @buf ? @buf << data : data

while (@buf)
case @parse_state
when AWAITING_CONTROL_LINE
Expand Down Expand Up @@ -823,7 +886,6 @@ def receive_data(data) #:nodoc:
@parse_state = AWAITING_CONTROL_LINE
@buf = nil if (@buf && @buf.empty?)
end

end
end

Expand Down
1 change: 0 additions & 1 deletion spec/client/client_connect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
msgs_b = []
with_em_timeout do
NATS.on_error do |e|
p e
errors << e
end

Expand Down
183 changes: 183 additions & 0 deletions spec/client/client_drain_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
require 'spec_helper'

describe 'Client - Drain' do

# Every example gets its own server instance.
# NOTE(review): the `true` argument presumably makes start_server wait until
# the server is reachable — confirm against NatsServerControl.
before do
  @s = NatsServerControl.new
  @s.start_server(true)
end

# Tear down the server started for the example.
after do
  @s.kill_server
end

it "should support draining a connection" do
  msgs = []
  errors = []
  closed = false
  drained = false
  after_drain = nil
  total_msgs_before_drain = nil
  total_msgs_after_drain = nil
  pending_data_after_draining = nil

  # Sums the :received counter over every subscription of a connection.
  received_total = lambda do |conn|
    conn.instance_variable_get('@subs').values.reduce(0) do |total, sub|
      total + sub[:received]
    end
  end

  with_em_timeout(10) do |future|
    # Subscriber: acknowledges every request, then drains after one second.
    NATS.connect(uri: @s.uri) do |nc|
      expect(nc.options[:drain_timeout]).to eql(30)
      nc.on_error do |err|
        errors << err
      end
      nc.on_close do |_err|
        closed = true
        future.resume
      end

      nc.subscribe("foo", queue: "worker") do |_msg, reply|
        nc.publish(reply, "ACK:foo")
      end

      nc.subscribe("bar") do |_msg, reply|
        nc.publish(reply, "ACK:bar")
      end

      nc.subscribe("quux") do |_msg, reply|
        nc.publish(reply, "ACK:quux")
      end

      EM.add_timer(1) do
        # Snapshot taken right before draining starts.
        total_msgs_before_drain = received_total.call(nc)

        nc.drain do
          # State observed once the drain has completed.
          after_drain = nc.draining?
          drained = true
          pending_data_after_draining = nc.instance_variable_get('@buf')
          total_msgs_after_drain = received_total.call(nc)
        end
      end
    end

    # Fast publisher: floods the three subjects for one second.
    NATS.connect(uri: @s.uri) do |nc|
      inbox = NATS.create_inbox
      # ACKs are collected but not asserted on.
      nc.subscribe(inbox) do |msg|
        msgs << msg
      end

      timer = EM.add_periodic_timer(0.1) do
        10000.times do
          nc.publish("foo", "hi", inbox)
          nc.publish("bar", "hi", inbox)
          nc.publish("quux", "hi", inbox)
        end
      end
      EM.add_timer(1) do
        EM.cancel_timer(timer)
      end
    end
  end

  expect(errors.count).to eql(0)
  expect(closed).to eql(true)
  expect(drained).to eql(true)
  expect(after_drain).to eql(false)
  # Messages that were pending when the drain started must still be processed.
  expect(total_msgs_before_drain).to be < total_msgs_after_drain
  expect(pending_data_after_draining).to be_nil
end

it "should timeout draining if takes too long" do
  msgs = []
  errors = []
  closed = false
  drained = false
  after_drain = nil
  total_msgs_before_drain = nil
  total_msgs_after_drain = nil
  pending_data_after_draining = nil

  # Sums the :received counter over every subscription of a connection.
  received_total = lambda do |conn|
    conn.instance_variable_get('@subs').values.reduce(0) do |total, sub|
      total + sub[:received]
    end
  end

  with_em_timeout(10) do |future|
    # Use a very short drain timeout so that the timeout path is taken.
    NATS.connect(uri: @s.uri, drain_timeout: 0.01) do |nc|
      nc.on_error do |err|
        errors << err
      end
      nc.on_close do |_err|
        closed = true
        future.resume
      end

      nc.subscribe("foo", queue: "worker") do |_msg, reply|
        nc.publish(reply, "ACK:foo")
      end

      nc.subscribe("bar") do |_msg, reply|
        nc.publish(reply, "ACK:bar")
      end

      nc.subscribe("quux") do |_msg, reply|
        nc.publish(reply, "ACK:quux")
      end

      EM.add_timer(1) do
        # Snapshot taken right before draining starts.
        total_msgs_before_drain = received_total.call(nc)

        nc.drain do
          # State observed once the drain has given up and closed.
          after_drain = nc.draining?
          drained = true
          pending_data_after_draining = nc.instance_variable_get('@buf')
          total_msgs_after_drain = received_total.call(nc)
        end
      end
    end

    # Fast publisher: floods the three subjects for one second.
    NATS.connect(uri: @s.uri) do |nc|
      inbox = NATS.create_inbox
      # ACKs are collected but not asserted on.
      nc.subscribe(inbox) do |msg|
        msgs << msg
      end

      timer = EM.add_periodic_timer(0.1) do
        10000.times do
          nc.publish("foo", "hi", inbox)
          nc.publish("bar", "hi", inbox)
          nc.publish("quux", "hi", inbox)
        end
      end
      EM.add_timer(1) do
        EM.cancel_timer(timer)
      end
    end
  end

  # The timeout must be reported exactly once through the error callback.
  expect(errors.count).to eql(1)
  expect(errors.first).to be_a(NATS::ClientError)
  expect(errors.first.to_s).to eql("Drain Timeout")
  expect(closed).to eql(true)
  expect(drained).to eql(true)
  expect(after_drain).to eql(false)
  expect(total_msgs_before_drain).to be < total_msgs_after_drain
  expect(pending_data_after_draining).to be_nil
end
end