Skip to content
Browse files

fixed critical bug, consume in multi_amqp now has different interface

  • Loading branch information...
1 parent dd46a5e commit cc5b12859ab463cce762d5ec468d11a1e97196a9 Felix Richter committed Feb 5, 2011
Showing with 5 additions and 3 deletions.
  1. +1 −1 VERSION
  2. +4 −2 genericore/multi_amqp.py
View
2 VERSION
@@ -1,2 +1,2 @@
#Brainfuck
-+++++++[>++++++++>+++++++>+++++++<<<-]>--.>---.>.
++++++++[>++++++++>+++++++>+++++++<<<-]>--.>---.>+.
View
6 genericore/multi_amqp.py
@@ -80,15 +80,17 @@ def _setup_tubes(self):
inp['type'] = inp['type'] if 'type' in inp else 'fanout'
chan.exchange_declare(**inp)
o.qname = chan.queue_declare(exclusive=True).queue
+ o.inp = inp['exchange']
chan.queue_bind(exchange=inp['exchange'],queue=o.qname)
- o.consume = lambda cb : chan.basic_consume(cb,queue=o.qname,no_ack=True)
+ o.consume = lambda cb,queue : chan.basic_consume(cb,queue=queue,no_ack=True)
o.start_loop = lambda : pika.asyncore_loop()
if out and out['exchange']:
out['type'] = out['type'] if 'type' in out else 'fanout'
log.info('generating Output Exchange'+ str(out))
chan.exchange_declare(**out)
- o.publish = lambda msg: self.channel.basic_publish(exchange=out['exchange'],routing_key='',body=msg)
+ o.out = out['exchange']
+ o.publish = lambda msg,exchange: self.channel.basic_publish(exchange=exchange,routing_key='',body=msg)
ret.append(o)
print ret
return ret

0 comments on commit cc5b128

Please sign in to comment.
Something went wrong with that request. Please try again.