Skip to content

Commit

Permalink
Merge pull request #77 from spiral/feature/interceptors
Browse files Browse the repository at this point in the history
Adds interceptors support
  • Loading branch information
butschster committed Dec 25, 2023
2 parents 66f8754 + 2266613 commit 4407049
Show file tree
Hide file tree
Showing 14 changed files with 451 additions and 76 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/psalm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ on:
name: static analysis

jobs:
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ docs
vendor
node_modules
.php-cs-fixer.cache
tests/runtime
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"spiral/tokenizer": "^3.0",
"spiral/scaffolder": "^3.0",
"spiral/roadrunner-bridge": "^2.0 || ^3.0",
"temporal/sdk": "^1.3 || ^2.0"
"temporal/sdk": "^2.7"
},
"require-dev": {
"spiral/framework": "^3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Bootloader/PrototypeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Spiral\Prototype\Bootloader\PrototypeBootloader as BasePrototypeBootloader;
use Temporal\Client\WorkflowClientInterface;

class PrototypeBootloader extends Bootloader
final class PrototypeBootloader extends Bootloader
{
public function defineDependencies(): array
{
Expand Down
54 changes: 34 additions & 20 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@
use Spiral\Boot\AbstractKernel;
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Boot\EnvironmentInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\TemporalBridge\DeclarationLocator;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\WorkerFactory;
use Spiral\TemporalBridge\WorkerFactoryInterface;
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\Tokenizer\ClassesInterface;
Expand All @@ -28,10 +30,13 @@
use Temporal\Client\WorkflowClientInterface;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory;
use Temporal\WorkerFactory as TemporalWorkerFactory;

class TemporalBridgeBootloader extends Bootloader
{
Expand All @@ -47,11 +52,13 @@ public function defineDependencies(): array
public function defineSingletons(): array
{
return [
WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
TemporalWorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
WorkerFactoryInterface::class => WorkerFactory::class,
DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'],
WorkflowClientInterface::class => [self::class, 'initWorkflowClient'],
WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'],
WorkersRegistryInterface::class => WorkersRegistry::class,
DataConverterInterface::class => [self::class, 'initDataConverter'],
PipelineProvider::class => [self::class, 'initPipelineProvider'],
];
}

Expand Down Expand Up @@ -82,7 +89,7 @@ protected function initConfig(EnvironmentInterface $env): void
[
'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'),
'namespace' => 'App\\Endpoint\\Temporal\\Workflow',
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', TemporalWorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'workers' => [],
],
);
Expand All @@ -91,11 +98,13 @@ protected function initConfig(EnvironmentInterface $env): void
protected function initWorkflowClient(
TemporalConfig $config,
DataConverterInterface $dataConverter,
PipelineProvider $pipelineProvider,
): WorkflowClientInterface {
return WorkflowClient::create(
return new WorkflowClient(
serviceClient: ServiceClient::create($config->getAddress()),
options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()),
converter: $dataConverter,
interceptorProvider: $pipelineProvider,
);
}

Expand All @@ -104,29 +113,34 @@ protected function initDataConverter(): DataConverterInterface
return DataConverter::createDefault();
}

protected function initWorkerFactory(
DataConverterInterface $dataConverter,
): WorkerFactoryInterface {
return new WorkerFactory(
protected function initWorkerFactory(DataConverterInterface $dataConverter,): TemporalWorkerFactoryInterface
{
return new TemporalWorkerFactory(
dataConverter: $dataConverter,
rpc: Goridge::create(),
);
}

protected function initDeclarationLocator(
ClassesInterface $classes,
): DeclarationLocatorInterface {
protected function initDeclarationLocator(ClassesInterface $classes,): DeclarationLocatorInterface
{
return new DeclarationLocator(
classes: $classes,
reader: new AttributeReader(),
);
}

protected function initWorkersRegistry(
WorkerFactoryInterface $workerFactory,
FinalizerInterface $finalizer,
TemporalConfig $config,
): WorkersRegistryInterface {
return new WorkersRegistry($workerFactory, $finalizer, $config);
protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider
{
/** @var Interceptor[] $interceptors */
$interceptors = \array_map(
static fn(mixed $interceptor) => match (true) {
\is_string($interceptor) => $factory->make($interceptor),
$interceptor instanceof Autowire => $interceptor->resolve($factory),
default => $interceptor
},
$config->getInterceptors(),
);

return new SimplePipelineProvider($interceptors);
}
}
47 changes: 45 additions & 2 deletions src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,30 @@

namespace Spiral\TemporalBridge\Config;

use Spiral\Core\Container\Autowire;
use Spiral\Core\InjectableConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-type TInterceptor = Interceptor|class-string<Interceptor>|Autowire<Interceptor>
* @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string<ExceptionInterceptorInterface>|Autowire<ExceptionInterceptorInterface>
* @psalm-type TWorker = array{
* options?: WorkerOptions,
* exception_interceptor?: TExceptionInterceptor
* }
*
* @property array{
* address: non-empty-string,
* namespace: non-empty-string,
* temporalNamespace: non-empty-string,
* defaultWorker: non-empty-string,
* workers: array<non-empty-string, WorkerOptions|TWorker>,
* interceptors?: TInterceptor[]
* } $config
*/
final class TemporalConfig extends InjectableConfig
{
public const CONFIG = 'temporal';
Expand All @@ -18,31 +38,54 @@ final class TemporalConfig extends InjectableConfig
'temporalNamespace' => 'default',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
'interceptors' => [],
];

/**
* @return non-empty-string
*/
public function getDefaultNamespace(): string
{
return $this->config['namespace'];
}

/**
* @return non-empty-string
*/
public function getTemporalNamespace(): string
{
return $this->config['temporalNamespace'];
}

/**
* @return non-empty-string
*/
public function getAddress(): string
{
return $this->config['address'];
}

/**
* @return non-empty-string
*/
public function getDefaultWorker(): string
{
return $this->config['defaultWorker'];
}

/** @psalm-return array<non-empty-string, WorkerOptions> */
/**
* @return array<non-empty-string, WorkerOptions|TWorker>
*/
public function getWorkers(): array
{
return (array) $this->config['workers'];
return $this->config['workers'] ?? [];
}

/**
* @return TInterceptor[]
*/
public function getInterceptors(): array
{
return $this->config['interceptors'] ?? [];
}
}
90 changes: 90 additions & 0 deletions src/WorkerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Boot\FinalizerInterface;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactory;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-import-type TWorker from TemporalConfig
*/
final class WorkerFactory implements WorkerFactoryInterface
{
/** @var array<non-empty-string, WorkerOptions|TWorker> */
private array $workers = [];

public function __construct(
private readonly TemporalWorkerFactory $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly FactoryInterface $factory,
private readonly PipelineProvider $pipelineProvider,
private readonly TemporalConfig $config,
) {
$this->workers = $this->config->getWorkers();
}

/**
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface
{
/** @psalm-suppress TooManyArguments */
$worker = $this->workerFactory->newWorker(
$name,
$this->getWorkerOptions($name),
$this->getExceptionInterceptor($name),
$this->pipelineProvider,
);
$worker->registerActivityFinalizer(fn () => $this->finalizer->finalize());

return $worker;
}

/**
* @param non-empty-string $name
*/
private function getWorkerOptions(string $name): ?WorkerOptions
{
$worker = $this->workers[$name] ?? null;

return match (true) {
$worker instanceof WorkerOptions => $worker,
isset($worker['options']) && $worker['options'] instanceof WorkerOptions => $worker['options'],
default => null
};
}

/**
* @param non-empty-string $name
*/
private function getExceptionInterceptor(string $name): ?ExceptionInterceptorInterface
{
$worker = $this->workers[$name] ?? null;
if (!\is_array($worker) || !isset($worker['exception_interceptor'])) {
return null;
}

$exceptionInterceptor = $this->wire($worker['exception_interceptor']);
\assert($exceptionInterceptor instanceof ExceptionInterceptorInterface);

return $exceptionInterceptor;
}

private function wire(mixed $alias): object
{
return match (true) {
\is_string($alias) => $this->factory->make($alias),
$alias instanceof Autowire => $alias->resolve($this->factory),
default => $alias
};
}
}
17 changes: 17 additions & 0 deletions src/WorkerFactoryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Temporal\Worker\WorkerInterface;

interface WorkerFactoryInterface
{
/**
* Creates a new Temporal worker.
*
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface;
}
Loading

0 comments on commit 4407049

Please sign in to comment.