Skip to content

Commit

Permalink
Inactivity timeout support
Browse files Browse the repository at this point in the history
  • Loading branch information
mdpye committed Jul 15, 2014
1 parent 6ffdcc6 commit eda484f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
42 changes: 42 additions & 0 deletions lib/em-hiredis/base_client.rb
Expand Up @@ -25,6 +25,8 @@ def initialize(host = 'localhost', port = 6379, password = nil, db = nil)
@reconnect_timer = nil
@failed = false

@inactive_seconds = 0

self.on(:failed) {
@failed = true
@command_queue.each do |df, _, _|
Expand Down Expand Up @@ -72,6 +74,7 @@ def connect
@connection = EM.connect(@host, @port, Connection, @host, @port)

@connection.on(:closed) do
cancel_inactivity_checks
if @connected
@defs.each { |d| d.fail(Error.new("Redis disconnected")) }
@defs = []
Expand Down Expand Up @@ -115,6 +118,8 @@ def connect
end
@command_queue = []

schedule_inactivity_checks

emit(:connected)
EM::Hiredis.logger.info("#{@connection} Connected")
succeed
Expand All @@ -133,6 +138,7 @@ def connect
error.redis_error = reply
deferred.fail(error) if deferred
else
@inactive_seconds = 0
handle_reply(reply)
end
end
Expand Down Expand Up @@ -181,6 +187,21 @@ def reconnect_connection
reconnect
end

# Starts an inactivity checker which will ping redis if nothing has been
# heard on the connection for `trigger_secs` seconds and forces a reconnect
# after a further `response_timeout` seconds if we still don't hear anything.
def configure_inactivity_check(trigger_secs, response_timeout)
raise ArgumentError('trigger_secs must be > 0') unless trigger_secs.to_i > 0
raise ArgumentError('response_timeout must be > 0') unless response_timeout.to_i > 0

@inactivity_trigger_secs = trigger_secs.to_i
@inactivity_response_timeout = response_timeout.to_i

# Start the inactivity check now only if we're already conected, otherwise
# the connected event will schedule it.
schedule_inactivity_checks if @connected
end

private

def method_missing(sym, *args)
Expand All @@ -206,6 +227,27 @@ def reconnect
EM::Hiredis.logger.info("#{@connection} Reconnecting")
end

def cancel_inactivity_checks
EM.cancel_timer(@inactivity_timer) if @inactivity_timer
@inactivity_timer = nil
end

def schedule_inactivity_checks
if @inactivity_trigger_secs
@inactive_seconds = 0
@inactivity_timer = EM.add_periodic_timer(1) {
@inactive_seconds += 1
if @inactive_seconds > @inactivity_trigger_secs + @inactivity_response_timeout
EM::Hiredis.logger.error "#{@connection} No response to ping, triggering reconnect"
reconnect!
elsif @inactive_seconds > @inactivity_trigger_secs
EM::Hiredis.logger.debug "#{@connection} Connection inactive, triggering ping"
ping
end
}
end
end

def handle_reply(reply)
if @defs.empty?
if @monitoring
Expand Down
17 changes: 16 additions & 1 deletion lib/em-hiredis/pubsub_client.rb
Expand Up @@ -2,6 +2,8 @@ module EventMachine::Hiredis
class PubsubClient < BaseClient
PUBSUB_MESSAGES = %w{message pmessage subscribe unsubscribe psubscribe punsubscribe}.freeze

PING_CHANNEL = '__em-hiredis-ping'

def initialize(host='localhost', port='6379', password=nil, db=nil)
@subs, @psubs = [], []
@pubsub_defs = Hash.new { |h,k| h[k] = [] }
Expand Down Expand Up @@ -125,7 +127,20 @@ def punsubscribe_proc(pattern, proc)
end
return df
end


# Pubsub connections to not support even the PING command, but it is useful,
# especially with read-only connections like pubsub, to be able to check that
# the TCP connection is still usefully alive.
#
# This is not particularly elegant, but it's probably the best we can do
# for now. Ping support for pubsub connections is being considerred:
# https://github.com/antirez/redis/issues/420
def ping
subscribe(PING_CHANNEL).callback {
unsubscribe(PING_CHANNEL)
}
end

private

# Send a command to redis without adding a deferrable for it. This is
Expand Down

0 comments on commit eda484f

Please sign in to comment.