Skip to content

Commit f2844e4

Browse files
committed
Created a Proxy to fix pipeline executions in a workflow : pipelines must be launched one after the other
1 parent d065493 commit f2844e4

File tree

2 files changed

+74
-3
lines changed

2 files changed

+74
-3
lines changed

src/Console.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use Kiboko\Component\Action\Action;
88
use Kiboko\Component\Pipeline\Pipeline;
99
use Kiboko\Component\Runtime\Action\Console as ActionConsoleRuntime;
10-
use Kiboko\Component\Runtime\Pipeline\Console as PipelineConsoleRuntime;
10+
use Kiboko\Component\Runtime\Pipeline\PipelineRuntimeInterface;
1111
use Kiboko\Component\State;
1212
use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
1313
use Kiboko\Contract\Satellite\RunnableInterface as JobRunnableInterface;
@@ -27,13 +27,13 @@ public function __construct(
2727
$this->state = new State\StateOutput\Workflow($output);
2828
}
2929

30-
public function loadPipeline(string $filename): PipelineConsoleRuntime
30+
public function loadPipeline(string $filename): PipelineRuntimeInterface
3131
{
3232
$factory = require $filename;
3333

3434
$pipeline = new Pipeline($this->pipelineRunner);
3535

36-
return $factory(new PipelineConsoleRuntime($this->output, $pipeline, $this->state->withPipeline(basename($filename))));
36+
return new PipelineProxy($factory, $this->output, $pipeline, $this->state, basename($filename));
3737
}
3838

3939
public function loadAction(string $filename): ActionConsoleRuntime

src/PipelineProxy.php

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Kiboko\Component\Runtime\Workflow;
6+
7+
use Kiboko\Component\Runtime\Pipeline\Console as PipelineConsoleRuntime;
8+
use Kiboko\Component\Runtime\Pipeline\PipelineRuntimeInterface;
9+
use Kiboko\Component\State;
10+
use Kiboko\Contract\Pipeline\ExtractorInterface;
11+
use Kiboko\Contract\Pipeline\LoaderInterface;
12+
use Kiboko\Contract\Pipeline\PipelineInterface;
13+
use Kiboko\Contract\Pipeline\RejectionInterface;
14+
use Kiboko\Contract\Pipeline\StateInterface;
15+
use Kiboko\Contract\Pipeline\TransformerInterface;
16+
use Kiboko\Contract\Pipeline\WalkableInterface;
17+
use Symfony\Component\Console\Output\ConsoleOutput;
18+
19+
class PipelineProxy implements PipelineRuntimeInterface
20+
{
21+
private $factory;
22+
23+
public function __construct(
24+
callable $factory,
25+
private readonly ConsoleOutput $output,
26+
private readonly PipelineInterface&WalkableInterface $pipeline,
27+
private readonly State\StateOutput\Workflow $state,
28+
private readonly string $filename,
29+
) {
30+
$this->factory = $factory;
31+
}
32+
33+
public function extract(
34+
ExtractorInterface $extractor,
35+
RejectionInterface $rejection,
36+
StateInterface $state,
37+
): self {
38+
$this->pipeline->extract($extractor, $rejection, $state);
39+
40+
return $this;
41+
}
42+
43+
public function transform(
44+
TransformerInterface $transformer,
45+
RejectionInterface $rejection,
46+
StateInterface $state,
47+
): self {
48+
$this->pipeline->transform($transformer, $rejection, $state);
49+
50+
return $this;
51+
}
52+
53+
public function load(
54+
LoaderInterface $loader,
55+
RejectionInterface $rejection,
56+
StateInterface $state,
57+
): self {
58+
$this->pipeline->load($loader, $rejection, $state);
59+
60+
return $this;
61+
}
62+
63+
public function run(int $interval = 1000): int
64+
{
65+
$runtime = new PipelineConsoleRuntime($this->output, $this->pipeline, $this->state->withPipeline($this->filename));
66+
67+
$pipeline = ($this->factory)($runtime);
68+
69+
return $pipeline->run($interval);
70+
}
71+
}

0 commit comments

Comments
 (0)