Skip to content

Commit

Permalink
feat: Queue limit
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando committed Jan 24, 2020
1 parent 517eb86 commit a486cfd
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
# Logs
logs
*.log
.vscode

# Runtime data
pids
Expand Down
4 changes: 3 additions & 1 deletion aedes.js
Expand Up @@ -28,7 +28,8 @@ var defaultOptions = {
authorizeForward: defaultAuthorizeForward,
published: defaultPublished,
trustProxy: false,
trustedProxies: []
trustedProxies: [],
queueLimit: 42
}

function Aedes (opts) {
Expand All @@ -42,6 +43,7 @@ function Aedes (opts) {

this.id = opts.id || uuidv4()
this.counter = 0
this.queueLimit = opts.queueLimit
this.connectTimeout = opts.connectTimeout
this.mq = opts.mq || mqemitter(opts)
this.handle = function handle (conn, req) {
Expand Down
18 changes: 12 additions & 6 deletions lib/client.js
Expand Up @@ -50,6 +50,7 @@ function Client (broker, conn, req) {

// queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event
this.parser._queue = []
this.parser.queueLimit = broker.queueLimit

this.once('connected', dequeue)

Expand Down Expand Up @@ -337,17 +338,22 @@ function enqueue (packet) {
if (client.connackSent || client._parsingBatch === 1) {
handle(client, packet, client._nextBatch)
} else {
this._queue.push(packet)
if (this._queue.length < this.queueLimit) {
this._queue.push(packet)
} else {
this.emit('error', new Error('Client queue limit reached'))
}
}
}

function dequeue () {
var q = this.parser._queue || []
for (var i = 0, len = q.length; i < len; i++) {
handle(this, q[i], this._nextBatch)
}
if (this.parser._queue) {
for (var i = 0, len = this.parser._queue.length; i < len; i++) {
handle(this, this.parser._queue[i], this._nextBatch)
}

this.parser._queue = null
this.parser._queue = null
}
}

function nop () { }
44 changes: 41 additions & 3 deletions test/connect.js
Expand Up @@ -333,7 +333,8 @@ test('reject clients with wrong protocol name', function (t) {
})

test('After first CONNECT Packet, others are queued until \'connect\' event', function (t) {
var broker = aedes()
var queueLimit = 50
var broker = aedes({ queueLimit })

var publishP = {
cmd: 'publish',
Expand All @@ -357,12 +358,12 @@ test('After first CONNECT Packet, others are queued until \'connect\' event', fu

process.once('warning', e => t.fail('Memory leak detected'))

for (let i = 0; i < 100; i++) {
for (let i = 0; i < queueLimit; i++) {
s.inStream.write(publishP)
}

broker.on('client', function (client) {
t.equal(client.parser._queue.length, 100, 'Packets have been queued')
t.equal(client.parser._queue.length, queueLimit, 'Packets have been queued')

client.once('connected', () => {
t.equal(client.parser._queue, null, 'Queue is empty')
Expand All @@ -372,6 +373,43 @@ test('After first CONNECT Packet, others are queued until \'connect\' event', fu
})
})

test('Test queue limit', function (t) {
var queueLimit = 50
var broker = aedes({ queueLimit })

var publishP = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: false
}

var connectP = {
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: 'abcde',
keepalive: 0
}

var s = setup(broker, false)
s.inStream.write(connectP)

process.once('warning', e => t.fail('Memory leak detected'))

for (let i = 0; i < queueLimit + 5; i++) {
s.inStream.write(publishP)
}

broker.on('connectionError', function (conn, err) {
t.equal(err.message, 'Client queue limit reached', 'Queue error is thrown')
s.conn.destroy()
broker.close(t.end)
})
})

;[[0, null, false], [1, null, true], [1, new Error('connection banned'), false], [1, new Error('connection banned'), true]].forEach(function (ele) {
var plan = ele[0]
var err = ele[1]
Expand Down

0 comments on commit a486cfd

Please sign in to comment.