Skip to content

Commit

Permalink
Merge 2e3e67c into 1a87b45
Browse files Browse the repository at this point in the history
  • Loading branch information
gnought committed Aug 24, 2019
2 parents 1a87b45 + 2e3e67c commit 1e5b9e6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 32 deletions.
54 changes: 24 additions & 30 deletions aedes.js
Expand Up @@ -138,29 +138,26 @@ function Aedes (opts) {

util.inherits(Aedes, EE)

function storeRetained (_, done) {
var packet = this.packet
function storeRetained (packet, done) {
if (packet.retain) {
this.broker.persistence.storeRetained(packet, done)
} else {
done()
}
}

function emitPacket (_, done) {
this.packet.retain = false
this.broker.mq.emit(this.packet, done)
function emitPacket (packet, done) {
packet.retain = false
this.broker.mq.emit(packet, done)
}

function enqueueOffline (_, done) {
var packet = this.packet

function enqueueOffline (packet, done) {
var enqueuer = this.broker._enqueuers.get()

enqueuer.complete = done
enqueuer.status = this
enqueuer.packet = packet
enqueuer.topic = packet.topic

enqueuer.broker = this.broker
this.broker.persistence.subscriptionsByTopic(
packet.topic,
enqueuer.done
Expand All @@ -169,35 +166,36 @@ function enqueueOffline (_, done) {

function DoEnqueues () {
this.next = null
this.status = null
this.complete = null
this.packet = null
this.topic = null
this.broker = null

var that = this

this.done = function doneEnqueue (err, subs) {
var status = that.status
var broker = status.broker
var broker = that.broker

if (err) {
// is this really recoverable?
// let's just error the whole aedes
broker.emit('error', err)
} else {
var complete = that.complete
return
}

if (that.topic.indexOf('$SYS') === 0) {
subs = subs.filter(removeSharp)
}
if (that.topic.indexOf('$SYS') === 0) {
subs = subs.filter(removeSharp)
}

that.status = null
that.complete = null
that.topic = null
var packet = that.packet
var complete = that.complete

broker.persistence.outgoingEnqueueCombi(subs, status.packet, complete)
that.packet = null
that.complete = null
that.topic = null

broker._enqueuers.release(that)
}
broker.persistence.outgoingEnqueueCombi(subs, packet, complete)
broker._enqueuers.release(that)
}
}

Expand All @@ -210,6 +208,7 @@ function removeSharp (sub) {

function callPublished (_, done) {
this.broker.published(this.packet, this.client, done)
this.broker.emit('publish', this.packet, this.client)
}

var publishFuncsSimple = [
Expand All @@ -231,12 +230,7 @@ Aedes.prototype.publish = function (packet, client, done) {
var p = new Packet(packet, this)
var publishFuncs = p.qos > 0 ? publishFuncsQoS : publishFuncsSimple

this._series(new PublishState(this, client, p), publishFuncs, null, function (err) {
this.broker.emit('publish', packet, this.client)
if (done) {
done(err)
}
})
this._series(new PublishState(this, client, packet), publishFuncs, p, done)
}

Aedes.prototype.subscribe = function (topic, func, done) {
Expand Down
8 changes: 6 additions & 2 deletions test/meta.js
Expand Up @@ -55,7 +55,7 @@ test('call published method', function (t) {
})

test('call published method with client', function (t) {
t.plan(2)
t.plan(4)

var broker = aedes()

Expand All @@ -64,6 +64,8 @@ test('call published method with client', function (t) {
if (client) {
t.equal(packet.topic, 'hello', 'topic matches')
t.equal(packet.payload.toString(), 'world', 'payload matches')
t.equal(packet.qos, 1)
t.equal(packet.messageId, 42)
broker.close()
done()
}
Expand All @@ -74,7 +76,9 @@ test('call published method with client', function (t) {
s.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world')
payload: Buffer.from('world'),
qos: 1,
messageId: 42
})
})

Expand Down

0 comments on commit 1e5b9e6

Please sign in to comment.