Skip to content

Commit

Permalink
Revert "Reset any configured timeout before issuing blocking commands…
Browse files Browse the repository at this point in the history
… (for now blpop and brpop). Original timeout is restored after command has finished."

This reverts commit 4483f59.
  • Loading branch information
Ezra Zygmuntowicz committed Feb 13, 2010
1 parent 4483f59 commit 7dade41
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 79 deletions.
42 changes: 11 additions & 31 deletions lib/redis.rb
Expand Up @@ -120,11 +120,6 @@ class Redis
"sync" => true
}

BLOCKING_COMMANDS = {
"blpop" => true,
"brpop" => true
}

def initialize(options = {})
@host = options[:host] || '127.0.0.1'
@port = (options[:port] || 6379).to_i
Expand Down Expand Up @@ -173,7 +168,16 @@ def connect_to(host, port, timeout=nil)
# to make sure a blocking read will return after the specified number
# of seconds. This hack is from memcached ruby client.
if timeout
set_socket_timeout(sock, timeout)
secs = Integer(timeout)
usecs = Integer((timeout - secs) * 1_000_000)
optval = [secs, usecs].pack("l_2")
begin
sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
rescue Exception => ex
# Solaris, for one, does not like/support socket timeouts.
@logger.info "Unable to use raw socket timeouts: #{ex.class.name}: #{ex.message}" if @logger
end
end
sock
end
Expand Down Expand Up @@ -242,14 +246,11 @@ def raw_call_command(argvp)
end

def process_command(command, argvv)
set_socket_timeout(@sock, 0) if requires_timeout_reset?(command)
@sock.write(command)
argvv.map do |argv|
processor = REPLY_PROCESSOR[argv[0]]
processor ? processor.call(read_reply) : read_reply
end
ensure
set_socket_timeout(@sock, @timeout) if requires_timeout_reset?(command)
end

def maybe_lock(&block)
Expand Down Expand Up @@ -366,28 +367,7 @@ def read_reply
end

private
def requires_timeout_reset?(command)
blocking?(command) && @timeout
end

def blocking?(command)
command.match(/^(#{BLOCKING_COMMANDS.keys.join("|")})/)
end

def get_size(string)
string.respond_to?(:bytesize) ? string.bytesize : string.size
end

def set_socket_timeout(sock, timeout)
secs = Integer(timeout)
usecs = Integer((timeout - secs) * 1_000_000)
optval = [secs, usecs].pack("l_2")
begin
sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
rescue Exception => ex
# Solaris, for one, does not like/support socket timeouts.
@logger.info "Unable to use raw socket timeouts: #{ex.class.name}: #{ex.message}" if @logger
end
end
end
end
48 changes: 0 additions & 48 deletions spec/redis_spec.rb
Expand Up @@ -665,52 +665,4 @@ def ==(other)
r.connect_to_server
end

it "should be able to block on the tail of a list (BLPOP)" do
@r.lpush('blocking_queue', 'message')
@r.lpush('blocking_queue', 'another_message')
pid = Kernel.fork do
r = Redis.new(:db => 15)
sleep 0.3
r.lpush('blocking_queue', 'waiting_message')
end
@r.blpop('blocking_queue', 0.1).should == ['blocking_queue', 'another_message']
@r.blpop('blocking_queue', 0.1).should == ['blocking_queue', 'message']
@r.blpop('blocking_queue', 0.4).should == ['blocking_queue', 'waiting_message']
Process.wait(pid)
end

it "should be able to block on the head of a list (BRPOP)" do
@r.rpush('blocking_queue', 'message')
@r.rpush('blocking_queue', 'another_message')
pid = Kernel.fork do
r = Redis.new(:db => 15)
sleep 0.3
r.rpush('blocking_queue', 'waiting_message')
end
@r.brpop('blocking_queue', 0.1).should == ['blocking_queue', 'another_message']
@r.brpop('blocking_queue', 0.1).should == ['blocking_queue', 'message']
@r.brpop('blocking_queue', 0.4).should == ['blocking_queue', 'waiting_message']
Process.wait(pid)
end

it "should unset a configured timeout when using a blocking command" do
@r = Redis.new(:timeout => 1)
lambda {
@r.brpop('blocking_key', 2)
}.should_not raise_error(Errno::EAGAIN)
end

it "should restore the timeout after the blocking command was run" do
@r.should_receive(:set_socket_timeout).with(instance_of(TCPSocket), 0)
@r.should_receive(:set_socket_timeout).with(instance_of(TCPSocket), 5)
@r.brpop('blocking_key', 1)
end

it "should restore the timeout even if the command failed" do
@r.should_receive(:set_socket_timeout).with(instance_of(TCPSocket), 0)
@r.should_receive(:set_socket_timeout).with(instance_of(TCPSocket), 5)
lambda {
@r.brpop('blocking_key', nil)
}.should raise_error
end
end

0 comments on commit 7dade41

Please sign in to comment.