Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Introduce import service (no-changelog) #8001

Merged
merged 3 commits into from Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
155 changes: 16 additions & 139 deletions packages/cli/src/commands/import/workflow.ts
@@ -1,26 +1,18 @@
import { flags } from '@oclif/command';
import type { INode, INodeCredentialsDetails } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import fs from 'fs';
import glob from 'fast-glob';
import { Container } from 'typedi';
import type { EntityManager } from 'typeorm';
import { v4 as uuid } from 'uuid';
import * as Db from '@/Db';
import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { Role } from '@db/entities/Role';
import type { User } from '@db/entities/User';
import { disableAutoGeneratedIds } from '@db/utils/commandHelpers';
import type { ICredentialsDb, IWorkflowToImport } from '@/Interfaces';
import { replaceInvalidCredentials } from '@/WorkflowHelpers';
import type { IWorkflowToImport } from '@/Interfaces';
import { BaseCommand } from '../BaseCommand';
import { generateNanoId } from '@db/utils/generators';
import { RoleService } from '@/services/role.service';
import { TagService } from '@/services/tag.service';
import { UM_FIX_INSTRUCTION } from '@/constants';
import { UserRepository } from '@db/repositories/user.repository';
import { CredentialsRepository } from '@db/repositories/credentials.repository';
import { ImportService } from '@/services/import.service';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';

function assertHasWorkflowsToImport(workflows: unknown): asserts workflows is IWorkflowToImport[] {
if (!Array.isArray(workflows)) {
Expand Down Expand Up @@ -64,16 +56,9 @@ export class ImportWorkflowsCommand extends BaseCommand {
}),
};

private ownerWorkflowRole: Role;

private transactionManager: EntityManager;

private tagService: TagService;

async init() {
disableAutoGeneratedIds(WorkflowEntity);
await super.init();
this.tagService = Container.get(TagService);
}

async run(): Promise<void> {
Expand All @@ -94,12 +79,8 @@ export class ImportWorkflowsCommand extends BaseCommand {
}
}

await this.initOwnerWorkflowRole();
const user = flags.userId ? await this.getAssignee(flags.userId) : await this.getOwner();

const credentials = await Container.get(CredentialsRepository).find();
const tags = await this.tagService.getAll();

let totalImported = 0;

