mcollina/mqstreams

mqstreams

Publish-subscribe in Node.js streams style, based on mqemitter.

Installation

$ npm install mqemitter mqstreams --save

Basic Example

'use strict'

var mqemitter = require('mqemitter')
var mqstreams = require('mqstreams')
var emitter = mqstreams(mqemitter())
var through = require('through2')
var input = emitter.writable()
var output = emitter.readable('output/#')

emitter
  .readable('some/+')
  .pipe(through.obj(function (msg, enc, callback) {
    msg.topic = 'output/' + msg.topic
    this.push(msg)
    callback()
  }))
  .pipe(emitter.writable())

input.write({ topic: 'some/food', type: 'greek' })
input.write({ topic: 'some/startup', type: 'instasomething' })
input.end({ topic: 'some/dev', type: 'matteo' })

output.on('data', function (msg) {
  console.log(msg)

  // OUTPUT:
  // { topic: 'output/some/food', type: 'greek' }
  // { topic: 'output/some/startup', type: 'instasomething' }
  // { topic: 'output/some/dev', type: 'matteo' }
})
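
The topics `some/+` and `output/#` in the example use wildcards. The sketch below illustrates the usual MQTT-style reading of them, assuming `+` stands for exactly one level and `#` for all remaining levels. `topicMatches` is a hypothetical helper written for this illustration; it is not part of mqstreams or mqemitter, and the real matcher may treat edge cases differently.

```javascript
'use strict'

// Hypothetical helper: MQTT-style matching on '/'-separated levels.
// Assumption: '+' matches one level, '#' matches every remaining level.
function topicMatches (pattern, topic) {
  const want = pattern.split('/')
  const have = topic.split('/')

  for (let i = 0; i < want.length; i++) {
    if (want[i] === '#') return true // swallows the rest of the topic
    if (i >= have.length) return false // topic is shorter than the pattern
    if (want[i] !== '+' && want[i] !== have[i]) return false
  }

  // Without '#', pattern and topic must have the same number of levels.
  return want.length === have.length
}

console.log(topicMatches('some/+', 'some/food')) // true
console.log(topicMatches('some/+', 'output/some/food')) // false
console.log(topicMatches('output/#', 'output/some/food')) // true
```

Under this reading, the republished `output/...` messages do not match `some/+`, so the transform in the example does not feed its own output back into itself.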

API


mqstreams(mqemitter)

Extends the MQEmitter with the readable() and writable() methods.


emitter.readable([topic], [opts])

Returns a Readable stream in object mode that emits every emitter message matching the given topic. The opts parameter is passed through to the Stream constructor. This stream fully respects the Stream3 interface.

The topic parameter is passed to the emitter.on method.

The returned object has the following methods added: subscribe(), unsubscribe(), destroy().

emitter.readable#subscribe(topic)

Subscribe to the given topic, which can also be an array of topics.

emitter.readable#unsubscribe(topic)

Unsubscribe from the given topic, which can also be an array of topics.

emitter.readable#destroy()

Close the stream, unsubscribing from all the topics. This is aliased to close() for backwards compatibility.
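
To make the lifecycle of these three methods concrete, here is a minimal sketch built only on Node's `stream` module. It is not the mqstreams source. `tinyEmitter` is a hypothetical stand-in that matches exact topics only (no wildcards), and `topicReadable` is a hypothetical helper that mirrors the behaviour described above: `subscribe()` and `unsubscribe()` accept a single topic or an array, and `destroy()` drops every remaining subscription.

```javascript
'use strict'

const { Readable } = require('stream')

// Hypothetical stand-in emitter: exact-topic matching only.
function tinyEmitter () {
  const listeners = new Map()
  return {
    on (topic, fn) {
      if (!listeners.has(topic)) listeners.set(topic, new Set())
      listeners.get(topic).add(fn)
    },
    removeListener (topic, fn) {
      if (listeners.has(topic)) listeners.get(topic).delete(fn)
    },
    emit (msg) {
      for (const fn of listeners.get(msg.topic) || []) fn(msg, function () {})
    },
    count (topic) {
      return listeners.has(topic) ? listeners.get(topic).size : 0
    }
  }
}

// Hypothetical helper: an object-mode Readable that tracks its topics.
function topicReadable (emitter, topic) {
  const topics = new Set()
  const stream = new Readable({
    objectMode: true,
    read () {}, // nothing to pull: messages are pushed as they arrive
    destroy (err, callback) {
      // Closing the stream unsubscribes from everything still tracked.
      stream.unsubscribe(Array.from(topics))
      callback(err)
    }
  })

  function onMessage (msg, done) {
    stream.push(msg)
    done()
  }

  stream.subscribe = function (t) {
    for (const one of [].concat(t)) { // accepts a string or an array
      if (!topics.has(one)) {
        topics.add(one)
        emitter.on(one, onMessage)
      }
    }
  }

  stream.unsubscribe = function (t) {
    for (const one of [].concat(t)) {
      if (topics.delete(one)) emitter.removeListener(one, onMessage)
    }
  }

  if (topic) stream.subscribe(topic)
  return stream
}

const emitter = tinyEmitter()
const stream = topicReadable(emitter, 'a')
stream.subscribe(['b', 'c'])
stream.unsubscribe('b')

emitter.emit({ topic: 'a' })
emitter.emit({ topic: 'b' }) // no longer subscribed, so it is dropped
emitter.emit({ topic: 'c' })

const first = stream.read()
const second = stream.read()
stream.destroy() // removes the 'a' and 'c' subscriptions
```

With the real library, the same calls would be made on the object returned by `emitter.readable()`.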


emitter.writable([opts])

Returns a Writable stream in object mode that passes every message to the emitter.emit method. This stream fully respects the Stream3 interface.
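
As an illustration of what "pass any message to the emitter.emit method" can look like, the sketch below wires an object-mode Writable to a stand-in emitter using only Node's `stream` module. `writableFor` and `fakeEmitter` are hypothetical names for this example, and the code is an assumption about the general shape, not the mqstreams source.

```javascript
'use strict'

const { Writable } = require('stream')

// Hypothetical stand-in that records what it is asked to emit.
const emitted = []
const fakeEmitter = {
  emit (msg, done) {
    emitted.push(msg)
    done()
  }
}

// Hypothetical helper: forwards each written object to emitter.emit.
function writableFor (emitter, opts) {
  return new Writable(Object.assign({}, opts, {
    objectMode: true,
    write (msg, encoding, callback) {
      // The stream takes the next message only after emit calls back,
      // which is how backpressure reaches whoever is writing.
      emitter.emit(msg, callback)
    }
  }))
}

const input = writableFor(fakeEmitter)
input.write({ topic: 'some/food', type: 'greek' })
input.end({ topic: 'some/dev', type: 'matteo' })
```

Handing the stream callback straight to `emit` is the design point here: a slow subscriber delays the callback, and the Writable then buffers or signals the producer to pause.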

LICENSE

MIT
