Use MQTT with RxJS in Node.js or in the browser with MQTT over WebSocket.

The API is similar to the built-in RxJS WebSocket implementation for consistency, but replaces the multiplex operator with topics.


yarn add musquette
# or
npm install musquette

Import via

import { MQTTSubject } from 'musquette'
// or
import { MQTTSubject } from 'musquette/dist/lib/musquette'


Establish a connection and listen on a topic:

The payload is assumed to be a JSON-parseable string.

import { MQTTSubject } from 'musquette'

let mqtt = new MQTTSubject(`ws://localhost:9001`)
let topic = mqtt.topic(`test/topic`)

topic.subscribe({
  next: message => console.log(message.test), // "test"
  error: console.error
})

// or, if you are not interested in errors
topic.subscribe(message => console.log(message.test)) // "test"

// publish, when called on a topic, expects only a payload
topic.publish({
  test: 'test'
})

Send a payload without subscribing to a topic:

import { MQTTSubject } from 'musquette'

let mqtt = new MQTTSubject(`ws://localhost:9001`)

// next expects an MQTTMessage object that consists of at least
// a topic and a message property
mqtt.next({
  topic: 'test/topic',
  message: {
    test: 'test'
  }
})

// publish on a connection expects two arguments: a topic and a payload
mqtt.publish('test/topic', {
  test: 'test'
})

This is equivalent to the first method (publishing through a topic), but it does not subscribe to the topic.
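
To make the relationship between the two call styles concrete, the sketch below models the message shape in plain TypeScript. `MessageLike` and `toMessage` are hypothetical names introduced for illustration and are not part of musquette. The sketch assumes only what is stated above: a message carries at least a `topic` and a `message` property.

```typescript
// Hypothetical shape: only `topic` and `message` are taken from the text above.
interface MessageLike<T> {
  topic: string
  message: T
}

// Hypothetical helper: builds the object form from the two-argument form.
function toMessage<T>(topic: string, payload: T): MessageLike<T> {
  return { topic, message: payload }
}

// Object form, as it would be passed to next()
const viaObject: MessageLike<{ test: string }> = {
  topic: 'test/topic',
  message: { test: 'test' }
}

// Two-argument form, as it would be passed to publish()
const viaArguments = toMessage('test/topic', { test: 'test' })
```

Both values describe the same topic and payload, which is why the two calls can be used interchangeably.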


import { Subject, Observable, merge } from 'rxjs'
import { mapTo } from 'rxjs/operators'
import { MQTTSubject } from 'musquette/dist/lib/musquette.js'

let connected$ = new Subject()
let disconnecting$ = new Subject()
let disconnected$ = new Subject()


let mqtt = new MQTTSubject({
  url: `ws://localhost:9001`,

  // mqtt.js options
  options: {
    keepalive: 3000,
    clientId:
      'mqttjs_' +
      Math.random()
        .toString(16)
        .substr(2, 8)
  },

  // function that packs the payload that is sent
  // (T) => Buffer
  serializer: value => Buffer.from(JSON.stringify(value)),

  // function that unpacks the payload
  // (Buffer) => T
  deserializer: message => JSON.parse(message.toString()),

  // Observer that is called when connection is established
  connectObserver: connected$,

  // Observer that is called when disconnect is imminent
  disconnectingObserver: disconnecting$,

  // Observer that is notified when connection has ended
  disconnectObserver: disconnected$
})


setTimeout(() => {
  // disconnect
}, 5000)
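
The `serializer` and `deserializer` options shown above can be checked in isolation. The following is a minimal sketch that extracts the same two expressions into standalone functions and runs a round trip. It assumes a Node.js environment where `Buffer` is available; the function names `serialize` and `deserialize` are chosen here for illustration.

```typescript
// Same logic as the serializer option above: value -> JSON string -> Buffer
function serialize<T>(value: T): Buffer {
  return Buffer.from(JSON.stringify(value))
}

// Same logic as the deserializer option above: Buffer -> string -> parsed JSON
function deserialize<T>(message: Buffer): T {
  return JSON.parse(message.toString()) as T
}

const wire = serialize({ test: 'test' })
const roundTripped = deserialize<{ test: string }>(wire)

console.log(wire.toString()) // {"test":"test"}
console.log(roundTripped.test) // test
```

A custom pair (for example, for binary payloads) only has to keep the same contract: the serializer returns a `Buffer`, and the deserializer accepts one.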

NPM scripts

  • npm t: Run test suite
  • npm start: Run npm run build in watch mode
  • npm run test:watch: Run test suite in interactive watch mode
  • npm run test:prod: Run linting and generate coverage
  • npm run build: Generate bundles and typings, create docs
  • npm run lint: Lints code
  • npm run commit: Commit using conventional commit style (husky will tell you to use it if you haven't 😉)


This library includes a copy of the mqtt-wildcard library by Sebastian Raff.
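
For context, MQTT topic filters support two wildcards: `+` matches exactly one topic level, and `#` matches all remaining levels (including none). The following is a minimal sketch of that matching rule. `topicMatches` is a hypothetical helper written for illustration; it is not the code used by mqtt-wildcard or musquette, and it ignores special cases such as `$`-prefixed topics.

```typescript
// Illustrative MQTT topic filter matching (not the mqtt-wildcard implementation).
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/')
  const topicLevels = topic.split('/')

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i]
    // '#' is only valid as the last level and matches everything that remains
    if (level === '#') return i === filterLevels.length - 1
    // the topic ran out of levels before the filter did
    if (i >= topicLevels.length) return false
    // '+' matches any single level; other levels must match literally
    if (level !== '+' && level !== topicLevels[i]) return false
  }

  // every filter level matched; the topic must not have extra levels
  return filterLevels.length === topicLevels.length
}

console.log(topicMatches('test/+', 'test/topic')) // true
console.log(topicMatches('test/#', 'test/a/b')) // true
console.log(topicMatches('test/+', 'test/a/b')) // false
```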

Based on the TypeScript library starter by @alexjoverm.

Contributors (emoji key):

This project follows the all-contributors specification. Contributions of any kind are welcome!