Skip to content

Commit

Permalink
Merge pull request #15 from treasure-data/use-CONNECTION_ID
Browse files Browse the repository at this point in the history
UPDATE first strategy for finer grained locking
  • Loading branch information
nurse committed Aug 24, 2016
2 parents 2b1f305 + 5e1c218 commit 914ad50
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 82 deletions.
7 changes: 7 additions & 0 deletions ChangeLog
@@ -1,3 +1,10 @@
== 2016-xx-xx version 0.9.0

* Use UPDATE first strategy (#15)
* owner column is added for the table: migration is required
* AcquiredTask#timeout is the current value rather than at acquiring
* prefetch_break_types is removed

== 2016-08-02 version 0.8.49

* Revert v0.8.44 migration path (#38)
Expand Down
94 changes: 51 additions & 43 deletions lib/perfectqueue/backend/rdb_compat.rb
Expand Up @@ -75,35 +75,57 @@ def initialize(client, config)
# connection test
}

# MySQL's CONNECTION_ID() is a 64bit unsigned integer from the
# server's internal thread ID counter. It is unique while the MySQL
# server is running.
# https://bugs.mysql.com/bug.php?id=19806
#
# An acquired task is marked with next_timeout and CONNECTION_ID().
# Therefore while alive_time is not changed and we don't restart
# the server in 1 second, they won't conflict.
if config[:disable_resource_limit]
@update_sql = <<SQL
UPDATE `#{@table}`
JOIN (
SELECT id
FROM `#{@table}` FORCE INDEX (`index_#{@table}_on_timeout`)
WHERE #{EVENT_HORIZON} < timeout AND timeout <= :now
ORDER BY timeout ASC
LIMIT :max_acquire FOR UPDATE) AS t1 USING(id)
SET timeout=:next_timeout, owner=CONNECTION_ID()
SQL
@sql = <<SQL
SELECT id, timeout, data, created_at, resource
FROM `#{@table}`
WHERE #{EVENT_HORIZON} < timeout AND timeout <= ? AND timeout <= ?
AND created_at IS NOT NULL
ORDER BY timeout ASC
LIMIT ?
FROM `#{@table}`
WHERE timeout = ? AND owner = CONNECTION_ID()
SQL
else
@update_sql = <<SQL
UPDATE `#{@table}`
JOIN (
SELECT id, IFNULL(max_running, 1) / (IFNULL(running, 0) + 1) AS weight
FROM `#{@table}`
LEFT JOIN (
SELECT resource, COUNT(1) AS running
FROM `#{@table}` AS t1
WHERE timeout > :now AND resource IS NOT NULL
GROUP BY resource
FOR UPDATE
) AS t2 USING(resource)
WHERE #{EVENT_HORIZON} < timeout AND timeout <= :now AND IFNULL(max_running - running, 1) > 0
ORDER BY weight DESC, timeout ASC
LIMIT :max_acquire
FOR UPDATE
) AS t3 USING (id)
SET timeout = :next_timeout, owner = CONNECTION_ID()
SQL
@sql = <<SQL
SELECT id, timeout, data, created_at, resource, max_running, IFNULL(max_running, 1) / (IFNULL(running, 0) + 1) AS weight
FROM `#{@table}`
LEFT JOIN (
SELECT resource AS res, COUNT(1) AS running
FROM `#{@table}` AS T
WHERE timeout > ? AND created_at IS NOT NULL AND resource IS NOT NULL
GROUP BY resource
) AS R ON resource = res
WHERE #{EVENT_HORIZON} < timeout AND timeout <= ?
AND created_at IS NOT NULL
AND (max_running-running IS NULL OR max_running-running > 0)
ORDER BY weight DESC, timeout ASC
LIMIT ?
SELECT id, timeout, data, created_at, resource, max_running
FROM `#{@table}`
WHERE timeout = ? AND owner = CONNECTION_ID()
SQL
end

@prefetch_break_types = config[:prefetch_break_types] || []

@cleanup_interval = config[:cleanup_interval] || DEFAULT_DELETE_INTERVAL
# If cleanup_interval > max_request_per_child / max_acquire,
# some processes won't run DELETE query.
Expand All @@ -129,6 +151,8 @@ def init_database(options)
created_at INT,
resource VARCHAR(255),
max_running INT,
/* CONNECTION_ID() can be 64bit: https://bugs.mysql.com/bug.php?id=19806 */
owner BIGINT(21) UNSIGNED NOT NULL DEFAULT 0,
PRIMARY KEY (id)
)
SQL
Expand Down Expand Up @@ -219,7 +243,7 @@ def acquire(alive_time, max_acquire, options)
t0 = nil

