Skip to content

Commit

Permalink
Merge 29e5ddf into 81e75ac
Browse files Browse the repository at this point in the history
  • Loading branch information
gnought committed Sep 2, 2019
2 parents 81e75ac + 29e5ddf commit 1797b8f
Showing 1 changed file with 56 additions and 64 deletions.
120 changes: 56 additions & 64 deletions lib/handlers/subscribe.js
Expand Up @@ -11,12 +11,28 @@ var topicActions = fastfall([
subTopic
])

function SubAck (packet, granted) {
this.cmd = 'suback'
this.messageId = packet.messageId
this.granted = granted
}

function Subscription (qos, func) {
this.qos = qos
this.func = func
}

function SubscribeState (client, packet, finish, granted) {
this.client = client
this.packet = packet
this.finish = finish
this.subState = []
}

function SubState (client, packet, granted) {
this.client = client
this.packet = packet
this.granted = granted
this.subsIndex = 0
}

// if same subscribed topic in subs array, we pick up the last one
Expand All @@ -34,28 +50,26 @@ function _dedupe (subs) {
}

function handleSubscribe (client, packet, done) {
var broker = client.broker
var subs = packet.subscriptions
var granted = []

broker._series(
new SubscribeState(client, packet, done, granted),
client.broker._parallel(
new SubscribeState(client, packet, done),
doSubscribe,
subs.length === 1 ? subs : _dedupe(subs),
completeSubscribe)
}

function doSubscribe (sub, done) {
// TODO this function should not be needed
topicActions.call(this, sub, done)
var s = new SubState(this.client, this.packet, sub.qos)
this.subState.push(s)
topicActions.call(s, sub, done)
}

function authorize (sub, done) {
var client = this.client
var err = validateTopic(sub.topic, 'SUBSCRIBE')
if (err) {
return done(err)
}
var client = this.client
client.broker.authorizeSubscribe(client, sub, done)
}

Expand All @@ -70,72 +84,52 @@ function blockDollarSignTopics (func) {
}
}

function Subscription (qos, func) {
this.qos = qos
this.func = func
}

function storeSubscriptions (sub, done) {
var packet = this.packet
var client = this.client
var broker = client.broker
var perst = broker.persistence

if (!sub) {
this.granted.push(128)
if (!sub || typeof sub !== 'object') {
return done(null, null)
}

if (packet.subscriptions.length > 0 && ++this.subsIndex < packet.subscriptions.length) {
// TODO change aedes subscribe handle, but this fix bugs for now.
return done(null, sub)
}
var packet = this.packet

if (packet.restore) {
return done(null, sub)
}

var client = this.client

if (client.clean) {
return done(null, sub)
}

perst.addSubscriptions(client, packet.subscriptions, function addSub (err) {
client.broker.persistence.addSubscriptions(client, packet.subscriptions, function addSub (err) {
done(err, sub)
})
}

function subTopic (sub, done) {
var client = this.client
var broker = client.broker
var func = nop

if (!sub) {
if (!sub || typeof sub !== 'object') {
this.granted = 128
return done()
}

switch (sub.qos) {
case 2:
case 1:
func = client.deliverQoS
break
default:
func = client.deliver0
break
}
var client = this.client
var broker = client.broker
var topic = sub.topic
var qos = sub.qos
var func = qos > 0 ? client.deliverQoS : client.deliver0

// [MQTT-4.7.2-1]
if (isStartsWithWildcard(sub.topic)) {
if (isStartsWithWildcard(topic)) {
func = blockDollarSignTopics(func)
}

this.granted.push(sub.qos)

if (!client.subscriptions[sub.topic]) {
client.subscriptions[sub.topic] = new Subscription(sub.qos, func)
broker.subscribe(sub.topic, func, done)
} else if (client.subscriptions[sub.topic].qos !== sub.qos) {
broker.unsubscribe(sub.topic, client.subscriptions[sub.topic].func)
client.subscriptions[sub.topic] = new Subscription(sub.qos, func)
broker.subscribe(sub.topic, func, done)
if (!client.subscriptions[topic]) {
client.subscriptions[topic] = new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else if (client.subscriptions[topic].qos !== qos) {
broker.unsubscribe(topic, client.subscriptions[topic].func)
client.subscriptions[topic] = new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else {
done()
}
Expand All @@ -149,35 +143,39 @@ function isStartsWithWildcard (topic) {
}

function completeSubscribe (err) {
var packet = this.packet
var client = this.client
var broker = client.broker
var granted = this.granted
var done = this.finish

if (err) {
return done(err)
}

var packet = this.packet
var client = this.client
var broker = client.broker
var granted = this.subState.map(obj => obj.granted)
this.subState = []

var subs = packet.subscriptions

if (!packet.restore) {
broker.emit('subscribe', packet.subscriptions, client)
broker.emit('subscribe', subs, client)
}

if (packet.messageId) {
write(client, new SubAck(packet, granted), nop)
}

// negated subscription check
if (this.granted && this.granted[0] === 128) {
if (granted[0] === 128) {
return done()
} else {
done()
}

var persistence = broker.persistence
var topics = []
for (var i = 0; i < packet.subscriptions.length; i++) {
topics.push(packet.subscriptions[i].topic)
for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
}
var stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
Expand All @@ -194,12 +192,6 @@ function completeSubscribe (err) {
}))
}

function SubAck (packet, granted) {
this.cmd = 'suback'
this.messageId = packet.messageId
this.granted = granted
}

function nop () {}

module.exports = handleSubscribe

0 comments on commit 1797b8f

Please sign in to comment.