Skip to content

Commit

Permalink
Add better support for safe updates with multiple writers
Browse files Browse the repository at this point in the history
  • Loading branch information
philipmw committed Nov 8, 2013
1 parent 045f368 commit ef5b43b
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 110 deletions.
7 changes: 7 additions & 0 deletions History.md
Expand Up @@ -6,6 +6,13 @@ Unreleased

- Implement `version` for retrieving version of connected servers [dterei, #384]
- Implement `fetch_multi` for batched read/write [sorentwo, #380]
- Add more support for safe updates with multiple writers: [philipmw]
* Get value with CAS: [value, cas] = get_cas(key)
get_cas(key) {|value, cas| ...}
* Get multiple values with CAS: get_multi_full(k1, k2, ...) {|value, metadata| cas = metadata[:cas]}
* Set value with CAS: new_cas = set(key, value, ttl, options={cas: <curr_cas>})
* Replace value with CAS: replace(key, new_value, ttl, options={cas: <curr_cas>})
* Delete value with CAS: delete_cas(key, cas)

2.6.4
=======
Expand Down
182 changes: 117 additions & 65 deletions lib/dalli/client.rb
Expand Up @@ -58,11 +58,26 @@ def multi
Thread.current[:dalli_multi] = old
end

##
# Get the value associated with the key.
def get(key, options=nil)
resp = perform(:get, key)
resp.nil? || 'Not found' == resp ? nil : resp
end

##
# Get the value and CAS ID associated with the key. If a block is provided,
# value and CAS will be passed to the block.
def get_cas(key)
(value, cas) = perform(:cas, key)
value = (!value || value == 'Not found') ? nil : value
if block_given?
yield value, cas
else
[value, cas]
end
end

def groups_for_keys(*keys)
groups = mapped_keys(keys).flatten.group_by do |key|
begin
Expand Down Expand Up @@ -110,70 +125,30 @@ def perform_multi_response_start(servers)

##
# Fetch multiple keys efficiently.
# Returns a hash of { 'key' => 'value', 'key2' => 'value1' }
# If a block is given, yields key/value pairs one at a time.
# Otherwise returns a hash of { 'key' => 'value', 'key2' => 'value1' }
def get_multi(*keys)
perform do
return {} if keys.empty?
options = nil
options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil?
ring.lock do
begin
groups = groups_for_keys(keys)
if unfound_keys = groups.delete(nil)
Dalli.logger.debug { "unable to get keys for #{unfound_keys.length} keys because no matching server was found" }
end
make_multi_get_requests(groups)

servers = groups.keys
values = {}
return values if servers.empty?
servers = perform_multi_response_start(servers)

start = Time.now
loop do
# remove any dead servers
servers.delete_if { |s| s.sock.nil? }
break if servers.empty?

# calculate remaining timeout
elapsed = Time.now - start
timeout = servers.first.options[:socket_timeout]
if elapsed > timeout
readable = nil
else
sockets = servers.map(&:sock)
readable, _ = IO.select(sockets, nil, nil, timeout - elapsed)
end

if readable.nil?
# no response within timeout; abort pending connections
servers.each do |server|
Dalli.logger.debug { "memcached at #{server.name} did not response within timeout" }
server.multi_response_abort
end
break

else
readable.each do |sock|
server = sock.server

begin
server.multi_response_nonblock.each do |key, value|
values[key_without_namespace(key)] = value
end

if server.multi_response_completed?
servers.delete(server)
end
rescue NetworkError
servers.delete(server)
end
end
end
end
if block_given?
get_multi_yielder(keys) {|k, meta| yield k, meta[:v]}
else
Hash.new.tap do |hash|
get_multi_yielder(keys) {|k, meta| hash[k] = meta[:v]}
end
end
end

values
end
##
# Fetch multiple keys efficiently, including available metadata such as CAS.
# If a block is given, yields key/metadata pairs one a time. Metadata is a hash:
# {:v => 'value', :cas => <cas-id>, ...}
# If no block is given, returns a hash of
# { 'key' => {:v => 'value', :cas => <cas-id>} }
def get_multi_full(*keys)
if block_given?
get_multi_yielder(keys) {|*args| yield(*args)}
else
Hash.new.tap do |hash|
get_multi_yielder(keys) {|k, meta| hash[k] = meta}
end
end
end
Expand Down Expand Up @@ -209,9 +184,14 @@ def cas(key, ttl=nil, options=nil, &block)
end
end

##
# Set the key-value pair. To ensure safe updates in scenarios with multiple
# writers, provide the current CAS value in options[:cas].
# Returns the resulting CAS value.
def set(key, value, ttl=nil, options=nil)
ttl ||= @options[:expires_in].to_i
perform(:set, key, value, ttl, 0, options)
cas = options ? options.delete(:cas){|k| 0} : 0
perform(:set, key, value, ttl, cas, options)
end

##
Expand All @@ -225,13 +205,20 @@ def add(key, value, ttl=nil, options=nil)
##
# Conditionally add a key/value pair, only if the key already exists
# on the server. Returns true if the operation succeeded.
# To ensure safe updates in scenarios with multiple writers, provide the
# current CAS value in options[:cas]
def replace(key, value, ttl=nil, options=nil)
ttl ||= @options[:expires_in].to_i
perform(:replace, key, value, ttl, options)
cas = options ? options.delete(:cas){|k| 0} : 0
perform(:replace, key, value, ttl, cas, options)
end

def delete(key)
perform(:delete, key)
perform(:delete, key, 0)
end

def delete_cas(key, cas)
perform(:delete, key, cas)
end

##
Expand Down Expand Up @@ -413,5 +400,70 @@ def normalize_options(opts)
end
opts
end

##
# Yields, one at a time, keys and their values+attributes.
def get_multi_yielder(keys)
perform do
return {} if keys.empty?
ring.lock do
begin
groups = groups_for_keys(keys)
if unfound_keys = groups.delete(nil)
Dalli.logger.debug { "unable to get keys for #{unfound_keys.length} keys because no matching server was found" }
end
make_multi_get_requests(groups)

servers = groups.keys
return if servers.empty?
servers = perform_multi_response_start(servers)

start = Time.now
loop do
# remove any dead servers
servers.delete_if { |s| s.sock.nil? }
break if servers.empty?

# calculate remaining timeout
elapsed = Time.now - start
timeout = servers.first.options[:socket_timeout]
if elapsed > timeout
readable = nil
else
sockets = servers.map(&:sock)
readable, _ = IO.select(sockets, nil, nil, timeout - elapsed)
end

if readable.nil?
# no response within timeout; abort pending connections
servers.each do |server|
Dalli.logger.debug { "memcached at #{server.name} did not response within timeout" }
server.multi_response_abort
end
break

else
readable.each do |sock|
server = sock.server

begin
server.multi_response_nonblock.each_pair do |key, value_hash|
yield key_without_namespace(key), value_hash
end

if server.multi_response_completed?
servers.delete(server)
end
rescue NetworkError
servers.delete(server)
end
end
end
end
end
end
end
end

end
end
44 changes: 30 additions & 14 deletions lib/dalli/server.rb
Expand Up @@ -146,7 +146,7 @@ def multi_response_nonblock

while buf.bytesize - pos >= 24
header = buf.slice(pos, 24)
(key_length, _, body_length) = header.unpack(KV_HEADER)
(key_length, _, body_length, cas) = header.unpack(KV_HEADER)

if key_length == 0
# all done!
Expand All @@ -163,7 +163,7 @@ def multi_response_nonblock
pos = pos + 24 + body_length

begin
values[key] = deserialize(value, flags)
values[key] = {:v => deserialize(value, flags), :cas => cas}
rescue DalliError
end

Expand Down Expand Up @@ -272,7 +272,7 @@ def set(key, value, ttl, cas, options)
guard_max_value(key, value) do
req = [REQUEST, OPCODES[multi? ? :setq : :set], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, cas, flags, ttl, key, value].pack(FORMAT[:set])
write(req)
generic_response unless multi?
cas_response unless multi?
end
end

Expand All @@ -282,22 +282,22 @@ def add(key, value, ttl, options)
guard_max_value(key, value) do
req = [REQUEST, OPCODES[multi? ? :addq : :add], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, 0, flags, ttl, key, value].pack(FORMAT[:add])
write(req)
generic_response unless multi?
cas_response unless multi?
end
end

def replace(key, value, ttl, options)
def replace(key, value, ttl, cas, options)
(value, flags) = serialize(key, value, options)

guard_max_value(key, value) do
req = [REQUEST, OPCODES[multi? ? :replaceq : :replace], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, 0, flags, ttl, key, value].pack(FORMAT[:replace])
req = [REQUEST, OPCODES[multi? ? :replaceq : :replace], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, cas, flags, ttl, key, value].pack(FORMAT[:replace])
write(req)
generic_response unless multi?
cas_response unless multi?
end
end

def delete(key)
req = [REQUEST, OPCODES[multi? ? :deleteq : :delete], key.bytesize, 0, 0, 0, key.bytesize, 0, 0, key].pack(FORMAT[:delete])
def delete(key, cas)
req = [REQUEST, OPCODES[multi? ? :deleteq : :delete], key.bytesize, 0, 0, 0, key.bytesize, 0, cas, key].pack(FORMAT[:delete])
write(req)
generic_response unless multi?
end
Expand Down Expand Up @@ -369,7 +369,7 @@ def reset_stats
def cas(key)
req = [REQUEST, OPCODES[:get], key.bytesize, 0, 0, 0, key.bytesize, 0, 0, key].pack(FORMAT[:get])
write(req)
cas_response
data_cas_response
end

def version
Expand Down Expand Up @@ -433,7 +433,7 @@ def deserialize(value, flags)
raise UnmarshalError, "Unable to uncompress value: #{$!.message}"
end

def cas_response
def data_cas_response
header = read(24)
raise Dalli::NetworkError, 'No response' if !header
(extras, _, status, count, _, cas) = header.unpack(CAS_HEADER)
Expand All @@ -452,7 +452,7 @@ def cas_response

CAS_HEADER = '@4CCnNNQ'
NORMAL_HEADER = '@4CCnN'
KV_HEADER = '@2n@6nN'
KV_HEADER = '@2n@6nN@16Q'

def guard_max_value(key, value)
if value.bytesize <= @options[:value_max_bytes]
Expand Down Expand Up @@ -483,12 +483,28 @@ def generic_response(unpack=false)
end
end

def cas_response
header = read(24)
raise Dalli::NetworkError, 'No response' if !header
(_, _, status, count, _, cas) = header.unpack(CAS_HEADER)
read(count) if count > 0 # this is potential data that we don't care about
if status == 1
nil
elsif status == 2 || status == 5
false # Not stored, normal status for add operation
elsif status != 0
raise Dalli::DalliError, "Response error #{status}: #{RESPONSE_CODES[status]}"
else
cas
end
end

def keyvalue_response
hash = {}
loop do
header = read(24)
raise Dalli::NetworkError, 'No response' if !header
(key_length, _, body_length) = header.unpack(KV_HEADER)
(key_length, _, body_length, _) = header.unpack(KV_HEADER)
return hash if key_length == 0
key = read(key_length)
value = read(body_length - key_length) if body_length - key_length > 0
Expand All @@ -501,7 +517,7 @@ def multi_response
loop do
header = read(24)
raise Dalli::NetworkError, 'No response' if !header
(key_length, _, body_length) = header.unpack(KV_HEADER)
(key_length, _, body_length, _) = header.unpack(KV_HEADER)
return hash if key_length == 0
flags = read(4).unpack('N')[0]
key = read(key_length)
Expand Down
13 changes: 13 additions & 0 deletions test/helper.rb
Expand Up @@ -26,6 +26,19 @@ def assert_error(error, regexp=nil, &block)
assert_match(regexp, ex.message, "#{ex.class.name}: #{ex.message}\n#{ex.backtrace.join("\n\t")}")
end

def op_cas_succeeds(rsp)
rsp.is_a?(Integer)
end

def op_replace_succeeds(rsp)
rsp.is_a?(Integer)
end

# add and set must have the same return value because of DalliStore#write_entry
def op_addset_succeeds(rsp)
rsp.is_a?(Integer)
end

def with_activesupport
require 'active_support/all'
require 'active_support/cache/dalli_store'
Expand Down

0 comments on commit ef5b43b

Please sign in to comment.