Skip to content

Commit

Permalink
Use v08's strategy if resource limit is used
Browse files Browse the repository at this point in the history
If resource limit is used, such queue doesn't need performance so much.
Therefore use old strategy at this time.
  • Loading branch information
nurse committed Oct 5, 2016
1 parent bf37abc commit 3644ed2
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 74 deletions.
170 changes: 98 additions & 72 deletions lib/perfectqueue/backend/rdb_compat.rb
Expand Up @@ -78,55 +78,7 @@ def initialize(client, config)
# connection test
}

# MySQL's CONNECTION_ID() is a 64bit unsigned integer from the
# server's internal thread ID counter. It is unique while the MySQL
# server is running.
# https://bugs.mysql.com/bug.php?id=19806
#
# An acquired task is marked with next_timeout and CONNECTION_ID().
# Therefore while alive_time is not changed and we don't restart
# the server in 1 second, they won't conflict.
if config[:disable_resource_limit]
@update_sql = <<SQL
UPDATE `#{@table}`
JOIN (
SELECT id
FROM `#{@table}` FORCE INDEX (`index_#{@table}_on_timeout`)
WHERE #{EVENT_HORIZON} < timeout AND timeout <= :now
ORDER BY timeout ASC
LIMIT :max_acquire) AS t1 USING(id)
SET timeout=:next_timeout, owner=CONNECTION_ID()
SQL
@sql = <<SQL
SELECT id, timeout, data, created_at, resource
FROM `#{@table}`
WHERE timeout = ? AND owner = CONNECTION_ID()
SQL
else
@update_sql = <<SQL
UPDATE `#{@table}`
JOIN (
SELECT id, IFNULL(max_running, 1) / (IFNULL(running, 0) + 1) AS weight
FROM `#{@table}`
LEFT JOIN (
SELECT resource, COUNT(1) AS running
FROM `#{@table}` AS t1
WHERE timeout > :now AND resource IS NOT NULL
GROUP BY resource
) AS t2 USING(resource)
WHERE #{EVENT_HORIZON} < timeout AND timeout <= :now AND IFNULL(max_running - running, 1) > 0
ORDER BY weight DESC, timeout ASC
LIMIT :max_acquire
) AS t3 USING (id)
SET timeout = :next_timeout, owner = CONNECTION_ID()
SQL
@sql = <<SQL
SELECT id, timeout, data, created_at, resource, max_running
FROM `#{@table}`
WHERE timeout = ? AND owner = CONNECTION_ID()
SQL
end

@disable_resource_limit = config[:disable_resource_limit]
@cleanup_interval = config[:cleanup_interval] || DEFAULT_DELETE_INTERVAL
# If cleanup_interval > max_request_per_child / max_acquire,
# some processes won't run DELETE query.
Expand Down Expand Up @@ -240,7 +192,6 @@ def submit(key, type, data, options)
def acquire(alive_time, max_acquire, options)
now = (options[:now] || Time.now).to_i
next_timeout = now + alive_time
tasks = nil
t0 = nil

if @cleanup_interval_count <= 0
Expand All @@ -252,27 +203,11 @@ def acquire(alive_time, max_acquire, options)
}
end

connect_locked {
t0=Process.clock_gettime(Process::CLOCK_MONOTONIC)
n = @db[@update_sql, next_timeout: next_timeout, now: now, max_acquire: max_acquire].update
if n <= 0
return nil
end
@table_unlock.call

tasks = []
@db.fetch(@sql, next_timeout) {|row|
attributes = create_attributes(nil, row)
task_token = Token.new(row[:id])
task = AcquiredTask.new(@client, row[:id], attributes, task_token)
tasks.push task
}
@cleanup_interval_count -= 1

return tasks
}
ensure
STDERR.puts "PQ:acquire from #{@table}:%6f sec (%d tasks)" % [Process.clock_gettime(Process::CLOCK_MONOTONIC)-t0,tasks.size] if tasks
if @disable_resource_limit
return acquire_without_resource(next_timeout, now, max_acquire)
else
return acquire_with_resource(next_timeout, now, max_acquire)
end
end

def force_finish(key, retention_time, options)
Expand Down Expand Up @@ -454,7 +389,98 @@ def create_attributes(now, row)
}
end

