Skip to content

Commit

Permalink
Merge pull request #117 from julien6387/dev-0.18
Browse files Browse the repository at this point in the history
Dev 0.17.2
  • Loading branch information
julien6387 committed Dec 3, 2023
2 parents 554d9ac + 70c58c6 commit d5b3466
Show file tree
Hide file tree
Showing 62 changed files with 1,253 additions and 945 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.6, 3.7, 3.8, 3.9, "3.10"]
python-version: [3.6, 3.7, 3.8, 3.9, "3.10", 3.11, 3.12]

steps:
- uses: actions/checkout@v3
Expand Down
21 changes: 21 additions & 0 deletions CHANGES.md
@@ -1,5 +1,26 @@
# Change Log

## 0.17.2 (2023-12-04)

* Fix rare I/O exception by joining the `SupervisorsProxy` thread before exiting the `SupvisorsMainLoop`.

* Fix rare exception when host network statistics are prepared for display in the **Supvisors** Web UI,
in the case where network interfaces have different history sizes.

* Fix the Supvisors identifier possibilities when using the distribution rule `SINGLE_INSTANCE`.

* Update the process statistics collector thread so that it exits by itself when `supervisord` is killed.

* Improve the node selection when using the distribution rule `SINGLE_NODE`.

* Use an asynchronous server in the **Supvisors** internal communications.
  The refactoring fixes an issue with the TCP server that sometimes wouldn't bind despite the `SO_REUSEADDR` option being set.

* Restore the `action` class in the HTML of the **Supvisors** Web UI.

* CI targets added for Python 3.11 and 3.12.


## 0.17 (2023-08-17)

* Fix [Issue #112](https://github.com/julien6387/supvisors/issues/112).
Expand Down
13 changes: 8 additions & 5 deletions setup.py
Expand Up @@ -21,16 +21,16 @@

from setuptools import setup, find_packages

requires = ['supervisor >= 4.2.4, < 5']
requires = ['supervisor >= 4.2.4, < 4.3']

statistics_require = ['psutil >= 5.7.3', 'pyparsing >= 2.0.2, < 3', 'matplotlib >= 3.3.3']
xml_valid_require = ['lxml >= 4.6.2']
flask_require_36 = ['flask-restx == 0.5.1', 'Werkzeug == 2.0.3']
flask_require = ['flask-restx >= 1.1.0']
flask_require_36 = ['flask-restx == 0.5.1', 'Flask < 3', 'Werkzeug == 2.0.3']
flask_require = ['flask-restx >= 1.1.0', 'Flask < 3']
zmq_require = ['pyzmq >= 20.0.0']
websockets_require = ['websockets >= 10.2']

testing_extras = ['pytest >= 2.5.2', 'pytest-cov']
testing_extras = ['pytest >= 2.5.2', 'pytest-cov', 'pytest-mock', 'pytest-asyncio']

here = os.path.abspath(os.path.dirname(__file__))
README = open(os.path.join(here, 'README.md')).read()
Expand All @@ -49,6 +49,8 @@
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: System :: Boot",
"Topic :: System :: Monitoring",
"Topic :: System :: Software Distribution"
Expand Down Expand Up @@ -79,7 +81,8 @@
'flask:python_version >= "3.7"': flask_require,
'zmq': zmq_require,
'ws:python_version >= "3.7"': websockets_require,
'all': statistics_require + xml_valid_require + flask_require + zmq_require + websockets_require,
'all:python_version < "3.7"': statistics_require + xml_valid_require + flask_require_36 + zmq_require,
'all:python_version >= "3.7"': statistics_require + xml_valid_require + flask_require + zmq_require + websockets_require,
'testing': testing_extras},
include_package_data=True,
zip_safe=False,
Expand Down
94 changes: 75 additions & 19 deletions supvisors/application.py
Expand Up @@ -24,7 +24,7 @@
from supervisor.states import ProcessStates

