Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Detect DB connection aquisition deadlocks (no-changelog) #9485

Merged
merged 10 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci-postgres-mysql.yml
netroy marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ jobs:
timeout-minutes: 20
env:
DB_POSTGRESDB_PASSWORD: password
DB_POSTGRESDB_POOL_SIZE: 1 # Detect connection pooling deadlocks
steps:
- uses: actions/checkout@v4.1.1
- run: corepack enable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export async function saveCredential(

const personalProject = await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(
user.id,
transactionManager,
);

Object.assign(newSharedCredential, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { SharedWorkflow, type WorkflowSharingRole } from '@db/entities/SharedWor
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import type { Project } from '@/databases/entities/Project';
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
import { TagRepository } from '@db/repositories/tag.repository';
import { License } from '@/License';
import { WorkflowSharingService } from '@/workflows/workflowSharing.service';
Expand Down Expand Up @@ -113,9 +112,7 @@ export async function getWorkflowTags(workflowId: string) {

export async function updateTags(workflowId: string, newTags: string[]): Promise<any> {
await Db.transaction(async (transactionManager) => {
const oldTags = await Container.get(WorkflowTagMappingRepository).findBy({
workflowId,
});
const oldTags = await transactionManager.findBy(WorkflowTagMapping, { workflowId });
if (oldTags.length > 0) {
await transactionManager.delete(WorkflowTagMapping, oldTags);
}
Expand Down
7 changes: 2 additions & 5 deletions packages/cli/src/WaitTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
ErrorReporterProxy as ErrorReporter,
WorkflowOperationError,
} from 'n8n-workflow';
import { Container, Service } from 'typedi';
import { Service } from 'typedi';
import type { ExecutionStopResult, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRepository } from '@db/repositories/execution.repository';
Expand Down Expand Up @@ -137,10 +137,7 @@ export class WaitTracker {
fullExecutionData.waitTill = null;
fullExecutionData.status = 'canceled';

await Container.get(ExecutionRepository).updateExistingExecution(
executionId,
fullExecutionData,
);
await this.executionRepository.updateExistingExecution(executionId, fullExecutionData);

return {
mode: fullExecutionData.mode,
Expand Down
38 changes: 16 additions & 22 deletions packages/cli/src/commands/import/credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import { BaseCommand } from '../BaseCommand';
import type { ICredentialsEncrypted } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import { UM_FIX_INSTRUCTION } from '@/constants';
import { UserRepository } from '@db/repositories/user.repository';
import { ProjectRepository } from '@/databases/repositories/project.repository';
import type { Project } from '@/databases/entities/Project';
import { Project } from '@/databases/entities/Project';
import { User } from '@/databases/entities/User';

export class ImportCredentialsCommand extends BaseCommand {
static description = 'Import credentials';
Expand Down Expand Up @@ -75,13 +75,13 @@ export class ImportCredentialsCommand extends BaseCommand {
);
}

const project = await this.getProject(flags.userId, flags.projectId);

const credentials = await this.readCredentials(flags.input, flags.separate);

await Db.getConnection().transaction(async (transactionManager) => {
this.transactionManager = transactionManager;

const project = await this.getProject(flags.userId, flags.projectId);

const result = await this.checkRelations(credentials, flags.projectId, flags.userId);

if (!result.success) {
Expand Down Expand Up @@ -130,19 +130,6 @@ export class ImportCredentialsCommand extends BaseCommand {
}
}

private async getOwnerProject() {
const owner = await Container.get(UserRepository).findOneBy({ role: 'global:owner' });
if (!owner) {
throw new ApplicationError(`Failed to find owner. ${UM_FIX_INSTRUCTION}`);
}

const project = await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(
owner.id,
);

return project;
}

private async checkRelations(
credentials: ICredentialsEncrypted[],
projectId?: string,
Expand Down Expand Up @@ -244,7 +231,7 @@ export class ImportCredentialsCommand extends BaseCommand {
});

if (sharedCredential && sharedCredential.project.type === 'personal') {
const user = await Container.get(UserRepository).findOneByOrFail({
const user = await this.transactionManager.findOneByOrFail(User, {
projectRelations: {
role: 'project:personalOwner',
projectId: sharedCredential.projectId,
Expand All @@ -263,13 +250,20 @@ export class ImportCredentialsCommand extends BaseCommand {

private async getProject(userId?: string, projectId?: string) {
if (projectId) {
return await Container.get(ProjectRepository).findOneByOrFail({ id: projectId });
return await this.transactionManager.findOneByOrFail(Project, { id: projectId });
}

if (userId) {
return await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(userId);
if (!userId) {
const owner = await this.transactionManager.findOneBy(User, { role: 'global:owner' });
if (!owner) {
throw new ApplicationError(`Failed to find owner. ${UM_FIX_INSTRUCTION}`);
}
userId = owner.id;
}

return await this.getOwnerProject();
return await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(
userId,
this.transactionManager,
);
}
}
23 changes: 7 additions & 16 deletions packages/cli/src/commands/import/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,6 @@ export class ImportWorkflowsCommand extends BaseCommand {
this.logger.info(`Successfully imported ${total} ${total === 1 ? 'workflow.' : 'workflows.'}`);
}

private async getOwnerProject() {
const owner = await Container.get(UserRepository).findOneBy({ role: 'global:owner' });
if (!owner) {
throw new ApplicationError(`Failed to find owner. ${UM_FIX_INSTRUCTION}`);
}

const project = await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(
owner.id,
);

return project;
}

private async getWorkflowOwner(workflow: WorkflowEntity) {
const sharing = await Container.get(SharedWorkflowRepository).findOne({
where: { workflowId: workflow.id, role: 'workflow:owner' },
Expand Down Expand Up @@ -234,10 +221,14 @@ export class ImportWorkflowsCommand extends BaseCommand {
return await Container.get(ProjectRepository).findOneByOrFail({ id: projectId });
}

if (userId) {
return await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(userId);
if (!userId) {
const owner = await Container.get(UserRepository).findOneBy({ role: 'global:owner' });
if (!owner) {
throw new ApplicationError(`Failed to find owner. ${UM_FIX_INSTRUCTION}`);
}
userId = owner.id;
}

return await this.getOwnerProject();
return await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(userId);
}
}
5 changes: 4 additions & 1 deletion packages/cli/src/credentials/credentials.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ export class CredentialsService {

const project =
projectId === undefined
? await this.projectRepository.getPersonalProjectForUserOrFail(user.id)
? await this.projectRepository.getPersonalProjectForUserOrFail(
user.id,
transactionManager,
)
: await this.projectService.getProjectWithScope(
user,
projectId,
Expand Down
6 changes: 4 additions & 2 deletions packages/cli/src/databases/repositories/project.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ export class ProjectRepository extends Repository<Project> {
});
}

async getPersonalProjectForUserOrFail(userId: string) {
return await this.findOneOrFail({
async getPersonalProjectForUserOrFail(userId: string, entityManager?: EntityManager) {
const em = entityManager ?? this.manager;

return await em.findOneOrFail(Project, {
where: { type: 'personal', projectRelations: { userId, role: 'project:personalOwner' } },
});
}
Expand Down
25 changes: 14 additions & 11 deletions packages/cli/src/databases/subscribers/UserSubscriber.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Container } from 'typedi';
import type { EntitySubscriberInterface, UpdateEvent } from '@n8n/typeorm';
import { EventSubscriber } from '@n8n/typeorm';
import { User } from '../entities/User';
import Container from 'typedi';
import { ProjectRepository } from '../repositories/project.repository';
import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow';
import { Logger } from '@/Logger';
import { UserRepository } from '../repositories/user.repository';

import { Project } from '../entities/Project';
import { User } from '../entities/User';
import { UserRepository } from '../repositories/user.repository';

@EventSubscriber()
export class UserSubscriber implements EntitySubscriberInterface<User> {
Expand All @@ -27,14 +27,17 @@ export class UserSubscriber implements EntitySubscriberInterface<User> {
fields.includes('email')
) {
const oldUser = event.databaseEntity;
const name =
const userEntity =
newUserData instanceof User
? newUserData.createPersonalProjectName()
: Container.get(UserRepository).create(newUserData).createPersonalProjectName();
? newUserData
: Container.get(UserRepository).create(newUserData);

const projectName = userEntity.createPersonalProjectName();

const project = await Container.get(ProjectRepository).getPersonalProjectForUser(
oldUser.id,
);
const project = await event.manager.findOneBy(Project, {
type: 'personal',
projectRelations: { userId: oldUser.id },
});

if (!project) {
// Since this is benign we're not throwing the exception. We don't
Expand All @@ -47,7 +50,7 @@ export class UserSubscriber implements EntitySubscriberInterface<User> {
return;
}

project.name = name;
project.name = projectName;

await event.manager.save(Project, project);
}
Expand Down
8 changes: 3 additions & 5 deletions packages/cli/src/services/import.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { v4 as uuid } from 'uuid';
import { type INode, type INodeCredentialsDetails } from 'n8n-workflow';

Expand All @@ -8,11 +8,11 @@ import { CredentialsRepository } from '@db/repositories/credentials.repository';
import { TagRepository } from '@db/repositories/tag.repository';
import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import { replaceInvalidCredentials } from '@/WorkflowHelpers';
import { Project } from '@db/entities/Project';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { WorkflowTagMapping } from '@db/entities/WorkflowTagMapping';
import type { TagEntity } from '@db/entities/TagEntity';
import type { ICredentialsDb } from '@/Interfaces';
import { ProjectRepository } from '@/databases/repositories/project.repository';

@Service()
export class ImportService {
Expand Down Expand Up @@ -59,9 +59,7 @@ export class ImportService {
const upsertResult = await tx.upsert(WorkflowEntity, workflow, ['id']);
const workflowId = upsertResult.identifiers.at(0)?.id as string;

const personalProject = await Container.get(ProjectRepository).findOneByOrFail({
id: projectId,
});
const personalProject = await tx.findOneByOrFail(Project, { id: projectId });

// Create relationship if the workflow was inserted instead of updated.
if (!exists) {
Expand Down
Loading