From 58518b684b6c9495aa6efd0e815a8d01f102bbe4 Mon Sep 17 00:00:00 2001 From: Michael Kret <88898367+michael-radency@users.noreply.github.com> Date: Tue, 9 Apr 2024 18:41:51 +0300 Subject: [PATCH] feat(Postgres Node): Options keepAlive and keepAliveInitialDelayMillis (#9067) --- .../Postgres/PostgresTrigger.functions.ts | 13 +++++++ .../nodes/Postgres/PostgresTrigger.node.ts | 27 ++++++++++++++ .../v2/actions/common.descriptions.ts | 10 +++++ .../actions/database/deleteTable.operation.ts | 3 +- .../database/executeQuery.operation.ts | 9 ++++- .../v2/actions/database/insert.operation.ts | 3 +- .../v2/actions/database/select.operation.ts | 3 +- .../v2/actions/database/update.operation.ts | 5 ++- .../v2/actions/database/upsert.operation.ts | 3 +- .../nodes/Postgres/v2/actions/router.ts | 5 ++- .../nodes/Postgres/v2/helpers/interfaces.ts | 33 +++++++++++++++++ .../Postgres/v2/methods/credentialTest.ts | 5 +-- .../nodes/Postgres/v2/methods/listSearch.ts | 5 ++- .../nodes/Postgres/v2/methods/loadOptions.ts | 3 +- .../Postgres/v2/methods/resourceMapping.ts | 3 +- .../nodes/Postgres/v2/transport/index.ts | 37 ++++++++++++------- 16 files changed, 136 insertions(+), 31 deletions(-) diff --git a/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts b/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts index 46bf31dba1cfa..e6830fc701dbb 100644 --- a/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts +++ b/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts @@ -87,6 +87,10 @@ export async function pgTriggerFunction( export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) { const credentials = await this.getCredentials('postgres'); + const options = this.getNodeParameter('options', {}) as { + connectionTimeout?: number; + delayClosingIdleConnection?: number; + }; const pgp = pgPromise({ // prevent spam in console "WARNING: Creating a duplicate database object for the same connection." noWarnings: true, @@ -97,8 +101,17 @@ export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) { database: credentials.database as string, user: credentials.user as string, password: credentials.password as string, + keepAlive: true, }; + if (options.connectionTimeout) { + config.connectionTimeoutMillis = options.connectionTimeout * 1000; + } + + if (options.delayClosingIdleConnection) { + config.keepAliveInitialDelayMillis = options.delayClosingIdleConnection * 1000; + } + if (credentials.allowUnauthorizedCerts === true) { config.ssl = { rejectUnauthorized: false, diff --git a/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts b/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts index db46f2c35f392..639da1d3e6092 100644 --- a/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts +++ b/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts @@ -209,6 +209,33 @@ export class PostgresTrigger implements INodeType { }, ], }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'Connection Timeout', + name: 'connectionTimeout', + type: 'number', + default: 30, + description: 'Number of seconds reserved for connecting to the database', + }, + { + displayName: 'Delay Closing Idle Connection', + name: 'delayClosingIdleConnection', + type: 'number', + default: 0, + description: + 'Number of seconds to wait before idle connection would be eligible for closing', + typeOptions: { + minValue: 0, + }, + }, + ], + }, ], }; diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts b/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts index 47474a402c68c..8ed1b3a7d77b9 100644 --- a/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts +++ b/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts @@ -30,6 +30,16 @@ export const optionsCollection: INodeProperties = { default: 30, description: 'Number of seconds reserved for connecting to the database', }, + { + displayName: 'Delay Closing Idle Connection', + name: 'delayClosingIdleConnection', + type: 'number', + default: 0, + description: 'Number of seconds to wait before idle connection would be eligible for closing', + typeOptions: { + minValue: 0, + }, + }, { displayName: 'Query Batching', name: 'queryBatching', diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts index 1e35dad292ddd..badede9f5312e 100644 --- a/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts @@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow'; import type { PgpDatabase, + PostgresNodeOptions, QueriesRunner, QueryValues, QueryWithValues, @@ -95,7 +96,7 @@ export async function execute( this: IExecuteFunctions, runQueries: QueriesRunner, items: INodeExecutionData[], - nodeOptions: IDataObject, + nodeOptions: PostgresNodeOptions, _db?: PgpDatabase, ): Promise { const queries: QueryWithValues[] = []; diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts index c4aaa8e8dec69..6d7908905e856 100644 --- a/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts @@ -6,7 +6,12 @@ import type { } from 'n8n-workflow'; import { NodeOperationError } from 'n8n-workflow'; -import type { PgpDatabase, QueriesRunner, QueryWithValues } from '../../helpers/interfaces'; +import type { + PgpDatabase, + PostgresNodeOptions, + QueriesRunner, + QueryWithValues, +} from '../../helpers/interfaces'; import { replaceEmptyStringsByNulls } from '../../helpers/utils'; @@ -46,7 +51,7 @@ export async function execute( this: IExecuteFunctions, runQueries: QueriesRunner, items: INodeExecutionData[], - nodeOptions: IDataObject, + nodeOptions: PostgresNodeOptions, _db?: PgpDatabase, ): Promise { items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts index 01cce7d8907f6..0dcdf23f3fe47 100644 --- a/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts @@ -7,6 +7,7 @@ import type { import type { PgpDatabase, + PostgresNodeOptions, QueriesRunner, QueryValues, QueryWithValues, @@ -157,7 +158,7 @@ export async function execute( this: IExecuteFunctions, runQueries: QueriesRunner, items: INodeExecutionData[], - nodeOptions: IDataObject, + nodeOptions: PostgresNodeOptions, db: PgpDatabase, ): Promise { items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts index 36f6ec89cb904..b0848f2a6b59a 100644 --- a/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts @@ -7,6 +7,7 @@ import type { import type { PgpDatabase, + PostgresNodeOptions, QueriesRunner, QueryValues, QueryWithValues, @@ -75,7 +76,7 @@ export async function execute( this: IExecuteFunctions, runQueries: QueriesRunner, items: INodeExecutionData[], - nodeOptions: IDataObject, + nodeOptions: PostgresNodeOptions, _db?: PgpDatabase, ): Promise { items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts index e9f9da6825b15..844806221aedd 100644 --- a/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts @@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow'; import type { PgpDatabase, + PostgresNodeOptions, QueriesRunner, QueryValues, QueryWithValues, @@ -194,7 +195,7 @@ export async function execute( this: IExecuteFunctions, runQueries: QueriesRunner, items: INodeExecutionData[], - nodeOptions: IDataObject, + nodeOptions: PostgresNodeOptions, db: PgpDatabase, ): Promise { items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); @@ -279,7 +280,7 @@ export async function execute( const rowExists = await doesRowExist(db, schema, table, matchValues); if (!rowExists) { const descriptionValues: string[] = []; - matchValues.forEach((val, index) => { + matchValues.forEach((_, index) => { if (index % 2 === 0) { descriptionValues.push(`${matchValues[index]}=${matchValues[index + 1]}`); } diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts index 2d164491be39e..3a4b85d900f55 100644 --- a/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts @@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow'; import type { PgpDatabase, + PostgresNodeOptions, QueriesRunner, QueryValues, QueryWithValues, @@ -193,7 +194,7 @@ export async function execute( this: IExecuteFunctions, runQueries: QueriesRunner, items: INodeExecutionData[], - nodeOptions: IDataObject, + nodeOptions: PostgresNodeOptions, db: PgpDatabase, ): Promise { items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/router.ts b/packages/nodes-base/nodes/Postgres/v2/actions/router.ts index 60d780ce0e2de..47cc0a3f40c85 100644 --- a/packages/nodes-base/nodes/Postgres/v2/actions/router.ts +++ b/packages/nodes-base/nodes/Postgres/v2/actions/router.ts @@ -6,6 +6,7 @@ import { configureQueryRunner } from '../helpers/utils'; import type { PostgresType } from './node.type'; import * as database from './database/Database.resource'; +import type { PostgresNodeCredentials, PostgresNodeOptions } from '../helpers/interfaces'; export async function router(this: IExecuteFunctions): Promise { let returnData: INodeExecutionData[] = []; @@ -14,8 +15,8 @@ export async function router(this: IExecuteFunctions): Promise('resource', 0); const operation = this.getNodeParameter('operation', 0); - const credentials = await this.getCredentials('postgres'); - const options = this.getNodeParameter('options', 0, {}); + const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials; + const options = this.getNodeParameter('options', 0, {}) as PostgresNodeOptions; options.nodeVersion = this.getNode().typeVersion; options.operation = operation; diff --git a/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts b/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts index 57d5c6b22a185..5835579d3ac96 100644 --- a/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts +++ b/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts @@ -35,3 +35,36 @@ export type QueriesRunner = ( items: INodeExecutionData[], options: IDataObject, ) => Promise; + +export type PostgresNodeOptions = { + nodeVersion?: number; + operation?: string; + cascade?: boolean; + connectionTimeout?: number; + delayClosingIdleConnection?: number; + queryBatching?: QueryMode; + queryReplacement?: string; + outputColumns?: string[]; + largeNumbersOutput?: 'numbers' | 'text'; + skipOnConflict?: boolean; + replaceEmptyStrings?: boolean; +}; + +export type PostgresNodeCredentials = { + sshAuthenticateWith: 'password' | 'privateKey'; + host: string; + port: number; + database: string; + user: string; + password: string; + allowUnauthorizedCerts?: boolean; + ssl?: 'disable' | 'allow' | 'require' | 'verify' | 'verify-full'; + sshTunnel?: boolean; + sshHost?: string; + sshPort?: number; + sshPostgresPort?: number; + sshUser?: string; + sshPassword?: string; + privateKey?: string; + passphrase?: string; +}; diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts b/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts index f107c83ebf4b6..1b4d588fa3b35 100644 --- a/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts +++ b/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts @@ -1,20 +1,19 @@ import type { ICredentialsDecrypted, ICredentialTestFunctions, - IDataObject, INodeCredentialTestResult, } from 'n8n-workflow'; import { Client } from 'ssh2'; import { configurePostgres } from '../transport'; -import type { PgpClient } from '../helpers/interfaces'; +import type { PgpClient, PostgresNodeCredentials } from '../helpers/interfaces'; export async function postgresConnectionTest( this: ICredentialTestFunctions, credential: ICredentialsDecrypted, ): Promise { - const credentials = credential.data as IDataObject; + const credentials = credential.data as PostgresNodeCredentials; let sshClientCreated: Client | undefined = new Client(); let pgpClientCreated: PgpClient | undefined; diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts b/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts index 7bb01aaa94a3d..68c5c3d9ef89c 100644 --- a/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts +++ b/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts @@ -1,9 +1,10 @@ import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow'; import { configurePostgres } from '../transport'; +import type { PostgresNodeCredentials } from '../helpers/interfaces'; export async function schemaSearch(this: ILoadOptionsFunctions): Promise { - const credentials = await this.getCredentials('postgres'); + const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials; const options = { nodeVersion: this.getNode().typeVersion }; const { db, sshClient } = await configurePostgres(credentials, options); @@ -27,7 +28,7 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise { - const credentials = await this.getCredentials('postgres'); + const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials; const options = { nodeVersion: this.getNode().typeVersion }; const { db, sshClient } = await configurePostgres(credentials, options); diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts b/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts index 64e4ba8d4e34c..e7ac1ac9453ee 100644 --- a/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts +++ b/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts @@ -2,9 +2,10 @@ import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow'; import { getTableSchema } from '../helpers/utils'; import { configurePostgres } from '../transport'; +import type { PostgresNodeCredentials } from '../helpers/interfaces'; export async function getColumns(this: ILoadOptionsFunctions): Promise { - const credentials = await this.getCredentials('postgres'); + const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials; const options = { nodeVersion: this.getNode().typeVersion }; const { db, sshClient } = await configurePostgres(credentials, options); diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts b/packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts index 5f3428af3ae6f..27333a558eeea 100644 --- a/packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts +++ b/packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts @@ -1,6 +1,7 @@ import type { ILoadOptionsFunctions, ResourceMapperFields, FieldType } from 'n8n-workflow'; import { getEnumValues, getEnums, getTableSchema, uniqueColumns } from '../helpers/utils'; import { configurePostgres } from '../transport'; +import type { PostgresNodeCredentials } from '../helpers/interfaces'; const fieldTypeMapping: Partial> = { string: ['text', 'varchar', 'character varying', 'character', 'char'], @@ -45,7 +46,7 @@ function mapPostgresType(postgresType: string): FieldType { export async function getMappingColumns( this: ILoadOptionsFunctions, ): Promise { - const credentials = await this.getCredentials('postgres'); + const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials; const { db, sshClient } = await configurePostgres(credentials); diff --git a/packages/nodes-base/nodes/Postgres/v2/transport/index.ts b/packages/nodes-base/nodes/Postgres/v2/transport/index.ts index 267c52fc6fc97..7736805812717 100644 --- a/packages/nodes-base/nodes/Postgres/v2/transport/index.ts +++ b/packages/nodes-base/nodes/Postgres/v2/transport/index.ts @@ -6,10 +6,14 @@ import type { ConnectConfig } from 'ssh2'; import type { IDataObject } from 'n8n-workflow'; import pgPromise from 'pg-promise'; -import type { PgpDatabase } from '../helpers/interfaces'; +import type { + PgpDatabase, + PostgresNodeCredentials, + PostgresNodeOptions, +} from '../helpers/interfaces'; import { formatPrivateKey } from '@utils/utilities'; -async function createSshConnectConfig(credentials: IDataObject) { +async function createSshConnectConfig(credentials: PostgresNodeCredentials) { if (credentials.sshAuthenticateWith === 'password') { return { host: credentials.sshHost as string, @@ -26,7 +30,7 @@ async function createSshConnectConfig(credentials: IDataObject) { }; if (credentials.passphrase) { - options.passphrase = credentials.passphrase as string; + options.passphrase = credentials.passphrase; } return options; @@ -34,8 +38,8 @@ async function createSshConnectConfig(credentials: IDataObject) { } export async function configurePostgres( - credentials: IDataObject, - options: IDataObject = {}, + credentials: PostgresNodeCredentials, + options: PostgresNodeOptions = {}, createdSshClient?: Client, ) { const pgp = pgPromise({ @@ -63,15 +67,20 @@ export async function configurePostgres( } const dbConfig: IDataObject = { - host: credentials.host as string, - port: credentials.port as number, - database: credentials.database as string, - user: credentials.user as string, - password: credentials.password as string, + host: credentials.host, + port: credentials.port, + database: credentials.database, + user: credentials.user, + password: credentials.password, + keepAlive: true, }; if (options.connectionTimeout) { - dbConfig.connectionTimeoutMillis = (options.connectionTimeout as number) * 1000; + dbConfig.connectionTimeoutMillis = options.connectionTimeout * 1000; + } + + if (options.delayClosingIdleConnection) { + dbConfig.keepAliveInitialDelayMillis = options.delayClosingIdleConnection * 1000; } if (credentials.allowUnauthorizedCerts === true) { @@ -80,7 +89,7 @@ export async function configurePostgres( }; } else { dbConfig.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); - dbConfig.sslmode = (credentials.ssl as string) || 'disable'; + dbConfig.sslmode = credentials.ssl || 'disable'; } if (!credentials.sshTunnel) { @@ -105,8 +114,8 @@ export async function configurePostgres( sshClient.forwardOut( socket.remoteAddress as string, socket.remotePort as number, - credentials.host as string, - credentials.port as number, + credentials.host, + credentials.port, (err, stream) => { if (err) reject(err);