Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Reset any configured timeout before issuing blocking commands (for no…

…w blpop and brpop). Original timeout is restored after command has finished.
  • Loading branch information...
commit 4483f5929524444206fb7f139156df1b1216e760 1 parent 2f243b7
@roidrage roidrage authored ezmobius committed
Showing with 79 additions and 11 deletions.
  1. +31 −11 lib/redis.rb
  2. +48 −0 spec/redis_spec.rb
View
42 lib/redis.rb
@@ -120,6 +120,11 @@ class Redis
"sync" => true
}
+ BLOCKING_COMMANDS = {
+ "blpop" => true,
+ "brpop" => true
+ }
+
def initialize(options = {})
@host = options[:host] || '127.0.0.1'
@port = (options[:port] || 6379).to_i
@@ -168,16 +173,7 @@ def connect_to(host, port, timeout=nil)
# to make sure a blocking read will return after the specified number
# of seconds. This hack is from memcached ruby client.
if timeout
- secs = Integer(timeout)
- usecs = Integer((timeout - secs) * 1_000_000)
- optval = [secs, usecs].pack("l_2")
- begin
- sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
- sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
- rescue Exception => ex
- # Solaris, for one, does not like/support socket timeouts.
- @logger.info "Unable to use raw socket timeouts: #{ex.class.name}: #{ex.message}" if @logger
- end
+ set_socket_timeout(sock, timeout)
end
sock
end
@@ -246,11 +242,14 @@ def raw_call_command(argvp)
end
def process_command(command, argvv)
+ set_socket_timeout(@sock, 0) if requires_timeout_reset?(command)
@sock.write(command)
argvv.map do |argv|
processor = REPLY_PROCESSOR[argv[0]]
processor ? processor.call(read_reply) : read_reply
end
+ ensure
+ set_socket_timeout(@sock, @timeout) if requires_timeout_reset?(command)
end
def maybe_lock(&block)
@@ -367,7 +366,28 @@ def read_reply
end
private
+ def requires_timeout_reset?(command)
+ blocking?(command) && @timeout
+ end
+
+ def blocking?(command)
+ command.match(/^(#{BLOCKING_COMMANDS.keys.join("|")})/)
+ end
+
def get_size(string)
string.respond_to?(:bytesize) ? string.bytesize : string.size
end
-end
+
+ def set_socket_timeout(sock, timeout)
+ secs = Integer(timeout)
+ usecs = Integer((timeout - secs) * 1_000_000)
+ optval = [secs, usecs].pack("l_2")
+ begin
+ sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
+ sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
+ rescue Exception => ex
+ # Solaris, for one, does not like/support socket timeouts.
+ @logger.info "Unable to use raw socket timeouts: #{ex.class.name}: #{ex.message}" if @logger
+ end
+ end
+end
View
48 spec/redis_spec.rb
@@ -665,4 +665,52 @@ def ==(other)
r.connect_to_server
end
+ it "should be able to block on the tail of a list (BLPOP)" do
+ @r.lpush('blocking_queue', 'message')
+ @r.lpush('blocking_queue', 'another_message')
+ pid = Kernel.fork do
+ r = Redis.new(:db => 15)
+ sleep 0.3
+ r.lpush('blocking_queue', 'waiting_message')
+ end
+ @r.blpop('blocking_queue', 0.1).should == ['blocking_queue', 'another_message']
+ @r.blpop('blocking_queue', 0.1).should == ['blocking_queue', 'message']
+ @r.blpop('blocking_queue', 0.4).should == ['blocking_queue', 'waiting_message']
+ Process.wait(pid)
+ end
+
+ it "should be able to block on the head of a list (BRPOP)" do
+ @r.rpush('blocking_queue', 'message')
+ @r.rpush('blocking_queue', 'another_message')
+ pid = Kernel.fork do
+ r = Redis.new(:db => 15)
+ sleep 0.3
+ r.rpush('blocking_queue', 'waiting_message')
+ end
+ @r.brpop('blocking_queue', 0.1).should == ['blocking_queue', 'another_message']
+ @r.brpop('blocking_queue', 0.1).should == ['blocking_queue', 'message']
+ @r.brpop('blocking_queue', 0.4).should == ['blocking_queue', 'waiting_message']
+ Process.wait(pid)
+ end
+
+ it "should unset a configured timeout when using a blocking command" do
+ @r = Redis.new(:timeout => 1)
+ lambda {
+ @r.brpop('blocking_key', 2)
+ }.should_not raise_error(Errno::EAGAIN)
+ end
+
+ it "should restore the timeout after the blocking command was run" do
+ @r.should_receive(:set_socket_timeout).with(instance_of(TCPSocket), 0)
+ @r.should_receive(:set_socket_timeout).with(instance_of(TCPSocket), 5)
+ @r.brpop('blocking_key', 1)
+ end
+
+ it "should restore the timeout even if the command failed" do
+ @r.should_receive(:set_socket_timeout).with(instance_of(TCPSocket), 0)
+ @r.should_receive(:set_socket_timeout).with(instance_of(TCPSocket), 5)
+ lambda {
+ @r.brpop('blocking_key', nil)
+ }.should raise_error
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.