Skip to content
This repository has been archived by the owner on Aug 14, 2020. It is now read-only.

Commit

Permalink
Bug 1353509 - Autophone - completely remove the shared lock to elimin…
Browse files Browse the repository at this point in the history
…ate hangs, r=jmaher.

The shared lock's sole purpose is to serialize access to the jobs database across the
different processes and threads. The inherent locking available in sqlite3 should be
sufficient for this purpose and eliminating the shared lock will reduce the possibility
of a deadlock.
  • Loading branch information
bclary committed May 24, 2017
1 parent 19d1f2b commit 0510845
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 79 deletions.
18 changes: 4 additions & 14 deletions autophone.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def __init__(self, loglevel, options):
allow_duplicates=options.allow_duplicate_jobs)
self.phone_workers = {} # indexed by phone id
self.lock = threading.RLock()
self.shared_lock = multiprocessing.Lock()
self._tests = []
self._devices = {} # dict indexed by device names found in devices ini file
self.server = None
Expand All @@ -121,8 +120,7 @@ def __init__(self, loglevel, options):
self.treeherder = AutophoneTreeherder(None,
self.options,
self.jobs,
mailer=self.mailer,
shared_lock=self.shared_lock)
mailer=self.mailer)
self.treeherder_thread = None
self.logging_server = LogRecordServer(autophone=self)
self.logging_server_thread = None
Expand All @@ -144,10 +142,6 @@ def __init__(self, loglevel, options):
for worker in self.phone_workers.values():
worker.start()

# We must wait to start the pulse monitor until after the
# workers have started in order to make certain that the
# shared_lock is passed to the worker subprocesses in an
# unlocked state.
if options.enable_pulse:
self.pulse_monitor = AutophonePulseMonitor(
userid=options.pulse_user,
Expand All @@ -160,7 +154,6 @@ def __init__(self, loglevel, options):
platforms=options.platforms,
buildtypes=options.buildtypes,
durable_queues=self.options.pulse_durable_queue,
shared_lock=self.shared_lock,
verbose=options.verbose)
self.pulse_monitor.start()

Expand Down Expand Up @@ -696,8 +689,7 @@ def create_worker(self, phone):
self.options,
self.queue,
self.loglevel,
self.mailer,
self.shared_lock)
self.mailer)
self.phone_workers[phone.id] = worker
return worker

Expand Down Expand Up @@ -965,7 +957,6 @@ def on_build(self, build_data):
if self.pulse_monitor._stopping.is_set():
LOGGER.debug('on_build: shutting down: ignoring build')
return
self.lock_acquire()
try:
if self.state != ProcessStates.RUNNING:
return
Expand Down Expand Up @@ -1021,13 +1012,12 @@ def on_build(self, build_data):
}
self.new_job(job_data)
finally:
self.lock_release()
pass

def on_jobaction(self, job_action):
if self.pulse_monitor._stopping.is_set():
LOGGER.debug('on_jobaction: shutting down: ignoring %s', job_action)
return
self.lock_acquire()
try:
if self.state != ProcessStates.RUNNING or job_action['job_group_name'] != 'Autophone':
return
Expand Down Expand Up @@ -1077,7 +1067,7 @@ def on_jobaction(self, job_action):
LOGGER.warning('on_jobaction: unknown action %s',
job_action['action'])
finally:
self.lock_release()
pass