from .process import ProcessStatus
from .ttypes import (ApplicationStates, DistributionRules, NameList, Payload, StartingStrategies,
from .ttypes import (ApplicationStates, DistributionRules, NameList, NameSet, Payload, StartingStrategies,
StartingFailureStrategies, RunningFailureStrategies)
from .utils import WILDCARD

Expand Down Expand Up @@ -110,7 +110,7 @@ def check_hash_identifiers(self, application_name: str) -> None:
f' application_number={application_number}')
if '*' in self.hash_identifiers:
# all identifiers defined in the supvisors section of the supervisor configuration are applicable
ref_identifiers = list(self.supvisors.supvisors_mapper.instances.keys())
ref_identifiers = list(self.supvisors.mapper.instances.keys())
else:
# the subset of applicable identifiers is the hash_identifiers
ref_identifiers = self.hash_identifiers
Expand Down Expand Up @@ -272,11 +272,11 @@ def assign_at_identifiers(self) -> None:
# get instances
if WILDCARD in self.at_identifiers:
# all identifiers defined in the supvisors section of the supervisor configuration file are applicable
ref_identifiers = list(self.supvisors.supvisors_mapper.instances.keys())
ref_identifiers = list(self.supvisors.mapper.instances.keys())
else:
# the subset of applicable identifiers is the second element of rule 'identifiers'
# filter the unknown identifiers (or remaining aliases)
ref_identifiers = self.supvisors.supvisors_mapper.filter(self.at_identifiers)
ref_identifiers = self.supvisors.mapper.filter(self.at_identifiers)
self.logger.debug(f'ProcessRules.assign_at_identifiers: program={self.program_name}'
f' ref_identifiers={ref_identifiers}')
# the aim of at_identifiers is to distribute the processes over a list of Supvisors instances,
Expand Down Expand Up @@ -318,11 +318,11 @@ def assign_hash_identifiers(self) -> None:
# get instances
if WILDCARD in self.hash_identifiers:
# all identifiers defined in the supvisors section of the supervisor configuration file are applicable
ref_identifiers = list(self.supvisors.supvisors_mapper.instances.keys())
ref_identifiers = list(self.supvisors.mapper.instances.keys())
else:
# the subset of applicable identifiers is the second element of rule 'identifiers'
# filter the unknown identifiers (or remaining aliases)
ref_identifiers = self.supvisors.supvisors_mapper.filter(self.hash_identifiers)
ref_identifiers = self.supvisors.mapper.filter(self.hash_identifiers)
self.logger.debug(f'ProcessRules.assign_hash_identifiers: program={self.program_name}'
f' ref_identifiers={ref_identifiers}')
# the aim of hash_identifiers is to distribute the processes over a list of Supvisors instances, so
Expand Down Expand Up @@ -524,31 +524,87 @@ def resolve_rules(self):
self.process_groups[program].resolve_rules()

def possible_identifiers(self) -> NameList:
    """ Return the list of Supervisor identifiers where the application could be started, assuming that its
    processes cannot be distributed over multiple Supvisors instances.

    To achieve that, three conditions:
        - the Supervisor shall know all the application programs (real-time configuration) ;
        - the Supervisor identifier shall be allowed (explicitly or implicitly) in the rules file ;
        - the programs shall not be disabled.

    :return: the list of identifiers where the application could be started.
    """
    rules_identifiers = self.rules.identifiers
    if '*' in self.rules.identifiers:
        # wildcard: every Supvisors instance declared in the supervisor configuration is a candidate
        filtered_identifiers = list(self.supvisors.mapper.instances.keys())
    else:
        # filter the unknown identifiers (due to Supvisors discovery mode, any identifier may be lately known)
        filtered_identifiers = self.supvisors.mapper.filter(rules_identifiers)
    # for every application process, get the set of identifiers where it is configured and not disabled
    actual_identifiers: List[NameSet] = [{identifier
                                          for identifier, info in process.info_map.items()
                                          if not info['disabled']}
                                         for process in self.processes.values()]
    if actual_identifiers:
        # the common list is the intersection of all subsets
        # (every process must be startable on a returned identifier)
        actual_identifiers = actual_identifiers[0].intersection(*actual_identifiers)
    self.logger.debug(f'ApplicationStatus.possible_identifiers: application_name={self.application_name}'
                      f' actual_identifiers={actual_identifiers}')
    # intersect with the rules, iterating over filtered_identifiers to keep the ordering defined in the rules file
    return [identifier
            for identifier in filtered_identifiers
            if identifier in actual_identifiers]

