Skip to content

Commit

Permalink
[module/hulot] fix tests
Browse files (browse the repository at this point in the history)
  • Loading branch information
adfaure committed Nov 7, 2023
1 parent 5c5fd5c commit 54d041f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 57 deletions.
2 changes: 1 addition & 1 deletion oar/lib/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def pingchecker_exec_command(
output = out.decode()
error = err.decode() # noqa TODO: not used

log.debug("out: {output}, err: {error}")
log.debug(f"out: {output}, err: {error}")

for line in output.split("\n"):
host = filter_output(*(line, ip2hostname))
Expand Down
7 changes: 4 additions & 3 deletions oar/modules/almighty.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,17 @@ def check_hulot(hulot, logger):

try:
stdout, stderr = hulot.communicate(timeout=0)
logger.info(f"hulot: {stdout}\n{stderr}")
logger.info(f"hulot communicated: {stdout}\n{stderr}")
except Exception as e:
logger.info(f"hulot: {e}")
logger.info(f"hulot exception: {e}")
pass

# stdout, stderr = hulot.communicate(timeout=0)
# logger.info(f"hulot: {stdout}\n{stderr}")

logger.info(f"res: {res}")
return res
# return res
return True


#
Expand Down
25 changes: 17 additions & 8 deletions oar/modules/hulot.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
from typing import List, Union

import zmq
from sqlalchemy.orm import scoped_session, sessionmaker

import oar.lib.tools as tools
from oar.lib.configuration import Configuration
Expand All @@ -55,7 +54,6 @@
get_logger,
init_and_get_session,
init_config,
init_logger,
init_oar,
)
from oar.lib.node import (
Expand Down Expand Up @@ -235,12 +233,13 @@ def __init__(self, config, logger):
config["ENERGY_SAVING_WINDOW_FORKER_SIZE"],
config["ENERGY_SAVING_WINDOW_TIMEOUT"],
config,
logger,
)
# TODO
# my $count_cycles;
#

def run(self, loop=True):
def run(self, session=None, loop=True):
logger = self.logger
config = self.config

Expand All @@ -263,7 +262,8 @@ def wait_db():

return session

session = wait_db()
if session is None:
session = wait_db()

