Skip to content

Commit

Permalink
Merge pull request #10 from spatie/concurrent
Browse files Browse the repository at this point in the history
Add concurrent option
  • Loading branch information
brendt committed May 4, 2021
2 parents de752b1 + 82acf2b commit eec2c6b
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 45 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,10 @@

All notable changes to `fork` will be documented in this file.

## 1.1.0 - 2021-05-04

- Add `Fork::concurrent(int $concurrent)`

## 1.0.1 - 2021-05-03

- Add check for pcntl support
Expand Down
16 changes: 16 additions & 0 deletions README.md
Expand Up @@ -177,6 +177,22 @@ $result = Fork::new()
);
```

### Configuring concurrency

By default, all callables will be run in parallel. You can however configure a maximum amount of concurrent processes:

```php
$results = Fork::new()
->concurrent(2)
->run(
fn () => 1,
fn () => 2,
fn () => 3,
);
```

In this case, the first two functions will be run immediately and as soon as one of them finishes, the last one will start as well.

## Testing

```bash
Expand Down
1 change: 1 addition & 0 deletions composer.json
Expand Up @@ -25,6 +25,7 @@
"ext-pcntl": "*"
},
"require-dev": {
"nesbot/carbon": "^2.47",
"phpunit/phpunit": "^9.5",
"spatie/ray": "^1.10"
},
Expand Down
135 changes: 101 additions & 34 deletions src/Fork.php
Expand Up @@ -8,11 +8,21 @@
class Fork
{
protected ?Closure $toExecuteBeforeInChildTask = null;

protected ?Closure $toExecuteBeforeInParentTask = null;

protected ?Closure $toExecuteAfterInChildTask = null;

protected ?Closure $toExecuteAfterInParentTask = null;

protected ?int $concurrent = null;

/** @var \Spatie\Fork\Task[] */
protected array $queue = [];

/** @var \Spatie\Fork\Task[] */
protected array $runningTasks = [];

public function __construct()
{
if (! function_exists('pcntl_fork')) {
Expand Down Expand Up @@ -41,21 +51,69 @@ public function after(callable $child = null, callable $parent = null): self
return $this;
}

public function concurrent(int $concurrent): self
{
$this->concurrent = $concurrent;

return $this;
}

public function run(callable ...$callables): array
{
$tasks = [];

foreach ($callables as $order => $callable) {
if ($this->toExecuteBeforeInParentTask) {
($this->toExecuteBeforeInParentTask)();
$tasks[] = Task::fromCallable($callable, $order);
}

return $this->waitFor(...$tasks);
}

protected function waitFor(Task ...$queue): array
{
$output = [];

$this->startRunning(...$queue);

while ($this->isRunning()) {
foreach ($this->runningTasks as $task) {
if (! $task->isFinished()) {
continue;
}

$output[$task->order()] = $this->finishTask($task);

$this->shiftTaskFromQueue();
}

if ($this->isRunning()) {
usleep(1_000);
}
}

$task = Task::fromCallable($callable, $order);
return $output;
}

$tasks[] = $this->forkForTask($task);
protected function runTask(Task $task): Task
{
if ($this->toExecuteBeforeInParentTask) {
($this->toExecuteBeforeInParentTask)();
}

return $this->waitFor(...$tasks);
return $this->forkForTask($task);
}

protected function finishTask(Task $task): mixed
{
$output = $task->output();

if ($this->toExecuteAfterInParentTask) {
($this->toExecuteAfterInParentTask)($output);
}

unset($this->runningTasks[$task->order()]);

return $output;
}

protected function forkForTask(Task $task): Task
Expand All @@ -80,35 +138,6 @@ protected function forkForTask(Task $task): Task
->setConnection($socketToChild);
}

protected function waitFor(Task ...$runningTasks): array
{
$output = [];

while (count($runningTasks)) {
foreach ($runningTasks as $key => $task) {
if ($task->isFinished()) {
$taskOutput = $task->output();

$output[$task->order()] = $taskOutput;

unset($runningTasks[$key]);

if ($this->toExecuteAfterInParentTask) {
($this->toExecuteAfterInParentTask)($taskOutput);
}
}
}

if (! count($runningTasks)) {
break;
}

usleep(1_000);
}

return $output;
}

protected function currentlyInChildTask(int $pid): bool
{
return $pid === 0;
Expand All @@ -132,4 +161,42 @@ protected function executeInChildTask(

$connectionToParent->close();
}

protected function shiftTaskFromQueue(): void
{
if (! count($this->queue)) {
return;
}

$firstTask = array_shift($this->queue);

$this->runningTasks[] = $this->runTask($firstTask);
}


protected function startRunning(
Task ...$queue
): void {
$this->queue = $queue;

foreach ($this->queue as $task) {
$this->runningTasks[$task->order()] = $this->runTask($task);

unset($this->queue[$task->order()]);

if ($this->concurrencyLimitReached()) {
break;
}
}
}

protected function isRunning(): bool
{
return count($this->runningTasks) > 0;
}

protected function concurrencyLimitReached(): bool
{
return $this->concurrent && count($this->runningTasks) >= $this->concurrent;
}
}
33 changes: 22 additions & 11 deletions tests/ForkTest.php
Expand Up @@ -2,6 +2,7 @@

namespace Spatie\Fork\Tests;

use Carbon\Carbon;
use DateTime;
use PHPUnit\Framework\TestCase;
use Spatie\Fork\Fork;
Expand Down Expand Up @@ -30,35 +31,45 @@ public function it_will_execute_the_given_closures()
}

/** @test */
public function it_can_execute_the_closures_concurrently()
public function it_will_execute_the_given_closures_with_concurrency_cap()
{
$results = Fork::new()
->concurrent(2)
->run(
function () {
sleep(1);

return 1 + 1;
},
function () {
sleep(2);

return 2 + 2;
return Carbon::now()->second;
},
function () {
sleep(1);

return 3 + 3;
return Carbon::now()->second;
},
function () {
sleep(1);

return 4 + 4;
return Carbon::now()->second;
},
);

$this->assertEquals([2, 4, 6, 8], $results);
$this->assertEquals($results[0], $results[1]);
$this->assertNotEquals($results[1], $results[2]);
}

/** @test */
public function it_can_execute_the_closures_concurrently()
{
Fork::new()
->run(
...array_fill(
start_index: 0,
count: 20,
value: fn () => usleep(100_000),
) // 1/10th of a second each
);

$this->assertTookLessThanSeconds(3);
$this->assertTookLessThanSeconds(1);
}

/** @test */
Expand Down

0 comments on commit eec2c6b

Please sign in to comment.