Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(core): Introduce concurrency control for main mode #9453

Merged
merged 71 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
532ed07
perf(core): Introduce concurrency control for main mode
ivov May 17, 2024
485112c
Fix test
ivov May 17, 2024
d1f89b4
Fix Public API test
ivov May 17, 2024
0a1e01e
Fix another test
ivov May 20, 2024
58b2711
FE adjustments
ivov May 23, 2024
37e28f1
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov May 23, 2024
a126d03
Fix `useExecutionHelpers` test
ivov May 23, 2024
c7fd840
Account for disabled queue in `removeMany`
ivov May 23, 2024
4296249
Reduce indirection
ivov May 23, 2024
9d7b5a7
Improve copy
ivov May 23, 2024
44a9f03
Account for stopping cancelled enqueued executions
ivov May 23, 2024
a84b96f
Clean up queue constructor
ivov May 23, 2024
f225bcb
Simplify with event emitter
ivov May 23, 2024
ae60662
Move listeners to `ActiveExecutions`
ivov May 23, 2024
327570c
Ensure toasts only show for relevant workflow
ivov May 23, 2024
2f508f4
Set `stoppedAt` for cancelled queued execution
ivov May 23, 2024
3e81c7a
Remove negative check
ivov May 23, 2024
5b7488c
Apply initial feedback
ivov May 28, 2024
694c4bb
Merge master, fix conflicts
ivov May 31, 2024
fa8b4c7
Remove manual executions queue from BE
ivov May 31, 2024
d109700
Remove manual executions queue from FE
ivov May 31, 2024
f1d6430
Exclude `new` executions from crash recovery
ivov May 31, 2024
7df488e
Serve production cap
ivov May 31, 2024
8bf0d5b
Account for enqueued executions post instance crash
ivov May 31, 2024
e41861d
Disable concurrency control on queue mode
ivov May 31, 2024
16a85d4
Fix bug in emitted event arg
ivov May 31, 2024
208ef18
Switch `Pending` with `Enqueued` in FE
ivov May 31, 2024
fa049a7
Replace spinner with circled pause icon
ivov May 31, 2024
787d1a4
Better copy for status tooltip
ivov May 31, 2024
9b39154
Implement `<ConcurrentExecutionsHeader />`
ivov May 31, 2024
832279d
Fix test
ivov May 31, 2024
40fee69
Apply more feedback
ivov Jun 3, 2024
c483d4d
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 3, 2024
15946b9
Fix FE tests
ivov Jun 3, 2024
07ec336
Fix FE tests
ivov Jun 3, 2024
6238c16
Fix per suite
ivov Jun 3, 2024
1380cf8
Adjust mocks
ivov Jun 3, 2024
ed0c0b2
Use initial state in `WorkflowExecutionsPreview`
ivov Jun 3, 2024
ce937c9
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 3, 2024
7c75aa8
Cleanup
ivov Jun 4, 2024
ebdfec9
Clearer comment
ivov Jun 4, 2024
a43ec7b
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 4, 2024
3626069
Remove FE test mocking
ivov Jun 4, 2024
1bfb450
Update lockfile
ivov Jun 4, 2024
7452e42
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 4, 2024
661f9de
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 6, 2024
909048e
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 6, 2024
b68a1ef
refactor(editor): Apply latest UX changes to concurrency control (#9648)
ivov Jun 6, 2024
64e5f30
Remove FE changes
ivov Jun 6, 2024
ede527e
Rename cap to limit
ivov Jun 6, 2024
bd2fb5b
Missing spot to rename
ivov Jun 6, 2024
0ec9cee
Cancel enqueued executions withn response promises
ivov Jun 6, 2024
e4884dd
refactor(core): Run enqueued executions on startup (#9658)
ivov Jun 7, 2024
df3c69c
Rename `ConcurrencyControl.check` to `throttle`
ivov Jun 7, 2024
622eb47
Move `cancelEnqueuedExecutionsWithResponsePromises` to `ConcurrencySe…
ivov Jun 7, 2024
d4efe95
Remove `ConcurrencyService.removeMany`
ivov Jun 7, 2024
d22bb04
Prevent deleting a `running` execution in Public API
ivov Jun 7, 2024
66548c7
Make dequeueing conditional on execution status in Public API
ivov Jun 7, 2024
7a537b8
Remove unneeded optional chaining
ivov Jun 7, 2024
8ff18cf
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 10, 2024
2351580
Remove setting limit via license
ivov Jun 10, 2024
ef85fbf
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 10, 2024
ed2fae0
Use env var also for queue mode worker
ivov Jun 10, 2024
dd062c9
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 11, 2024
58e8956
Add telemetry
ivov Jun 11, 2024
100758e
Merge master, fix conflicts
ivov Jun 11, 2024
3ced9d7
Finish telemetry
ivov Jun 11, 2024
ba0d261
Fix tests
ivov Jun 11, 2024
09e14b3
Merge branch 'master' into introduce-concurrency-control-for-main-mode
ivov Jun 11, 2024
c0ff93f
Fix dependency cycle
ivov Jun 12, 2024
fcda46e
Fix tests
ivov Jun 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import type {
import { isWorkflowIdValid } from '@/utils';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ConcurrencyControlService } from './concurrency/concurrency-control.service';
import config from './config';

@Service()
export class ActiveExecutions {
Expand All @@ -31,19 +33,21 @@ export class ActiveExecutions {
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly concurrencyControl: ConcurrencyControlService,
) {}

/**
* Add a new active execution
*/
async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise<string> {
let executionStatus: ExecutionStatus = executionId ? 'running' : 'new';
const mode = executionData.executionMode;
if (executionId === undefined) {
// Is a new execution so save in DB

const fullExecutionData: ExecutionPayload = {
data: executionData.executionData!,
mode: executionData.executionMode,
mode,
finished: false,
startedAt: new Date(),
workflowData: executionData.workflowData,
Expand All @@ -64,10 +68,14 @@ export class ActiveExecutions {
if (executionId === undefined) {
throw new ApplicationError('There was an issue assigning an execution id to the execution');
}

await this.concurrencyControl.throttle({ mode, executionId });
executionStatus = 'running';
} else {
// Is an existing execution we want to finish so update in DB

await this.concurrencyControl.throttle({ mode, executionId });

const execution: Pick<IExecutionDb, 'id' | 'data' | 'waitTill' | 'status'> = {
id: executionId,
data: executionData.executionData!,
Expand Down Expand Up @@ -128,6 +136,8 @@ export class ActiveExecutions {

// Remove from the list of active executions
delete this.activeExecutions[executionId];

this.concurrencyControl.release({ mode: execution.executionData.executionMode });
}

/**
Expand Down Expand Up @@ -191,6 +201,10 @@ export class ActiveExecutions {
let executionIds = Object.keys(this.activeExecutions);

if (cancelAll) {
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.removeAll(this.activeExecutions);
}

const stopPromises = executionIds.map(
async (executionId) => await this.stopExecution(executionId),
);
Expand Down
4 changes: 4 additions & 0 deletions packages/cli/src/InternalHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1261,4 +1261,8 @@ export class InternalHooks {
}) {
return await this.telemetry.track('Project settings updated', data);
}

async onConcurrencyLimitHit({ threshold }: { threshold: number }) {
await this.telemetry.track('User hit concurrency limit', { threshold });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { getSharedWorkflowIds } from '../workflows/workflows.service';
import { encodeNextCursor } from '../../shared/services/pagination.service';
import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';

export = {
deleteExecution: [
Expand All @@ -32,6 +33,19 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

if (execution.status === 'running') {
return res.status(400).json({
message: 'Cannot delete a running execution',
});
}

if (execution.status === 'new') {
Container.get(ConcurrencyControlService).remove({
executionId: execution.id,
mode: execution.mode,
});
}

await Container.get(ExecutionRepository).hardDelete({
workflowId: execution.workflowId,
executionId: execution.id,
Expand Down
42 changes: 42 additions & 0 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
import { WaitTracker } from '@/WaitTracker';
import { BaseCommand } from './BaseCommand';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -288,6 +292,10 @@ export class Start extends BaseCommand {

await this.initPruning();

if (config.getEnv('executions.mode') === 'regular') {
await this.runEnqueuedExecutions();
}

// Start to get active workflows and run their triggers
await this.activeWorkflowManager.init();

Expand Down Expand Up @@ -347,4 +355,38 @@ export class Start extends BaseCommand {
if (error.stack) this.logger.error(error.stack);
await this.exitWithCrash('Exiting due to an error.', error);
}

/**
* During startup, we may find executions that had been enqueued at the time of shutdown.
*
* If so, start running any such executions concurrently up to the concurrency limit, and
* enqueue any remaining ones until we have spare concurrency capacity again.
*/
private async runEnqueuedExecutions() {
const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions();

if (executions.length === 0) return;

this.logger.debug(
'[Startup] Found enqueued executions to run',
executions.map((e) => e.id),
);

const ownershipService = Container.get(OwnershipService);
const workflowRunner = Container.get(WorkflowRunner);

for (const execution of executions) {
const project = await ownershipService.getWorkflowProjectCached(execution.workflowId);

const data: IWorkflowExecutionDataProcess = {
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
};

// do not block - each execution either runs concurrently or is queued
void workflowRunner.run(data, undefined, false, execution.id);
}
}
}
6 changes: 5 additions & 1 deletion packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,12 @@ export class Worker extends BaseCommand {
Worker.jobQueue = Container.get(Queue);
await Worker.jobQueue.init();
this.logger.debug('Queue singleton ready');

const envConcurrency = config.getEnv('executions.concurrency.productionLimit');
const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency;

void Worker.jobQueue.process(
flags.concurrency,
concurrency,
async (job) => await this.runJob(job, this.nodeTypes),
);

Expand Down
Loading
Loading