Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

first commit

  • Loading branch information...
commit 432300ddbb3cc3bc6a1fab238a584437a1f63878 0 parents
@jonallengriffin jonallengriffin authored
3  pulsetranslator/__init__.py
@@ -0,0 +1,3 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
120 pulsetranslator/daemon.py
@@ -0,0 +1,120 @@
+import os
+import sys
+
+# File mode creation mask of the daemon.
+UMASK = 0
+
+# Default working directory for the daemon.
+WORKDIR = "/"
+
+# Default maximum for the number of available file descriptors.
+MAXFD = 1024
+
+def createDaemon(pidfile, logfile='/dev/null'):
+ """Detach a process from the controlling terminal and run it in the
+ background as a daemon. http://code.activestate.com/recipes/278731/
+ """
+
+ try:
+ # Fork a child process so the parent can exit. This returns control to
+ # the command-line or shell. It also guarantees that the child will not
+ # be a process group leader, since the child receives a new process ID
+ # and inherits the parent's process group ID. This step is required
+ # to insure that the next call to os.setsid is successful.
+ pid = os.fork()
+ except OSError, e:
+ raise Exception, "%s [%d]" % (e.strerror, e.errno)
+
+ if (pid == 0): # The first child.
+ # To become the session leader of this new session and the process group
+ # leader of the new process group, we call os.setsid(). The process is
+ # also guaranteed not to have a controlling terminal.
+ os.setsid()
+
+ # Is ignoring SIGHUP necessary?
+ #
+ # It's often suggested that the SIGHUP signal should be ignored before
+ # the second fork to avoid premature termination of the process. The
+ # reason is that when the first child terminates, all processes, e.g.
+ # the second child, in the orphaned group will be sent a SIGHUP.
+ #
+ # "However, as part of the session management system, there are exactly
+ # two cases where SIGHUP is sent on the death of a process:
+ #
+ # 1) When the process that dies is the session leader of a session that
+ # is attached to a terminal device, SIGHUP is sent to all processes
+ # in the foreground process group of that terminal device.
+ # 2) When the death of a process causes a process group to become
+ # orphaned, and one or more processes in the orphaned group are
+ # stopped, then SIGHUP and SIGCONT are sent to all members of the
+ # orphaned group." [2]
+ #
+ # The first case can be ignored since the child is guaranteed not to have
+ # a controlling terminal. The second case isn't so easy to dismiss.
+ # The process group is orphaned when the first child terminates and
+ # POSIX.1 requires that every STOPPED process in an orphaned process
+ # group be sent a SIGHUP signal followed by a SIGCONT signal. Since the
+ # second child is not STOPPED though, we can safely forego ignoring the
+ # SIGHUP signal. In any case, there are no ill-effects if it is ignored.
+ #
+ # import signal # Set handlers for asynchronous events.
+ # signal.signal(signal.SIGHUP, signal.SIG_IGN)
+
+ try:
+ # Fork a second child and exit immediately to prevent zombies. This
+ # causes the second child process to be orphaned, making the init
+ # process responsible for its cleanup. And, since the first child is
+ # a session leader without a controlling terminal, it's possible for
+ # it to acquire one by opening a terminal in the future (System V-
+ # based systems). This second fork guarantees that the child is no
+ # longer a session leader, preventing the daemon from ever acquiring
+ # a controlling terminal.
+ pid = os.fork() # Fork a second child.
+
+ if pidfile:
+ # update the pid in the pid logfile
+ fp = open(pidfile, 'w')
+ fp.write("%d\n" % pid)
+ fp.close()
+
+ except OSError, e:
+ raise Exception, "%s [%d]" % (e.strerror, e.errno)
+
+ if (pid == 0): # The second child.
+ # Since the current working directory may be a mounted filesystem, we
+ # avoid the issue of not being able to unmount the filesystem at
+ # shutdown time by changing it to the root directory.
+ os.chdir(WORKDIR)
+ # We probably don't want the file mode creation mask inherited from
+ # the parent, so we give the child complete control over permissions.
+ os.umask(UMASK)
+ else:
+ # exit() or _exit()? See below.
+ os._exit(0) # Exit parent (the first child) of the second child.
+ else:
+ # exit() or _exit()?
+ # _exit is like exit(), but it doesn't call any functions registered
+ # with atexit (and on_exit) or any registered signal handlers. It also
+ # closes any open file descriptors. Using exit() may cause all stdio
+ # streams to be flushed twice and any temporary files may be unexpectedly
+ # removed. It's therefore recommended that child branches of a fork()
+ # and the parent branch(es) of a daemon use _exit().
+ os._exit(0) # Exit parent of the first child.
+
+ # Redirect the standard I/O file descriptors to the specified file. Since
+ # the daemon has no controlling terminal, most daemons redirect stdin,
+ # stdout, and stderr to /dev/null. This is done to prevent side-effects
+ # from reads and writes to the standard I/O file descriptors.
+
+ si = open('/dev/null', 'r')
+ so = open(logfile, 'a+', 0)
+ se = open(logfile, 'a+', 0)
+ os.dup2(si.fileno(), sys.stdin.fileno())
+ os.dup2(so.fileno(), sys.stdout.fileno())
+ os.dup2(se.fileno(), sys.stderr.fileno())
+
+ # Set custom file descriptors so that they get proper buffering.
+ sys.stdout, sys.stderr = so, se
+
+ return(0)
+
162 pulsetranslator/loghandler.py
@@ -0,0 +1,162 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import calendar
+import httplib
+import json
+import logging
+import logging.handlers
+import os
+import platform
+from Queue import Empty
+import subprocess
+import sys
+import time
+from urlparse import urlparse
+
+from translatorexceptions import *
+from translatorqueues import *
+
+DEBUG = False
+
+
+class LogHandler(object):
+
+ def __init__(self, queue, parent_pid, logdir):
+ self.queue = queue
+ self.parent_pid = parent_pid
+ self.logdir = logdir
+
+ def get_logger(self, name, filename):
+ filepath = os.path.join(self.logdir, filename)
+ if os.access(filepath, os.F_OK):
+ os.remove(filepath)
+ logger = logging.getLogger(name)
+ logger.setLevel(logging.DEBUG)
+ logger.addHandler(logging.handlers.RotatingFileHandler(filepath, maxBytes=300000, backupCount=2))
+ return logger
+
+ def get_url_info(self, url):
+ """Return a (code, content_length) tuple from making an
+ HTTP HEAD request for the given url.
+ """
+
+ try:
+ content_length = -1
+ p = urlparse(url)
+
+ conn = httplib.HTTPConnection(p[1])
+ conn.request('HEAD', p[2])
+ res = conn.getresponse()
+ code = res.status
+
+ if code == 200:
+ for header in res.getheaders():
+ if header[0] == 'content-length':
+ content_length = int(header[1])
+
+ return (code, content_length)
+
+ except AttributeError:
+ # this can happen when we didn't get a valid url from pulse
+ return (-1, -1)
+
+ except Exception, inst:
+ # XXX: something bad happened; we should log this
+ # return (-1, -1)
+ raise
+
+ def process_builddata(self, data):
+ if not data.get('logurl'):
+ # should log this
+ return
+
+ # If it's been less than 15s since we checked for this particular
+ # log, put this item back in the queue without checking again.
+ now = calendar.timegm(time.gmtime())
+ last_check = data.get('last_check', 0)
+ if last_check and now - last_check < 15:
+ self.queue.put(data)
+ return
+
+ code, content_length = self.get_url_info(str(data['logurl']))
+ if DEBUG:
+ if data.get('last_check'):
+ print '...reprocessing logfile', code, data.get('logurl')
+ print '...', data.get('key')
+ print '...', now - data.get('insertion_time', 0), 'seconds since insertion_time'
+ else:
+ print 'processing logfile', code, data.get('logurl')
+ if code == 200:
+ self.publish_unittest_message(data)
+ else:
+ if now - data.get('insertion_time', 0) > 600:
+ # Currently, this is raised for unittests from beta and aurora
+ # builds at least, as their log files get stored in a place
+ # entirely different than the builds. This should change soon
+ # per bug 713846, so I've not adapted the code to handle this.
+ raise LogTimeoutError(data.get('key', 'unknown'), data.get('logurl'))
+ else:
+ # re-insert this into the queue
+ data['last_check'] = now
+ if DEBUG:
+ print 'requeueing after check'
+ self.queue.put(data)
+
+ def publish_unittest_message(self, data):
+ # The original routing key has the format build.foo.bar.finished;
+ # we only use 'foo' in the new routing key.
+ original_key = data['key'].split('.')[1]
+ tree = data['tree']
+ platform = data['platform']
+ buildtype = data['buildtype']
+ os = data['os']
+ test = data['test']
+ product = data['product'] if data['product'] else 'unknown'
+ key_parts = ['talos' if data['talos'] else 'unittest',
+ tree,
+ platform,
+ os,
+ buildtype,
+ test,
+ product,
+ original_key]
+
+ publish_message(TranlsatorPublisher, data, '.'.join(key_parts))
+
+ def start(self):
+ self.errorLogger = self.get_logger('LogHandlerErrorLog', 'log_handler_error.log')
+ while True:
+ try:
+ data = None
+
+ # Check if the parent process is still alive, and if so,
+ # look for another log to process.
+ if 'windows' in platform.system().lower():
+ proc = subprocess.Popen(['tasklist', '-FI', 'PID eq %d' % self.parent_pid],
+ stderr=subprocess.STDOUT,
+ stdout=subprocess.PIPE)
+ if not proc.wait():
+ result = proc.stdout.read()
+ if not str(self.parent_pid) in result:
+ raise OSError
+ else:
+ raise Exception("Unable to call tasklist")
+ else:
+ os.kill(self.parent_pid, 0)
+ data = self.queue.get_nowait()
+ self.process_builddata(data)
+ except Empty:
+ time.sleep(5)
+ continue
+ except OSError:
+ # if the parent process isn't alive, shutdown gracefully
+ # XXX: Need to drain the queue to a file before shutting
+ # down, so we can pick up where we left off when we resume.
+ sys.exit(0)
+ except Exception, inst:
+ obj_to_log = data
+ if data.get('payload') and data['payload'].get('build') and data['payload']['build'].get('properties'):
+ obj_to_log = data['payload']['build']['properties']
+ self.errorLogger.exception(json.dumps(obj_to_log, indent=2))
69 pulsetranslator/messageparams.py
@@ -0,0 +1,69 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import re
+
+
+buildtypes = [ 'opt', 'debug', 'pgo' ]
+
+def guess_platform(builder):
+ for platform in sorted(platforms.keys(), reverse=True):
+ if platform in builder:
+ return platform
+
+ for key in platforms:
+ for os in platforms[key]:
+ if os in builder:
+ return os
+
+def convert_os(data):
+ if re.search(r'OS\s*X\s*10.5', data['buildername'], re.I):
+ return 'leopard'
+ if re.search(r'OS\s*X\s*10.6', data['buildername'], re.I):
+ return 'snowleopard'
+ if re.search(r'OS\s*X\s*10.7', data['buildername'], re.I):
+ return 'lion'
+ if re.search(r'WINNT\s*5.2', data['buildername'], re.I):
+ return 'xp'
+ return 'unknown'
+
+os_conversions = {
+ 'leopard-o': lambda x: 'leopard',
+ 'tegra_android-o': lambda x: 'tegra_android',
+ 'macosx': convert_os,
+ 'macosx64': convert_os,
+ 'win32': convert_os,
+}
+
+platforms = {
+ 'linux64-rpm': ['fedora64'],
+ 'linux64': ['fedora64'],
+ 'linuxqt': ['fedora'],
+ 'linux-rpm': ['fedora'],
+ 'linux': ['fedora', 'linux'],
+ 'win32': ['xp', 'win7'],
+ 'win64': ['w764'],
+ 'macosx64': ['macosx64', 'snowleopard', 'leopard', 'lion'],
+ 'macosx': ['macosx', 'leopard'],
+ 'android-xul': ['tegra_android-xul'],
+ 'android': ['tegra_android'],
+}
+
+tags = [
+ '',
+ 'build',
+ 'dep',
+ 'dtrace',
+ 'l10n',
+ 'nightly',
+ 'nomethodjit',
+ 'notracejit',
+ 'release',
+ 'shark',
+ 'spidermonkey',
+ 'valgrind',
+ 'warnaserr',
+ 'warnaserrdebug',
+ 'xulrunner',
+ ]
296 pulsetranslator/pulsetranslator.py
@@ -0,0 +1,296 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import calendar
+import datetime
+from dateutil.parser import parse
+import json
+import logging
+import logging.handlers
+from mozillapulse import consumers
+from multiprocessing import Process, Queue
+import os
+import re
+import socket
+import time
+
+from translatorexceptions import *
+from loghandler import LogHandler
+import messageparams
+from translatorqueues import *
+
+
+class PulseBuildbotTranslator(object):
+
+ def __init__(self, logdir='logs'):
+ self.label = 'pulse-build-translator-%s' % socket.gethostname()
+ self.pulse = consumers.BuildConsumer(applabel=self.label)
+ self.pulse.configure(topic='#.finished',
+ callback=self.on_pulse_message,
+ durable=False)
+ self.queue = Queue()
+ self.logdir = logdir
+
+ if not os.access(self.logdir, os.F_OK):
+ os.mkdir(self.logdir)
+
+ self.badPulseMessageLogger = self.get_logger('BadPulseMessage', 'bad_pulse_message.log')
+ self.errorLogger = self.get_logger('ErrorLog', 'error.log')
+
+ def get_logger(self, name, filename):
+ filepath = os.path.join(self.logdir, filename)
+ if os.access(filepath, os.F_OK):
+ os.remove(filepath)
+ logger = logging.getLogger(name)
+ logger.setLevel(logging.DEBUG)
+ logger.addHandler(logging.handlers.RotatingFileHandler(filepath, maxBytes=300000, backupCount=2))
+ return logger
+
+ def start(self):
+ loghandler = LogHandler(self.queue, os.getpid(), self.logdir)
+ self.logprocess = Process(target=loghandler.start)
+ self.logprocess.start()
+ self.pulse.listen()
+
+ def buildid2date(self, string):
+ """Takes a buildid string and returns a python datetime and
+ seconds since epoch.
+ """
+
+ date = parse(string)
+ return (date, int(time.mktime(date.timetuple())))
+
+ def process_unittest(self, data):
+ data['insertion_time'] = calendar.timegm(time.gmtime())
+ if data['platform'] not in messageparams.platforms:
+ raise BadPlatformError(data['key'], data['platform'])
+ elif data['os'] not in messageparams.platforms[data['platform']]:
+ raise BadOSError(data['key'], data['platform'], data['os'], data['buildername'])
+ else:
+ self.queue.put(data)
+
+ def process_build(self, data):
+ if data['platform'] not in messageparams.platforms:
+ raise BadPlatformError(data['key'], data['platform'])
+ for tag in data['tags']:
+ if tag not in messageparams.tags:
+ raise BadTagError(data['key'], tag, data['platform'], data['product'])
+ if not data['buildurl']:
+ raise NoBuildUrlError(data['key'])
+ self.publish_build_message(data)
+
+ def publish_build_message(self, data):
+ # The original routing key has the format build.foo.bar.finished;
+ # we only use 'foo' in the new routing key.
+ original_key = data['key'].split('.')[1]
+ tree = data['tree']
+ platform = data['platform']
+ buildtype = data['buildtype']
+ key_parts = ['build', tree, platform, buildtype]
+ for tag in data['tags']:
+ if tag:
+ key_parts.append(tag)
+ key_parts.append(original_key)
+
+ publish_message(TranlsatorPublisher, data, '.'.join(key_parts))
+
+ def on_pulse_message(self, data, message):
+ key = 'unknown'
+ stage_platform = None
+
+ try:
+ key = data['_meta']['routing_key']
+
+ # Acknowledge the message so it doesn't hang around on the
+ # pulse server.
+ message.ack()
+
+ # Create a dict that holds build properties that apply to both
+ # unittests and builds.
+ builddata = { 'key': key,
+ 'buildid': None,
+ 'platform': None,
+ 'builddate': None,
+ 'buildurl': None,
+ 'logurl': None,
+ 'testsurl': None,
+ 'release': None,
+ 'buildername': None,
+ 'revision': None,
+ 'product': None,
+ 'tree': None,
+ 'timestamp': datetime.datetime.now().strftime('%Y%m%d%H%M%S'),
+ }
+
+ # scan the payload for properties applicable to both tests and builds
+ for property in data['payload']['build']['properties']:
+
+ # look for revision
+ if property[0] == 'revision':
+ builddata['revision'] = property[1]
+
+ # look for product
+ if property[0] == 'product':
+ builddata['product'] = property[1]
+
+ # look for tree
+ if property[0] == 'branch':
+ builddata['tree'] = property[1]
+ # For builds, this proeprty is sometimes a relative path,
+ # ('releases/mozilla-beta') and not just a name. For
+ # consistency, we'll strip the path components.
+ if isinstance(builddata['tree'], basestring):
+ builddata['tree'] = os.path.basename(builddata['tree'])
+
+ # look for buildid
+ if property[0] == 'buildid':
+ builddata['buildid'] = property[1]
+ date, builddata['builddate'] = self.buildid2date(property[1])
+
+ # look for platform
+ elif property[0] == 'platform':
+ builddata['platform'] = property[1]
+ if '-debug' in builddata['platform']:
+ # strip '-debug' from the platform string if it's present
+ builddata['platform'] = builddata['platform'][0:builddata['platform'].find('-debug')]
+
+ # look for build url
+ elif property[0] in ['packageUrl', 'build_url', 'fileURL']:
+ builddata['buildurl'] = property[1]
+
+ # look for release name
+ elif property[0] in ['en_revision', 'script_repo_revision']:
+ builddata['release'] = property[1]
+
+ # look for tests url
+ elif property[0] == 'testsUrl':
+ builddata['testsurl'] = property[1]
+
+ # look for buildername
+ elif property[0] == 'buildername':
+ builddata['buildername'] = property[1]
+
+ # look for stage_platform
+ elif property[0] == 'stage_platform':
+ # For some messages, the platform we really care about
+ # is in the 'stage_platform' property, not the 'platform'
+ # property.
+ stage_platform = property[1]
+ for type in messageparams.buildtypes:
+ if type in stage_platform:
+ stage_platform = stage_platform[0:stage_platform.find(type) - 1]
+
+ if not builddata['tree']:
+ raise BadPulseMessageError(key, "no 'branch' property")
+
+ builddata['buildtype'] = 'opt'
+ if 'debug' in key:
+ builddata['buildtype'] = 'debug'
+ elif 'pgo' in key:
+ builddata['buildtype'] = 'pgo'
+
+ # see if this message is for a unittest
+ unittestRe = re.compile(r'build\.((%s)[-|_](.*?)(-debug|-o-debug|-pgo|_pgo|_test)?[-|_](test|unittest|pgo)-(.*?))\.(\d+)\.' %
+ builddata['tree'])
+ match = unittestRe.match(key)
+ if match:
+ # for unittests, generate some metadata by parsing the key
+
+ # The 'short_builder' string is quite arbitrary, and so this
+ # code is expected to be fragile, and will likely need
+ # frequent maintenance to deal with future changes to this
+ # string. Unfortunately, these items are not available
+ # in a more straightforward fashion at present.
+ short_builder = match.groups()[0]
+
+ builddata['os'] = match.groups()[2]
+ if builddata['os'] in messageparams.os_conversions:
+ builddata['os'] = messageparams.os_conversions[builddata['os']](builddata)
+
+ builddata['test'] = match.groups()[5]
+
+ # yuck!!
+ if builddata['test'].endswith('_2'):
+ short_builder = "%s.2" % short_builder[0:-2]
+ elif builddata['test'].endswith('_2-pgo'):
+ short_builder = "%s.2-pgo" % short_builder[0:-6]
+
+ builddata['buildnumber'] = match.groups()[6]
+ builddata['talos'] = 'talos' in builddata['buildername']
+
+ if stage_platform:
+ builddata['platform'] = stage_platform
+
+ if builddata['buildurl']:
+ builddata['logurl'] = '%s/%s-build%s.txt.gz' % \
+ (os.path.dirname(builddata['buildurl']),
+ short_builder, builddata['buildnumber'])
+
+ self.process_unittest(builddata)
+ elif 'source' in key:
+ # what is this?
+ # ex: build.release-mozilla-esr10-firefox_source.0.finished
+ pass
+ elif 'repack' in key:
+ # what is this?
+ # ex: build.release-mozilla-beta-linux_repack_1.45.finished
+ pass
+ elif [x for x in ['schedulers', 'tag', 'submitter', 'final_verification'] if x in key]:
+ # internal buildbot stuff we don't care about
+ # ex: build.release-mozilla-beta-firefox_reset_schedulers.12.finished
+ # ex: build.release-mozilla-beta-fennec_tag.40.finished
+ # ex: build.release-mozilla-beta-bouncer_submitter.46.finished
+ pass
+ elif 'jetpack' in key:
+ # These are very awkwardly formed; i.e.
+ # build.jetpack-mozilla-central-win7-debug.18.finished,
+ # and the tree appears nowhere except this string. In order
+ # to support these we'd have to keep a tree map of all
+ # possible trees.
+ pass
+ else:
+ if not builddata['platform']:
+ if stage_platform:
+ builddata['platform'] = stage_platform
+ else:
+ # Some messages don't contain the platform
+ # in any place other than the routing key, so we'll
+ # have to guess it based on that.
+ builddata['platform'] = messageparams.guess_platform(key)
+ if not builddata['platform']:
+ raise BadPulseMessageError(key, 'no "platform" property')
+ otherRe = re.compile(r'build\.((release-|jetpack-)?(%s)[-|_](xulrunner[-|_])?(%s)([-|_]?)(.*?))\.(\d+)\.' %
+ (builddata['tree'], builddata['platform']))
+ match = otherRe.match(key)
+ if match:
+ builddata['tags'] = match.group(7).replace('_', '-').split('-')
+
+ # There are some tags we don't care about as tags,
+ # usually because they are redundant with other properties,
+ # so remove them.
+ notags = ['debug', 'pgo', 'opt']
+ builddata['tags'] = [x for x in builddata['tags'] if x not in notags]
+
+ # Sometimes a tag will just be a digit, i.e.,
+ # build.mozilla-central-android-l10n_5.12.finished;
+ # strip these.
+ builddata['tags'] = [x for x in builddata['tags'] if not x.isdigit()]
+
+ if isinstance(match.group(2), basestring):
+ if 'release' in match.group(2):
+ builddata['tags'].append('release')
+ if 'jetpack' in match.group(2):
+ builddata['tags'].append('jetpack')
+
+ if match.group(4) or 'xulrunner' in builddata['tags']:
+ builddata['product'] = 'xulrunner'
+
+ self.process_build(builddata)
+ else:
+ raise BadPulseMessageError(key, "unknown message type")
+
+ except BadPulseMessageError, inst:
+ self.badPulseMessageLogger.exception(json.dumps(data, indent=2))
+ except Exception, inst:
+ self.errorLogger.exception(json.dumps(data, indent=2))
38 pulsetranslator/runservice.py
@@ -0,0 +1,38 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from daemon import createDaemon
+import optparse
+import os
+from pulsetranslator import PulseBuildbotTranslator
+
+def main():
+ parser = optparse.OptionParser()
+ parser.add_option('--pidfile', dest='pidfile',
+ default='translator.pid',
+ help='path to file for logging pid')
+ parser.add_option('--logfile', dest='logfile',
+ default='stdout.log',
+ help='path to file for stdout logging')
+ parser.add_option('--logdir', dest='logdir',
+ default='logs',
+ help='directory to store other log files')
+ parser.add_option('--daemon', dest='daemon', action='store_true',
+ help='run as daemon (posix only)')
+ options, args = parser.parse_args()
+
+ if options.daemon:
+ if os.access(options.logfile, os.F_OK):
+ os.remove(options.logfile)
+ createDaemon(options.pidfile, options.logfile)
+
+ f = open(options.pidfile, 'w')
+ f.write("%d\n" % os.getpid())
+ f.close()
+
+ service = PulseBuildbotTranslator(logdir=options.logdir)
+ service.start()
+
+if __name__ == "__main__":
+ main()
23 pulsetranslator/testconsumer.py
@@ -0,0 +1,23 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+
+from translatorqueues import TranslatorConsumer
+
+def on_pulse_message(data, message):
+ key = data['_meta']['routing_key']
+ if key.startswith('build'):
+ print '---------- build message received', key
+ import json
+ print json.dumps(data, indent=2)
+ else:
+ print '========== test message received', key
+
+if __name__ == "__main__":
+ pulse = TranslatorConsumer(applabel='translator_test_consumer')
+ pulse.configure(topic=['build.#', 'unittest.#', 'talos.#'],
+ callback=on_pulse_message,
+ durable=False)
+ pulse.listen()
+
62 pulsetranslator/translatorexceptions.py
@@ -0,0 +1,62 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+class BadPulseMessageError(Exception):
+
+ def __init__(self, key, error):
+ self.key = key
+ self.error = error
+
+ def __str__(self):
+ return "%s, key: %s" % (self.error, self.key)
+
+class NoBuildUrlError(BadPulseMessageError):
+
+ def __init__(self, key):
+ self.key = key
+
+ def __str__(self):
+ return self.key
+
+class BadTagError(BadPulseMessageError):
+
+ def __init__(self, key, tag, platform, product):
+ self.key = key
+ self.tag = tag
+ self.platform = platform
+ self.product = product
+
+ def __str__(self):
+ return ("%s, tag: %s, platform: %s, product: %s" %
+ (self.key, self.tag, self.platform, self.product))
+
+class BadPlatformError(BadPulseMessageError):
+
+ def __init__(self, key, platform):
+ self.key = key
+ self.platform = platform
+
+ def __str__(self):
+ return "%s, platform: %s" % (self.key, self.platform)
+
+class BadOSError(BadPlatformError):
+
+ def __init__(self, key, platform, os, buildername):
+ self.key = key
+ self.platform = platform
+ self.os = os
+ self.buildername = buildername
+
+ def __str__(self):
+ return ("%s, platform: %s, os: %s, builder: %s" %
+ (self.key, self.platform, self.os, self.buildername))
+
+class LogTimeoutError(Exception):
+
+ def __init__(self, key, logurl):
+ self.key = key
+ self.logurl = logurl
+
+ def __str__(self):
+ return "key: %s, url: %s" % (self.key, self.logurl)
31 pulsetranslator/translatorqueues.py
@@ -0,0 +1,31 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from mozillapulse.config import PulseConfiguration
+from mozillapulse.consumers import GenericConsumer
+from mozillapulse.messages.base import GenericMessage
+from mozillapulse.publishers import GenericPublisher
+
+
+def publish_message(publisherClass, data, routing_key):
+ publisher = publisherClass()
+ msg = GenericMessage()
+ msg.routing_parts = routing_key.split('.')
+ assert(isinstance(data, dict))
+ for key, value in data.iteritems():
+ msg.set_data(key, value)
+ publisher.publish(msg)
+
+class TranlsatorPublisher(GenericPublisher):
+ def __init__(self, **kwargs):
+ GenericPublisher.__init__(self,
+ PulseConfiguration(**kwargs),
+ 'org.mozilla.exchange.build.normalized')
+
+class TranslatorConsumer(GenericConsumer):
+ def __init__(self, **kwargs):
+ GenericConsumer.__init__(self,
+ PulseConfiguration(**kwargs),
+ 'org.mozilla.exchange.build.normalized',
+ **kwargs)
39 setup.py
@@ -0,0 +1,39 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import sys
+from setuptools import setup, find_packages
+
+PACKAGE_NAME = 'pulsetranslator'
+version = '0.1'
+
+deps = ['mozillapulse']
+
+python_version = sys.version_info[:2]
+if python_version < (2,6) or python_version >= (3,0):
+ print >>sys.stderr, '%s requires Python >= 2.6 and < 3.0' % PACKAGE_NAME
+ sys.exit(1)
+
+setup(name=PACKAGE_NAME,
+ version=version,
+ description=("Service for translating rawa buildbot pulse messages into "
+ "a standard format"),
+ long_description="""\
+""",
+ classifiers=[], # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
+ keywords='',
+ author='Jonathan Griffin',
+ author_email='jgriffin@mozilla.com',
+ url='http://hg.mozilla.org/users/jgriffin_mozilla.com/pulsetranslator',
+ license='MPL',
+ packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
+ include_package_data=True,
+ zip_safe=False,
+ install_requires=deps,
+ entry_points="""
+ # -*- Entry points: -*-
+ [console_scripts]
+ runtranslator = pulsetranslator.runservice:main
+ """,
+ )
Please sign in to comment.
Something went wrong with that request. Please try again.