while True:
self.window_forker.check_executors(
Expand Down Expand Up @@ -406,7 +406,9 @@ def wait_db():
for node, cmd_info in nodes_list_to_remind.items():
if node not in nodes_list_command_running:
# move this node from reminded list to list to process
logger.debug(f"Adding '{node} => {cmd_info}' to list to process.")
logger.debug(
f"Adding '{node} => {cmd_info}' to list to process to remind."
)
nodes_list_to_process[node] = {
"command": cmd_info["command"],
"timeout": -1,
Expand Down Expand Up @@ -543,7 +545,7 @@ def wait_db():
return 0


def command_executor(cmd_node, config):
def command_executor(cmd_node, config, logger):
command, node = cmd_node
command_to_exec = 'echo "' + node + '" | '
if command == "HALT":
Expand All @@ -560,9 +562,10 @@ def command_executor(cmd_node, config):


class WindowForker(object):
def __init__(self, window_size, timeout, config):
def __init__(self, window_size, timeout, config, logger):
self.config = config
self.timeout = timeout
self.logger = logger
self.pool = Pool(processes=window_size)
self.executors = {}

Expand Down Expand Up @@ -594,23 +597,29 @@ def add_commands_toLaunch(self, session, commands):
cmd, node = cmd_node
# FIXME: Async code here ?!
self.executors[
self.pool.apply_async(command_executor, (cmd_node, self.config))
self.pool.apply_async(
command_executor, (cmd_node, self.config, self.logger)
)
] = (
node,
cmd,
tools.get_date(session),
)

self.logger.debug(f"wtf: {self.executors}")

def check_executors(self, session, config, nodes_list_running):
executors_toRemove = []
now = tools.get_date(
session,
)
for executor, data in self.executors.items():
self.logger.debug(f"Executor output: {executor}: {data}")
node, cmd, launching_date = data
if executor.ready(): # TODO executor.successful()
executors_toRemove.append(executor)
exit_status = executor.get()

if exit_status != 0:
# Suspect node if error
change_node_state(session, node, "Suspected", config)
Expand Down
94 changes: 49 additions & 45 deletions tests/modules/test_hulot.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ def test_hulot_check_nodes_to_remind(
hulot = Hulot(config, logger)
hulot.nodes_list_to_remind = {"localhost0": {"timeout": -1, "command": "HALT"}}
exit_code = hulot.run(minimal_db_initialization, False)
print(hulot.nodes_list_running)
assert "localhost0" in hulot.nodes_list_running
assert hulot.nodes_list_running["localhost0"]["command"] == "HALT"
print(hulot.nodes_list_command_running)
assert "localhost0" in hulot.nodes_list_command_running
assert hulot.nodes_list_command_running["localhost0"]["command"] == "HALT"
assert exit_code == 0


Expand All @@ -197,9 +197,9 @@ def test_hulot_check_wakeup_for_min_nodes(
hulot = Hulot(config, logger)
exit_code = hulot.run(minimal_db_initialization, False)
config["ENERGY_SAVING_NODES_KEEPALIVE"] = prev_value
print(hulot.nodes_list_running)
assert "localhost2" in hulot.nodes_list_running
assert hulot.nodes_list_running["localhost2"]["command"] == "WAKEUP"
print(hulot.nodes_list_command_running)
assert "localhost2" in hulot.nodes_list_command_running
assert hulot.nodes_list_command_running["localhost2"]["command"] == "WAKEUP"
assert exit_code == 0


Expand All @@ -209,9 +209,9 @@ def test_hulot_halt_1(monkeypatch, setup_config, minimal_db_initialization, setu
fakezmq.recv_msgs[0] = [{"cmd": "HALT", "nodes": ["localhost0"]}]
hulot = Hulot(config, logger)
exit_code = hulot.run(minimal_db_initialization, False)
print(hulot.nodes_list_running)
assert "localhost0" in hulot.nodes_list_running
assert hulot.nodes_list_running["localhost0"]["command"] == "HALT"
print(hulot.nodes_list_command_running)
assert "localhost0" in hulot.nodes_list_command_running
assert hulot.nodes_list_command_running["localhost0"]["command"] == "HALT"
assert exit_code == 0


Expand All @@ -227,9 +227,9 @@ def test_hulot_halt_keepalive(
# import pdb; pdb.set_trace()
exit_code = hulot.run(minimal_db_initialization, False)
config["ENERGY_SAVING_NODES_KEEPALIVE"] = prev_value
print(hulot.nodes_list_running)
assert "localhost2" in hulot.nodes_list_running
assert hulot.nodes_list_running["localhost2"]["command"] == "WAKEUP"
print(hulot.nodes_list_command_running)
assert "localhost2" in hulot.nodes_list_command_running
assert hulot.nodes_list_command_running["localhost2"]["command"] == "WAKEUP"
assert exit_code == 0


Expand All @@ -243,9 +243,9 @@ def test_hulot_halt_1_forker(
hulot = Hulot(config, logger)
exit_code = hulot.run(minimal_db_initialization, False)
config["ENERGY_SAVING_WINDOW_FORKER_BYPASS"] = "yes"
print(hulot.nodes_list_running)
assert "localhost0" in hulot.nodes_list_running
assert hulot.nodes_list_running["localhost0"]["command"] == "HALT"
print(hulot.nodes_list_command_running)
assert "localhost0" in hulot.nodes_list_command_running
assert hulot.nodes_list_command_running["localhost0"]["command"] == "HALT"
assert exit_code == 0


Expand All @@ -255,9 +255,9 @@ def test_hulot_wakeup_1(monkeypatch, setup_config, minimal_db_initialization, se
fakezmq.recv_msgs[0] = [{"cmd": "WAKEUP", "nodes": ["localhost2"]}]
hulot = Hulot(config, logger)
exit_code = hulot.run(minimal_db_initialization, False)
print(hulot.nodes_list_running)
assert "localhost2" in hulot.nodes_list_running
assert hulot.nodes_list_running["localhost2"]["command"] == "WAKEUP"
print(hulot.nodes_list_command_running)
assert "localhost2" in hulot.nodes_list_command_running
assert hulot.nodes_list_command_running["localhost2"]["command"] == "WAKEUP"
assert exit_code == 0


Expand All @@ -268,10 +268,12 @@ def test_hulot_wakeup_already_timeouted(
config = setup
fakezmq.recv_msgs[0] = [{"cmd": "WAKEUP", "nodes": ["localhost2"]}]
hulot = Hulot(config, logger)
hulot.nodes_list_running = {"localhost2": {"timeout": -1, "command": "WAKEUP"}}
hulot.nodes_list_command_running = {
"localhost2": {"timeout": -1, "command": "WAKEUP"}
}
exit_code = hulot.run(minimal_db_initialization, False)
print(hulot.nodes_list_running)
assert hulot.nodes_list_running == {}
print(hulot.nodes_list_command_running)
assert hulot.nodes_list_command_running == {}
assert exit_code == 0


Expand All @@ -282,16 +284,16 @@ def test_hulot_wakeup_already_pending(
config = setup
fakezmq.recv_msgs[0] = [{"cmd": "WAKEUP", "nodes": ["localhost2"]}]
hulot = Hulot(config, logger)
hulot.nodes_list_running = {
hulot.nodes_list_command_running = {
"localhost2": {
"timeout": tools.get_date(minimal_db_initialization) + 1000,
"command": "WAKEUP",
}
}
exit_code = hulot.run(minimal_db_initialization, False)
print(hulot.nodes_list_running)
print(hulot.nodes_list_command_running)
print(hulot.nodes_list_to_remind)
assert "localhost2" in hulot.nodes_list_running
assert "localhost2" in hulot.nodes_list_command_running
assert hulot.nodes_list_to_remind == {}
assert exit_code == 0

Expand All @@ -303,16 +305,16 @@ def test_hulot_halt_wakeup_already_pending(
config = setup
fakezmq.recv_msgs[0] = [{"cmd": "HALT", "nodes": ["localhost2"]}]
hulot = Hulot(config, logger)
hulot.nodes_list_running = {
hulot.nodes_list_command_running = {
"localhost2": {
"timeout": tools.get_date(minimal_db_initialization) + 1000,
"command": "WAKEUP",
}
}
exit_code = hulot.run(minimal_db_initialization, False)
print(hulot.nodes_list_running)
print(hulot.nodes_list_command_running)
print(hulot.nodes_list_to_remind)
assert "localhost2" in hulot.nodes_list_running
assert "localhost2" in hulot.nodes_list_command_running
assert "localhost2" in hulot.nodes_list_to_remind
assert hulot.nodes_list_to_remind["localhost2"]["command"] == "HALT"
assert exit_code == 0
Expand All @@ -325,10 +327,12 @@ def test_hulot_check_clean_booted_node(
config = setup
fakezmq.recv_msgs[0] = [{"cmd": "CHECK"}]
hulot = Hulot(config, logger)
hulot.nodes_list_running = {"localhost0": {"timeout": -1, "command": "WAKEUP"}}
hulot.nodes_list_command_running = {
"localhost0": {"timeout": -1, "command": "WAKEUP"}
}
exit_code = hulot.run(minimal_db_initialization, False)
print(hulot.nodes_list_running)
assert hulot.nodes_list_running == {}
print(hulot.nodes_list_command_running)
assert hulot.nodes_list_command_running == {}
assert exit_code == 0


Expand All @@ -342,9 +346,9 @@ def test_hulot_wakeup_1_forker(
hulot = Hulot(config, logger)
exit_code = hulot.run(minimal_db_initialization, False)
config["ENERGY_SAVING_WINDOW_FORKER_BYPASS"] = "yes"
print(hulot.nodes_list_running)
assert "localhost2" in hulot.nodes_list_running
assert hulot.nodes_list_running["localhost2"]["command"] == "WAKEUP"
print(hulot.nodes_list_command_running)
assert "localhost2" in hulot.nodes_list_command_running
assert hulot.nodes_list_command_running["localhost2"]["command"] == "WAKEUP"
assert exit_code == 0


Expand All @@ -363,10 +367,10 @@ def test_hulot_command_executor(
monkeypatch, setup_config, minimal_db_initialization, setup
):
config = setup
assert command_executor(("HALT", "node1"), config) == 0
assert command_executor(("HALT", "node1"), config, logger) == 0
print(called_command)
assert called_command == 'echo "node1" | sleep_cmd'
assert command_executor(("WAKEUP", "node1"), config) == 0
assert command_executor(("WAKEUP", "node1"), config, logger) == 0
print(called_command)
assert called_command == 'echo "node1" | wakeup_cmd'

Expand All @@ -379,7 +383,7 @@ def yop(a, b=0):
def test_hulot_window_forker_check_executors(
setup_config, minimal_db_initialization, setup
):
wf = WindowForker(1, 10, setup)
wf = WindowForker(1, 10, setup, logger)
wf.executors = {
wf.pool.apply_async(yop, (0,)): (
"node1",
Expand All @@ -397,17 +401,17 @@ def test_hulot_window_forker_check_executors(
tools.get_date(minimal_db_initialization),
),
}
nodes_list_running = {
nodes_list_command_running = {
"node1": "command_and_args",
"node2": "command_and_args",
"node3": "command_and_args",
}
while True:
wf.check_executors(minimal_db_initialization, setup, nodes_list_running)
wf.check_executors(minimal_db_initialization, setup, nodes_list_command_running)
if len(wf.executors) != 3:
break
print(nodes_list_running)
assert nodes_list_running == {
print(nodes_list_command_running)
assert nodes_list_command_running == {
"node2": "command_and_args",
"node3": "command_and_args",
}
Expand All @@ -417,15 +421,15 @@ def test_hulot_window_forker_check_executors(
def test_hulot_window_forker_check_executors_timeout(
setup_config, setup, minimal_db_initialization
):
wf = WindowForker(1, 10, setup)
wf = WindowForker(1, 10, setup, logger)
wf.executors = {wf.pool.apply_async(yop, (0, 0.2)): ("localhost0", "HALT", 0)}
nodes_list_running = {"localhost0": "command_and_args"}
wf.check_executors(minimal_db_initialization, setup, nodes_list_running)
nodes_list_command_running = {"localhost0": "command_and_args"}
wf.check_executors(minimal_db_initialization, setup, nodes_list_command_running)
time.sleep(
0.5
) # To prevent deadlock when all tests are executed (due to pytest internals ?)
print(nodes_list_running)
assert nodes_list_running == {}
print(nodes_list_command_running)
assert nodes_list_command_running == {}
resource = (
minimal_db_initialization.query(Resource)
.filter(Resource.network_address == "localhost0")
Expand Down

0 comments on commit 54d041f

Please sign in to comment.