Skip to content

Commit

Permalink
when sending in a bench type of situation where buffers are large and…
Browse files Browse the repository at this point in the history
… loops

are tight, the libuv buffers for the socket can be exhausted. Changed
the way that the outbound is serialized:

- client will try to send 16K buffer at most
- if the write returns that it buffered outside of kernel, client won't send until socket is drained
  • Loading branch information
aricart committed Jul 17, 2020
1 parent 6095429 commit 2e73823
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 30 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
A [Node.js](http://nodejs.org/) client for the [NATS messaging system](https://nats.io).

[![license](https://img.shields.io/github/license/nats-io/node-nats.svg)](https://www.apache.org/licenses/LICENSE-2.0)
![NATS.js CI](https://github.com/nats-io/nats.js/workflows/NATS.js%20CI/badge.svg)
[![NATS.js CI](https://github.com/nats-io/nats.js/workflows/NATS.js%20CI/badge.svg)](https://github.com/nats-io/nats.js/workflows/NATS.js%20CI/badge.svg)
[![npm](https://img.shields.io/npm/v/nats.svg)](https://www.npmjs.com/package/nats)
[![npm](https://img.shields.io/npm/dt/nats.svg)](https://www.npmjs.com/package/nats)
[![npm](https://img.shields.io/npm/dm/nats.svg)](https://www.npmjs.com/package/nats)
Expand Down
64 changes: 64 additions & 0 deletions benchmark/bulk-pub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2020 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* jslint node: true */
'use strict'

// change '../' to 'nats' when copying
const nats = require('../')
const argv = require('minimist')(process.argv.slice(2))

const url = argv.s || nats.DEFAULT_URI
const creds = argv.creds
const subject = argv._[0]
const count = argv.c || 1
const len = argv.p || 0
let msg = argv._[1] || ''

if (!subject) {
console.log('Usage: node-pubsub [-s server] [--creds=filepath] <subject> [msg]')
process.exit()
}

if (len) {
for (let i = 0; i < len; i++) {
msg += (i % 10) + ''
}
}

// Connect to NATS server.
const opts = nats.creds(creds) || {}
opts.yieldTime = 1000
opts.name = 'bulk-pub'
const nc = nats.connect(url, opts)
.on('connect', () => {
(async () => {
let i = 0
for (i = 0; i < count; i++) {
nc.publish(subject, msg)
if (i % 1000 === 0) {
console.info(`< ${i} messages`)
}
}
nc.flush(() => {
console.log(`Published ${i} messages`)
})
})()
})

nc.on('error', (e) => {
console.log('Error [' + nc.currentServer + ']: ' + e)
process.exit()
})
56 changes: 56 additions & 0 deletions benchmark/bulk-sub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2020 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* jslint node: true */
'use strict'

// change '../' to 'nats' when copying
const nats = require('../')
const argv = require('minimist')(process.argv.slice(2))

const url = argv.s || nats.DEFAULT_URI
const creds = argv.creds
const subject = argv._[0]

if (!subject) {
console.log('Usage: bulk-sub [-s server] [--creds=filepath] <subject>')
process.exit()
}

// Connect to NATS server.
const opts = nats.creds(creds) || {}
opts.yieldTime = 1000
const nc = nats.connect(url, opts)
.on('connect', () => {
console.log('connected');
(async () => {
let i = 0
nc.subscribe(subject, (msg) => {
i++
if (i % 1000 === 0) {
console.info(`> ${i} messages`)
}
})
})()
})

nc.on('close', () => {
console.log('client closed')
})

nc.on('error', (e) => {
console.log('Error [' + nc.currentServer + ']: ' + e)
process.exit()
})
69 changes: 69 additions & 0 deletions examples/node-pubsub
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env node

/*
* Copyright 2013-2020 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* jslint node: true */
'use strict'

// change '../' to 'nats' when copying
const nats = require('../')
const argv = require('minimist')(process.argv.slice(2))

const url = argv.s || nats.DEFAULT_URI
const creds = argv.creds
const subject = argv._[0]
const count = argv.c || 1
const msg = argv._[1] || ''

if (!subject) {
console.log('Usage: node-pub [-s server] [--creds=filepath] <subject> [msg]')
process.exit()
}

// Connect to NATS server.
const opts = nats.creds(creds) || {}
opts.yieldTime = 1000
const nc = nats.connect(url, opts)
.on('connect', () => {
(async () => {
let i = 0
nc.subscribe(subject, (msg) => {
i++
if (i % 1000 === 0) {
console.info(`> ${i} messages`)
}
})
})();

(async () => {
let i = 0
for (i = 0; i < count; i++) {
nc.publish(subject, msg)
if (i % 1000 === 0) {
console.info(`< ${i} messages`)
}
}
nc.flush(() => {
console.log(`Published ${i} messages`)
process.exit()
})
})()
})

nc.on('error', (e) => {
console.log('Error [' + nc.currentServer + ']: ' + e)
process.exit()
})
83 changes: 54 additions & 29 deletions lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ const SIGCB_NOTFUNC_MSG = 'Signature callback is not a function.'
const SIGNATURE_REQUIRED_MSG = 'Server requires an Nkey signature.'
const SUB_DRAINING_MSG = 'Subscription draining'

const FLUSH_THRESHOLD = 65536
const FLUSH_THRESHOLD = 1024 * 64

/**
* @param {String} message
Expand Down Expand Up @@ -772,6 +772,13 @@ Client.prototype.setupHandlers = function () {
this.scheduleHeartbeat()
})

stream.on('drain', () => {
this.flushed = true
setImmediate(() => {
this.flushPending()
})
})

stream.on('close', () => {
const done = (this.closed === true || this.options.reconnect === false || this.servers.length === 0)
// if connected, it resets everything as partial buffers may have been sent
Expand Down Expand Up @@ -923,6 +930,7 @@ Client.prototype.createConnection = function () {
this.pending = this.pending || []
this.pSize = this.pSize || 0
this.pstate = AWAITING_CONTROL
this.flushed = true

// Clear info processing.
this.info = null
Expand Down Expand Up @@ -1027,49 +1035,66 @@ Client.prototype.closeStream = function () {
this.cancelHeartbeat()
}

Client.prototype.pack = function () {
const buf16k = 1024 * 16
const max = this.pSize >= buf16k ? buf16k : this.pSize
const buf = Buffer.allocUnsafe(max)
let avail = max
let cursor = 0
while (this.pending.length) {
if (avail === 0) {
break
}
const v = this.pending[0]
const src = Buffer.isBuffer(v) ? v : Buffer.from(v, 'utf-8')
const copied = src.copy(buf, cursor, 0)
avail -= copied
cursor += copied
if (src.length > copied) {
// slicing it runs out of mem?
// const remainder = src.slice(copied)
const remainder = Buffer.allocUnsafe(src.length - copied)
src.copy(remainder, 0, copied)
this.pending[0] = remainder
} else {
this.pending.shift()
}
}
this.pSize -= buf.length
return buf
}

/**
* Flush all pending data to the server.
*
* @api private
*/
Client.prototype.flushPending = function () {
if (this.connected === false ||
!this.flushed ||
this.pending === null ||
this.pending.length === 0 ||
this.infoReceived !== true) {
return
}

const write = (data) => {
this.pending = []
this.pSize = 0
return this.stream.write(data)
}
if (!this.pBufs) {
// All strings, fastest for now.
return write(this.pending.join(EMPTY))
} else {
// We have some or all Buffers. Figure out if we can optimize.
let allBufs = true
for (let i = 0; i < this.pending.length; i++) {
if (!Buffer.isBuffer(this.pending[i])) {
allBufs = false
break
}
// we are going to ignore the error here, because
// some errors related to disconnection are handled
// by the close handler on the stream. We are however
// interested in whether bytes were all flushed to
// the kernel
this.flushed = this.stream.write(data)
return this.flushed
}

while (true) {
const buf = this.pack()
if (buf.byteLength === 0) {
break
}
// If all buffers, concat together and write once.
if (allBufs) {
return write(Buffer.concat(this.pending, this.pSize))
} else {
// We have a mix, so write each one individually.
const pending = this.pending
this.pending = []
this.pSize = 0
let result = true
for (let i = 0; i < pending.length; i++) {
result = this.stream.write(pending[i]) && result
}
return result
if (!write(buf)) {
break
}
}
}
Expand Down

0 comments on commit 2e73823

Please sign in to comment.