[feat!]: change to named exports to enable monkey patching #888

Open
getlarge opened this issue Sep 1, 2023 · 11 comments

Comments

@getlarge
Member

getlarge commented Sep 1, 2023

Is your feature request related to a problem? Please describe.

I am currently trying to build an OpenTelemetry instrumentation library for Aedes, so that traces can be completed with what happens inside Aedes.
OpenTelemetry provides a library containing a base class that provides many helpers to patch the module to instrument.
It relies heavily on shimmer, require-in-the-middle and import-in-the-middle to achieve this.
My plan is to start by patching:

  • some handlers (handleConnect, handlePublish, handleSubscribe, handleUnsubscribe)
  • some Aedes methods (handle, preConnect, subscribe)

This should be enough to write a first working PoC that creates and closes spans at publish, subscribe and message delivery for QoS 0.

After a first attempt, I noticed a few obstacles:

  1. The handlers are (kind of) default exports, and it is not possible (unless proven otherwise) to patch a default export (I guess it is not a mutable object?).
  2. Methods assigned during the class (or function) instantiation (for example, the aedes instance method handle) cannot be patched, as they are not explicitly assigned to Aedes.prototype.
  3. The span context will need to transit from the published packet to the delivered packet, which means adding “hidden” properties to the packet that are replicated when a new packet instance is created.
    aedes-packet would therefore need to be patched as well, and updated to use a named export.
    mqtt-packet would also need to be patched at runtime to store and retrieve the serialized context, following this proposal.
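
Obstacles (1) and (2) can be reproduced without Aedes. The sketch below is only an illustration: `wrap` is a minimal stand-in for what shimmer does, and `Broker`, `namedExports` and `traced` are hypothetical names, not Aedes code.

```javascript
'use strict'

// Minimal stand-in for shimmer.wrap: replace obj[name] with wrapper(original).
function wrap (obj, name, wrapper) {
  const original = obj[name]
  obj[name] = wrapper(original)
}

// Records the label of every patched function that runs.
const calls = []
const traced = (label) => (original) => function (...args) {
  calls.push(label)
  return original.apply(this, args)
}

// (1) Named export: the exports object has a property that can be replaced.
// With `module.exports = handleConnect`, the function itself is the export,
// so there is no property to swap on the exports object.
const namedExports = { handleConnect () { return 'connected' } }
wrap(namedExports, 'handleConnect', traced('handleConnect'))
namedExports.handleConnect()

// (2) `handle` is created per instance in the constructor, so it never
// exists on the prototype; `subscribe` does and can be patched there.
function Broker () {
  this.handle = function handle () { return 'handled' }
}
Broker.prototype.subscribe = function subscribe () { return 'subscribed' }

wrap(Broker.prototype, 'subscribe', traced('subscribe'))
const broker = new Broker()
broker.subscribe() // goes through the patched prototype method
broker.handle() // own property, untouched by the prototype patch

console.log(calls) // [ 'handleConnect', 'subscribe' ]
console.log('handle' in Broker.prototype) // false
```

A consumer that destructures a named export at load time keeps the reference it captured, so the wrap has to happen before that consumer is loaded.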

Describe the solution you'd like

  • I know this is a major breaking change, but converting all the handlers to named exports would solve (1).
  • Assigning handle and preConnect to Aedes.prototype would solve (2), at the cost of assigning extra properties to the Aedes instance.
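
Moving `handle` to the prototype changes how it has to be passed around, because `this` is then resolved at call time. A minimal sketch of that effect, using a hypothetical `Broker` rather than the real Aedes constructor:

```javascript
'use strict'

function Broker (opts) {
  this.concurrency = opts.concurrency
}

// Prototype method: the receiver is whatever the call site provides.
Broker.prototype.handle = function handle (conn) {
  return { broker: this, conn }
}

const broker = new Broker({ concurrency: 100 })

// Passing the bare method (as in net.createServer(broker.handle)) loses the receiver.
const detached = broker.handle
// Either of these keeps it.
const viaBind = broker.handle.bind(broker)
const viaArrow = (conn) => broker.handle(conn)

console.log(detached('socket').broker === broker) // false
console.log(viaBind('socket').broker === broker) // true
console.log(viaArrow('socket').broker === broker) // true
```

Existing callers that pass `broker.handle` directly to a server would need one of the two bound forms, which is why this is a breaking change.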

Describe alternatives you've considered

  • Using require-in-the-middle to patch before patching… I guess it sounds as bad as it is.
  • Using patch-package works well but is not sustainable.


@getlarge
Member Author

getlarge commented Sep 1, 2023

This could be an opportunity to revise how modules are exported and solve #878 at the same time.

@getlarge
Member Author

I will work on a PR soon. For reference, I am copying the patch that I applied for aedes-otel-instrumentation.

diff --git a/node_modules/aedes/aedes.js b/node_modules/aedes/aedes.js
index c02d289..668b162 100644
--- a/node_modules/aedes/aedes.js
+++ b/node_modules/aedes/aedes.js
@@ -7,7 +7,7 @@ const series = require('fastseries')
 const { v4: uuidv4 } = require('uuid')
 const reusify = require('reusify')
 const { pipeline } = require('stream')
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const memory = require('aedes-persistence')
 const mqemitter = require('mqemitter')
 const Client = require('./lib/client')
@@ -45,6 +45,7 @@ function Aedes (opts) {
   // +1 when construct a new aedes-packet
   // internal track for last brokerCounter
   this.counter = 0
+  this.concurrency = opts.concurrency
   this.queueLimit = opts.queueLimit
   this.connectTimeout = opts.connectTimeout
   this.maxClientsIdLength = opts.maxClientsIdLength
@@ -52,24 +53,19 @@ function Aedes (opts) {
     concurrency: opts.concurrency,
     matchEmptyLevels: true // [MQTT-4.7.1-3]
   })
-  this.handle = function handle (conn, req) {
-    conn.setMaxListeners(opts.concurrency * 2)
-    // create a new Client instance for a new connection
-    // return, just to please standard
-    return new Client(that, conn, req)
-  }
+
   this.persistence = opts.persistence || memory()
   this.persistence.broker = this
   this._parallel = parallel()
   this._series = series()
   this._enqueuers = reusify(DoEnqueues)
 
-  this.preConnect = opts.preConnect
-  this.authenticate = opts.authenticate
-  this.authorizePublish = opts.authorizePublish
-  this.authorizeSubscribe = opts.authorizeSubscribe
-  this.authorizeForward = opts.authorizeForward
-  this.published = opts.published
+  this._preConnect = opts.preConnect
+  this._authenticate = opts.authenticate
+  this._authorizePublish = opts.authorizePublish
+  this._authorizeSubscribe = opts.authorizeSubscribe
+  this._authorizeForward = opts.authorizeForward
+  this._published = opts.published
 
   this.decodeProtocol = opts.decodeProtocol
   this.trustProxy = opts.trustProxy
@@ -250,6 +246,15 @@ function removeSharp (sub) {
   return code !== 43 && code !== 35
 }
 
+// assigning to prototype is a breaking change as it is required to bind the Aedes instance to the function
+// @example net.createServer(broker.handle.bind(broker)) or net.createServer((socket) => broker.handle(socket))
+Aedes.prototype.handle = function handle (conn, req) {
+  conn.setMaxListeners(this.concurrency * 2)
+  // create a new Client instance for a new connection
+  // return, just to please standard
+  return new Client(this, conn, req)
+}
+
 function callPublished (_, done) {
   this.broker.published(this.packet, this.client, done)
   this.broker.emit('publish', this.packet, this.client)
@@ -338,6 +343,30 @@ Aedes.prototype.close = function (cb = noop) {
 
 Aedes.prototype.version = require('./package.json').version
 
+Aedes.prototype.preConnect = function (client, packet, callback) {
+  this._preConnect(client, packet, callback)
+}
+
+Aedes.prototype.authenticate = function (client, username, password, callback) {
+  this._authenticate(client, username, password, callback)
+}
+
+Aedes.prototype.authorizePublish = function (client, packet, callback) {
+  this._authorizePublish(client, packet, callback)
+}
+
+Aedes.prototype.authorizeSubscribe = function (client, sub, callback) {
+  this._authorizeSubscribe(client, sub, callback)
+}
+
+Aedes.prototype.authorizeForward = function (client, packet) {
+  return this._authorizeForward(client, packet)
+}
+
+Aedes.prototype.published = function (packet, client, callback) {
+  this._published(packet, client, callback)
+}
+
 function defaultPreConnect (client, packet, callback) {
   callback(null, true)
 }
diff --git a/node_modules/aedes/lib/client.js b/node_modules/aedes/lib/client.js
index 414d8e5..e525712 100644
--- a/node_modules/aedes/lib/client.js
+++ b/node_modules/aedes/lib/client.js
@@ -4,12 +4,12 @@ const mqtt = require('mqtt-packet')
 const EventEmitter = require('events')
 const util = require('util')
 const eos = require('end-of-stream')
-const Packet = require('aedes-packet')
-const write = require('./write')
+const { Packet } = require('aedes-packet')
+const { write } = require('./write')
 const QoSPacket = require('./qos-packet')
-const handleSubscribe = require('./handlers/subscribe')
-const handleUnsubscribe = require('./handlers/unsubscribe')
-const handle = require('./handlers')
+const { handleSubscribe } = require('./handlers/subscribe')
+const { handleUnsubscribe } = require('./handlers/unsubscribe')
+const { handle } = require('./handlers')
 const { pipeline } = require('stream')
 const { through } = require('./utils')
 
diff --git a/node_modules/aedes/lib/handlers/connect.js b/node_modules/aedes/lib/handlers/connect.js
index a4c32d0..bd2d8cb 100644
--- a/node_modules/aedes/lib/handlers/connect.js
+++ b/node_modules/aedes/lib/handlers/connect.js
@@ -2,10 +2,10 @@
 
 const retimer = require('retimer')
 const { pipeline } = require('stream')
-const write = require('../write')
+const { write } = require('../write')
 const QoSPacket = require('../qos-packet')
 const { through } = require('../utils')
-const handleSubscribe = require('./subscribe')
+const { handleSubscribe } = require('./subscribe')
 const uniqueId = require('hyperid')()
 
 function Connack (arg) {
@@ -264,4 +264,4 @@ function emptyQueueFilter (err, client, packet) {
   }
 }
 
-module.exports = handleConnect
+module.exports = { handleConnect }
diff --git a/node_modules/aedes/lib/handlers/index.js b/node_modules/aedes/lib/handlers/index.js
index a5dfaa8..b611293 100644
--- a/node_modules/aedes/lib/handlers/index.js
+++ b/node_modules/aedes/lib/handlers/index.js
@@ -1,13 +1,13 @@
 'use strict'
 
-const handleConnect = require('./connect')
-const handleSubscribe = require('./subscribe')
-const handleUnsubscribe = require('./unsubscribe')
-const handlePublish = require('./publish')
-const handlePuback = require('./puback')
-const handlePubrel = require('./pubrel')
-const handlePubrec = require('./pubrec')
-const handlePing = require('./ping')
+const { handleConnect } = require('./connect')
+const { handleSubscribe } = require('./subscribe')
+const { handleUnsubscribe } = require('./unsubscribe')
+const { handlePublish } = require('./publish')
+const { handlePuback } = require('./puback')
+const { handlePubrel } = require('./pubrel')
+const { handlePubrec } = require('./pubrec')
+const { handlePing } = require('./ping')
 
 function handle (client, packet, done) {
   if (packet.cmd === 'connect') {
@@ -74,4 +74,4 @@ function finish (conn, packet, done) {
   done(error)
 }
 
-module.exports = handle
+module.exports = { handle }
diff --git a/node_modules/aedes/lib/handlers/ping.js b/node_modules/aedes/lib/handlers/ping.js
index a4c042c..69b3ded 100644
--- a/node_modules/aedes/lib/handlers/ping.js
+++ b/node_modules/aedes/lib/handlers/ping.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 const pingResp = {
   cmd: 'pingresp'
 }
@@ -10,4 +10,4 @@ function handlePing (client, packet, done) {
   write(client, pingResp, done)
 }
 
-module.exports = handlePing
+module.exports = { handlePing }
diff --git a/node_modules/aedes/lib/handlers/puback.js b/node_modules/aedes/lib/handlers/puback.js
index e4b419c..8376861 100644
--- a/node_modules/aedes/lib/handlers/puback.js
+++ b/node_modules/aedes/lib/handlers/puback.js
@@ -8,4 +8,4 @@ function handlePuback (client, packet, done) {
   })
 }
 
-module.exports = handlePuback
+module.exports = { handlePuback }
diff --git a/node_modules/aedes/lib/handlers/publish.js b/node_modules/aedes/lib/handlers/publish.js
index e30c9db..5c3e167 100644
--- a/node_modules/aedes/lib/handlers/publish.js
+++ b/node_modules/aedes/lib/handlers/publish.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 
 function PubAck (packet) {
   this.cmd = 'puback'
@@ -62,4 +62,4 @@ function authorizePublish (packet, done) {
   this.broker.authorizePublish(this, packet, done)
 }
 
-module.exports = handlePublish
+module.exports = { handlePublish }
diff --git a/node_modules/aedes/lib/handlers/pubrec.js b/node_modules/aedes/lib/handlers/pubrec.js
index 5c914dd..dc7a7f0 100644
--- a/node_modules/aedes/lib/handlers/pubrec.js
+++ b/node_modules/aedes/lib/handlers/pubrec.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 
 function PubRel (packet) {
   this.cmd = 'pubrel'
@@ -27,4 +27,4 @@ function handlePubrec (client, packet, done) {
   }
 }
 
-module.exports = handlePubrec
+module.exports = { handlePubrec }
diff --git a/node_modules/aedes/lib/handlers/pubrel.js b/node_modules/aedes/lib/handlers/pubrel.js
index 09dcc86..672b697 100644
--- a/node_modules/aedes/lib/handlers/pubrel.js
+++ b/node_modules/aedes/lib/handlers/pubrel.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 
 function ClientPacketStatus (client, packet) {
   this.client = client
@@ -47,4 +47,4 @@ function pubrelDel (arg, done) {
   persistence.incomingDelPacket(this.client, arg.packet, done)
 }
 
-module.exports = handlePubrel
+module.exports = { handlePubrel }
diff --git a/node_modules/aedes/lib/handlers/subscribe.js b/node_modules/aedes/lib/handlers/subscribe.js
index 2470427..e2007aa 100644
--- a/node_modules/aedes/lib/handlers/subscribe.js
+++ b/node_modules/aedes/lib/handlers/subscribe.js
@@ -1,10 +1,10 @@
 'use strict'
 
 const fastfall = require('fastfall')
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const { through } = require('../utils')
 const { validateTopic, $SYS_PREFIX } = require('../utils')
-const write = require('../write')
+const { write } = require('../write')
 
 const subscribeTopicActions = fastfall([
   authorize,
@@ -245,4 +245,4 @@ function completeSubscribe (err) {
 
 function noop () { }
 
-module.exports = handleSubscribe
+module.exports = { handleSubscribe }
diff --git a/node_modules/aedes/lib/handlers/unsubscribe.js b/node_modules/aedes/lib/handlers/unsubscribe.js
index e08c317..b9cd7ef 100644
--- a/node_modules/aedes/lib/handlers/unsubscribe.js
+++ b/node_modules/aedes/lib/handlers/unsubscribe.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 const { validateTopic, $SYS_PREFIX } = require('../utils')
 
 function UnSubAck (packet) {
@@ -101,4 +101,4 @@ function completeUnsubscribe (err) {
 
 function noop () { }
 
-module.exports = handleUnsubscribe
+module.exports = { handleUnsubscribe }
diff --git a/node_modules/aedes/lib/qos-packet.js b/node_modules/aedes/lib/qos-packet.js
index 5527fe1..07c1581 100644
--- a/node_modules/aedes/lib/qos-packet.js
+++ b/node_modules/aedes/lib/qos-packet.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const util = require('util')
 
 function QoSPacket (original, client) {
diff --git a/node_modules/aedes/lib/write.js b/node_modules/aedes/lib/write.js
index 716d81a..a5d186c 100644
--- a/node_modules/aedes/lib/write.js
+++ b/node_modules/aedes/lib/write.js
@@ -21,4 +21,4 @@ function write (client, packet, done) {
   setImmediate(done, error, client)
 }
 
-module.exports = write
+module.exports = { write }
diff --git a/node_modules/aedes/types/client.d.ts b/node_modules/aedes/types/client.d.ts
index 2906213..c415fce 100644
--- a/node_modules/aedes/types/client.d.ts
+++ b/node_modules/aedes/types/client.d.ts
@@ -6,10 +6,10 @@ import {
   Subscriptions,
   UnsubscribePacket
 } from './packet'
-import { Connection } from './instance'
+import Aedes, { Connection } from './instance'
 import { EventEmitter } from 'node:events'
 
-export interface Client extends EventEmitter {
+export class Client extends EventEmitter {
   id: Readonly<string>;
   clean: Readonly<boolean>;
   version: Readonly<number>;
@@ -19,6 +19,8 @@ export interface Client extends EventEmitter {
   connected: Readonly<boolean>;
   closed: Readonly<boolean>;
 
+  constructor(broker: Aedes, conn: Connection, req?: IncomingMessage)
+
   on(event: 'connected', listener: () => void): this;
   on(event: 'error', listener: (error: Error) => void): this;

@mcollina
Collaborator

mcollina commented Sep 15, 2023

This seems like a massive change (not opposed).

I think modifying aedes-packet is incorrect, as the transaction should be decoupled in the tracing flow: publishing terminates when mqemitter terminates, and delivery starts as another trace. This is needed to correctly support multi-process systems with Redis or MongoDB.

@getlarge
Member Author

getlarge commented Sep 15, 2023

> This seems like a massive change (not opposed).
>
> I think modifying aedes-packet is incorrect, as the transaction should be decoupled in the tracing flow: publishing terminates when mqemitter terminates, and delivery starts as another trace. This is needed to correctly support multi-process systems with Redis or MongoDB.

Indeed, this implies some breaking changes.

aedes-packet does not need to be modified, BUT mqtt-packet has to be patched at runtime to propagate the context and enable distributed traces. Those traces are composed of multiple spans, which SHOULD be related in order to trace the communication between multiple services.
That’s how it is specified in OpenTelemetry.

To reformulate your statement:
publishing terminates **a span** when mqemitter terminates, and delivery starts as another **span**. … The latter is linked to the former by using its span identifier as the parent.
This test illustrates it.

Regarding your concern about multi-process systems: the packets are stored by those systems (in aedes-persistence-X and aedes-emitter-X), right? So as long as the context is correctly serialized into the packet, it should be fine?
Am I missing something?
BTW, for the serialization of the context, I simply followed what is suggested in this document, as you can see here.

@mcollina
Collaborator

Changing mqtt-packet is a non-starter, unfortunately :(

@getlarge
Member Author

> Changing mqtt-packet is a non-starter, unfortunately :(

Maybe I wasn’t clear: no change will be requested in the mqtt-packet source code.

@robertsLando
Member

@mcollina what we would mostly like to know is whether the approach could have performance implications and/or whether there are better alternatives.

@mcollina
Collaborator

I think the changes here are not really needed, as the "in-the-middle" approach would be sufficient. However, I think they would make things easier.

As for perf, it should be neutral when no monkeypatching is used.

A better approach would be to design a system that requires no monkeypatching at all. This would definitely be faster.


How do you plan to propagate the trace over the MQTT protocol?

@getlarge
Member Author

getlarge commented Sep 18, 2023

> A better approach would be to design a system that requires no monkeypatching at all. This would definitely be faster.

How do you imagine this design?
By allowing the Aedes consumer to provide extra functions that wrap the functions that need to be monitored?
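
One way to picture such a design is a hook passed at construction time. The sketch below is purely hypothetical: `Broker` and the `instrument` option are assumptions made for illustration and do not exist in Aedes.

```javascript
'use strict'

// Hypothetical broker, not the Aedes API: `opts.instrument` is an assumption.
const identity = (name, fn) => fn

function Broker (opts = {}) {
  const instrument = opts.instrument || identity
  // Wrap once at construction; without a hook the original function is used as is.
  this.publish = instrument('publish', function publish (packet, done) {
    done(null, packet)
  })
}

// A consumer-provided hook that records which functions were called.
const seen = []
const broker = new Broker({
  instrument (name, fn) {
    return function (...args) {
      seen.push(name)
      return fn.apply(this, args)
    }
  }
})

broker.publish({ topic: 'a/b' }, () => {})
console.log(seen) // [ 'publish' ]
```

When no hook is supplied, `instrument` returns the original function unchanged, so nothing extra runs per call.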


> How do you plan to propagate the trace over the MQTT protocol?

As said in one of the messages above, there is a document that recommends solutions to store and retrieve the context (trace) to and from the MQTT packet.
To summarize it:

  1. For MQTT v5 (which does not yet apply to Aedes), packet.properties.userProperties should be used to store traceparent and tracestate as properties.
  2. For MQTT v3 with a JSON payload, traceparent and tracestate should be stored as properties of the payload.
  3. For MQTT v3 in other cases, the approach is vaguer and relies on the binary protocol proposal: it does not recommend a location for the context, but it suggests serialization/deserialization algorithms that we could apply. In that case, I thought we could prepend the context to the packet payload.
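
The first two cases could look roughly like the sketch below. The helper names (`inject`, `extract`, `parseJsonObject`) and the `protocolVersion` argument are assumptions for illustration, not part of Aedes or mqtt-packet, and the binary case (3) is left out. The `traceparent` value is the example from the W3C Trace Context specification.

```javascript
'use strict'

// Hypothetical helpers: names and arguments are illustrative only.

// Parse a payload as a JSON object, or return null when it is anything else.
function parseJsonObject (payload) {
  try {
    const body = JSON.parse(payload.toString())
    const isObject = body !== null && typeof body === 'object' && !Array.isArray(body)
    return isObject ? body : null
  } catch (err) {
    return null
  }
}

function inject (packet, ctx, protocolVersion) {
  if (protocolVersion === 5) {
    // Case 1: MQTT v5 user properties.
    const properties = packet.properties || {}
    const userProperties = { ...properties.userProperties, traceparent: ctx.traceparent }
    if (ctx.tracestate) userProperties.tracestate = ctx.tracestate
    return { ...packet, properties: { ...properties, userProperties } }
  }
  // Case 2: MQTT v3 with a JSON object payload.
  const body = parseJsonObject(packet.payload)
  if (body === null) return packet // case 3 (binary) is not covered here
  body.traceparent = ctx.traceparent
  if (ctx.tracestate) body.tracestate = ctx.tracestate
  return { ...packet, payload: Buffer.from(JSON.stringify(body)) }
}

function extract (packet, protocolVersion) {
  const source = protocolVersion === 5
    ? (packet.properties && packet.properties.userProperties) || {}
    : parseJsonObject(packet.payload) || {}
  return { traceparent: source.traceparent, tracestate: source.tracestate }
}

const ctx = { traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01' }
const v5 = inject({ cmd: 'publish', topic: 'a/b', payload: Buffer.from('x') }, ctx, 5)
const v3 = inject({ cmd: 'publish', topic: 'a/b', payload: Buffer.from('{"temp":21}') }, ctx, 4)

console.log(extract(v5, 5).traceparent === ctx.traceparent) // true
console.log(extract(v3, 4).traceparent === ctx.traceparent) // true
console.log(JSON.parse(v3.payload.toString()).temp) // 21
```

Note that case 2 changes the payload subscribers receive, so the receiving side would have to expect or strip the added properties.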

@mcollina
Collaborator

IMHO this makes sense only with MQTT5

@getlarge
Member Author

I agree that the performance penalty would be smaller for MQTT 5. For MQTT 3, if some users truly wish to propagate traces between their systems, I don't see another alternative (except prepending the trace context to the whole MQTT packet, in the same way this is done for the PROXY protocol).
Could it be an opt-in feature?

3 participants