Skip to content

Commit

Permalink
Destroy socket when DISCONNECT sent (#401)
Browse files Browse the repository at this point in the history
* Destroy socket when DISCONNECT sent
end() will send final data and allow some i/o activity to finish before destroying socket.
MQTT-3.14.4-2 states "MUST NOT send any more Control Packets on that Network Connection". In some case aedes still deliverQoS data to subscribers via  mqemitter when subscriber is conn.end(), broker then raises "connection closed" error.

* Added missing noError declaration
  • Loading branch information
gnought committed Feb 5, 2020
1 parent 4b71718 commit 031e3ad
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 25 deletions.
4 changes: 2 additions & 2 deletions lib/handlers/index.js
Expand Up @@ -51,8 +51,8 @@ function handle (client, packet, done) {
case 'disconnect':
// [MQTT-3.14.4-3]
client._disconnected = true
// [MQTT-3.14.4-1]
client.conn.end()
// [MQTT-3.14.4-1] [MQTT-3.14.4-2]
client.conn.destroy()
return
default:
client.conn.destroy()
Expand Down
2 changes: 1 addition & 1 deletion test/basic.js
Expand Up @@ -253,7 +253,7 @@ test('disconnect', function (t) {
const s = noError(connect(setup()), t)
t.tearDown(s.broker.close.bind(s.broker))

s.outStream.on('finish', function () {
s.broker.on('clientDisconnect', function () {
t.pass('closed stream')
})

Expand Down
6 changes: 3 additions & 3 deletions test/meta.js
@@ -1,7 +1,7 @@
'use strict'

const { test } = require('tap')
const { setup, connect, subscribe } = require('./helper')
const { setup, connect, subscribe, noError } = require('./helper')
const aedes = require('../')

test('count connected clients', function (t) {
Expand Down Expand Up @@ -187,7 +187,7 @@ test('dont emit unsubscribe event on client close', function (t) {
const broker = aedes()
t.tearDown(broker.close.bind(broker))

const s = connect(setup(broker), { clientId: 'abcde' })
const s = noError(connect(setup(broker), { clientId: 'abcde' }), t)

broker.on('unsubscribe', function (unsubscriptions, client) {
t.error('unsubscribe should not be emitted')
Expand All @@ -213,7 +213,7 @@ test('emit clientDisconnect event', function (t) {
t.equal(client.id, 'abcde', 'client matches')
})

const s = connect(setup(broker), { clientId: 'abcde' })
const s = noError(connect(setup(broker), { clientId: 'abcde' }), t)

s.inStream.end({
cmd: 'disconnect'
Expand Down
20 changes: 6 additions & 14 deletions test/retain.js
Expand Up @@ -492,12 +492,12 @@ test('deliver QoS 0 retained message with QoS 1 subscription', function (t) {
})

test('disconnect and retain messages with QoS 1 [clean=false]', function (t) {
t.plan(8)
t.plan(7)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })
var subscriber = noError(connect(setup(broker), { clean: false, clientId: 'abcde' }), t)
const expected = {
cmd: 'publish',
topic: 'hello',
Expand All @@ -517,7 +517,7 @@ test('disconnect and retain messages with QoS 1 [clean=false]', function (t) {
console.log('original', packet)
})

const publisher = connect(setup(broker))
const publisher = noError(connect(setup(broker)), t)

publisher.inStream.write({
cmd: 'publish',
Expand All @@ -531,10 +531,6 @@ test('disconnect and retain messages with QoS 1 [clean=false]', function (t) {
publisher.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')

broker.on('clientError', function (client, err) {
t.equal(err.message, 'connection closed')
})

subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) {
t.equal(connect.sessionPresent, true, 'session present is set to true')
})
Expand All @@ -556,12 +552,12 @@ test('disconnect and retain messages with QoS 1 [clean=false]', function (t) {
})

test('disconnect and two retain messages with QoS 1 [clean=false]', function (t) {
t.plan(17)
t.plan(15)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })
var subscriber = noError(connect(setup(broker), { clean: false, clientId: 'abcde' }), t)
const expected = {
cmd: 'publish',
topic: 'hello',
Expand All @@ -580,7 +576,7 @@ test('disconnect and two retain messages with QoS 1 [clean=false]', function (t)
console.log('original', packet)
})

const publisher = connect(setup(broker))
const publisher = noError(connect(setup(broker)), t)

publisher.inStream.write({
cmd: 'publish',
Expand All @@ -606,10 +602,6 @@ test('disconnect and two retain messages with QoS 1 [clean=false]', function (t)
publisher.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')

broker.on('clientError', function (client, err) {
t.equal(err.message, 'connection closed')
})

subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) {
t.equal(connect.sessionPresent, true, 'session present is set to true')
})
Expand Down
10 changes: 5 additions & 5 deletions test/will.js
Expand Up @@ -2,7 +2,7 @@

const { test } = require('tap')
const memory = require('aedes-persistence')
const { setup, connect } = require('./helper')
const { setup, connect, noError } = require('./helper')
const aedes = require('../')

function willConnect (s, opts, connected) {
Expand Down Expand Up @@ -332,11 +332,11 @@ test('does not deliver will when client sends a DISCONNECT', function (t) {
const broker = aedes()
t.tearDown(broker.close.bind(broker))

const s = willConnect(setup(broker), {}, function () {
const s = noError(willConnect(setup(broker), {}, function () {
s.inStream.end({
cmd: 'disconnect'
})
})
}), t)

s.broker.mq.on('mywill', function (packet, cb) {
t.fail(packet)
Expand All @@ -351,12 +351,12 @@ test('does not store multiple will with same clientid', function (t) {

const broker = aedes()

var s = willConnect(setup(broker), opts, function () {
var s = noError(willConnect(setup(broker), opts, function () {
// gracefully close client so no will is sent
s.inStream.end({
cmd: 'disconnect'
})
})
}), t)

broker.on('clientDisconnect', function (client) {
// reconnect same client with will
Expand Down

0 comments on commit 031e3ad

Please sign in to comment.