Skip to content

Commit

Permalink
to trace
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Feb 18, 2020
1 parent b3f3683 commit 9f45900
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 39 deletions.
21 changes: 12 additions & 9 deletions bin/basic.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//
// demo how to register custom function inside the router
//
const MSG = require('../lib/messages')
const Router = require('../index')
const program = require('commander')
Expand All @@ -6,10 +9,10 @@ program
.option('-p, --port <port>', 'Server IP port', 9000)
.parse(process.argv)

var app = new Router()
app.setLogTrace(true)
const router = new Router()
router.setLogTrace(true)

app.on(MSG.REALM_CREATED, function (realm, realmName) {
router.on(MSG.REALM_CREATED, function (realm, realmName) {
console.log('new Relm:', realmName)

realm.on(MSG.ON_REGISTERED, function (registeration) {
Expand All @@ -20,13 +23,13 @@ app.on(MSG.REALM_CREATED, function (realm, realmName) {
})
})

app.getRealm('realm1', function (realm) {
var api = realm.wampApi()
api.register('test.foo', function (id, args, kwargs) {
console.log('called with ', args, kwargs)
api.resrpc(id, null /* no error */, [ 'bar', 'bar2' ], { 'key1': 'bar1', 'key2': 'bar2' })
router.getRealm('realm1', function (realm) {
const api = realm.wampApi()
api.register('test.foo', function (id, args, kwargs, opt) {
console.log('function "test.foo" called with ', args, kwargs, opt)
api.resrpc(id, null /* no error */, ['bar', 'bar2'], { key1: 'bar1', key2: 'bar2' })
})
})

console.log('Listening port:', program.port)
app.listenWAMP({ port: program.port })
router.listenWAMP({ port: program.port })
19 changes: 8 additions & 11 deletions bin/mqtt_gate.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//
// This is demonstration how to integrate HTTP server
// with two sockets listeners, WAMP & MQTT
//
const http = require('http')
const url = require('url')
const program = require('commander')
Expand All @@ -9,21 +13,13 @@ program
.option('-q, --mqtt <port>', 'MQTT Server IP port', 1883)
.parse(process.argv)

let router = new Router()
const router = new Router()
router.setLogTrace(true)

router.on(MSG.REALM_CREATED, function (realm, realmName) {
console.log('new Relm:', realmName)
})

router.getRealm('realm1', function (realm) {
var api = realm.wampApi()
api.register('test.foo', function (id, args, kwargs) {
console.log('called with ', args, kwargs)
api.resrpc(id, null /* no error */, ['bar', 'bar2'], { 'key1': 'bar1', 'key2': 'bar2' })
})
})

router.listenMQTT({ port: program.mqtt })
console.log('MQTT Listening port', program.mqtt)

Expand All @@ -33,14 +29,15 @@ console.log(`WAMP Web Socket ws://localhost:${program.http}/wamp`)
const wssMQTT = router.listenWsMQTT({ noServer: true })
console.log(`MQTT Web Socket ws://localhost:${program.http}/mqtt`)

let httpServer = http.createServer(function (req, res) {
const httpServer = http.createServer(function (req, res) {
res.writeHead(200, { 'Content-Type': 'text/html' })
console.log(req.headers)
res.end('hello!')
res.end('Hello from Fox-WAMP server!')
})

httpServer.listen(program.http, () => console.log(`HTTP Server Listening on ${program.http}`))

// share same socket between two listeners
// https://github.com/websockets/ws/pull/885
httpServer.on('upgrade', (request, socket, head) => {
const pathname = url.parse(request.url).pathname
Expand Down
20 changes: 6 additions & 14 deletions lib/mqtt/gate.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ parse Packet {
length: 17,
topic: 'test',
payload: <Buffer 74 68 65 20 6d 65 73 73 61 67 65> }
parse Packet {
cmd: 'disconnect',
retain: false,
qos: 0,
dup: false,
length: 0,
topic: null,
payload: null }
===========================================================
Expand Down Expand Up @@ -79,11 +71,11 @@ let cmdAck = {}

class MqttContext extends Context {
sendEvent (cmd) {
let session = this.getSession()
const session = this.getSession()
if (session.getLastPublishedId() == cmd.qid) {
return // do not duplicate MQTT events
return // do not duplicate MQTT events
}
let customId = this.getSession().genSessionMsgId()
const customId = this.getSession().genSessionMsgId()
session.waitForId(cmd.qid, customId)
session.setLastPublishedId(cmd.qid)

Expand All @@ -103,7 +95,7 @@ class MqttContext extends Context {
this.mqttSend({
topic: restoreMqttUri(cmd.uri),
payload: payload,
qos: (cmd.opt.history ? 1 : 0),
qos: (cmd.opt.trace ? 1 : 0),
messageId: customId,
cmd: 'publish'
})
Expand Down Expand Up @@ -225,7 +217,7 @@ class MqttGate extends BaseGate {
opt.retain = true
}
if (message.qos >= 1) {
opt.history = true
opt.trace = true
}

let uri = mqttParse(message.topic)
Expand Down Expand Up @@ -343,7 +335,7 @@ handlers.subscribe = function (ctx, session, message) {
if (this.checkAuthorize(ctx, 'subscribe', uri, index)) {
let opt = {}
if (qos > 0) {
opt.keepHistoryFlag = true
opt.keepTraceFlag = true
}
if (afterId) {
opt.after = afterId
Expand Down
11 changes: 6 additions & 5 deletions lib/realm.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ class ActorTrace extends Actor {
sendEvent (cmd) {
cmd.id = this.msg.id
cmd.traceId = this.msg.qid
if (!this.msg.opt.keepHistoryFlag) {
delete cmd.opt.history
if (!this.msg.opt.keepTraceFlag) {
delete cmd.opt.trace
}
this.ctx.sendEvent(cmd)
}
Expand Down Expand Up @@ -387,11 +387,12 @@ class BaseEngine {
opt: { retained: true }
})
}).then(() => {
actor.flushDelayStack()
actor.traceStarted = true
actor.flushDelayStack()
})
} else {
actor.traceStarted = true
actor.flushDelayStack()
}

if (actor.getOpt().retained) {
Expand Down Expand Up @@ -702,7 +703,7 @@ class BaseRealm extends EventEmitter {
}

getKey (uri, cbRow) {
this.engine.getKey(uri, cbRow)
return this.engine.getKey(uri, cbRow)
}
}

Expand Down Expand Up @@ -765,7 +766,7 @@ class MemEngine extends BaseEngine {
doPush (actor) {
actor.setEventId(++this._messageGen)
super.doPush(actor)
if (actor.getOpt().history) {
if (actor.getOpt().trace) {
this._messages.push(actor.getEvent())
if (this._messages.length > 1100) {
this._messages = this._messages.splice(100)
Expand Down
97 changes: 97 additions & 0 deletions test/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -356,4 +356,101 @@ describe('mqtt-realm', function () {
done()
})
})

/* it('connect-clientid', function () {
realm.cleanupSession(cli)
router.getRealm = (realmName, cb) => {cb(realm)}
let i = 0
sender.send = chai.spy((msg, callback) => {
sender.send = nextPublish
console.log('TEST-MSG', ++i, msg)
cli.handle(ctx, {
cmd: 'subscribe',
retain: false,
qos: 1,
dup: false,
length: 17,
topic: null,
payload: null,
subscriptions: [{ topic: 'topic1', qos: 1 }],
messageId: 1
})
})
const nextPublish = chai.spy((msg) => {
sender.send = nextConnect
console.log('TEST-MSG', ++i, msg)
api.publish('topic1', [], { data: 1 }, { trace: true })
expect(realm.engine._messages.length, 'trace message need to be saved').to.equal(1)
cli.handle(ctx, {
cmd: 'disconnect',
retain: false,
qos: 0,
dup: false,
length: 0,
topic: null,
payload: null
})
})
const nextConnect = chai.spy((msg) => {
sender.send = nextConnack2
console.log('TEST-MSG', ++i, msg)
// api.publish('topic1', [], { data: 2 }, { trace: true })
cli.handle(ctx, {
cmd: 'connect',
retain: false,
qos: 0,
dup: false,
length: 17,
topic: null,
payload: null,
clean: false,
username: 'user@realm',
clientId: 'agent-state'
})
})
const nextConnack2 = chai.spy((msg) => {
sender.send = nextEventReceive
console.log('RTEST-MSG', ++i, msg)
cli.handle(ctx, {
cmd: 'subscribe',
retain: false,
qos: 1,
dup: false,
length: 17,
topic: null,
payload: null,
subscriptions: [{ topic: 'topic1', qos: 1 }],
messageId: 1
})
})
const nextEventReceive = chai.spy((msg) => {
console.log('NTEST-MSG', ++i, msg)
})
// START HERE
cli.handle(ctx, {
cmd: 'connect',
retain: false,
qos: 0,
dup: false,
length: 17,
topic: null,
payload: null,
clean: false,
username: 'user@realm',
clientId: 'agent-state'
})
}) */
})

0 comments on commit 9f45900

Please sign in to comment.