Skip to content

Commit

Permalink
Check timeout on UPDATE in acquire
Browse files Browse the repository at this point in the history
On migrating, workers may be stolen some of tasks by another workers.
To ease such migrating, detect the situation and release those tasks.
Tasks other than already acquired ones will be acquired after tiemout.
  • Loading branch information
nurse committed Mar 4, 2016
1 parent 6c8cf5d commit a7dfe18
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
7 changes: 4 additions & 3 deletions lib/perfectqueue/backend/rdb_compat.rb
Expand Up @@ -259,15 +259,16 @@ def acquire(alive_time, max_acquire, options)
return nil
end

sql = "UPDATE `#{@table}` SET timeout=? WHERE id IN ("
params = [sql, next_timeout]
sql = "UPDATE `#{@table}` SET timeout=? WHERE timeout <= ? AND id IN ("
params = [sql, next_timeout, now]
tasks.each {|t| params << t.key }
sql << (1..tasks.size).map { '?' }.join(',')
sql << ") AND created_at IS NOT NULL"

n = @db[*params].update
if n != tasks.size
# TODO table lock doesn't work. error?
# NOTE table lock doesn't work. error!
return nil
end

@cleanup_interval_count -= 1
Expand Down
20 changes: 20 additions & 0 deletions spec/rdb_compat_backend_spec.rb
Expand Up @@ -228,6 +228,26 @@
expect(ary[1]).to be_an_instance_of(AcquiredTask)
end
end
context 'stole a task' do
let :t0 do now - 100 end
before do
db.submit('key1', 'test1', nil, {now: t0})
db.submit('key2', 'test2', nil, {now: t0})
db.submit('key3', 'test3', nil, {now: t0})
end
it 'returns nil' do
# hook and steal a task
mock = double('prefetch_break_types')
db.instance_variable_set(:@prefetch_break_types, mock)
allow(mock).to receive(:include?) do
db.db['UPDATE `test_queues` SET timeout=? WHERE id=?', now+300, 'key2'].update
false
end

ary = db.acquire(alive_time, max_acquire, {})
expect(ary).to be_nil
end
end
end

context '#cancel_request' do
Expand Down

0 comments on commit a7dfe18

Please sign in to comment.