Skip to content

Commit

Permalink
Merge pull request #243 from jookies/v0.6-beta
Browse files Browse the repository at this point in the history
v0.6b34
  • Loading branch information
farirat committed Jul 7, 2015
2 parents e5fd8fb + f8bcfeb commit 087bb1a
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ python:
# Command to install dependencies
install:
- python setup.py sdist
- sudo pip install dist/jasmin-0.6b33.tar.gz
- sudo pip install dist/jasmin-0.6b34.tar.gz
- sudo cp misc/config/init-script/jasmind-ubuntu /etc/init.d/jasmind
- sudo update-rc.d jasmind defaults
# Commands to run tests:
Expand Down
2 changes: 1 addition & 1 deletion jasmin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

MAJOR = 0
MINOR = 6
PATCH = 33
PATCH = 34
META = 'b'

def get_version():
Expand Down
62 changes: 25 additions & 37 deletions jasmin/managers/clients.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import pickle
import uuid
import time
import datetime
import jasmin
Expand Down Expand Up @@ -209,36 +208,16 @@ def perspective_connector_add(self, ClientConfig):
defer.returnValue(False)

# Declare queues
# First declare the messaging exchange
# First declare the messaging exchange (has no effect if its already declared)
yield self.amqpBroker.chan.exchange_declare(exchange='messaging', type='topic')
# submit.sm queue declaration and binding
routing_key_submit_sm = 'submit.sm.%s' % c.id
self.log.info('Declaring submit_sm queue to listen to: %s', routing_key_submit_sm)
yield self.amqpBroker.named_queue_declare(queue=routing_key_submit_sm)
yield self.amqpBroker.chan.queue_bind(queue=routing_key_submit_sm,
submit_sm_queue = 'submit.sm.%s' % c.id
routing_key = 'submit.sm.%s' % c.id
self.log.info('Binding %s queue to %s route_key' % (submit_sm_queue, routing_key))
yield self.amqpBroker.named_queue_declare(queue=submit_sm_queue, exclusive = True)
yield self.amqpBroker.chan.queue_bind(queue=submit_sm_queue,
exchange="messaging",
routing_key=routing_key_submit_sm)
# submit.sm.resp queue declaration and binding
routing_key_submit_sm_resp = 'submit.sm.resp.%s' % c.id
self.log.info('Declaring submit_sm_resp queue to publish to: %s', routing_key_submit_sm_resp)
yield self.amqpBroker.named_queue_declare(queue=routing_key_submit_sm_resp)
yield self.amqpBroker.chan.queue_bind(queue=routing_key_submit_sm_resp,
exchange="messaging",
routing_key=routing_key_submit_sm_resp)
# deliver.sm queue declaration and binding
routing_key_deliver_sm = 'deliver.sm.%s' % c.id
self.log.info('Declaring deliver_sm queue to publish to: %s', routing_key_deliver_sm)
yield self.amqpBroker.named_queue_declare(queue=routing_key_deliver_sm)
yield self.amqpBroker.chan.queue_bind(queue=routing_key_deliver_sm,
exchange="messaging",
routing_key=routing_key_deliver_sm)
# dlr queue declaration and binding
routing_key_dlr = 'dlr.%s' % c.id
self.log.info('Declaring dlr queue to publish to: %s', routing_key_dlr)
yield self.amqpBroker.named_queue_declare(queue=routing_key_dlr)
yield self.amqpBroker.chan.queue_bind(queue=routing_key_dlr,
exchange="messaging",
routing_key=routing_key_dlr)
routing_key=routing_key)

# Instanciate smpp client service manager
serviceManager = SMPPClientService(c, self.config)
Expand Down Expand Up @@ -346,15 +325,23 @@ def perspective_connector_start(self, cid):

# Subscribe to submit.sm.%cid queue
# check jasmin.queues.test.test_amqp.PublishConsumeTestCase.test_simple_publish_consume_by_topic
routing_key_submit_sm = 'submit.sm.%s' % connector['id']
consumerTag = 'SMPPClientFactory-%s.%s' % (connector['id'], str(uuid.uuid4()))
yield self.amqpBroker.chan.basic_consume(queue = routing_key_submit_sm,
no_ack = False,
consumer_tag = consumerTag)
submit_sm_queue = 'submit.sm.%s' % connector['id']
consumerTag = 'SMPPClientFactory-%s' % (connector['id'])

try:
# Using the same consumerTag will prevent getting multiple consumers on the same queue
# This can resolve the dark hole issue #234
yield self.amqpBroker.chan.basic_consume(queue = submit_sm_queue,
no_ack = False,
consumer_tag = consumerTag)
except Exception, e:
self.log.error('Error consuming from queue %s: %s' % (submit_sm_queue, e))
defer.returnValue(False)

submit_sm_q = yield self.amqpBroker.client.queue(consumerTag)
self.log.info('%s is consuming from routing key: %s', consumerTag, routing_key_submit_sm)
self.log.info('%s is consuming from queue: %s', consumerTag, submit_sm_queue)

