Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Replace websocket-stream with ws #413

Closed
wants to merge 4 commits into from
Closed

Conversation

robertsLando
Copy link
Member

No description provided.

@robertsLando
Copy link
Member Author

@mcollina I'm getting an error on CI that seems caused by protocol decoder:

Cannot read property 'remotePort' of undefined

I see that in websocket-stream npm module the socket property is added here

And protocol decoder fails here:

var socket = conn.socket || conn
....

 proto.port = socket._socket.remotePort
 ipFamily = socket._socket.remoteFamily

In poor words how can I get remotePort and remoteFamily props using the ws stream? I think this should be fixed in protocol decoder so

@mcollina
Copy link
Collaborator

mcollina commented Feb 6, 2020

it should be there somewhere, dig deep

@robertsLando
Copy link
Member Author

It should be there but can't find it 😕 https://nodejs.org/api/net.html#net_socket_remoteaddress

@mcollina
Copy link
Collaborator

mcollina commented Feb 6, 2020

@robertsLando
Copy link
Member Author

robertsLando commented Feb 6, 2020

@mcollina This way it works but I much prefer websocket-stream way

@github-actions
Copy link

github-actions bot commented Feb 6, 2020

Pull Request Test Coverage Report for Build f991d0e6ae895de4310f48e734d75f5b17bb1404-PR-413

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage remained the same at ?%

Totals Coverage Status
Change from base Build 727c37ac50904da88a068e651cf2d5a46526e643: 0%
Covered Lines:
Relevant Lines: 0

💛 - Coveralls

@mcollina
Copy link
Collaborator

mcollina commented Feb 6, 2020

The implementation of ws is significantly better than the one in websocket-stream. We might want to update the readme with this change as well.

Note: we should ask about adding the socket property to ws streams.

@robertsLando
Copy link
Member Author

robertsLando commented Feb 6, 2020 via email

@robertsLando
Copy link
Member Author

@mcollina If you check the issue on ws module I asked to add the property but they told me that they won't do it (check link for more info). Should I also update examples?

@mcollina
Copy link
Collaborator

mcollina commented Feb 7, 2020 via email

@robertsLando
Copy link
Member Author

robertsLando commented Feb 7, 2020 via email

@robertsLando
Copy link
Member Author

@mcollina So what with this?

@mcollina
Copy link
Collaborator

I would just create a tiny module that offers the same API of websocket-stream server: https://www.npmjs.com/package/websocket-stream#on-the-server

@gnought
Copy link
Collaborator

gnought commented Feb 10, 2020

How about we create a factory createServer method to provide websocket, tls function?

@robertsLando
Copy link
Member Author

robertsLando commented Feb 10, 2020 via email

@gnought
Copy link
Collaborator

gnought commented Feb 10, 2020

We could build a tiny module to include it

@robertsLando
Copy link
Member Author

robertsLando commented Feb 10, 2020 via email

@gnought
Copy link
Collaborator

gnought commented Feb 11, 2020

Maybe we could follow what fastify did. https://github.com/fastify/fastify/blob/master/lib/server.js

@gnought
Copy link
Collaborator

gnought commented Feb 11, 2020

Haven't done thoughtful check,

const assert = require('assert')
const WebSocket = require('ws')
const tls = require('tls')
const http = require('http')
const https = require('https')
const http2 = require('http2')
const net = require('net')

function createServer (options, aedesHandler) {
  assert(options, 'Missing options')
  assert(aedesHandler, 'Missing aedes handler')

  var server = null
  if (options.serverFactory) {
    server = options.serverFactory(aedesHandler, options)
  } else if (options.tls) {
    server = tls.createServer(options, aaedesHandler)
  } else if (options.ws) {
    if (options.https) {
      if (options.http2) {
        server = http2.createSecureServer(options.https)
      } else {
        server = https.createServer(options.https)
      }
    } else {
      if (options.http2) {
        server = http2.createServer()
        server.on('session', sessionTimeout(options.http2SessionTimeout))
      } else {
        server = http.createServer()
      }
    }
    const ws = new WebSocket.Server({ server: server })
    ws.on('connection', function (conn, req) {
      const stream = WebSocket.createWebSocketStream(conn)
      aedesHandler(stream, req)
    })
  } else {
    server = net.createServer(options, aedesHandler)
  }
  return server
}

@robertsLando
Copy link
Member Author

@gnought I think that is not necessary to support every protocol, @mcollina was speaking about a module for websockets, that should be enought

@robertsLando
Copy link
Member Author

@mcollina Are you ok so for a aedes-websocket module?

