From a7dfe18146263ecd01541970041732a09fad39ed Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Fri, 4 Mar 2016 19:20:33 +0900 Subject: [PATCH] Check timeout on UPDATE in acquire On migrating, workers may be stolen some of tasks by another workers. To ease such migrating, detect the situation and release those tasks. Tasks other than already acquired ones will be acquired after tiemout. --- lib/perfectqueue/backend/rdb_compat.rb | 7 ++++--- spec/rdb_compat_backend_spec.rb | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/lib/perfectqueue/backend/rdb_compat.rb b/lib/perfectqueue/backend/rdb_compat.rb index aace128..feee196 100644 --- a/lib/perfectqueue/backend/rdb_compat.rb +++ b/lib/perfectqueue/backend/rdb_compat.rb @@ -259,15 +259,16 @@ def acquire(alive_time, max_acquire, options) return nil end - sql = "UPDATE `#{@table}` SET timeout=? WHERE id IN (" - params = [sql, next_timeout] + sql = "UPDATE `#{@table}` SET timeout=? WHERE timeout <= ? AND id IN (" + params = [sql, next_timeout, now] tasks.each {|t| params << t.key } sql << (1..tasks.size).map { '?' }.join(',') sql << ") AND created_at IS NOT NULL" n = @db[*params].update if n != tasks.size - # TODO table lock doesn't work. error? + # NOTE table lock doesn't work. error! + return nil end @cleanup_interval_count -= 1 diff --git a/spec/rdb_compat_backend_spec.rb b/spec/rdb_compat_backend_spec.rb index 0dfd93a..3bfb8cd 100644 --- a/spec/rdb_compat_backend_spec.rb +++ b/spec/rdb_compat_backend_spec.rb @@ -228,6 +228,26 @@ expect(ary[1]).to be_an_instance_of(AcquiredTask) end end + context 'stole a task' do + let :t0 do now - 100 end + before do + db.submit('key1', 'test1', nil, {now: t0}) + db.submit('key2', 'test2', nil, {now: t0}) + db.submit('key3', 'test3', nil, {now: t0}) + end + it 'returns nil' do + # hook and steal a task + mock = double('prefetch_break_types') + db.instance_variable_set(:@prefetch_break_types, mock) + allow(mock).to receive(:include?) do + db.db['UPDATE `test_queues` SET timeout=? WHERE id=?', now+300, 'key2'].update + false + end + + ary = db.acquire(alive_time, max_acquire, {}) + expect(ary).to be_nil + end + end end context '#cancel_request' do