# Set callbacks for every consumed message from routing_key_submit_sm
# Set callbacks for every consumed message from submit_sm_queue queue
d = submit_sm_q.get()
d.addCallback(
connector['sm_listener'].submit_sm_callback
Expand Down Expand Up @@ -412,8 +399,9 @@ def perspective_connector_stop(self, cid, delQueues = False):
connector['sm_listener'].clearAllTimers()

# Stop the queue consumer
self.log.debug('Stopping submit_sm_q consumer in connector [%s]', cid)
yield self.amqpBroker.chan.basic_cancel(consumer_tag = connector['consumer_tag'])
if connector['consumer_tag'] is not None:
self.log.debug('Stopping submit_sm_q consumer in connector [%s]', cid)
yield self.amqpBroker.chan.basic_cancel(consumer_tag = connector['consumer_tag'])

# Cleaning
self.log.debug('Cleaning objects in connector [%s]', cid)
Expand Down
19 changes: 15 additions & 4 deletions jasmin/managers/test/test_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,20 @@ def test_start_sameconnector_twice_with_reconnecting_on_failure(self):

@defer.inlineCallbacks
def test_start_sameconnector_twice_with_noreconnecting_on_failure(self):
"""It was discovered that starting the connector twice would lead
to a multiple consumers on same queue, as of now, starting connector
twice is no more permitted
Related to #234"""

yield self.connect('127.0.0.1', self.pbPort)

localConfig = copy.copy(self.defaultConfig)
localConfig.reconnectOnConnectionFailure = False
yield self.add(localConfig)
yield self.start(localConfig.id)
startRet = yield self.start(localConfig.id)

self.assertEqual(True, startRet)
self.assertEqual(False, startRet)

yield self.stopall()

Expand Down Expand Up @@ -975,6 +980,12 @@ def deliver_sm_callback(self, message):
def test_deliverSm(self):
yield self.connect('127.0.0.1', self.pbPort)

# Bind to deliver.sm.CID
routingKey = 'deliver.sm.%s' % self.defaultConfig.id
queueName = 'test_deliverSm'
yield self.amqpBroker.named_queue_declare(queue=queueName, exclusive = True, auto_delete = True)
yield self.amqpBroker.chan.queue_bind(queue=queueName, exchange="messaging", routing_key=routingKey)

yield self.add(self.defaultConfig)
yield self.start(self.defaultConfig.id)

Expand All @@ -984,12 +995,12 @@ def test_deliverSm(self):
yield waitFor(2)

# Listen on the deliver.sm queue
queueName = 'deliver.sm.%s' % self.defaultConfig.id
consumerTag = 'test_deliverSm'
yield self.amqpBroker.chan.basic_consume(queue=queueName, consumer_tag=consumerTag, no_ack=True)
yield self.amqpBroker.chan.basic_consume(queue=queueName, no_ack=True, consumer_tag=consumerTag)
deliver_sm_q = yield self.amqpBroker.client.queue(consumerTag)
deliver_sm_q.get().addCallback(self.deliver_sm_callback)


yield self.stop(self.defaultConfig.id)

# Wait for unbound state
Expand Down
3 changes: 2 additions & 1 deletion jasmin/protocols/smpp/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ def __init__(self, config, msgHandler = None):
self.log = logging.getLogger(LOG_CATEGORY_CLIENT_BASE+".%s" % config.id)
if len(self.log.handlers) != 1:
self.log.setLevel(config.log_level)
_when = self.config.log_rotate if hasattr(self.config, 'log_rotate') else 'midnight'
handler = TimedRotatingFileHandler(filename=self.config.log_file,
when = self.config.log_rotate)
when = _when)
formatter = logging.Formatter(config.log_format, config.log_date_format)
handler.setFormatter(formatter)
self.log.addHandler(handler)
Expand Down
20 changes: 8 additions & 12 deletions jasmin/routing/router.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import time
import logging
import pickle
import uuid
import jasmin
from logging.handlers import TimedRotatingFileHandler
from twisted.spread import pb
Expand Down Expand Up @@ -72,10 +71,10 @@ def addAmqpBroker(self, amqpBroker):

# Subscribe to deliver.sm.* queues
yield self.amqpBroker.chan.exchange_declare(exchange='messaging', type='topic')
consumerTag = 'RouterPB.%s' % str(uuid.uuid4())
consumerTag = 'RouterPB-delivers'
routingKey = 'deliver.sm.*'
queueName = 'RouterPB_deliver_sm_all' # A local queue to RouterPB
yield self.amqpBroker.named_queue_declare(queue=queueName)
yield self.amqpBroker.named_queue_declare(queue=queueName, exclusive = True, auto_delete = True)
yield self.amqpBroker.chan.queue_bind(queue=queueName, exchange="messaging", routing_key=routingKey)
yield self.amqpBroker.chan.basic_consume(queue=queueName, no_ack=False, consumer_tag=consumerTag)
self.deliver_sm_q = yield self.amqpBroker.client.queue(consumerTag)
Expand All @@ -84,10 +83,10 @@ def addAmqpBroker(self, amqpBroker):

