Skip to content
This repository has been archived by the owner on Aug 14, 2020. It is now read-only.

Commit

Permalink
Bug 790380 - Handle worker crashes and misc. other improvements and f…
Browse files Browse the repository at this point in the history
…ixes. r=bc
  • Loading branch information
Mark Côté committed Sep 24, 2012
1 parent d1e02c5 commit 524b0db
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 47 deletions.
67 changes: 56 additions & 11 deletions autophone.py
Expand Up @@ -42,7 +42,7 @@

class AutoPhone(object):

class CmdTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
class CmdTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):

allow_reuse_address = True
daemon_threads = True
Expand Down Expand Up @@ -95,12 +95,13 @@ def __init__(self, clear_cache, reboot_phones, test_path, cachefile,
self.build_cache = builds.BuildCache(override_build_dir=override_build_dir,
enable_unittests=enable_unittests)
self._stop = False
self._next_worker_num = 0
self.phone_workers = {} # indexed by mac address
self.worker_lock = threading.Lock()
self.cmd_lock = threading.Lock()
self._tests = []
logging.info('Starting autophone.')

# queue for listening to status updates from tests
self.worker_msg_queue = multiprocessing.Queue()

Expand Down Expand Up @@ -133,6 +134,12 @@ def __init__(self, clear_cache, reboot_phones, test_path, cachefile,

self.enable_unittests = enable_unittests

@property
def next_worker_num(self):
n = self._next_worker_num
self._next_worker_num += 1
return n

def run(self):
self.server = self.CmdTCPServer(('0.0.0.0', self.port),
self.CmdTCPHandler)
Expand All @@ -141,6 +148,27 @@ def run(self):
self.server_thread.start()
self.worker_msg_loop()

def check_for_dead_workers(self):
phoneids = self.phone_workers.keys()
for phoneid in phoneids:
if not self.phone_workers[phoneid].is_alive():
print 'Worker %s died!' % phoneid
logging.error('Worker %s died!' % phoneid)
worker = self.phone_workers[phoneid]
worker.stop()
worker.start(disabled=True)
logging.info('Sending notification...')
try:
self.mailer.send('Worker for phone %s died' %
worker.phone_cfg['phoneid'],
'''Hello, this is Autophone. The worker process for phone %s died and
has been disabled. Sorry about that.
''' % worker.phone_cfg['phoneid'])
logging.info('Sent.')
except socket.error:
logging.error('Failed to send dead-phone notification.')
logging.info(traceback.format_exc())

def worker_msg_loop(self):
# FIXME: look up worker by msg.phoneid and have worker process
# message. worker, as part of the main process, can log status
Expand All @@ -151,6 +179,7 @@ def worker_msg_loop(self):
# a phone/worker process is stuck.
try:
while not self._stop:
self.check_for_dead_workers()
try:
msg = self.worker_msg_queue.get(timeout=5)
except Queue.Empty:
Expand Down Expand Up @@ -192,6 +221,12 @@ def start_tests(self, job):
self.worker_lock.release()

def route_cmd(self, data):
# There is not currently any way to get proper responses for commands
# that interact with workers, since communication between the main
# process and the worker processes is asynchronous.
# It would be possible but nontrivial for the workers to put responses
# onto a queue and have them routed to the proper connection by using
# request IDs or something like that.
self.cmd_lock.acquire()
data = data.strip()
cmd, space, params = data.partition(' ')
Expand All @@ -211,6 +246,7 @@ def route_cmd(self, data):
now = datetime.datetime.now().replace(microsecond=0)
for i, w in self.phone_workers.iteritems():
response += 'phone %s (%s):\n' % (i, w.phone_cfg['ip'])
response += ' debug level %d\n' % w.phone_cfg.get('debug', 3)
if not w.last_status_msg:
response += ' no updates\n'
else:
Expand All @@ -223,16 +259,24 @@ def route_cmd(self, data):
if w.last_status_of_previous_type:
response += ' previous state %s ago:\n %s\n' % (now - w.last_status_of_previous_type.timestamp, w.last_status_of_previous_type.short_desc())
response += 'ok'
elif cmd == 'disable' or cmd == 'reenable':
serial = params.strip()
elif (cmd == 'disable' or cmd == 'enable' or cmd == 'debug' or
cmd == 'ping'):
# Commands that take a phone as a parameter
phoneid, space, params = params.partition(' ')
worker = None
for w in self.phone_workers.values():
if w.phone_cfg['serial'] == serial:
if (w.phone_cfg['serial'] == phoneid or
w.phone_cfg['phoneid'] == phoneid):
worker = w
break
if worker:
getattr(worker, cmd)()
f = getattr(worker, cmd)
if params:
f(params)
else:
f()
response = 'ok'
self.update_phone_cache()
else:
response = 'error: phone not found'
else:
Expand All @@ -245,7 +289,7 @@ def register_phone(self, phone_cfg):
x in self._tests]

logfile_prefix = os.path.splitext(self.logfile)[0]
worker = PhoneWorker(len(self.phone_workers.keys()), self.ipaddr,
worker = PhoneWorker(self.next_worker_num, self.ipaddr,
tests, phone_cfg, self.worker_msg_queue,
'%s-%s' % (logfile_prefix, phone_cfg['phoneid']),
self.loglevel, self.mailer)
Expand All @@ -270,7 +314,8 @@ def register_cmd(self, data):
ip=data['ipaddr'][0],
sutcmdport=int(data['cmdport'][0]),
machinetype=data['hardware'][0],
osver=data['os'][0])
osver=data['os'][0],
debug=3)
self.register_phone(phone_cfg)
self.update_phone_cache()
else:
Expand Down Expand Up @@ -369,7 +414,7 @@ def build_job(self, cache_build_dir):
# we should have already tried to redownload bad zips, so treat
# this as fatal.
logging.error('%s is a bad apk; aborting job.' % build_path)
shutil.rmtree(tmpdir)
shutil.rmtree(tmpdir)
return None
cfg = ConfigParser.RawConfigParser()
cfg.read(os.path.join(tmpdir, 'application.ini'))
Expand All @@ -393,7 +438,7 @@ def build_job(self, cache_build_dir):
'androidprocname': procname,
'version': ver,
'bldtype': 'opt' }
shutil.rmtree(tmpdir)
shutil.rmtree(tmpdir)
return job

