Skip to content

Commit

Permalink
AMQP: Use exchange instead of queue for discovery
Browse files Browse the repository at this point in the history
References msgflo/msgflo#13
  • Loading branch information
jonnor committed Feb 27, 2017
1 parent 6a87305 commit a1db2bc
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/amqp.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
debug = require('debug')('msgflo:amqp')
async = require 'async'
interfaces = require './interfaces'
uuid = require 'uuid'

try
amqp = require 'amqplib/callback_api'
Expand Down Expand Up @@ -144,10 +145,12 @@ class Client extends interfaces.MessagingClient
protocol: 'discovery'
command: 'participant'
payload: part
@channel.assertQueue 'fbp'
data = new Buffer JSON.stringify msg
@channel.sendToQueue 'fbp', data
return callback null
topic = 'fbp'
@channel.assertExchange topic, 'fanout', (err) ->
return callback err if err
data = new Buffer JSON.stringify msg
@channel.publish topic, data
return callback null

class MessageBroker extends Client
constructor: (address, options) ->
Expand Down Expand Up @@ -221,9 +224,13 @@ class MessageBroker extends Client
data: data
return handler out

@channel.assertQueue 'fbp'
@channel.consume 'fbp', deserialize
return callback null
@channel.assertExchange 'fbp', 'fanout', {}, (err) =>
return callback err if err
subscribeQueue = '.fbp-subscribe-' + uuid.v4()
@channel.assertQueue subscribeQueue, { persistent: false }, (err) =>
return callback err if err
@channel.consume subscribeQueue, deserialize
return callback null

exports.Client = Client
exports.MessageBroker = MessageBroker

0 comments on commit a1db2bc

Please sign in to comment.