if (flags.separate) {
Expand All @@ -116,41 +97,17 @@ export class ImportWorkflowsCommand extends BaseCommand {

totalImported = files.length;
this.logger.info(`Importing ${totalImported} workflows...`);
await Db.getConnection().transaction(async (transactionManager) => {
this.transactionManager = transactionManager;

for (const file of files) {
const workflow = jsonParse<IWorkflowToImport>(
fs.readFileSync(file, { encoding: 'utf8' }),
);
if (!workflow.id) {
workflow.id = generateNanoId();
}

if (credentials.length > 0) {
workflow.nodes.forEach((node: INode) => {
this.transformCredentials(node, credentials);

if (!node.id) {
node.id = uuid();
}
});
}

if (Object.prototype.hasOwnProperty.call(workflow, 'tags')) {
await this.tagService.setTagsForImport(transactionManager, workflow, tags);
}

if (workflow.active) {
this.logger.info(
`Deactivating workflow "${workflow.name}" during import, remember to activate it later.`,
);
workflow.active = false;
}

await this.storeWorkflow(workflow, user);

for (const file of files) {
const workflow = jsonParse<IWorkflowToImport>(fs.readFileSync(file, { encoding: 'utf8' }));
if (!workflow.id) {
workflow.id = generateNanoId();
}
});

const _workflow = Container.get(WorkflowRepository).create(workflow);

await Container.get(ImportService).importWorkflows([_workflow], user.id);
}

this.reportSuccess(totalImported);
process.exit();
Expand All @@ -160,46 +117,13 @@ export class ImportWorkflowsCommand extends BaseCommand {
fs.readFileSync(flags.input, { encoding: 'utf8' }),
);

const _workflows = workflows.map((w) => Container.get(WorkflowRepository).create(w));

assertHasWorkflowsToImport(workflows);

totalImported = workflows.length;

await Db.getConnection().transaction(async (transactionManager) => {
this.transactionManager = transactionManager;

for (const workflow of workflows) {
let oldCredentialFormat = false;
if (credentials.length > 0) {
workflow.nodes.forEach((node: INode) => {
this.transformCredentials(node, credentials);
if (!node.id) {
node.id = uuid();
}
if (!node.credentials?.id) {
oldCredentialFormat = true;
}
});
}
if (oldCredentialFormat) {
try {
await replaceInvalidCredentials(workflow as unknown as WorkflowEntity);
} catch (error) {
this.logger.error('Failed to replace invalid credential', error as Error);
}
}
if (Object.prototype.hasOwnProperty.call(workflow, 'tags')) {
await this.tagService.setTagsForImport(transactionManager, workflow, tags);
}
if (workflow.active) {
this.logger.info(
`Deactivating workflow "${workflow.name}" during import, remember to activate it later.`,
);
workflow.active = false;
}

await this.storeWorkflow(workflow, user);
}
});
await Container.get(ImportService).importWorkflows(_workflows, user.id);

this.reportSuccess(totalImported);
}
Expand All @@ -213,29 +137,6 @@ export class ImportWorkflowsCommand extends BaseCommand {
this.logger.info(`Successfully imported ${total} ${total === 1 ? 'workflow.' : 'workflows.'}`);
}

private async initOwnerWorkflowRole() {
const ownerWorkflowRole = await Container.get(RoleService).findWorkflowOwnerRole();

if (!ownerWorkflowRole) {
throw new ApplicationError(`Failed to find owner workflow role. ${UM_FIX_INSTRUCTION}`);
}

this.ownerWorkflowRole = ownerWorkflowRole;
}

private async storeWorkflow(workflow: object, user: User) {
const result = await this.transactionManager.upsert(WorkflowEntity, workflow, ['id']);
await this.transactionManager.upsert(
SharedWorkflow,
{
workflowId: result.identifiers[0].id as string,
userId: user.id,
roleId: this.ownerWorkflowRole.id,
},
['workflowId', 'userId'],
);
}

private async getOwner() {
const ownerGlobalRole = await Container.get(RoleService).findGlobalOwnerRole();

Expand All @@ -259,28 +160,4 @@ export class ImportWorkflowsCommand extends BaseCommand {

return user;
}

private transformCredentials(node: INode, credentialsEntities: ICredentialsDb[]) {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') {
const nodeCredentials: INodeCredentialsDetails = {
id: null,
name,
};

const matchingCredentials = credentialsEntities.filter(
(credentials) => credentials.name === name && credentials.type === type,
);

if (matchingCredentials.length === 1) {
nodeCredentials.id = matchingCredentials[0].id;
}

node.credentials[type] = nodeCredentials;
}
}
}
}
}
153 changes: 153 additions & 0 deletions packages/cli/src/services/import.service.ts
@@ -0,0 +1,153 @@
import { Service } from 'typedi';
import { v4 as uuid } from 'uuid';
import { type INode, type INodeCredentialsDetails } from 'n8n-workflow';
import type { EntityManager } from 'typeorm';

import { Logger } from '@/Logger';
import * as Db from '@/Db';
import { CredentialsRepository } from '@/databases/repositories/credentials.repository';
import { TagRepository } from '@/databases/repositories/tag.repository';
import { SharedWorkflow } from '@/databases/entities/SharedWorkflow';
import { RoleService } from '@/services/role.service';
import { replaceInvalidCredentials } from '@/WorkflowHelpers';
import { WorkflowEntity } from '@/databases/entities/WorkflowEntity';
import { WorkflowTagMapping } from '@/databases/entities/WorkflowTagMapping';

