Skip to content

Commit

Permalink
Merge branch 'energy_saving_module'
Browse files Browse the repository at this point in the history
  • Loading branch information
adfaure committed Nov 7, 2023
2 parents b9d0eb1 + 63b492b commit d93b93d
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 194 deletions.
14 changes: 11 additions & 3 deletions oar/kao/meta_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ def call_internal_scheduler(
)


def nodes_energy_saving(session, config, current_time_sec):
def nodes_energy_saving(session, config, logger, current_time_sec):
"""
Energy saving mode.
Expand All @@ -795,6 +795,8 @@ def nodes_energy_saving(session, config, current_time_sec):
nodes_2_halt = []
nodes_2_wakeup = []

logger.info("Energy saving function")

if (
("SCHEDULER_NODE_MANAGER_SLEEP_CMD" in config)
or (
Expand All @@ -805,11 +807,14 @@ def nodes_energy_saving(session, config, current_time_sec):
("SCHEDULER_NODE_MANAGER_SLEEP_TIME" in config)
and ("SCHEDULER_NODE_MANAGER_IDLE_TIME" in config)
):
logger.info("Looking for node to shutdown")
# Look at nodes that are unused for a duration
idle_duration = int(config["SCHEDULER_NODE_MANAGER_IDLE_TIME"])
sleep_duration = int(config["SCHEDULER_NODE_MANAGER_SLEEP_TIME"])

idle_nodes = search_idle_nodes(session, current_time_sec)
logger.debug(f"Idle nodes found: {idle_nodes}")

tmp_time = current_time_sec - idle_duration

# Determine nodes to halt
Expand All @@ -826,8 +831,9 @@ def nodes_energy_saving(session, config, current_time_sec):

if ("SCHEDULER_NODE_MANAGER_SLEEP_CMD" in config) or (
(config["ENERGY_SAVING_INTERNAL"] == "yes")
and ("ENERGY_SAVING_NODE_MANAGER_SLEEP_CMD" in config)
and ("ENERGY_SAVING_NODE_MANAGER_WAKE_UP_CMD" in config)
):
logger.info("Looking for node to wakeup")
# Get nodes which the scheduler wants to schedule jobs to,
# but which are in the Absent state, to wake them up
wakeup_time = int(config["SCHEDULER_NODE_MANAGER_WAKEUP_TIME"])
Expand Down Expand Up @@ -1057,7 +1063,9 @@ def extra_metasched_func(*args): # null function
#
if ("ENERGY_SAVING_MODE" in config) and config["ENERGY_SAVING_MODE"] != "":
if config["ENERGY_SAVING_MODE"] == "metascheduler_decision_making":
nodes_2_change = nodes_energy_saving(session, config, current_time_sec)
nodes_2_change = nodes_energy_saving(
session, config, logger, current_time_sec
)
elif config["ENERGY_SAVING_MODE"] == "batsim_scheduler_proxy_decision_making":
nodes_2_change = batsim_sched_proxy.retrieve_pstate_changes_to_apply()
else:
Expand Down
2 changes: 1 addition & 1 deletion oar/lib/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Configuration(dict):
"SQLALCHEMY_MAX_OVERFLOW": None,
"LOG_LEVEL": 3,
"LOG_FILE": ":stderr:",
"LOG_FORMAT": "[%(levelname)8s] [%(asctime)s] [%(name)s]: %(message)s",
"LOG_FORMAT": "[%(levelname)8s] [%(asctime)s] [%(name)s::%(funcName)s:%(lineno)d]: %(message)s",
"OAR_SSH_CONNECTION_TIMEOUT": 120,
"SERVER_HOSTNAME": "localhost",
"SERVER_PORT": "6666",
Expand Down
16 changes: 11 additions & 5 deletions oar/lib/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# coding: utf-8

from typing import List

from sqlalchemy import and_, distinct, func, or_, text

import oar.lib.tools as tools
Expand Down Expand Up @@ -65,20 +67,24 @@ def search_idle_nodes(session, date):

busy_nodes = {} # TODO can be remove ? to replace by busy_nodes = result
for network_address in result:
logger.debug(f"{network_address}")
busy_nodes[network_address[0]] = True

result = (
query = (
session.query(Resource.network_address, func.max(Resource.last_job_date))
.filter(Resource.state == "Alive")
.filter(Resource.network_address != "")
.filter(Resource.type == "default")
.filter(Resource.available_upto < 2147483647)
.filter(Resource.available_upto > 0)
.group_by(Resource.network_address)
.all()
)

logger.debug(f"idle nodes query: {query}")
result = query.all()
logger.debug(f"idle nodes query: {result}")
idle_nodes = {}

for x in result:
network_address, last_job_date = x
if network_address not in busy_nodes:
Expand Down Expand Up @@ -177,15 +183,15 @@ def get_nodes_that_can_be_waked_up(session, date):
return [r[0] for r in result]


def get_nodes_with_given_sql(session, properties):
def get_nodes_with_given_sql(session, properties) -> List[str]:
"""Gets the nodes list with the given sql properties"""
result = (
session.query(Resource.network_address)
session.query(Resource.network_address, Resource.state, Resource.next_state)
.distinct()
.filter(text(properties))
.all()
)
return [r[0] for r in result]
return [r for r in result]


def set_node_state(session, hostname, state, finaud_tag, config):
Expand Down
7 changes: 5 additions & 2 deletions oar/lib/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ def pingchecker_exec_command(
output = out.decode()
error = err.decode() # noqa TODO: not used

log.debug(f"out: {output}, err: {error}")

for line in output.split("\n"):
host = filter_output(*(line, ip2hostname))
if host and host in bad_hosts:
Expand Down Expand Up @@ -970,11 +972,12 @@ def get_oarexecuser_script_for_oarsub(
return script


def check_process(pid):
def check_process(pid, logger):
"""Check for the existence process."""
try:
os.kill(pid, 0)
except OSError:
except OSError as error:
logger.info(f"checking process error: {error}")
return False
else:
return True
44 changes: 37 additions & 7 deletions oar/modules/almighty.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,41 @@ def launch_command(command):
return return_code


def start_hulot():
def start_hulot() -> tools.Popen:
"""Start :mod:`oar.kao.hulot`"""
return tools.Popen(hulot_command)
command = hulot_command
logger.debug("Launching command : [" + command + "]")

hulot = tools.Popen(command)
try:
stdout, stderr = hulot.communicate(timeout=10)
logger.info(f"hulot: {stdout}\n{stderr}")
except Exception as e:
logger.info(f"hulot: {e}")
pass

return hulot

def check_hulot(hulot):

def check_hulot(hulot, logger):
"""Check the presence hulot process"""
return tools.check_process(hulot.pid)
logger.debug(f"checking if Hulot is still alive: pid:{hulot.pid}")

res = tools.check_process(hulot.pid, logger)

try:
stdout, stderr = hulot.communicate(timeout=0)
logger.info(f"hulot communicated: {stdout}\n{stderr}")
except Exception as e:
logger.info(f"hulot exception: {e}")
pass

# stdout, stderr = hulot.communicate(timeout=0)
# logger.info(f"hulot: {stdout}\n{stderr}")

logger.info(f"res: {res}")
# return res
return True


#
Expand Down Expand Up @@ -184,7 +211,9 @@ def __init__(self):
# Starting of Hulot, the Energy saving module
self.hulot = None
if self.config["ENERGY_SAVING_INTERNAL"] == "yes":
logger.info("Energy saving internal mode: Starting up Hulot")
self.hulot = start_hulot()
logger.info(f"{self.hulot}")

self.lastscheduler = 0
self.lastvillains = 0
Expand Down Expand Up @@ -321,6 +350,7 @@ def read_commands(self, timeout=read_commands_timeout): # TODO
if remaining != max_successive_read:
timeout = 0
if command is None:
logger.debug(f"qget command: {command}")
break
self.add_command(command["cmd"])
remaining -= 1
Expand Down Expand Up @@ -348,9 +378,9 @@ def run(self, loop=True):
return 10

# We check Hulot
if self.hulot and not check_hulot(self.hulot):
if self.hulot and not check_hulot(self.hulot, logger):
logger.warning("Energy saving module (hulot) died. Restarting it.")
start_hulot(self)
self.hulot = start_hulot()
# QGET
elif self.state == "Qget":
# if len(self.command_queue) > 0:
Expand Down Expand Up @@ -407,7 +437,7 @@ def run(self, loop=True):
# Launch the scheduler
# We check Hulot just before starting the scheduler
# because if the pipe is not read, it may freeze oar
if (energy_pid > 0) and not check_hulot():
if (energy_pid > 0) and not check_hulot(self.hulot, logger):
logger.warning(
"Energy saving module (hulot) died. Restarting it."
)
Expand Down
Loading

0 comments on commit d93b93d

Please sign in to comment.