Skip to content

Commit

Permalink
refactor: simplify reading and pause queue while piping
Browse files Browse the repository at this point in the history
Fixes: nodejs#52
  • Loading branch information
ronag committed May 4, 2020
1 parent aef2499 commit f0c3751
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 44 deletions.
73 changes: 31 additions & 42 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ const { EventEmitter } = require('events')
const Request = require('./request')
const assert = require('assert')

const kRead = Symbol('read')
const kReadCb = Symbol('readCallback')
const kIsWaiting = Symbol('isWaiting')
const kQueue = Symbol('queue')
const kInflight = Symbol('inflight')
const kTimer = Symbol('timer')
const kTLSOpts = Symbol('TLS Options')
const kLastBody = Symbol('lastBody')
const kNeedHeaders = Symbol('needHeaders')
const kStream = Symbol('kStream')

function connect (client) {
Expand All @@ -35,10 +31,14 @@ function connect (client) {

client.socket = socket

socket.on('connect', () => {
client.emit('connect')
client[kQueue].resume()
})
socket
.on('connect', () => {
client.emit('connect')
client[kQueue].resume()
})
.on('data', (chunk) => {
client.parser.execute(chunk)
})

client[kStream].finished(socket, (err) => {
reconnect(client, err)
Expand All @@ -64,6 +64,7 @@ function reconnect (client, err) {
}

// reset events
client.socket.removeAllListeners('data')
client.socket.removeAllListeners('end')
client.socket.removeAllListeners('finish')
client.socket.removeAllListeners('error')
Expand Down Expand Up @@ -113,8 +114,6 @@ class Client extends EventEmitter {
const endRequest = () => {
this.socket.write('\r\n', 'ascii')
this.socket.uncork()
this[kNeedHeaders]++
this[kRead]()
}

this.timeout = opts.timeout || 30000 // 30 seconds
Expand Down Expand Up @@ -237,31 +236,6 @@ class Client extends EventEmitter {
}

reset(this)

this[kRead] = () => {
var socket = this.socket
if (!socket) {
// TODO this should not happen
return
}

var chunk = null
var hasRead = false
while ((chunk = socket.read()) !== null) {
hasRead = true
this.parser.execute(chunk)
}

if (!this[kIsWaiting] && (!hasRead || this[kNeedHeaders] > 0)) {
this[kIsWaiting] = true
socket.once('readable', this[kReadCb])
}
}

this[kReadCb] = () => {
this[kIsWaiting] = false
this[kRead]()
}
}

get pipelining () {
Expand Down Expand Up @@ -377,30 +351,34 @@ module.exports = Client

function reset (client, err) {
client[kQueue].pause()

client[kIsWaiting] = false
client[kNeedHeaders] = 0
if (client[kLastBody]) {
client[kLastBody].destroy(err)
}
client[kLastBody] = null

const read = () => {
if (client.socket) {
client.socket.resume()
}
}

client.parser = new HTTPParser(HTTPParser.RESPONSE)
client.parser[HTTPParser.kOnHeaders] = () => {}
client.parser[HTTPParser.kOnHeadersComplete] = ({ statusCode, headers }) => {
const body = client[kLastBody]
// TODO move client[kInflight] from being an array. The array allocation
// is showing up in the flamegraph.
const { request, callback } = client[kInflight].shift()
const skipBody = request.method === 'HEAD'

if (!skipBody) {
client[kLastBody] = new client[kStream].Readable({ read: client[kRead] })
client[kLastBody].push = request.wrapSimple(client[kLastBody], client[kLastBody].push)
body = client[kLastBody] = new client[kStream].Readable({ read })
body.push = request.wrapSimple(body, body.push)
}
callback(null, {
statusCode,
headers: parseHeaders(headers),
body: client[kLastBody]
body
})
if (client.closed) {
destroyMaybe(client)
Expand All @@ -409,7 +387,18 @@ function reset (client, err) {
}

client.parser[HTTPParser.kOnBody] = (chunk, offset, length) => {
client[kLastBody].push(chunk.slice(offset, offset + length))
const body = client[kLastBody]
if (body.destroyed) {
// TODO: Add test and limit how much is read while response
// body is destroyed.
return
}

if (!body.push(chunk.slice(offset, offset + length))) {
if (client.socket) {
client.socket.pause()
}
}
}

client.parser[HTTPParser.kOnMessageComplete] = () => {
Expand Down
4 changes: 2 additions & 2 deletions test/client-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ test('POST which fails should error response', (t) => {
t.plan(4)

const server = createServer()
server.once('request', (req, res) => {
server.on('request', (req, res) => {
req.on('data', () => {
res.destroy()
})
Expand Down Expand Up @@ -390,7 +390,7 @@ test('reset parser', (t) => {
t.plan(4)

const server = createServer()
server.once('request', (req, res) => {
server.on('request', (req, res) => {
res.write('asd')
setTimeout(() => {
res.destroy()
Expand Down

0 comments on commit f0c3751

Please sign in to comment.