#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at
""" PulseBroker
:copyright: (c) 2012 by Mozilla
:license: MPLv2
Assumes Python v2.6+
-c --config Configuration file (json format)
default: None
-r --redis Redis server connection string
default: localhost:6379
--redisdb Redis database ID
default: 8
-p --pulse Pulse server connection string
default: None (i.e. Mozilla's Pulse)
-t --topic Pulse topic string
default: #
-d --debug Turn on debug logging
default: False
-l --logpath Path where the log file output is written
default: None
-b --background Fork to a daemon process
default: False
Sample Configuration file

    { 'redis': 'localhost:6379',
      'debug': True,
      'logpath': '.'
    }

:author: bear (Mike Taylor) <>
"""
import json
import time
from Queue import Empty
from multiprocessing import Process, Queue, get_logger
import zmq
from releng import initOptions, initLogs, dbRedis
from releng.constants import ID_PULSE_WORKER
from mozillapulse import consumers
appInfo = '|briar-patch'  # Mozilla Pulse applabel; leading '|' suggests a user prefix is prepended elsewhere — TODO confirm
log = get_logger()  # multiprocessing-aware logger shared by parent and worker processes
eventQueue = Queue()  # inter-process queue feeding jobs from the Pulse callback to the ZeroMQ handler
SERVER_CHECK_INTERVAL = 30 # how often, in seconds, to check for new servers
PING_FAIL_MAX = 1 # how many pings can fail before server is marked inactive
PING_INTERVAL = 30 # ping servers every 30 seconds (NOTE(review): comment previously said "2 minutes", value is 30s)
MSG_TIMEOUT = 30 # how long to wait in seconds until a pending message is considered expired
def OfflineTest(options):'Starting Offline message testing')
hArchive = open(options.testfile, 'r+')
for msg in hArchive:
job = json.loads(msg)
def cbMessage(data, message):
    """ cbMessage
    Parses the incoming pulse event and create a "job" that will be sent
    to a job processing server via ZeroMQ router.
    The job is placed into an event queue for async processing.
    """
    routingKey = data['_meta']['routing_key']
    msgType = routingKey.split('.')[0]
    payload = data['payload']

    job = {}
    job['master'] = data['_meta']['master_name']
    job['pulse_key'] = routingKey
    job['time'] = data['_meta']['sent']
    job['id'] = data['_meta']['message_id']
    job['pulse'] = data

    if msgType == 'build':
        if 'build' in payload:
            job['slave'] = payload['build']['slave']
            job['event'] = 'build'
            pushJob(job)
    elif msgType == 'slave':
        # NOTE(review): reconstructed — the else: separating the connect
        # and disconnect branches was lost in extraction; the two payload
        # shapes ('slave' dict vs bare 'slavename') imply this split.
        if 'slave' in payload:
            job['slave'] = payload['slave']['name']
            job['event'] = 'slave connect'
        else:
            job['slave'] = payload['slavename']
            job['event'] = 'slave disconnect'
        pushJob(job)
    elif msgType == 'change':
        job['event'] = 'source'
        # NOTE(review): reconstructed pushJob() call — the branch body was
        # truncated and a built job is otherwise dropped on the floor.
        pushJob(job)
