Permalink
Browse files

Fix provisioner ZK connection handling/recovery

  • Loading branch information...
labisso committed Mar 19, 2013
1 parent 97ad3f0 commit a988abba56f37d346bb9ef818a766d7303d1a7c8
Showing 38 additions and 23 deletions.
  1. +6 −4 epu/provisioner/leader.py
  2. +29 −16 epu/provisioner/store.py
  3. +3 −3 epu/test/test_zk_leaders_provisoner.py
View
@@ -197,11 +197,9 @@ def _terminate_pending_terminations(self):
for node_id, node in izip(node_ids, nodes):
if not node:
# maybe an error should make its way to controller from here?
- log.warn('Node %s unknown but requested for termination',
- node_id)
+ log.warn('Node %s unknown but requested for termination', node_id)
self.store.remove_terminating(node_id)
- log.info("Removed terminating entry for node %s from store",
- node_id)
+ log.info("Removed terminating entry for node %s from store", node_id)
continue
log.info("Terminating node %s", node_id)
@@ -222,6 +220,7 @@ def kill_terminator(self):
self.terminator_running = False
if self.terminator_thread:
self.terminator_thread.join()
+ self.terminator_thread = None
def run_site_query_thread(self):
log.info("Starting site query thread")
@@ -248,6 +247,7 @@ def kill_site_query_thread(self):
self.site_query_running = False
if self.site_query_thread:
self.site_query_thread.join()
+ self.site_query_thread = None
def run_context_query_thread(self):
log.info("Starting context query thread")
@@ -274,6 +274,7 @@ def kill_context_query_thread(self):
self.context_query_running = False
if self.context_query_thread:
self.context_query_thread.join()
+ self.context_query_thread = None
def run_record_reaper_thread(self):
log.info("Starting record reaper thread")
@@ -303,3 +304,4 @@ def kill_record_reaper_thread(self):
self.record_reaper_condition.notify_all()
self.record_reaper_thread.join()
+ self.record_reaper_thread = None
View
@@ -351,7 +351,7 @@ def __init__(self, hosts, base_path, username=None, password=None,
timeout=None, use_gevent=False, proc_name=None):
kwargs = zkutil.get_kazoo_kwargs(username=username, password=password,
- timeout=timeout, use_gevent=use_gevent)
+ timeout=timeout, use_gevent=use_gevent)
self.kazoo = KazooClient(hosts + base_path, **kwargs)
self.retry = zkutil.get_kazoo_retry()
@@ -367,34 +367,38 @@ def __init__(self, hosts, base_path, username=None, password=None,
# callback fired when the connection state changes
self.kazoo.add_listener(self._connection_state_listener)
+ self._shutdown = False
+
self._election_enabled = False
self._election_condition = threading.Condition()
self._election_thread = None
- self._election_thread_running = False
self._leader = None
self._disabled = False
self._disabled_condition = threading.Condition()
def initialize(self):
-
+ self._shutdown = False
self.kazoo.start()
for path in (self.LAUNCH_PATH, self.NODE_PATH, self.TERMINATING_PATH):
self.kazoo.ensure_path(path)
def shutdown(self):
- # depose the leader and cancel the election just in case
- self.election.cancel()
- self._election_thread_running = False
+ with self._election_condition:
+ self._shutdown = True
+ self._election_enabled = False
+ self._election_condition.notify_all()
try:
if self._leader:
self._leader.depose()
except Exception, e:
log.exception("Error deposing leader: %s", e)
+ self.election.cancel()
+
if self._election_thread:
self._election_thread.join()
self.kazoo.stop()
@@ -407,6 +411,7 @@ def _connection_state_listener(self, state):
def _handle_connection_state(self, state):
if state in (KazooState.LOST, KazooState.SUSPENDED):
+ log.debug("disabling election and leader")
with self._election_condition:
self._election_enabled = False
self._election_condition.notify_all()
@@ -421,6 +426,7 @@ def _handle_connection_state(self, state):
self.election.cancel()
elif state == KazooState.CONNECTED:
+ log.debug("enabling election")
with self._election_condition:
self._election_enabled = True
self._election_condition.notify_all()
@@ -435,7 +441,7 @@ def _update_disabled_state(self):
# check if the node exists and set up a callback
exists = self.retry(self.kazoo.exists, self.DISABLED_PATH,
- self._disabled_watch)
+ self._disabled_watch)
if exists:
if not self._disabled:
log.warn("Detected provisioner DISABLED state began")
@@ -491,21 +497,28 @@ def contend_leader(self, leader):
"""
assert self._leader is None
self._leader = leader
- self._election_thread = tevent.spawn(self._run_election)
+ self._election_thread = tevent.spawn(self._run_election,
+ self.election, leader, "leader")
- def _run_election(self):
+ def _run_election(self, election, leader, name):
"""Election thread function
"""
- self._election_thread_running = True
- while self._election_thread_running:
+ while True:
with self._election_condition:
- try:
- self.election.run(self._leader.inaugurate)
- except Exception, e:
- log.exception("Error in leader election: %s", e)
-
while not self._election_enabled:
+ if self._shutdown:
+ return
+ log.debug("%s election waiting for to be enabled", name)
self._election_condition.wait()
+ if self._shutdown:
+ return
+ try:
+ election.run(leader.inaugurate)
+ except Exception, e:
+ log.exception("Error in %s election: %s", name, e)
+ except:
+ log.exception("Unhandled error in election")
+ raise
#########################################################################
# LAUNCHES
@@ -393,9 +393,9 @@ def doit(self):
def create_terminate_all(kill_func_name, places_to_kill, n):
def doit(self):
- kill_func= getattr(self, kill_func_name)
- self._add_many_domains_terminate_all(kill_func=kill_func,
- places_to_kill=places_to_kill, n=n)
+ kill_func = getattr(self, kill_func_name)
+ self._add_many_domains_terminate_all(
+ kill_func=kill_func, places_to_kill=places_to_kill, n=n)
return doit

0 comments on commit a988abb

Please sign in to comment.