Skip to content

Commit

Permalink
feat(Postgres Node): Options keepAlive and keepAliveInitialDelayMillis (
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-radency committed Apr 9, 2024
1 parent c2f4d7d commit 58518b6
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 31 deletions.
13 changes: 13 additions & 0 deletions packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts
Expand Up @@ -87,6 +87,10 @@ export async function pgTriggerFunction(

export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) {
const credentials = await this.getCredentials('postgres');
const options = this.getNodeParameter('options', {}) as {
connectionTimeout?: number;
delayClosingIdleConnection?: number;
};
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
noWarnings: true,
Expand All @@ -97,8 +101,17 @@ export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) {
database: credentials.database as string,
user: credentials.user as string,
password: credentials.password as string,
keepAlive: true,
};

if (options.connectionTimeout) {
config.connectionTimeoutMillis = options.connectionTimeout * 1000;
}

if (options.delayClosingIdleConnection) {
config.keepAliveInitialDelayMillis = options.delayClosingIdleConnection * 1000;
}

if (credentials.allowUnauthorizedCerts === true) {
config.ssl = {
rejectUnauthorized: false,
Expand Down
27 changes: 27 additions & 0 deletions packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts
Expand Up @@ -209,6 +209,33 @@ export class PostgresTrigger implements INodeType {
},
],
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Connection Timeout',
name: 'connectionTimeout',
type: 'number',
default: 30,
description: 'Number of seconds reserved for connecting to the database',
},
{
displayName: 'Delay Closing Idle Connection',
name: 'delayClosingIdleConnection',
type: 'number',
default: 0,
description:
'Number of seconds to wait before idle connection would be eligible for closing',
typeOptions: {
minValue: 0,
},
},
],
},
],
};

