diff --git a/lib/classes/task/manager.php b/lib/classes/task/manager.php index 765fba2f726fd..889307002ca08 100644 --- a/lib/classes/task/manager.php +++ b/lib/classes/task/manager.php @@ -36,6 +36,31 @@ */ class manager { + /** + * @var int Used to tell the adhoc task queue to fairly distribute tasks. + */ + const ADHOC_TASK_QUEUE_MODE_DISTRIBUTING = 0; + + /** + * @var int Used to tell the adhoc task queue to try and fill unused capacity. + */ + const ADHOC_TASK_QUEUE_MODE_FILLING = 1; + + /** + * @var array A cached queue of adhoc tasks + */ + public static $miniqueue; + + /** + * @var int The last recorded number of unique adhoc tasks. + */ + public static $numtasks; + + /** + * @var string Used to determine if the adhoc task queue is distributing or filling capacity. + */ + public static $mode; + /** * Given a component name, will load the list of tasks in the db/tasks.php file for that component. * @@ -542,8 +567,13 @@ public static function get_failed_adhoc_tasks(int $delay = 0): array { * * @param array $records array of task records * @param array $records array of same task records shuffled + * @deprecated since Moodle 4.1 MDL-67648 - please do not use this method anymore. + * @todo MDL-74843 This method will be deleted in Moodle 4.5 + * @see \core\task\manager::get_next_adhoc_task */ public static function ensure_adhoc_task_qos(array $records): array { + debugging('The method \core\task\manager::ensure_adhoc_task_qos is deprecated. + Please use \core\task\manager::get_next_adhoc_task instead.', DEBUG_DEVELOPER); $count = count($records); if ($count == 0) { @@ -621,16 +651,114 @@ public static function ensure_adhoc_task_qos(array $records): array { public static function get_next_adhoc_task($timestart, $checklimits = true) { global $DB; - $where = '(nextruntime IS NULL OR nextruntime < :timestart1)'; - $params = array('timestart1' => $timestart); - $records = $DB->get_records_select('task_adhoc', $where, $params, 'nextruntime ASC, id ASC', '*', 0, 2000); - $records = self::ensure_adhoc_task_qos($records); + $concurrencylimit = get_config('core', 'task_adhoc_concurrency_limit'); + $cachedqueuesize = 1200; + + $uniquetasksinqueue = array_map( + ['\core\task\manager', 'adhoc_task_from_record'], + $DB->get_records_sql( + 'SELECT classname FROM {task_adhoc} WHERE nextruntime < :timestart GROUP BY classname', + ['timestart' => $timestart] + ) + ); + + if (!isset(self::$numtasks) || self::$numtasks !== count($uniquetasksinqueue)) { + self::$numtasks = count($uniquetasksinqueue); + self::$miniqueue = []; + } + + $concurrencylimits = []; + if ($checklimits) { + $concurrencylimits = array_map( + function ($task) { + return $task->get_concurrency_limit(); + }, + $uniquetasksinqueue + ); + } + + /* + * The maximum number of cron runners that an individual task is allowed to use. + * For example if the concurrency limit is 20 and there are 5 unique types of tasks + * in the queue, each task should not be allowed to consume more than 3 (i.e., ⌊20/6⌋). + * The + 1 is needed to prevent the queue from becoming full of only one type of class. + * i.e., if it wasn't there and there were 20 tasks of the same type in the queue, every + * runner would become consumed with the same (potentially long-running task) and no more + * tasks can run. This way, some resources are always available if some new types + * of tasks enter the queue. + * + * We use the short-ternary to force the value to 1 in the case when the number of tasks + * exceeds the runners (e.g., there are 8 tasks and 4 runners, ⌊4/(8+1)⌋ = 0). + */ + $slots = floor($concurrencylimit / (count($uniquetasksinqueue) + 1)) ?: 1; + if (empty(self::$miniqueue)) { + self::$mode = self::ADHOC_TASK_QUEUE_MODE_DISTRIBUTING; + self::$miniqueue = self::get_candidate_adhoc_tasks( + $timestart, + $cachedqueuesize, + $slots, + $concurrencylimits + ); + } + + // The query to cache tasks is expensive on big data sets, so we use this cheap + // query to get the ordering (which is the interesting part about the main query) + // We can use this information to filter the cache and also order it. + $runningtasks = $DB->get_records_sql( + 'SELECT classname, COALESCE(COUNT(*), 0) running, MIN(timestarted) earliest + FROM {task_adhoc} + WHERE timestarted IS NOT NULL + AND nextruntime < :timestart + GROUP BY classname + ORDER BY running ASC, earliest DESC', + ['timestart' => $timestart] + ); + + /* + * Each runner has a cache, so the same task can be in multiple runners' caches. + * We need to check that each task we have cached hasn't gone over its fair number + * of slots. This filtering is only applied during distributing mode as when we are + * filling capacity we intend for fast tasks to go over their slot limit. + */ + if (self::$mode === self::ADHOC_TASK_QUEUE_MODE_DISTRIBUTING) { + self::$miniqueue = array_filter( + self::$miniqueue, + function (\stdClass $task) use ($runningtasks, $slots) { + return !array_key_exists($task->classname, $runningtasks) || $runningtasks[$task->classname]->running < $slots; + } + ); + } + + /* + * If this happens that means each task has consumed its fair share of capacity, but there's still + * runners left over (and we are one of them). Fetch tasks without checking slot limits. + */ + if (empty(self::$miniqueue) && array_sum(array_column($runningtasks, 'running')) < $concurrencylimit) { + self::$mode = self::ADHOC_TASK_QUEUE_MODE_FILLING; + self::$miniqueue = self::get_candidate_adhoc_tasks( + $timestart, + $cachedqueuesize, + false, + $concurrencylimits + ); + } + + // Used below to order the cache. + $ordering = array_flip(array_keys($runningtasks)); + + // Order the queue so it's consistent with the ordering from the DB. + usort( + self::$miniqueue, + function ($a, $b) use ($ordering) { + return ($ordering[$a->classname] ?? -1) - ($ordering[$b->classname] ?? -1); + } + ); $cronlockfactory = \core\lock\lock_config::get_lock_factory('cron'); $skipclasses = array(); - foreach ($records as $record) { + foreach (self::$miniqueue as $taskid => $record) { if (in_array($record->classname, $skipclasses)) { // Skip the task if it can't be started due to per-task concurrency limit. @@ -643,6 +771,7 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) { $record = $DB->get_record('task_adhoc', array('id' => $record->id)); if (!$record) { $lock->release(); + unset(self::$miniqueue[$taskid]); continue; } @@ -650,6 +779,7 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) { // Safety check in case the task in the DB does not match a real class (maybe something was uninstalled). if (!$task) { $lock->release(); + unset(self::$miniqueue[$taskid]); continue; } @@ -661,6 +791,7 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) { // Unable to obtain a concurrency lock. mtrace("Skipping $record->classname adhoc task class as the per-task limit of $tasklimit is reached."); $skipclasses[] = $record->classname; + unset(self::$miniqueue[$taskid]); $lock->release(); continue; } @@ -679,13 +810,76 @@ public static function get_next_adhoc_task($timestart, $checklimits = true) { } else { $task->set_cron_lock($cronlock); } + + unset(self::$miniqueue[$taskid]); return $task; + } else { + unset(self::$miniqueue[$taskid]); } } return null; } + /** + * Return a list of candidate adhoc tasks to run. + * + * @param int $timestart Only return tasks where nextruntime is less than this value + * @param int $limit Limit the list to this many results + * @param int|null $runmax Only return tasks that have less than this value currently running + * @param array $pertasklimits An array of classname => limit specifying how many instance of a task may be returned + * @return array Array of candidate tasks + */ + public static function get_candidate_adhoc_tasks( + int $timestart, + int $limit, + ?int $runmax, + array $pertasklimits = [] + ): array { + global $DB; + + $pertaskclauses = array_map( + function (string $class, int $limit, int $index): array { + $limitcheck = $limit > 0 ? " AND COALESCE(run.running, 0) < :running_$index" : ""; + $limitparam = $limit > 0 ? ["running_$index" => $limit] : []; + + return [ + "sql" => "(q.classname = :classname_$index" . $limitcheck . ")", + "params" => ["classname_$index" => $class] + $limitparam + ]; + }, + array_keys($pertasklimits), + $pertasklimits, + $pertasklimits ? range(1, count($pertasklimits)) : [] + ); + + $pertasksql = implode(" OR ", array_column($pertaskclauses, 'sql')); + $pertaskparams = $pertaskclauses ? array_merge(...array_column($pertaskclauses, 'params')) : []; + + $params = ['timestart' => $timestart] + + ($runmax ? ['runmax' => $runmax] : []) + + $pertaskparams; + + return $DB->get_records_sql( + "SELECT q.id, q.classname, q.timestarted, COALESCE(run.running, 0) running, run.earliest + FROM {task_adhoc} q + LEFT JOIN ( + SELECT classname, COUNT(*) running, MIN(timestarted) earliest + FROM {task_adhoc} run + WHERE timestarted IS NOT NULL + GROUP BY classname + ) run ON run.classname = q.classname + WHERE nextruntime < :timestart + AND q.timestarted IS NULL " . + (!empty($pertasksql) ? "AND (" . $pertasksql . ") " : "") . + ($runmax ? "AND (COALESCE(run.running, 0)) < :runmax " : "") . + "ORDER BY COALESCE(run.running, 0) ASC, run.earliest DESC, q.nextruntime ASC, q.id ASC", + $params, + 0, + $limit + ); + } + /** * This function will dispatch the next scheduled task in the queue. The task will be handed out * with an open lock - possibly on the entire cron process. Make sure you call either diff --git a/lib/db/install.xml b/lib/db/install.xml index 81d3caf5e3421..be6dd424047dc 100644 --- a/lib/db/install.xml +++ b/lib/db/install.xml @@ -1,5 +1,5 @@ - @@ -3448,6 +3448,7 @@ + diff --git a/lib/db/upgrade.php b/lib/db/upgrade.php index b3844f7c69e0a..af98fc0b5c46d 100644 --- a/lib/db/upgrade.php +++ b/lib/db/upgrade.php @@ -4506,5 +4506,20 @@ function xmldb_main_upgrade($oldversion) { upgrade_main_savepoint(true, 2022051000.00); } + if ($oldversion < 2022051900.01) { + + // Define index timestarted_idx (not unique) to be added to task_adhoc. + $table = new xmldb_table('task_adhoc'); + $index = new xmldb_index('timestarted_idx', XMLDB_INDEX_NOTUNIQUE, ['timestarted']); + + // Conditionally launch add index timestarted_idx. + if (!$dbman->index_exists($table, $index)) { + $dbman->add_index($table, $index); + } + + // Main savepoint reached. + upgrade_main_savepoint(true, 2022051900.01); + } + return true; } diff --git a/lib/tests/task_manager_test.php b/lib/tests/task_manager_test.php deleted file mode 100644 index b3007e9a2651e..0000000000000 --- a/lib/tests/task_manager_test.php +++ /dev/null @@ -1,197 +0,0 @@ -. - -/** - * This file contains the unit tests for the task manager. - * - * @package core - * @copyright 2019 Brendan Heywood - * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later - */ - -defined('MOODLE_INTERNAL') || die(); - -/** - * This file contains the unit tests for the task manager. - * - * @copyright 2019 Brendan Heywood - * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later - */ -class core_task_manager_testcase extends advanced_testcase { - - public function test_ensure_adhoc_task_qos_provider() { - return [ - [ - [], - [], - ], - // A queue with a lopside initial load that needs to be staggered. - [ - [ - (object)['id' => 1, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 10, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 11, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 12, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 13, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 14, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 15, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 1, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 10, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 11, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 12, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 13, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 14, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 15, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - ], - // The same lopsided queue but now the first item is gone. - [ - [ - (object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 2, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - ], - ], - // The same lopsided queue but now the first two items is gone. - [ - [ - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 3, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - ], - ], - // The same lopsided queue but now the first three items are gone. - [ - [ - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 4, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - ], - ], - [ - [ - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - [ - (object)['id' => 5, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 7, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - - (object)['id' => 6, 'classname' => '\core\task\asynchronous_backup_task'], - (object)['id' => 8, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - - (object)['id' => 9, 'classname' => '\tool_dataprivacy\task\process_data_request_task'], - ], - ], - ]; - } - - /** - * Reduces a list of tasks into a simpler string - * - * @param array $input array of tasks - * @return string list of task ids - */ - function flatten($tasks) { - $list = ''; - foreach ($tasks as $id => $task) { - $list .= ' ' . $task->id; - } - return $list; - } - - /** - * Test that the Quality of Service reordering works. - * - * @dataProvider test_ensure_adhoc_task_qos_provider - * - * @param array $input array of tasks - * @param array $expected array of reordered tasks - * @return void - */ - public function test_ensure_adhoc_task_qos(array $input, array $expected) { - $this->resetAfterTest(); - $result = \core\task\manager::ensure_adhoc_task_qos($input); - - - $result = $this->flatten($result); - $expected = $this->flatten($expected); - - $this->assertEquals($expected, $result); - } - -} - diff --git a/lib/upgrade.txt b/lib/upgrade.txt index ffaaf8cc7b21d..da6c3db06e4f8 100644 --- a/lib/upgrade.txt +++ b/lib/upgrade.txt @@ -1,6 +1,11 @@ This files describes API changes in core libraries and APIs, information provided here is intended especially for developers. +=== 4.1 === + +* The method ensure_adhoc_task_qos() in lib/classes/task/manager.php has been deprecated, please use get_next_adhoc_task() + instead. + === 4.0 === * To better detect wrong floats (like, for example, unformatted, using local-dependent separators ones) a number of diff --git a/version.php b/version.php index 49a0dbd7b8999..53671c204f1a2 100644 --- a/version.php +++ b/version.php @@ -29,7 +29,7 @@ defined('MOODLE_INTERNAL') || die(); -$version = 2022051900.00; // YYYYMMDD = weekly release date of this DEV branch. +$version = 2022051900.01; // YYYYMMDD = weekly release date of this DEV branch. // RR = release increments - 00 in DEV branches. // .XX = incremental changes. $release = '4.1dev (Build: 20220519)'; // Human-friendly version name