@mcollina
Copy link
Collaborator

yes I am

@gnought
Copy link
Collaborator

gnought commented Feb 14, 2020

As _socket is not available in ws, so there’s a proposal to create a tiny module to provide that function and replace the websocket-stream, but I don’t think it is bound to aedes only.
Rather I prefer to have a tiny module to provide a createServer function and we offer a serverFactory to let users to provide their own architecture, even including PROXY is nice.

@robertsLando
Copy link
Member Author

@gnought Yes but that requires some modules that maybe the user don't need ?

@robertsLando
Copy link
Member Author

robertsLando commented Feb 18, 2020

@mcollina @gnought I would like to work on this in next days but I would like to find something that is good for everone. @mcollina says to create the single websocket module, @gnought prefers a module that will act as server factory for aedes. In my opinion a websocket module is enought as others servers are really easy to setup and that is the only one that requires a little trick

What do you think?

@robertsLando
Copy link
Member Author

From what I understand the proxy protocol decoding must happen before the websocket even exists

I don't understand what you mean @mcollina. How do you think we should implement protocol decoder so? As an aedes plugin (like it is working now) or like @gnought suggests?

As @gnought said now the flow is:

  1. pass conn to aedes (req will be available if conn is ws), conn is a raw stream (could be http/https/tcp/ws)
  2. decode the raw stream if PROXY is there.
  3. then pass to mqtt parser to read mqtt packets

And he would like it to be:

  1. have a connection factory and decode conn to be a plain mqtt stream
  2. pass conn to aedes handle
  3. then pass to mqtt parser to read mqtt packets.

I'm ok for boths but I would like to know what you prefer so we can decide if implement a 'websocket module' or a 'connection factory' module

@mcollina
Copy link
Collaborator

Decoding of PROXY must happen before decoding the websocket protocol. From my understanding the approach @gnought is suggesting will not work with the most common proxies (haproxy for example).

@mmm8955405
Copy link

I hope that the code will be submitted as soon as possible and provide examples for use. If you can't use an API similar to websocket stream on the server, what is the function of the stream API you provide? However, I can only choose websocket stream at present

@robertsLando
Copy link
Member Author

robertsLando commented Feb 25, 2020

However, I can only choose websocket stream at present

@mmm8955405 You can replace websocket-stream in this way:

const { createWebSocketStream, Server } = require('ws')

function createWebsocketServer (server, broker) {
  const ws = new Server({ server })

  ws.on('connection', function (conn, req) {
    var stream = createWebSocketStream(conn)
    stream._socket = conn._socket
    broker.handle(stream, req)
  })
}

const http = require('http')
const server = http.createServer()
// Than use this function to bind aedes to your server
createWebsocketServer(server, broker)

@gnought
Copy link
Collaborator

gnought commented Feb 25, 2020

@mcollina I am suggesting like the following:

  1. have a connection factory and decode conn to be a plain mqtt stream
  2. pass conn to aedes handle
  3. then pass to mqtt parser to read mqtt packets.

The connection factory is like a layer on top of aedes, if we want ws,
ws => aedes.handle
if want want PROXY+ws
PROXY => ws => aedes.handle

I will try to create a createServer() function to demonstrate what it will look like.

@robertsLando
Copy link
Member Author

@gnought @mcollina ping

@mcollina
Copy link
Collaborator

mcollina commented Apr 8, 2020

Decoding of PROXY must happen before decoding the websocket protocol.

I do not have anything to add to the above. PROXY should not be implemented as a chain of streams.
PROXY is decoded and then the data does not touch that code anymore. None of this should be relevant to Websockets.

@robertsLando
Copy link
Member Author

I need to know what @gnought wants to do so

@gnought
Copy link
Collaborator

gnought commented Apr 9, 2020

sorry, too much work on my shoulder.
I have tried ws over http/2, it works without PROXY only, and requires several hacks in https://github.com/websockets/ws project. I will create a PR for support PROXY on plain mqtt, mqtt over tls and ws over http/https only. Tested successfully on haproxy+aedes, maybe I haven't enough time working on unit tests. Help is welcome. :)

@gnought gnought marked this pull request as draft April 9, 2020 12:08
@demian85
Copy link

demian85 commented Jul 2, 2020

Hello, I found an issue after upgrading from 0.40 to 0.42. Typescript fails with the following example found in README

const aedes = require('./aedes')()
const httpServer = require('http').createServer()
const ws = require('websocket-stream')
const port = 8888

