diff --git a/README.md b/README.md index d88f133..c490bdd 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,53 @@ $pipeline = (new Pipeline) $pipeline->process(new DeleteBlogPost($postId)); ``` +## Fork-Join + +Sometimes pipeline needs to fork into one of the disparate paths which aren't related +to each other. + +For example, for payload `x` take separate paths depending upon whether `2*x+1` is 0, +positive, or negative and divide final result by 5. + +```php +$pipeline = (new Pipeline) + ->pipe(new TimeTwoStage) + ->pipe(new AddOneStage) + ->fork(function($payload) { + if($payload == 0) return "zero"; + if($payload < 0) return "-"; + if($payload > 0) return "+"; + return false; // for short-circuit + }) + ->disjoin("zero") + ->pipe(new ReturnZeroStage) + ->disjoin("-") + ->pipe(new ExponentOfTwoStage) + ->pipe(new AddOneStage) + ->disjoin("+") + ->pipe(new SquareRootStage) + ->pipe(new TimeThirteenStage) + ->join() + ->pipe(new TimesEightStage); +``` + +In order to fork, you need to call `fork` on a pipeline and provide `resolver` of type +`callable` as argument. The `resolver` should return a tag to identify which disjoint +path to take in the fork. After that, you will need to create disjoins. Each disjoin is +a disparate path which can be followed in the fork. Each disjoin needs to be named with +a tag. Finally calling `join` on a fork joins it and the parent pipeline processing can +resume. + +You can optionally pass a stage or a pipeline to `disjoin()`. Like so: + +```php +->disjoin("zero", $zeroProcessingPipeline) + ->disjoin("-", $negativeNumberProcessingPipeline) + ->disjoin("+", $positiveNumberProcessingPipeline) + ->join() +``` + + ## Pipeline Builders Because pipelines themselves are immutable, pipeline builders are introduced to diff --git a/spec/ForkJoinSpec.php b/spec/ForkJoinSpec.php new file mode 100644 index 0000000..35f8d0e --- /dev/null +++ b/spec/ForkJoinSpec.php @@ -0,0 +1,47 @@ +pipe(function($payload) {return $payload * 2;}) + ->fork(function($payload) { + if($payload == 0) return '0'; + if($payload > 0) return '+'; + if($payload < 0) return '-'; + return false; + }) + ->disjoin('0') + ->pipe(function() {return INF;}) + ->disjoin('+') + ->pipe(function($payload) {return sqrt($payload);}) + ->pipe(function($payload) {return $payload / 2;}) + ->disjoin('-', function() {return NAN;}) + ->join() + ->pipe(function ($payload) {return "&" . $payload;}); + + if(($result = $pipeline->process(0)) != '&INF') + { + throw new Exception('Should be INF but was ' . $result); + } + + if(($result = $pipeline->process(32)) != '&4') + { + throw new Exception('Should be 4 but was ' . $result); + } + + if($pipeline->process(-1) != '&NAN') + { + throw new Exception('Should be NAN'); + } + } + +} diff --git a/src/DisjointAwarePipeline.php b/src/DisjointAwarePipeline.php new file mode 100644 index 0000000..043e05b --- /dev/null +++ b/src/DisjointAwarePipeline.php @@ -0,0 +1,59 @@ +fork = $fork; + } + + /** + * @inheritdoc + */ + public function disjoin(string $tag, callable $stage = null) + { + return $this->fork->disjoin($tag, $stage); + } + + /** + * @inheritdoc + */ + public function join(callable $resolver = null) + { + return $this->fork->join($resolver); + } + + /** + * @inheritdoc + */ + public function pipe(callable $stage) + { + $this->stages[] = $stage; + return $this; + } +} diff --git a/src/DisjointAwarePipelineInterface.php b/src/DisjointAwarePipelineInterface.php new file mode 100644 index 0000000..8721d83 --- /dev/null +++ b/src/DisjointAwarePipelineInterface.php @@ -0,0 +1,15 @@ +resolver = $resolver; + } + + public function pipeline(Pipeline $pipeline) + { + $this->parent = $pipeline; + } + + /** + * @inheritdoc + */ + public function join(callable $resolver = null) + { + if($resolver != null) + { + $this->resolver = $resolver; + } + + return $this->parent; + } + + /** + * @inheritdoc + */ + public function disjoin(string $tag, callable $stage = null) + { + $pipeline = new DisjointAwarePipeline($this); + + if($stage != null) + { + $pipeline = $pipeline->pipe($stage); + } + + $this->forks[$tag] = $pipeline; + return $pipeline; + } + + /** + * Chooses a fork or short-circuits based on $resolver + * + * @param mixed $payload + * @return mixed + */ + public function __invoke($payload) + { + $flowTo = call_user_func($this->resolver, $payload); + if($flowTo === false) return $payload; + $result = $this->forks[$flowTo]->process($payload); + + return $result; + } +} diff --git a/src/ForkInterface.php b/src/ForkInterface.php new file mode 100644 index 0000000..14b4a26 --- /dev/null +++ b/src/ForkInterface.php @@ -0,0 +1,26 @@ +processor->process($this->stages, $payload); } + /** + * Fork the pipeline + * + * @param callable|null $resolver + * + * @return Fork + */ + public function fork(callable $resolver = null) + { + $fork = new Fork($resolver); + $pipeline = $this->pipe($fork); + $fork->pipeline($pipeline); + return $fork; + } + /** * @inheritdoc */ diff --git a/src/PipelineInterface.php b/src/PipelineInterface.php index 50c25b6..2081831 100644 --- a/src/PipelineInterface.php +++ b/src/PipelineInterface.php @@ -12,4 +12,13 @@ interface PipelineInterface extends StageInterface * @return static */ public function pipe(callable $operation); + + /** + * Forks the pipeline with the given fork + * + * @param callable $resolver + * + * @return ForkInterface + */ + public function fork(callable $resolver); }