Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pause while piping #76

Merged
merged 1 commit into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,20 @@ function _connect (client) {
client[kQueue].pause()
client[kLastBody] = null

let body = null
let _body = null
let _reset = null

parser[HTTPParser.kOnHeaders] = () => {}
parser[HTTPParser.kOnHeadersComplete] = ({ statusCode, headers }) => {
// TODO move client[kInflight] from being an array. The array allocation
// is showing up in the flamegraph.
const { request, callback } = client[kInflight].shift()
const { request, callback, reset } = client[kInflight].shift()
const skipBody = request.method === 'HEAD'

_reset = reset

if (!skipBody) {
body = client[kLastBody] = new client[kStream].Readable({
_body = client[kLastBody] = new client[kStream].Readable({
autoDestroy: true,
read () {
socket.resume()
Expand All @@ -85,40 +88,46 @@ function _connect (client) {
err = new Error('aborted')
}

if (_reset) {
socket.destroy()
}

cb(err, null)
}
})
body.push = request.wrapSimple(body, body.push)
_body.push = request.wrapSimple(_body, _body.push)
}
callback(null, {
statusCode,
headers: parseHeaders(headers),
body
body: _body
})
destroyMaybe(client)
return skipBody
}

parser[HTTPParser.kOnBody] = (chunk, offset, length) => {
if (body.destroyed) {
if (_body.destroyed) {
// TODO: Add test and limit how much is read while response
// body is destroyed.
return
}

if (!body.push(chunk.slice(offset, offset + length))) {
if (!_body.push(chunk.slice(offset, offset + length))) {
socket.pause()
}
}

parser[HTTPParser.kOnMessageComplete] = () => {
if (body) {
body.push(null)
body = null
if (_body) {
_body.push(null)
_body = null

// TODO: Remove this and force consumer to fully consume body.
client[kLastBody] = null
} else if (_reset) {
socket.destroy()
}

destroyMaybe(client)
}

Expand All @@ -127,9 +136,9 @@ function _connect (client) {

client[kQueue].pause()

if (body) {
body.destroy(err)
body = null
if (_body) {
_body.destroy(err)
_body = null

// TODO: Remove this and force consumer to fully consume body.
client[kLastBody] = null
Expand Down Expand Up @@ -229,7 +238,7 @@ class Client extends EventEmitter {
return
}

const { method, path, body } = request
const { method, path, body, reset } = request
const headers = request.headers || {}
this.socket.cork()
this.socket.write(`${method} ${path} HTTP/1.1\r\nConnection: keep-alive\r\n`, 'ascii')
Expand All @@ -243,6 +252,14 @@ class Client extends EventEmitter {
this.socket.write(name + ': ' + headers[name] + '\r\n', 'ascii')
}

if (reset) {
// Requests with body we disconnect once response has
// been received (some servers do this anyway).
// Queue will automatically resume once reconnected.
// TODO: Make this less strict?
this[kQueue].pause()
}

const chunked = !headers.hasOwnProperty('content-length')

if (typeof body === 'string' || body instanceof Uint8Array) {
Expand All @@ -260,8 +277,6 @@ class Client extends EventEmitter {
this.socket.write('\r\n', 'ascii')
}

// TODO: Pause the queue while piping.

let finished = false

const socket = this.socket
Expand Down
2 changes: 2 additions & 0 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class Request extends AsyncResource {

// should we validate the headers?
this.headers = opts.headers

this.reset = !!opts.body
}

wrap (cb) {
Expand Down
22 changes: 9 additions & 13 deletions test/client-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ test('socket fail while writing request body', (t) => {
})

test('socket fail while ending request body', (t) => {
t.plan(2)
t.plan(1)

const server = createServer()
server.once('request', (req, res) => {
Expand All @@ -543,22 +543,18 @@ test('socket fail while ending request body', (t) => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))

const _err = new Error('kaboom')
client.on('connect', () => {
client.socket.destroy(_err)
})
const body = new Readable({ read () {} })
body.push(null)
client.request({
path: '/',
method: 'POST',
body: 'asd'
body
}, (err) => {
t.error(err)
const body = new Readable({ read () {} })
body.push(null)
client.request({
path: '/',
method: 'POST',
body
}, (err) => {
t.ok(err)
})
client.socket.destroy('kaboom')
t.strictEqual(err, _err)
})
})
})
68 changes: 67 additions & 1 deletion test/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { test } = require('tap')
const { Client } = require('..')
const { createServer } = require('http')
const { readFileSync, createReadStream } = require('fs')
const { finished } = require('readable-stream')
const { finished, Readable } = require('readable-stream')

test('basic get', (t) => {
t.plan(7)
Expand Down Expand Up @@ -655,3 +655,69 @@ test('close waits until socket is destroyed', (t) => {
}
})
})

test('pipelined chunked POST ', (t) => {
t.plan(4 + 8 + 8)

let a = 0
let b = 0

const server = createServer((req, res) => {
req.on('data', chunk => {
// Make sure a and b don't interleave.
t.ok(a === 9 || b === 0)
res.write(chunk)
}).on('end', () => {
res.end()
})
})
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 2
})

client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
body.resume()
t.error(err)
})

client.request({
path: '/',
method: 'POST',
body: new Readable({
read () {
this.push(++a > 8 ? null : 'a')
}
})
}, (err, { body }) => {
body.resume()
t.error(err)
})

client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
body.resume()
t.error(err)
})

client.request({
path: '/',
method: 'POST',
body: new Readable({
read () {
this.push(++b > 8 ? null : 'b')
}
})
}, (err, { body }) => {
body.resume()
t.error(err)
})
})
})