1818
1919class PipelineProxy implements PipelineRuntimeInterface
2020{
21- private $ factory ;
21+ /** @var list<callable> $callbacks */
22+ private $ callback ;
23+
24+ private array $ callbacks = [];
25+ private PipelineRuntimeInterface $ decorated ;
2226
2327 public function __construct (
24- callable $ factory ,
25- private readonly ConsoleOutput $ output ,
26- private readonly PipelineInterface &WalkableInterface $ pipeline ,
27- private readonly State \StateOutput \Workflow $ state ,
28- private readonly string $ filename ,
29- ) {
30- $ this ->factory = $ factory ;
28+ callable $ callback ,
29+ ConsoleOutput $ output ,
30+ PipelineInterface &WalkableInterface $ pipeline ,
31+ State \StateOutput \Workflow $ state ,
32+ string $ filename ,
33+ )
34+ {
35+ $ this ->decorated = new PipelineConsoleRuntime ($ output , $ pipeline , $ state ->withPipeline ($ filename ));
36+ $ this ->callback = $ callback ;
3137 }
3238
3339 public function extract (
3440 ExtractorInterface $ extractor ,
3541 RejectionInterface $ rejection ,
3642 StateInterface $ state ,
37- ): self {
38- $ this ->pipeline ->extract ($ extractor , $ rejection , $ state );
43+ ): self
44+ {
45+ $ this ->callbacks [] = function () use ($ extractor , $ rejection , $ state ): void {
46+ $ this ->decorated ->extract ($ extractor , $ rejection , $ state );
47+ };
3948
4049 return $ this ;
4150 }
@@ -44,8 +53,11 @@ public function transform(
4453 TransformerInterface $ transformer ,
4554 RejectionInterface $ rejection ,
4655 StateInterface $ state ,
47- ): self {
48- $ this ->pipeline ->transform ($ transformer , $ rejection , $ state );
56+ ): self
57+ {
58+ $ this ->callbacks [] = function () use ($ transformer , $ rejection , $ state ): void {
59+ $ this ->decorated ->transform ($ transformer , $ rejection , $ state );
60+ };
4961
5062 return $ this ;
5163 }
@@ -54,18 +66,23 @@ public function load(
5466 LoaderInterface $ loader ,
5567 RejectionInterface $ rejection ,
5668 StateInterface $ state ,
57- ): self {
58- $ this ->pipeline ->load ($ loader , $ rejection , $ state );
69+ ): self
70+ {
71+ $ this ->callbacks [] = function () use ($ loader , $ rejection , $ state ): void {
72+ $ this ->decorated ->load ($ loader , $ rejection , $ state );
73+ };
5974
6075 return $ this ;
6176 }
6277
6378 public function run (int $ interval = 1000 ): int
6479 {
65- $ runtime = new PipelineConsoleRuntime ($ this ->output , $ this ->pipeline , $ this ->state ->withPipeline ($ this ->filename ));
80+ foreach ($ this ->callbacks as $ callback ) {
81+ $ callback ($ this ->callback );
82+ }
6683
67- $ pipeline = ( $ this ->factory )( $ runtime ) ;
84+ $ this ->callbacks = [] ;
6885
69- return $ pipeline ->run ($ interval );
86+ return $ this -> decorated ->run ($ interval );
7087 }
7188}
0 commit comments