Expand Down
Expand Up @@ -30,6 +30,16 @@ export const optionsCollection: INodeProperties = {
default: 30,
description: 'Number of seconds reserved for connecting to the database',
},
{
displayName: 'Delay Closing Idle Connection',
name: 'delayClosingIdleConnection',
type: 'number',
default: 0,
description: 'Number of seconds to wait before idle connection would be eligible for closing',
typeOptions: {
minValue: 0,
},
},
{
displayName: 'Query Batching',
name: 'queryBatching',
Expand Down
Expand Up @@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow';

import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
Expand Down Expand Up @@ -95,7 +96,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
_db?: PgpDatabase,
): Promise<INodeExecutionData[]> {
const queries: QueryWithValues[] = [];
Expand Down
Expand Up @@ -6,7 +6,12 @@ import type {
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';

import type { PgpDatabase, QueriesRunner, QueryWithValues } from '../../helpers/interfaces';
import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryWithValues,
} from '../../helpers/interfaces';

import { replaceEmptyStringsByNulls } from '../../helpers/utils';

Expand Down Expand Up @@ -46,7 +51,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
_db?: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
Expand Down
Expand Up @@ -7,6 +7,7 @@ import type {

import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
Expand Down Expand Up @@ -157,7 +158,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
Expand Down
Expand Up @@ -7,6 +7,7 @@ import type {

import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
Expand Down Expand Up @@ -75,7 +76,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
_db?: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
Expand Down
Expand Up @@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow';

import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
Expand Down Expand Up @@ -194,7 +195,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
Expand Down Expand Up @@ -279,7 +280,7 @@ export async function execute(
const rowExists = await doesRowExist(db, schema, table, matchValues);
if (!rowExists) {
const descriptionValues: string[] = [];
matchValues.forEach((val, index) => {
matchValues.forEach((_, index) => {
if (index % 2 === 0) {
descriptionValues.push(`${matchValues[index]}=${matchValues[index + 1]}`);
}
Expand Down
Expand Up @@ -8,6 +8,7 @@ import { NodeOperationError } from 'n8n-workflow';

import type {
PgpDatabase,
PostgresNodeOptions,
QueriesRunner,
QueryValues,
QueryWithValues,
Expand Down Expand Up @@ -193,7 +194,7 @@ export async function execute(
this: IExecuteFunctions,
runQueries: QueriesRunner,
items: INodeExecutionData[],
nodeOptions: IDataObject,
nodeOptions: PostgresNodeOptions,
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
Expand Down
5 changes: 3 additions & 2 deletions packages/nodes-base/nodes/Postgres/v2/actions/router.ts
Expand Up @@ -6,6 +6,7 @@ import { configureQueryRunner } from '../helpers/utils';
import type { PostgresType } from './node.type';

import * as database from './database/Database.resource';
import type { PostgresNodeCredentials, PostgresNodeOptions } from '../helpers/interfaces';

export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let returnData: INodeExecutionData[] = [];
Expand All @@ -14,8 +15,8 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
const resource = this.getNodeParameter<PostgresType>('resource', 0);
const operation = this.getNodeParameter('operation', 0);

const credentials = await this.getCredentials('postgres');
const options = this.getNodeParameter('options', 0, {});
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = this.getNodeParameter('options', 0, {}) as PostgresNodeOptions;
options.nodeVersion = this.getNode().typeVersion;
options.operation = operation;

Expand Down
33 changes: 33 additions & 0 deletions packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts
Expand Up @@ -35,3 +35,36 @@ export type QueriesRunner = (
items: INodeExecutionData[],
options: IDataObject,
) => Promise<INodeExecutionData[]>;

export type PostgresNodeOptions = {
nodeVersion?: number;
operation?: string;
cascade?: boolean;
connectionTimeout?: number;
delayClosingIdleConnection?: number;
queryBatching?: QueryMode;
queryReplacement?: string;
outputColumns?: string[];
largeNumbersOutput?: 'numbers' | 'text';
skipOnConflict?: boolean;
replaceEmptyStrings?: boolean;
};

export type PostgresNodeCredentials = {
sshAuthenticateWith: 'password' | 'privateKey';
host: string;
port: number;
database: string;
user: string;
password: string;
allowUnauthorizedCerts?: boolean;
ssl?: 'disable' | 'allow' | 'require' | 'verify' | 'verify-full';
sshTunnel?: boolean;
sshHost?: string;
sshPort?: number;
sshPostgresPort?: number;
sshUser?: string;
sshPassword?: string;
privateKey?: string;
passphrase?: string;
};
@@ -1,20 +1,19 @@
import type {
ICredentialsDecrypted,
ICredentialTestFunctions,
IDataObject,
INodeCredentialTestResult,
} from 'n8n-workflow';

import { Client } from 'ssh2';
import { configurePostgres } from '../transport';

import type { PgpClient } from '../helpers/interfaces';
import type { PgpClient, PostgresNodeCredentials } from '../helpers/interfaces';

export async function postgresConnectionTest(
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as IDataObject;
const credentials = credential.data as PostgresNodeCredentials;

let sshClientCreated: Client | undefined = new Client();
let pgpClientCreated: PgpClient | undefined;
Expand Down
5 changes: 3 additions & 2 deletions packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts
@@ -1,9 +1,10 @@
import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow';

import { configurePostgres } from '../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';

export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };

const { db, sshClient } = await configurePostgres(credentials, options);
Expand All @@ -27,7 +28,7 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
}
}
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };

const { db, sshClient } = await configurePostgres(credentials, options);
Expand Down
3 changes: 2 additions & 1 deletion packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts
Expand Up @@ -2,9 +2,10 @@ import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow';

import { getTableSchema } from '../helpers/utils';
import { configurePostgres } from '../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';

export async function getColumns(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;
const options = { nodeVersion: this.getNode().typeVersion };

const { db, sshClient } = await configurePostgres(credentials, options);
Expand Down
@@ -1,6 +1,7 @@
import type { ILoadOptionsFunctions, ResourceMapperFields, FieldType } from 'n8n-workflow';
import { getEnumValues, getEnums, getTableSchema, uniqueColumns } from '../helpers/utils';
import { configurePostgres } from '../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces';

const fieldTypeMapping: Partial<Record<FieldType, string[]>> = {
string: ['text', 'varchar', 'character varying', 'character', 'char'],
Expand Down Expand Up @@ -45,7 +46,7 @@ function mapPostgresType(postgresType: string): FieldType {
export async function getMappingColumns(
this: ILoadOptionsFunctions,
): Promise<ResourceMapperFields> {
const credentials = await this.getCredentials('postgres');
const credentials = (await this.getCredentials('postgres')) as PostgresNodeCredentials;

const { db, sshClient } = await configurePostgres(credentials);

Expand Down

0 comments on commit 58518b6

Please sign in to comment.