# Subscribe to bill_request.submit_sm_resp.* queues
yield self.amqpBroker.chan.exchange_declare(exchange='billing', type='topic')
consumerTag = 'RouterPB.%s' % str(uuid.uuid4())
consumerTag = 'RouterPB-billrequests'
routingKey = 'bill_request.submit_sm_resp.*'
queueName = 'RouterPB_bill_request_submit_sm_resp_all' # A local queue to RouterPB
yield self.amqpBroker.named_queue_declare(queue=queueName)
yield self.amqpBroker.named_queue_declare(queue=queueName, exclusive = True, auto_delete = True)
yield self.amqpBroker.chan.queue_bind(queue=queueName, exchange="billing", routing_key=routingKey)
yield self.amqpBroker.chan.basic_consume(queue=queueName, no_ack=False, consumer_tag=consumerTag)
self.bill_request_submit_sm_resp_q = yield self.amqpBroker.client.queue(consumerTag)
Expand All @@ -98,15 +97,12 @@ def addAmqpBroker(self, amqpBroker):
)
self.log.info('RouterPB is consuming from routing key: %s', routingKey)

def rejectAndRequeueMessage(self, message):
msgid = message.content.properties['message-id']

self.log.debug("Requeuing DeliverSmPDU[%s] without delay" % msgid)
return self.amqpBroker.chan.basic_reject(delivery_tag=message.delivery_tag, requeue=1)
@defer.inlineCallbacks
def rejectMessage(self, message):
return self.amqpBroker.chan.basic_reject(delivery_tag=message.delivery_tag, requeue=0)
yield self.amqpBroker.chan.basic_reject(delivery_tag=message.delivery_tag, requeue=0)
@defer.inlineCallbacks
def ackMessage(self, message):
return self.amqpBroker.chan.basic_ack(message.delivery_tag)
yield self.amqpBroker.chan.basic_ack(message.delivery_tag)

def activatePersistenceTimer(self):
if self.persistenceTimer and self.persistenceTimer.active():
Expand Down
14 changes: 7 additions & 7 deletions jasmin/routing/throwers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pickle
import logging
import urllib
import uuid
from logging.handlers import TimedRotatingFileHandler
from twisted.application.service import Service
from twisted.internet import defer
Expand Down Expand Up @@ -36,7 +35,7 @@ def __init__(self):
self.log_category = "abstract-thrower"

self.exchangeName = 'messaging'
self.consumerTag = 'abstractThrower.%s' % str(uuid.uuid4())
self.consumerTag = 'abstractThrower'
self.routingKey = 'abstract_thrower.*'
self.queueName = 'abstract_thrower'
self.callback = self.throwing_callback
Expand Down Expand Up @@ -114,7 +113,7 @@ def addAmqpBroker(self, amqpBroker):
# Declare exchange, queue and start consuming to self.callback
yield self.amqpBroker.chan.exchange_declare(exchange = self.exchangeName,
type='topic')
yield self.amqpBroker.named_queue_declare(queue = self.queueName)
yield self.amqpBroker.named_queue_declare(queue = self.queueName, exclusive = True, auto_delete = True)
yield self.amqpBroker.chan.queue_bind(queue = self.queueName,
exchange = self.exchangeName,
routing_key = self.routingKey)
Expand Down Expand Up @@ -164,7 +163,7 @@ def __init__(self):

self.log_category = "jasmin-deliversm-thrower"
self.exchangeName = 'messaging'
self.consumerTag = 'deliverSmThrower.%s' % str(uuid.uuid4())
self.consumerTag = 'deliverSmThrower'
self.routingKey = 'deliver_sm_thrower.*'
self.queueName = 'deliver_sm_thrower'

Expand Down Expand Up @@ -202,13 +201,14 @@ def http_deliver_sm_callback(self, message):
encodedArgs = urllib.urlencode(args)
postdata = None
baseurl = dc.baseurl
if dc.method == 'GET':
_method = dc.method.upper()
if _method == 'GET':
baseurl += '?%s' % encodedArgs
else:
postdata = encodedArgs

self.log.debug('Calling %s with args %s using %s method.' % (dc.baseurl, args, dc.method))
content = yield getPage(baseurl, method = dc.method, postdata = postdata,
self.log.debug('Calling %s with args %s using %s method.' % (dc.baseurl, args, _method))
content = yield getPage(baseurl, method = _method, postdata = postdata,
timeout = self.config.timeout, agent = 'Jasmin gateway/1.0 deliverSmHttpThrower',
headers = {'Content-Type' : 'application/x-www-form-urlencoded',
'Accept' : 'text/plain'})
Expand Down

0 comments on commit 087bb1a

Please sign in to comment.