def stop(self):
Expand Down Expand Up @@ -434,7 +479,7 @@ def sigterm_handler(signum, frame):
enable_pulse, enable_unittests,
override_build_dir)
except builds.BuildCacheException, e:
print '''%s
print '''%s
When specifying --override-build-dir, the directory must already exist
and contain a build.apk package file to be tested.
Expand Down
6 changes: 5 additions & 1 deletion phonetest.py
Expand Up @@ -95,6 +95,10 @@ def profile_path(self):
def runjob(self, job):
raise NotImplementedError

def set_dm_debug(self, level):
self.phone_cfg['debug'] = level
if self._dm:
self._dm.debug = level

"""
sets the status
Expand All @@ -111,7 +115,7 @@ def set_status(self, status=PhoneTestMessage.WORKING, msg=None):
def install_profile(self, profile=None):
if not profile:
profile = FirefoxProfile()

self.dm.removeDir(self.profile_path)
self.dm.mkDir(self.profile_path)
self.dm.pushDir(profile.profile, self.profile_path)
Expand Down
6 changes: 3 additions & 3 deletions sendemail.py
Expand Up @@ -25,16 +25,16 @@ def _get_socket_fixed(self, host, port, timeout):
smtplib.SMTP_SSL._get_socket = _get_socket_fixed


def sendemail(from_addr=None, to_addrs=None, subject='No Subject',
def sendemail(from_addr=None, to_addrs=None, subject='No Subject',
text_data=None, html_data=None,
server='mail.mozilla.com', port=465,
username=None, password=None, use_ssl=True):
"""Sends an email.
from_addr is an email address; to_addrs is a list of email adresses.
Addresses can be plain (e.g. "jsmith@example.com") or with real names
(e.g. "John Smith <jsmith@example.com>").
text_data and html_data are both strings. You can specify one or both.
If you specify both, the email will be sent as a MIME multipart
alternative, i.e., the recipient will see the HTML content if his
Expand Down
2 changes: 1 addition & 1 deletion smoketest.sh
Expand Up @@ -20,7 +20,7 @@ cleanup() {
}

trap cleanup INT TERM EXIT
cache=`mktemp`
cache=$(mktemp tmp.XXXXXXXXXX)
rm -f smoketest_pass smoketest_fail
echo Launching autophone...
python autophone.py --cache $cache -t tests/smoketest_manifest.ini &
Expand Down
16 changes: 8 additions & 8 deletions tests/s1s2test.py
Expand Up @@ -108,14 +108,14 @@ def prepare_phone(self, job):
self.dm.mkDir('/mnt/sdcard/s1test')

testroot = '/mnt/sdcard/s1test'

if not os.path.exists(self.config_file):
self.logger.error('Cannot find config file: %s' % self.config_file)
raise NameError('Cannot find config file: %s' % self.config_file)

cfg = ConfigParser.RawConfigParser()
cfg.read(self.config_file)

# Map URLS - {urlname: url} - urlname serves as testname
self._urls = {}
for u in cfg.items('urls'):
Expand All @@ -133,10 +133,10 @@ def prepare_phone(self, job):
else:
self.dm.pushFile(h[1], posixpath.join(testroot,
os.path.basename(h[1])))

self._iterations = cfg.getint('settings', 'iterations')
self._resulturl = cfg.get('settings', 'resulturl')

def analyze_logcat(self, job):
buf = [x.strip('\r\n') for x in self.dm.getLogcat()]
throbberstartRE = re.compile('.*Throbber start$')
Expand All @@ -162,7 +162,7 @@ def publish_results(self, starttime=0, tstrt=0, tstop=0, drawing=0, job=None, te
msg = 'Start Time: %s Throbber Start: %s Throbber Stop: %s EndDraw: %s' % (starttime, tstrt, tstop, drawing)
print 'RESULTS %s %s:%s' % (self.phone_cfg['phoneid'], datetime.datetime.fromtimestamp(int(job['blddate'])), msg)
self.logger.info('RESULTS: %s:%s' % (self.phone_cfg['phoneid'], msg))

# Create JSON to send to webserver
resultdata = {}
resultdata['phoneid'] = self.phone_cfg['phoneid']
Expand All @@ -172,14 +172,14 @@ def publish_results(self, starttime=0, tstrt=0, tstop=0, drawing=0, job=None, te
resultdata['throbberstop'] = tstop
resultdata['enddrawing'] = drawing
resultdata['blddate'] = job['blddate']

resultdata['revision'] = job['revision']
resultdata['productname'] = job['androidprocname']
resultdata['productversion'] = job['version']
resultdata['osver'] = self.phone_cfg['osver']
resultdata['bldtype'] = job['bldtype']
resultdata['machineid'] = self.phone_cfg['machinetype']

# Upload
result = json.dumps({'data': resultdata})
req = urllib2.Request(self._resulturl, result,
Expand Down

0 comments on commit 524b0db

Please sign in to comment.