def stop(self):
self.state = ProcessStates.STOPPING
Expand Down
47 changes: 1 addition & 46 deletions autophonepulsemonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ class AutophonePulseMonitor(object):
process. Possible values are 'opt', 'debug'
:param timeout: Timeout in seconds for the kombu connection
drain_events. Defaults to 5 seconds.
:param shared_lock: Required lock used to control concurrent
access. Used to prevent socket based deadlocks.
:param verbose: If True, will log build and job action messages.
Defaults to False.
"""
Expand All @@ -100,7 +98,6 @@ def __init__(self,
platforms=[],
buildtypes=[],
timeout=5,
shared_lock=None,
verbose=False):

assert userid, "userid is required."
Expand All @@ -109,7 +106,6 @@ def __init__(self,
assert trees, "trees is required."
assert platforms, "platforms is required."
assert buildtypes, "buildtypes is required."
assert shared_lock, "shared_lock is required."

taskcompleted_exchange_name = 'exchange/taskcluster-queue/v1/task-completed'
taskcompleted_queue_name = 'task-completed'
Expand All @@ -128,7 +124,6 @@ def __init__(self,
self.platforms.sort(cmp=lambda x, y: (len(y) - len(x)))
self.buildtypes = list(buildtypes)
self.timeout = timeout
self.shared_lock = shared_lock
self.verbose = verbose
self._stopping = threading.Event()
self.listen_thread = None
Expand Down Expand Up @@ -185,9 +180,6 @@ def listen(self):
restart = True
while restart:
restart = False
if self.verbose:
logger.debug('AutophonePulseMonitor: start shared_lock.acquire')
self.shared_lock.acquire()
try:
# connection does not connect to the server until
# either the connection.connect() method is called
Expand All @@ -213,70 +205,35 @@ def listen(self):
with consumer:
while not self._stopping.is_set():
try:
if self.verbose:
logger.debug('AutophonePulseMonitor shared_lock.release')
self.shared_lock.release()
connection.drain_events(timeout=self.timeout)
except socket.timeout:
pass
except socket.error, e:
if "timed out" not in str(e):
raise
finally:
if self.verbose:
logger.debug('AutophonePulseMonitor shared_lock.acquire')
self.shared_lock.acquire()
logger.debug('AutophonePulseMonitor.listen: stopping')
except:
logger.exception('AutophonePulseMonitor Exception')
if connection:
connection.release()
if self.verbose:
logger.debug('AutophonePulseMonitor exit shared_lock.release')
try:
self.shared_lock.release()
except ValueError, e:
logger.warning('AutophonePulseMonitor: %s handling exception', e)
if not self._stopping.is_set():
restart = True
time.sleep(wait)
if self.verbose:
logger.debug('AutophonePulseMonitor shared_lock.acquire')
self.shared_lock.acquire()
finally:
if self.verbose:
logger.debug('AutophonePulseMonitor exit shared_lock.release')
if connection and not restart:
connection.release()
try:
self.shared_lock.release()
except ValueError, e:
logger.warning('AutophonePulseMonitor: %s leaving listen()', e)

def handle_message(self, data, message):
if self._stopping.is_set():
return
message.ack()
logger = utils.getLogger()
try:
relock = False
if self.verbose:
logger.debug('AutophonePulseMonitor handle_message shared_lock.release')
self.shared_lock.release()
relock = True
except ValueError, e:
if self.verbose:
logger.debug('AutophonePulseMonitor handle_message shared_lock not set %s', e)
if (self.treeherder_url and 'action' in data and
'project' in data and 'job_id' in data):
self.handle_jobaction(data, message)
elif 'status' in data:
logger.debug('handle_message: data: %s, message: %s', data, message)
self.handle_taskcompleted(data, message)
if relock:
if self.verbose:
logger.debug('AutophonePulseMonitor handle_message shared_lock.acquire')
self.shared_lock.acquire()

def handle_jobaction(self, data, message):
logger = utils.getLogger()
Expand Down Expand Up @@ -545,7 +502,6 @@ def jobaction_callback(job_action):

(options, args) = parser.parse_args()

shared_lock = threading.Lock()
monitor = AutophonePulseMonitor(
userid=options.pulse_user,
password=options.pulse_password,
Expand All @@ -554,8 +510,7 @@ def jobaction_callback(job_action):
jobaction_callback=jobaction_callback,
trees=['try', 'mozilla-inbound'],
platforms=['android-api-9', 'android-api-11', 'android-api-15'],
buildtypes=['opt'],
shared_lock=shared_lock)
buildtypes=['opt'])

monitor.start()
time.sleep(3600)
Expand Down
12 changes: 2 additions & 10 deletions autophonetreeherder.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,15 @@ class TestState(object):
class AutophoneTreeherder(object):

def __init__(self, worker_subprocess, options, jobs, s3_bucket=None,
mailer=None, shared_lock=None):
mailer=None):
assert options, "options is required."
assert shared_lock, "shared_lock is required."

logger = utils.getLogger()

self.options = options
self.jobs = jobs
self.s3_bucket = s3_bucket
self.mailer = mailer
self.shared_lock = shared_lock
self.worker = worker_subprocess
self.shutdown_requested = False
logger.debug('AutophoneTreeherder')
Expand Down Expand Up @@ -124,13 +122,7 @@ def post_request(self, machine, project, job_collection, attempts, last_attempt)
def queue_request(self, machine, project, job_collection):
logger = utils.getLogger()
logger.debug('AutophoneTreeherder.queue_request: %s', job_collection.__dict__)
logger.debug('AutophoneTreeherder shared_lock.acquire')
self.shared_lock.acquire()
try:
self.jobs.new_treeherder_job(machine, project, job_collection)
finally:
logger.debug('AutophoneTreeherder shared_lock.release')
self.shared_lock.release()
self.jobs.new_treeherder_job(machine, project, job_collection)

def _create_job(self, tjc, machine, build_url, project, revision, build_type, build_abi,
build_platform, build_sdk, builder_type, t):
Expand Down
13 changes: 4 additions & 9 deletions worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ def __init__(self,
options,
autophone_queue,
loglevel,
mailer,
shared_lock):
mailer):

self.state = ProcessStates.STARTING
self.tests = tests
Expand Down Expand Up @@ -128,7 +127,6 @@ def __init__(self,
# process via this queue.
self.queue = multiprocessing.Queue()
self.lock = multiprocessing.Lock()
self.shared_lock = shared_lock
self.subprocess = PhoneWorkerSubProcess(dm,
self,
tests,
Expand All @@ -137,8 +135,7 @@ def __init__(self,
autophone_queue,
self.queue,
loglevel,
mailer,
shared_lock)
mailer)
def is_alive(self):
return self.subprocess.is_alive()

Expand Down Expand Up @@ -403,7 +400,7 @@ class PhoneWorkerSubProcess(object):
"""

def __init__(self, dm, parent_worker, tests, phone, options,
autophone_queue, queue, loglevel, mailer, shared_lock):
autophone_queue, queue, loglevel, mailer):

self.state = ProcessStates.RUNNING
self.parent_worker = parent_worker
Expand Down Expand Up @@ -431,7 +428,6 @@ def __init__(self, dm, parent_worker, tests, phone, options,
self.test_logfile = None
self.loglevel = loglevel
self.mailer = mailer
self.shared_lock = shared_lock
self.p = None
self.jobs = None
self.build = None
Expand Down Expand Up @@ -1378,8 +1374,7 @@ def run(self):
self.options,
self.jobs,
s3_bucket=self.s3_bucket,
mailer=self.mailer,
shared_lock=self.shared_lock)
mailer=self.mailer)
self.update_status(phone_status=PhoneStatus.IDLE)
self.dm.power_on()
self.start_usbwatchdog()
Expand Down

0 comments on commit 0510845

Please sign in to comment.