def acquire_without_resource(next_timeout, now, max_acquire)
# MySQL's CONNECTION_ID() is a 64bit unsigned integer from the
# server's internal thread ID counter. It is unique while the MySQL
# server is running.
# https://bugs.mysql.com/bug.php?id=19806
#
# An acquired task is marked with next_timeout and CONNECTION_ID().
# Therefore while alive_time is not changed and we don't restart
# the server in 1 second, they won't conflict.
update_sql = <<SQL
UPDATE `#{@table}`
JOIN (
SELECT id
FROM `#{@table}` FORCE INDEX (`index_#{@table}_on_timeout`)
WHERE #{EVENT_HORIZON} < timeout AND timeout <= :now
ORDER BY timeout ASC
LIMIT :max_acquire) AS t1 USING(id)
SET timeout=:next_timeout, owner=CONNECTION_ID()
SQL
select_sql = <<SQL
SELECT id, timeout, data, created_at, resource
FROM `#{@table}`
WHERE timeout = ? AND owner = CONNECTION_ID()
SQL
t0 = 0
connect_locked do
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
n = @db[update_sql, next_timeout: next_timeout, now: now, max_acquire: max_acquire].update
@table_unlock.call
STDERR.puts "PQ:acquire from #{@table}:%6f sec (%d tasks)" % [Process.clock_gettime(Process::CLOCK_MONOTONIC)-t0,n]
return nil if n <= 0

tasks = []
@db.fetch(select_sql, next_timeout) do |row|
attributes = create_attributes(nil, row)
task_token = Token.new(row[:id])
task = AcquiredTask.new(@client, row[:id], attributes, task_token)
tasks.push task
end
@cleanup_interval_count -= 1
return tasks
end
end

def acquire_with_resource(next_timeout, now, max_acquire)
t0 = nil
tasks = nil
sql = <<SQL
SELECT id, timeout, data, created_at, resource, max_running, IFNULL(max_running, 1) / (IFNULL(running, 0) + 1) AS weight
FROM `#{@table}`
LEFT JOIN (
SELECT resource AS res, COUNT(1) AS running
FROM `#{@table}` AS T
WHERE timeout > ? AND created_at IS NOT NULL AND resource IS NOT NULL
GROUP BY resource
) AS R ON resource = res
WHERE #{EVENT_HORIZON} < timeout AND timeout <= ?
AND created_at IS NOT NULL
AND (max_running-running IS NULL OR max_running-running > 0)
ORDER BY weight DESC, timeout ASC
LIMIT ?
SQL
connect_locked do
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
tasks = []
@db.fetch(sql, now, now, max_acquire) do |row|
attributes = create_attributes(nil, row)
task_token = Token.new(row[:id])
task = AcquiredTask.new(@client, row[:id], attributes, task_token)
tasks.push task
end
return nil if tasks.empty?

sql = "UPDATE `#{@table}` FORCE INDEX (PRIMARY) SET timeout=? WHERE timeout <= ? AND id IN ("
params = [sql, next_timeout, now]
params.concat tasks.map(&:key)
sql << '?,' * tasks.size
sql.chop!
sql << ") AND created_at IS NOT NULL"

n = @db[*params].update
if n != tasks.size
# preempted
return nil
end
end
@cleanup_interval_count -= 1
return tasks
ensure
STDERR.puts "PQ:acquire from #{@table}:%6f sec (%d tasks)" % \
[Process.clock_gettime(Process::CLOCK_MONOTONIC)-t0, tasks.size] if tasks
end
end
end
end

4 changes: 2 additions & 2 deletions spec/rdb_compat_backend_spec.rb
Expand Up @@ -96,7 +96,7 @@
end
it 'supports mysql' do
expect(Backend::RDBCompatBackend.new(client, config)).to be_an_instance_of(Backend::RDBCompatBackend)
expect(db.instance_variable_get(:@sql)).to include('max_running')
expect(db.instance_variable_get(:@disable_resource_limit)).to be_falsey
end
it 'doesn\'t support postgres' do
config = {url: 'postgres://localhost', table: table}
Expand All @@ -110,7 +110,7 @@
it 'disable_resource_limit' do
config = {url: 'mysql2://root:@localhost/perfectqueue_test', table: table, disable_resource_limit: true}
db = Backend::RDBCompatBackend.new(client, config)
expect(db.instance_variable_get(:@sql)).not_to include('max_running')
expect(db.instance_variable_get(:@disable_resource_limit)).to be_truthy
end
end

Expand Down

0 comments on commit 3644ed2

Please sign in to comment.