Skip to content
This repository has been archived by the owner on Jun 17, 2021. It is now read-only.

Commit

Permalink
using a less convoluted locking mechanism
Browse files Browse the repository at this point in the history
next step is to implement lock expiration
(also a potential issue with ct_2 but that could be because of residual data)
  • Loading branch information
jmettraux committed Aug 25, 2010
1 parent 8387f3e commit 468a3a8
Showing 1 changed file with 108 additions and 75 deletions.
183 changes: 108 additions & 75 deletions lib/ruote/redis/storage.rb
Expand Up @@ -128,85 +128,108 @@ def put(doc, opts={})

# regular put

rev = doc['_rev'].to_i
key = key_for(doc)

current_rev = @redis.get(key).to_i

return true if current_rev == 0 && rev > 0
return do_get(doc, current_rev) if rev != current_rev

nrev = @redis.incr(key).to_i

# the setnx here is crucial in multiple workers env...

r = @redis.setnx(
key_rev_for(doc, nrev),
to_json(doc.merge('_rev' => nrev), opts))

return get(doc['type'], doc['_id']) if r == false

@redis.set(key, nrev)
@redis.del(key_rev_for(doc, rev)) if rev > 0

doc['_rev'] = nrev if opts[:update_rev]

nil
rev = doc['_rev']

lock(key) do

current_doc = do_get(key)
current_rev = current_doc ? current_doc['_rev'] : nil

if current_rev && rev != current_rev
#
# version in storage is newer than version being put,
# (eturn version in storage)
#
current_doc

elsif rev && current_rev.nil?
#
# document deleted, put fails (return true)
#
true

else
#
# put is successful (return nil)
#
nrev = (rev.to_i + 1).to_s
@redis.set(key, to_json(doc.merge('_rev' => nrev)))
doc['_rev'] = nrev if opts[:update_rev]

nil
end
end
end

def get(type, key)

return from_json(@redis.get(key_for(type, key))) if MAS.include?(type)
# 'copy' case

do_get(type, key, @redis.get(key_for(type, key)))
do_get(key_for(type, key))
end

def delete(doc)

raise ArgumentError.new(
"can't delete doc without _rev") unless doc['_rev']
rev = doc['_rev']

r = put(doc, :delete => true)
raise ArgumentError.new("can't delete doc without _rev") unless rev

return r if r != nil
key = key_for(doc)

@redis.keys_to_a("#{key_for(doc)}*").sort.each { |k|
Thread.pass # lingering a bit...
@redis.del(k)
}
# deleting the key_rev last and making 1 'keys' call preliminarily
lock(key) do

nil
current_doc = do_get(key)

if current_doc.nil?
#
# document is [already] gone, delete fails (return true)
#
true

elsif current_doc['_rev'] != rev
#
# version in storage doesn't match version to delete
# (return version in storage)
#
current_doc

else
#
# delete is successful (return nil)
#
@redis.del(key)

nil
end
end
end

def get_many(type, key=nil, opts={})

keys = key ? Array(key) : nil

ids = if type == 'msgs' || type == 'schedules'

@redis.keys_to_a("#{type}/*")
#ids = if type == 'msgs' || type == 'schedules'
# @redis.keys_to_a("#{type}/*")

elsif keys == nil
ids = if keys == nil

@redis.keys_to_a("#{type}/*/*")
@redis.keys_to_a("#{type}/*")

elsif keys.first.is_a?(String)

keys.collect { |k| @redis.keys_to_a("#{type}/*!#{k}/*") }.flatten
keys.collect { |k| @redis.keys_to_a("#{type}/*!#{k}") }.flatten

else #if keys.first.is_a?(Regexp)

@redis.keys_to_a("#{type}/*/*").select { |i|
@redis.keys_to_a("#{type}/*").select { |i|

i = i[type.length + 1..i.rindex('/') - 1]
# removing "^type/" and "/\d+$"
i = i[type.length + 1..-1]
# removing "^type/"

keys.find { |k| k.match(i) }
}
end

ids = ids.reject { |i| i.match(LOCK_KEY) }
ids = ids.sort
ids = ids.reverse if opts[:descending]

Expand All @@ -228,13 +251,10 @@ def get_many(type, key=nil, opts={})

def ids(type)

@redis.keys_to_a("#{type}/*").inject([]) { |a, k|

if m = k.match(/^[^\/]+\/([^\/]+)$/)
a << m[1]
end

a
@redis.keys_to_a("#{type}/*").reject { |i|
i.match(LOCK_KEY)
}.collect { |i|
i.split('/').last
}.sort
end

Expand Down Expand Up @@ -271,38 +291,51 @@ def purge_type!(type)

protected

# key_for(doc)
# key_for(type, key)
#
def key_for(*args)
LOCK_KEY = /-lock$/

a = args.first
def lock(key, &block)

(a.is_a?(Hash) ? [ a['type'], a['_id'] ] : args[0, 2]).join('/')
kl = "#{key}-lock"

#p [ kl, :locking, Thread.current.object_id, Time.now.to_f ]

#while @redis.setnx(kl, 'null') == false; sleep(0.007); end
loop do
r = @redis.setnx(kl, Time.now.to_f.to_s)
#p [ :setnx, r ]
if r == false
sleep 0.007
else
break
end
end

#p [ kl, :locked, Thread.current.object_id, Time.now.to_f ]

#@redis.expire(kl, 2)

result = block.call

@redis.del(kl)

#p [ kl, :unlocked, Thread.current.object_id, Time.now.to_f ]

result
end

# key_rev_for(doc)
# key_rev_for(doc, rev)
# key_rev_for(type, key, rev)
# key_for(doc)
# key_for(type, key)
#
def key_rev_for(*args)
def key_for(*args)

as = nil
a = args.first

if a.is_a?(Hash)
as = [ a['type'], a['_id'], a['_rev'] ] if a.is_a?(Hash)
as[2] = args[1] if args[1]
else
as = args[0, 3]
end

as.join('/')
(a.is_a?(Hash) ? [ a['type'], a['_id'] ] : args[0, 2]).join('/')
end

def do_get(*args)
def do_get(key)

from_json(@redis.get(key_rev_for(*args)))
from_json(@redis.get(key))
end

def from_json(s)
Expand All @@ -318,7 +351,7 @@ def to_json(doc, opts={})

# Don't put configuration if it's already in
#
#(avoid storages from trashing configuration...)
# (prevent storages from trashing configuration...)
#
def put_configuration

Expand Down

0 comments on commit 468a3a8

Please sign in to comment.