Skip to content

Commit

Permalink
Git rid of gevent.pool
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Aug 16, 2012
1 parent 38db47c commit 8ec780a
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 69 deletions.
6 changes: 4 additions & 2 deletions epu/dashiproc/provisioner.py
Expand Up @@ -84,11 +84,13 @@ def start(self):
try:
self.dashi.consume()
except KeyboardInterrupt:
log.warning("Caught terminate signal. Bye!")
log.warning("Provisioner caught terminate signal. Bye!")
else:
log.info("Exiting normally. Bye!")
log.info("Provisioner exiting normally. Bye!")
finally:
log.debug("Deposing leader")
self.leader.depose()
log.debug("Deposed leader")

@property
def default_user(self):
Expand Down
1 change: 0 additions & 1 deletion epu/highavailability/test/test_highavailability_service.py
@@ -1,6 +1,5 @@
import os
import yaml
import gevent
import unittest
import uuid
import time
Expand Down
3 changes: 2 additions & 1 deletion epu/provisioner/core.py
Expand Up @@ -11,7 +11,8 @@
from itertools import izip

from gevent import Timeout
from gevent.pool import Pool

from epu.tevent import Pool

from nimboss.ctx import ContextClient, BrokerError, BrokerAuthError, \
ContextNotFoundError
Expand Down
112 changes: 50 additions & 62 deletions epu/provisioner/leader.py
Expand Up @@ -4,9 +4,7 @@
import time

import epu.tevent as tevent

from gevent.pool import Pool
import gevent
from epu.tevent import Pool

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -149,34 +147,32 @@ def run_terminator(self):
log.info("Starting terminator")
self.terminator_running = True

try:
while self.is_leader and self.terminator_running:
# TODO: PDA is there something better that can be done than
# reinitializing the pool each time?
if self.concurrent_terminations > 1:
pool = Pool(self.concurrent_terminations)
node_ids = self.store.get_terminating()
nodes = self.core._get_nodes_by_id(node_ids, skip_missing=False)
for node_id, node in izip(node_ids, nodes):
if not node:
#maybe an error should make it's way to controller from here?
log.warn('Node %s unknown but requested for termination',
node_id)
continue

log.info("Terminating node %s", node_id)
launch = self.store.get_launch(node['launch_id'])
try:
if self.concurrent_terminations > 1:
pool.spawn(self.core._terminate_node, node, launch)
else:
self.core._terminate_node(node, launch)
except:
log.exception("Termination of node %s failed:", node_id)
pass

while self.is_leader and self.terminator_running:
node_ids = self.store.get_terminating()
nodes = self.core._get_nodes_by_id(node_ids, skip_missing=False)
for node_id, node in izip(node_ids, nodes):
if not node:
#maybe an error should make it's way to controller from here?
log.warn('Node %s unknown but requested for termination',
node_id)
continue

log.info("Terminating node %s", node_id)
launch = self.store.get_launch(node['launch_id'])
try:
if self.concurrent_terminations > 1:
pool.spawn(self.core._terminate_node, node, launch)
else:
self.core._terminate_node(node, launch)
except Exception, e:
log.info("Termination of node %s failed: %s", node_id, str(e))
pass

pool.join()
except gevent.GreenletExit:
pass
pool.join()

def kill_terminator(self):
"""He'll be back"""
Expand All @@ -188,26 +184,22 @@ def run_site_query_thread(self):
log.info("Starting site query thread")
self.site_query_running = True

try:
while self.is_leader and self.site_query_running:
next_query = time.time() + self.query_delay
try:
self.core.query_nodes(concurrency=self.concurrent_queries)
except Exception:
log.exception("IaaS query failed due to an unexpected error")

if self.force_site_query:
with self.site_query_condition:
self.force_site_query = False
self.site_query_condition.notify_all()
while self.is_leader and self.site_query_running:
next_query = time.time() + self.query_delay
try:
self.core.query_nodes(concurrency=self.concurrent_queries)
except Exception:
log.exception("IaaS query failed due to an unexpected error")

if self.force_site_query:
with self.site_query_condition:
timeout = next_query - time.time()
if timeout > 0:
self.site_query_condition.wait(timeout)
self.force_site_query = False
self.site_query_condition.notify_all()

except gevent.GreenletExit:
pass
with self.site_query_condition:
timeout = next_query - time.time()
if timeout > 0:
self.site_query_condition.wait(timeout)

def kill_site_query_thread(self):
self.site_query_running = False
Expand All @@ -218,26 +210,22 @@ def run_context_query_thread(self):
log.info("Starting context query thread")
self.context_query_running = True

try:
while self.is_leader and self.context_query_running:
next_query = time.time() + self.query_delay
try:
self.core.query_contexts(concurrency=self.concurrent_queries)
except Exception:
log.exception("Context query failed due to an unexpected error")

if self.force_context_query:
with self.context_query_condition:
self.force_context_query = False
self.context_query_condition.notify_all()
while self.is_leader and self.context_query_running:
next_query = time.time() + self.query_delay
try:
self.core.query_contexts(concurrency=self.concurrent_queries)
except Exception:
log.exception("Context query failed due to an unexpected error")

if self.force_context_query:
with self.context_query_condition:
timeout = next_query - time.time()
if timeout > 0:
self.context_query_condition.wait(timeout)
self.force_context_query = False
self.context_query_condition.notify_all()

