Skip to content

Commit

Permalink
Merge pull request #295 from tmm1/concurrent-multi-get
Browse files Browse the repository at this point in the history
Concurrent get_multi
  • Loading branch information
mperham committed Dec 5, 2012
2 parents 3fe871e + 93f9dee commit 0f3ad9e
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 13 deletions.
55 changes: 49 additions & 6 deletions lib/dalli/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def initialize(servers=nil, options={})
@servers = servers || env_servers || '127.0.0.1:11211'
@options = normalize_options(options)
@ring = nil
@servers_in_use = nil
end

#
Expand Down Expand Up @@ -62,7 +61,7 @@ def get_multi(*keys)
options = nil
options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil?
ring.lock do
self.servers_in_use = Set.new
servers = self.servers_in_use = Set.new

keys.flatten.each do |key|
begin
Expand All @@ -74,17 +73,61 @@ def get_multi(*keys)
end

values = {}
servers_in_use.each do |server|
return values if servers.empty?

servers.each do |server|
next unless server.alive?
begin
server.request(:noop).each_pair do |key, value|
values[key_without_namespace(key)] = value
end
server.multi_response_start
rescue DalliError, NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "results from this server will be missing" }
servers.delete(server)
end
end

start = Time.now
loop do
# remove any dead servers
servers.delete_if{ |s| s.sock.nil? }
break if servers.empty?

# calculate remaining timeout
elapsed = Time.now - start
timeout = servers.first.options[:socket_timeout]
if elapsed > timeout
readable = nil
else
sockets = servers.map(&:sock)
readable, _ = IO.select(sockets, nil, nil, timeout - elapsed)
end

if readable.nil?
# no response within timeout; abort pending connections
servers.each do |server|
server.multi_response_abort
end
break

else
readable.each do |sock|
server = sock.server

begin
server.multi_response_nonblock.each do |key, value|
values[key_without_namespace(key)] = value
end

if server.multi_response_completed?
servers.delete(server)
end
rescue NetworkError => e
servers.delete(server)
end
end
end
end

values
end
ensure
Expand Down
18 changes: 18 additions & 0 deletions lib/dalli/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ def close
end
end

def multi_response_start
@lock.synchronize do
super
end
end

def multi_response_nonblock
@lock.synchronize do
super
end
end

def multi_response_abort
@lock.synchronize do
super
end
end

def lock!
@lock.mon_enter
end
Expand Down
86 changes: 83 additions & 3 deletions lib/dalli/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class Server
attr_accessor :port
attr_accessor :weight
attr_accessor :options
attr_reader :sock

DEFAULTS = {
# seconds between trying to contact a remote server
Expand Down Expand Up @@ -106,6 +107,81 @@ def compressor
@options[:compressor]
end

# Start reading key/value pairs from this connection. This is usually called
# after a series of GETKQ commands. A NOOP is sent, and the server begins
# flushing responses for kv pairs that were found.
#
# Returns nothing.
def multi_response_start
verify_state
write_noop
@multi_buffer = ''
@inprogress = true
end

# Did the last call to #multi_response_start complete successfully?
def multi_response_completed?
@multi_buffer.nil?
end

# Attempt to receive and parse as many key/value pairs as possible
# from this server. After #multi_response_start, this should be invoked
# repeatedly whenever this server's socket is readable until
# #multi_response_completed?.
#
# Returns a Hash of kv pairs received.
def multi_response_nonblock
raise 'multi_response has completed' if @multi_buffer.nil?

@multi_buffer << @sock.read_available
buf = @multi_buffer
values = {}

while buf.bytesize >= 24
header = buf.slice(0, 24)
(key_length, _, body_length) = header.unpack(KV_HEADER)

if key_length == 0
# all done!
@multi_buffer = nil
@inprogress = false
break

elsif buf.bytesize >= (24 + body_length)
buf.slice!(0, 24)
flags = buf.slice!(0, 4).unpack('N')[0]
key = buf.slice!(0, key_length)
value = buf.slice!(0, body_length - key_length - 4) if body_length - key_length - 4 > 0

begin
values[key] = deserialize(value, flags)
rescue DalliError => e
end

else
# not enough data yet, wait for more
break
end
end

values
rescue SystemCallError, Timeout::Error, EOFError
failure!
end

# Abort an earlier #multi_response_start. Used to signal an external
# timeout. The underlying socket is disconnected, and the exception is
# swallowed.
#
# Returns nothing.
def multi_response_abort
@multi_buffer = nil
@inprogress = false
failure!
rescue NetworkError
true
end

# NOTE: Additional public methods should be overridden in Dalli::Threadsafe

private
Expand Down Expand Up @@ -244,11 +320,15 @@ def incr(key, count, ttl, default)
body ? longlong(*body.unpack('NN')) : body
end

def write_noop
req = [REQUEST, OPCODES[:noop], 0, 0, 0, 0, 0, 0, 0].pack(FORMAT[:noop])
write(req)
end

# Noop is a keepalive operation but also used to demarcate the end of a set of pipelined commands.
# We need to read all the responses at once.
def noop
req = [REQUEST, OPCODES[:noop], 0, 0, 0, 0, 0, 0, 0].pack(FORMAT[:noop])
write(req)
write_noop
multi_response
end

Expand Down Expand Up @@ -442,7 +522,7 @@ def connect

begin
@pid = Process.pid
@sock = KSocket.open(hostname, port, options)
@sock = KSocket.open(hostname, port, self, options)
@version = version # trigger actual connect
sasl_authentication if need_auth?
up!
Expand Down
38 changes: 34 additions & 4 deletions lib/dalli/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
puts "Using kgio socket IO" if defined?($TESTING) && $TESTING

class Dalli::Server::KSocket < Kgio::Socket
attr_accessor :options
attr_accessor :options, :server

def kgio_wait_readable
IO.select([self], nil, nil, options[:socket_timeout]) || raise(Timeout::Error, "IO timeout")
Expand All @@ -13,12 +13,13 @@ def kgio_wait_writable
IO.select(nil, [self], nil, options[:socket_timeout]) || raise(Timeout::Error, "IO timeout")
end

def self.open(host, port, options = {})
def self.open(host, port, server, options = {})
addr = Socket.pack_sockaddr_in(port, host)
sock = start(addr)
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) if options[:keepalive]
sock.options = options
sock.server = server
sock.kgio_wait_writable
sock
end
Expand All @@ -34,6 +35,22 @@ def readfull(count)
value
end

def read_available
value = ''
loop do
ret = kgio_tryread(8196)
case ret
when nil
raise EOFError, 'end of stream'
when :wait_readable
break
else
value << ret
end
end
value
end

end

if ::Kgio.respond_to?(:wait_readable=)
Expand All @@ -45,14 +62,15 @@ def readfull(count)

puts "Using standard socket IO (#{RUBY_DESCRIPTION})" if defined?($TESTING) && $TESTING
class Dalli::Server::KSocket < TCPSocket
attr_accessor :options
attr_accessor :options, :server

def self.open(host, port, options = {})
def self.open(host, port, server, options = {})
Timeout.timeout(options[:socket_timeout]) do
sock = new(host, port)
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) if options[:keepalive]
sock.options = { :host => host, :port => port }.merge(options)
sock.server = server
sock
end
end
Expand All @@ -74,5 +92,17 @@ def readfull(count)
value
end

def read_available
value = ''
loop do
begin
value << read_nonblock(8196)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK
break
end
end
value
end

end
end

0 comments on commit 0f3ad9e

Please sign in to comment.