Skip to content

Commit

Permalink
MDL-67648 tasks: Fair-share scheduling with resource management
Browse files Browse the repository at this point in the history
This patch changes the way adhoc tasks are chosen to run. It now calculates
how many runners each type of adhoc task should be allowed to use. In the
case that not all the runners are utilised, it attempts to infer which
tasks do not take a long time to run, and gives those to the vacant runners.

Thanks to Brendan Heywood for guidance and SQL help.
  • Loading branch information
cameron1729 committed May 25, 2022
1 parent 6c114e2 commit 8154aa2
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 204 deletions.
204 changes: 199 additions & 5 deletions lib/classes/task/manager.php
Expand Up @@ -36,6 +36,31 @@
*/
class manager {

/**
* @var int Used to tell the adhoc task queue to fairly distribute tasks.
*/
const ADHOC_TASK_QUEUE_MODE_DISTRIBUTING = 0;

/**
* @var int Used to tell the adhoc task queue to try and fill unused capacity.
*/
const ADHOC_TASK_QUEUE_MODE_FILLING = 1;

/**
* @var array A cached queue of adhoc tasks
*/
public static $miniqueue;

/**
* @var int The last recorded number of unique adhoc tasks.
*/
public static $numtasks;

/**
* @var string Used to determine if the adhoc task queue is distributing or filling capacity.
*/
public static $mode;

/**
* Given a component name, will load the list of tasks in the db/tasks.php file for that component.
*
Expand Down Expand Up @@ -542,8 +567,13 @@ public static function get_failed_adhoc_tasks(int $delay = 0): array {
*
* @param array $records array of task records
* @param array $records array of same task records shuffled
* @deprecated since Moodle 4.1 MDL-67648 - please do not use this method anymore.
* @todo MDL-74843 This method will be deleted in Moodle 4.5
* @see \core\task\manager::get_next_adhoc_task
*/
public static function ensure_adhoc_task_qos(array $records): array {
debugging('The method \core\task\manager::ensure_adhoc_task_qos is deprecated.
Please use \core\task\manager::get_next_adhoc_task instead.', DEBUG_DEVELOPER);

$count = count($records);
if ($count == 0) {
Expand Down Expand Up @@ -621,16 +651,114 @@ public static function ensure_adhoc_task_qos(array $records): array {
public static function get_next_adhoc_task($timestart, $checklimits = true) {
global $DB;

$where = '(nextruntime IS NULL OR nextruntime < :timestart1)';
$params = array('timestart1' => $timestart);
$records = $DB->get_records_select('task_adhoc', $where, $params, 'nextruntime ASC, id ASC', '*', 0, 2000);
$records = self::ensure_adhoc_task_qos($records);
$concurrencylimit = get_config('core', 'task_adhoc_concurrency_limit');
$cachedqueuesize = 1200;

$uniquetasksinqueue = array_map(
['\core\task\manager', 'adhoc_task_from_record'],
$DB->get_records_sql(
'SELECT classname FROM {task_adhoc} WHERE nextruntime < :timestart GROUP BY classname',
['timestart' => $timestart]
)
);

if (!isset(self::$numtasks) || self::$numtasks !== count($uniquetasksinqueue)) {
self::$numtasks = count($uniquetasksinqueue);
self::$miniqueue = [];
}

$concurrencylimits = [];
if ($checklimits) {
$concurrencylimits = array_map(
function ($task) {
return $task->get_concurrency_limit();
},
$uniquetasksinqueue
);
}

/*
* The maximum number of cron runners that an individual task is allowed to use.
* For example if the concurrency limit is 20 and there are 5 unique types of tasks
* in the queue, each task should not be allowed to consume more than 3 (i.e., ⌊20/6⌋).
* The + 1 is needed to prevent the queue from becoming full of only one type of class.
* i.e., if it wasn't there and there were 20 tasks of the same type in the queue, every
* runner would become consumed with the same (potentially long-running task) and no more
* tasks can run. This way, some resources are always available if some new types
* of tasks enter the queue.
*
* We use the short-ternary to force the value to 1 in the case when the number of tasks
* exceeds the runners (e.g., there are 8 tasks and 4 runners, ⌊4/(8+1)⌋ = 0).
*/
$slots = floor($concurrencylimit / (count($uniquetasksinqueue) + 1)) ?: 1;
if (empty(self::$miniqueue)) {
self::$mode = self::ADHOC_TASK_QUEUE_MODE_DISTRIBUTING;
self::$miniqueue = self::get_candidate_adhoc_tasks(
$timestart,
$cachedqueuesize,
$slots,
$concurrencylimits
);
}

// The query to cache tasks is expensive on big data sets, so we use this cheap
// query to get the ordering (which is the interesting part about the main query)
// We can use this information to filter the cache and also order it.
$runningtasks = $DB->get_records_sql(
'SELECT classname, COALESCE(COUNT(*), 0) running, MIN(timestarted) earliest
FROM {task_adhoc}
WHERE timestarted IS NOT NULL
AND nextruntime < :timestart
GROUP BY classname
ORDER BY running ASC, earliest DESC',
['timestart' => $timestart]
);

/*
* Each runner has a cache, so the same task can be in multiple runners' caches.
* We need to check that each task we have cached hasn't gone over its fair number
* of slots. This filtering is only applied during distributing mode as when we are
* filling capacity we intend for fast tasks to go over their slot limit.
*/
if (self::$mode === self::ADHOC_TASK_QUEUE_MODE_DISTRIBUTING) {
self::$miniqueue = array_filter(
self::$miniqueue,
function (\stdClass $task) use ($runningtasks, $slots) {
return !array_key_exists($task->classname, $runningtasks) || $runningtasks[$task->classname]->running < $slots;
}
);
}

/*
* If this happens that means each task has consumed its fair share of capacity, but there's still
* runners left over (and we are one of them). Fetch tasks without checking slot limits.
*/
if (empty(self::$miniqueue) && array_sum(array_column($runningtasks, 'running')) < $concurrencylimit) {
self::$mode = self::ADHOC_TASK_QUEUE_MODE_FILLING;
self::$miniqueue = self::get_candidate_adhoc_tasks(
$timestart,
$cachedqueuesize,
false,
$concurrencylimits
);
}

// Used below to order the cache.
$ordering = array_flip(array_keys($runningtasks));

// Order the queue so it's consistent with the ordering from the DB.
usort(
self::$miniqueue,
function ($a, $b) use ($ordering) {
return ($ordering[$a->classname] ?? -1) - ($ordering[$b->classname] ?? -1);
}
);

$cronlockfactory = \core\lock\lock_config::get_lock_factory('cron');

$skipclasses = array();

foreach ($records as $record) {
foreach (self::$miniqueue as $taskid => $record) {

if (in_array($record->classname, $skipclasses)) {
// Skip the task if it can't be started due to per-task concurrency limit.
Expand All @@ -643,13 +771,15 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) {
$record = $DB->get_record('task_adhoc', array('id' => $record->id));
if (!$record) {
$lock->release();
unset(self::$miniqueue[$taskid]);
continue;
}

$task = self::adhoc_task_from_record($record);
// Safety check in case the task in the DB does not match a real class (maybe something was uninstalled).
if (!$task) {
$lock->release();
unset(self::$miniqueue[$taskid]);
continue;
}

Expand All @@ -661,6 +791,7 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) {
// Unable to obtain a concurrency lock.
mtrace("Skipping $record->classname adhoc task class as the per-task limit of $tasklimit is reached.");
$skipclasses[] = $record->classname;
unset(self::$miniqueue[$taskid]);
$lock->release();
continue;
}
Expand All @@ -679,13 +810,76 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) {
} else {
$task->set_cron_lock($cronlock);
}

unset(self::$miniqueue[$taskid]);
return $task;
} else {
unset(self::$miniqueue[$taskid]);
}
}

return null;
}

/**
* Return a list of candidate adhoc tasks to run.
*
* @param int $timestart Only return tasks where nextruntime is less than this value
* @param int $limit Limit the list to this many results
* @param int|null $runmax Only return tasks that have less than this value currently running
* @param array $pertasklimits An array of classname => limit specifying how many instance of a task may be returned
* @return array Array of candidate tasks
*/
public static function get_candidate_adhoc_tasks(
int $timestart,
int $limit,
?int $runmax,
array $pertasklimits = []
): array {
global $DB;

$pertaskclauses = array_map(
function (string $class, int $limit, int $index): array {
$limitcheck = $limit > 0 ? " AND COALESCE(run.running, 0) < :running_$index" : "";
$limitparam = $limit > 0 ? ["running_$index" => $limit] : [];

return [
"sql" => "(q.classname = :classname_$index" . $limitcheck . ")",
"params" => ["classname_$index" => $class] + $limitparam
];
},
array_keys($pertasklimits),
$pertasklimits,
$pertasklimits ? range(1, count($pertasklimits)) : []
);

$pertasksql = implode(" OR ", array_column($pertaskclauses, 'sql'));
$pertaskparams = $pertaskclauses ? array_merge(...array_column($pertaskclauses, 'params')) : [];

$params = ['timestart' => $timestart] +
($runmax ? ['runmax' => $runmax] : []) +
$pertaskparams;

return $DB->get_records_sql(
"SELECT q.id, q.classname, q.timestarted, COALESCE(run.running, 0) running, run.earliest
FROM {task_adhoc} q
LEFT JOIN (
SELECT classname, COUNT(*) running, MIN(timestarted) earliest
FROM {task_adhoc} run
WHERE timestarted IS NOT NULL
GROUP BY classname
) run ON run.classname = q.classname
WHERE nextruntime < :timestart
AND q.timestarted IS NULL " .
(!empty($pertasksql) ? "AND (" . $pertasksql . ") " : "") .
($runmax ? "AND (COALESCE(run.running, 0)) < :runmax " : "") .
"ORDER BY COALESCE(run.running, 0) ASC, run.earliest DESC, q.nextruntime ASC, q.id ASC",
$params,
0,
$limit
);
}

/**
* This function will dispatch the next scheduled task in the queue. The task will be handed out
* with an open lock - possibly on the entire cron process. Make sure you call either
Expand Down
3 changes: 2 additions & 1 deletion lib/db/install.xml
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8" ?>
<XMLDB PATH="lib/db" VERSION="20220510" COMMENT="XMLDB file for core Moodle tables"
<XMLDB PATH="lib/db" VERSION="20220524" COMMENT="XMLDB file for core Moodle tables"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="../../lib/xmldb/xmldb.xsd"
>
Expand Down Expand Up @@ -3448,6 +3448,7 @@
</KEYS>
<INDEXES>
<INDEX NAME="nextruntime_idx" UNIQUE="false" FIELDS="nextruntime"/>
<INDEX NAME="timestarted_idx" UNIQUE="false" FIELDS="timestarted"/>
</INDEXES>
</TABLE>
<TABLE NAME="task_log" COMMENT="The log table for all tasks">
Expand Down
15 changes: 15 additions & 0 deletions lib/db/upgrade.php
Expand Up @@ -4506,5 +4506,20 @@ function xmldb_main_upgrade($oldversion) {
upgrade_main_savepoint(true, 2022051000.00);
}

if ($oldversion < 2022051900.01) {

// Define index timestarted_idx (not unique) to be added to task_adhoc.
$table = new xmldb_table('task_adhoc');
$index = new xmldb_index('timestarted_idx', XMLDB_INDEX_NOTUNIQUE, ['timestarted']);

// Conditionally launch add index timestarted_idx.
if (!$dbman->index_exists($table, $index)) {
$dbman->add_index($table, $index);
}

// Main savepoint reached.
upgrade_main_savepoint(true, 2022051900.01);
}

return true;
}

0 comments on commit 8154aa2

Please sign in to comment.