def possible_node_identifiers(self) -> NameList:
    """ Same principle as possible_identifiers, except that the possibilities are built from the nodes
    rather than from the strict identifiers: an identifier is kept as soon as its node can host the whole
    application, even if that particular identifier does not host every program itself.

    Note: some elements in the returned list of identifiers may not fit the possibilities obtained from the
    intermediate actual_identifiers, which is based on the real-time configurations.

    :return: the list of identifiers where the application could be started.
    """
    rules_identifiers = self.rules.identifiers
    if '*' in self.rules.identifiers:
        # wildcard: every Supvisors instance declared in the supervisor configuration is a candidate
        filtered_identifiers = list(self.supvisors.mapper.instances.keys())
    else:
        # filter the unknown identifiers (due to Supvisors discovery mode, any identifier may be lately known)
        filtered_identifiers = self.supvisors.mapper.filter(rules_identifiers)
    # for every application process, get the set of identifiers where it is configured and not disabled
    actual_identifiers: List[NameSet] = [{identifier
                                          for identifier, info in process.info_map.items()
                                          if not info['disabled']}
                                         for process in self.processes.values()]
    self.logger.trace(f'ApplicationStatus.possible_node_identifiers: application_name={self.application_name}'
                      f' actual_identifiers={actual_identifiers}')
    # test nodes individually
    all_node_identifiers = set()
    for node, identifiers_list in self.supvisors.mapper.nodes.items():
        # from all node identifiers, intersect with rules identifiers
        filtered_node_identifiers = {x for x in identifiers_list if x in filtered_identifiers}
        self.logger.trace(f'ApplicationStatus.possible_node_identifiers: application_name={self.application_name}'
                          f' node={node} filtered_node_identifiers={filtered_node_identifiers}')
        # check that every process has a solution on the node
        config_identifiers = set()
        for process_identifiers in actual_identifiers:
            process_node_identifiers = filtered_node_identifiers & set(process_identifiers)
            if not process_node_identifiers:
                # one process cannot run anywhere on this node: the node is rejected as a whole
                self.logger.debug('ApplicationStatus.possible_node_identifiers:'
                                  f' application_name={self.application_name} has no solution on node={node}')
                break
            # add remaining identifiers to the intersections union
            config_identifiers.update(process_node_identifiers)
        else:
            # for/else: reached only when no process broke out, i.e. the node can host the whole application
            self.logger.debug('ApplicationStatus.possible_node_identifiers:'
                              f' application_name={self.application_name} node={node}'
                              f' solution={config_identifiers}')
            # add the node solution to the application solutions
            all_node_identifiers.update(config_identifiers)
    self.logger.debug(f'ApplicationStatus.possible_node_identifiers: application_name={self.application_name}'
                      f' all_node_identifiers={all_node_identifiers}')
    # return the node identifiers keeping the ordering defined in rules
    return [identifier
            for identifier in filtered_identifiers
            if identifier in all_node_identifiers]

