Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions cf/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,26 @@ function Postgres(a, b) {

function sql(strings, ...args) {
const query = strings && Array.isArray(strings.raw)
? new Query(strings, args, handler, cancel)
? new Query(strings, args, handler, cancel, { postgres_options: options })
: typeof strings === 'string' && !args.length
? new Identifier(options.transform.column.to ? options.transform.column.to(strings) : strings)
: new Builder(strings, args)
return query
}

function unsafe(string, args = [], options = {}) {
arguments.length === 2 && !Array.isArray(args) && (options = args, args = [])
function unsafe(string, args = [], queryOptions = {}) {
arguments.length === 2 && !Array.isArray(args) && (queryOptions = args, args = [])
const query = new Query([string], args, handler, cancel, {
prepare: false,
...options,
simple: 'simple' in options ? options.simple : args.length === 0
...queryOptions,
simple: 'simple' in queryOptions ? queryOptions.simple : args.length === 0,
postgres_options: options
})
return query
}

function file(path, args = [], options = {}) {
arguments.length === 2 && !Array.isArray(args) && (options = args, args = [])
function file(path, args = [], queryOptions = {}) {
arguments.length === 2 && !Array.isArray(args) && (queryOptions = args, args = [])
const query = new Query([], args, (query) => {
fs.readFile(path, 'utf8', (err, string) => {
if (err)
Expand All @@ -138,8 +139,9 @@ function Postgres(a, b) {
handler(query)
})
}, cancel, {
...options,
simple: 'simple' in options ? options.simple : args.length === 0
...queryOptions,
simple: 'simple' in queryOptions ? queryOptions.simple : args.length === 0,
postgres_options: options
})
return query
}
Expand Down Expand Up @@ -357,7 +359,9 @@ function Postgres(a, b) {
: (
queries.remove(query),
query.cancelled = true,
query.reject(Errors.generic('57014', 'canceling statement due to user request')),
query.reject(Errors.generic('57014', query.timedOut
? 'canceling statement due to timeout'
: 'canceling statement due to user request')),
resolve()
)
})
Expand Down Expand Up @@ -445,7 +449,7 @@ function parseOptions(a, b) {
'timeout' in o && (console.log('The timeout option is deprecated, use idle_timeout instead'), o.idle_timeout = o.timeout) // eslint-disable-line
query.sslrootcert === 'system' && (query.ssl = 'verify-full')

const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive']
const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive', 'query_timeout']
const defaults = {
max : 10,
ssl : false,
Expand All @@ -459,7 +463,8 @@ function parseOptions(a, b) {
debug : false,
fetch_types : true,
publications : 'alltables',
target_session_attrs: null
target_session_attrs: null,
query_timeout : null
}

return {
Expand Down
29 changes: 26 additions & 3 deletions cf/src/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ export class Query extends Promise {
this.state = null
this.statement = null

this.resolve = x => (this.active = false, resolve(x))
this.reject = x => (this.active = false, reject(x))
this.resolve = x => (this.active = false, this.clearTimeouts(), resolve(x))
this.reject = x => (this.active = false, this.clearTimeouts(), reject(x))

this.active = false
this.cancelled = null
this.executed = false
this.signature = ''
this.timeoutTimer = null
this.timedOut = false

this[originError] = this.handler.debug
? new Error()
Expand All @@ -50,9 +52,30 @@ export class Query extends Promise {
}

cancel() {
this.clearTimeouts()
return this.canceller && (this.canceller(this), this.canceller = null)
}

clearTimeouts() {
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer)
this.timeoutTimer = null
}
}

setTimeout() {
const postgresOptions = this.options.postgres_options
if (postgresOptions && postgresOptions.query_timeout && !this.timeoutTimer) {
this.timeoutTimer = setTimeout(() => {
if (this.active && !this.cancelled) {
// Mark as cancelled to distinguish from manual cancellation
this.timedOut = true
this.cancel()
}
}, postgresOptions.query_timeout * 1000)
}
}

simple() {
this.options.simple = true
this.options.prepare = false
Expand Down Expand Up @@ -137,7 +160,7 @@ export class Query extends Promise {
}

async handle() {
!this.executed && (this.executed = true) && await 1 && this.handler(this)
!this.executed && (this.executed = true) && await 1 && (this.setTimeout(), this.handler(this))
}

execute() {
Expand Down
29 changes: 17 additions & 12 deletions cjs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,25 +109,26 @@ function Postgres(a, b) {

function sql(strings, ...args) {
const query = strings && Array.isArray(strings.raw)
? new Query(strings, args, handler, cancel)
? new Query(strings, args, handler, cancel, { postgres_options: options })
: typeof strings === 'string' && !args.length
? new Identifier(options.transform.column.to ? options.transform.column.to(strings) : strings)
: new Builder(strings, args)
return query
}

function unsafe(string, args = [], options = {}) {
arguments.length === 2 && !Array.isArray(args) && (options = args, args = [])
function unsafe(string, args = [], queryOptions = {}) {
arguments.length === 2 && !Array.isArray(args) && (queryOptions = args, args = [])
const query = new Query([string], args, handler, cancel, {
prepare: false,
...options,
simple: 'simple' in options ? options.simple : args.length === 0
...queryOptions,
simple: 'simple' in queryOptions ? queryOptions.simple : args.length === 0,
postgres_options: options
})
return query
}

function file(path, args = [], options = {}) {
arguments.length === 2 && !Array.isArray(args) && (options = args, args = [])
function file(path, args = [], queryOptions = {}) {
arguments.length === 2 && !Array.isArray(args) && (queryOptions = args, args = [])
const query = new Query([], args, (query) => {
fs.readFile(path, 'utf8', (err, string) => {
if (err)
Expand All @@ -137,8 +138,9 @@ function Postgres(a, b) {
handler(query)
})
}, cancel, {
...options,
simple: 'simple' in options ? options.simple : args.length === 0
...queryOptions,
simple: 'simple' in queryOptions ? queryOptions.simple : args.length === 0,
postgres_options: options
})
return query
}
Expand Down Expand Up @@ -356,7 +358,9 @@ function Postgres(a, b) {
: (
queries.remove(query),
query.cancelled = true,
query.reject(Errors.generic('57014', 'canceling statement due to user request')),
query.reject(Errors.generic('57014', query.timedOut
? 'canceling statement due to timeout'
: 'canceling statement due to user request')),
resolve()
)
})
Expand Down Expand Up @@ -444,7 +448,7 @@ function parseOptions(a, b) {
'timeout' in o && (console.log('The timeout option is deprecated, use idle_timeout instead'), o.idle_timeout = o.timeout) // eslint-disable-line
query.sslrootcert === 'system' && (query.ssl = 'verify-full')

const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive']
const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive', 'query_timeout']
const defaults = {
max : 10,
ssl : false,
Expand All @@ -458,7 +462,8 @@ function parseOptions(a, b) {
debug : false,
fetch_types : true,
publications : 'alltables',
target_session_attrs: null
target_session_attrs: null,
query_timeout : null
}

return {
Expand Down
29 changes: 26 additions & 3 deletions cjs/src/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ const Query = module.exports.Query = class Query extends Promise {
this.state = null
this.statement = null

this.resolve = x => (this.active = false, resolve(x))
this.reject = x => (this.active = false, reject(x))
this.resolve = x => (this.active = false, this.clearTimeouts(), resolve(x))
this.reject = x => (this.active = false, this.clearTimeouts(), reject(x))

this.active = false
this.cancelled = null
this.executed = false
this.signature = ''
this.timeoutTimer = null
this.timedOut = false

this[originError] = this.handler.debug
? new Error()
Expand All @@ -50,9 +52,30 @@ const Query = module.exports.Query = class Query extends Promise {
}

cancel() {
this.clearTimeouts()
return this.canceller && (this.canceller(this), this.canceller = null)
}

clearTimeouts() {
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer)
this.timeoutTimer = null
}
}

setTimeout() {
const postgresOptions = this.options.postgres_options
if (postgresOptions && postgresOptions.query_timeout && !this.timeoutTimer) {
this.timeoutTimer = setTimeout(() => {
if (this.active && !this.cancelled) {
// Mark as cancelled to distinguish from manual cancellation
this.timedOut = true
this.cancel()
}
}, postgresOptions.query_timeout * 1000)
}
}

simple() {
this.options.simple = true
this.options.prepare = false
Expand Down Expand Up @@ -137,7 +160,7 @@ const Query = module.exports.Query = class Query extends Promise {
}

async handle() {
!this.executed && (this.executed = true) && await 1 && this.handler(this)
!this.executed && (this.executed = true) && await 1 && (this.setTimeout(), this.handler(this))
}

execute() {
Expand Down
33 changes: 33 additions & 0 deletions cjs/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2614,3 +2614,36 @@ t('Ensure reserve on query throws proper error', async() => {
'wat', x, reserved.release()
]
})

t('Query timeout cancels long running query', async() => {
const sql = postgres({
...options,
query_timeout: 1
})

const start = Date.now()
const error = await sql`select pg_sleep(3)`.catch(e => e)
const elapsed = Date.now() - start

return [
'57014',
error.code,
elapsed < 2000, // Should timeout in ~1 second, not 3 seconds
await sql.end()
]
})

t('Query timeout allows quick queries to complete', async() => {
const sql = postgres({
...options,
query_timeout: 2
})

const result = await sql`select 1 as x`

return [
1,
result[0].x,
await sql.end()
]
})
31 changes: 18 additions & 13 deletions deno/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,26 @@ function Postgres(a, b) {

function sql(strings, ...args) {
const query = strings && Array.isArray(strings.raw)
? new Query(strings, args, handler, cancel)
? new Query(strings, args, handler, cancel, { postgres_options: options })
: typeof strings === 'string' && !args.length
? new Identifier(options.transform.column.to ? options.transform.column.to(strings) : strings)
: new Builder(strings, args)
return query
}

function unsafe(string, args = [], options = {}) {
arguments.length === 2 && !Array.isArray(args) && (options = args, args = [])
function unsafe(string, args = [], queryOptions = {}) {
arguments.length === 2 && !Array.isArray(args) && (queryOptions = args, args = [])
const query = new Query([string], args, handler, cancel, {
prepare: false,
...options,
simple: 'simple' in options ? options.simple : args.length === 0
...queryOptions,
simple: 'simple' in queryOptions ? queryOptions.simple : args.length === 0,
postgres_options: options
})
return query
}

function file(path, args = [], options = {}) {
arguments.length === 2 && !Array.isArray(args) && (options = args, args = [])
function file(path, args = [], queryOptions = {}) {
arguments.length === 2 && !Array.isArray(args) && (queryOptions = args, args = [])
const query = new Query([], args, (query) => {
fs.readFile(path, 'utf8', (err, string) => {
if (err)
Expand All @@ -138,8 +139,9 @@ function Postgres(a, b) {
handler(query)
})
}, cancel, {
...options,
simple: 'simple' in options ? options.simple : args.length === 0
...queryOptions,
simple: 'simple' in queryOptions ? queryOptions.simple : args.length === 0,
postgres_options: options
})
return query
}
Expand Down Expand Up @@ -357,7 +359,9 @@ function Postgres(a, b) {
: (
queries.remove(query),
query.cancelled = true,
query.reject(Errors.generic('57014', 'canceling statement due to user request')),
query.reject(Errors.generic('57014', query.timedOut
? 'canceling statement due to timeout'
: 'canceling statement due to user request')),
resolve()
)
})
Expand Down Expand Up @@ -445,7 +449,7 @@ function parseOptions(a, b) {
'timeout' in o && (console.log('The timeout option is deprecated, use idle_timeout instead'), o.idle_timeout = o.timeout) // eslint-disable-line
query.sslrootcert === 'system' && (query.ssl = 'verify-full')

const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive']
const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive', 'query_timeout']
const defaults = {
max : 10,
ssl : false,
Expand All @@ -459,7 +463,8 @@ function parseOptions(a, b) {
debug : false,
fetch_types : true,
publications : 'alltables',
target_session_attrs: null
target_session_attrs: null,
query_timeout : null
}

return {
Expand All @@ -482,8 +487,8 @@ function parseOptions(a, b) {
{}
),
connection : {
application_name: env.PGAPPNAME || 'postgres.js',
...o.connection,
application_name: o.connection?.application_name ?? env.PGAPPNAME ?? 'postgres.js',
...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {})
},
types : o.types || {},
Expand Down
Loading