Permalink
Browse files

Process restarts are now throttled

  • Loading branch information...
1 parent a37c813 commit fd96f009d967683fb975fa99c6daf2cc9421d90c @oldpatricka oldpatricka committed Apr 4, 2013
@@ -1,2 +1,4 @@
processdispatcher:
engines: {}
+ restart_throttling_config:
+ minimum_time_between_starts: 2
@@ -67,10 +67,12 @@ def __init__(self, amqp_uri=None, topic="process_dispatcher", registry=None,
self.notifier)
launch_type = self.CFG.processdispatcher.get('launch_type', 'supd')
+ restart_throttling_config = self.CFG.processdispatcher.get('restart_throttling_config', {})
self.matchmaker = PDMatchmaker(self.store, self.eeagent_client,
self.registry, self.epum_client, self.notifier, self.topic,
- domain_definition_id, base_domain_config, launch_type)
+ domain_definition_id, base_domain_config, launch_type,
+ restart_throttling_config)
self.doctor = PDDoctor(self.core, self.store)
@@ -688,7 +688,6 @@ def process_should_restart(self, process, exit_state, is_system_restart=False):
except Exception:
# don't want a weird process config structure to blow up PD
log.exception("Error inspecting process config")
-
should_restart = False
if process.restart_mode is None or process.restart_mode == RestartMode.ABNORMAL:
if exit_state != ProcessState.EXITED:
@@ -819,7 +818,7 @@ def process_change_state(self, process, newstate, **updates):
updated = False
while process.state < newstate and cur_round == process.round:
if newstate == ProcessState.RUNNING:
- process.starts += 1
+ process.increment_starts()
process.state = newstate
process.update(updates)
try:
@@ -1,5 +1,6 @@
import logging
import threading
+import time
from math import ceil
from copy import deepcopy
from collections import defaultdict
@@ -34,7 +35,7 @@ class PDMatchmaker(object):
def __init__(self, store, resource_client, ee_registry, epum_client,
notifier, service_name, domain_definition_id,
- base_domain_config, run_type):
+ base_domain_config, run_type, restart_throttling_config):
"""
@type store: ProcessDispatcherStore
@type resource_client: EEAgentClient
@@ -51,10 +52,12 @@ def __init__(self, store, resource_client, ee_registry, epum_client,
self.base_domain_config = base_domain_config
self.run_type = run_type
self._cached_pd_state = None
+ self.restart_throttling_config = restart_throttling_config
self.resources = None
self.queued_processes = None
self.stale_processes = None
+ self.throttled_processes = None
self.unscheduled_pending_processes = []
self.condition = threading.Condition()
@@ -88,6 +91,7 @@ def initialize(self):
self.resources = {}
self.queued_processes = []
self.stale_processes = []
+ self.throttled_processes = []
self.resource_set_changed = True
self.changed_resources = set()
@@ -292,6 +296,8 @@ def run(self):
if self.changed_resources:
self._get_resources()
+ self._check_throttled_processes()
+
# check again if we lost leadership
if not self.is_leader:
return
@@ -308,7 +314,9 @@ def run(self):
with self.condition:
if self.is_leader and not (self.resource_set_changed or
self.changed_resources or self.process_set_changed):
- self.condition.wait()
+ timeout = self._time_until_throttling_ends()
+ if timeout > 0 or timeout is None:
+ self.condition.wait(timeout)
def matchmake(self):
# this is inefficient but that is ok for now
@@ -334,11 +342,16 @@ def matchmake(self):
self.queued_processes.remove((owner, upid, round))
continue
+ if self._throttle_end_time(process) > time.time():
+ self._throttle_process(process)
+ continue
+
# ensure process is not already assigned a slot
matched_resource = self._find_assigned_resource(owner, upid, round)
if matched_resource:
log.debug("process already assigned to resource %s",
matched_resource.resource_id)
+
if not matched_resource and node_containers:
matched_resource = self.matchmake_process(process, node_containers)
@@ -557,6 +570,51 @@ def _mark_process_waiting(self, process):
return process, updated
+ def _throttle_process(self, process):
+ self._mark_process_waiting(process)
+ self.throttled_processes.append(process)
+
+ def _time_until_throttling_ends(self):
+ process_throttle_times = []
+ for process in self.throttled_processes:
+ process_throttle_times.append(self._throttle_end_time(process))
+ try:
+ throttle_end_time = min(process_throttle_times)
+ time_until_throttling_ends = throttle_end_time - time.time()
+ return time_until_throttling_ends
+ except ValueError:
+ return None
+
+ def _check_throttled_processes(self):
+ if self._time_until_throttling_ends() <= 0:
+ self.throttled_processes = []
+ self.needs_matchmaking = True
+
+ def _throttle_end_time(self, process):
+ minimum_time_between_starts = None
+ try:
+ minimum_time_between_starts = float(self.restart_throttling_config['minimum_time_between_starts'])
+ except Exception:
+ # We might still be able to get a config from process config
+ pass
+
+ try:
+ minimum_time_between_starts = float(process.configuration['process']['minimum_time_between_starts'])
+ except Exception:
+ # ignore if we can't get a config for this process
+ pass
+
+ # if there isn't a minimum time configured, never throttle
+ if minimum_time_between_starts is None:
+ return 0
+
+ # Process only needs throttling if it has been restarted at least once
+ if len(process.start_times) <= 1:
+ return 0
+
+ last_start = max(process.start_times)
+ return last_start + minimum_time_between_starts
+
def _mark_process_stale(self, process):
self.stale_processes.append(process)
@@ -1437,13 +1437,19 @@ def new(cls, owner, upid, definition, state, configuration=None,
conf = {}
starts = 0
+ start_times = []
d = dict(owner=owner, upid=upid, subscribers=subscribers, state=state,
round=int(round), definition=definition, configuration=conf,
constraints=const, assigned=assigned, hostname=hostname,
queueing_mode=queueing_mode, restart_mode=restart_mode,
- starts=starts, node_exclusive=node_exclusive, name=name)
+ starts=starts, node_exclusive=node_exclusive, name=name,
+ start_times=start_times)
return cls(d)
+ def increment_starts(self):
+ self.starts += 1
+ self.start_times.append(time.time())
+
def get_key(self):
return self.owner, self.upid, self.round
@@ -253,3 +253,7 @@ def get_domain_config():
def nosystemrestart_process_config():
return {'process': {'omit_from_system_restart': True}}
+
+
+def minimum_time_between_starts_config(minimum_time=2):
+ return {'process': {'minimum_time_between_starts': minimum_time}}
@@ -59,11 +59,13 @@ def setUp(self):
self.definition = get_definition()
self.base_domain_config = get_domain_config()
self.run_type = "fake_run_type"
+ self.restart_throttling_config = {}
self.epum_client.add_domain_definition(self.definition_id, self.definition)
self.mm = PDMatchmaker(self.store, self.resource_client,
self.registry, self.epum_client, self.notifier, self.service_name,
- self.definition_id, self.base_domain_config, self.run_type)
+ self.definition_id, self.base_domain_config, self.run_type,
+ self.restart_throttling_config)
self.mmthread = None
@@ -698,7 +700,8 @@ def test_engine_config(self):
self.registry = EngineRegistry.from_config(engine_conf, default='engine1')
self.mm = PDMatchmaker(self.store, self.resource_client,
self.registry, self.epum_client, self.notifier, self.service_name,
- self.definition_id, self.base_domain_config, self.run_type)
+ self.definition_id, self.base_domain_config, self.run_type,
+ self.restart_throttling_config)
self.mm.initialize()
self.assertEqual(len(self.epum_client.domains), len(engine_conf.keys()))
@@ -14,7 +14,8 @@
from epu.dashiproc.processdispatcher import ProcessDispatcherService, \
ProcessDispatcherClient, SubscriberNotifier
from epu.processdispatcher.test.mocks import FakeEEAgent, MockEPUMClient, \
- MockNotifier, get_definition, get_domain_config, nosystemrestart_process_config
+ MockNotifier, get_definition, get_domain_config, nosystemrestart_process_config, \
+ minimum_time_between_starts_config
from epu.processdispatcher.engines import EngineRegistry, domain_id_from_engine
from epu.states import InstanceState, ProcessState
from epu.processdispatcher.store import ProcessRecord, ProcessDispatcherStore, ProcessDispatcherZooKeeperStore
@@ -1108,7 +1109,7 @@ def test_restart_mode_always(self):
self.client.schedule_process("proc2", self.process_definition_id,
constraints=constraints, queueing_mode=proc2_queueing_mode,
- restart_mode=proc2_restart_mode)
+ restart_mode=proc2_restart_mode, configuration={'process': {'minimum_time_between_starts': 0.1}})
self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
self._wait_assert_pd_dump(self._assert_process_states,
@@ -1230,6 +1231,83 @@ def test_start_count(self):
proc = self.store.get_process(None, "proc1")
self.assertEqual(proc.starts, 2)
+ def test_minimum_time_between_starts(self):
+
+ constraints = dict(hat_type="fedora")
+
+ # Start a node
+ node = "node1"
+ domain_id = domain_id_from_engine('engine1')
+ node_properties = dict(hat_type="fedora")
+ self.client.node_state(node, domain_id, InstanceState.RUNNING,
+ node_properties)
+ eeagent = self._spawn_eeagent(node, 4)
+
+ # Test RestartMode.ALWAYS
+ queueing_mode = QueueingMode.ALWAYS
+ restart_mode = RestartMode.ALWAYS
+
+ default_time_to_throttle = 2
+ time_to_throttle = 5
+
+ self.client.schedule_process("proc1", self.process_definition_id,
+ constraints=constraints, queueing_mode=queueing_mode,
+ restart_mode=restart_mode)
+
+ self.client.schedule_process("proc2", self.process_definition_id,
+ constraints=constraints, queueing_mode=queueing_mode,
+ restart_mode=restart_mode,
+ configuration=minimum_time_between_starts_config(time_to_throttle))
+
+ # Processes should start once without delay
+ self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc1"])
+
+ self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc2"])
+
+ # Processes should be restarted once without delay
+ eeagent.exit_process("proc1")
+ eeagent.exit_process("proc2")
+
+ self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc1"])
+ self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc2"])
+
+ # The second time proc1 should be throttled for 2s (the default), and
+ # proc2 should be throttled for the configured 5s
+ eeagent.exit_process("proc1")
+ eeagent.exit_process("proc2")
+
+ self.notifier.wait_for_state("proc1", ProcessState.WAITING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.WAITING, ["proc1"])
+ self.notifier.wait_for_state("proc2", ProcessState.WAITING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.WAITING, ["proc2"])
+
+ # After waiting a few seconds, proc1 should be restarted
+ time.sleep(default_time_to_throttle + 1)
+
+ self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc1"])
+ self.notifier.wait_for_state("proc2", ProcessState.WAITING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.WAITING, ["proc2"])
+
+ # After a few more secs, proc2 should be restarted as well
+ time.sleep(time_to_throttle - (default_time_to_throttle + 1) + 1)
+
+ self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc2"])
+
def test_describe(self):
self.client.schedule_process("proc1", self.process_definition_id)

0 comments on commit fd96f00

Please sign in to comment.