Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge 7b2f8ca into 9ca01fd
Browse files Browse the repository at this point in the history
  • Loading branch information
jpwilliams committed Jun 6, 2018
2 parents 9ca01fd + 7b2f8ca commit 89ab0d3
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 0 deletions.
2 changes: 2 additions & 0 deletions lib/Remit.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ const Endpoint = require('./Endpoint')
const Listener = require('./Listener')
const Request = require('./Request')
const Emitter = require('./Emitter')
const Watch = require('./Watch')

class Remit {
constructor (options = {}) {
this.listen = new CallableWrapper(this, Listener)
this.emit = new CallableWrapper(this, Emitter)
this.endpoint = new CallableWrapper(this, Endpoint)
this.request = new CallableWrapper(this, Request)
this.watch = new CallableWrapper(this, Watch)

this.version = packageJson.version

Expand Down
173 changes: 173 additions & 0 deletions lib/Watch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
const EventEmitter = require('eventemitter3')
const { ulid } = require('ulid')
const bubble = require('../utils/bubble')
const waterfall = require('../utils/asyncWaterfall')
const parseEvent = require('../utils/parseEvent')
const handlerWrapper = require('../utils/handlerWrapper')
const throwAsException = require('../utils/throwAsException')

class Watch {
constructor (remit, event, ...handlers) {
this._remit = remit
this._emitter = new EventEmitter()
this._state = undefined

if (!event) {
throw new Error('No event specified when creating a watcher')
}

this.options({ event })

if (handlers.length) {
this.handler(...handlers)
}
}

get state () {
return this._state
}

handler (...fns) {
this._handler = waterfall(...fns.map(handlerWrapper))

return this
}

on (...args) {
this._emitter.on(...args)

return this
}

options (opts = {}) {
if (!this._options) this._options = {}
this._options.event = opts.event || this._options.event
this._options.queue = `remit::data::${this._options.event}`

return this
}

start () {
if (this._started) {
return this._started
}

if (!this._handler) {
throw new Error('Trying to boot watcher with no handler')
}

this._started = this._setup(this._options)

return this._started
}

async _incoming (ack, message) {
if (!message) {
await throwAsException(new Error('Consumer cancelled unexpectedly; this was most probably done via RabbitMQ\'s management panel'))
}

try {
var data = JSON.parse(message.content.toString())
} catch (e) {
console.error(e)

return
} finally {
if (ack) this._consumer.nack(message, false, true)
}

this._state = data

if (message.properties.headers) {
bubble.set('originId', message.properties.headers.originId)
if (!bubble.get('bubbleId')) bubble.set('bubbleId', ulid())
}

const event = parseEvent(message.properties, message.fields, data, {
flowType: 'entry',
isReceiver: true
})

this._handler(event)

try {
this._emitter.emit('data', event)
} catch (e) {
console.error(e)
}
}

async _setup ({ queue, event }) {
this._starting = true
let consumerQueue

try {
const worker = await this._remit._workers.acquire()

try {
await Promise.all([
worker.assertQueue(queue, {
exclusive: false,
durable: true,
autoDelete: false,
maxLength: 1
}).then(() => worker.bindQueue(
queue,
this._remit._exchange,
event
)),

worker.assertQueue('', {
exclusive: true,
durable: false,
autoDelete: true,
maxLength: 1
}).then(({ queue }) => {
consumerQueue = queue

return worker.bindQueue(
consumerQueue,
this._remit._exchange,
event
)
})
])
} catch (e) {
this._remit._workers.destroy(worker)

throw e
}

const connection = await this._remit._connection
this._consumer = await connection.createChannel()
this._consumer.on('error', console.error)
this._consumer.on('close', () => {
throwAsException(new Error('Consumer died - this is most likely due to the RabbitMQ connection dying'))
})

await this._consumer.consume(
consumerQueue,
bubble.bind(this._incoming.bind(this, false)),
{
noAck: true,
exclusive: true
}
)

const msg = await this._consumer.get(queue)

if (msg) {
bubble.bind(this._incoming.bind(this, true))(msg)
}

delete this._starting

return this
} catch (e) {
delete this._starting
await throwAsException(e)
}
}
}

module.exports = Watch

0 comments on commit 89ab0d3

Please sign in to comment.