Skip to content

Commit

Permalink
Merge branch 'node-319-postgres-node-overhaul' into feature/resource-…
Browse files Browse the repository at this point in the history
…mapping-component

* node-319-postgres-node-overhaul: (53 commits)
  ⚡ credentials test fix, clean up
  ⚡ singlton for conections
  feat: Execution custom data saving and filtering (#5496)
  Minor tweaks to query params decription.
  ⚡ ui update
  ⚡ error message update
  ⚡ clean up
  ⚡ ui fixes
  Copy updates.
  ⚡ hint to query
  ⚡ runQueries tests setup
  ⚡ select, update, upsert operations tests
  ⚡ executeQuery and insert operations tests
  ⚡ setup for testing operations, tests for deleteTable operation
  ⚡ refactoring
  ⚡ utils unit tests
  ⚡ config files clean up
  ⚡ unit tests wip
  ⚡ tests setup
  ⚡ improved error message
  ...

# Conflicts:
#	packages/editor-ui/src/Interface.ts
#	packages/nodes-base/nodes/Postgres/Postgres.node.ts
#	packages/nodes-base/nodes/Postgres/v2/PostgresV2.node.ts
#	packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts
#	packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts
#	packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts
#	packages/nodes-base/nodes/Postgres/v2/actions/router.ts
#	packages/nodes-base/nodes/Postgres/v2/actions/versionDescription.ts
#	packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts
#	packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts
#	packages/nodes-base/nodes/Postgres/v2/methods/index.ts
#	packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts
#	packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts
#	packages/nodes-base/nodes/Postgres/v2/transport/index.ts
  • Loading branch information
MiloradFilipovic committed Mar 30, 2023
2 parents 0f43dc5 + 59744f4 commit 38c2a21
Show file tree
Hide file tree
Showing 37 changed files with 1,495 additions and 296 deletions.
2 changes: 2 additions & 0 deletions packages/cli/src/Db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ export async function init(
collections.InstalledPackages = linkRepository(entities.InstalledPackages);
collections.InstalledNodes = linkRepository(entities.InstalledNodes);
collections.WorkflowStatistics = linkRepository(entities.WorkflowStatistics);
collections.ExecutionMetadata = linkRepository(entities.ExecutionMetadata);

collections.EventDestinations = linkRepository(entities.EventDestinations);

isInitialized = true;
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import type { WebhookEntity } from '@db/entities/WebhookEntity';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
import type { EventDestinations } from '@db/entities/MessageEventBusDestinationEntity';
import type { ExecutionMetadata } from './databases/entities/ExecutionMetadata';

export interface IActivationError {
time: number;
Expand Down Expand Up @@ -88,6 +89,7 @@ export interface IDatabaseCollections {
InstalledNodes: Repository<InstalledNodes>;
WorkflowStatistics: Repository<WorkflowStatistics>;
EventDestinations: Repository<EventDestinations>;
ExecutionMetadata: Repository<ExecutionMetadata>;
}

// ----------------------------------
Expand Down
33 changes: 33 additions & 0 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowsService } from './workflows/workflows.services';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
import type { ExecutionMetadata } from './databases/entities/ExecutionMetadata';

const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');

Expand Down Expand Up @@ -264,6 +265,22 @@ async function pruneExecutionData(this: WorkflowHooks): Promise<void> {
}
}

export async function saveExecutionMetadata(
executionId: string,
executionMetadata: Record<string, string>,
): Promise<ExecutionMetadata[]> {
const metadataRows = [];
for (const [key, value] of Object.entries(executionMetadata)) {
metadataRows.push({
execution: { id: executionId },
key,
value,
});
}

return Db.collections.ExecutionMetadata.save(metadataRows);
}

/**
* Returns hook functions to push data to Editor-UI
*
Expand Down Expand Up @@ -657,6 +674,14 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
executionData as IExecutionFlattedDb,
);

try {
if (fullRunData.data.resultData.metadata) {
await saveExecutionMetadata(this.executionId, fullRunData.data.resultData.metadata);
}
} catch (e) {
Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e);
}

if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference it on the original execution
// await Db.collections.Execution.save(executionData as IExecutionFlattedDb);
Expand Down Expand Up @@ -789,6 +814,14 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
status: executionData.status,
});

try {
if (fullRunData.data.resultData.metadata) {
await saveExecutionMetadata(this.executionId, fullRunData.data.resultData.metadata);
}
} catch (e) {
Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e);
}

if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference it on the original execution
await Db.collections.Execution.update(this.retryOf, {
Expand Down
6 changes: 5 additions & 1 deletion packages/cli/src/databases/entities/ExecutionEntity.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import { Column, Entity, Generated, Index, PrimaryColumn } from 'typeorm';
import { Column, Entity, Generated, Index, OneToMany, PrimaryColumn } from 'typeorm';
import { datetimeColumnType, jsonColumnType } from './AbstractEntity';
import { IWorkflowDb } from '@/Interfaces';
import type { IExecutionFlattedDb } from '@/Interfaces';
import { idStringifier } from '../utils/transformers';
import type { ExecutionMetadata } from './ExecutionMetadata';

@Entity()
@Index(['workflowId', 'id'])
Expand Down Expand Up @@ -49,4 +50,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {

@Column({ type: datetimeColumnType, nullable: true })
waitTill: Date;

@OneToMany('ExecutionMetadata', 'execution')
metadata: ExecutionMetadata[];
}
22 changes: 22 additions & 0 deletions packages/cli/src/databases/entities/ExecutionMetadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Column, Entity, ManyToOne, PrimaryGeneratedColumn, RelationId } from 'typeorm';
import { ExecutionEntity } from './ExecutionEntity';

@Entity()
export class ExecutionMetadata {
@PrimaryGeneratedColumn()
id: number;

@ManyToOne('ExecutionEntity', 'metadata', {
onDelete: 'CASCADE',
})
execution: ExecutionEntity;

@RelationId((executionMetadata: ExecutionMetadata) => executionMetadata.execution)
executionId: number;

@Column('text')
key: string;

@Column('text')
value: string;
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { User } from './User';
import { WebhookEntity } from './WebhookEntity';
import { WorkflowEntity } from './WorkflowEntity';
import { WorkflowStatistics } from './WorkflowStatistics';
import { ExecutionMetadata } from './ExecutionMetadata';

export const entities = {
AuthIdentity,
Expand All @@ -33,4 +34,5 @@ export const entities = {
WebhookEntity,
WorkflowEntity,
WorkflowStatistics,
ExecutionMetadata,
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281779 implements MigrationInterface {
name = 'CreateExecutionMetadataTable1679416281779';

public async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();

await queryRunner.query(
`CREATE TABLE ${tablePrefix}execution_metadata (
id int(11) auto_increment NOT NULL PRIMARY KEY,
executionId int(11) NOT NULL,
\`key\` TEXT NOT NULL,
value TEXT NOT NULL,
CONSTRAINT \`${tablePrefix}execution_metadata_FK\` FOREIGN KEY (\`executionId\`) REFERENCES \`${tablePrefix}execution_entity\` (\`id\`) ON DELETE CASCADE,
INDEX \`IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb\` (\`executionId\` ASC)
)
ENGINE=InnoDB`,
);

// Remove indices that are no longer needed since the addition of the status column
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}06da892aaf92a48e7d3e400003\` ON \`${tablePrefix}execution_entity\``,
);
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}78d62b89dc1433192b86dce18a\` ON \`${tablePrefix}execution_entity\``,
);
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}1688846335d274033e15c846a4\` ON \`${tablePrefix}execution_entity\``,
);
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}cefb067df2402f6aed0638a6c1\` ON \`${tablePrefix}execution_entity\``,
);

// Add index to the new status column
await queryRunner.query(
`CREATE INDEX \`IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584\` ON \`${tablePrefix}execution_entity\` (\`status\`, \`workflowId\`)`,
);

logMigrationEnd(this.name);
}

public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = getTablePrefix();

await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
await queryRunner.query(
`CREATE INDEX \`IDX_${tablePrefix}06da892aaf92a48e7d3e400003\` ON \`${tablePrefix}execution_entity\` (\`workflowId\`, \`waitTill\`, \`id\`)`,
);
await queryRunner.query(
`CREATE INDEX \`IDX_${tablePrefix}78d62b89dc1433192b86dce18a\` ON \`${tablePrefix}execution_entity\` (\`workflowId\`, \`finished\`, \`id\`)`,
);
await queryRunner.query(
`CREATE INDEX \`IDX_${tablePrefix}1688846335d274033e15c846a4\` ON \`${tablePrefix}execution_entity\` (\`finished\`, \`id\`)`,
);
await queryRunner.query(
'CREATE INDEX `IDX_' +
tablePrefix +
'cefb067df2402f6aed0638a6c1` ON `' +
tablePrefix +
'execution_entity` (`stoppedAt`)',
);
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584\` ON \`${tablePrefix}execution_entity\``,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677236788851 } from './1677236788851-UpdateRunningExecutionStatus';
import { CreateExecutionMetadataTable1679416281779 } from './1679416281779-CreateExecutionMetadataTable';

export const mysqlMigrations = [
InitialMigration1588157391238,
Expand Down Expand Up @@ -72,4 +73,5 @@ export const mysqlMigrations = [
AddStatusToExecutions1674138566000,
MigrateExecutionStatus1676996103000,
UpdateRunningExecutionStatus1677236788851,
CreateExecutionMetadataTable1679416281779,
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281778 implements MigrationInterface {
name = 'CreateExecutionMetadataTable1679416281778';

public async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();

await queryRunner.query(
`CREATE TABLE ${tablePrefix}execution_metadata (
"id" serial4 NOT NULL PRIMARY KEY,
"executionId" int4 NOT NULL,
"key" text NOT NULL,
"value" text NOT NULL,
CONSTRAINT ${tablePrefix}execution_metadata_fk FOREIGN KEY ("executionId") REFERENCES ${tablePrefix}execution_entity(id) ON DELETE CASCADE
)`,
);

await queryRunner.query(
`CREATE INDEX "IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb" ON "${tablePrefix}execution_metadata" ("executionId");`,
);

// Remove indices that are no longer needed since the addition of the status column
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}33228da131bb1112247cf52a42"`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}72ffaaab9f04c2c1f1ea86e662"`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}58154df94c686818c99fb754ce"`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}4f474ac92be81610439aaad61e"`);

// Create new index for status
await queryRunner.query(
`CREATE INDEX "IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584" ON "${tablePrefix}execution_entity" ("status", "workflowId");`,
);

logMigrationEnd(this.name);
}

public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = getTablePrefix();

// Re-add removed indices
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}33228da131bb1112247cf52a42" ON ${tablePrefix}execution_entity ("stoppedAt") `,
);
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}72ffaaab9f04c2c1f1ea86e662" ON ${tablePrefix}execution_entity ("finished", "id") `,
);
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}58154df94c686818c99fb754ce" ON ${tablePrefix}execution_entity ("workflowId", "waitTill", "id") `,
);
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}4f474ac92be81610439aaad61e" ON ${tablePrefix}execution_entity ("workflowId", "finished", "id") `,
);

await queryRunner.query(
`DROP INDEX IF EXISTS "IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584"`,
);

await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677236854063 } from './1677236854063-UpdateRunningExecutionStatus';
import { CreateExecutionMetadataTable1679416281778 } from './1679416281778-CreateExecutionMetadataTable';

export const postgresMigrations = [
InitialMigration1587669153312,
Expand Down Expand Up @@ -68,4 +69,5 @@ export const postgresMigrations = [
AddStatusToExecutions1674138566000,
MigrateExecutionStatus1676996103000,
UpdateRunningExecutionStatus1677236854063,
CreateExecutionMetadataTable1679416281778,
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281777 implements MigrationInterface {
name = 'CreateExecutionMetadataTable1679416281777';

public async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();

await queryRunner.query(
`CREATE TABLE "${tablePrefix}execution_metadata" (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
executionId INTEGER NOT NULL,
"key" TEXT NOT NULL,
value TEXT NOT NULL,
CONSTRAINT ${tablePrefix}execution_metadata_entity_FK FOREIGN KEY (executionId) REFERENCES ${tablePrefix}execution_entity(id) ON DELETE CASCADE
)`,
);

await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb" ON "${tablePrefix}execution_metadata" ("executionId");`,
);

// Re add some lost indices from migration DeleteExecutionsWithWorkflows.ts
// that were part of AddExecutionEntityIndexes.ts
// not all were needed since we added the `status` column to execution_entity
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}b94b45ce2c73ce46c54f20b5f9' ON '${tablePrefix}execution_entity' ('waitTill', 'id') `,
);
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}81fc04c8a17de15835713505e4' ON '${tablePrefix}execution_entity' ('workflowId', 'id') `,
);

// Also add index to the new status column
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584' ON '${tablePrefix}execution_entity' ('status', 'workflowId') `,
);

// Remove no longer needed index to waitTill since it's already covered by the index b94b45ce2c73ce46c54f20b5f9 above
await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}ca4a71b47f28ac6ea88293a8e2'`);
// Remove index for stoppedAt since it's not used anymore
await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}cefb067df2402f6aed0638a6c1'`);

logMigrationEnd(this.name);
}

public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = getTablePrefix();

await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}b94b45ce2c73ce46c54f20b5f9'`);
await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}81fc04c8a17de15835713505e4'`);
await queryRunner.query(
`DROP INDEX IF EXISTS 'IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584'`,
);
await queryRunner.query(
`CREATE INDEX "IDX_${tablePrefix}ca4a71b47f28ac6ea88293a8e2" ON "${tablePrefix}execution_entity" ("waitTill")`,
);
await queryRunner.query(
`CREATE INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1" ON "${tablePrefix}execution_entity" ("stoppedAt")`,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/sqlite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677237073720 } from './1677237073720-UpdateRunningExecutionStatus';
import { CreateExecutionMetadataTable1679416281777 } from './1679416281777-CreateExecutionMetadataTable';

const sqliteMigrations = [
InitialMigration1588102412422,
Expand Down Expand Up @@ -66,6 +67,7 @@ const sqliteMigrations = [
AddStatusToExecutions1674138566000,
MigrateExecutionStatus1676996103000,
UpdateRunningExecutionStatus1677237073720,
CreateExecutionMetadataTable1679416281777,
];

export { sqliteMigrations };
Loading

0 comments on commit 38c2a21

Please sign in to comment.