Permalink
Browse files

new possible configuration of multiple amqp exchanges

Also cleaned up a few bugs
multi_amqp has a slightly different way to be configured, see genericore mail_shiny main.py for a sample
  • Loading branch information...
1 parent 7c536b2 commit eab861227e74b7625b5c273efa06321ff923a353 Felix Richter committed Jan 30, 2011
Showing with 148 additions and 12 deletions.
  1. +2 −1 genericore/__init__.py
  2. +3 −3 genericore/auto_amqp.py
  3. +18 −3 genericore/mongo_connect.py
  4. +117 −0 genericore/multi_amqp.py
  5. +8 −5 genericore/utils.py
View
@@ -6,7 +6,8 @@
]
#__license__
__contributors__ = ''
-import auto_amqp,utils, mongo_connect
+import auto_amqp,utils, mongo_connect,multi_amqp
from auto_amqp import *
+from multi_amqp import *
from utils import *
from mongo_connect import *
View
@@ -30,14 +30,14 @@ class auto_amqp(Configurable):
def __init__(self,config=None):
Configurable.__init__(self,DEFAULT_CONFIG)
self.load_conf(config)
+
def create_connection(self):
""" starts the connection the the AMQP Serve """
if self.conn:
raise Exception("Connection already open")
cfg = self.config['amqp']['connection']
- log.debug (str(cfg))
- locals().update()
+ log.debug(str(cfg))
self.conn = pika.AsyncoreConnection(pika.ConnectionParameters(
credentials = pika.PlainCredentials(cfg['login'],cfg['password']),
heartbeat=cfg['heartbeat'],
@@ -72,7 +72,7 @@ def close_connection(self):
#cleanup
if hasattr(self,'consume'):
delattr(self,'consume')
- delattr(self,'start_loo')
+ delattr(self,'start_loop')
if hasattr(self,'publish'):
delattr(self,'publish')
@@ -19,21 +19,36 @@
class MongoConnect(Configurable):
def create_connection(self):
-
+ """ Will create a connection to the mongodb and, if wanted by user
+ ('drop_collection' in config is set True) clean up the database,
+ clean up the collection
+ will set following member variables:
+ conn - the connection to the database
+ db - the open database in the mongodb
+ coll - the open collection (similar to table in relational db)
+ """
conf = self.config[self.MODULE_NAME]
dconf = conf['database']
try:
self.conn = Connection(**conf['mongodb'])
-
self.db = self.conn[dconf['database']]
+ self.coll = self.db[dconf['collection']]
except Exception as e:
log.error('Mongodb not running or unreachable ! Bailing out:\n' + str(e))
sys.exit(0)
print dconf
if dconf['drop_collection'] :
log.info('dropping collection due to public demand')
- self.db[dconf['collection']].drop()
+ self.drop_collection()
+
+ def drop_collection(self,collection=None):
+ """ will drop a given collection (via collection name) from the open
+ database object self.db --> delete all data from collection """
+ if not collection:
+ self.coll.drop()
+ else:
+ self.db[collection].drop()
def __init__(self,MODULE_NAME='mongo_connect',conf=None):
self.MODULE_NAME = MODULE_NAME
View
@@ -0,0 +1,117 @@
+import pika
+import sys,time
+from utils import Configurable
+import logging
+log = logging.getLogger('multi_amqp')
+
+DEFAULT_CONFIG = {
+ "amqp" : {
+ "connection" : {
+ "login" : "guest",
+ "password" : "guest",
+ "host" : "localhost",
+ "port" : 5672,
+ "heartbeat" : 0,
+ "vhost" : "/"
+ },
+ "exchanges" : {
+ #"basic" : {
+ # "in" : {
+ # "exchange" : False,
+ # "type" : "fanout"
+ # },
+ # "out" : {
+ # "exchange" : False,
+ # "type" : "fanout"
+ # }
+ # }
+ }
+ }
+ }
+
+class C():
+ #dummy class for extending
+ pass
+
+class multi_amqp(Configurable):
+ """ Multi-amqp works similar to auto_amqp with the difference that it can
+ create multiple 'tubes' (input and output exchanges),
+ the configuration therefore looks a bit different"""
+ conn = None
+ def __init__(self,config=None):
+ Configurable.__init__(self,DEFAULT_CONFIG)
+ self.load_conf(config)
+
+
+ def create_connection(self):
+ """ starts the connection the the AMQP Serve """
+ if self.conn:
+ raise Exception("Connection already open")
+ cfg = self.config['amqp']['connection']
+ log.debug(str(cfg))
+ self.conn = pika.AsyncoreConnection(pika.ConnectionParameters(
+ credentials = pika.PlainCredentials(cfg['login'],cfg['password']),
+ heartbeat=cfg['heartbeat'],
+ virtual_host=cfg['vhost'],
+ port=cfg['port'],
+ host=cfg['host'],
+ ),reconnection_strategy=pika.SimpleReconnectionStrategy())
+ self.channel = self.conn.channel()
+
+ self.tubes = self._setup_tubes()
+ return self.tubes
+
+ def _setup_tubes(self):
+ """ creates the in 'config' configured input and output """
+ chan = self.channel
+ ret = []
+ print self.config['amqp']['exchanges']
+ for k,v in self.config['amqp']['exchanges'].items():
+ o = C()
+ inp = v['in'] if 'in' in v else None
+ out = v['out'] if 'out' in v else None
+ o.name = k
+ print str(k),str(inp),str(out)
+ if inp and inp['exchange']:
+ log.info('generating Input Queue'+ str(inp))
+ inp['type'] = inp['type'] if 'type' in inp else 'fanout'
+ chan.exchange_declare(**inp)
+ o.qname = chan.queue_declare(exclusive=True).queue
+ chan.queue_bind(exchange=inp['exchange'],queue=o.qname)
+ o.consume = lambda cb : chan.basic_consume(cb,queue=o.qname,no_ack=True)
+ o.start_loop = lambda : pika.asyncore_loop()
+
+ if out and out['exchange']:
+ out['type'] = out['type'] if 'type' in out else 'fanout'
+ log.info('generating Output Exchange'+ str(out))
+ chan.exchange_declare(**out)
+ o.publish = lambda msg: self.channel.basic_publish(exchange=out['exchange'],routing_key='',body=msg)
+ ret.append(o)
+ print ret
+ return ret
+
+ def close_connection(self):
+ self.conn.close()
+ self.conn= None
+ del self.tubes[:]
+
+ def populate_parser(self,parser):
+ """ populates an argparse parser """
+ parser.add_argument('-s','--host',dest='amqpHost', help='AMQP host ip address',metavar='HOST')
+ parser.add_argument('--port',type=int,dest='amqpPort',help='AMQP host port',metavar='PORT')
+ parser.add_argument('-u','--username',dest='amqpUsername', help='AMQP username',metavar='USER')
+ parser.add_argument('-p','--password',dest='amqpPassword',help='AMQP password',metavar='PASS')
+ parser.add_argument('-b','--heartbeat',dest='amqpHeartbeat',type=int,help='AMQP Heartbeat value',metavar='SECONDS')
+ parser.add_argument('-v','--vhost',dest='amqpVhost',help='AMQP vhost definition',metavar='VHOST')
+
+ def eval_parser(self,parsed):
+ """ loads its individual configuration from the parsed output """
+ conf = self.config['amqp']['connection']
+ conf['host'] = parsed.amqpHost if parsed.amqpHost else conf['host']
+ conf['port'] = parsed.amqpPort if parsed.amqpPort else conf['port']
+ conf['login'] = parsed.amqpUsername if parsed.amqpUsername else conf['login']
+ conf['password'] = parsed.amqpPassword if parsed.amqpPassword else conf['password']
+ conf['heartbeat'] = parsed.amqpHeartbeat if parsed.amqpHeartbeat in dir(parsed) else conf['heartbeat']
+ conf['vhost'] = parsed.amqpVhost if parsed.amqpVhost else conf['vhost']
+
+
View
@@ -1,7 +1,7 @@
#parses all "default" parser values with argparse
import argparse,hashlib,sys
-import logging
+import logging,copy
import simplejson as json #need to decode in ascii
log = logging.getLogger('genericore-utils')
@@ -15,6 +15,7 @@ def load_conf(self,new_config):
""" loads and merges configuration from the given dictionary """
if not new_config:
return
+ self.config = copy.deepcopy(self.config)
stack = [(self.config,new_config)]
while stack:
current_dst, current_src = stack.pop()
@@ -71,11 +72,13 @@ def configure(self,conf_list):
i.eval_parser(args)
except Exception as e: print (str(i.__class__) + "does not have eval_parser or load_conf" + str(e))
- self.blend(conf_list)
- log.debug ('New Configuration:' + str(self.config))
+ #self.blend(conf_list)
+ #log.debug ('New Configuration:' + str(self.config))
- def blend(self,conf_list):
- """ blends all configurations of all configurables into this object """
+ def _blend(self,conf_list):
+ """ blends all configurations of all configurables into this object
+ WARNING: This function is considered harmful! DO NOT USE
+ """
for i in conf_list:
self.load_conf(i.config)

0 comments on commit eab8612

Please sign in to comment.