diff --git a/lib/dalli/client.rb b/lib/dalli/client.rb index 136ae808..4ae83441 100644 --- a/lib/dalli/client.rb +++ b/lib/dalli/client.rb @@ -73,86 +73,47 @@ def get_multi(*keys) end end - values = {} + return {} if servers_in_use.empty? - if servers_in_use.any? - servers_in_use.each do |server| + servers_in_use.each do |server| + begin next unless server.alive? - begin - server.send(:write_noop) - rescue NetworkError => e - servers_in_use.delete(server) - end + server.multi_response_start + rescue NetworkError => e + servers_in_use.delete(server) end + end - buffers = Hash.new{ |h,k| h[k] = '' } - - while servers_in_use.any? - timeout = servers_in_use.first.options[:socket_timeout] - servers_in_use.delete_if{ |s| s.sock.nil? } - sockets = servers_in_use.map(&:sock) - readable, _ = IO.select(sockets, nil, nil, timeout) - - if readable.nil? - # mark all pending servers as failed - servers_in_use.each do |server| - begin - server.send(:failure!) - rescue NetworkError => e - end - end - - break + values = {} + while servers_in_use.any? + timeout = servers_in_use.first.options[:socket_timeout] + servers_in_use.delete_if{ |s| s.sock.nil? } + sockets = servers_in_use.map(&:sock) + readable, _ = IO.select(sockets, nil, nil, timeout) + + if readable.nil? + # mark all pending servers as failed + servers_in_use.each do |server| + server.multi_response_abort + end + break - else - readable.each do |sock| - server = sock.server + else + readable.each do |sock| + server = sock.server - begin - buffers[server] << sock.read_available - rescue SystemCallError, Timeout::Error, EOFError - begin - server.send(:failure!) - rescue NetworkError => e + begin + if ret = server.multi_response_nonblock + ret.each do |key, value| + values[key_without_namespace(key)] = value end servers_in_use.delete(server) - buffers.delete(server) - next - end - - buf = buffers[server] - while buf.bytesize >= 24 - header = buf.slice(0, 24) - (key_length, _, body_length) = header.unpack(Dalli::Server::KV_HEADER) - - if key_length == 0 - # all done! - buffers.delete(server) - servers_in_use.delete(server) - break - - elsif buf.bytesize >= (24 + body_length) - buf.slice!(0, 24) - flags = buf.slice!(0, 4).unpack('N')[0] - key = buf.slice!(0, key_length) - value = buf.slice!(0, body_length - key_length - 4) if body_length - key_length - 4 > 0 - - begin - values[key_without_namespace(key)] = server.send(:deserialize, value, flags) - rescue DalliError => e - Dalli.logger.debug { e.inspect } - Dalli.logger.debug { "results from this server will be missing" } - end - - else - # not enough data yet, keep waiting - break - end end + rescue NetworkError => e + servers_in_use.delete(server) end end end - end values diff --git a/lib/dalli/server.rb b/lib/dalli/server.rb index 944d2d72..6bab749e 100644 --- a/lib/dalli/server.rb +++ b/lib/dalli/server.rb @@ -107,6 +107,56 @@ def compressor @options[:compressor] end + def multi_response_start + write_noop + @multi_buffer = '' + @multi_values = {} + @inprogress = true + end + + def multi_response_nonblock + @multi_buffer << @sock.read_available + buf = @multi_buffer + + while buf.bytesize >= 24 + header = buf.slice(0, 24) + (key_length, _, body_length) = header.unpack(KV_HEADER) + + if key_length == 0 + # all done! + values = @multi_values + @multi_buffer = @multi_values = nil + @inprogress = false + return values + + elsif buf.bytesize >= (24 + body_length) + buf.slice!(0, 24) + flags = buf.slice!(0, 4).unpack('N')[0] + key = buf.slice!(0, key_length) + value = buf.slice!(0, body_length - key_length - 4) if body_length - key_length - 4 > 0 + + begin + @multi_values[key] = deserialize(value, flags) + rescue DalliError => e + end + + else + # not enough data yet, keep waiting + return nil + end + end + rescue SystemCallError, Timeout::Error, EOFError + failure! + end + + def multi_response_abort + @multi_buffer = @multi_values = nil + @inprogress = false + failure! + rescue NetworkError + true + end + # NOTE: Additional public methods should be overridden in Dalli::Threadsafe private