Permalink
Browse files

finished initial work on python-genericore

  • Loading branch information...
1 parent 1bd72c0 commit c7ef2d28759caec5da7d323e7a7195cc3cb6298c Felix Richter committed Jan 19, 2011
Showing with 40 additions and 17 deletions.
  1. +4 −0 README.md
  2. +30 −16 genericore/auto_amqp.py
  3. +6 −1 genericore/utils.py
View
@@ -0,0 +1,4 @@
+GENERICORE
+==========
+This is the python port of genericore, the generic information gathering
+framework.
View
@@ -1,5 +1,5 @@
import pika
-import json, sys,time
+import simplejson as json, sys,time
import logging
log = logging.getLogger('genericore-amqp')
@@ -9,7 +9,8 @@
"password" : "guest",
"host" : "localhost",
"port" : 5672,
- "heartbeat" : 0
+ "heartbeat" : 0,
+ "vhost" : "/"
},
"in" : {
"exchange" : False,
@@ -28,46 +29,59 @@ def __init__(self,config=None):
if config:
self.load_conf(config)
- def load_conf(self,config):
- self.config.update(config)
+ def load_conf(self,new_config):
+ """ loads and merges configuration from the given dictionary """
+ #self.config.update(new_config)
+ for k,v in new_config.items():
+ if k in self.config:
+ self.config[k].update(v)
+ else:
+ self.config[k] = v
def load_conf_file(self,config_file):
+ """ loads and merges configuration directly from a file """
with open(config_file) as f:
- new_conf = json.load(f)
+ new_conf = json.load(f,encoding='ascii')
self.load_conf(new_conf["genericore"])
def create_connection(self):
- if self.connection:
+ """ starts the connection the the AMQP Serve """
+ if self.conn:
raise Exception("Connection already open")
- log.debug (self.config['connection'])
- locals.update(self.config['connection'])
+ cfg = self.config['connection']
+ log.debug (str(cfg))
+ locals().update()
self.conn = pika.AsyncoreConnection(pika.ConnectionParameters(
- credentials = pika.PlainCredentials(username,password),
- heartbeat=heartbeat,
- virtual_host=vhost,
- port=port,
- host=host))
+ credentials = pika.PlainCredentials(cfg['login'],cfg['password']),
+ heartbeat=cfg['heartbeat'],
+ virtual_host=cfg['vhost'],
+ port=cfg['port'],
+ host=cfg['host']))
self.channel = self.conn.channel()
+
self._setup_tubes()
def _setup_tubes(self):
+ """ creates the in 'config' configured input and output """
chan = self.channel
inp = self.config['in']
out = self.config['out']
if inp['exchange']:
+ log.info('generating Input Queue'+ str(inp))
chan.exchange_declare(**inp)
self.qname = chan.queue_declare(exclusive=True).queue
chan.queue_bind(exchange=inp['exchange'],queue=self.qname)
self.consume = lambda cb : chan.basic_consume(cb,queue=self.qname,no_ack=True)
- self.start_loop = lambda : chan.asyncore_loop()
+ self.start_loop = lambda : pika.asyncore_loop()
if out['exchange']:
+ log.info('generating Output Exchange'+ str(out))
chan.exchange_declare(**out)
self.publish = lambda msg: chan.basic_publish(exchange=out['exchange'],routing_key='',body=msg)
def close_connection(self):
- self.connection.close()
- self.connection = None
+ self.conn.close()
+ self.conn= None
#cleanup
if hasattr(self,'consume'):
delattr(self,'consume')
View
@@ -1,6 +1,7 @@
#parses all "default" parser values with argparse
-import argparse
+import argparse,hashlib
+import simplejson
def parse_default(parser):
parser.add_argument('-s','--host',default='141.31.8.11', help='AMQP host ip address')
@@ -10,3 +11,7 @@ def parse_default(parser):
parser.add_argument('-b','--heartbeat',type=int,default=0,help='AMQP Heartbeat value')
parser.add_argument('-v','--vhost',default='/',help='AMQP vhost definition')
parser.add_argument('-d','--debug_level',default='/',help='AMQP vhost definition')
+ parser.add_argument('--unique-key',action='store_true', help='Unique Key')
+def generate_unique(PROTO_VERSION,args):
+ return hashlib.sha1(str(PROTO_VERSION) +
+ simplejson.dumps(args,sort_keys=True)).hexdigest()

0 comments on commit c7ef2d2

Please sign in to comment.