Implement incremental timeouts in sharding queue processing logic
donhardman committed May 10, 2024
1 parent eacc332 commit 3513bfb
Showing 1 changed file with 12 additions and 4 deletions.
src/Plugin/Sharding/Queue.php (16 changes: 12 additions & 4 deletions)
@@ -98,8 +98,8 @@ public function process(Node $node): void {
 	}
 
 	/**
-	 * Helper to check if we should skipp query in processing the queue
-	 * @param array{id:int,query:string,wait_for_id:int,tries:int,status:string} $query
+	 * Helper to check if we should skip query in processing the queue
+	 * @param array{id:int,query:string,wait_for_id:int,tries:int,status:string,updated_at:int} $query
 	 * @return bool
 	 */
 	protected function shouldSkipQuery(array $query): bool {
@@ -110,6 +110,14 @@ protected function shouldSkipQuery(array $query): bool {
 			return true;
 		}
 	}
+
+	// We try to keep it inside 30 second frame
+	$timeSinceLastAttempt = time() - $query['updated_at'];
+	$maxAttemptTime = ceil(pow(1.4, $query['tries']));
+	if ($timeSinceLastAttempt >= $maxAttemptTime) {
+		return true;
+	}
+
 	return !$this->attemptToUpdateStatus($query, 'processing', 0);
 }
 
@@ -171,13 +179,13 @@ protected function attemptToUpdateStatus(array $query, string $status, int $dura
 	 * We use this method for internal use only
 	 * and automatic handle returns of failed queries
 	 * @param Node $node
-	 * @return Vector<array{id:int,query:string,wait_for_id:int,tries:int,status:string}>
+	 * @return Vector<array{id:int,query:string,wait_for_id:int,tries:int,status:string,updated_at:int}>
 	 * list of queries for request node
 	 */
 	protected function dequeue(Node $node): Vector {
 		$maxTries = static::MAX_TRIES;
 		$query = "
-		SELECT `id`, `query`, `wait_for_id`, `tries`
+		SELECT `id`, `query`, `wait_for_id`, `tries`, `updated_at`
 		FROM {$this->table}
 		WHERE
 			`node` = '{$node->id}'
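
For context on the incremental timeout: the new check in shouldSkipQuery() compares the time since a query's last attempt (time() - $query['updated_at']) against a window of ceil(pow(1.4, $query['tries'])) seconds, so the window widens with each retry. A minimal standalone PHP sketch (illustration only, not part of the commit) of how that window grows:

<?php
// Illustration of the window produced by ceil(pow(1.4, $tries)).
// Assumes retries are counted from 1; the commit itself reads the
// counter from the queue row's `tries` column.
for ($tries = 1; $tries <= 10; $tries++) {
	$window = (int)ceil(pow(1.4, $tries));
	echo "tries={$tries} -> window={$window}s\n";
}
// Prints 2, 2, 3, 4, 6, 8, 11, 15, 21, 29 seconds: by the tenth retry
// the window is still about 29 seconds, consistent with the
// "30 second frame" comment added in the diff.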
