diff --git a/.github/workflows/psalm.yml b/.github/workflows/psalm.yml index e6940e4..4c1f9dd 100644 --- a/.github/workflows/psalm.yml +++ b/.github/workflows/psalm.yml @@ -8,8 +8,8 @@ on: name: static analysis jobs: - psalm: - uses: spiral/gh-actions/.github/workflows/psalm.yml@master - with: - os: >- - ['ubuntu-latest'] + psalm: + uses: spiral/gh-actions/.github/workflows/psalm.yml@master + with: + os: >- + ['ubuntu-latest'] diff --git a/.gitignore b/.gitignore index 1a6ed30..29387cb 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ docs vendor node_modules .php-cs-fixer.cache +tests/runtime diff --git a/composer.json b/composer.json index 1f49803..1104a89 100644 --- a/composer.json +++ b/composer.json @@ -42,7 +42,7 @@ "spiral/tokenizer": "^3.0", "spiral/scaffolder": "^3.0", "spiral/roadrunner-bridge": "^2.0 || ^3.0", - "temporal/sdk": "^1.3 || ^2.0" + "temporal/sdk": "^2.7" }, "require-dev": { "spiral/framework": "^3.0", diff --git a/src/Bootloader/PrototypeBootloader.php b/src/Bootloader/PrototypeBootloader.php index 29bbb0e..3e65690 100644 --- a/src/Bootloader/PrototypeBootloader.php +++ b/src/Bootloader/PrototypeBootloader.php @@ -8,7 +8,7 @@ use Spiral\Prototype\Bootloader\PrototypeBootloader as BasePrototypeBootloader; use Temporal\Client\WorkflowClientInterface; -class PrototypeBootloader extends Bootloader +final class PrototypeBootloader extends Bootloader { public function defineDependencies(): array { diff --git a/src/Bootloader/TemporalBridgeBootloader.php b/src/Bootloader/TemporalBridgeBootloader.php index 1555eab..126ed46 100644 --- a/src/Bootloader/TemporalBridgeBootloader.php +++ b/src/Bootloader/TemporalBridgeBootloader.php @@ -8,10 +8,10 @@ use Spiral\Boot\AbstractKernel; use Spiral\Boot\Bootloader\Bootloader; use Spiral\Boot\EnvironmentInterface; -use Spiral\Boot\FinalizerInterface; use Spiral\Config\ConfiguratorInterface; use Spiral\Config\Patch\Append; use Spiral\Console\Bootloader\ConsoleBootloader; +use Spiral\Core\Container\Autowire; use Spiral\Core\FactoryInterface; use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader; use Spiral\TemporalBridge\Commands; @@ -19,6 +19,8 @@ use Spiral\TemporalBridge\DeclarationLocator; use Spiral\TemporalBridge\DeclarationLocatorInterface; use Spiral\TemporalBridge\Dispatcher; +use Spiral\TemporalBridge\WorkerFactory; +use Spiral\TemporalBridge\WorkerFactoryInterface; use Spiral\TemporalBridge\WorkersRegistry; use Spiral\TemporalBridge\WorkersRegistryInterface; use Spiral\Tokenizer\ClassesInterface; @@ -28,10 +30,13 @@ use Temporal\Client\WorkflowClientInterface; use Temporal\DataConverter\DataConverter; use Temporal\DataConverter\DataConverterInterface; +use Temporal\Interceptor\PipelineProvider; +use Temporal\Interceptor\SimplePipelineProvider; +use Temporal\Internal\Interceptor\Interceptor; use Temporal\Worker\Transport\Goridge; -use Temporal\Worker\WorkerFactoryInterface; +use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface; use Temporal\Worker\WorkerOptions; -use Temporal\WorkerFactory; +use Temporal\WorkerFactory as TemporalWorkerFactory; class TemporalBridgeBootloader extends Bootloader { @@ -47,11 +52,13 @@ public function defineDependencies(): array public function defineSingletons(): array { return [ - WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'], + TemporalWorkerFactoryInterface::class => [self::class, 'initWorkerFactory'], + WorkerFactoryInterface::class => WorkerFactory::class, DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'], WorkflowClientInterface::class => [self::class, 'initWorkflowClient'], - WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'], + WorkersRegistryInterface::class => WorkersRegistry::class, DataConverterInterface::class => [self::class, 'initDataConverter'], + PipelineProvider::class => [self::class, 'initPipelineProvider'], ]; } @@ -82,7 +89,7 @@ protected function initConfig(EnvironmentInterface $env): void [ 'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'), 'namespace' => 'App\\Endpoint\\Temporal\\Workflow', - 'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE), + 'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', TemporalWorkerFactoryInterface::DEFAULT_TASK_QUEUE), 'workers' => [], ], ); @@ -91,11 +98,13 @@ protected function initConfig(EnvironmentInterface $env): void protected function initWorkflowClient( TemporalConfig $config, DataConverterInterface $dataConverter, + PipelineProvider $pipelineProvider, ): WorkflowClientInterface { - return WorkflowClient::create( + return new WorkflowClient( serviceClient: ServiceClient::create($config->getAddress()), options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()), converter: $dataConverter, + interceptorProvider: $pipelineProvider, ); } @@ -104,29 +113,34 @@ protected function initDataConverter(): DataConverterInterface return DataConverter::createDefault(); } - protected function initWorkerFactory( - DataConverterInterface $dataConverter, - ): WorkerFactoryInterface { - return new WorkerFactory( + protected function initWorkerFactory(DataConverterInterface $dataConverter,): TemporalWorkerFactoryInterface + { + return new TemporalWorkerFactory( dataConverter: $dataConverter, rpc: Goridge::create(), ); } - protected function initDeclarationLocator( - ClassesInterface $classes, - ): DeclarationLocatorInterface { + protected function initDeclarationLocator(ClassesInterface $classes,): DeclarationLocatorInterface + { return new DeclarationLocator( classes: $classes, reader: new AttributeReader(), ); } - protected function initWorkersRegistry( - WorkerFactoryInterface $workerFactory, - FinalizerInterface $finalizer, - TemporalConfig $config, - ): WorkersRegistryInterface { - return new WorkersRegistry($workerFactory, $finalizer, $config); + protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider + { + /** @var Interceptor[] $interceptors */ + $interceptors = \array_map( + static fn(mixed $interceptor) => match (true) { + \is_string($interceptor) => $factory->make($interceptor), + $interceptor instanceof Autowire => $interceptor->resolve($factory), + default => $interceptor + }, + $config->getInterceptors(), + ); + + return new SimplePipelineProvider($interceptors); } } diff --git a/src/Config/TemporalConfig.php b/src/Config/TemporalConfig.php index 78a28b4..4c68faa 100644 --- a/src/Config/TemporalConfig.php +++ b/src/Config/TemporalConfig.php @@ -4,10 +4,30 @@ namespace Spiral\TemporalBridge\Config; +use Spiral\Core\Container\Autowire; use Spiral\Core\InjectableConfig; +use Temporal\Exception\ExceptionInterceptorInterface; +use Temporal\Internal\Interceptor\Interceptor; use Temporal\Worker\WorkerFactoryInterface; use Temporal\Worker\WorkerOptions; +/** + * @psalm-type TInterceptor = Interceptor|class-string|Autowire + * @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string|Autowire + * @psalm-type TWorker = array{ + * options?: WorkerOptions, + * exception_interceptor?: TExceptionInterceptor + * } + * + * @property array{ + * address: non-empty-string, + * namespace: non-empty-string, + * temporalNamespace: non-empty-string, + * defaultWorker: non-empty-string, + * workers: array, + * interceptors?: TInterceptor[] + * } $config + */ final class TemporalConfig extends InjectableConfig { public const CONFIG = 'temporal'; @@ -18,31 +38,54 @@ final class TemporalConfig extends InjectableConfig 'temporalNamespace' => 'default', 'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE, 'workers' => [], + 'interceptors' => [], ]; + /** + * @return non-empty-string + */ public function getDefaultNamespace(): string { return $this->config['namespace']; } + /** + * @return non-empty-string + */ public function getTemporalNamespace(): string { return $this->config['temporalNamespace']; } + /** + * @return non-empty-string + */ public function getAddress(): string { return $this->config['address']; } + /** + * @return non-empty-string + */ public function getDefaultWorker(): string { return $this->config['defaultWorker']; } - /** @psalm-return array */ + /** + * @return array + */ public function getWorkers(): array { - return (array) $this->config['workers']; + return $this->config['workers'] ?? []; + } + + /** + * @return TInterceptor[] + */ + public function getInterceptors(): array + { + return $this->config['interceptors'] ?? []; } } diff --git a/src/WorkerFactory.php b/src/WorkerFactory.php new file mode 100644 index 0000000..f9ec100 --- /dev/null +++ b/src/WorkerFactory.php @@ -0,0 +1,90 @@ + */ + private array $workers = []; + + public function __construct( + private readonly TemporalWorkerFactory $workerFactory, + private readonly FinalizerInterface $finalizer, + private readonly FactoryInterface $factory, + private readonly PipelineProvider $pipelineProvider, + private readonly TemporalConfig $config, + ) { + $this->workers = $this->config->getWorkers(); + } + + /** + * @param non-empty-string $name + */ + public function create(string $name): WorkerInterface + { + /** @psalm-suppress TooManyArguments */ + $worker = $this->workerFactory->newWorker( + $name, + $this->getWorkerOptions($name), + $this->getExceptionInterceptor($name), + $this->pipelineProvider, + ); + $worker->registerActivityFinalizer(fn () => $this->finalizer->finalize()); + + return $worker; + } + + /** + * @param non-empty-string $name + */ + private function getWorkerOptions(string $name): ?WorkerOptions + { + $worker = $this->workers[$name] ?? null; + + return match (true) { + $worker instanceof WorkerOptions => $worker, + isset($worker['options']) && $worker['options'] instanceof WorkerOptions => $worker['options'], + default => null + }; + } + + /** + * @param non-empty-string $name + */ + private function getExceptionInterceptor(string $name): ?ExceptionInterceptorInterface + { + $worker = $this->workers[$name] ?? null; + if (!\is_array($worker) || !isset($worker['exception_interceptor'])) { + return null; + } + + $exceptionInterceptor = $this->wire($worker['exception_interceptor']); + \assert($exceptionInterceptor instanceof ExceptionInterceptorInterface); + + return $exceptionInterceptor; + } + + private function wire(mixed $alias): object + { + return match (true) { + \is_string($alias) => $this->factory->make($alias), + $alias instanceof Autowire => $alias->resolve($this->factory), + default => $alias + }; + } +} diff --git a/src/WorkerFactoryInterface.php b/src/WorkerFactoryInterface.php new file mode 100644 index 0000000..34aa9c8 --- /dev/null +++ b/src/WorkerFactoryInterface.php @@ -0,0 +1,17 @@ + $options */ public function __construct( - private readonly WorkerFactoryInterface $workerFactory, + private readonly WorkerFactoryInterface|TemporalWorkerFactory $workerFactory, private readonly FinalizerInterface $finalizer, private readonly TemporalConfig $config ) { @@ -34,18 +34,22 @@ public function register(string $name, ?WorkerOptions $options): void ); } - $this->workers[$name] = $this->workerFactory->newWorker($name, $options); - $this->workers[$name]->registerActivityFinalizer(fn() => $this->finalizer->finalize()); + if ($this->workerFactory instanceof WorkerFactoryInterface) { + $this->workers[$name] = $this->workerFactory->create($name); + } else { + $this->workers[$name] = $this->workerFactory->newWorker($name, $options); + $this->workers[$name]->registerActivityFinalizer(fn() => $this->finalizer->finalize()); + } } public function get(string $name): WorkerInterface { \assert($name !== ''); - $options = $this->config->getWorkers(); + $options = $this->config->getWorkers()[$name] ?? null; if (! $this->has($name)) { - $this->register($name, $options[$name] ?? null); + $this->register($name, $options instanceof WorkerOptions ? $options : null); } return $this->workers[$name]; diff --git a/tests/app/src/SomeInterceptor.php b/tests/app/src/SomeInterceptor.php new file mode 100644 index 0000000..ba447ca --- /dev/null +++ b/tests/app/src/SomeInterceptor.php @@ -0,0 +1,15 @@ +getContainer()->get(PrototypeRegistry::class); - - $this->assertInstanceOf( - $expected, - $this->getContainer()->get($registry->resolveProperty($property)->type->name()) - ); - } - - public function propertiesDataProvider(): \Traversable - { - yield [WorkflowClientInterface::class, 'workflow']; - } -} diff --git a/tests/src/Bootloader/TemporalBridgeBootloaderTest.php b/tests/src/Bootloader/TemporalBridgeBootloaderTest.php index f3e5604..7c87d1d 100644 --- a/tests/src/Bootloader/TemporalBridgeBootloaderTest.php +++ b/tests/src/Bootloader/TemporalBridgeBootloaderTest.php @@ -11,19 +11,31 @@ use Spiral\TemporalBridge\DeclarationLocator; use Spiral\TemporalBridge\DeclarationLocatorInterface; use Spiral\TemporalBridge\Tests\TestCase; +use Spiral\TemporalBridge\WorkerFactory; +use Spiral\TemporalBridge\WorkerFactoryInterface; use Spiral\TemporalBridge\WorkersRegistry; use Spiral\TemporalBridge\WorkersRegistryInterface; use Temporal\Client\WorkflowClient; use Temporal\Client\WorkflowClientInterface; use Temporal\DataConverter\DataConverter; use Temporal\DataConverter\DataConverterInterface; -use Temporal\Worker\WorkerFactoryInterface; +use Temporal\Interceptor\SimplePipelineProvider; +use Temporal\Interceptor\PipelineProvider; +use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface; use Temporal\Worker\WorkerOptions; -use Temporal\WorkerFactory; +use Temporal\WorkerFactory as TemporalWorkerFactory; class TemporalBridgeBootloaderTest extends TestCase { - public function testWorkerFactory() + public function testTemporalWorkerFactory(): void + { + $this->assertContainerBoundAsSingleton( + TemporalWorkerFactoryInterface::class, + TemporalWorkerFactory::class, + ); + } + + public function testWorkerFactory(): void { $this->assertContainerBoundAsSingleton( WorkerFactoryInterface::class, @@ -31,27 +43,27 @@ public function testWorkerFactory() ); } - public function testDataConverter() + public function testDataConverter(): void { $this->assertContainerBoundAsSingleton( DataConverterInterface::class, - DataConverter::class + DataConverter::class, ); } - public function testDeclarationLocator() + public function testDeclarationLocator(): void { $this->assertContainerBoundAsSingleton( DeclarationLocatorInterface::class, - DeclarationLocator::class + DeclarationLocator::class, ); } - public function testWorkflowClient() + public function testWorkflowClient(): void { $this->assertContainerBoundAsSingleton( WorkflowClientInterface::class, - WorkflowClient::class + WorkflowClient::class, ); } @@ -59,7 +71,15 @@ public function testWorkersRegistry(): void { $this->assertContainerBoundAsSingleton( WorkersRegistryInterface::class, - WorkersRegistry::class + WorkersRegistry::class, + ); + } + + public function testPipelineProvider(): void + { + $this->assertContainerBound( + PipelineProvider::class, + SimplePipelineProvider::class, ); } @@ -74,7 +94,7 @@ public function testAddWorkerOptions(): void $this->assertSame( ['first' => $first, 'second' => $second], - $configs->getConfig(TemporalConfig::CONFIG)['workers'] + $configs->getConfig(TemporalConfig::CONFIG)['workers'], ); } } diff --git a/tests/src/Config/TemporalConfigTest.php b/tests/src/Config/TemporalConfigTest.php index 60068e2..f7bb05f 100644 --- a/tests/src/Config/TemporalConfigTest.php +++ b/tests/src/Config/TemporalConfigTest.php @@ -79,7 +79,25 @@ public function testGetsWorkers(): void { $workers = [ 'first' => WorkerOptions::new(), - 'second' => WorkerOptions::new() + 'second' => WorkerOptions::new(), + 'withOptions' => [ + 'options' => WorkerOptions::new(), + ], + 'withInterceptors' => [ + 'interceptors' => [ + 'foo' + ], + ], + 'withExceptionInterceptor' => [ + 'exception_interceptor' => 'bar' + ], + 'all' => [ + 'options' => WorkerOptions::new(), + 'interceptors' => [ + 'foo' + ], + 'exception_interceptor' => 'bar' + ], ]; $config = new TemporalConfig([ diff --git a/tests/src/WorkerFactoryTest.php b/tests/src/WorkerFactoryTest.php new file mode 100644 index 0000000..813a6e8 --- /dev/null +++ b/tests/src/WorkerFactoryTest.php @@ -0,0 +1,182 @@ +temporalWorkerFactory = $this->createMock(TemporalWorkerFactory::class); + } + + public function testCreateWithoutAnyOptions(): void + { + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with('without-any-options') + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory); + + $this->assertSame($worker, $factory->create('without-any-options')); + } + + public function testCreateWithOptionsAsValue(): void + { + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with('with-options-as-value', $this->equalTo(WorkerOptions::new()->withEnableSessionWorker())) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory); + + $this->assertSame($worker, $factory->create('with-options-as-value')); + } + + public function testCreateWithOptionsInArray(): void + { + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with('with-options-in-array', $this->equalTo(WorkerOptions::new()->withEnableSessionWorker())) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory); + + $this->assertSame($worker, $factory->create('with-options-in-array')); + } + + /** + * @dataProvider exceptionInterceptorsDataProvider + */ + public function testCreateWithExceptionInterceptor(string $name): void + { + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with($name, null, $this->equalTo(new ExceptionInterceptor([]))) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory); + + $this->assertSame($worker, $factory->create($name)); + } + + public function testCreateWithInterceptors(): void + { + $expectedInterceptors = new SimplePipelineProvider([ + new SomeInterceptor(), + new SomeInterceptor(), + new SomeInterceptor(), + ]); + + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with('with-interceptors', null, null, $this->equalTo($expectedInterceptors)) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory, $expectedInterceptors); + + $this->assertSame($worker, $factory->create('with-interceptors')); + } + + public function testCreateWithAllOptions(): void + { + $expectedInterceptors = new SimplePipelineProvider([ + new SomeInterceptor(), + new SomeInterceptor(), + new SomeInterceptor(), + ]); + + $this->temporalWorkerFactory + ->expects($this->once()) + ->method('newWorker') + ->with( + 'all', + $this->equalTo(WorkerOptions::new()->withEnableSessionWorker()), + $this->equalTo(new ExceptionInterceptor([])), + $this->equalTo($expectedInterceptors), + ) + ->willReturn($worker = $this->createMock(WorkerInterface::class)); + + $factory = $this->createWorkerFactory($this->temporalWorkerFactory, $expectedInterceptors); + + $this->assertSame($worker, $factory->create('all')); + } + + public function exceptionInterceptorsDataProvider(): \Traversable + { + yield ['with-exception-interceptor-as-string']; + yield ['with-exception-interceptor-as-autowire']; + yield ['with-exception-interceptor-as-instance']; + } + + private function createWorkerFactory( + TemporalWorkerFactory $workerFactory, + PipelineProvider $pipelineProvider = new SimplePipelineProvider(), + ): + WorkerFactory { + $container = new Container(); + $container->bind(PipelineProvider::class, SimplePipelineProvider::class); + $container->bind(ExceptionInterceptor::class, new ExceptionInterceptor([])); + + $interceptors = [ + SomeInterceptor::class, + new SomeInterceptor(), + new Autowire(SomeInterceptor::class), + ]; + + return new WorkerFactory( + $workerFactory, + $this->createMock(FinalizerInterface::class), + $container, + $pipelineProvider, + new TemporalConfig([ + 'workers' => [ + 'with-options-as-value' => WorkerOptions::new()->withEnableSessionWorker(), + 'with-options-in-array' => [ + 'options' => WorkerOptions::new()->withEnableSessionWorker(), + ], + 'with-interceptors' => [ + 'interceptors' => $interceptors, + ], + 'with-exception-interceptor-as-string' => [ + 'exception_interceptor' => ExceptionInterceptor::class, + ], + 'with-exception-interceptor-as-autowire' => [ + 'exception_interceptor' => new Autowire(ExceptionInterceptor::class, []), + ], + 'with-exception-interceptor-as-instance' => [ + 'exception_interceptor' => new ExceptionInterceptor([]), + ], + 'all' => [ + 'options' => WorkerOptions::new()->withEnableSessionWorker(), + 'interceptors' => $interceptors, + 'exception_interceptor' => ExceptionInterceptor::class, + ], + ], + ]) + ); + } +}