on node invalidation, check the impact on the jobs in progress in Starter / Stopper
julien6387 committed Dec 9, 2021
1 parent 03ffaea commit 0793d62
Showing 12 changed files with 420 additions and 174 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -15,6 +15,8 @@

* Fix issue in statistics compiler when network interfaces are dynamically created / removed.

+ * When a node is invalidated, check the impact on the jobs in progress in `Starter` / `Stopper`.

* The option `rules_file` is updated to `rules_files` and supports multiple files for **Supvisors** rules.
The option `rules_file` is thus obsolete and will be removed in the next version.

226 changes: 167 additions & 59 deletions supvisors/commander.py

Large diffs are not rendered by default.
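Note: since the commander.py diff is not rendered above, the following is a minimal sketch of the hook this commit adds there, inferred only from the call sites visible in statemachine.py below. The method name on_nodes_invalidation and its two arguments are confirmed by those call sites; the current_jobs attribute and the command structure are assumptions for illustration.

    from typing import Any, List, Set

    class Commander:
        """ Sketch of the base class shared by Starter and Stopper (assumed structure). """

        def __init__(self):
            # commands in progress (hypothetical attribute name)
            self.current_jobs: List[Any] = []

        def on_nodes_invalidation(self, invalidated_nodes: List[str],
                                  process_failures: Set[Any]) -> None:
            """ Check the impact of the invalidated nodes on the jobs in progress. """
            for command in list(self.current_jobs):
                if command.node_name in invalidated_nodes:
                    # the job targeted a node that just went silent and cannot complete
                    self.current_jobs.remove(command)
                    # the failure is dealt with here, so the state machine does not
                    # need to handle the same process again afterwards
                    process_failures.discard(command.process)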

37 changes: 21 additions & 16 deletions supvisors/context.py
@@ -290,33 +290,38 @@ def on_tick_event(self, node_name: str, event: Payload):
# publish AddressStatus event
self.supvisors.zmq.publisher.send_node_status(status.serial())

- def on_timer_event(self, event: Payload) -> Set[ProcessStatus]:
+ def on_timer_event(self, event: Payload) -> Tuple[NameList, Set[ProcessStatus]]:
""" Check that all Supvisors instances are still publishing.
- Supvisors considers that there a Supvisors instance is not active if no tick received in last 10s. """
- process_failures = set({})  # strange but avoids IDE warning
+ Supvisors considers that a Supvisors instance is not active if no tick has been received in the last 10s.
+ :param event: the local tick event
+ :return: the invalidated nodes and the processes in failure
+ """
+ invalidated_nodes, process_failures = [], set({})  # strange but avoids IDE warning on annotations
# find all nodes that did not send their periodic tick
- current_time = time()
+ current_time = event['when']
+ self.local_sequence_counter = event['sequence_counter']
# do not check for invalidation before synchro_timeout
if (current_time - self.start_date) > self.supvisors.options.synchro_timeout:
# get publisher
publisher = self.supvisors.zmq.publisher
# check all nodes
- for status in self.nodes.values():
-     if status.state == AddressStates.UNKNOWN:
+ for node_status in self.nodes.values():
+     if node_status.state == AddressStates.UNKNOWN:
# invalid unknown nodes
# nothing to do on processes as none received yet
- self.invalid(status)
- elif status.inactive(self.local_sequence_counter):
+ self.invalid(node_status)
+ elif node_status.inactive(self.local_sequence_counter):
# invalid silent nodes
- self.invalid(status)
+ self.invalid(node_status)
+ invalidated_nodes.append(node_status.node_name)
# for processes that were running on node, invalidate node in process
- # WARN: it has been decided NOT to remove the node payload from the ProcessStatus and NOT to remove
+ # WARN: decision is made NOT to remove the node payload from the ProcessStatus and NOT to remove
# the ProcessStatus from the Context if no more node payload left.
# The aim is to keep a trace in the Web UI about the application processes that have been lost
# and their related description.
- process_failures.update({process for process in status.running_processes()
-                          if process.invalidate_node(status.node_name)})
+ process_failures.update({process for process in node_status.running_processes()
+                          if process.invalidate_node(node_status.node_name)})
# publish process status in failure
for process in process_failures:
publisher.send_process_status(process.serial())
@@ -327,8 +332,8 @@ def on_timer_event(self, event: Payload) -> Set[ProcessStatus]:
# application.update_sequences()
application.update_status()
publisher.send_application_status(application.serial())
- # return all processes that declare a failure
- return process_failures
+ # return all invalidated nodes and processes that declare a failure
+ return invalidated_nodes, process_failures

def on_process_removed_event(self, node_name: str, event: Payload) -> None:
""" Method called upon reception of a process removed event from the remote Supvisors instance.
@@ -377,9 +382,9 @@ def on_process_state_event(self, node_name: str, event: Payload) -> Optional[Pro
:return: None
"""
if self.supvisors.address_mapper.valid(node_name):
- status = self.nodes[node_name]
+ node_status = self.nodes[node_name]
# accept events only in RUNNING state
- if status.state == AddressStates.RUNNING:
+ if node_status.state == AddressStates.RUNNING:
self.logger.debug('Context.on_process_event: got event {} from node={}'.format(event, node_name))
# get internal data
application = self.applications[event['group']]
37 changes: 22 additions & 15 deletions supvisors/process.py
@@ -50,7 +50,6 @@ def __init__(self, supvisors: Any) -> None:
:param supvisors: the global Supvisors structure.
"""
# TODO: think about adding a period for tasks (period > startsecs / autorestart = False)
- # keep a reference to the Supvisors global structure
self.supvisors = supvisors
self.logger: Logger = supvisors.logger
@@ -314,7 +313,15 @@ def crashed(self) -> bool:
:return: the crash status of the process
"""
- return self.state == ProcessStates.FATAL or (self.state == ProcessStates.EXITED and not self.expected_exit)
+ return ProcessStatus.is_crashed_event(self.state, self.expected_exit)
+
+ @staticmethod
+ def is_crashed_event(state: ProcessStates, expected_exit: bool) -> bool:
+ """ Return True if the process has crashed or has exited unexpectedly.
+ :return: the crash status of the process
+ """
+ return state == ProcessStates.FATAL or (state == ProcessStates.EXITED and not expected_exit)

def stopped(self) -> bool:
""" Return True if the process is stopped, as designed in Supervisor.
@@ -374,7 +381,7 @@ def get_last_description(self) -> Tuple[Optional[str], str]:
:return: the node where the description comes, the process state description
"""
- self.logger.trace('ProcessStatus.get_last_description: START namespec={}'.format(self.namespec))
+ self.logger.trace(f'ProcessStatus.get_last_description: START namespec={self.namespec}')
# if the state is forced, return the reason why
if self.forced_state is not None:
self.logger.trace('ProcessStatus.get_last_description: namespec={} - node_name=None [FORCED]description={}'
@@ -423,13 +430,13 @@ def add_info(self, node_name: str, payload: Payload) -> None:
# TODO: why reset extra_args ?
info['extra_args'] = ''
self.extra_args = ''
- self.logger.trace('ProcessStatus.add_info: namespec={} - payload={} added to node_name={}'
-                   .format(self.namespec, info, node_name))
+ self.logger.trace(f'ProcessStatus.add_info: namespec={self.namespec} - payload={info}'
+                   f' added to node_name={node_name}')
# reset forced_state upon reception of new information only if not STOPPED (default state in supervisor)
if self.forced_state is not None and info['state'] != ProcessStates.STOPPED:
self.forced_state = None
self.forced_reason = ''
- self.logger.debug('ProcessStatus.add_info: namespec={} - forced_state unset'.format(self.namespec))
+ self.logger.debug(f'ProcessStatus.add_info: namespec={self.namespec} - forced_state unset')
# update process status
self.update_status(node_name, info['state'])

@@ -449,8 +456,8 @@ def update_info(self, node_name: str, payload: Payload) -> None:
self.extra_args = payload['extra_args']
# refresh internal information
info = self.info_map[node_name]
- self.logger.trace('ProcessStatus.update_info: namespec={} - updating info[{}]={} with payload={}'
-                   .format(self.namespec, node_name, info, payload))
+ self.logger.trace(f'ProcessStatus.update_info: namespec={self.namespec} - updating info[{node_name}]={info}'
+                   f' with payload={payload}')
info['local_time'] = self.last_event_time
info.update(payload)
# re-evaluate description using Supervisor function
@@ -466,14 +473,14 @@ def update_info(self, node_name: str, payload: Payload) -> None:
if self.forced_state is not None:
self.forced_state = None
self.forced_reason = None
- self.logger.debug('ProcessStatus.update_info: namespec={} - forced_state unset'.format(self.namespec))
+ self.logger.debug(f'ProcessStatus.update_info: namespec={self.namespec} - forced_state unset')
# update / check running addresses
self.update_status(node_name, new_state)
- self.logger.debug('ProcessStatus.update_info: namespec={} - new info[{}]={}'
-                   .format(self.namespec, node_name, info))
+ self.logger.debug(f'ProcessStatus.update_info: namespec={self.namespec} '
+                   f'- new info[{node_name}]={info}')
else:
- self.logger.warn('ProcessStatus.update_info: namespec={} - ProcessEvent rejected. Tick expected from {}'
-                  .format(self.namespec, node_name))
+ self.logger.warn(f'ProcessStatus.update_info: namespec={self.namespec} - ProcessEvent rejected.'
+                  f' Tick expected from {node_name}')

def update_times(self, address: str, remote_time: float) -> None:
""" Update the internal process information when a new tick is received from the remote Supvisors instance.
@@ -505,8 +512,8 @@ def invalidate_node(self, node_name: str) -> bool:
:param node_name: the node from which no more information is received
:return: True if process not running anywhere anymore
"""
- self.logger.debug('ProcessStatus.invalidate_node: namespec={} - node_name={} invalidated'
-                   .format(self.namespec, node_name))
+ self.logger.debug(f'ProcessStatus.invalidate_node: namespec={self.namespec} '
+                   f'- node_name={node_name} invalidated')
failure = False
if node_name in self.running_nodes:
# update process status with a FATAL payload
18 changes: 10 additions & 8 deletions supvisors/statemachine.py
@@ -419,9 +419,12 @@ def set_state(self, next_state: SupvisorsStates, force_transition: bool = None)
def periodic_check(self, event: Payload) -> None:
""" Periodic task used to check if remote Supvisors instances are still active.
This is also the main event on this state machine. """
- process_failures = self.context.on_timer_event(event)
- self.logger.debug('FiniteStateMachine.on_timer_event: process_failures={}'
-                   .format([process.namespec for process in process_failures]))
+ invalidated_nodes, process_failures = self.context.on_timer_event(event)
+ self.logger.debug('FiniteStateMachine.periodic_check: invalidated_nodes={} process_failures={}'
+                   .format(invalidated_nodes, [process.namespec for process in process_failures]))
+ # inform Starter and Stopper; entries may be removed from process_failures
+ self.supvisors.starter.on_nodes_invalidation(invalidated_nodes, process_failures)
+ self.supvisors.stopper.on_nodes_invalidation(invalidated_nodes, process_failures)
# get invalidated nodes / use next / update processes on invalidated nodes ?
self.next()
# fix failures if any (can happen after a node invalidation, a process crash or a conciliation request)
@@ -443,7 +446,7 @@ def on_tick_event(self, node_name: str, event: Payload):
self.periodic_check(event)

def on_process_state_event(self, node_name: str, event: Payload) -> None:
""" This event is used to refresh the process data related to the event and address.
""" This event is used to refresh the process data related to the event sent from the node.
This event also triggers the application starter and/or stopper.
:param node_name: the node that sent the event
@@ -453,10 +456,9 @@ def on_process_state_event(self, node_name: str, event: Payload) -> None:
process = self.context.on_process_state_event(node_name, event)
# returned process may be None if the event is linked to an unknown or an isolated node
if process:
- # feed starter with event
- self.supvisors.starter.on_event(process)
- # feed stopper with event
- self.supvisors.stopper.on_event(process)
+ # inform starter and stopper
+ self.supvisors.starter.on_event(process, node_name, event)
+ self.supvisors.stopper.on_event(process, node_name)
# trigger an automatic (so master only) behaviour for a running failure
# process crash triggered only if running failure strategy related to application
# Supvisors does not replace Supervisor in the present matter (use autorestart if necessary)
@@ -2,7 +2,7 @@
command=python -m unittest scripts.check_start_sequence
autostart=false
autorestart=false
- startsecs=30
+ startsecs=40
startretries=0
exitcodes=0
stopsignal=TERM
2 changes: 1 addition & 1 deletion supvisors/test/scripts/event_queues.py
@@ -20,7 +20,7 @@
from time import time
from queue import Empty, Queue

- from supvisors.client.subscriber import SupvisorsEventInterface, create_logger
+ from supvisors.client.subscriber import SupvisorsEventInterface


class SupvisorsEventQueues(SupvisorsEventInterface):
6 changes: 3 additions & 3 deletions supvisors/test/scripts/sequence_checker.py
@@ -260,22 +260,22 @@ def get_nodes(self):
self.nodes = self.evloop.node_queue.get(True, 30)
self.assertGreater(len(self.nodes), 0)
except Empty:
- self.fail('failed to get the nodes event in the last 30 seconds')
+ self.fail('failed to get the nodes event in the last 20 seconds')

def check_events(self, application_name=None):
""" Receive and check events for processes and applications. """
while self.context.has_events(application_name):
# wait for a process event
try:
- data = self.evloop.process_queue.get(True, 30)
+ data = self.evloop.process_queue.get(True, 40)
except Empty:
self.fail('failed to get the expected events for this process')
self.check_process_event(data)
# wait for an application event
try:
data = self.evloop.application_queue.get(True, 2)
except Empty:
- self.fail('failed to get the expected events for this process')
+ self.fail(f'failed to get the expected events for application={application_name}')
self.check_application_event(data)

def check_process_event(self, event):