class zmqService(object):
def __init__(self, serverID, router, db, events):
self.router = router
self.db = db = events = serverID
self.address ='%s:' % ID_PULSE_WORKER, '')
if ':' not in self.address:
self.address = '%s:5555' % self.address
self.address = 'tcp://%s' % self.address
log.debug('connecting to server %s' % self.address)
def init(self):
self.payload = None
self.expires = None
self.sequence = 0
self.errors = 0
self.alive = True
self.lastPing = time.time()
def isAvailable(self):
return self.alive and self.payload is None
def reply(self, reply):
if options.debug:
log.debug('reply %s' %
sequenceReply = reply.pop(0)
if options.debug:
log.debug('recv %s [%s]' % (, reply[0]))
self.errors = 0
self.alive = True
self.lastPing = time.time()
self.expires = self.lastPing + MSG_TIMEOUT
if int(sequenceReply) == self.sequence:
self.payload = None
log.error('reply received out of sequence')
def request(self, msg):
if options.debug:
log.debug('request %s' %
if self.isAvailable():
self.sequence += 1
self.payload = [, str(self.sequence), 'job', msg]
self.expires = time.time() + MSG_TIMEOUT
if options.debug:
log.debug('send %s %d chars [%s]' % (, len(msg), msg[:42]))
return True
return False
def heartbeat(self):
if self.payload is not None and time.time() > self.expires:
if self.payload[2] == 'ping':
log.warning('server %s has failed to respond to %d ping requests' % (, self.errors))
if self.errors >= PING_FAIL_MAX:
self.alive = False
log.warning('server %s has expired request: %s [%s]' % (, ','.join(self.payload[:3]), self.payload[3][:42]))
self.alive = False
if self.alive:
if time.time() - self.lastPing > PING_INTERVAL:
log.error('removing %s from active server list' %
db.sadd('%s:inactive' % ID_PULSE_WORKER,
def ping(self, force=False):
if options.debug:
log.debug('ping %s' %
if force or self.isAvailable():
self.sequence += 1
self.payload = [, str(self.sequence), 'ping']
self.lastPing = time.time()
self.expires = self.lastPing + MSG_TIMEOUT
self.errors += 1
self.alive = False
log.warning('ping requested for offline service [%s]' %
def discoverServers(servers, db, events, router):
for serverID in db.lrange(ID_PULSE_WORKER, 0, -1):
if db.sismember('%s:inactive' % ID_PULSE_WORKER, serverID):
log.warning('server %s found in inactive list, disconnecting' % serverID)
if serverID in servers:
servers[serverID].alive = False
if serverID in servers:
if servers[serverID].alive == False:'server %s found, resetting' % serverID)
log.debug('server %s is new, adding to connect queue' % serverID)
servers[serverID] = zmqService(serverID, router, db, events)
def handleZMQ(options, events, db):
""" handleZMQ
Primary event loop for everything ZeroMQ related
All payloads to be sent onward arrive via the event queue.
Currently it is a very simple implementation of the Freelance
pattern with no server heartbeat checks.
The incoming events are structured as a list that always
begins with the event type.
Job: ('job', "{'payload': 'sample'}")
Heartbeat: ('ping',)
The structure of the message sent between nodes is:
[destination, sequence, control, payload]
all items are sent as strings.
servers = {}
lastDiscovery = time.time()
context = zmq.Context()
router = context.socket(zmq.ROUTER)
poller = zmq.Poller()
poller.register(router, zmq.POLLIN)
while True:
available = False
for serverID in servers:
if servers[serverID].isAvailable():
available = True
event = events.get(False)
except Empty:
event = None
if event is not None:
if available:
eventType = event[0]
if eventType == 'exit':'exit command received, terminating')
if eventType == 'ping':
if event[1] in servers and servers[event[1]].alive:
elif eventType == 'job':
handled = False
for serverID in servers:
if servers[serverID].isAvailable() and servers[serverID].request(event[1]):
handled = True
if not handled:
log.error('no active servers to handle request')
# TODO - push archived item to redis
log.warning('unknown event [%s]' % eventType)
log.error('no active servers to handle request')
items = dict(poller.poll(100))
if router in items:
reply = router.recv_multipart()
serverID = reply.pop(0)
for serverID in servers:
if servers[serverID].alive:
if time.time() > lastDiscovery:
discoverServers(servers, db, events, router)
lastDiscovery = time.time() + SERVER_CHECK_INTERVAL'done')
def pushJob(job):
    """Serialize *job* to JSON and hand it to the ZeroMQ handler via the queue."""
    eventQueue.put(('job', json.dumps(job)))
# Command-line option table consumed by initOptions():
#   name: (short flag, long flag, default, help text)
# NOTE(review): the closing brace was lost in extraction and is restored.
# NOTE(review): 'debug' defaults to True here although the module
# docstring says the default is False — confirm which is intended.
_defaultOptions = { 'config':     ('-c', '--config',     None,             'Configuration file'),
                    'debug':      ('-d', '--debug',      True,             'Enable Debug'),
                    'appinfo':    ('-a', '--appinfo',    appInfo,          'Mozilla Pulse app string'),
                    'background': ('-b', '--background', False,            'daemonize ourselves'),
                    'logpath':    ('-l', '--logpath',    None,             'Path where log file is to be written'),
                    'redis':      ('-r', '--redis',      'localhost:6379', 'Redis connection string'),
                    'redisdb':    ('',   '--redisdb',    '8',              'Redis database'),
                    'pulse':      ('-p', '--pulse',      None,             'Pulse connection string'),
                    'topic':      ('-t', '--topic',      '#',              'Mozilla Pulse Topic filter string'),
                    'testfile':   ('',   '--testfile',   None,             'Offline testing, uses named file instead of Pulse server'),
                  }
if __name__ == '__main__':
options = initOptions(params=_defaultOptions)
initLogs(options)'Starting')'Connecting to datastore')
db = dbRedis(options)'Creating ZeroMQ handler')
Process(name='zmq', target=handleZMQ, args=(options, eventQueue, db)).start()
if options.testfile:
try:'Connecting to Mozilla Pulse with topic "%s"' % options.topic)
pulse = consumers.BuildConsumer(applabel=options.appinfo)
pulse.configure(topic=options.topic, callback=cbMessage)
log.debug('Starting pulse.listen()')
log.error('Pulse Exception', exc_info=True)