Skip to content

Commit

Permalink
refactor code back into Server#multi_response_{start,nonblock,abort}
Browse files Browse the repository at this point in the history
  • Loading branch information
tmm1 committed Nov 28, 2012
1 parent 540a92c commit 8f00c01
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 68 deletions.
97 changes: 29 additions & 68 deletions lib/dalli/client.rb
Expand Up @@ -73,86 +73,47 @@ def get_multi(*keys)
end
end

values = {}
return {} if servers_in_use.empty?

if servers_in_use.any?
servers_in_use.each do |server|
servers_in_use.each do |server|
begin
next unless server.alive?
begin
server.send(:write_noop)
rescue NetworkError => e
servers_in_use.delete(server)
end
server.multi_response_start
rescue NetworkError => e
servers_in_use.delete(server)
end
end

buffers = Hash.new{ |h,k| h[k] = '' }

while servers_in_use.any?
timeout = servers_in_use.first.options[:socket_timeout]
servers_in_use.delete_if{ |s| s.sock.nil? }
sockets = servers_in_use.map(&:sock)
readable, _ = IO.select(sockets, nil, nil, timeout)

if readable.nil?
# mark all pending servers as failed
servers_in_use.each do |server|
begin
server.send(:failure!)
rescue NetworkError => e
end
end

break
values = {}
while servers_in_use.any?
timeout = servers_in_use.first.options[:socket_timeout]
servers_in_use.delete_if{ |s| s.sock.nil? }
sockets = servers_in_use.map(&:sock)
readable, _ = IO.select(sockets, nil, nil, timeout)

if readable.nil?
# mark all pending servers as failed
servers_in_use.each do |server|
server.multi_response_abort
end
break

else
readable.each do |sock|
server = sock.server
else
readable.each do |sock|
server = sock.server

begin
buffers[server] << sock.read_available
rescue SystemCallError, Timeout::Error, EOFError
begin
server.send(:failure!)
rescue NetworkError => e
begin
if ret = server.multi_response_nonblock
ret.each do |key, value|
values[key_without_namespace(key)] = value
end
servers_in_use.delete(server)
buffers.delete(server)
next
end

buf = buffers[server]
while buf.bytesize >= 24
header = buf.slice(0, 24)
(key_length, _, body_length) = header.unpack(Dalli::Server::KV_HEADER)

if key_length == 0
# all done!
buffers.delete(server)
servers_in_use.delete(server)
break

elsif buf.bytesize >= (24 + body_length)
buf.slice!(0, 24)
flags = buf.slice!(0, 4).unpack('N')[0]
key = buf.slice!(0, key_length)
value = buf.slice!(0, body_length - key_length - 4) if body_length - key_length - 4 > 0

begin
values[key_without_namespace(key)] = server.send(:deserialize, value, flags)
rescue DalliError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "results from this server will be missing" }
end

else
# not enough data yet, keep waiting
break
end
end
rescue NetworkError => e
servers_in_use.delete(server)
end
end
end

end

values
Expand Down
50 changes: 50 additions & 0 deletions lib/dalli/server.rb
Expand Up @@ -107,6 +107,56 @@ def compressor
@options[:compressor]
end

def multi_response_start
write_noop
@multi_buffer = ''
@multi_values = {}
@inprogress = true
end

def multi_response_nonblock
@multi_buffer << @sock.read_available
buf = @multi_buffer

while buf.bytesize >= 24
header = buf.slice(0, 24)
(key_length, _, body_length) = header.unpack(KV_HEADER)

if key_length == 0
# all done!
values = @multi_values
@multi_buffer = @multi_values = nil
@inprogress = false
return values

elsif buf.bytesize >= (24 + body_length)
buf.slice!(0, 24)
flags = buf.slice!(0, 4).unpack('N')[0]
key = buf.slice!(0, key_length)
value = buf.slice!(0, body_length - key_length - 4) if body_length - key_length - 4 > 0

begin
@multi_values[key] = deserialize(value, flags)
rescue DalliError => e
end

else
# not enough data yet, keep waiting
return nil
end
end
rescue SystemCallError, Timeout::Error, EOFError
failure!
end

def multi_response_abort
@multi_buffer = @multi_values = nil
@inprogress = false
failure!
rescue NetworkError
true
end

# NOTE: Additional public methods should be overridden in Dalli::Threadsafe

private
Expand Down

0 comments on commit 8f00c01

Please sign in to comment.