ws.createServer({ server: httpServer }, aedes.handle) // Argument of type '(stream: Connection) => Client' is not assignable to parameter of type '() => void'.ts(2345)

httpServer.listen(port, function () {
  console.log('websocket server listening on port ', port)
})

Any ideas on how to fix this? does that handle return a Client or void as expected by websocket-stream?

@robertsLando
Copy link
Member Author

@demian85 Fixed here: #487

Feel free to fix the example in Readme

@robertsLando
Copy link
Member Author

robertsLando commented Oct 9, 2020

@mcollina This ended up with nothing done. I would like to sort this out but I need to know what in your opinion is the best approach.

Actually we use this approach:

const { createWebSocketStream, Server } = require('ws')

function createWebsocketServer (server, broker) {
  const ws = new Server({ server })

  ws.on('connection', function (conn, req) {
   // PROTOCOL DECODE SHOULD BE DONE HERE ?
    var stream = createWebSocketStream(conn)
    stream._socket = conn._socket
    broker.handle(stream, req)
  })
}

const http = require('http')
const server = http.createServer()
// Than use this function to bind aedes to your server
createWebsocketServer(server, broker)

So actually we firstly create the websocket strem then we pass it to aedes handle and here we decode the protocol (proxy or what else) and here we pass the decoded data to mqtt parser.

So for what I have understanded the best option could be a serverFactory like #413 (comment) and move the protocol decoder there? Am I correct?

@mcollina
Copy link
Collaborator

mcollina commented Oct 9, 2020

I think so, yes.

@robertsLando
Copy link
Member Author

@getlarge Would you like to contribute to that? I can init the repos than there should be some tests to do with your protocol decoder

@getlarge
Copy link
Member

getlarge commented Oct 9, 2020

@robertsLando Sure, i could give it a try next week.
I have to read that feed of info and refresh my memory first, but in the meantime you can start the repo.

@robertsLando
Copy link
Member Author

@getlarge Nice! Will start working to this next week: https://github.com/moscajs/aedes-server-factory

@getlarge
Copy link
Member

@robertsLando Just to be clear what's the plan ?
From the above conversation, i understood that this module was about creating a wrapper around ws, but i see in the description of the new repo Supports tcp, tls, http, https, http2, ws, wss and proxy decoders. If it's the latter, you think of something like @gnought proposed with the serverFactory ?

As for moving the protocol decoder step in that module, even though i agree, it seems quite tricky as we need to use the same conn stream that aedes is using to extract the information. Eventually we could use a transform stream in between (before aedes.handle is called) to get the connection infos and remove the proxyprotocol header from the stream, but then where the information about the socket and / or proxy would be stored ? Should they be returned by the serverFactory and that becomes user responsability ?

And btw, just to add info about the PROXY protocol, it really make sense when we use simple TCP connections (with net module for example), and we need to retrieve information about the original address when using a proxy that passes the stream, that's the main use case i had in mind when creating the protocol decoder.
With HTTP we can easily use headers to attach infos from that PROXY header at the server level ( HAProxy, Nginx ... ), so i considered that req.headers were enough to get connection details.

@robertsLando
Copy link
Member Author

If it's the latter, you think of something like @gnought proposed with the serverFactory ?

Yes, that's the main idea.

it seems quite tricky as we need to use the same conn stream that aedes is using to extract the information

but then where the information about the socket and / or proxy would be stored ? Should they be returned by the serverFactory and that becomes user responsability

I started implementing the server factory and I was having the same questions. I'm stucked and I need some ideas... So the main reason of serverFactory would be to simplify the process of creating and attacing aedes to a server and also decode proxy or whatelse, the problem here is that with the actual protocol decoder implementation you had the client instance to attach the connection details and now you don't. I'm stucked 😕

@mcollina Ideas?

@getlarge
Copy link
Member

@robertsLando The client presence in the arguments of protocolDecoder is one thing that we can workaround quite easily by letting the user, append the extracted data her/himself.
What seems more tricky is to start reading the conn stream to check if there is a PROXY protocol header, without messing with aedes that waits for the full stream.

I tried something for the serverFactory, it's probably not working but it gives an idea

const WebSocket = require('ws');
const tls = require('tls');
const http = require('http');
const https = require('https');
const http2 = require('http2');
const net = require('net');
const net = require('net');
const protocolDecoder = require('aedes-protocol-decoder');
const { Transform, PassThrough } = require('stream');

