From 7c2e879fd61ae7eb7bd3851a8af3046b00abaa73 Mon Sep 17 00:00:00 2001 From: Brent Roose Date: Tue, 4 May 2021 06:18:53 +0200 Subject: [PATCH 1/6] Add concurrent option --- CHANGELOG.md | 4 +++ README.md | 16 +++++++++ composer.json | 1 + src/Fork.php | 81 +++++++++++++++++++++++++++++++++++----------- tests/ForkTest.php | 25 ++++++++++++++ 5 files changed, 109 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90e93dc..ad430c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to `fork` will be documented in this file. +## 1.1.0 - 2021-05-04 + +- Add `Fork::concurrent(int $concurrent)` + ## 1.0.1 - 2021-05-03 - Add check for pcntl support diff --git a/README.md b/README.md index fa8eb97..06e74fc 100644 --- a/README.md +++ b/README.md @@ -177,6 +177,22 @@ $result = Fork::new() ); ``` +### Configuring concurrency + +By default, all callables will be run in parallel. You can however configure a maximum amount of concurrent processes: + +```php +$results = Fork::new() + ->concurrent(2) + ->run( + fn () => 1, + fn () => 2, + fn () => 3, + ); +``` + +In this case, the first two functions will be run immediately and as soon as one of them finishes, the last one will start as well. + ## Testing ```bash diff --git a/composer.json b/composer.json index 00ac55f..a60da3f 100644 --- a/composer.json +++ b/composer.json @@ -25,6 +25,7 @@ "ext-pcntl": "*" }, "require-dev": { + "nesbot/carbon": "^2.47", "phpunit/phpunit": "^9.5", "spatie/ray": "^1.10" }, diff --git a/src/Fork.php b/src/Fork.php index c567a0d..1c9477b 100644 --- a/src/Fork.php +++ b/src/Fork.php @@ -8,11 +8,15 @@ class Fork { protected ?Closure $toExecuteBeforeInChildTask = null; + protected ?Closure $toExecuteBeforeInParentTask = null; protected ?Closure $toExecuteAfterInChildTask = null; + protected ?Closure $toExecuteAfterInParentTask = null; + protected ?int $concurrent = null; + public function __construct() { if (! function_exists('pcntl_fork')) { @@ -41,18 +45,19 @@ public function after(callable $child = null, callable $parent = null): self return $this; } + public function concurrent(int $concurrent): self + { + $this->concurrent = $concurrent; + + return $this; + } + public function run(callable ...$callables): array { $tasks = []; foreach ($callables as $order => $callable) { - if ($this->toExecuteBeforeInParentTask) { - ($this->toExecuteBeforeInParentTask)(); - } - - $task = Task::fromCallable($callable, $order); - - $tasks[] = $this->forkForTask($task); + $tasks[] = Task::fromCallable($callable, $order); } return $this->waitFor(...$tasks); @@ -80,26 +85,66 @@ protected function forkForTask(Task $task): Task ->setConnection($socketToChild); } - protected function waitFor(Task ...$runningTasks): array + protected function runTask(Task $task): Task + { + if ($this->toExecuteBeforeInParentTask) { + ($this->toExecuteBeforeInParentTask)(); + } + + return $this->forkForTask($task); + } + + protected function finishTask(Task $task): mixed + { + $output = $task->output(); + + if ($this->toExecuteAfterInParentTask) { + ($this->toExecuteAfterInParentTask)($output); + } + + return $output; + } + + protected function waitFor(Task ...$queue): array { $output = []; - while (count($runningTasks)) { - foreach ($runningTasks as $key => $task) { - if ($task->isFinished()) { - $taskOutput = $task->output(); + $running = []; + + $amountRunning = 0; + + foreach ($queue as $i => $task) { + $running[] = $this->runTask($task); + + unset($queue[$i]); + + $amountRunning += 1; + + if ($this->concurrent && $amountRunning >= $this->concurrent) { + break; + } + } + + while (count($running)) { + foreach ($running as $key => $task) { + if (! $task->isFinished()) { + continue; + } + + $output[$task->order()] = $this->finishTask($task); + + unset($running[$key]); - $output[$task->order()] = $taskOutput; + if (count($queue)) { + $i = array_key_first($queue); - unset($runningTasks[$key]); + $running[] = $this->runTask($queue[$i]); - if ($this->toExecuteAfterInParentTask) { - ($this->toExecuteAfterInParentTask)($taskOutput); - } + unset($queue[$i]); } } - if (! count($runningTasks)) { + if (! count($running)) { break; } diff --git a/tests/ForkTest.php b/tests/ForkTest.php index 39a6fcc..73fdde7 100644 --- a/tests/ForkTest.php +++ b/tests/ForkTest.php @@ -2,6 +2,7 @@ namespace Spatie\Fork\Tests; +use Carbon\Carbon; use DateTime; use PHPUnit\Framework\TestCase; use Spatie\Fork\Fork; @@ -29,6 +30,30 @@ public function it_will_execute_the_given_closures() $this->assertEquals([2, 4], $results); } + /** @test */ + public function it_will_execute_the_given_closures_with_concurrency_cap() + { + $results = Fork::new() + ->concurrent(2) + ->run( + function () { + sleep(1); + return Carbon::now()->second; + }, + function () { + sleep(1); + return Carbon::now()->second; + }, + function () { + sleep(1); + return Carbon::now()->second; + }, + ); + + $this->assertEquals($results[0], $results[1]); + $this->assertNotEquals($results[1], $results[2]); + } + /** @test */ public function it_can_execute_the_closures_concurrently() { From df46b67e06337a4cc7e11dd843cd35ba2a8b1d70 Mon Sep 17 00:00:00 2001 From: brendt Date: Tue, 4 May 2021 04:19:39 +0000 Subject: [PATCH 2/6] Fix styling --- tests/ForkTest.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/ForkTest.php b/tests/ForkTest.php index 73fdde7..9c8e44f 100644 --- a/tests/ForkTest.php +++ b/tests/ForkTest.php @@ -38,14 +38,17 @@ public function it_will_execute_the_given_closures_with_concurrency_cap() ->run( function () { sleep(1); + return Carbon::now()->second; }, function () { sleep(1); + return Carbon::now()->second; }, function () { sleep(1); + return Carbon::now()->second; }, ); From 91f4ceb8fa8baa33de582ff2cffddd73e6cd4322 Mon Sep 17 00:00:00 2001 From: Brent Roose Date: Tue, 4 May 2021 09:30:52 +0200 Subject: [PATCH 3/6] Code cleanup --- src/Fork.php | 126 ++++++++++++++++++++++++++------------------- tests/ForkTest.php | 43 +++++----------- 2 files changed, 87 insertions(+), 82 deletions(-) diff --git a/src/Fork.php b/src/Fork.php index 1c9477b..8e793b0 100644 --- a/src/Fork.php +++ b/src/Fork.php @@ -17,6 +17,12 @@ class Fork protected ?int $concurrent = null; + /** @var \Spatie\Fork\Task[] */ + protected array $queue = []; + + /** @var \Spatie\Fork\Task[] */ + protected array $runningTasks = []; + public function __construct() { if (! function_exists('pcntl_fork')) { @@ -63,26 +69,29 @@ public function run(callable ...$callables): array return $this->waitFor(...$tasks); } - protected function forkForTask(Task $task): Task + protected function waitFor(Task ...$queue): array { - [$socketToParent, $socketToChild] = Connection::createPair(); + $output = []; - $processId = pcntl_fork(); + $this->startRunning(...$queue); - if ($this->currentlyInChildTask($processId)) { - $socketToChild->close(); + while ($this->isRunning()) { + foreach ($this->runningTasks as $task) { + if (! $task->isFinished()) { + continue; + } - $this->executeInChildTask($task, $socketToParent); + $output[$task->order()] = $this->finishTask($task); - exit; - } + $this->shiftTaskFromQueue(); + } - $socketToParent->close(); + if ($this->isRunning()) { + usleep(1_000); + } + } - return $task - ->setStartTime(time()) - ->setPid($processId) - ->setConnection($socketToChild); + return $output; } protected function runTask(Task $task): Task @@ -102,56 +111,31 @@ protected function finishTask(Task $task): mixed ($this->toExecuteAfterInParentTask)($output); } + unset($this->runningTasks[$task->order()]); + return $output; } - protected function waitFor(Task ...$queue): array + protected function forkForTask(Task $task): Task { - $output = []; - - $running = []; - - $amountRunning = 0; + [$socketToParent, $socketToChild] = Connection::createPair(); - foreach ($queue as $i => $task) { - $running[] = $this->runTask($task); + $processId = pcntl_fork(); - unset($queue[$i]); + if ($this->currentlyInChildTask($processId)) { + $socketToChild->close(); - $amountRunning += 1; + $this->executeInChildTask($task, $socketToParent); - if ($this->concurrent && $amountRunning >= $this->concurrent) { - break; - } + exit; } - while (count($running)) { - foreach ($running as $key => $task) { - if (! $task->isFinished()) { - continue; - } - - $output[$task->order()] = $this->finishTask($task); - - unset($running[$key]); - - if (count($queue)) { - $i = array_key_first($queue); - - $running[] = $this->runTask($queue[$i]); - - unset($queue[$i]); - } - } - - if (! count($running)) { - break; - } - - usleep(1_000); - } + $socketToParent->close(); - return $output; + return $task + ->setStartTime(time()) + ->setPid($processId) + ->setConnection($socketToChild); } protected function currentlyInChildTask(int $pid): bool @@ -177,4 +161,42 @@ protected function executeInChildTask( $connectionToParent->close(); } + + protected function shiftTaskFromQueue(): void + { + if (! count($this->queue)) { + return; + } + + $firstTask = array_shift($this->queue); + + $this->runningTasks[] = $this->runTask($firstTask); + } + + + protected function startRunning( + Task ...$queue + ): void { + $this->queue = $queue; + + foreach ($this->queue as $task) { + $this->runningTasks[$task->order()] = $this->runTask($task); + + unset($this->queue[$task->order()]); + + if ($this->concurrencyLimitReached()) { + break; + } + } + } + + protected function isRunning(): bool + { + return count($this->runningTasks) > 0; + } + + protected function concurrencyLimitReached(): bool + { + return $this->concurrent && count($this->runningTasks) >= $this->concurrent; + } } diff --git a/tests/ForkTest.php b/tests/ForkTest.php index 9c8e44f..05530a7 100644 --- a/tests/ForkTest.php +++ b/tests/ForkTest.php @@ -37,19 +37,19 @@ public function it_will_execute_the_given_closures_with_concurrency_cap() ->concurrent(2) ->run( function () { - sleep(1); + usleep(100); - return Carbon::now()->second; + return Carbon::now()->millisecond; }, function () { - sleep(1); + usleep(100); - return Carbon::now()->second; + return Carbon::now()->millisecond; }, function () { - sleep(1); + usleep(100); - return Carbon::now()->second; + return Carbon::now()->millisecond; }, ); @@ -60,33 +60,16 @@ function () { /** @test */ public function it_can_execute_the_closures_concurrently() { - $results = Fork::new() + Fork::new() ->run( - function () { - sleep(1); - - return 1 + 1; - }, - function () { - sleep(2); - - return 2 + 2; - }, - function () { - sleep(1); - - return 3 + 3; - }, - function () { - sleep(1); - - return 4 + 4; - }, + ...array_fill( + start_index: 0, + count: 20, + value: fn () => usleep(100_000), + ) // 1/10th of a second each ); - $this->assertEquals([2, 4, 6, 8], $results); - - $this->assertTookLessThanSeconds(3); + $this->assertTookLessThanSeconds(1); } /** @test */ From 0f7e3e88d44d23344de30831a1860d032a5cfb03 Mon Sep 17 00:00:00 2001 From: Brent Roose Date: Tue, 4 May 2021 09:32:03 +0200 Subject: [PATCH 4/6] Fix cs fixer --- .php_cs.dist => .php_cs.dist.php | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .php_cs.dist => .php_cs.dist.php (100%) diff --git a/.php_cs.dist b/.php_cs.dist.php similarity index 100% rename from .php_cs.dist rename to .php_cs.dist.php From 675f5ff7a084750d3f4422b0fc504bd3e41849cc Mon Sep 17 00:00:00 2001 From: Brent Roose Date: Tue, 4 May 2021 09:33:05 +0200 Subject: [PATCH 5/6] Revert "Fix cs fixer" This reverts commit 0f7e3e88d44d23344de30831a1860d032a5cfb03. --- .php_cs.dist.php => .php_cs.dist | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .php_cs.dist.php => .php_cs.dist (100%) diff --git a/.php_cs.dist.php b/.php_cs.dist similarity index 100% rename from .php_cs.dist.php rename to .php_cs.dist From 82acf2b516f389fc48afdaa2c786c72f0a873150 Mon Sep 17 00:00:00 2001 From: Brent Roose Date: Tue, 4 May 2021 09:43:09 +0200 Subject: [PATCH 6/6] Fix test --- tests/ForkTest.php | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/ForkTest.php b/tests/ForkTest.php index 05530a7..33d268e 100644 --- a/tests/ForkTest.php +++ b/tests/ForkTest.php @@ -37,19 +37,19 @@ public function it_will_execute_the_given_closures_with_concurrency_cap() ->concurrent(2) ->run( function () { - usleep(100); + sleep(1); - return Carbon::now()->millisecond; + return Carbon::now()->second; }, function () { - usleep(100); + sleep(1); - return Carbon::now()->millisecond; + return Carbon::now()->second; }, function () { - usleep(100); + sleep(1); - return Carbon::now()->millisecond; + return Carbon::now()->second; }, );