Skip to content

Commit

Permalink
refactor(core): Introduce import service (no-changelog) (#8001)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov committed Dec 13, 2023
1 parent 70f0755 commit 7b5d0a9
Show file tree
Hide file tree
Showing 4 changed files with 356 additions and 206 deletions.
155 changes: 16 additions & 139 deletions packages/cli/src/commands/import/workflow.ts
@@ -1,26 +1,18 @@
import { flags } from '@oclif/command';
import type { INode, INodeCredentialsDetails } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import fs from 'fs';
import glob from 'fast-glob';
import { Container } from 'typedi';
import type { EntityManager } from 'typeorm';
import { v4 as uuid } from 'uuid';
import * as Db from '@/Db';
import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { Role } from '@db/entities/Role';
import type { User } from '@db/entities/User';
import { disableAutoGeneratedIds } from '@db/utils/commandHelpers';
import type { ICredentialsDb, IWorkflowToImport } from '@/Interfaces';
import { replaceInvalidCredentials } from '@/WorkflowHelpers';
import type { IWorkflowToImport } from '@/Interfaces';
import { BaseCommand } from '../BaseCommand';
import { generateNanoId } from '@db/utils/generators';
import { RoleService } from '@/services/role.service';
import { TagService } from '@/services/tag.service';
import { UM_FIX_INSTRUCTION } from '@/constants';
import { UserRepository } from '@db/repositories/user.repository';
import { CredentialsRepository } from '@db/repositories/credentials.repository';
import { ImportService } from '@/services/import.service';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';

function assertHasWorkflowsToImport(workflows: unknown): asserts workflows is IWorkflowToImport[] {
if (!Array.isArray(workflows)) {
Expand Down Expand Up @@ -64,16 +56,9 @@ export class ImportWorkflowsCommand extends BaseCommand {
}),
};

private ownerWorkflowRole: Role;

private transactionManager: EntityManager;

private tagService: TagService;

async init() {
disableAutoGeneratedIds(WorkflowEntity);
await super.init();
this.tagService = Container.get(TagService);
}

async run(): Promise<void> {
Expand All @@ -94,12 +79,8 @@ export class ImportWorkflowsCommand extends BaseCommand {
}
}

await this.initOwnerWorkflowRole();
const user = flags.userId ? await this.getAssignee(flags.userId) : await this.getOwner();

const credentials = await Container.get(CredentialsRepository).find();
const tags = await this.tagService.getAll();

let totalImported = 0;

if (flags.separate) {
Expand All @@ -116,41 +97,17 @@ export class ImportWorkflowsCommand extends BaseCommand {

totalImported = files.length;
this.logger.info(`Importing ${totalImported} workflows...`);
await Db.getConnection().transaction(async (transactionManager) => {
this.transactionManager = transactionManager;

for (const file of files) {
const workflow = jsonParse<IWorkflowToImport>(
fs.readFileSync(file, { encoding: 'utf8' }),
);
if (!workflow.id) {
workflow.id = generateNanoId();
}

if (credentials.length > 0) {
workflow.nodes.forEach((node: INode) => {
this.transformCredentials(node, credentials);

if (!node.id) {
node.id = uuid();
}
});
}

if (Object.prototype.hasOwnProperty.call(workflow, 'tags')) {
await this.tagService.setTagsForImport(transactionManager, workflow, tags);
}

if (workflow.active) {
this.logger.info(
`Deactivating workflow "${workflow.name}" during import, remember to activate it later.`,
);
workflow.active = false;
}

await this.storeWorkflow(workflow, user);

for (const file of files) {
const workflow = jsonParse<IWorkflowToImport>(fs.readFileSync(file, { encoding: 'utf8' }));
if (!workflow.id) {
workflow.id = generateNanoId();
}
});

const _workflow = Container.get(WorkflowRepository).create(workflow);

await Container.get(ImportService).importWorkflows([_workflow], user.id);
}

this.reportSuccess(totalImported);
process.exit();
Expand All @@ -160,46 +117,13 @@ export class ImportWorkflowsCommand extends BaseCommand {
fs.readFileSync(flags.input, { encoding: 'utf8' }),
);

const _workflows = workflows.map((w) => Container.get(WorkflowRepository).create(w));

assertHasWorkflowsToImport(workflows);

totalImported = workflows.length;

await Db.getConnection().transaction(async (transactionManager) => {
this.transactionManager = transactionManager;

for (const workflow of workflows) {
let oldCredentialFormat = false;
if (credentials.length > 0) {
workflow.nodes.forEach((node: INode) => {
this.transformCredentials(node, credentials);
if (!node.id) {
node.id = uuid();
}
if (!node.credentials?.id) {
oldCredentialFormat = true;
}
});
}
if (oldCredentialFormat) {
try {
await replaceInvalidCredentials(workflow as unknown as WorkflowEntity);
} catch (error) {
this.logger.error('Failed to replace invalid credential', error as Error);
}
}
if (Object.prototype.hasOwnProperty.call(workflow, 'tags')) {
await this.tagService.setTagsForImport(transactionManager, workflow, tags);
}
if (workflow.active) {
this.logger.info(
`Deactivating workflow "${workflow.name}" during import, remember to activate it later.`,
);
workflow.active = false;
}

await this.storeWorkflow(workflow, user);
}
});
await Container.get(ImportService).importWorkflows(_workflows, user.id);

this.reportSuccess(totalImported);
}
Expand All @@ -213,29 +137,6 @@ export class ImportWorkflowsCommand extends BaseCommand {
this.logger.info(`Successfully imported ${total} ${total === 1 ? 'workflow.' : 'workflows.'}`);
}

private async initOwnerWorkflowRole() {
const ownerWorkflowRole = await Container.get(RoleService).findWorkflowOwnerRole();

if (!ownerWorkflowRole) {
throw new ApplicationError(`Failed to find owner workflow role. ${UM_FIX_INSTRUCTION}`);
}

this.ownerWorkflowRole = ownerWorkflowRole;
}

private async storeWorkflow(workflow: object, user: User) {
const result = await this.transactionManager.upsert(WorkflowEntity, workflow, ['id']);
await this.transactionManager.upsert(
SharedWorkflow,
{
workflowId: result.identifiers[0].id as string,
userId: user.id,
roleId: this.ownerWorkflowRole.id,
},
['workflowId', 'userId'],
);
}

private async getOwner() {
const ownerGlobalRole = await Container.get(RoleService).findGlobalOwnerRole();

Expand All @@ -259,28 +160,4 @@ export class ImportWorkflowsCommand extends BaseCommand {

return user;
}

private transformCredentials(node: INode, credentialsEntities: ICredentialsDb[]) {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') {
const nodeCredentials: INodeCredentialsDetails = {
id: null,
name,
};

const matchingCredentials = credentialsEntities.filter(
(credentials) => credentials.name === name && credentials.type === type,
);

if (matchingCredentials.length === 1) {
nodeCredentials.id = matchingCredentials[0].id;
}

node.credentials[type] = nodeCredentials;
}
}
}
}
}
153 changes: 153 additions & 0 deletions packages/cli/src/services/import.service.ts
@@ -0,0 +1,153 @@
import { Service } from 'typedi';
import { v4 as uuid } from 'uuid';
import { type INode, type INodeCredentialsDetails } from 'n8n-workflow';
import type { EntityManager } from 'typeorm';

