Permalink
Browse files

Bug 795417 - Persistent job queues. r=bc

Phones also no longer reboot by default.  Added a reboot command,
and made device commands handle "all" instead of a device ID.
Cleaned up some old comments.
  • Loading branch information...
1 parent cc8be50 commit 63933d50f835c0b7c2f87d4de089ec94fc027b49 @markrcote markrcote committed Mar 21, 2013
Showing with 226 additions and 136 deletions.
  1. +1 −0 .gitignore
  2. +30 −43 autophone.py
  3. +80 −0 jobs.py
  4. +115 −93 worker.py
View
@@ -3,3 +3,4 @@
*.log
*~
configs/unittest_defaults.ini
+jobs.sqlite
View
@@ -24,6 +24,7 @@
import builds
import buildserver
+import jobs
import phonetest
from mailer import Mailer
@@ -65,7 +66,7 @@ def handle(self):
response = self.server.cmd_cb(line)
self.request.send(response + '\n')
- def __init__(self, clear_cache, reboot_phones, test_path, cachefile,
+ def __init__(self, clear_cache, test_path, cachefile,
ipaddr, port, logfile, loglevel, emailcfg, enable_pulse,
repos, buildtypes, build_cache_port):
self._test_path = test_path
@@ -85,6 +86,7 @@ def __init__(self, clear_cache, reboot_phones, test_path, cachefile,
self._stop = False
self._next_worker_num = 0
+ self.jobs = jobs.Jobs()
self.phone_workers = {} # indexed by mac address
self.worker_lock = threading.Lock()
self.cmd_lock = threading.Lock()
@@ -106,8 +108,9 @@ def __init__(self, clear_cache, reboot_phones, test_path, cachefile,
else:
# Otherwise assume cache is valid and read from it
self.read_cache()
- if reboot_phones:
- self.reset_phones()
+
+ if clear_cache:
+ self.jobs.clear_all()
self.server = None
self.server_thread = None
@@ -175,13 +178,6 @@ def check_for_dead_workers(self):
logging.info(traceback.format_exc())
def worker_msg_loop(self):
- # FIXME: look up worker by msg.phoneid and have worker process
- # message. worker, as part of the main process, can log status
- # and store it for later querying.
- # also, store first instance of current status (e.g. idle for 30
- # minutes, last update 1 minute ago). store test name and start time
- # if status is WORKING. All this will help us determine if and where
- # a phone/worker process is stuck.
try:
while not self._stop:
self.check_for_dead_workers()
@@ -197,14 +193,15 @@ def worker_msg_loop(self):
self.stop()
# Start the phones for testing
- def start_tests(self, build_url, devices=None):
+ def new_job(self, build_url, devices=None):
self.worker_lock.acquire()
for p in self.phone_workers.values():
- if devices and p.phone_cfg['phoneid'] not in devices:
+ phoneid = p.phone_cfg['phoneid']
+ if devices and phoneid not in devices:
continue
- logging.info('Notifying device %s of new build.' %
- p.phone_cfg['phoneid'])
- p.new_build(build_url)
+ self.jobs.new_job(build_url, phoneid)
+ logging.info('Notifying device %s of new job.' % phoneid)
+ p.new_job()
self.worker_lock.release()
def route_cmd(self, data):
@@ -247,28 +244,22 @@ def route_cmd(self, data):
response += ' previous state %s ago:\n %s\n' % (now - w.last_status_of_previous_type.timestamp, w.last_status_of_previous_type.short_desc())
response += 'ok'
elif (cmd == 'disable' or cmd == 'enable' or cmd == 'debug' or
- cmd == 'ping'):
+ cmd == 'ping' or cmd == 'reboot'):
# Commands that take a phone as a parameter
# FIXME: need start, stop, and remove
- # Note that disable means that the device will still be pinged
- # periodically. Do we need permanently disabled/stopped?
phoneid, space, params = params.partition(' ')
- worker = None
- for w in self.phone_workers.values():
- if (w.phone_cfg['serial'] == phoneid or
- w.phone_cfg['phoneid'] == phoneid):
- worker = w
- break
- if worker:
- f = getattr(worker, cmd)
- if params:
- f(params)
- else:
- f()
- response = 'ok'
- self.update_phone_cache()
- else:
- response = 'error: phone not found'
+ response = 'error: phone not found'
+ for worker in self.phone_workers.values():
+ if (phoneid.lower() == 'all' or
+ worker.phone_cfg['serial'] == phoneid or
+ worker.phone_cfg['phoneid'] == phoneid):
+ f = getattr(worker, cmd)
+ if params:
+ f(params)
+ else:
+ f()
+ response = 'ok'
+ self.update_phone_cache()
else:
response = 'Unknown command "%s"\n' % cmd
self.cmd_lock.release()
@@ -377,7 +368,7 @@ def trigger_jobs(self, data):
args = data.split(' ')
if not args:
return 'invalid args'
- self.start_tests(args[0], args[1:])
+ self.new_job(args[0], args[1:])
return 'ok'
def reset_phones(self):
@@ -395,7 +386,7 @@ def on_build(self, msg):
# those, and only run the ones with real URLs
# We create jobs for all the phones and push them into the queue
if 'buildurl' in msg:
- self.start_tests(msg['buildurl'])
+ self.new_job(msg['buildurl'])
def stop(self):
self._stop = True
@@ -405,7 +396,7 @@ def stop(self):
self.server_thread.join()
-def main(clear_cache, reboot_phones, test_path, cachefile, ipaddr, port,
+def main(clear_cache, test_path, cachefile, ipaddr, port,
logfile, loglevel_name, emailcfg, enable_pulse, enable_unittests,
cache_dir, override_build_dir, repos, buildtypes, build_cache_port):
@@ -459,7 +450,7 @@ def sigterm_handler(signum, frame):
print '%s Starting server on port %d.' % (
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), port)
- autophone = AutoPhone(clear_cache, reboot_phones, test_path, cachefile,
+ autophone = AutoPhone(clear_cache, test_path, cachefile,
ipaddr, port, logfile, loglevel, emailcfg,
enable_pulse, repos, buildtypes, build_cache_port)
signal.signal(signal.SIGTERM, sigterm_handler)
@@ -480,10 +471,6 @@ def sigterm_handler(signum, frame):
default=False,
help='If specified, we clear the information in the '
'autophone cache before starting')
- parser.add_option('--no-reboot', action='store_false', dest='reboot_phones',
- default=True, help='Indicates that phones should not be '
- 'rebooted when autophone starts (ignored if '
- '--clear-cache is used')
parser.add_option('--ipaddr', action='store', type='string', dest='ipaddr',
default=None, help='IP address of interface to use for '
'phone callbacks, e.g. after rebooting. If not given, '
@@ -558,7 +545,7 @@ def sigterm_handler(signum, frame):
if not options.buildtypes:
options.buildtypes = ['opt']
- exit_code = main(options.clear_cache, options.reboot_phones,
+ exit_code = main(options.clear_cache,
options.test_path, options.cachefile, options.ipaddr,
options.port, options.logfile, options.loglevel,
options.emailcfg, options.enable_pulse,
View
80 jobs.py
@@ -0,0 +1,80 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import datetime
+import os
+import sqlite3
+
+
+class Jobs(object):
+
+ MAX_ATTEMPTS = 3
+
+ def __init__(self, default_device=None):
+ self.default_device = default_device
+ self.filename = 'jobs.sqlite'
+ if not os.path.exists(self.filename):
+ conn = self._conn()
+ c = conn.cursor()
+ c.execute('create table jobs '
+ '(created text, last_attempt text, build_url text, '
+ 'attempts int, device text)')
+ conn.commit()
+
+ def _conn(self):
+ return sqlite3.connect(self.filename)
+
+ def clear_all(self):
+ conn = self._conn()
+ conn.cursor().execute('delete from jobs')
+ conn.commit()
+
+ def new_job(self, build_url, device=None):
+ if not device:
+ device = self.default_device
+ now = datetime.datetime.now().isoformat()
+ conn = self._conn()
+ conn.cursor().execute('insert into jobs values (?, ?, ?, 0, ?)',
+ (now, None, build_url, device))
+ conn.commit()
+
+ def jobs_pending(self, device=None):
+ if not device:
+ device = self.default_device
+ return self._conn().cursor().execute(
+ 'select count(ROWID) from jobs where device=?',
+ (device,)).fetchone()[0]
+
+ def get_next_job(self, device=None):
+ if not device:
+ device = self.default_device
+ conn = self._conn()
+ c = conn.cursor()
+ c.execute('delete from jobs where device=? and attempts>=?',
+ (device, self.MAX_ATTEMPTS))
+ conn.commit()
+ jobs = [{'id': job[0],
+ 'created': job[1],
+ 'last_attempt': job[2],
+ 'build_url': job[3],
+ 'attempts': job[4]}
+ for job in c.execute(
+ 'select ROWID as id,created,last_attempt,build_url,attempts'
+ ' from jobs where device=? order by created', (device,))]
+ if not jobs:
+ return None
+ next_job = jobs[0]
+ next_job['attempts'] += 1
+ next_job['last_attempt'] = datetime.datetime.now().isoformat()
+ c.execute('update jobs set attempts=?, last_attempt=? where ROWID=?',
+ (next_job['attempts'], next_job['last_attempt'],
+ next_job['id']))
+ conn.commit()
+ return next_job
+
+ def job_completed(self, job_id):
+ conn = self._conn()
+ c = conn.cursor()
+ c.execute('delete from jobs where ROWID=?', (job_id,))
+ conn.commit()
Oops, something went wrong.

0 comments on commit 63933d5

Please sign in to comment.