Skip to content

Commit

Permalink
Merge 32beb65 into e7b506e
Browse files Browse the repository at this point in the history
  • Loading branch information
gnought committed Jul 25, 2019
2 parents e7b506e + 32beb65 commit bd70114
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 25 deletions.
4 changes: 4 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ function Client (broker, conn) {
conn.on('error', this.emit.bind(this, 'error'))
this.parser.on('error', this.emit.bind(this, 'error'))

conn.on('end', this.close.bind(this))
this._eos = eos(this.conn, this.close.bind(this))

function dedupe (packet) {
Expand Down Expand Up @@ -273,6 +274,9 @@ Client.prototype.close = function (done) {
conn.removeAllListeners('error')
conn.on('error', nop)

that.connected = false
that.disconnected = true

if (that.broker.clients[that.id]) {
that.broker.unregisterClient(that)
}
Expand Down
12 changes: 10 additions & 2 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ function authenticate (arg, done) {
negate)

function negate (err, successful) {
if (!client.connected) {
// a hack, sometimes close happends before authenticate comes back
// we stop here for not to register it and deregister it in write()
return
}
var errCode
if (!err && successful) {
return done()
Expand Down Expand Up @@ -160,9 +165,12 @@ function Connack (arg) {
}

function doConnack (arg, done) {
var client = this.client
const connack = new Connack(arg)
write(this.client, connack, done)
this.client.broker.emit('connackSent', this.client)
write(client, connack, function () {
client.broker.emit('connackSent', client)
done()
})
}

function emptyQueue (arg, done) {
Expand Down
10 changes: 6 additions & 4 deletions lib/handlers/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ function enqueuePublish (packet, done) {

switch (packet.qos) {
case 2:
write(client, new PubRec(packet))
client.broker.persistence.incomingStorePacket(client, packet, done)
write(client, new PubRec(packet), function () {
client.broker.persistence.incomingStorePacket(client, packet, done)
})
break
case 1:
write(client, new PubAck(packet))
client.broker.publish(packet, client, done)
write(client, new PubAck(packet), function () {
client.broker.publish(packet, client, done)
})
break
case 0:
client.broker.publish(packet, client, done)
Expand Down
18 changes: 12 additions & 6 deletions lib/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
var mqtt = require('mqtt-packet')

function write (client, packet, done) {
var result = mqtt.writeToStream(packet, client.conn)

if (!result && !client.errored && done) {
client.conn.once('drain', done)
} else if (done) {
setImmediate(done)
if (client.conn.writable && client.connected) {
var result = mqtt.writeToStream(packet, client.conn)
if (!result && !client.errored && done) {
console.log('drain')
client.conn.once('drain', done)
return
}
if (done) {
setImmediate(done)
}
} else {
setImmediate(client._onError.bind(client, new Error('connection closed')))
}
}

Expand Down
23 changes: 15 additions & 8 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,20 +292,27 @@ test('disconnect', function (t) {
})

test('client closes', function (t) {
t.plan(3)
t.plan(7)

var broker = aedes()
var client = noError(connect(setup(broker, false), { clientId: 'abcde' }))
eos(client.conn, t.pass.bind('client closes'))

setImmediate(() => {
broker.clients['abcde'].close(function () {
t.equal(broker.clients['abcde'], undefined, 'client instance is removed')
var brokerClient
var client = noError(connect(setup(broker, false), { clientId: 'abcde' }, function () {
brokerClient = broker.clients['abcde']
t.equal(brokerClient.connected, true, 'client connected')
t.equal(brokerClient.disconnected, false)
eos(client.conn, t.pass.bind('client closes'))
setImmediate(() => {
brokerClient.close(function () {
t.equal(broker.clients['abcde'], undefined, 'client instance is removed')
})
t.equal(brokerClient.connected, false, 'client disconnected')
t.equal(brokerClient.disconnected, true)
broker.close(function (err) {
t.error(err, 'no error')
t.end()
})
})
})
}))
})

test('broker closes', function (t) {
Expand Down
155 changes: 155 additions & 0 deletions test/close_socket_by_other_party.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
'use strict'

var test = require('tape').test
var EE = require('events')
var helper = require('./helper')
var aedes = require('../')
var setup = helper.setup
var connect = helper.connect
var subscribe = helper.subscribe

function sleep (ms) {
return new Promise((resolve, reject) => setTimeout(resolve, ms))
}

test('client is closed before authenticate returns', function (t) {
t.plan(2)

var evt = new EE()
var broker = aedes({
authenticate: async (client, username, password, done) => {
evt.emit('AuthenticateBegin', client)
await sleep(2000) // simulate network
done(null, true)
evt.emit('AuthenticateEnd', client)
}
})
broker.on('client', function (client) {
t.fail('should no client registration')
})
broker.on('connackSent', function () {
t.fail('should no connack be sent')
})
broker.on('clientError', function (client, err) {
t.error(err)
})

connect(setup(broker, false))

evt.on('AuthenticateBegin', function (client) {
t.equal(broker.connectedClients, 0)
client.close()
})
evt.on('AuthenticateEnd', function (client) {
t.equal(broker.connectedClients, 0)
setImmediate(() => {
broker.close()
t.end()
})
})
})

test('client is closed before authorizePublish returns', function (t) {
t.plan(3)

var evt = new EE()
var broker = aedes({
authorizePublish: async (client, packet, done) => {
evt.emit('AuthorizePublishBegin', client)
await sleep(2000) // simulate latency writing to persistent store.
done()
evt.emit('AuthorizePublishEnd', client)
}
})
broker.on('clientError', function (client, err) {
t.equal(err.message, 'connection closed')
})

var s = connect(setup(broker, false))
s.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 10,
retain: false
})

evt.on('AuthorizePublishBegin', function (client) {
t.equal(broker.connectedClients, 1)
client.close()
})
evt.on('AuthorizePublishEnd', function (client) {
t.equal(broker.connectedClients, 0)
setImmediate(() => {
broker.close()
t.end()
})
})
})

test('close client when its socket is closed', function (t) {
t.plan(4)

var broker = aedes()
var subscriber = connect(setup(broker, false))

subscribe(t, subscriber, 'hello', 1, function () {
subscriber.inStream.end()
subscriber.conn.on('close', function () {
t.equal(broker.connectedClients, 0, 'no connected client')
broker.close()
t.end()
})
})
})

test('multiple clients subscribe same topic, and all clients still receive message except the closed one', function (t) {
t.plan(5)

var mqtt = require('mqtt')
var broker = aedes()
var server = require('net').createServer(broker.handle)
var port = 1883
server.listen(port)
broker.on('clientError', function (client, err) {
t.error(err)
})

var client1, client2
var _sameTopic = 'hello'

// client 1
client1 = mqtt.connect('mqtt://localhost', { clientId: 'client1', resubscribe: false, reconnectPeriod: -1 })
client1.on('message', () => {
t.fail('client1 receives message')
})

client1.subscribe(_sameTopic, { qos: 0, retain: false }, () => {
t.pass('client1 sub callback')
// stimulate closed socket by users
client1.stream.destroy()

// client 2
client2 = mqtt.connect('mqtt://localhost', { clientId: 'client2', resubscribe: false })
client2.on('message', () => {
t.pass('client2 receives message')
})
client2.subscribe(_sameTopic, { qos: 0, retain: false }, () => {
t.pass('client2 sub callback')

// pubClient
var pubClient = mqtt.connect('mqtt://localhost', { clientId: 'pubClient' })
pubClient.publish(_sameTopic, 'world', { qos: 0, retain: false }, () => {
t.pass('pubClient publish event')
pubClient.end()
})
})
})
setTimeout(() => {
t.equal(broker.connectedClients, 1)
client2.end()
broker.close()
server.close()
}, 2000)
})
2 changes: 1 addition & 1 deletion test/meta.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ test('connect and connackSent event', function (t) {

s.broker.on('connackSent', function (client) {
t.equal(client.id, clientId, 'connackSent event and clientId matches')
t.end()
})

s.inStream.write({
Expand All @@ -232,6 +233,5 @@ test('connect and connackSent event', function (t) {
payload: null,
sessionPresent: false
}, 'successful connack')
t.end()
})
})
2 changes: 1 addition & 1 deletion test/retain.js
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ test('not clean and retain messages with QoS 1', function (t) {
t.equal(packet.cmd, 'puback')

broker.on('clientError', function (client, err) {
t.fail('no error')
t.equal(err.message, 'connection closed')
})

subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) {
Expand Down
9 changes: 6 additions & 3 deletions test/will.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,13 @@ test('delete the will in the persistence after publish', function (t) {
test('delivers a will with authorization', function (t) {
let authorized = false
var opts = {}
var broker = aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(null) } })
// willConnect populates opts with a will
var s = willConnect(setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(null) } })), opts)
var s = willConnect(setup(broker), opts)

s.broker.on('clientDisconnect', function () {
t.end()
s.broker.on('clientDisconnect', function (client) {
t.equal(client.connected, false)
t.equal(client.disconnected, true)
})

s.broker.mq.on('mywill', function (packet, cb) {
Expand All @@ -189,6 +191,7 @@ test('delivers a will with authorization', function (t) {
process.nextTick(function () {
s.conn.destroy()
})
broker.on('closed', t.end.bind(t))
})

test('delivers a will waits for authorization', function (t) {
Expand Down

0 comments on commit bd70114

Please sign in to comment.