Skip to content

Commit

Permalink
Merge pull request #421 from moscajs/coverage
Browse files Browse the repository at this point in the history
Coverage and fix #422
  • Loading branch information
robertsLando committed Feb 11, 2020
2 parents 6f6f9d8 + bcdfd5e commit f321ce9
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 3 deletions.
13 changes: 10 additions & 3 deletions aedes.js
Expand Up @@ -11,6 +11,7 @@ const Packet = require('aedes-packet')
const bulk = require('bulk-write-stream')
const reusify = require('reusify')
const Client = require('./lib/client')
const { pipeline } = require('stream')

module.exports = Aedes.Server = Aedes

Expand Down Expand Up @@ -91,9 +92,15 @@ function Aedes (opts) {
this._clearWillInterval = setInterval(function () {
Object.keys(that.brokers).forEach(deleteOldBrokers)

that.persistence
.streamWill(that.brokers)
.pipe(bulk.obj(receiveWills))
pipeline(
that.persistence.streamWill(that.brokers),
bulk.obj(receiveWills),
function done (err) {
if (err) {
that.emit('error', err)
}
}
)
}, opts.heartbeatInterval * 4)

function receiveWills (chunks, done) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -68,6 +68,7 @@
],
"license": "MIT",
"devDependencies": {
"@sinonjs/fake-timers": "^6.0.0",
"@types/node": "^12.12.26",
"@typescript-eslint/eslint-plugin": "^2.19.0",
"@typescript-eslint/parser": "^2.19.0",
Expand Down
50 changes: 50 additions & 0 deletions test/client-pub-sub.js
Expand Up @@ -37,6 +37,56 @@ test('publish direct to a single client QoS 0', function (t) {
})
})

test('publish direct to a single client throws error', function (t) {
t.plan(1)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

broker.persistence.outgoingEnqueue = function (sub, packet, done) {
done(new Error('Throws error'))
}

broker.on('client', function (client) {
client.publish({
topic: 'hello',
payload: Buffer.from('world'),
qos: 1,
retain: false
}, function (err) {
t.pass('Throws error', err.message, 'throws error')
})
})

connect(setup(broker), { clean: false })
})

test('publish direct to a single client throws error 2', function (t) {
t.plan(1)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

broker.persistence.outgoingUpdate = function (client, packet, done) {
done(new Error('Throws error'), client, packet)
}

broker.on('client', function (client) {
client.publish({
topic: 'hello',
payload: Buffer.from('world'),
qos: 1,
retain: false
}, () => {})

client.once('error', function (err) {
t.pass('Throws error', err.message, 'throws error')
})
})

connect(setup(broker), { clean: false })
})

test('publish direct to a single client QoS 1', function (t) {
t.plan(2)

Expand Down
23 changes: 23 additions & 0 deletions test/qos1.js
Expand Up @@ -33,6 +33,29 @@ test('publish QoS 1', function (t) {
})
})

test('publish QoS 1 throws error', function (t) {
t.plan(1)

const s = connect(setup())
t.tearDown(s.broker.close.bind(s.broker))

s.broker.persistence.subscriptionsByTopic = function (packet, done) {
return done(new Error('Throws error'))
}

s.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 42
})

s.broker.on('error', function (err) {
t.equal('Throws error', err.message, 'Throws error')
})
})

test('publish QoS 1 and check offline queue', function (t) {
t.plan(13)

Expand Down
144 changes: 144 additions & 0 deletions test/will.js
Expand Up @@ -4,6 +4,7 @@ const { test } = require('tap')
const memory = require('aedes-persistence')
const { setup, connect, noError } = require('./helper')
const aedes = require('../')
const Faketimers = require('@sinonjs/fake-timers')

function willConnect (s, opts, connected) {
opts = opts || {}
Expand Down Expand Up @@ -113,6 +114,30 @@ test('delivers old will in case of a crash', function (t) {
})
})

test('delete old broker', function (t) {
t.plan(1)

var clock = Faketimers.install()

var heartbeatInterval = 100
const broker = aedes({
heartbeatInterval: heartbeatInterval
})
t.tearDown(broker.close.bind(broker))

var brokerId = 'dummyBroker'

broker.brokers[brokerId] = Date.now() - heartbeatInterval * 3.5

setTimeout(() => {
t.equal(broker.brokers[brokerId], undefined, 'Broker deleted')
}, heartbeatInterval * 4)

clock.tick(heartbeatInterval * 4)

clock.uninstall()
})

test('store the will in the persistence', function (t) {
t.plan(5)

Expand Down Expand Up @@ -374,3 +399,122 @@ test('does not store multiple will with same clientid', function (t) {
})
})
})

test('don\'t delivers a will if broker alive', function (t) {
const persistence = memory()
const will = {
topic: 'mywill',
payload: Buffer.from('last will'),
qos: 0,
retain: false
}

var oldBroker = 'broker1'

persistence.broker = {
id: oldBroker
}

persistence.putWill({
id: 'myClientId42'
}, will, function (err) {
t.error(err, 'no error')

const opts = {
persistence: persistence,
heartbeatInterval: 10
}

var count = 0

const broker = aedes(opts)
t.tearDown(broker.close.bind(broker))

broker.mq.on('mywill', function (packet, cb) {
t.fail('Will received')
cb()
})

broker.mq.on('$SYS/+/heartbeat', function () {
// update old broker heartbeat to simulate it is alive
broker.brokers[oldBroker] = Date.now()
t.pass('Heartbeat received')

if (++count === 5) t.end()
})
})
})

test('handle will publish error', function (t) {
t.plan(2)
const persistence = memory()
const will = {
topic: 'mywill',
payload: Buffer.from('last will'),
qos: 0,
retain: false
}

persistence.broker = {
id: 'broker1'
}

persistence.putWill({
id: 'myClientId42'
}, will, function (err) {
t.error(err, 'no error')

const opts = {
persistence: persistence,
heartbeatInterval: 10
}

persistence.delWill = function (client, cb) {
cb(new Error('Throws error'))
}

const broker = aedes(opts)
t.tearDown(broker.close.bind(broker))

broker.once('error', function (err) {
t.equal('Throws error', err.message, 'throws error')
})
})
})

test('handle will publish error 2', function (t) {
t.plan(2)
const persistence = memory()
const will = {
topic: 'mywill',
payload: Buffer.from('last will'),
qos: 0,
retain: true
}

persistence.broker = {
id: 'broker1'
}

persistence.putWill({
id: 'myClientId42'
}, will, function (err) {
t.error(err, 'no error')

const opts = {
persistence: persistence,
heartbeatInterval: 10
}

persistence.storeRetained = function (packet, cb) {
cb(new Error('Throws error'))
}

const broker = aedes(opts)
t.tearDown(broker.close.bind(broker))

broker.once('error', function (err) {
t.equal('Throws error', err.message, 'throws error')
})
})
})

0 comments on commit f321ce9

Please sign in to comment.