def get_instance_processes(self, identifier: str) -> ProcessList:
""" Return the list of application processes configured in the Supervisor instance.
Expand Down
Expand Up @@ -77,6 +77,7 @@ public IOBytes(float recvBytes, float sentBytes) {
*
* @param HashMap statsInfo: The untyped structure got from the XML-RPC.
*/
@SuppressWarnings({"unchecked"})
public SupvisorsHostStatistics(HashMap statsInfo) {
this.identifier = (String) statsInfo.get("identifier");
this.target_period = (Float) statsInfo.get("target_period");
Expand Down
4 changes: 2 additions & 2 deletions supvisors/commander.py
Expand Up @@ -691,12 +691,12 @@ def distribute_to_single_node(self) -> None:
commands = [process for sequence in self.planned_jobs.values() for process in sequence]
# find the applicable Supvisors instances iaw strategy
application_load = self.application.get_start_sequence_expected_load()
identifiers = self.application.possible_identifiers()
identifiers = self.application.possible_node_identifiers()
# choose the node that can support the application load
node_name = get_node(self.supvisors, self.starting_strategy, identifiers, application_load)
# intersect the identifiers running on the node and the application possible identifiers
# comprehension based on iteration over application possible identifiers to keep the CONFIG order
node_identifiers = list(self.supvisors.supvisors_mapper.nodes.get(node_name, []))
node_identifiers = list(self.supvisors.mapper.nodes.get(node_name, []))
self.identifiers = [identifier for identifier in identifiers if identifier in node_identifiers]
if self.identifiers:
self.logger.trace(f'ApplicationStartJobs.distribute_to_single_node: Supvisors={self.identifiers}'
Expand Down
19 changes: 10 additions & 9 deletions supvisors/context.py
Expand Up @@ -55,7 +55,7 @@ def __init__(self, supvisors: Any):
# the Supvisors instances declared statically
self.instances: Context.InstancesMap = {
identifier: SupvisorsInstanceStatus(supvisors_id, supvisors)
for identifier, supvisors_id in self.supvisors.supvisors_mapper.instances.items()}
for identifier, supvisors_id in self.supvisors.mapper.instances.items()}
# the applications known to Supvisors
self.applications: Context.ApplicationsMap = {}
# start time to manage end of synchronization phase
Expand Down Expand Up @@ -86,7 +86,7 @@ def external_publisher(self) -> Optional[EventPublisherInterface]:
@property
def local_identifier(self) -> str:
""" Get last local TICK sequence counter, used for node invalidation. """
return self.supvisors.supvisors_mapper.local_identifier
return self.supvisors.mapper.local_identifier

@property
def local_status(self) -> SupvisorsInstanceStatus:
Expand Down Expand Up @@ -139,7 +139,7 @@ def elect_master(self, running_identifiers: Optional[NameList] = None) -> None:
if not self.master_identifier or self.master_identifier not in running_identifiers:
# choose Master among the core instances because they are expected to be more stable
# this logic is kept independently of CORE being selected as synchro_options
core_identifiers = self.supvisors.supvisors_mapper.core_identifiers
core_identifiers = self.supvisors.mapper.core_identifiers
self.logger.debug(f'Context.elect_master: core_identifiers={core_identifiers}')
if core_identifiers:
running_core_identifiers = set(running_identifiers).intersection(core_identifiers)
Expand Down Expand Up @@ -221,14 +221,14 @@ def get_nodes_load(self) -> LoadMap:
"""
return {ip_address: sum(self.instances[identifier].get_load()
for identifier in identifiers)
for ip_address, identifiers in self.supvisors.supvisors_mapper.nodes.items()}
for ip_address, identifiers in self.supvisors.mapper.nodes.items()}

# methods on instances
def initial_running(self) -> bool:
""" Return True if all Supervisor instances are in RUNNING state. """
return all(status.state == SupvisorsInstanceStates.RUNNING
for identifier, status in self.instances.items()
if identifier in self.supvisors.supvisors_mapper.initial_identifiers)
if identifier in self.supvisors.mapper.initial_identifiers)

def all_running(self) -> bool:
""" Return True if all Supervisor instances are in RUNNING state. """
Expand All @@ -244,10 +244,11 @@ def running_core_identifiers(self) -> bool:
:return: True if all core SupvisorsInstanceStatus are in RUNNING state
"""
if self.supvisors.supvisors_mapper.core_identifiers:
core_identifiers = self.supvisors.mapper.core_identifiers
if core_identifiers:
return all(status.state == SupvisorsInstanceStates.RUNNING
for identifier, status in self.instances.items()
if identifier in self.supvisors.supvisors_mapper.core_identifiers)
if identifier in core_identifiers)
return False

def isolating_instances(self) -> NameList:
Expand Down Expand Up @@ -563,7 +564,7 @@ def on_tick_event(self, identifier: str, event: Payload) -> None:
status = self.instances[identifier]
if not status.in_isolation():
# update the Supvisors instance with the TICK event
self.supvisors.supvisors_mapper.assign_stereotypes(identifier, event['stereotypes'])
self.supvisors.mapper.assign_stereotypes(identifier, event['stereotypes'])
# for remote Supvisors instance, use local Supvisors instance data
status.update_tick(event['sequence_counter'], event['when'], self.local_sequence_counter, time.time())
# trigger hand-shake on first TICK received
Expand All @@ -584,7 +585,7 @@ def on_discovery_event(self, identifier: str, event: Payload) -> bool:
ip_address, port = event['ip_address'], event['server_port']
if identifier not in self.instances:
item = f'<{identifier}>{ip_address}:{port}:'
new_instance = self.supvisors.supvisors_mapper.add_instance(item, True)
new_instance = self.supvisors.mapper.add_instance(item, True)
self.instances[identifier] = SupvisorsInstanceStatus(new_instance, self.supvisors)
return True
return False
Expand Down

0 comments on commit d5b3466

Please sign in to comment.