if @cleanup_interval_count <= 0
connect { # TODO: HERE should be still connect_locked ?
connect {
t0=Process.clock_gettime(Process::CLOCK_MONOTONIC)
@db["DELETE FROM `#{@table}` WHERE timeout <= ?", now-DELETE_OFFSET].delete
@cleanup_interval_count = @cleanup_interval
Expand All @@ -229,34 +253,18 @@ def acquire(alive_time, max_acquire, options)

connect_locked {
t0=Process.clock_gettime(Process::CLOCK_MONOTONIC)
n = @db[@update_sql, next_timeout: next_timeout, now: now, max_acquire: max_acquire].update
if n <= 0
return nil
end

tasks = []
@db.fetch(@sql, now, now, max_acquire) {|row|
@db.fetch(@sql, next_timeout) {|row|
attributes = create_attributes(nil, row)
task_token = Token.new(row[:id])
task = AcquiredTask.new(@client, row[:id], attributes, task_token)
tasks.push task

if @prefetch_break_types.include?(attributes[:type])
break
end
}

if tasks.empty?
return nil
end

sql = "UPDATE `#{@table}` FORCE INDEX (PRIMARY) SET timeout=? WHERE timeout <= ? AND id IN ("
params = [sql, next_timeout, now]
tasks.each {|t| params << t.key }
sql << (1..tasks.size).map { '?' }.join(',')
sql << ") AND created_at IS NOT NULL"

n = @db[*params].update
if n != tasks.size
# NOTE table lock doesn't work. error!
return nil
end

@cleanup_interval_count -= 1

return tasks
Expand Down
73 changes: 34 additions & 39 deletions spec/rdb_compat_backend_spec.rb
Expand Up @@ -199,58 +199,53 @@
expect(ary[0]).to be_an_instance_of(AcquiredTask)
end
end
context 'disable_resource_limit' do
let (:config) do
{url: 'mysql2://root:@localhost/perfectqueue_test', table: table, disable_resource_limit: true}
end
before do
db.submit(key, 'test', nil, {})
end
it 'returns a task' do
ary = db.acquire(alive_time, max_acquire, {})
expect(ary).to be_an_instance_of(Array)
expect(ary.size).to eq(1)
expect(ary[0]).to be_an_instance_of(AcquiredTask)
end
end
context 'some tasks' do
let :t0 do now - 100 end
let :t0 do now - 300 end
let :t1 do now - 200 end
let :t2 do now - 100 end
before do
db.submit('key1', 'test1', nil, {now: t0})
db.submit('key2', 'test2', nil, {now: t0})
db.submit('key3', 'test3', nil, {now: t0})
end
it 'returns 3 tasks' do
now0 = Time.at(t0)
expect(now0).to receive(:to_time).exactly(3).times.and_call_original
db.list({}) do |task|
expect(task.timeout).to eq now0.to_time
end
db.submit('key3', 'test3', nil, {now: t1})
db.submit('key4', 'test4', nil, {now: t2})
db.submit('key5', 'test5', nil, {now: t2})
end
it 'returns 5 tasks' do
ary = []
db.list({}){|task| ary << task }
expect(ary[0].timeout.to_i).to eq t0
expect(ary[1].timeout.to_i).to eq t0
expect(ary[2].timeout.to_i).to eq t1
expect(ary[3].timeout.to_i).to eq t2
expect(ary[4].timeout.to_i).to eq t2

ary = db.acquire(alive_time, max_acquire, {now: now})
expect(ary).to be_an_instance_of(Array)
expect(ary.size).to eq(3)
expect(ary.size).to eq(5)
expect(ary[0]).to be_an_instance_of(AcquiredTask)
expect(ary[1]).to be_an_instance_of(AcquiredTask)
expect(ary[2]).to be_an_instance_of(AcquiredTask)
expect(ary[3]).to be_an_instance_of(AcquiredTask)
expect(ary[4]).to be_an_instance_of(AcquiredTask)

now1 = Time.at(now + alive_time)
expect(now1).to receive(:to_time).exactly(3).times.and_call_original
expect(now1).to receive(:to_time).exactly(5).times.and_call_original
db.list({}){|task| expect(task.timeout).to eq now1.to_time }
end
it 'returns 2 tasks' do
db.instance_variable_set(:@prefetch_break_types, 'test2')
ary = db.acquire(alive_time, max_acquire, {})
expect(ary).to be_an_instance_of(Array)
expect(ary.size).to eq(2)
expect(ary[0]).to be_an_instance_of(AcquiredTask)
expect(ary[1]).to be_an_instance_of(AcquiredTask)
end
end
context 'stole a task' do
let :t0 do now - 100 end
before do
db.submit('key1', 'test1', nil, {now: t0})
db.submit('key2', 'test2', nil, {now: t0})
db.submit('key3', 'test3', nil, {now: t0})
end
it 'returns nil' do
# hook and steal a task
mock = double('prefetch_break_types')
db.instance_variable_set(:@prefetch_break_types, mock)
allow(mock).to receive(:include?) do
db.db['UPDATE `test_queues` SET timeout=? WHERE id=?', now+300, 'key2'].update
false
end

ary = db.acquire(alive_time, max_acquire, {})
expect(ary).to be_nil
end
end
end

Expand Down

0 comments on commit 914ad50

Please sign in to comment.