Skip to content

Commit

Permalink
Add ability to start all jobs of a workflow on a specific connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ksassnowski committed Jul 13, 2022
1 parent 27b3240 commit 804193e
Show file tree
Hide file tree
Showing 15 changed files with 347 additions and 73 deletions.
10 changes: 10 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ parameters:
count: 3
path: src/VentureServiceProvider.php

-
message: "#^Access to an undefined property object\\:\\:\\$connection\\.$#"
count: 1
path: src/WorkflowStepAdapter.php

-
message: "#^Call to an undefined method object\\:\\:getDelay\\(\\)\\.$#"
count: 1
Expand Down Expand Up @@ -65,6 +70,11 @@ parameters:
count: 1
path: src/WorkflowStepAdapter.php

-
message: "#^Call to an undefined method object\\:\\:onConnection\\(\\)\\.$#"
count: 1
path: src/WorkflowStepAdapter.php

-
message: "#^Call to an undefined method object\\:\\:step\\(\\)\\.$#"
count: 1
Expand Down
16 changes: 14 additions & 2 deletions src/AbstractWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ public static function start(): Workflow
return (new static(...\func_get_args()))->run();
}

public static function startOnConnection(string $connection, mixed ...$args): Workflow
{
/** @phpstan-ignore-next-line */
return (new static(...$args))->run($connection);
}

public static function startSync(mixed ...$args): Workflow
{
/** @phpstan-ignore-next-line */
return (new static(...$args))->run('sync');
}

public static function test(mixed ...$arguments): WorkflowTester
{
/** @phpstan-ignore-next-line */
Expand All @@ -50,11 +62,11 @@ protected function define(string $workflowName = ''): WorkflowDefinition
return new WorkflowDefinition($this, $workflowName);
}

private function run(): Workflow
private function run(?string $connection = null): Workflow
{
/** @var WorkflowManagerInterface $manager */
$manager = Container::getInstance()->make('venture.manager');

return $manager->startWorkflow($this);
return $manager->startWorkflow($this, $connection);
}
}
1 change: 1 addition & 0 deletions src/Facades/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
/**
* @method static void assertNotStarted(string $workflowDefinition, ?callable $callback = null)
* @method static void assertStarted(string $workflowDefinition, ?callable $callback = null)
* @method static void assertStartedOnConnection(string $workflowDefinition, string $connection, ?callable $callback = null)
* @method static WorkflowDefinition define(string $workflowName)
* @method static bool hasStarted(string $workflowClass, ?callable $callback = null)
* @method static WorkflowDefinition startWorkflow(AbstractWorkflow $abstractWorkflow)
Expand Down
10 changes: 8 additions & 2 deletions src/Manager/WorkflowManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ public function define(AbstractWorkflow $workflow, string $workflowName): Workfl
return new WorkflowDefinition($workflow, $workflowName);
}

public function startWorkflow(AbstractWorkflow $abstractWorkflow): Workflow
{
public function startWorkflow(
AbstractWorkflow $abstractWorkflow,
?string $connection = null,
): Workflow {
$definition = $abstractWorkflow->definition();

if (null !== $connection) {
$definition->allOnConnection($connection);
}

[$workflow, $initialJobs] = $definition->build(
Closure::fromCallable([$abstractWorkflow, 'beforeCreate']),
);
Expand Down
43 changes: 35 additions & 8 deletions src/Manager/WorkflowManagerFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class WorkflowManagerFake implements WorkflowManagerInterface
{
/**
* @var array<class-string<AbstractWorkflow>, AbstractWorkflow>
* @var array<class-string<AbstractWorkflow>, array{workflow: AbstractWorkflow, connection: string|null}>
*/
private array $started = [];

Expand All @@ -38,21 +38,26 @@ public function define(AbstractWorkflow $workflow, string $workflowName): Workfl
return $this->manager->define($workflow, $workflowName);
}

public function startWorkflow(AbstractWorkflow $abstractWorkflow): Workflow
{
public function startWorkflow(
AbstractWorkflow $abstractWorkflow,
?string $connection = null,
): Workflow {
$pendingWorkflow = $abstractWorkflow->definition();

[$workflow, $initialBatch] = $pendingWorkflow->build(
Closure::fromCallable([$abstractWorkflow, 'beforeCreate']),
);

$this->started[\get_class($abstractWorkflow)] = $abstractWorkflow;
$this->started[\get_class($abstractWorkflow)] = [
'workflow' => $abstractWorkflow,
'connection' => $connection,
];

return $workflow;
}

/**
* @param null|callable(AbstractWorkflow): bool $callback
* @param null|callable(AbstractWorkflow, ?string): bool $callback
*/
public function hasStarted(string $workflowClass, ?callable $callback = null): bool
{
Expand All @@ -64,11 +69,14 @@ public function hasStarted(string $workflowClass, ?callable $callback = null): b
return true;
}

return $callback($this->started[$workflowClass]);
return $callback(
$this->started[$workflowClass]['workflow'],
$this->started[$workflowClass]['connection'],
);
}

