Skip to content

Commit

Permalink
feat: request timeout
Browse files Browse the repository at this point in the history
Fixes: #165
  • Loading branch information
ronag committed May 20, 2020
1 parent 1a153e4 commit 6a934c4
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 42 deletions.
32 changes: 10 additions & 22 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,22 @@ class Parser extends HTTPParser {
[HTTPParser.kOnHeadersComplete] ({ statusCode, headers }) {
const { client, resumeSocket } = this
const request = client[kQueue][client[kComplete]]
const { callback, signal, opaque } = request
const skipBody = request.method === 'HEAD'

assert(!this.read)
assert(!this.body)

let body

if (request.callback) {
body = request.callback(null, {
if (callback) {
body = callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
opaque
}, resumeSocket)
clearTimeout(request.timeout)
request.timeout = null
request.callback = null
request.opaque = null
}
Expand All @@ -87,15 +90,8 @@ class Parser extends HTTPParser {
if (body) {
this.body = body

if (request.signal) {
const onAbort = () => {
body(new RequestAbortedError(), null)
}
if ('addEventListener' in request.signal) {
request.signal.addEventListener('abort', onAbort)
} else {
request.signal.once('abort', onAbort)
}
if (signal) {
signal.once('error', body)
}
} else {
this.next()
Expand Down Expand Up @@ -815,11 +811,7 @@ function resume (client) {
}
const onFinished = (err) => {
if (signal) {
if ('removeEventListener' in signal) {
signal.removeEventListener('abort', onAbort)
} else {
signal.removeListener('abort', onAbort)
}
signal.removeListener('error', onFinished)
}

socket
Expand Down Expand Up @@ -857,11 +849,7 @@ function resume (client) {
}

if (signal) {
if ('addEventListener' in signal) {
signal.addEventListener('abort', onAbort)
} else {
signal.on('abort', onAbort)
}
signal.on('error', onFinished)
}

body
Expand Down
75 changes: 55 additions & 20 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
const { AsyncResource } = require('async_hooks')
const {
InvalidArgumentError,
RequestAbortedError
RequestAbortedError,
TimeoutError
} = require('./errors')
const EE = require('events')
const assert = require('assert')

const methods = [
'ACL',
Expand Down Expand Up @@ -61,7 +64,7 @@ class Request extends AsyncResource {
throw new InvalidArgumentError('no options passed')
}

const { path, method, body, headers, idempotent, opaque, signal } = opts
const { path, method, body, headers, idempotent, opaque, signal, timeout } = opts

if (!(typeof path === 'string' && path[0] === '/')) {
throw new InvalidArgumentError('path must be a valid path')
Expand All @@ -71,6 +74,10 @@ class Request extends AsyncResource {
throw new InvalidArgumentError('method must be a valid method')
}

if (timeout != null && (!Number.isInteger(timeout) || timeout < 1)) {
throw new InvalidArgumentError('timeout must be a positive integer')
}

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must implement .on(name, callback)')
}
Expand All @@ -79,7 +86,9 @@ class Request extends AsyncResource {
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream')
}

this.signal = signal
this.timeout = null

this.signal = null

this.method = method

Expand All @@ -101,33 +110,59 @@ class Request extends AsyncResource {

this.rawHeaders = ''

if (this.signal) {
if (headers) {
const headerNames = Object.keys(headers)
for (let i = 0; i < headerNames.length; i++) {
const name = headerNames[i]
this.rawHeaders += name + ': ' + headers[name] + '\r\n'
}
}

if (signal) {
if (!this.signal) {
this.signal = new EE()
}

const onAbort = () => {
this.signal.emit('error', new RequestAbortedError())
}

if ('addEventListener' in signal) {
signal.addEventListener('abort', onAbort)
} else {
signal.once('abort', onAbort)
}
}

if (timeout) {
if (!this.signal) {
this.signal = new EE()
}

const onTimeout = () => {
this.signal.emit('error', new TimeoutError())
}

this.timeout = setTimeout(onTimeout, timeout)
}

if (this.signal) {
this.signal.on('error', (err) => {
assert(err)

const { callback } = this

if (!callback) {
return
}

clearTimeout(this.timeout)
this.timeout = null
this.callback = null
this.opaque = null

callback(new RequestAbortedError(), null)
}

if ('addEventListener' in this.signal) {
this.signal.addEventListener('abort', onAbort)
} else {
this.signal.once('abort', onAbort)
}
}

if (headers) {
const headerNames = Object.keys(headers)
for (let i = 0; i < headerNames.length; i++) {
const name = headerNames[i]
this.rawHeaders += name + ': ' + headers[name] + '\r\n'
}
callback(err, null)
})
}
}

Expand Down

0 comments on commit 6a934c4

Please sign in to comment.