import { Logger } from '@/Logger';
import * as Db from '@/Db';
import { CredentialsRepository } from '@/databases/repositories/credentials.repository';
import { TagRepository } from '@/databases/repositories/tag.repository';
import { SharedWorkflow } from '@/databases/entities/SharedWorkflow';
import { RoleService } from '@/services/role.service';
import { replaceInvalidCredentials } from '@/WorkflowHelpers';
import { WorkflowEntity } from '@/databases/entities/WorkflowEntity';
import { WorkflowTagMapping } from '@/databases/entities/WorkflowTagMapping';

import type { TagEntity } from '@/databases/entities/TagEntity';
import type { Role } from '@/databases/entities/Role';
import type { ICredentialsDb } from '@/Interfaces';

@Service()
export class ImportService {
private dbCredentials: ICredentialsDb[] = [];

private dbTags: TagEntity[] = [];

private workflowOwnerRole: Role;

constructor(
private readonly logger: Logger,
private readonly credentialsRepository: CredentialsRepository,
private readonly tagRepository: TagRepository,
private readonly roleService: RoleService,
) {}

async initRecords() {
this.dbCredentials = await this.credentialsRepository.find();
this.dbTags = await this.tagRepository.find();
this.workflowOwnerRole = await this.roleService.findWorkflowOwnerRole();
}

async importWorkflows(workflows: WorkflowEntity[], userId: string) {
await this.initRecords();

for (const workflow of workflows) {
workflow.nodes.forEach((node) => {
this.toNewCredentialFormat(node);

if (!node.id) node.id = uuid();
});

const hasInvalidCreds = workflow.nodes.some((node) => !node.credentials?.id);

if (hasInvalidCreds) await this.replaceInvalidCreds(workflow);
}

await Db.transaction(async (tx) => {
for (const workflow of workflows) {
if (workflow.active) {
workflow.active = false;

this.logger.info(`Deactivating workflow "${workflow.name}". Remember to activate later.`);
}

const upsertResult = await tx.upsert(WorkflowEntity, workflow, ['id']);

const workflowId = upsertResult.identifiers.at(0)?.id as string;

await tx.upsert(SharedWorkflow, { workflowId, userId, roleId: this.workflowOwnerRole.id }, [
'workflowId',
'userId',
]);

if (!workflow.tags?.length) continue;

await this.setTags(tx, workflow);

for (const tag of workflow.tags) {
await tx.upsert(WorkflowTagMapping, { tagId: tag.id, workflowId }, [
'tagId',
'workflowId',
]);
}
}
});
}

async replaceInvalidCreds(workflow: WorkflowEntity) {
try {
await replaceInvalidCredentials(workflow);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
this.logger.error('Failed to replace invalid credential', error);
}
}

/**
* Convert a node's credentials from old format `{ <nodeType>: <credentialName> }`
* to new format: `{ <nodeType>: { id: string | null, name: <credentialName> } }`
*/
private toNewCredentialFormat(node: INode) {
if (!node.credentials) return;

for (const [type, name] of Object.entries(node.credentials)) {
if (typeof name !== 'string') continue;

const nodeCredential: INodeCredentialsDetails = { id: null, name };

const match = this.dbCredentials.find((c) => c.name === name && c.type === type);

if (match) nodeCredential.id = match.id;

node.credentials[type] = nodeCredential;
}
}

/**
* Set tags on workflow to import while ensuring all tags exist in the database,
* either by matching incoming to existing tags or by creating them first.
*/
private async setTags(tx: EntityManager, workflow: WorkflowEntity) {
if (!workflow?.tags?.length) return;

for (let i = 0; i < workflow.tags.length; i++) {
const importTag = workflow.tags[i];

if (!importTag.name) continue;

const identicalMatch = this.dbTags.find(
(dbTag) =>
dbTag.id === importTag.id &&
dbTag.createdAt &&
importTag.createdAt &&
dbTag.createdAt.getTime() === new Date(importTag.createdAt).getTime(),
);

if (identicalMatch) {
workflow.tags[i] = identicalMatch;
continue;
}

const nameMatch = this.dbTags.find((dbTag) => dbTag.name === importTag.name);

if (nameMatch) {
workflow.tags[i] = nameMatch;
continue;
}

const tagEntity = this.tagRepository.create(importTag);

workflow.tags[i] = await tx.save<TagEntity>(tagEntity);
}
}
}

0 comments on commit 7b5d0a9

Please sign in to comment.