Navigation Menu

Skip to content

Commit

Permalink
Fixes for conflicts under ZK provisioner store
Browse files Browse the repository at this point in the history
- leader continually respawned termination threads
- conflicts adding node and launch records were not handled correctly
  • Loading branch information
labisso committed Feb 29, 2012
1 parent 3008d3f commit e2458b2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
4 changes: 3 additions & 1 deletion epu/provisioner/core.py
Expand Up @@ -202,13 +202,15 @@ def prepare_provision(self, launch_id, deployable_type, instance_ids,
except WriteConflictError:
log.debug("record for launch %s already exists, proceeding.",
launch_id)
launch_record = self.store.get_launch(launch_id)

for node in node_records:
for index, node in enumerate(node_records):
try:
self.store.add_node(node)
except WriteConflictError:
log.debug("record for node %s already exists, proceeding.",
node['node_id'])
node_records[index] = self.store.get_node(node['node_id'])

self.notifier.send_records(node_records, subscribers)
return launch_record, node_records
Expand Down
4 changes: 3 additions & 1 deletion epu/provisioner/leader.py
Expand Up @@ -83,10 +83,12 @@ def run(self):
next_query = time.time()
while self.is_leader:

if not self.terminator_thread:
if self.terminator_thread is None:
if self.store.is_disabled():


disabled_agreed = self.store.is_disabled_agreed()
log.debug("terminator: %s disabled_agreed=%s", self.terminator_thread, disabled_agreed)

if not disabled_agreed:
log.info("provisioner termination detected but not all processes agree yet. waiting.")
Expand Down
25 changes: 21 additions & 4 deletions epu/provisioner/test/test_core.py
Expand Up @@ -207,12 +207,28 @@ def test_prepare_execute_no_ctx(self):
self._prepare_execute(context_enabled=False)
self.assertTrue(self.notifier.assure_state(states.PENDING))

def _prepare_execute(self, subscribers=('blah',), context_enabled=True):
def test_prepare_execute_existing_launch(self):
self.core.context = None
launch_id = _new_id()
instance_id = _new_id()

self._prepare_execute(launch_id=launch_id, instance_ids=[instance_id],
context_enabled=False)
self._prepare_execute(launch_id=launch_id, instance_ids=[instance_id],
context_enabled=False, assure_state=False)

self.assertTrue(self.notifier.assure_state(states.PENDING))

def _prepare_execute(self, launch_id=None, instance_ids=None,
subscribers=('blah',), context_enabled=True,
assure_state=True):
self.dtrs.result = {'document' : _get_one_node_cluster_doc("node1", "image1"),
"node" : {}}

launch_id = _new_id()
instance_ids=[_new_id()]
if not launch_id:
launch_id = _new_id()
if not instance_ids:
instance_ids=[_new_id()]
launch, nodes = self.core.prepare_provision(launch_id=launch_id,
deployable_type="foo", instance_ids=instance_ids,
subscribers=subscribers, site="site1")
Expand All @@ -231,7 +247,8 @@ def _prepare_execute(self, subscribers=('blah',), context_enabled=True):
else:
self.assertEqual(launch['context'], None)

self.assertTrue(self.notifier.assure_state(states.REQUESTED))
if assure_state:
self.assertTrue(self.notifier.assure_state(states.REQUESTED))

self.core.execute_provision(launch, nodes)

Expand Down

0 comments on commit e2458b2

Please sign in to comment.