Skip to content

Commit

Permalink
fix: pause while piping
Browse files Browse the repository at this point in the history
Fixes: #52
  • Loading branch information
ronag committed May 7, 2020
1 parent e6715eb commit b8c9d59
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 31 deletions.
49 changes: 32 additions & 17 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,20 @@ function _connect (client) {
client[kQueue].pause()
client[kLastBody] = null

let body = null
let _body = null
let _reset = null

parser[HTTPParser.kOnHeaders] = () => {}
parser[HTTPParser.kOnHeadersComplete] = ({ statusCode, headers }) => {
// TODO move client[kInflight] from being an array. The array allocation
// is showing up in the flamegraph.
const { request, callback } = client[kInflight].shift()
const { request, callback, reset } = client[kInflight].shift()
const skipBody = request.method === 'HEAD'

_reset = reset

if (!skipBody) {
body = client[kLastBody] = new client[kStream].Readable({
_body = client[kLastBody] = new client[kStream].Readable({
autoDestroy: true,
read () {
socket.resume()
Expand All @@ -85,40 +88,46 @@ function _connect (client) {
err = new Error('aborted')
}

if (_reset) {
socket.destroy()
}

cb(err, null)
}
})
body.push = request.wrapSimple(body, body.push)
_body.push = request.wrapSimple(_body, _body.push)
}
callback(null, {
statusCode,
headers: parseHeaders(headers),
body
body: _body
})
destroyMaybe(client)
return skipBody
}

parser[HTTPParser.kOnBody] = (chunk, offset, length) => {
if (body.destroyed) {
if (_body.destroyed) {
// TODO: Add test and limit how much is read while response
// body is destroyed.
return
}

if (!body.push(chunk.slice(offset, offset + length))) {
if (!_body.push(chunk.slice(offset, offset + length))) {
socket.pause()
}
}

parser[HTTPParser.kOnMessageComplete] = () => {
if (body) {
body.push(null)
body = null
if (_body) {
_body.push(null)
_body = null

// TODO: Remove this and force consumer to fully consume body.
client[kLastBody] = null
} else if (_reset) {
socket.destroy()
}

destroyMaybe(client)
}

Expand All @@ -127,9 +136,9 @@ function _connect (client) {

client[kQueue].pause()

if (body) {
body.destroy(err)
body = null
if (_body) {
_body.destroy(err)
_body = null

// TODO: Remove this and force consumer to fully consume body.
client[kLastBody] = null
Expand Down Expand Up @@ -229,7 +238,7 @@ class Client extends EventEmitter {
return
}

const { method, path, body } = request
const { method, path, body, reset } = request
const headers = request.headers || {}
this.socket.cork()
this.socket.write(`${method} ${path} HTTP/1.1\r\nConnection: keep-alive\r\n`, 'ascii')
Expand All @@ -243,6 +252,14 @@ class Client extends EventEmitter {
this.socket.write(name + ': ' + headers[name] + '\r\n', 'ascii')
}

if (reset) {
// Requests with body we disconnect once response has
// been received (some servers do this anyway).
// Queue will automatically resume once reconnected.
// TODO: Make this less strict?
this[kQueue].pause()
}

const chunked = !headers.hasOwnProperty('content-length')

if (typeof body === 'string' || body instanceof Uint8Array) {
Expand All @@ -260,8 +277,6 @@ class Client extends EventEmitter {
this.socket.write('\r\n', 'ascii')
}

// TODO: Pause the queue while piping.

let finished = false

const socket = this.socket
Expand Down
2 changes: 2 additions & 0 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class Request extends AsyncResource {

// should we validate the headers?
this.headers = opts.headers

this.reset = !!opts.body
}

wrap (cb) {
Expand Down
22 changes: 9 additions & 13 deletions test/client-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ test('socket fail while writing request body', (t) => {
})

test('socket fail while ending request body', (t) => {
t.plan(2)
t.plan(1)

const server = createServer()
server.once('request', (req, res) => {
Expand All @@ -543,22 +543,18 @@ test('socket fail while ending request body', (t) => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))

const _err = new Error('kaboom')
client.on('connect', () => {
client.socket.destroy(_err)
})
const body = new Readable({ read () {} })
body.push(null)
client.request({
path: '/',
method: 'POST',
body: 'asd'
body
}, (err) => {
t.error(err)
const body = new Readable({ read () {} })
body.push(null)
client.request({
path: '/',
method: 'POST',
body
}, (err) => {
t.ok(err)
})
client.socket.destroy('kaboom')
t.strictEqual(err, _err)
})
})
})
68 changes: 67 additions & 1 deletion test/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { test } = require('tap')
const { Client } = require('..')
const { createServer } = require('http')
const { readFileSync, createReadStream } = require('fs')
const { finished } = require('readable-stream')
const { finished, Readable } = require('readable-stream')

test('basic get', (t) => {
t.plan(7)
Expand Down Expand Up @@ -655,3 +655,69 @@ test('close waits until socket is destroyed', (t) => {
}
})
})

test('pipelined chunked POST ', (t) => {
t.plan(4 + 8 + 8)

let a = 0
let b = 0

const server = createServer((req, res) => {
req.on('data', chunk => {
// Make sure a and b don't interleave.
t.ok(a === 9 || b === 0)
res.write(chunk)
}).on('end', () => {
res.end()
})
})
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 2
})

client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
body.resume()
t.error(err)
})

client.request({
path: '/',
method: 'POST',
body: new Readable({
read () {
this.push(++a > 8 ? null : 'a')
}
})
}, (err, { body }) => {
body.resume()
t.error(err)
})

client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
body.resume()
t.error(err)
})

client.request({
path: '/',
method: 'POST',
body: new Readable({
read () {
this.push(++b > 8 ? null : 'b')
}
})
}, (err, { body }) => {
body.resume()
t.error(err)
})
})
})

0 comments on commit b8c9d59

Please sign in to comment.