diff --git a/lib/perfectqueue/backend/rdb_compat.rb b/lib/perfectqueue/backend/rdb_compat.rb index aace128..feee196 100644 --- a/lib/perfectqueue/backend/rdb_compat.rb +++ b/lib/perfectqueue/backend/rdb_compat.rb @@ -259,15 +259,16 @@ def acquire(alive_time, max_acquire, options) return nil end - sql = "UPDATE `#{@table}` SET timeout=? WHERE id IN (" - params = [sql, next_timeout] + sql = "UPDATE `#{@table}` SET timeout=? WHERE timeout <= ? AND id IN (" + params = [sql, next_timeout, now] tasks.each {|t| params << t.key } sql << (1..tasks.size).map { '?' }.join(',') sql << ") AND created_at IS NOT NULL" n = @db[*params].update if n != tasks.size - # TODO table lock doesn't work. error? + # NOTE table lock doesn't work. error! + return nil end @cleanup_interval_count -= 1 diff --git a/spec/rdb_compat_backend_spec.rb b/spec/rdb_compat_backend_spec.rb index 0dfd93a..3bfb8cd 100644 --- a/spec/rdb_compat_backend_spec.rb +++ b/spec/rdb_compat_backend_spec.rb @@ -228,6 +228,26 @@ expect(ary[1]).to be_an_instance_of(AcquiredTask) end end + context 'stole a task' do + let :t0 do now - 100 end + before do + db.submit('key1', 'test1', nil, {now: t0}) + db.submit('key2', 'test2', nil, {now: t0}) + db.submit('key3', 'test3', nil, {now: t0}) + end + it 'returns nil' do + # hook and steal a task + mock = double('prefetch_break_types') + db.instance_variable_set(:@prefetch_break_types, mock) + allow(mock).to receive(:include?) do + db.db['UPDATE `test_queues` SET timeout=? WHERE id=?', now+300, 'key2'].update + false + end + + ary = db.acquire(alive_time, max_acquire, {}) + expect(ary).to be_nil + end + end end context '#cancel_request' do