From 8154aa2e039c9fb58664bd35b670caffc58421b8 Mon Sep 17 00:00:00 2001 From: Cameron Ball Date: Mon, 13 Dec 2021 15:52:09 +0800 Subject: [PATCH] MDL-67648 tasks: Fair-share scheduling with resource management This patch changes the way adhoc tasks are chosen to run. It now calculates how many runners each type of adhoc task should be allowed to use. In the case that not all the runners are utilised, it attempts to infer which tasks do not take a long time to run, and gives those to the vacant runners. Thanks to Brendan Heywood for guidance and SQL help. --- lib/classes/task/manager.php | 204 +++++++++++++++++++++++++++++++- lib/db/install.xml | 3 +- lib/db/upgrade.php | 15 +++ lib/tests/task_manager_test.php | 197 ------------------------------ lib/upgrade.txt | 5 + version.php | 2 +- 6 files changed, 222 insertions(+), 204 deletions(-) delete mode 100644 lib/tests/task_manager_test.php diff --git a/lib/classes/task/manager.php b/lib/classes/task/manager.php index 765fba2f726fd..889307002ca08 100644 --- a/lib/classes/task/manager.php +++ b/lib/classes/task/manager.php @@ -36,6 +36,31 @@ */ class manager { + /** + * @var int Used to tell the adhoc task queue to fairly distribute tasks. + */ + const ADHOC_TASK_QUEUE_MODE_DISTRIBUTING = 0; + + /** + * @var int Used to tell the adhoc task queue to try and fill unused capacity. + */ + const ADHOC_TASK_QUEUE_MODE_FILLING = 1; + + /** + * @var array A cached queue of adhoc tasks + */ + public static $miniqueue; + + /** + * @var int The last recorded number of unique adhoc tasks. + */ + public static $numtasks; + + /** + * @var int Used to determine if the adhoc task queue is distributing or filling capacity. + */ + public static $mode; + /** * Given a component name, will load the list of tasks in the db/tasks.php file for that component. 
* @@ -542,8 +567,13 @@ public static function get_failed_adhoc_tasks(int $delay = 0): array { * * @param array $records array of task records * @param array $records array of same task records shuffled + * @deprecated since Moodle 4.1 MDL-67648 - please do not use this method anymore. + * @todo MDL-74843 This method will be deleted in Moodle 4.5 + * @see \core\task\manager::get_next_adhoc_task */ public static function ensure_adhoc_task_qos(array $records): array { + debugging('The method \core\task\manager::ensure_adhoc_task_qos is deprecated. + Please use \core\task\manager::get_next_adhoc_task instead.', DEBUG_DEVELOPER); $count = count($records); if ($count == 0) { @@ -621,16 +651,114 @@ public static function ensure_adhoc_task_qos(array $records): array { public static function get_next_adhoc_task($timestart, $checklimits = true) { global $DB; - $where = '(nextruntime IS NULL OR nextruntime < :timestart1)'; - $params = array('timestart1' => $timestart); - $records = $DB->get_records_select('task_adhoc', $where, $params, 'nextruntime ASC, id ASC', '*', 0, 2000); - $records = self::ensure_adhoc_task_qos($records); + $concurrencylimit = get_config('core', 'task_adhoc_concurrency_limit'); + $cachedqueuesize = 1200; + + $uniquetasksinqueue = array_map( + ['\core\task\manager', 'adhoc_task_from_record'], + $DB->get_records_sql( + 'SELECT classname FROM {task_adhoc} WHERE nextruntime < :timestart GROUP BY classname', + ['timestart' => $timestart] + ) + ); + + if (!isset(self::$numtasks) || self::$numtasks !== count($uniquetasksinqueue)) { + self::$numtasks = count($uniquetasksinqueue); + self::$miniqueue = []; + } + + $concurrencylimits = []; + if ($checklimits) { + $concurrencylimits = array_map( + function ($task) { + return $task->get_concurrency_limit(); + }, + $uniquetasksinqueue + ); + } + + /* + * The maximum number of cron runners that an individual task is allowed to use. 
+ * For example if the concurrency limit is 20 and there are 5 unique types of tasks + * in the queue, each task should not be allowed to consume more than 3 (i.e., ⌊20/6⌋). + * The + 1 is needed to prevent the queue from becoming full of only one type of class. + * i.e., if it wasn't there and there were 20 tasks of the same type in the queue, every + * runner would become consumed with the same (potentially long-running) task and no more + * tasks can run. This way, some resources are always available if some new types + * of tasks enter the queue. + * + * We use the short-ternary to force the value to 1 in the case when the number of tasks + * exceeds the runners (e.g., there are 8 tasks and 4 runners, ⌊4/(8+1)⌋ = 0). + */ + $slots = floor($concurrencylimit / (count($uniquetasksinqueue) + 1)) ?: 1; + if (empty(self::$miniqueue)) { + self::$mode = self::ADHOC_TASK_QUEUE_MODE_DISTRIBUTING; + self::$miniqueue = self::get_candidate_adhoc_tasks( + $timestart, + $cachedqueuesize, + $slots, + $concurrencylimits + ); + } + + // The query to cache tasks is expensive on big data sets, so we use this cheap + // query to get the ordering (which is the interesting part about the main query) + // We can use this information to filter the cache and also order it. + $runningtasks = $DB->get_records_sql( + 'SELECT classname, COALESCE(COUNT(*), 0) running, MIN(timestarted) earliest + FROM {task_adhoc} + WHERE timestarted IS NOT NULL + AND nextruntime < :timestart + GROUP BY classname + ORDER BY running ASC, earliest DESC', + ['timestart' => $timestart] + ); + + /* + * Each runner has a cache, so the same task can be in multiple runners' caches. + * We need to check that each task we have cached hasn't gone over its fair number + * of slots. This filtering is only applied during distributing mode as when we are + * filling capacity we intend for fast tasks to go over their slot limit. 
+ */ + if (self::$mode === self::ADHOC_TASK_QUEUE_MODE_DISTRIBUTING) { + self::$miniqueue = array_filter( + self::$miniqueue, + function (\stdClass $task) use ($runningtasks, $slots) { + return !array_key_exists($task->classname, $runningtasks) || $runningtasks[$task->classname]->running < $slots; + } + ); + } + + /* + * If this happens that means each task has consumed its fair share of capacity, but there's still + * runners left over (and we are one of them). Fetch tasks without checking slot limits. + */ + if (empty(self::$miniqueue) && array_sum(array_column($runningtasks, 'running')) < $concurrencylimit) { + self::$mode = self::ADHOC_TASK_QUEUE_MODE_FILLING; + self::$miniqueue = self::get_candidate_adhoc_tasks( + $timestart, + $cachedqueuesize, + false, + $concurrencylimits + ); + } + + // Used below to order the cache. + $ordering = array_flip(array_keys($runningtasks)); + + // Order the queue so it's consistent with the ordering from the DB. + usort( + self::$miniqueue, + function ($a, $b) use ($ordering) { + return ($ordering[$a->classname] ?? -1) - ($ordering[$b->classname] ?? -1); + } + ); $cronlockfactory = \core\lock\lock_config::get_lock_factory('cron'); $skipclasses = array(); - foreach ($records as $record) { + foreach (self::$miniqueue as $taskid => $record) { if (in_array($record->classname, $skipclasses)) { // Skip the task if it can't be started due to per-task concurrency limit. @@ -643,6 +771,7 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) { $record = $DB->get_record('task_adhoc', array('id' => $record->id)); if (!$record) { $lock->release(); + unset(self::$miniqueue[$taskid]); continue; } @@ -650,6 +779,7 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) { // Safety check in case the task in the DB does not match a real class (maybe something was uninstalled). 
if (!$task) { $lock->release(); + unset(self::$miniqueue[$taskid]); continue; } @@ -661,6 +791,7 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) { // Unable to obtain a concurrency lock. mtrace("Skipping $record->classname adhoc task class as the per-task limit of $tasklimit is reached."); $skipclasses[] = $record->classname; + unset(self::$miniqueue[$taskid]); $lock->release(); continue; } @@ -679,13 +810,76 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) { } else { $task->set_cron_lock($cronlock); } + + unset(self::$miniqueue[$taskid]); return $task; + } else { + unset(self::$miniqueue[$taskid]); } } return null; } + /** + * Return a list of candidate adhoc tasks to run. + * + * @param int $timestart Only return tasks where nextruntime is less than this value + * @param int $limit Limit the list to this many results + * @param int|null $runmax Only return tasks that have less than this value currently running + * @param array $pertasklimits An array of classname => limit specifying how many instances of a task may be returned + * @return array Array of candidate tasks + */ + public static function get_candidate_adhoc_tasks( + int $timestart, + int $limit, + ?int $runmax, + array $pertasklimits = [] + ): array { + global $DB; + + $pertaskclauses = array_map( + function (string $class, int $limit, int $index): array { + $limitcheck = $limit > 0 ? " AND COALESCE(run.running, 0) < :running_$index" : ""; + $limitparam = $limit > 0 ? ["running_$index" => $limit] : []; + + return [ + "sql" => "(q.classname = :classname_$index" . $limitcheck . ")", + "params" => ["classname_$index" => $class] + $limitparam + ]; + }, + array_keys($pertasklimits), + $pertasklimits, + $pertasklimits ? range(1, count($pertasklimits)) : [] + ); + + $pertasksql = implode(" OR ", array_column($pertaskclauses, 'sql')); + $pertaskparams = $pertaskclauses ? 
array_merge(...array_column($pertaskclauses, 'params')) : []; + + $params = ['timestart' => $timestart] + + ($runmax ? ['runmax' => $runmax] : []) + + $pertaskparams; + + return $DB->get_records_sql( + "SELECT q.id, q.classname, q.timestarted, COALESCE(run.running, 0) running, run.earliest + FROM {task_adhoc} q + LEFT JOIN ( + SELECT classname, COUNT(*) running, MIN(timestarted) earliest + FROM {task_adhoc} run + WHERE timestarted IS NOT NULL + GROUP BY classname + ) run ON run.classname = q.classname + WHERE nextruntime < :timestart + AND q.timestarted IS NULL " . + (!empty($pertasksql) ? "AND (" . $pertasksql . ") " : "") . + ($runmax ? "AND (COALESCE(run.running, 0)) < :runmax " : "") . + "ORDER BY COALESCE(run.running, 0) ASC, run.earliest DESC, q.nextruntime ASC, q.id ASC", + $params, + 0, + $limit + ); + } + /** * This function will dispatch the next scheduled task in the queue. The task will be handed out * with an open lock - possibly on the entire cron process. Make sure you call either diff --git a/lib/db/install.xml b/lib/db/install.xml index 81d3caf5e3421..be6dd424047dc 100644 --- a/lib/db/install.xml +++ b/lib/db/install.xml @@ -1,5 +1,5 @@ - @@ -3448,6 +3448,7 @@ + diff --git a/lib/db/upgrade.php b/lib/db/upgrade.php index b3844f7c69e0a..af98fc0b5c46d 100644 --- a/lib/db/upgrade.php +++ b/lib/db/upgrade.php @@ -4506,5 +4506,20 @@ function xmldb_main_upgrade($oldversion) { upgrade_main_savepoint(true, 2022051000.00); } + if ($oldversion < 2022051900.01) { + + // Define index timestarted_idx (not unique) to be added to task_adhoc. + $table = new xmldb_table('task_adhoc'); + $index = new xmldb_index('timestarted_idx', XMLDB_INDEX_NOTUNIQUE, ['timestarted']); + + // Conditionally launch add index timestarted_idx. + if (!$dbman->index_exists($table, $index)) { + $dbman->add_index($table, $index); + } + + // Main savepoint reached. 
+ upgrade_main_savepoint(true, 2022051900.01); + } + return true; } diff --git a/lib/tests/task_manager_test.php b/lib/tests/task_manager_test.php deleted file mode 100644 index b3007e9a2651e..0000000000000 --- a/lib/tests/task_manager_test.php +++ /dev/null @@ -1,197 +0,0 @@ -. - -/** - * This file contains the unit tests for the task manager. - * - * @package core - * @copyright 2019 Brendan Heywood - * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later - */ - -defined('MOODLE_INTERNAL') || die(); - -/** - * This file contains the unit tests for the task manager. - * - * @copyright 2019 Brendan Heywood - * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later - */ -class core_task_manager_testcase extends advanced_testcase { - - public function test_ensure_adhoc_task_qos_provider() { - return [ - [ - [], - [], - ], - // A queue with a lopside initial load that needs to be staggered. - [ - [ - (object)['id' => 1, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 10, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 11, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 12, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 13, 'classname' => 
'\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 14, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 15, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 1, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 10, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 11, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 12, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 13, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 14, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 15, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - ], - // The same lopsided queue but now the first item is gone. 
- [ - [ - (object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - ], - ], - // The same lopsided queue but now the first two items is gone. 
- [ - [ - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - ], - ], - // The same lopsided queue but now the first three items are gone. 
- [ - [ - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - ], - ], - [ - [ - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - ], - ]; - } - - /** - * Reduces a list of tasks into a simpler string - * - * @param array 
$input array of tasks - * @return string list of task ids - */ - function flatten($tasks) { - $list = ''; - foreach ($tasks as $id => $task) { - $list .= ' ' . $task->id; - } - return $list; - } - - /** - * Test that the Quality of Service reordering works. - * - * @dataProvider test_ensure_adhoc_task_qos_provider - * - * @param array $input array of tasks - * @param array $expected array of reordered tasks - * @return void - */ - public function test_ensure_adhoc_task_qos(array $input, array $expected) { - $this->resetAfterTest(); - $result = \core\task\manager::ensure_adhoc_task_qos($input); - - - $result = $this->flatten($result); - $expected = $this->flatten($expected); - - $this->assertEquals($expected, $result); - } - -} - diff --git a/lib/upgrade.txt b/lib/upgrade.txt index ffaaf8cc7b21d..da6c3db06e4f8 100644 --- a/lib/upgrade.txt +++ b/lib/upgrade.txt @@ -1,6 +1,11 @@ This files describes API changes in core libraries and APIs, information provided here is intended especially for developers. +=== 4.1 === + +* The method ensure_adhoc_task_qos() in lib/classes/task/manager.php has been deprecated, please use get_next_adhoc_task() + instead. + === 4.0 === * To better detect wrong floats (like, for example, unformatted, using local-dependent separators ones) a number of diff --git a/version.php b/version.php index 49a0dbd7b8999..53671c204f1a2 100644 --- a/version.php +++ b/version.php @@ -29,7 +29,7 @@ defined('MOODLE_INTERNAL') || die(); -$version = 2022051900.00; // YYYYMMDD = weekly release date of this DEV branch. +$version = 2022051900.01; // YYYYMMDD = weekly release date of this DEV branch. // RR = release increments - 00 in DEV branches. // .XX = incremental changes. $release = '4.1dev (Build: 20220519)'; // Human-friendly version name