function createServer(options, aedesHandler) {
  return new Promise((resolve, reject) => {
    if (!options) {
      return reject(new Error('Missing options'));
    } else if (!aedesHandler) {
      return reject(new Error('aedes handler'));
    }

    var server = null;
    if (options.serverFactory) {
      server = options.serverFactory(aedesHandler, options);
      resolve({ server });
    } else if (options.tls) {
      server = tls.createServer(options, aaedesHandler);
      resolve({ server });
    } else if (options.ws) {
      if (options.https) {
        if (options.http2) {
          server = http2.createSecureServer(options.https);
        } else {
          server = https.createServer(options.https);
        }
      } else {
        if (options.http2) {
          server = http2.createServer();
          server.on('session', sessionTimeout(options.http2SessionTimeout));
        } else {
          server = http.createServer();
        }
      }
      const ws = new WebSocket.Server({ server });
      ws.on('connection', function (conn, req) {
        const stream = WebSocket.createWebSocketStream(conn);
        stream._socket = conn._socket;
        // extract proxy if any ?
        // const removeProxy = removeFromStream(proxyLength);
        // stream.pipe(removeProxy)
        aedesHandler(stream, req);
        resolve({ server });
      });
    } else {
      server = net.createServer(options, (conn) => {
        if (options.useProxy) {
          const rewindableStream = conn.pipe(new Rewindable());
          const { buf, protocol } = extractProxy(rewindableStream);
          const removeProxy = removeFromStream(buf.length);
          const rewound = rewindableStream.rewind();
          conn = rewound.pipe(removeProxy);
        }
        aedesHandler(conn);
        resolve({ server });
      });
    }
  });
}

const extractProxy = (conn, req) => {
  const buf = conn.read(null);
  const protocol = protocolDecoder({ req }, buf);
  // proto contains {data, ipAddress, port, serverIpAddress, isProxy}
  return { buf, protocol };
};

class Rewindable extends Transform {
  constructor() {
    super();
    this.accumulator = [];
  }

  _transform(buf, enc, cb) {
    this.accumulator.push(buf);
    callback();
  }

  rewind() {
    const stream = new PassThrough();
    this.accumulator.forEach((chunk) => stream.write(chunk));
    return stream;
  }
}

const removeFromStream = (length) => {
  let buffer = Buffer.alloc(0);
  let removed = false;

  return new Transform({
    transform(chunk, encoding, callback) {
      if (!removed || buffer.length <= length) {
        buffer = Buffer.concat([buffer, chunk], buffer.length + chunk.length);
        const partChunk = buffer.slice(length);
        this.push(partChunk);
        removed = true;
      } else {
        this.push(chunk);
      }
      callback();
    },
  });
};

module.exports = {
  createServer,
};

@robertsLando
Copy link
Member Author

It seems ok but also a little bit tricky and maybe performance killer? Also If you check here the protocol decoder was running only in case connack wasn't sent, is there any reason to do it on every rquest?

@getlarge
Copy link
Member

It's based on Promise, so that we can return the server + the connDetails to the user.
Check the else condition that uses the net.createServer, i only worked on this case now.
Based on the idea that we should extract connection details and eventual Proxy header:

  • we start to read the conn stream (wrapped in a transform stream),
  • use the buffer to extract informations needed
  • rewind the stream
  • remove the proxy part from the rewinded stream (if any) with another transform stream
  • use that stream chains in aedes.handle

The advantage is that aedes would not have to bother with protocol decoding and that would allow to remove those ugly nested conditions in nextBatch ( that i made :) )

@getlarge
Copy link
Member

Previously we needed that connack check because nextBatch was executed for each incoming packet, in the serverFactory, it would be executed only on connection event

@robertsLando
Copy link
Member Author

robertsLando commented Oct 13, 2020

@getlarge I think that this could be the way, good job! Would you like to submit a PR to server factory? I also would like to know @mcollina thoughts about this solution and If this caould be improved someway :)

PS: If you are interested I could add you to our org MoscaJS org

@mcollina
Copy link
Collaborator

as long as it does not slow performance, I'm good.

@robertsLando
Copy link
Member Author

@mcollina Do you think that solution could slow it or not? @getlarge for this we will add some performance test to see how it goes

@getlarge
Copy link
Member

getlarge commented Oct 13, 2020

@robertsLando Yes i can make a PR soon, but i'll have to change the protocolDecoder implementation first, and yes for the org, i will not have not to handle some forks anymore ;)

@mcollina For the perf, the connection handling will most probably slower if there is a proxy header because of the stream manipulation, but it could only affect positively the common operations ( incoming messages handling ), as we could remove the nested conditions in aedes.client.nextBatch.
But we can add some tests to be sure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants