From 0793d621b4e96021d828141bb9a221345be41b0a Mon Sep 17 00:00:00 2001 From: Julien LE CLEACH Date: Thu, 9 Dec 2021 19:47:48 +0100 Subject: [PATCH] on node invalidation, check the impact on the jobs in progress in Starter / Stopper --- CHANGES.md | 2 + supvisors/commander.py | 226 +++++++++++++----- supvisors/context.py | 37 +-- supvisors/process.py | 37 +-- supvisors/statemachine.py | 18 +- .../test/program_check_start_sequence.ini | 2 +- supvisors/test/scripts/event_queues.py | 2 +- supvisors/test/scripts/sequence_checker.py | 6 +- supvisors/tests/test_commander.py | 221 ++++++++++++----- supvisors/tests/test_context.py | 5 +- supvisors/tests/test_statemachine.py | 35 ++- supvisors/ttypes.py | 3 +- 12 files changed, 420 insertions(+), 174 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a9f5e568..8bb8d7c8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,8 @@ * Fix issue in statistics compiler when network interfaces are dynamically created / removed. +* When a node is invalidated, check the impact on the jobs in progress in `Starter` / `Stopper`. + * The option `rules_file` is updated to `rules_files` and supports multiple files for **Supvisors** rules. The option `rules_file` is thus obsolete and will be removed in next version. diff --git a/supvisors/commander.py b/supvisors/commander.py index e7b209de..ed02f22a 100644 --- a/supvisors/commander.py +++ b/supvisors/commander.py @@ -19,15 +19,15 @@ import time -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Set from supervisor.childutils import get_asctime from supervisor.states import ProcessStates from .application import ApplicationStatus -from .process import ProcessStatus +from .process import ProcessRules, ProcessStatus from .strategy import get_node, LoadRequestMap -from .ttypes import StartingStrategies, StartingFailureStrategies +from .ttypes import NameList, Payload, StartingStrategies, StartingFailureStrategies class ProcessCommand(object): @@ -35,7 +35,7 @@ class ProcessCommand(object): Attributes are: - process: the process wrapped, - - node_name: the node to which the command is requested, + - node_names: the nodes where the commands are requested, - request_time: the date when the command is requested, - strategy: the strategy used to start the process if applicable, - distributed: set to False if the process belongs to an application that cannot be distributed, @@ -51,7 +51,7 @@ def __init__(self, process: ProcessStatus, strategy: StartingStrategies = None) :param strategy: the applicable starting strategy """ self.process: ProcessStatus = process - self.node_name: Optional[str] = None + self.node_names: NameList = [] self.request_time: int = 0 # the following attributes are only for Starter self.strategy: StartingStrategies = strategy @@ -64,10 +64,10 @@ def __str__(self) -> str: :return: the printable process command """ - return 'process={} state={} last_event_time={} requested_node={} request_time={} strategy={}' \ + return 'process={} state={} last_event_time={} node_names={} request_time={} strategy={}' \ ' distributed=True ignore_wait_exit={} extra_args="{}"' \ .format(self.process.namespec, self.process.state, self.process.last_event_time, - self.node_name, self.request_time, self.strategy.value if self.strategy else 'None', + self.node_names, self.request_time, self.strategy.value if self.strategy else 'None', self.ignore_wait_exit, self.extra_args) def timed_out(self, now: int) -> bool: @@ -185,14 +185,17 @@ def printable_command_list(commands: CommandList) -> 
PrintableCommandList:
        return [command.process.namespec for command in commands]
 
     @staticmethod
-    def get_process_command(process: ProcessStatus, jobs: CommandList) -> ProcessCommand:
+    def get_process_command(node_name: str, process_name: str, jobs: CommandList) -> ProcessCommand:
         """ Get the process wrapper from list.
 
-        :param process: the process status to search
+        :param node_name: the name of the node from which the process event has been received
+        :param process_name: the name of the process to search for
         :param jobs: the process command list
         :return: the process command found
         """
-        return next((command for command in jobs if command.process is process), None)
+        return next((command for command in jobs
+                     if node_name in command.node_names and command.process.process_name == process_name),
+                    None)
 
     def trigger_jobs(self) -> None:
         """ Triggers the next sequence from the jobs planned (start or stop).
@@ -318,6 +321,84 @@ def is_job_completed(self, condition: str, process_state: ProcessStates) -> bool
         # return True when no more pending jobs
         return not self.in_progress()
 
+    def on_nodes_invalidation(self, invalidated_nodes: NameList, process_failures: Set[ProcessStatus]) -> None:
+        """ Clear the jobs in progress if requests are pending on the nodes recently declared SILENT.
+        Indeed, this has to be considered as a starting failure, not a running failure.
+        Typically, this may happen only if the processes were STARTING, BACKOFF or STOPPING on the lost nodes.
+        Other states would have removed them from the current_jobs list.
+
+        Also clear the processes from process_failures if their starting or stopping is planned,
+        as detailed in clear_process_failures below.
+
+        :param invalidated_nodes: the nodes that have just been declared SILENT
+        :param process_failures: the processes that were running on these nodes and declared in failure
+        :return: None
+        """
+        # clear the invalidated nodes from the pending requests
+        for application_name, command_list in list(self.current_jobs.items()):
+            for command in command_list:
+                command.node_names = [node_name for node_name in command.node_names
+                                      if node_name not in invalidated_nodes]
+                # if no more pending requests, declare a starting failure on the process
+                # and remove it from the process_failures set
+                if not command.node_names:
+                    self.process_failure(command.process)
+                    process_failures.remove(command.process)
+            # rebuild the command list
+            jobs = [command for command in command_list if command.node_names]
+            self.current_jobs[application_name] = jobs
+            if not jobs:
+                self.after_event(application_name)
+        # don't trigger an automatic behaviour on processes in failure when jobs are already planned
+        self.clear_process_failures(process_failures)
+
+    def clear_process_failures(self, process_failures: Set[ProcessStatus]) -> None:
+        """ Clear the processes from process_failures if their starting or stopping is planned.
+        An additional automatic behaviour on the same entity may not be suitable or even consistent.
+
+        This case may seem a bit far-fetched but it has actually happened in a degraded environment:
+            - let N1 and N2 be 2 running nodes ;
+            - let P be a process running on N2 ;
+            - N2 is lost (let's assume a network congestion, NOT a node crash) so P becomes FATAL ;
+            - P is requested to restart on N1 (automatic strategy, user action, etc) ;
+            - while P is still in planned jobs, N2 comes back and thus P becomes RUNNING again ;
+            - N2 gets lost again. P becomes FATAL again whereas its starting on N1 is still in the pipe.
+
+        :param process_failures: the processes declared in failure
+        :return: None
+        """
+        for process in list(process_failures):
+            # clear process failure if the process is included in the planned jobs in progress
+            self.clear_process_failure(process, self.planned_jobs, process_failures)
+            # clear process failure if the process is included in the planned sequence
+            for planned_jobs in self.planned_sequence.values():
+                self.clear_process_failure(process, planned_jobs, process_failures)
+
+    @staticmethod
+    def clear_process_failure(process: ProcessStatus, planned_jobs: PlannedJobs,
+                              process_failures: Set[ProcessStatus]) -> None:
+        """ Clear the process from process_failures if its starting or stopping is planned.
+
+        :param process: the process declared in failure
+        :param planned_jobs: a dictionary of planned jobs
+        :param process_failures: the original set of processes declared in failure
+        :return: None
+        """
+        planned_application_jobs = planned_jobs.get(process.application_name, {})
+        planned_process_jobs = [command.process
+                                for commands in planned_application_jobs.values()
+                                for command in commands]
+        if process in planned_process_jobs:
+            # process command is planned so just wait
+            process_failures.remove(process)
+
     def after_event(self, application_name: str) -> None:
         """ This method is called when the last event has been received for the current jobs linked to the application.
         Trigger the next application sub-sequence.
@@ -348,6 +429,13 @@ def after_jobs(self, application_name: str) -> None:
         :return: None
         """
 
+    def process_failure(self, process: ProcessStatus) -> None:
+        """ Special processing when the process job has failed.
+
+        :param process: the process structure
+        :return: None
+        """
+
 
 class Starter(Commander):
     """ Class handling the starting of processes and applications. """
@@ -456,48 +544,58 @@ def is_starting_completed(self) -> bool:
         """
         return self.is_job_completed('stopped', ProcessStates.FATAL)
 
-    def on_event(self, process) -> None:
+    def on_event(self, process: ProcessStatus, node_name: str, event: Payload) -> None:
         """ Triggers the following of the start sequencing, depending on the new process status. """
         try:
            # first check if event is in the sequence logic, i.e. 
it corresponds to a process in current jobs
             jobs = self.current_jobs[process.application_name]
-            command = self.get_process_command(process, jobs)
+            command = self.get_process_command(node_name, process.process_name, jobs)
             assert command
         except (KeyError, AssertionError):
             # otherwise, check if event impacts the starting sequence
-            self.on_event_out_of_sequence(process)
+            self.logger.debug('Starter.on_event: event {} from node={} does not match the current starting sequence'
+                              .format(event, node_name))
+            self.on_event_out_of_sequence(process, event)
         else:
-            self.on_event_in_sequence(command, jobs)
+            self.on_event_in_sequence(command, event, jobs)
+
+    def on_event_in_sequence(self, command: ProcessCommand, event: Payload, jobs: Commander.CommandList) -> None:
+        """ Manages the impact of an event that is part of the starting sequence.
 
-    def on_event_in_sequence(self, command, jobs) -> None:
-        """ Manages the impact of an event that is part of the starting sequence. """
+        :param command: the process wrapper used in the jobs
+        :param event: the process event received
+        :param jobs: the jobs list being considered
+        :return: None
+        """
         process = command.process
-        if process.state in (ProcessStates.STOPPED, ProcessStates.STOPPING, ProcessStates.UNKNOWN):
-            # unexpected event in a starting phase:
-            # someone has requested to stop the process as it is starting
+        process_state, expected_exit = event['state'], event['expected']
+        if process_state in (ProcessStates.STOPPED, ProcessStates.STOPPING, ProcessStates.UNKNOWN):
+            # unexpected event in a starting phase: someone has requested to stop the process as it is starting
             # remove from inProgress
+            # note that STOPPED should be impossible as it wouldn't be compliant with the ProcessStates transition logic
+            # UNKNOWN is unlikely as it corresponds to an internal supervisord error
             jobs.remove(command)
             # decide to continue starting or not
             self.process_failure(process)
-        elif process.state == ProcessStates.STARTING:
+        elif process_state == ProcessStates.STARTING:
             # on the way
             pass
-        elif command.process.state == ProcessStates.RUNNING:
+        elif process_state == ProcessStates.RUNNING:
             # if not exit expected, job done. otherwise, wait
             if not process.rules.wait_exit or command.ignore_wait_exit:
                 jobs.remove(command)
-        elif process.state == ProcessStates.BACKOFF:
+        elif process_state == ProcessStates.BACKOFF:
             # something wrong happened, just wait
             self.logger.warn('Starter.on_event_in_sequence: problems detected with {}'.format(process.namespec))
-        elif process.state == ProcessStates.EXITED:
+        elif process_state == ProcessStates.EXITED:
             # remove from inProgress
             jobs.remove(command)
             # an EXITED process is accepted if wait_exit is set
-            if process.rules.wait_exit and process.expected_exit:
+            if process.rules.wait_exit and expected_exit:
                 self.logger.info('Starter.on_event_in_sequence: expected exit for {}'.format(process.namespec))
             else:
                 self.process_failure(process)
-        elif process.state == ProcessStates.FATAL:
+        elif process_state == ProcessStates.FATAL:
             # remove from inProgress
             jobs.remove(command)
             # decide to continue starting or not
@@ -506,7 +604,7 @@ def on_event_in_sequence(self, command, jobs) -> None:
         if not jobs:
             self.after_event(process.application_name)
 
-    def on_event_out_of_sequence(self, process: ProcessStatus) -> None:
+    def on_event_out_of_sequence(self, process: ProcessStatus, event: Payload) -> None:
        """ Manages the impact of a crash event that is out of the starting sequence. 
Note: Keeping in mind the possible origins of the event: @@ -515,16 +613,17 @@ def on_event_out_of_sequence(self, process: ProcessStatus) -> None: * a request performed on a remote Supvisors, let's consider the following cases: 1) The application is in the planned sequence, or process is in the planned jobs. - => do nothing, give a chance to this Starter. + => do nothing, give a chance to the Starter. 2) The process is NOT in the application planned jobs. The process was likely started previously in the sequence of this Starter, and it crashed after its RUNNING state but before the application is fully started. => apply starting failure strategy through basic process_failure 3) The application is NOT handled in this Starter - => running failure strategy to be applied outside of here + => running failure strategy to be considered outside of the Starter scope """ # find the conditions of case 2 - if process.crashed() and process.application_name in self.planned_jobs: + has_crashed = ProcessStatus.is_crashed_event(event['state'], event['expected']) + if has_crashed and process.application_name in self.planned_jobs: planned_application_jobs = self.planned_jobs[process.application_name] planned_process_jobs = [command.process for commands in planned_application_jobs.values() @@ -581,8 +680,7 @@ def prepare_application_jobs(self, application_name, application: ApplicationSta # * application is distributed, so node_name has to be resolved later in process_jobs # * application is not distributed and no applicable node_name has been found command.distributed = False - command.node_name = node_name - # TODO: could node_name be resolved here, even for distributed applications ? + command.node_names.append(node_name) def process_job(self, command: ProcessCommand, jobs: Commander.CommandList) -> bool: """ Start the process on the relevant node. 
@@ -598,14 +696,19 @@ def process_job(self, command: ProcessCommand, jobs: Commander.CommandList) -> b # node_name has already been decided for a non-distributed application if command.distributed: # find node iaw strategy - command.node_name = get_node(self.supvisors, command.strategy, process.possible_nodes(), - process.rules.expected_load, self.get_load_requests()) - if command.node_name: + node_name = get_node(self.supvisors, command.strategy, process.possible_nodes(), + process.rules.expected_load, self.get_load_requests()) + self.logger.debug('Starter.process_job: found node={} to start process={} with strategy={}' + .format(node_name, process.namespec, command.strategy.name)) + if node_name: + command.node_names.append(node_name) + if command.node_names: + # in Starter, only one node in node_names + node_name = command.node_names[0] # use asynchronous xml rpc to start program - self.supvisors.zmq.pusher.send_start_process(command.node_name, process.namespec, command.extra_args) - self.logger.debug('Starter.process_job: {} requested to start on {} (strategy={}) at {}' - .format(process.namespec, command.node_name, command.strategy.name, - get_asctime(command.request_time))) + self.supvisors.zmq.pusher.send_start_process(node_name, process.namespec, command.extra_args) + self.logger.info('Starter.process_job: {} requested to start on {} at {}' + .format(process.namespec, node_name, get_asctime(command.request_time))) # push command into current jobs command.request_time = time.time() jobs.append(command) @@ -618,31 +721,34 @@ def process_job(self, command: ProcessCommand, jobs: Commander.CommandList) -> b # return True when the job is queued return queued - def process_failure(self, process): + def process_failure(self, process: ProcessStatus) -> None: """ Updates the start sequence when a process could not be started. 
""" - application_name = process.application_name # impact of failure on application starting if process.rules.required: - self.logger.warn('Starter.process_failure: starting failed for required {}'.format(process.process_name)) + self.logger.warn('Starter.process_failure: starting failed for required {}'.format(process.namespec)) # get starting failure strategy of related application - application = self.supvisors.context.applications[application_name] + application = self.supvisors.context.applications[process.application_name] failure_strategy = application.rules.starting_failure_strategy # apply strategy if failure_strategy == StartingFailureStrategies.ABORT: - self.logger.error('Starter.process_failure: abort starting of application {}'.format(application_name)) + self.logger.warn('Starter.process_failure: abort starting of application {}' + .format(process.application_name)) # remove failed application from starting # do not remove application from current_jobs as requests have already been sent - self.planned_jobs.pop(application_name, None) + self.planned_jobs.pop(process.application_name, None) elif failure_strategy == StartingFailureStrategies.STOP: - self.logger.warn('Starter.process_failure: stop application={} requested'.format(application_name)) - self.planned_jobs.pop(application_name, None) + self.logger.warn('Starter.process_failure: stop application={} requested' + .format(process.application_name)) + self.planned_jobs.pop(process.application_name, None) # defer stop application so that any job in progress are not missed - self.application_stop_requests.append(application_name) + self.application_stop_requests.append(process.application_name) else: - self.logger.warn('Starter.process_failure: continue starting application {}'.format(application_name)) + self.logger.info('Starter.process_failure: continue starting application {}' + .format(process.application_name)) else: - self.logger.warn('Starter.process_failure: starting failed for optional {}'.format(process.process_name)) - self.logger.warn('Starter.process_failure: continue starting application {}'.format(application_name)) + self.logger.warn('Starter.process_failure: starting failed for optional {}'.format(process.namespec)) + self.logger.info('Starter.process_failure: continue starting application {}' + .format(process.application_name)) def after_jobs(self, application_name: str) -> None: """ Trigger any pending application stop once all application jobs are completed. 
@@ -668,14 +774,16 @@ def get_load_requests(self) -> LoadRequestMap: for command in command_list: # if process is not stopped, its loading is already considered in AddressStatus if command.process.stopped(): - load_request_map.setdefault(command.node_name, []).append(command.process.rules.expected_load) + node_name = command.node_names[0] + load_request_map.setdefault(node_name, []).append(command.process.rules.expected_load) # for non-distributed applications, add loading of planned jobs for application_sequence in self.planned_jobs.values(): for command_list in application_sequence.values(): for command in command_list: # if process is not stopped, its loading is already considered in AddressStatus if not command.distributed and command.process.stopped(): - load_request_map.setdefault(command.node_name, []).append(command.process.rules.expected_load) + node_name = command.node_names[0] + load_request_map.setdefault(node_name, []).append(command.process.rules.expected_load) return {node_name: sum(load_list) for node_name, load_list in load_request_map.items() if node_name} @@ -748,6 +856,7 @@ def process_job(self, command: ProcessCommand, jobs): for node_name in process.running_nodes: self.logger.info('Stopper.process_job: stopping process {} on {}'.format(process.namespec, node_name)) self.supvisors.zmq.pusher.send_stop_process(node_name, process.namespec) + command.node_names.append(node_name) # push to jobs and timestamp process command.request_time = time.time() self.logger.debug('Stopper.process_job: {} requested to stop at {}' @@ -764,20 +873,19 @@ def is_stopping_completed(self) -> bool: """ return self.is_job_completed('running', ProcessStates.STOPPED) - def on_event(self, process: ProcessStatus) -> None: + def on_event(self, process: ProcessStatus, node_name: str) -> None: """ Triggers the following of the stop sequencing, depending on the new process status. """ # check if process event has an impact on stopping in progress if process.application_name in self.current_jobs: jobs = self.current_jobs[process.application_name] self.logger.debug('Stopper.on_event: jobs={}'.format(self.printable_command_list(jobs))) - command = self.get_process_command(process, jobs) + command = self.get_process_command(node_name, process.process_name, jobs) if command: if process.running(): - # several cases: - # 1) expected upon conciliation of a conflicting process - # 2) expected when stopping unmanaged applications with multiple running instances of a program - # 3) concurrent stopping / starting - self.logger.warn('Stopper.on_event: {} still running when stopping'.format(process.namespec)) + # two cases: + # 1) stopping applications with multiple running instances of a program (unmanaged or conflict) + # 2) concurrent stopping / starting from multiple Supervisors + self.logger.debug('Stopper.on_event: {} still running when stopping'.format(process.namespec)) elif process.stopped(): # goal reached, whatever the state jobs.remove(command) diff --git a/supvisors/context.py b/supvisors/context.py index a27e21cc..ab19f62d 100644 --- a/supvisors/context.py +++ b/supvisors/context.py @@ -290,33 +290,38 @@ def on_tick_event(self, node_name: str, event: Payload): # publish AddressStatus event self.supvisors.zmq.publisher.send_node_status(status.serial()) - def on_timer_event(self, event: Payload) -> Set[ProcessStatus]: + def on_timer_event(self, event: Payload) -> Tuple[NameList, Set[ProcessStatus]]: """ Check that all Supvisors instances are still publishing. 
-        Supvisors considers that there a Supvisors instance is not active if no tick received in last 10s. """
-        process_failures = set({})  # strange but avoids IDE warning
+        Supvisors considers that a Supvisors instance is not active if no tick has been received in the last 10 seconds.
+
+        :param event: the local tick event
+        :return: the invalidated nodes and the processes in failure
+        """
+        invalidated_nodes, process_failures = [], set({})  # strange but avoids IDE warning on annotations
         # find all nodes that did not send their periodic tick
-        current_time = time()
+        current_time = event['when']
         self.local_sequence_counter = event['sequence_counter']
         # do not check for invalidation before synchro_timeout
         if (current_time - self.start_date) > self.supvisors.options.synchro_timeout:
             # get publisher
             publisher = self.supvisors.zmq.publisher
             # check all nodes
-            for status in self.nodes.values():
-                if status.state == AddressStates.UNKNOWN:
+            for node_status in self.nodes.values():
+                if node_status.state == AddressStates.UNKNOWN:
                     # invalid unknown nodes
                     # nothing to do on processes as none received yet
-                    self.invalid(status)
-                elif status.inactive(self.local_sequence_counter):
+                    self.invalid(node_status)
+                elif node_status.inactive(self.local_sequence_counter):
                     # invalid silent nodes
-                    self.invalid(status)
+                    self.invalid(node_status)
+                    invalidated_nodes.append(node_status.node_name)
                     # for processes that were running on node, invalidate node in process
-                    # WARN: it has been decided NOT to remove the node payload from the ProcessStatus and NOT to remove
+                    # WARN: decision is made NOT to remove the node payload from the ProcessStatus and NOT to remove
                     #  the ProcessStatus from the Context if no more node payload left.
                     #  The aim is to keep a trace in the Web UI about the application processes that have been lost
                     #  and their related description.
-                    process_failures.update({process for process in status.running_processes()
-                                             if process.invalidate_node(status.node_name)})
+                    process_failures.update({process for process in node_status.running_processes()
+                                             if process.invalidate_node(node_status.node_name)})
         # publish process status in failure
         for process in process_failures:
             publisher.send_process_status(process.serial())
@@ -327,8 +332,8 @@
             #     application.update_sequences()
             application.update_status()
             publisher.send_application_status(application.serial())
-        # return all processes that declare a failure
-        return process_failures
+        # return all invalidated nodes and processes that declare a failure
+        return invalidated_nodes, process_failures
 
     def on_process_removed_event(self, node_name: str, event: Payload) -> None:
        """ Method called upon reception of a process removed event from the remote Supvisors instance. 
@@ -377,9 +382,9 @@ def on_process_state_event(self, node_name: str, event: Payload) -> Optional[Pro :return: None """ if self.supvisors.address_mapper.valid(node_name): - status = self.nodes[node_name] + node_status = self.nodes[node_name] # accept events only in RUNNING state - if status.state == AddressStates.RUNNING: + if node_status.state == AddressStates.RUNNING: self.logger.debug('Context.on_process_event: got event {} from node={}'.format(event, node_name)) # get internal data application = self.applications[event['group']] diff --git a/supvisors/process.py b/supvisors/process.py index 704ae6b8..99e4f6ea 100644 --- a/supvisors/process.py +++ b/supvisors/process.py @@ -50,7 +50,6 @@ def __init__(self, supvisors: Any) -> None: :param supvisors: the global Supvisors structure. """ - # TODO: think about adding a period for tasks (period > startsecs / autorestart = False) # keep a reference to the Supvisors global structure self.supvisors = supvisors self.logger: Logger = supvisors.logger @@ -314,7 +313,15 @@ def crashed(self) -> bool: :return: the crash status of the process """ - return self.state == ProcessStates.FATAL or (self.state == ProcessStates.EXITED and not self.expected_exit) + return ProcessStatus.is_crashed_event(self.state, self.expected_exit) + + @staticmethod + def is_crashed_event(state: ProcessStates, expected_exit: bool) -> bool: + """ Return True if the process has crashed or has exited unexpectedly. + + :return: the crash status of the process + """ + return state == ProcessStates.FATAL or (state == ProcessStates.EXITED and not expected_exit) def stopped(self) -> bool: """ Return True if the process is stopped, as designed in Supervisor. @@ -374,7 +381,7 @@ def get_last_description(self) -> Tuple[Optional[str], str]: :return: the node where the description comes, the process state description """ - self.logger.trace('ProcessStatus.get_last_description: START namespec={}'.format(self.namespec)) + self.logger.trace(f'ProcessStatus.get_last_description: START namespec={self.namespec}') # if the state is forced, return the reason why if self.forced_state is not None: self.logger.trace('ProcessStatus.get_last_description: namespec={} - node_name=None [FORCED]description={}' @@ -423,13 +430,13 @@ def add_info(self, node_name: str, payload: Payload) -> None: # TODO: why reset extra_args ? 
info['extra_args'] = ''
         self.extra_args = ''
-        self.logger.trace('ProcessStatus.add_info: namespec={} - payload={} added to node_name={}'
-                          .format(self.namespec, info, node_name))
+        self.logger.trace(f'ProcessStatus.add_info: namespec={self.namespec} - payload={info}'
+                          f' added to node_name={node_name}')
         # reset forced_state upon reception of new information only if not STOPPED (default state in supervisor)
         if self.forced_state is not None and info['state'] != ProcessStates.STOPPED:
             self.forced_state = None
             self.forced_reason = ''
-            self.logger.debug('ProcessStatus.add_info: namespec={} - forced_state unset'.format(self.namespec))
+            self.logger.debug(f'ProcessStatus.add_info: namespec={self.namespec} - forced_state unset')
         # update process status
         self.update_status(node_name, info['state'])
@@ -449,8 +456,8 @@ def update_info(self, node_name: str, payload: Payload) -> None:
             self.extra_args = payload['extra_args']
             # refresh internal information
             info = self.info_map[node_name]
-            self.logger.trace('ProcessStatus.update_info: namespec={} - updating info[{}]={} with payload={}'
-                              .format(self.namespec, node_name, info, payload))
+            self.logger.trace(f'ProcessStatus.update_info: namespec={self.namespec} - updating info[{node_name}]={info}'
+                              f' with payload={payload}')
             info['local_time'] = self.last_event_time
             info.update(payload)
             # re-evaluate description using Supervisor function
@@ -466,14 +473,14 @@ def update_info(self, node_name: str, payload: Payload) -> None:
             if self.forced_state is not None:
                 self.forced_state = None
                 self.forced_reason = None
-                self.logger.debug('ProcessStatus.update_info: namespec={} - forced_state unset'.format(self.namespec))
+                self.logger.debug(f'ProcessStatus.update_info: namespec={self.namespec} - forced_state unset')
             # update / check running addresses
             self.update_status(node_name, new_state)
-            self.logger.debug('ProcessStatus.update_info: namespec={} - new info[{}]={}'
-                              .format(self.namespec, node_name, info))
+            self.logger.debug(f'ProcessStatus.update_info: namespec={self.namespec} '
+                              f'- new info[{node_name}]={info}')
         else:
-            self.logger.warn('ProcessStatus.update_info: namespec={} - ProcessEvent rejected. Tick expected from {}'
-                             .format(self.namespec, node_name))
+            self.logger.warn(f'ProcessStatus.update_info: namespec={self.namespec} - ProcessEvent rejected.'
+                             f' Tick expected from {node_name}')
 
     def update_times(self, address: str, remote_time: float) -> None:
         """ Update the internal process information when a new tick is received from the remote Supvisors instance.
@@ -505,8 +512,8 @@ def invalidate_node(self, node_name: str) -> bool:
         :param node_name: the node from which no more information is received
         :return: True if process not running anywhere anymore
         """
-        self.logger.debug('ProcessStatus.invalidate_node: namespec={} - node_name={} invalidated'
-                          .format(self.namespec, node_name))
+        self.logger.debug(f'ProcessStatus.invalidate_node: namespec={self.namespec} '
+                          f'- node_name={node_name} invalidated')
         failure = False
         if node_name in self.running_nodes:
             # update process status with a FATAL payload
diff --git a/supvisors/statemachine.py b/supvisors/statemachine.py
index 5b0c6404..f9757bc4 100644
--- a/supvisors/statemachine.py
+++ b/supvisors/statemachine.py
@@ -419,9 +419,12 @@ def set_state(self, next_state: SupvisorsStates, force_transition: bool = None)
     def periodic_check(self, event: Payload) -> None:
         """ Periodic task used to check if remote Supvisors instances are still active.
        This is also the main event on this state machine. 
""" - process_failures = self.context.on_timer_event(event) - self.logger.debug('FiniteStateMachine.on_timer_event: process_failures={}' - .format([process.namespec for process in process_failures])) + invalidated_nodes, process_failures = self.context.on_timer_event(event) + self.logger.debug('FiniteStateMachine.periodic_check: invalidated_nodes={} process_failures={}' + .format(invalidated_nodes, [process.namespec for process in process_failures])) + # inform Starter and Stopper. process_failures may be removed + self.supvisors.starter.on_nodes_invalidation(invalidated_nodes, process_failures) + self.supvisors.stopper.on_nodes_invalidation(invalidated_nodes, process_failures) # get invalidated nodes / use next / update processes on invalidated nodes ? self.next() # fix failures if any (can happen after a node invalidation, a process crash or a conciliation request) @@ -443,7 +446,7 @@ def on_tick_event(self, node_name: str, event: Payload): self.periodic_check(event) def on_process_state_event(self, node_name: str, event: Payload) -> None: - """ This event is used to refresh the process data related to the event and address. + """ This event is used to refresh the process data related to the event sent from the node. This event also triggers the application starter and/or stopper. :param node_name: the node that sent the event @@ -453,10 +456,9 @@ def on_process_state_event(self, node_name: str, event: Payload) -> None: process = self.context.on_process_state_event(node_name, event) # returned process may be None if the event is linked to an unknown or an isolated node if process: - # feed starter with event - self.supvisors.starter.on_event(process) - # feed stopper with event - self.supvisors.stopper.on_event(process) + # inform starter and stopper + self.supvisors.starter.on_event(process, node_name, event) + self.supvisors.stopper.on_event(process, node_name) # trigger an automatic (so master only) behaviour for a running failure # process crash triggered only if running failure strategy related to application # Supvisors does not replace Supervisor in the present matter (use autorestart if necessary) diff --git a/supvisors/test/etc/cliche81/test/program_check_start_sequence.ini b/supvisors/test/etc/cliche81/test/program_check_start_sequence.ini index 53b034cf..37aaf190 100644 --- a/supvisors/test/etc/cliche81/test/program_check_start_sequence.ini +++ b/supvisors/test/etc/cliche81/test/program_check_start_sequence.ini @@ -2,7 +2,7 @@ command=python -m unittest scripts.check_start_sequence autostart=false autorestart=false -startsecs=30 +startsecs=40 startretries=0 exitcodes=0 stopsignal=TERM diff --git a/supvisors/test/scripts/event_queues.py b/supvisors/test/scripts/event_queues.py index 13dc3e5b..bbfb545e 100644 --- a/supvisors/test/scripts/event_queues.py +++ b/supvisors/test/scripts/event_queues.py @@ -20,7 +20,7 @@ from time import time from queue import Empty, Queue -from supvisors.client.subscriber import SupvisorsEventInterface, create_logger +from supvisors.client.subscriber import SupvisorsEventInterface class SupvisorsEventQueues(SupvisorsEventInterface): diff --git a/supvisors/test/scripts/sequence_checker.py b/supvisors/test/scripts/sequence_checker.py index 8601a80e..b46ea440 100644 --- a/supvisors/test/scripts/sequence_checker.py +++ b/supvisors/test/scripts/sequence_checker.py @@ -260,14 +260,14 @@ def get_nodes(self): self.nodes = self.evloop.node_queue.get(True, 30) self.assertGreater(len(self.nodes), 0) except Empty: - self.fail('failed to get the nodes event in 
the last 30 seconds') + self.fail('failed to get the nodes event in the last 20 seconds') def check_events(self, application_name=None): """ Receive and check events for processes and applications. """ while self.context.has_events(application_name): # wait for a process event try: - data = self.evloop.process_queue.get(True, 30) + data = self.evloop.process_queue.get(True, 40) except Empty: self.fail('failed to get the expected events for this process') self.check_process_event(data) @@ -275,7 +275,7 @@ def check_events(self, application_name=None): try: data = self.evloop.application_queue.get(True, 2) except Empty: - self.fail('failed to get the expected events for this process') + self.fail(f'failed to get the expected events for application={application_name}') self.check_application_event(data) def check_process_event(self, event): diff --git a/supvisors/tests/test_commander.py b/supvisors/tests/test_commander.py index ae9fe508..71518ce8 100644 --- a/supvisors/tests/test_commander.py +++ b/supvisors/tests/test_commander.py @@ -19,6 +19,7 @@ import pytest +from supervisor.states import _process_states_by_code from unittest.mock import call, Mock from supvisors.commander import * @@ -55,7 +56,7 @@ def test_str(): command.request_time = 4321 command.ignore_wait_exit = True command.extra_args = '-s test args' - assert str(command) == 'process=proc_1 state=RUNNING last_event_time=1234 requested_node=None request_time=4321'\ + assert str(command) == 'process=proc_1 state=RUNNING last_event_time=1234 node_names=[] request_time=4321'\ ' strategy=0 distributed=True ignore_wait_exit=True extra_args="-s test args"' @@ -225,6 +226,17 @@ def test_commander_printable_planned_sequence(commander, command_list_1, command 3: {'else': {}}} +def test_get_process_command(command_list_1): + """ Test the Commander.get_process_command method. """ + # test on non-existing process + assert not Commander.get_process_command('10.0.0.1', 'dummy', command_list_1) + # test on existing process and incorrect node + assert not Commander.get_process_command('10.0.0.1', 'dummy_A1', command_list_1) + # test on existing process and correct node + command_list_1[0].node_names = ['10.0.0.1'] + assert Commander.get_process_command('10.0.0.1', 'dummy_A1', command_list_1) == command_list_1[0] + + def test_commander_process_application_jobs(mocker, commander, command_list_1, command_list_2): """ Test the Commander.process_application_jobs method. """ commander.pickup_logic = min @@ -368,12 +380,80 @@ def test_commander_is_job_completed(mocker, commander, command_list): assert not mocked_trigger.called +def test_commander_on_nodes_invalidation(mocker, commander, command_list_1, command_list_2): + """ Test the Commander.on_nodes_invalidation method. 
""" + mocked_after = mocker.patch.object(commander, 'after_event') + mocked_failure = mocker.patch.object(commander, 'process_failure') + # prepare context + for command, node_name in zip(command_list_1, ['10.0.0.1', '10.0.0.2', '10.0.0.3']): + command.node_names = [node_name] + for command in command_list_2: + command.node_names = ['10.0.0.1'] + process_A1 = command_list_1[0].process + process_A3 = command_list_1[2].process + process_B1 = command_list_2[0].process + commander.planned_jobs = {'appli_A': {2: [command_list_1[2]]}} + commander.current_jobs = {'appli_A': command_list_1[0:2], 'appli_B': command_list_2} + # test call with invalidation of 10.0.0.1 + process_failures = {process_A1, process_B1} + commander.on_nodes_invalidation(['10.0.0.1'], process_failures) + assert mocked_failure.call_args_list == [call(process_A1), call(process_B1)] + assert mocked_after.call_args_list == [call('appli_B')] + assert process_failures == set() + + +def test_commander_clear_process_failures(mocker, commander, command_list_1, command_list_2): + """ Test the Commander.clear_process_failures method. """ + mocked_clear = mocker.patch.object(commander, 'clear_process_failure') + # prepare context + for command, node_name in zip(command_list_1, ['10.0.0.1', '10.0.0.2', '10.0.0.3']): + command.node_names = [node_name] + for command in command_list_2: + command.node_names = ['10.0.0.1'] + commander.planned_sequence = {4: {'appli_A': {2: [command_list_1[1]]}}} + commander.planned_jobs = {'appli_A': {2: [command_list_1[2]]}} + commander.current_jobs = {'appli_A': command_list_1[0], 'appli_B': command_list_2} + # test call with invalidation of 10.0.0.1 + process_A1 = command_list_1[0].process + process_B1 = command_list_2[0].process + process_failures = [process_A1, process_B1] + commander.clear_process_failures(process_failures) + assert mocked_clear.call_args_list == [call(process_A1, commander.planned_jobs, process_failures), + call(process_A1, {'appli_A': {2: [command_list_1[1]]}}, process_failures), + call(process_B1, commander.planned_jobs, process_failures), + call(process_B1, {'appli_A': {2: [command_list_1[1]]}}, process_failures)] + assert process_failures == [process_A1, process_B1] + + +def test_commander_clear_process_failure(commander, command_list_1, command_list_2): + """ Test the Commander.clear_process_failure method. """ + # prepare context + for command, node_name in zip(command_list_1, ['10.0.0.1', '10.0.0.2', '10.0.0.3']): + command.node_names = [node_name] + for command in command_list_2: + command.node_names = ['10.0.0.1'] + commander.planned_sequence = {4: {'appli_B': {2: [command_list_2[0]]}}} + commander.planned_jobs = {'appli_A': {2: [command_list_1[0]]}} + # test call with invalidation of 10.0.0.1 + process_A1 = command_list_1[0].process + process_B1 = command_list_2[0].process + process_failures = {process_A1, process_B1} + commander.clear_process_failure(process_A1, commander.planned_jobs, process_failures) + assert process_failures == {process_B1} + commander.clear_process_failure(process_A1, commander.planned_sequence[4], process_failures) + assert process_failures == {process_B1} + commander.clear_process_failure(process_B1, commander.planned_jobs, process_failures) + assert process_failures == {process_B1} + commander.clear_process_failure(process_B1, commander.planned_sequence[4], process_failures) + assert process_failures == set() + + def test_commander_after_event(mocker, commander): """ Test the Commander.after_event method. 
""" mocked_trigger = mocker.patch('supvisors.commander.Commander.trigger_jobs') mocked_after = mocker.patch('supvisors.commander.Commander.after_jobs') mocked_process = mocker.patch('supvisors.commander.Commander.process_application_jobs') - # prepare some context + # prepare context commander.planned_jobs = {'if': {2: []}} commander.current_jobs = {'if': [], 'then': [], 'else': []} # test after_event when there are still planned jobs for application @@ -408,6 +488,13 @@ def test_commander_after_jobs(commander): # behavior implemented in subclasses +def test_commander_process_failure(commander): + """ Test the Commander.process_failure method. """ + commander.process_failure(Mock()) + # nothing to test as method is empty + # behavior implemented in subclasses + + # Starter part @pytest.fixture def starter(supvisors): @@ -418,6 +505,8 @@ def starter(supvisors): def test_starter_create(starter): """ Test the values set at construction of Starter. """ assert isinstance(starter, Commander) + assert starter.application_stop_requests == [] + assert starter.pickup_logic([2, 1, 3]) == 1 def test_starter_store_application_start_sequence(starter, command_list): @@ -526,22 +615,28 @@ def test_starter_on_event(mocker, starter, command_list): # test that on_event_out_of_sequence is called when process # is not in current jobs due to unknown application process = Mock(application_name='unknown_application') - starter.on_event(process) + starter.on_event(process, '10.0.0.1', {'state': 200}) assert not mocked_in.called - assert mocked_out.call_args_list == [(call(process))] + assert mocked_out.call_args_list == [(call(process, {'state': 200}))] mocked_out.reset_mock() # test that on_event_out_of_sequence is called when process is not in current jobs because unknown process = Mock(application_name='sample_test_1') - starter.on_event(process) + starter.on_event(process, '10.0.0.1', {'state': 200}) assert not mocked_in.called - assert mocked_out.call_args_list == [(call(process))] + assert mocked_out.call_args_list == [(call(process, {'state': 200}))] mocked_out.reset_mock() - # test that on_event_in_sequence is called when process is in list + # test that on_event_in_sequence is called when process is in list but node_names not set jobs = starter.current_jobs['sample_test_1'] command = next(iter(jobs)) - starter.on_event(command.process) + starter.on_event(command.process, '10.0.0.1', {'state': 200}) + assert not mocked_in.called + assert mocked_out.call_args_list == [(call(command.process, {'state': 200}))] + mocked_out.reset_mock() + # test that on_event_in_sequence is called when process is in list and node_names set + command.node_names = ['10.0.0.1'] + starter.on_event(command.process, '10.0.0.1', {'state': 200}) assert not mocked_out.called - assert mocked_in.call_args_list == [(call(command, jobs))] + assert mocked_in.call_args_list == [(call(command, {'state': 200}, jobs))] def test_starter_on_event_in_sequence(mocker, starter, command_list): @@ -563,7 +658,8 @@ def test_starter_on_event_in_sequence(mocker, starter, command_list): command = get_test_command(command_list, 'xlogo') jobs = starter.current_jobs['sample_test_1'] assert command in jobs - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.STOPPED, 'expected': True} + starter.on_event_in_sequence(command, event, jobs) assert not command.ignore_wait_exit assert command not in jobs assert mocked_failure.call_args_list == [call(command.process)] @@ -573,7 +669,8 @@ def test_starter_on_event_in_sequence(mocker, 
starter, command_list): # test STOPPING process: xclock command = get_test_command(command_list, 'xclock') assert command in jobs - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.STOPPING, 'expected': True} + starter.on_event_in_sequence(command, event, jobs) assert not command.ignore_wait_exit assert command not in jobs assert mocked_failure.call_args_list == [call(command.process)] @@ -585,7 +682,8 @@ def test_starter_on_event_in_sequence(mocker, starter, command_list): assert command in jobs assert not command.process.rules.wait_exit assert not command.ignore_wait_exit - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.RUNNING, 'expected': True} + starter.on_event_in_sequence(command, event, jobs) assert command not in jobs assert not mocked_failure.called assert mocked_after.call_args_list == [call('sample_test_1')] @@ -598,23 +696,25 @@ def test_starter_on_event_in_sequence(mocker, starter, command_list): command.process.rules.wait_exit = True command.ignore_wait_exit = True assert command in jobs - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.RUNNING, 'expected': True} + starter.on_event_in_sequence(command, event, jobs) assert command not in jobs assert not mocked_failure.called assert not mocked_after.called # test EXITED / expected process: yeux_00 command = get_test_command(command_list, 'yeux_00') command.process.rules.wait_exit = True - command.process.expected_exit = True assert command in jobs - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.EXITED, 'expected': True} + starter.on_event_in_sequence(command, event, jobs) assert command not in jobs assert not mocked_failure.called assert not mocked_after.called # test FATAL process: sleep (last process of this application) command = get_test_command(command_list, 'sleep') assert command in jobs - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.FATAL, 'expected': False} + starter.on_event_in_sequence(command, event, jobs) assert not command.ignore_wait_exit assert command not in jobs assert mocked_failure.call_args_list == [call(command.process)] @@ -627,14 +727,16 @@ def test_starter_on_event_in_sequence(mocker, starter, command_list): command = get_test_command(command_list, 'late_segv') jobs = starter.current_jobs['crash'] assert command in jobs - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.STARTING, 'expected': True} + starter.on_event_in_sequence(command, event, jobs) assert command in jobs assert not mocked_failure.called assert not mocked_after.called # test BACKOFF process: segv (last process of this application) command = get_test_command(command_list, 'segv') assert command in jobs - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.BACKOFF, 'expected': True} + starter.on_event_in_sequence(command, event, jobs) assert command in jobs assert not mocked_failure.called assert not mocked_after.called @@ -643,9 +745,9 @@ def test_starter_on_event_in_sequence(mocker, starter, command_list): command = get_test_command(command_list, 'firefox') jobs = starter.current_jobs['firefox'] command.process.rules.wait_exit = True - command.process.expected_exit = False assert command in jobs - starter.on_event_in_sequence(command, jobs) + event = {'state': ProcessStates.EXITED, 'expected': False} + starter.on_event_in_sequence(command, event, jobs) assert 'firefox' in starter.current_jobs assert mocked_failure.call_args_list 
== [call(command.process)] assert mocked_after.call_args_list == [call('firefox')] @@ -659,27 +761,31 @@ def test_starter_on_event_out_of_sequence(mocker, starter, command_list): starter.current_jobs.setdefault(command.process.application_name, []).append(command) # apply patch mocked_failure = mocker.patch.object(starter, 'process_failure') - # test that process_failure is not called if process is not crashed - process = next(command.process for command in command_list - if not command.process.crashed()) - starter.on_event_out_of_sequence(process) - assert not mocked_failure.called - # test that process_failure is not called if process is not in planned jobs - process = next(command.process for command in command_list - if command.process.application_name == 'sample_test_1') - starter.on_event_out_of_sequence(process) - assert not mocked_failure.called - # get a command related to a process crashed and in planned jobs - command = next(command for command in command_list - if command.process.crashed() and command.process.application_name == 'sample_test_2') - # test that process_failure is called if process' starting is not planned - starter.on_event_out_of_sequence(command.process) - assert mocked_failure.call_args_list == [(call(command.process))] - mocked_failure.reset_mock() + # loop on all commands and states + for command in command_list: + for state in _process_states_by_code: + event = {'state': state, 'expected': True} + # test that process_failure is not called if process has not crashed + if state != ProcessStates.FATAL: + starter.on_event_out_of_sequence(command.process, event) + assert not mocked_failure.called + # test that process_failure is not called if process is not in planned jobs + if command.process.application_name == 'sample_test_1': + starter.on_event_out_of_sequence(command.process, event) + assert not mocked_failure.called + # test that process_failure is called if process' starting is not planned + if command.process.application_name == 'sample_test_2': + for state in [ProcessStates.FATAL, ProcessStates.EXITED]: + event = {'state': state, 'expected': False} + starter.on_event_out_of_sequence(command.process, event) + assert mocked_failure.call_args_list == [(call(command.process))] + mocked_failure.reset_mock() # test that process_failure is not called if process' starting is still planned - starter.planned_jobs = {'sample_test_2': {1: [command]}} - starter.on_event_out_of_sequence(command.process) - assert not mocked_failure.called + for state in _process_states_by_code: + starter.planned_jobs = {'sample_test_2': {1: [command]}} + event = {'state': state, 'expected': True} + starter.on_event_out_of_sequence(command.process, event) + assert not mocked_failure.called def test_starter_prepare_application_jobs(mocker, starter, command_list): @@ -694,7 +800,7 @@ def test_starter_prepare_application_jobs(mocker, starter, command_list): starter.planned_jobs = {'sample_test_1': {1: [get_test_command(command_list, 'xfontsel')], 2: [get_test_command(command_list, 'xlogo')]}} starter.current_jobs = {} - assert all(command.distributed and not command.node_name + assert all(command.distributed and not command.node_names for sequence in starter.planned_jobs['sample_test_1'].values() for command in sequence) # add applications to context @@ -707,7 +813,7 @@ def test_starter_prepare_application_jobs(mocker, starter, command_list): assert not mocked_app_nodes.called assert not mocked_app_load.called # commands unchanged - assert all(command.distributed and not command.node_name 
+ assert all(command.distributed and not command.node_names for sequence in starter.planned_jobs['sample_test_1'].values() for command in sequence) # test application provided / application not distributed @@ -716,7 +822,7 @@ def test_starter_prepare_application_jobs(mocker, starter, command_list): assert mocked_node_getter.call_args_list == [call(starter.supvisors, StartingStrategies.MOST_LOADED, ['10.0.0.1', '10.0.0.2'], 27, {'10.0.0.1': 28})] # check commands - assert all(not command.distributed and command.node_name == '10.0.0.1' + assert all(not command.distributed and command.node_names == ['10.0.0.1'] for sequence in starter.planned_jobs['sample_test_1'].values() for command in sequence) @@ -757,6 +863,8 @@ def test_starter_process_job(mocker, starter, command_list): ['10.0.0.1'], 0, {'10.0.0.1': 28})] assert mocked_pusher.call_args_list == [call('10.0.0.1', 'sample_test_1:xlogo', '')] mocked_pusher.reset_mock() + # wrapper node_names is set + assert command.node_names == ['10.0.0.1'] # failure methods are not called assert not mocked_force.called assert not mocked_failure.called @@ -765,6 +873,7 @@ def test_starter_process_job(mocker, starter, command_list): # test with stopped process command = get_test_command(command_list, 'xlogo') command.ignore_wait_exit = True + command.node_names = [] jobs = [] # call the process_jobs starter.process_job(command, jobs) @@ -974,7 +1083,7 @@ def test_starter_get_load_requests(starter, command_list): assert starter.get_load_requests() == {} # fill current jobs for idx, command in enumerate(command_list): - command.node_name = '10.0.0.1' if idx % 2 else '10.0.0.2' + command.node_names = ['10.0.0.1'] if idx % 2 else ['10.0.0.2'] command.process.rules.expected_load = 10 starter.current_jobs.setdefault(command.process.application_name, []).append(command) # 4 stopped processes in context @@ -1016,6 +1125,7 @@ def test_stopper_on_event(mocker, stopper, command_list): # set context in current_jobs for command in command_list: stopper.current_jobs.setdefault(command.process.application_name, []).append(command) + command.node_names = ['10.0.0.1'] # add application context application = create_application('sample_test_1', stopper.supvisors) stopper.supvisors.context.applications['sample_test_1'] = application @@ -1023,50 +1133,53 @@ def test_stopper_on_event(mocker, stopper, command_list): stopper.supvisors.context.applications['sample_test_2'] = application # try with unknown application process = create_process({'group': 'dummy_application', 'name': 'dummy_process'}, stopper.supvisors) - stopper.on_event(process) + stopper.on_event(process, '10.0.0.1') assert not mocked_after.called # with sample_test_1 application # test STOPPED process command = get_test_command(command_list, 'xlogo') assert command in stopper.current_jobs['sample_test_1'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') assert command.process not in stopper.current_jobs['sample_test_1'] assert not mocked_after.called # test STOPPING process: xclock command = get_test_command(command_list, 'xclock') assert command in stopper.current_jobs['sample_test_1'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') assert command in stopper.current_jobs['sample_test_1'] assert not mocked_after.called # test RUNNING process: xfontsel command = get_test_command(command_list, 'xfontsel') assert command in stopper.current_jobs['sample_test_1'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') 
assert 'sample_test_1' in stopper.current_jobs.keys() assert not mocked_after.called # with sample_test_2 application # test EXITED / expected process: yeux_00 command = get_test_command(command_list, 'yeux_00') assert command in stopper.current_jobs['sample_test_2'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') assert command not in stopper.current_jobs['sample_test_2'] assert not mocked_after.called # test FATAL process: sleep command = get_test_command(command_list, 'sleep') + command.node_names = ['10.0.0.1'] assert command in stopper.current_jobs['sample_test_2'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') + assert command not in stopper.current_jobs['sample_test_2'] assert 'sample_test_2' in stopper.current_jobs.keys() assert not mocked_after.called # test RUNNING process: yeux_01 command = get_test_command(command_list, 'yeux_01') assert command in stopper.current_jobs['sample_test_2'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') assert command in stopper.current_jobs['sample_test_2'] assert not mocked_after.called # force yeux_01 state and re-test command.process._state = ProcessStates.STOPPED + command.node_names = ['10.0.0.1'] assert command in stopper.current_jobs['sample_test_2'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') assert command not in stopper.current_jobs['sample_test_2'] assert mocked_after.call_args_list == [call('sample_test_2')] # reset resources @@ -1075,13 +1188,13 @@ def test_stopper_on_event(mocker, stopper, command_list): # test STARTING process: late_segv command = get_test_command(command_list, 'late_segv') assert command in stopper.current_jobs['crash'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') assert command in stopper.current_jobs['crash'] assert not mocked_after.called # test BACKOFF process: segv (last process of this application) command = get_test_command(command_list, 'segv') assert command in stopper.current_jobs['crash'] - stopper.on_event(command.process) + stopper.on_event(command.process, '10.0.0.1') assert command in stopper.current_jobs['crash'] assert not mocked_after.called diff --git a/supvisors/tests/test_context.py b/supvisors/tests/test_context.py index ed702ec7..eb5c946b 100644 --- a/supvisors/tests/test_context.py +++ b/supvisors/tests/test_context.py @@ -699,7 +699,6 @@ def test_on_process_state_event(mocker, context): def test_on_timer_event(mocker, context): """ Test the handling of a timer event. 
""" - mocker.patch('supvisors.context.time', return_value=3600) mocked_send = context.supvisors.zmq.publisher.send_node_status # update context nodes context.nodes['127.0.0.1'].__dict__.update({'_state': AddressStates.RUNNING, 'local_sequence_counter': 31}) @@ -718,7 +717,7 @@ def test_on_timer_event(mocker, context): mocker.patch.object(context.nodes['10.0.0.2'], 'running_processes', return_value=[proc_1, proc_2]) # test when start_date is recent context.start_date = 3590 - assert context.on_timer_event({'sequence_counter': 31}) == set() + assert context.on_timer_event({'sequence_counter': 31, 'when': 3600}) == ([], set()) assert context.local_sequence_counter == 31 assert not mocked_send.called assert not proc_1.invalidate_node.called @@ -726,7 +725,7 @@ def test_on_timer_event(mocker, context): assert context.nodes['10.0.0.5'].state == AddressStates.UNKNOWN # test when synchro_timeout has passed context.start_date = 3589 - assert context.on_timer_event({'sequence_counter': 32}) == {proc_2} + assert context.on_timer_event({'sequence_counter': 32, 'when': 3600}) == (['10.0.0.2'], {proc_2}) assert context.local_sequence_counter == 32 assert context.nodes['10.0.0.5'].state == AddressStates.ISOLATING assert mocked_send.call_args_list == [call({'address_name': '10.0.0.2', 'statecode': 4, 'statename': 'ISOLATING', diff --git a/supvisors/tests/test_statemachine.py b/supvisors/tests/test_statemachine.py index 648d8518..4e394983 100644 --- a/supvisors/tests/test_statemachine.py +++ b/supvisors/tests/test_statemachine.py @@ -574,8 +574,11 @@ def test_timer_event(mocker, fsm): # apply patches proc_1 = Mock(namespec='proc_1') proc_2 = Mock(namespec='proc_2') - mocked_event = mocker.patch.object(fsm.supvisors.context, 'on_timer_event', return_value=[proc_1, proc_2]) + mocked_event = mocker.patch.object(fsm.supvisors.context, 'on_timer_event', + return_value=(['10.0.0.3'], [proc_1, proc_2])) mocked_next = mocker.patch.object(fsm, 'next') + mocked_starter = fsm.supvisors.starter.on_nodes_invalidation + mocked_stopper = fsm.supvisors.stopper.on_nodes_invalidation mocked_add = fsm.supvisors.failure_handler.add_default_job mocked_trigger = fsm.supvisors.failure_handler.trigger_jobs mocked_isolation = mocker.patch.object(fsm.supvisors.context, 'handle_isolation', @@ -587,6 +590,8 @@ def test_timer_event(mocker, fsm): fsm.periodic_check(event) # check result: marked processes are started assert mocked_event.call_args_list == [call(event)] + assert mocked_starter.call_args_list == [call(['10.0.0.3'], [proc_1, proc_2])] + assert mocked_stopper.call_args_list == [call(['10.0.0.3'], [proc_1, proc_2])] assert mocked_next.call_args_list == [call()] assert not mocked_add.called assert not mocked_trigger.called @@ -595,6 +600,8 @@ def test_timer_event(mocker, fsm): # reset mocks mocker.resetall() mocked_isolate.reset_mock() + mocked_starter.reset_mock() + mocked_stopper.reset_mock() # test when not master and no node to isolate fsm.context._is_master = True mocked_isolation.return_value = [] @@ -602,6 +609,8 @@ def test_timer_event(mocker, fsm): fsm.periodic_check(event) # check result: marked processes are started assert mocked_event.call_args_list == [call(event)] + assert mocked_starter.call_args_list == [call(['10.0.0.3'], [proc_1, proc_2])] + assert mocked_stopper.call_args_list == [call(['10.0.0.3'], [proc_1, proc_2])] assert mocked_next.call_args_list == [call()] assert mocked_add.call_args_list == [call(proc_1), call(proc_2)] assert mocked_trigger.call_args_list == [call()] @@ -649,8 +658,8 @@ def 
test_process_state_event(mocker, fsm): fsm.supvisors.context._is_master = False fsm.on_process_state_event('10.0.0.1', {'process_name': 'dummy_proc'}) assert mocked_ctx.call_args_list == [call('10.0.0.1', {'process_name': 'dummy_proc'})] - assert mocked_start_evt.call_args_list == [call(process)] - assert mocked_stop_evt.call_args_list == [call(process)] + assert mocked_start_evt.call_args_list == [call(process, '10.0.0.1', {'process_name': 'dummy_proc'})] + assert mocked_stop_evt.call_args_list == [call(process, '10.0.0.1')] assert not mocked_add.called # reset mocks mocked_ctx.reset_mock() @@ -663,8 +672,8 @@ def test_process_state_event(mocker, fsm): process.rules.running_failure_strategy = strategy fsm.on_process_state_event('10.0.0.1', {'process_name': 'dummy_proc'}) assert mocked_ctx.call_args_list == [call('10.0.0.1', {'process_name': 'dummy_proc'})] - assert mocked_start_evt.call_args_list == [call(process)] - assert mocked_stop_evt.call_args_list == [call(process)] + assert mocked_start_evt.call_args_list == [call(process, '10.0.0.1', {'process_name': 'dummy_proc'})] + assert mocked_stop_evt.call_args_list == [call(process, '10.0.0.1')] assert mocked_add.call_args_list == [] # reset mocks mocked_ctx.reset_mock() @@ -675,8 +684,8 @@ def test_process_state_event(mocker, fsm): process.rules.running_failure_strategy = RunningFailureStrategies.STOP_APPLICATION fsm.on_process_state_event('10.0.0.1', {'process_name': 'dummy_proc'}) assert mocked_ctx.call_args_list == [call('10.0.0.1', {'process_name': 'dummy_proc'})] - assert mocked_start_evt.call_args_list == [call(process)] - assert mocked_stop_evt.call_args_list == [call(process)] + assert mocked_start_evt.call_args_list == [call(process, '10.0.0.1', {'process_name': 'dummy_proc'})] + assert mocked_stop_evt.call_args_list == [call(process, '10.0.0.1')] assert mocked_add.call_args_list == [call(process)] # reset mocks mocked_ctx.reset_mock() @@ -689,8 +698,8 @@ def test_process_state_event(mocker, fsm): process.rules.running_failure_strategy = RunningFailureStrategies.RESTART_APPLICATION fsm.on_process_state_event('10.0.0.1', {'process_name': 'dummy_proc'}) assert mocked_ctx.call_args_list == [call('10.0.0.1', {'process_name': 'dummy_proc'})] - assert mocked_start_evt.call_args_list == [call(process)] - assert mocked_stop_evt.call_args_list == [call(process)] + assert mocked_start_evt.call_args_list == [call(process, '10.0.0.1', {'process_name': 'dummy_proc'})] + assert mocked_stop_evt.call_args_list == [call(process, '10.0.0.1')] assert mocked_add.call_args_list == [call(process)] # reset mocks mocked_ctx.reset_mock() @@ -701,8 +710,8 @@ def test_process_state_event(mocker, fsm): process.forced_state = ProcessStates.FATAL fsm.on_process_state_event('10.0.0.1', {'process_name': 'dummy_proc'}) assert mocked_ctx.call_args_list == [call('10.0.0.1', {'process_name': 'dummy_proc'})] - assert mocked_start_evt.call_args_list == [call(process)] - assert mocked_stop_evt.call_args_list == [call(process)] + assert mocked_start_evt.call_args_list == [call(process, '10.0.0.1', {'process_name': 'dummy_proc'})] + assert mocked_stop_evt.call_args_list == [call(process, '10.0.0.1')] assert not mocked_add.called # reset mocks mocked_ctx.reset_mock() @@ -714,8 +723,8 @@ def test_process_state_event(mocker, fsm): process.rules.running_failure_strategy = strategy fsm.on_process_state_event('10.0.0.1', {'process_name': 'dummy_proc'}) assert mocked_ctx.call_args_list == [call('10.0.0.1', {'process_name': 'dummy_proc'})] - assert 
mocked_start_evt.call_args_list == [call(process)] - assert mocked_stop_evt.call_args_list == [call(process)] + assert mocked_start_evt.call_args_list == [call(process, '10.0.0.1', {'process_name': 'dummy_proc'})] + assert mocked_stop_evt.call_args_list == [call(process, '10.0.0.1')] assert not mocked_add.called # reset mocks mocked_ctx.reset_mock() diff --git a/supvisors/ttypes.py b/supvisors/ttypes.py index 7b26569f..c870e4b2 100644 --- a/supvisors/ttypes.py +++ b/supvisors/ttypes.py @@ -18,7 +18,7 @@ # ====================================================================== from enum import Enum -from typing import Any, Dict, List, TypeVar +from typing import Any, Dict, List, Set, TypeVar from supervisor.events import Event @@ -106,3 +106,4 @@ class ProcessRemovedEvent(ProcessEvent): Payload = Dict[str, Any] PayloadList = List[Payload] NameList = List[str] +NameSet = Set[str]
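
A minimal sketch of the new timer-event contract exercised by the tests above. The assertions in test_timer_event and test_on_timer_event show that Context.on_timer_event now returns a tuple (invalidated node names, processes declared in failure) and that the FSM periodic check forwards both elements to the Starter and the Stopper through on_nodes_invalidation. The statemachine.py hunk itself is not part of this excerpt, so the code below is only an inference from those assertions; the Dummy* classes, their attributes and the printed output are illustrative stand-ins, not Supvisors code.

# A minimal sketch, inferred from the test_timer_event assertions: on a timer
# event, the context reports which nodes became SILENT and which processes
# failed as a consequence, and the FSM forwards both to the Starter and the
# Stopper so that requests pending on the lost nodes can be cleared.
# The Dummy* classes are illustrative stand-ins, not Supvisors classes.
from typing import Any, Dict, List, Set, Tuple


class DummyContext:
    """ Stand-in for the context part exercised by test_on_timer_event. """

    def on_timer_event(self, event: Dict[str, Any]) -> Tuple[List[str], Set[str]]:
        # pretend that node 10.0.0.2 timed out and that xlogo became FATAL,
        # mirroring the expectations in test_on_timer_event above
        return ['10.0.0.2'], {'sample_test_1:xlogo'}


class DummyCommander:
    """ Stand-in for the Starter / Stopper on_nodes_invalidation entry point. """

    def __init__(self, name: str):
        self.name = name

    def on_nodes_invalidation(self, invalidated_nodes: List[str], process_failures: Set[str]) -> None:
        print(f'{self.name}: clear requests pending on {invalidated_nodes} for {sorted(process_failures)}')


class DummyFSM:
    """ Stand-in for the state machine behaviour checked by test_timer_event. """

    def __init__(self):
        self.context = DummyContext()
        self.starter = DummyCommander('Starter')
        self.stopper = DummyCommander('Stopper')

    def periodic_check(self, event: Dict[str, Any]) -> None:
        # on_timer_event now returns a (nodes, failures) tuple
        invalidated_nodes, process_failures = self.context.on_timer_event(event)
        # both commanders get a chance to clear requests pending on the lost nodes
        self.starter.on_nodes_invalidation(invalidated_nodes, process_failures)
        self.stopper.on_nodes_invalidation(invalidated_nodes, process_failures)


if __name__ == '__main__':
    DummyFSM().periodic_check({'sequence_counter': 32, 'when': 3600})

Dispatching to both commanders is presumably deliberate: a lost node may hold pending start requests and pending stop requests at the same time, and clearing only one side would leave stale jobs in the other.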