Skip to content

Commit

Permalink
Merge d80cb46 into 57835ee
Browse files Browse the repository at this point in the history
  • Loading branch information
juzerali committed Jan 9, 2018
2 parents 57835ee + d80cb46 commit 6287fa7
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -2,3 +2,4 @@
/composer.lock
/coverage/
/coverage.xml
.idea
47 changes: 47 additions & 0 deletions README.md
Expand Up @@ -115,6 +115,53 @@ $pipeline = (new Pipeline)
$pipeline->process(new DeleteBlogPost($postId));
```

## Fork-Join

Sometimes pipeline needs to fork into one of the disparate paths which aren't related
to each other.

For example, for payload `x` take separate paths depending upon whether `2*x+1` is 0,
positive, or negative and divide final result by 5.

```php
$pipeline = (new Pipeline)
->pipe(new TimeTwoStage)
->pipe(new AddOneStage)
->fork(function($payload) {
if($payload == 0) return "zero";
if($payload < 0) return "-";
if($payload > 0) return "+";
return false; // for short-circuit
})
->disjoin("zero")
->pipe(new ReturnZeroStage)
->disjoin("-")
->pipe(new ExponentOfTwoStage)
->pipe(new AddOneStage)
->disjoin("+")
->pipe(new SquareRootStage)
->pipe(new TimeThirteenStage)
->join()
->pipe(new TimesEightStage);
```

In order to fork, you need to call `fork` on a pipeline and provide `resolver` of type
`callable` as argument. The `resolver` should return a tag to identify which disjoint
path to take in the fork. After that, you will need to create disjoins. Each disjoin is
a disparate path which can be followed in the fork. Each disjoin needs to be named with
a tag. Finally calling `join` on a fork joins it and the parent pipeline processing can
resume.

You can optionally pass a stage or a pipeline to `disjoin()`. Like so:

```php
->disjoin("zero", $zeroProcessingPipeline)
->disjoin("-", $negativeNumberProcessingPipeline)
->disjoin("+", $positiveNumberProcessingPipeline)
->join()
```


## Pipeline Builders

Because pipelines themselves are immutable, pipeline builders are introduced to
Expand Down
47 changes: 47 additions & 0 deletions spec/ForkJoinSpec.php
@@ -0,0 +1,47 @@
<?php

namespace spec\League\Pipeline;

use League\Pipeline\Pipeline;
use PhpSpec\Exception\Exception;
use PhpSpec\ObjectBehavior;

class ForkJoinSpec extends ObjectBehavior
{

public function it_should_pass()
{
$pipeline = (new Pipeline())
->pipe(function($payload) {return $payload * 2;})
->fork(function($payload) {
if($payload == 0) return '0';
if($payload > 0) return '+';
if($payload < 0) return '-';
return false;
})
->disjoin('0')
->pipe(function() {return INF;})
->disjoin('+')
->pipe(function($payload) {return sqrt($payload);})
->pipe(function($payload) {return $payload / 2;})
->disjoin('-', function() {return NAN;})
->join()
->pipe(function ($payload) {return "&" . $payload;});

if(($result = $pipeline->process(0)) != '&INF')
{
throw new Exception('Should be INF but was ' . $result);
}

if(($result = $pipeline->process(32)) != '&4')
{
throw new Exception('Should be 4 but was ' . $result);
}

if($pipeline->process(-1) != '&NAN')
{
throw new Exception('Should be NAN');
}
}

}
59 changes: 59 additions & 0 deletions src/DisjointAwarePipeline.php
@@ -0,0 +1,59 @@
<?php
/**
* Created by PhpStorm.
* User: juzerali
* Date: 09/01/18
* Time: 1:58 PM
*/

namespace League\Pipeline;

/**
* Internal class used as a wrapper around pipelines within a fork.
* There should be no need to use this class directly.
*
* Class DisjointAwarePipeline
* @package League\Pipeline
*/
class DisjointAwarePipeline extends Pipeline implements DisjointAwarePipelineInterface
{
/**
* @var Fork
*/
public $fork;

/**
* DisjointAwarePipeline constructor.
* @param Fork $fork
*/
public function __construct(Fork $fork)
{
parent::__construct();
$this->fork = $fork;
}

/**
* @inheritdoc
*/
public function disjoin(string $tag, callable $stage = null)
{
return $this->fork->disjoin($tag, $stage);
}

/**
* @inheritdoc
*/
public function join(callable $resolver = null)
{
return $this->fork->join($resolver);
}

/**
* @inheritdoc
*/
public function pipe(callable $stage)
{
$this->stages[] = $stage;
return $this;
}
}
15 changes: 15 additions & 0 deletions src/DisjointAwarePipelineInterface.php
@@ -0,0 +1,15 @@
<?php
/**
* Created by PhpStorm.
* User: juzerali
* Date: 09/01/18
* Time: 4:04 PM
*/

namespace League\Pipeline;


interface DisjointAwarePipelineInterface extends PipelineInterface, ForkInterface
{

}
81 changes: 81 additions & 0 deletions src/Fork.php
@@ -0,0 +1,81 @@
<?php

namespace League\Pipeline;


class Fork implements ForkInterface
{
/**
* @var Pipeline
*/
private $parent;

/**
* @var array callable
*/
protected $forks = [];

/**
* @var callable
*/
protected $resolver;

/**
* Fork constructor.
*
* @param callable|null $resolver
*/
public function __construct(callable $resolver = null)
{
$this->resolver = $resolver;
}

public function pipeline(Pipeline $pipeline)
{
$this->parent = $pipeline;
}

/**
* @inheritdoc
*/
public function join(callable $resolver = null)
{
if($resolver != null)
{
$this->resolver = $resolver;
}

return $this->parent;
}

/**
* @inheritdoc
*/
public function disjoin(string $tag, callable $stage = null)
{
$pipeline = new DisjointAwarePipeline($this);

if($stage != null)
{
$pipeline = $pipeline->pipe($stage);
}

$this->forks[$tag] = $pipeline;
return $pipeline;
}

/**
* Chooses a fork or short-circuits based on $resolver
*
* @param mixed $payload
* @return mixed
*/
public function __invoke($payload)
{
$flowTo = call_user_func($this->resolver, $payload);
if($flowTo === false) return $payload;
$result = $this->forks[$flowTo]->process($payload);

return $result;
}
}
26 changes: 26 additions & 0 deletions src/ForkInterface.php
@@ -0,0 +1,26 @@
<?php

namespace League\Pipeline;


interface ForkInterface extends StageInterface
{
/**
* Builder method that joins all the branches in a fork
*
* @param callable|null $resolver
*
* @return PipelineInterface
*/
public function join(callable $resolver = null);

/**
* Adds a branch to the fork.
*
* @param string $tag
* @param callable|null $stage
*
* @return DisjointAwarePipelineInterface
*/
public function disjoin(string $tag, callable $stage = null);
}
23 changes: 23 additions & 0 deletions src/ForkResolver.php
@@ -0,0 +1,23 @@
<?php
/**
* Created by PhpStorm.
* User: juzerali
* Date: 09/01/18
* Time: 2:35 PM
*/

namespace League\Pipeline;


interface ForkResolver
{
/**
* Resolve the direction in the fork. Should return the tag of the disjoin.
* Return false to short-circuit the fork and directly jump to join.
*
* @param mixed $payload
*
* @return string|integer|boolean
*/
public function __invoke($payload);
}
17 changes: 16 additions & 1 deletion src/Pipeline.php
Expand Up @@ -9,7 +9,7 @@ class Pipeline implements PipelineInterface
/**
* @var callable[]
*/
private $stages = [];
protected $stages = [];

/**
* @var ProcessorInterface
Expand Down Expand Up @@ -59,6 +59,21 @@ public function process($payload)
return $this->processor->process($this->stages, $payload);
}

/**
* Fork the pipeline
*
* @param callable|null $resolver
*
* @return Fork
*/
public function fork(callable $resolver = null)
{
$fork = new Fork($resolver);
$pipeline = $this->pipe($fork);
$fork->pipeline($pipeline);
return $fork;
}

/**
* @inheritdoc
*/
Expand Down
9 changes: 9 additions & 0 deletions src/PipelineInterface.php
Expand Up @@ -12,4 +12,13 @@ interface PipelineInterface extends StageInterface
* @return static
*/
public function pipe(callable $operation);

/**
* Forks the pipeline with the given fork
*
* @param callable $resolver
*
* @return ForkInterface
*/
public function fork(callable $resolver);
}

0 comments on commit 6287fa7

Please sign in to comment.