Skip to content

Commit

Permalink
be ready to mqtt qos1
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Feb 4, 2020
1 parent 0cc0f10 commit 47b1a7a
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 15 deletions.
50 changes: 41 additions & 9 deletions lib/mqtt/gate.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class MqttContext extends Context {
if (session.getLastPublishedId() == cmd.qid) {
return // do not duplicate MQTT events
}
let customId = this.getSession().genSessionMsgId()
session.waitForId(cmd.qid, customId)
session.setLastPublishedId(cmd.qid)

let payload = ''
Expand All @@ -102,7 +104,7 @@ class MqttContext extends Context {
topic: restoreMqttUri(cmd.uri),
payload: payload,
qos: (cmd.opt.history ? 1 : 0),
messageId: this.getSession().genSessionMsgId(),
messageId: customId,
cmd: 'publish'
})
}
Expand Down Expand Up @@ -159,26 +161,40 @@ class MqttGate extends BaseGate {
joinRealm(ctx, session, message) {
this.getRouter().getRealm(session.realmName, (realm) => {
realm.joinSession(session)
ctx.mqttConnack(CONNACK_RETURN_ACCEPTED, false)
if (message.will) {
session.setDisconnectPublish(ctx, this.makePublishCmd(ctx, message.will))
}
if (session.secureDetails.clientId) {
let found = false
realm.getKey((key, data, messageId) => {
session.setLastPublishedId(data)
found = true
}, () => {
ctx.mqttConnack(CONNACK_RETURN_ACCEPTED, found)
}, ['$FOX', 'clientOffset', session.secureDetails.clientId])
} else {
ctx.mqttConnack(CONNACK_RETURN_ACCEPTED, false)
}
})
}

hello (ctx, session, message) {
connect (ctx, session, message) {
let result
if (typeof message.username === 'string') {
result = message.username.match(/(.*)@([a-z0-9]*)$/i)
}
session.secureDetails = {}
if (result) {
session.realmName = result[2]
session.secureDetails = {username: result[1]}
// session.setSessionName(message.clientId, message.clean)
session.secureDetails.username = result[1]
} else if (message.username) {
ctx.mqttConnack(CONNACK_RETURN_BAD_USER_NAME_OR_PASSWORD, false)
return
}
if (message.clientId) {
session.secureDetails.clientId = message.clientId
session.secureDetails.sessionClean = message.clean
}

if (this.isAuthRequired(session)) {
if (!session.realmName) {
Expand Down Expand Up @@ -258,9 +274,9 @@ class MqttGate extends BaseGate {

handlers.connect = function (ctx, session, message) {
if (session.realm === null) {
this.hello(ctx, session, message)
this.connect(ctx, session, message)
} else {
ctx.mqttClose(1002, 'protocol violation')
ctx.mqttConnack(CONNACK_RETURN_IDENTIFIER_REJECTED, false)
}
return false
}
Expand Down Expand Up @@ -288,8 +304,20 @@ cmdAck.publish = function (cmd) {

handlers.puback = function (ctx, session, message) {
this.checkRealm(session)
// session.decode(message.messageId)
// session.realm.doConfirm(ctx, cmd)
let qid = session.fetchWaitId(message.messageId)
if (qid) {
session.realm.doConfirm(ctx, {
id: qid
})
if (session.secureDetails && session.secureDetails.clientId) {
session.realm.setKey(
['$FOX', 'clientOffset', session.secureDetails.clientId],
0, // session.sessionId,
qid,
qid
)
}
}
}

handlers.pingreq = function (ctx, session, message) {
Expand All @@ -304,6 +332,7 @@ handlers.subscribe = function (ctx, session, message) {
count: message.subscriptions.length
}
ctx.setId(pkg)
const afterId = session.getLastPublishedId()
for (let index=0; index < message.subscriptions.length; index++) {
let qos = Math.min(message.subscriptions[index].qos, 1)
pkg.granted[index] = qos
Expand All @@ -313,6 +342,9 @@ handlers.subscribe = function (ctx, session, message) {
if (qos > 0) {
opt.keepHistoryFlag = true
}
if (afterId) {
opt.after = afterId
}
session.realm.doTrace(ctx, {
mtype: 'subscribe',
id: index,
Expand Down
20 changes: 17 additions & 3 deletions lib/realm.js
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ class BaseEngine {
actor.acknowledged() // WAMP require to have TRACE ACK before first event

if (actor.getOpt().after) {
this.getHistory(actor, (cmd) => {
this.getHistoryAfter(actor.getOpt().after, actor.getUri(), (cmd) => {
actor.sendEvent({
data: cmd.data,
uri: cmd.uri,
Expand Down Expand Up @@ -449,8 +449,9 @@ class BaseEngine {
}

removeSession (sessionId) {}
setKey () {}
getKey (cbRow, cbDone, uri) {}
setKey (uri, sessionId, data, messageId) {}
getKey (cbRow, cbDone, uri) {cbDone()}
getHistoryAfter (after, uri, cbRow) {return new Promise(() => {})}
}

class BaseRealm extends EventEmitter {
Expand Down Expand Up @@ -644,12 +645,21 @@ class BaseRealm extends EventEmitter {
}
return this._foxApi
}

setKey (uri, sessionId, data, messageId) {
this.engine.setKey (uri, sessionId, data, messageId)
}

getKey (cbRow, cbDone, uri) {
this.engine.getKey(cbRow, cbDone, uri)
}
}

class MemEngine extends BaseEngine {
constructor () {
super()
this._keyDb = new Map()
this._messages = []
}

setKey (uri, sessionId, data, messageId) {
Expand All @@ -674,6 +684,10 @@ class MemEngine extends BaseEngine {
this._keyDb.delete(restoreUri(key))
}

getHistoryAfter (after, uri, cbRow) {
return new Promise(() => {})
}

removeSession (sessionId) {
let toRemove = []
for (let key in this._keyDb) {
Expand Down
4 changes: 2 additions & 2 deletions lib/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ class Router extends EventEmitter {
this._sessions = new Map()

this.on(SESSION_TX, function (session, data) {
this.trace('[' + session.sessionId + '] TX >', data)
this.trace('[' + session.sessionId + '] >', data)
})

this.on(SESSION_RX, function (session, msg) {
this.trace('[' + session.sessionId + '] RX <', msg)
this.trace('[' + session.sessionId + '] <', msg)
})

this.on('session.debug', function (session, msg) {
Expand Down
11 changes: 11 additions & 0 deletions lib/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ function Session (gate, sessionId) {
let willPublishCmd = undefined
let sessionMsgId = 0
let lastPublishedId = ''
let publishMap = new Map()
let userDetails = {}

/**
Expand Down Expand Up @@ -131,6 +132,16 @@ function Session (gate, sessionId) {
return ++sessionMsgId
}

this.waitForId = function (id, customId) {
publishMap.set(customId, id)
}

this.fetchWaitId = function (customId) {
let result = publishMap.get(customId)
publishMap.delete(customId)
return result
}

this.setLastPublishedId = function (id) {
lastPublishedId = id
}
Expand Down
2 changes: 1 addition & 1 deletion lib/tools.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module.exports.randomId = function () {
return Math.floor(Math.random() * 9007199254740992)
return require('crypto').randomBytes(6).readUIntBE(0, 6)
}

0 comments on commit 47b1a7a

Please sign in to comment.