Permalink
Browse files

Extracted processor, created and interruptible processor.

  • Loading branch information...
frankdejonge committed Jun 3, 2016
1 parent ef6d420 commit 141e8dde6faaa95e86ad0dc74bbbbd72040ec3fe
@@ -0,0 +1,22 @@
<?php
namespace spec\League\Pipeline;
use League\Pipeline\FingersCrossedProcessor;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
class FingersCrossedProcessorSpec extends ObjectBehavior
{
function it_is_initializable()
{
$this->shouldHaveType(FingersCrossedProcessor::class);
}
function it_should_process_stages()
{
$this->process([
function ($payload) { return $payload * 2; }
], 2)->shouldBe(4);
}
}
@@ -0,0 +1,35 @@
<?php
namespace spec\League\Pipeline;
use League\Pipeline\InterruptibleProcessor;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
class InterruptibleProcessorSpec extends ObjectBehavior
{
function it_is_initializable()
{
$this->shouldHaveType(InterruptibleProcessor::class);
}
function let()
{
$this->beConstructedWith(function ($payload) {
return $payload < 10;
});
}
function it_should_interrupt()
{
$this->process([
function ($payload) { return $payload + 2; },
function ($payload) { return $payload * 10; },
function ($payload) { return $payload * 10; },
], 5)->shouldBe(70);
$this->process([
function ($payload) { return $payload + 2; },
], 5)->shouldBe(7);
}
}
@@ -0,0 +1,21 @@
<?php
namespace League\Pipeline;
class FingersCrossedProcessor implements ProcessorInterface
{
/**
* @param array $stages
* @param mixed $payload
*
* @return mixed
*/
public function process(array $stages, $payload)
{
foreach ($stages as $stage) {
$payload = call_user_func($stage, $payload);
}
return $payload;
}
}
@@ -0,0 +1,42 @@
<?php
namespace League\Pipeline;
class InterruptibleProcessor implements ProcessorInterface
{
/**
* @var callable
*/
private $check;
/**
* InterruptibleProcessor constructor.
*
* @param callable $check
*/
public function __construct(callable $check)
{
$this->check = $check;
}
/**
* @param array $stages
* @param mixed $payload
*
* @return mixed
*/
public function process(array $stages, $payload)
{
$check = $this->check;
foreach ($stages as $stage) {
$payload = $stage($payload);
if ($check($payload) !== true) {
return $payload;
}
}
return $payload;
}
}
View
@@ -11,14 +11,20 @@ class Pipeline implements PipelineInterface
*/
private $stages = [];
/**
* @var ProcessorInterface
*/
private $processor;
/**
* Constructor.
*
* @param callable[] $stages
* @param callable[] $stages
* @param ProcessorInterface $processor
*
* @throws InvalidArgumentException
*/
public function __construct(array $stages = [])
public function __construct(array $stages = [], ProcessorInterface $processor = null)
{
foreach ($stages as $stage) {
if (false === is_callable($stage)) {
@@ -27,6 +33,7 @@ public function __construct(array $stages = [])
}
$this->stages = $stages;
$this->processor = $processor ?: new FingersCrossedProcessor;
}
/**
@@ -49,11 +56,7 @@ public function pipe(callable $stage)
*/
public function process($payload)
{
foreach ($this->stages as $stage) {
$payload = call_user_func($stage, $payload);
}
return $payload;
return $this->processor->process($this->stages, $payload);
}
/**
View
@@ -0,0 +1,14 @@
<?php
namespace League\Pipeline;
interface ProcessorInterface
{
/**
* @param array $stages
* @param mixed $payload
*
* @return mixed
*/
public function process(array $stages, $payload);
}

0 comments on commit 141e8dd

Please sign in to comment.