Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Re-implement Dalli::Client#get_multi outside of Dalli::Server#multi_r…

…esponse to read from servers in parallel
  • Loading branch information...
commit f84f71b9bcd2bb055e6cd0a46e8750cc2787e05f 1 parent 79d55c1
@tmm1 authored
Showing with 78 additions and 8 deletions.
  1. +78 −8 lib/dalli/client.rb
View
86 lib/dalli/client.rb
@@ -74,17 +74,87 @@ def get_multi(*keys)
end
values = {}
- servers_in_use.each do |server|
- next unless server.alive?
- begin
- server.request(:noop).each_pair do |key, value|
- values[key_without_namespace(key)] = value
+
+ if servers_in_use.any?
+ servers_in_use.each do |server|
+ next unless server.alive?
+ begin
+ server.send(:write_noop)
+ rescue NetworkError => e
+ servers_in_use.delete(server)
end
- rescue DalliError, NetworkError => e
- Dalli.logger.debug { e.inspect }
- Dalli.logger.debug { "results from this server will be missing" }
end
+
+ buffers = Hash.new{ |h,k| h[k] = '' }
+
+ while servers_in_use.any?
+ timeout = servers_in_use.first.options[:socket_timeout]
+ servers_in_use.delete_if{ |s| s.sock.nil? }
+ sockets = servers_in_use.map(&:sock)
+ readable, _ = IO.select(sockets, nil, nil, timeout)
+
+ if readable.nil?
+ # mark all pending servers as failed
+ servers_in_use.each do |server|
+ begin
+ server.send(:failure!)
+ rescue NetworkError => e
+ end
+ end
+
+ break
+
+ else
+ readable.each do |sock|
+ server = sock.server
+
+ begin
+ buffers[server] << sock.read_available
+ rescue SystemCallError, Timeout::Error, EOFError
+ begin
+ server.send(:failure!)
+ rescue NetworkError => e
+ end
+ servers_in_use.delete(server)
+ buffers.delete(server)
+ next
+ end
+
+ buf = buffers[server]
+ while buf.bytesize >= 24
+ header = buf.slice(0, 24)
+ (key_length, _, body_length) = header.unpack(Dalli::Server::KV_HEADER)
+
+ if key_length == 0
+ # all done!
+ buffers.delete(server)
+ servers_in_use.delete(server)
+ break
+
+ elsif buf.bytesize >= (24 + 4 + body_length)
+ buf.slice!(0, 24)
+ flags = buf.slice!(0, 4).unpack('N')[0]
+ key = buf.slice!(0, key_length)
+ value = buf.slice!(0, body_length - key_length - 4) if body_length - key_length - 4 > 0
+
+ begin
+ values[key_without_namespace(key)] = server.send(:deserialize, value, flags)
+ rescue DalliError => e
+ Dalli.logger.debug { e.inspect }
+ Dalli.logger.debug { "results from this server will be missing" }
+ end
+
+ else
+ # not enough data yet, keep waiting
+ break
+ end
+ end
+ end
+ end
+ end
+
end
+
values
end
ensure
Please sign in to comment.
Something went wrong with that request. Please try again.