Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Cache workflow ownership #6738

Merged
merged 13 commits into from
Jul 31, 2023
6 changes: 4 additions & 2 deletions packages/cli/src/UserManagement/PermissionChecker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import { In } from 'typeorm';
import * as Db from '@/Db';
import config from '@/config';
import type { SharedCredentials } from '@db/entities/SharedCredentials';
import { getRoleId, getWorkflowOwner, isSharingEnabled } from './UserManagementHelper';
import { getRoleId, isSharingEnabled } from './UserManagementHelper';
import { WorkflowsService } from '@/workflows/workflows.services';
import { UserService } from '@/user/user.service';
import { OwnershipService } from '@/services/ownership.service';
import Container from 'typedi';

export class PermissionChecker {
/**
Expand Down Expand Up @@ -101,7 +103,7 @@ export class PermissionChecker {
policy = 'workflowsFromSameOwner';
}

const subworkflowOwner = await getWorkflowOwner(subworkflow.id);
const subworkflowOwner = await Container.get(OwnershipService).getWorkflowOwner(subworkflow.id);

const errorToThrow = new SubworkflowOperationError(
`Target workflow ID ${subworkflow.id ?? ''} may not be called`,
Expand Down
11 changes: 0 additions & 11 deletions packages/cli/src/UserManagement/UserManagementHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@ import { License } from '@/License';
import { getWebhookBaseUrl } from '@/WebhookHelpers';
import type { PostHogClient } from '@/posthog';

export async function getWorkflowOwner(workflowId: string): Promise<User> {
const workflowOwnerRole = await Container.get(RoleRepository).findWorkflowOwnerRole();

const sharedWorkflow = await Db.collections.SharedWorkflow.findOneOrFail({
where: { workflowId, roleId: workflowOwnerRole?.id ?? undefined },
relations: ['user', 'user.globalRole'],
});

return sharedWorkflow.user;
}

export function isEmailSetUp(): boolean {
const smtp = config.getEnv('userManagement.emails.mode') === 'smtp';
const host = !!config.getEnv('userManagement.emails.smtp.host');
Expand Down
5 changes: 3 additions & 2 deletions packages/cli/src/WaitTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import type {
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents';
import { ExecutionRepository } from '@db/repositories';
import type { ExecutionEntity } from '@db/entities/ExecutionEntity';
import { OwnershipService } from './services/ownership.service';

@Service()
export class WaitTracker {
Expand Down Expand Up @@ -186,7 +186,8 @@ export class WaitTracker {
if (!fullExecutionData.workflowData.id) {
throw new Error('Only saved workflows can be resumed.');
}
const user = await getWorkflowOwner(fullExecutionData.workflowData.id);
const workflowId = fullExecutionData.workflowData.id;
const user = await Container.get(OwnershipService).getWorkflowOwner(workflowId);

const data: IWorkflowExecutionDataProcess = {
executionMode: fullExecutionData.mode,
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/WaitingWebhooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
/* eslint-disable no-param-reassign */
import type { INode, WebhookHttpMethod } from 'n8n-workflow';
import { NodeHelpers, Workflow, LoggerProxy as Logger } from 'n8n-workflow';
import { Service } from 'typedi';
import Container, { Service } from 'typedi';
import type express from 'express';

import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
import { NodeTypes } from '@/NodeTypes';
import type { IExecutionResponse, IResponseCallbackData, IWorkflowDb } from '@/Interfaces';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { ExecutionRepository } from '@db/repositories';
import { OwnershipService } from './services/ownership.service';

@Service()
export class WaitingWebhooks {
Expand Down Expand Up @@ -83,7 +83,7 @@ export class WaitingWebhooks {
const { workflowData } = fullExecutionData;

const workflow = new Workflow({
id: workflowData.id!.toString(),
id: workflowData.id!,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
Expand All @@ -95,7 +95,7 @@ export class WaitingWebhooks {

let workflowOwner;
try {
workflowOwner = await getWorkflowOwner(workflowData.id!.toString());
workflowOwner = await Container.get(OwnershipService).getWorkflowOwner(workflowData.id!);
} catch (error) {
throw new ResponseHelper.NotFoundError('Could not find workflow');
}
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/WebhookHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import { ActiveExecutions } from '@/ActiveExecutions';
import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { EventsService } from '@/services/events.service';
import { OwnershipService } from './services/ownership.service';

const pipeline = promisify(stream.pipeline);

Expand Down Expand Up @@ -177,7 +177,7 @@ export async function executeWebhook(
user = (workflowData as WorkflowEntity).shared[0].user;
} else {
try {
user = await getWorkflowOwner(workflowData.id);
user = await Container.get(OwnershipService).getWorkflowOwner(workflowData.id);
} catch (error) {
throw new ResponseHelper.NotFoundError('Cannot find workflow');
}
Expand Down
14 changes: 9 additions & 5 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ import { NodeTypes } from '@/NodeTypes';
import { Push } from '@/push';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowsService } from './workflows/workflows.services';
import { InternalHooks } from '@/InternalHooks';
import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata';
import { ExecutionRepository } from '@db/repositories';
import { EventsService } from '@/services/events.service';
import { OwnershipService } from './services/ownership.service';

const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');

Expand Down Expand Up @@ -152,7 +152,9 @@ export function executeErrorWorkflow(
// make sure there are no possible security gaps
return;
}
getWorkflowOwner(workflowId)

Container.get(OwnershipService)
.getWorkflowOwner(workflowId)
.then((user) => {
void WorkflowHelpers.executeErrorWorkflow(errorWorkflow, workflowErrorData, user);
})
Expand All @@ -175,9 +177,11 @@ export function executeErrorWorkflow(
workflowData.nodes.some((node) => node.type === ERROR_TRIGGER_TYPE)
) {
Logger.verbose('Start internal error workflow', { executionId, workflowId });
void getWorkflowOwner(workflowId).then((user) => {
void WorkflowHelpers.executeErrorWorkflow(workflowId, workflowErrorData, user);
});
void Container.get(OwnershipService)
.getWorkflowOwner(workflowId)
.then((user) => {
void WorkflowHelpers.executeErrorWorkflow(workflowId, workflowErrorData, user);
});
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import config from '@/config';
import type { Job, JobId, JobQueue, JobResponse, WebhookResponse } from '@/Queue';
import { Queue } from '@/Queue';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { N8N_VERSION } from '@/constants';
import { BaseCommand } from './BaseCommand';
import { ExecutionRepository } from '@db/repositories';
import { OwnershipService } from '@/services/ownership.service';

export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker';
Expand Down Expand Up @@ -112,7 +112,7 @@ export class Worker extends BaseCommand {
`Start job: ${job.id} (Workflow ID: ${workflowId} | Execution: ${executionId})`,
);

const workflowOwner = await getWorkflowOwner(workflowId);
const workflowOwner = await Container.get(OwnershipService).getWorkflowOwner(workflowId);

let { staticData } = fullExecutionData.workflowData;
if (loadStaticData) {
Expand Down
4 changes: 4 additions & 0 deletions packages/cli/src/services/cache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import { LoggerProxy, jsonStringify } from 'n8n-workflow';

@Service()
export class CacheService {
/**
* Keys and values:
* - `'workflow-owner:${workflowId}'`: `User`
*/
private cache: RedisCache | MemoryCache | undefined;

async init() {
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/services/events.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { EventEmitter } from 'events';
import { Service } from 'typedi';
import Container, { Service } from 'typedi';
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { WorkflowStatisticsRepository } from '@db/repositories';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { UserService } from '@/user/user.service';
import { OwnershipService } from './ownership.service';

@Service()
export class EventsService extends EventEmitter {
Expand Down Expand Up @@ -41,7 +41,7 @@ export class EventsService extends EventEmitter {
const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId);

if (name === 'production_success' && upsertResult === 'insert') {
netroy marked this conversation as resolved.
Show resolved Hide resolved
const owner = await getWorkflowOwner(workflowId);
const owner = await Container.get(OwnershipService).getWorkflowOwner(workflowId);
const metrics = {
user_id: owner.id,
workflow_id: workflowId,
Expand Down Expand Up @@ -72,7 +72,7 @@ export class EventsService extends EventEmitter {
if (insertResult === 'failed') return;

// Compile the metrics since this was a new data loaded event
const owner = await getWorkflowOwner(workflowId);
const owner = await Container.get(OwnershipService).getWorkflowOwner(workflowId);

let metrics = {
user_id: owner.id,
Expand Down
36 changes: 36 additions & 0 deletions packages/cli/src/services/ownership.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import Container, { Service } from 'typedi';
import * as Db from '@/Db';
import { CacheService } from './cache.service';
import { RoleRepository } from '@/databases/repositories';
import type { User } from '@/databases/entities/User';

@Service()
export class OwnershipService {
netroy marked this conversation as resolved.
Show resolved Hide resolved
private cacheService = Container.get(CacheService);
netroy marked this conversation as resolved.
Show resolved Hide resolved

private roleRepository = Container.get(RoleRepository);

/**
* Retrieve the user who owns the workflow. Note that workflow ownership **cannot** be changed.
*
* @caching
*/
async getWorkflowOwner(workflowId: string) {
const cachedOwner = await this.cacheService.get<User>(`workflow-owner:${workflowId}`);

if (cachedOwner) return cachedOwner;

const workflowOwnerRole = await this.roleRepository.findWorkflowOwnerRole();

if (!workflowOwnerRole) throw new Error('Failed to find workflow owner role');

const sharedWorkflow = await Db.collections.SharedWorkflow.findOneOrFail({
where: { workflowId, roleId: workflowOwnerRole.id },
relations: ['user', 'user.globalRole'],
});

void this.cacheService.set(`workflow-owner:${workflowId}`, sharedWorkflow.user);

return sharedWorkflow.user;
}
}
14 changes: 8 additions & 6 deletions packages/cli/test/unit/PermissionChecker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import * as testDb from '../integration/shared/testDb';
import { mockNodeTypesData } from './Helpers';
import type { SaveCredentialFunction } from '../integration/shared/types';
import { mockInstance } from '../integration/shared/utils/';
import { OwnershipService } from '@/services/ownership.service';

let mockNodeTypes: INodeTypes;
let credentialOwnerRole: Role;
Expand Down Expand Up @@ -221,6 +222,7 @@ describe('PermissionChecker.checkSubworkflowExecutePolicy', () => {
const userId = uuid();
const fakeUser = new User();
fakeUser.id = userId;
const ownershipService = mockInstance(OwnershipService);

const ownerMockRole = new Role();
ownerMockRole.name = 'owner';
Expand All @@ -234,7 +236,7 @@ describe('PermissionChecker.checkSubworkflowExecutePolicy', () => {

test('sets default policy from environment when subworkflow has none', async () => {
config.set('workflows.callerPolicyDefaultOption', 'none');
jest.spyOn(UserManagementHelper, 'getWorkflowOwner').mockImplementation(async (workflowId) => {
jest.spyOn(ownershipService, 'getWorkflowOwner').mockImplementation(async (workflowId) => {
return fakeUser;
});
jest.spyOn(UserManagementHelper, 'isSharingEnabled').mockReturnValue(true);
Expand All @@ -253,7 +255,7 @@ describe('PermissionChecker.checkSubworkflowExecutePolicy', () => {

test('if sharing is disabled, ensures that workflows are owner by same user', async () => {
jest
.spyOn(UserManagementHelper, 'getWorkflowOwner')
.spyOn(ownershipService, 'getWorkflowOwner')
.mockImplementation(async (workflowId) => fakeUser);
jest.spyOn(UserManagementHelper, 'isSharingEnabled').mockReturnValue(false);
jest.spyOn(UserService, 'get').mockImplementation(async () => fakeUser);
Expand Down Expand Up @@ -287,7 +289,7 @@ describe('PermissionChecker.checkSubworkflowExecutePolicy', () => {
test('list of ids must include the parent workflow id', async () => {
const invalidParentWorkflowId = uuid();
jest
.spyOn(UserManagementHelper, 'getWorkflowOwner')
.spyOn(ownershipService, 'getWorkflowOwner')
.mockImplementation(async (workflowId) => fakeUser);
jest.spyOn(UserManagementHelper, 'isSharingEnabled').mockReturnValue(true);
jest.spyOn(UserService, 'get').mockImplementation(async () => fakeUser);
Expand All @@ -313,7 +315,7 @@ describe('PermissionChecker.checkSubworkflowExecutePolicy', () => {

test('sameOwner passes when both workflows are owned by the same user', async () => {
jest
.spyOn(UserManagementHelper, 'getWorkflowOwner')
.spyOn(ownershipService, 'getWorkflowOwner')
.mockImplementation(async (workflowId) => fakeUser);
jest.spyOn(UserManagementHelper, 'isSharingEnabled').mockReturnValue(false);
jest.spyOn(UserService, 'get').mockImplementation(async () => fakeUser);
Expand All @@ -336,7 +338,7 @@ describe('PermissionChecker.checkSubworkflowExecutePolicy', () => {
test('workflowsFromAList works when the list contains the parent id', async () => {
const workflowId = uuid();
jest
.spyOn(UserManagementHelper, 'getWorkflowOwner')
.spyOn(ownershipService, 'getWorkflowOwner')
.mockImplementation(async (workflowId) => fakeUser);
jest.spyOn(UserManagementHelper, 'isSharingEnabled').mockReturnValue(true);
jest.spyOn(UserService, 'get').mockImplementation(async () => fakeUser);
Expand All @@ -362,7 +364,7 @@ describe('PermissionChecker.checkSubworkflowExecutePolicy', () => {

test('should not throw when workflow policy is set to any', async () => {
jest
.spyOn(UserManagementHelper, 'getWorkflowOwner')
.spyOn(ownershipService, 'getWorkflowOwner')
.mockImplementation(async (workflowId) => fakeUser);
jest.spyOn(UserManagementHelper, 'isSharingEnabled').mockReturnValue(true);
jest.spyOn(UserService, 'get').mockImplementation(async () => fakeUser);
Expand Down
6 changes: 4 additions & 2 deletions packages/cli/test/unit/services/events.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
import { WorkflowStatisticsRepository } from '@db/repositories';
import { EventsService } from '@/services/events.service';
import { UserService } from '@/user/user.service';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { OwnershipService } from '@/services/ownership.service';
import { mockInstance } from '../../integration/shared/utils';

jest.mock('@/UserManagement/UserManagementHelper', () => ({ getWorkflowOwner: jest.fn() }));

describe('EventsService', () => {
const dbType = config.getEnv('database.type');
const fakeUser = mock<User>({ id: 'abcde-fghij' });
const ownershipService = mockInstance(OwnershipService);

const entityManager = mock<EntityManager>();
const dataSource = mock<DataSource>({
Expand All @@ -36,7 +38,7 @@ describe('EventsService', () => {
LoggerProxy.init(mock<ILogger>());
config.set('diagnostics.enabled', true);
config.set('deployment.type', 'n8n-testing');
mocked(getWorkflowOwner).mockResolvedValue(fakeUser);
mocked(ownershipService.getWorkflowOwner).mockResolvedValue(fakeUser);
const updateUserSettingsMock = jest.spyOn(UserService, 'updateUserSettings').mockImplementation();

const eventsService = new EventsService(new WorkflowStatisticsRepository(dataSource));
Expand Down
Loading