except gevent.GreenletExit:
pass
with self.context_query_condition:
timeout = next_query - time.time()
if timeout > 0:
self.context_query_condition.wait(timeout)

def kill_context_query_thread(self):
self.context_query_running = False
Expand Down
2 changes: 1 addition & 1 deletion epu/provisioner/store.py
Expand Up @@ -302,7 +302,7 @@ def add_terminating(self, node_id):
def get_terminating(self):
    """Return the ids of nodes currently marked for termination.

    If no nodes are marked, waits briefly (0.1s) on the termination
    condition for one to arrive instead of blocking forever, so the
    terminator loop can still notice a shutdown request between polls.
    """
    if not self.terminating:
        with self.termination_condition:
            # bounded wait: returning with an empty result is fine,
            # the caller loops and will call us again
            self.termination_condition.wait(timeout=0.1)

    return self.terminating.keys()

Expand Down
3 changes: 2 additions & 1 deletion epu/provisioner/test/test_leader.py
Expand Up @@ -56,4 +56,5 @@ def side_effect():
self.assertFalse(leader.is_leader)

leader_thread.join(1)
self.assertTrue(leader_thread.successful())
# TODO: PDA: test that thread exited cleanly?
#self.assertTrue(leader_thread.successful())
9 changes: 9 additions & 0 deletions epu/provisioner/test/test_provisioner_service.py
Expand Up @@ -78,19 +78,25 @@ def spawn_procs(self):
self._spawn_process(self.provisioner.start)

def shutdown_procs(self):
    """Shut down every service thread spawned for this test."""
    # removed leftover debug print of dir(self.threads[0])
    self._shutdown_processes(self.threads)

def _spawn_process(self, process):
    """Start *process* on a background thread and track it for teardown."""
    worker = tevent.spawn(process)
    self.threads.append(worker)

def _shutdown_processes(self, threads):
    """Cancel the dashi consumers and wait for all service threads to exit.

    Cancelling dashi unblocks the consume() loops so the joins below can
    complete. (Leftover "PDA" debug prints removed.)
    """
    self.dtrs.dashi.cancel()
    self.provisioner.dashi.cancel()
    tevent.joinall(threads)

def tearDown(self):
    """Stop spawned service threads, then tear down the backing store."""
    # removed leftover "PDA" debug prints
    self.shutdown_procs()
    self.teardown_store()

def setup_store(self):
Expand Down Expand Up @@ -512,9 +518,12 @@ def test_launch_no_context(self):
node = self.store.get_node(node_id)
self.driver.set_node_running(node['iaas_id'])

print "PDA: about to wait for running"
self.notifier.wait_for_state(InstanceState.RUNNING, all_node_ids,
before=self.provisioner.leader._force_cycle)
print "PDA: about assert node"
self.assertStoreNodeRecords(InstanceState.RUNNING, *all_node_ids)
print "PDA: done asserting"


class ProvisionerZooKeeperServiceTest(ProvisionerServiceTest):
Expand Down
34 changes: 33 additions & 1 deletion epu/tevent.py
@@ -1,16 +1,48 @@
import weakref
import threading
from multiprocessing.pool import ThreadPool
"""
Helper functions for working with stdlib threading library
Inspired by the gevent api
"""

def spawn(func, *args, **kwargs):
    """Start func(*args, **kwargs) in a new daemon thread.

    The thread is named after the function (qualified with its class for
    Python 2 bound methods) to ease debugging, and is returned so callers
    can join it. Daemon status means it will not block interpreter exit.
    """
    # Python 2 bound methods carry im_class; otherwise fall back to the
    # bare function name.
    if hasattr(func, 'im_class'):
        name = "%s.%s" % (func.im_class.__name__, func.__name__)
    else:
        name = func.__name__

    _thread = threading.Thread(target=func, name=name, args=tuple(args),
                               kwargs=kwargs)
    _thread.daemon = True
    _thread.start()
    return _thread


def joinall(threads):
    """Block until every thread in *threads* has finished."""
    for t in threads:
        t.join()

class Pool(ThreadPool):
    """Thread pool with a gevent.pool.Pool-like API (spawn/join).

    Subclasses multiprocessing.pool.ThreadPool so code written against
    gevent's Pool keeps working on plain stdlib threads.
    """

    def __init__(self, *args, **kwargs):
        """Create the pool, patching threading to support ThreadPool
        being constructed from a child thread.

        Shouldn't be necessary when http://bugs.python.org/issue10015 is
        fixed.
        """
        # ThreadPool's constructor touches the current thread's _children
        # map, which non-main threads may lack; give it one.
        if not hasattr(threading.current_thread(), "_children"):
            threading.current_thread()._children = weakref.WeakKeyDictionary()

        ThreadPool.__init__(self, *args, **kwargs)

    def spawn(self, func, *args, **kwargs):
        """Run func(*args, **kwargs) on a pool worker.

        Returns the AsyncResult so callers can wait on or inspect the
        outcome; the previous version discarded it, silently losing any
        worker exception.
        """
        return self.apply_async(func, tuple(args), kwargs)

    def join(self):
        """Close the pool to new work and block until all queued work
        has finished."""
        self.close()
        ThreadPool.join(self)

0 comments on commit 8ec780a

Please sign in to comment.