import type { TagEntity } from '@/databases/entities/TagEntity';
import type { Role } from '@/databases/entities/Role';
import type { ICredentialsDb } from '@/Interfaces';

@Service()
export class ImportService {
private dbCredentials: ICredentialsDb[] = [];

private dbTags: TagEntity[] = [];

private workflowOwnerRole: Role;

constructor(
private readonly logger: Logger,
private readonly credentialsRepository: CredentialsRepository,
private readonly tagRepository: TagRepository,
private readonly roleService: RoleService,
) {}

async initRecords() {
this.dbCredentials = await this.credentialsRepository.find();
this.dbTags = await this.tagRepository.find();
this.workflowOwnerRole = await this.roleService.findWorkflowOwnerRole();
}

async importWorkflows(workflows: WorkflowEntity[], userId: string) {
await this.initRecords();

for (const workflow of workflows) {
workflow.nodes.forEach((node) => {
this.toNewCredentialFormat(node);

if (!node.id) node.id = uuid();
});

const hasInvalidCreds = workflow.nodes.some((node) => !node.credentials?.id);

if (hasInvalidCreds) await this.replaceInvalidCreds(workflow);
}

await Db.transaction(async (tx) => {
for (const workflow of workflows) {
if (workflow.active) {
workflow.active = false;

this.logger.info(`Deactivating workflow "${workflow.name}". Remember to activate later.`);
}

const upsertResult = await tx.upsert(WorkflowEntity, workflow, ['id']);

const workflowId = upsertResult.identifiers.at(0)?.id as string;

await tx.upsert(SharedWorkflow, { workflowId, userId, roleId: this.workflowOwnerRole.id }, [
'workflowId',
'userId',
]);

if (!workflow.tags?.length) continue;

await this.setTags(tx, workflow);

for (const tag of workflow.tags) {
await tx.upsert(WorkflowTagMapping, { tagId: tag.id, workflowId }, [
'tagId',
'workflowId',
]);
}
}
});
}

async replaceInvalidCreds(workflow: WorkflowEntity) {
try {
await replaceInvalidCredentials(workflow);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
this.logger.error('Failed to replace invalid credential', error);
}
}

/**
* Convert a node's credentials from old format `{ <nodeType>: <credentialName> }`
* to new format: `{ <nodeType>: { id: string | null, name: <credentialName> } }`
*/
private toNewCredentialFormat(node: INode) {
if (!node.credentials) return;

for (const [type, name] of Object.entries(node.credentials)) {
if (typeof name !== 'string') continue;

const nodeCredential: INodeCredentialsDetails = { id: null, name };

const match = this.dbCredentials.find((c) => c.name === name && c.type === type);

if (match) nodeCredential.id = match.id;

node.credentials[type] = nodeCredential;
}
}

/**
* Set tags on workflow to import while ensuring all tags exist in the database,
* either by matching incoming to existing tags or by creating them first.
*/
private async setTags(tx: EntityManager, workflow: WorkflowEntity) {
if (!workflow?.tags?.length) return;

for (let i = 0; i < workflow.tags.length; i++) {
const importTag = workflow.tags[i];

if (!importTag.name) continue;

const identicalMatch = this.dbTags.find(
(dbTag) =>
dbTag.id === importTag.id &&
dbTag.createdAt &&
importTag.createdAt &&
dbTag.createdAt.getTime() === new Date(importTag.createdAt).getTime(),
);

if (identicalMatch) {
workflow.tags[i] = identicalMatch;
continue;
}

const nameMatch = this.dbTags.find((dbTag) => dbTag.name === importTag.name);

if (nameMatch) {
workflow.tags[i] = nameMatch;
continue;
}

const tagEntity = this.tagRepository.create(importTag);

workflow.tags[i] = await tx.save<TagEntity>(tagEntity);
}
}
}