/**
* @param null|callable(AbstractWorkflow): bool $callback
* @param null|callable(AbstractWorkflow, ?string): bool $callback
*/
public function assertStarted(string $workflowDefinition, ?callable $callback = null): void
{
Expand All @@ -79,7 +87,7 @@ public function assertStarted(string $workflowDefinition, ?callable $callback =
}

/**
* @param null|callable(AbstractWorkflow): bool $callback
* @param null|callable(AbstractWorkflow, ?string): bool $callback
*/
public function assertNotStarted(string $workflowDefinition, ?callable $callback = null): void
{
Expand All @@ -88,4 +96,23 @@ public function assertNotStarted(string $workflowDefinition, ?callable $callback
"The unexpected [{$workflowDefinition}] workflow was started.",
);
}

/**
* @param null|callable(AbstractWorkflow, ?string): bool $callback
*/
public function assertStartedOnConnection(
string $workflowDefinition,
string $connection,
?callable $callback = null,
): void {
$this->assertStarted($workflowDefinition, $callback);

$actualConnection = $this->started[$workflowDefinition]['connection'];

PHPUnit::assertSame(
$connection,
$actualConnection,
"The workflow [{$workflowDefinition}] was started, but on unexpected connection [{$actualConnection}]",
);
}
}
2 changes: 1 addition & 1 deletion src/Manager/WorkflowManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ interface WorkflowManagerInterface
{
public function define(AbstractWorkflow $workflow, string $workflowName): WorkflowDefinition;

public function startWorkflow(AbstractWorkflow $abstractWorkflow): Workflow;
public function startWorkflow(AbstractWorkflow $abstractWorkflow, ?string $connection): Workflow;
}
22 changes: 22 additions & 0 deletions src/WorkflowDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,22 @@ class WorkflowDefinition
*/
protected array $nestedWorkflows = [];

protected ?string $connection = null;

public function __construct(
protected AbstractWorkflow $workflow,
protected string $workflowName = '',
) {
$this->graph = new DependencyGraph();
}

public function allOnConnection(string $connection): self
{
$this->connection = $connection;

return $this;
}

/**
* @param array<int, string> $dependencies
* @param Delay $delay
Expand Down Expand Up @@ -151,6 +160,8 @@ public function catch(callable $callback): self
*/
public function build(?Closure $beforeCreate = null): array
{
$this->setConnectionOnJobs();

$workflow = $this->makeWorkflow([
'name' => $this->workflowName,
'job_count' => \count($this->jobs),
Expand Down Expand Up @@ -315,4 +326,15 @@ private function pushJob(WorkflowStepInterface $job): void
{
$this->jobs[$job->getJobId()] = $job;
}

private function setConnectionOnJobs(): void
{
if (null === $this->connection) {
return;
}

foreach ($this->jobs as $job) {
$job->withConnection($this->connection);
}
}
}
10 changes: 10 additions & 0 deletions src/WorkflowStep.php
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,14 @@ public function getDelay(): mixed
{
return $this->delay;
}

public function withConnection(?string $connection): self
{
return $this->onConnection($connection);
}

public function getConnection(): ?string
{
return $this->connection;
}
}
12 changes: 12 additions & 0 deletions src/WorkflowStepAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,16 @@ public function getDelay(): mixed
{
return $this->job->getDelay();
}

public function withConnection(?string $connection): WorkflowStepInterface
{
$this->job->onConnection($connection);

return $this;
}

public function getConnection(): ?string
{
return $this->job->connection;
}
}
4 changes: 4 additions & 0 deletions src/WorkflowStepInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ public function withDelay(mixed $delay): self;
* @return Delay
*/
public function getDelay(): mixed;

public function withConnection(?string $connection): self;

public function getConnection(): ?string;
}
24 changes: 24 additions & 0 deletions tests/AbstractWorkflowTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,27 @@
it('can create a WorkflowTester for the workflow class', function (): void {
expect(TestWorkflow::test())->toBeInstanceOf(WorkflowTester::class);
});

it('can start a workflow on a specific connection', function (): void {
Workflow::fake();

WorkflowWithParameter::startOnConnection('::connection::', '::param::');

Workflow::assertStartedOnConnection(
WorkflowWithParameter::class,
'::connection::',
fn (WorkflowWithParameter $workflow) => '::param::' === $workflow->something,
);
});

it('can start a workflow synchronously', function (): void {
Workflow::fake();

WorkflowWithParameter::startSync('::param::');

Workflow::assertStartedOnConnection(
WorkflowWithParameter::class,
'sync',
fn (WorkflowWithParameter $workflow) => '::param::' === $workflow->something,
);
});
31 changes: 31 additions & 0 deletions tests/WorkflowDefinitionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,37 @@ public function definition(): WorkflowDefinition
],
]);

it('can configure the connection on all jobs', function (): void {
$definition = createDefinition()
->allOnConnection('::connection::')
->addJob((new TestJob1())->withConnection('::job-1-connection::'))
->addJob((new TestJob2())->withConnection('::job-2-connection::'))
->addJob(new TestJob3(), [TestJob1::class]);

$definition->build();

foreach ($definition->jobs() as $job) {
expect($job->getConnection())->toBe('::connection::');
}
});

it('does not override job connections if no explicit connection was provided', function (): void {
$definition = createDefinition()
->addJob((new TestJob1())->withConnection('::job-1-connection::'))
->addJob((new TestJob2())->withConnection('::job-2-connection::'))
->addJob(new TestJob3(), [TestJob1::class]);

$definition->build();

foreach ($definition->jobs() as $job) {
expect($job->getConnection())->match($job::class, [
TestJob1::class => '::job-1-connection::',
TestJob2::class => '::job-2-connection::',
TestJob3::class => null,
]);
}
});

it('fires an event after a job was added', function (): void {
Event::fake([JobAdded::class]);
$job = new TestJob1();
Expand Down

0 comments on commit 804193e

Please sign in to comment.