Skip to content

Commit

Permalink
[span]: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
augu5te committed Sep 21, 2023
1 parent 402aaeb commit 7c25090
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 18 deletions.
4 changes: 2 additions & 2 deletions bench/save_assignments.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def save_assigns_bulk_0(jobs, resource_set):
)
pg_bulk_insert(
cursor,
db["queues"],
db["gantt_jobs_resources"],
mld_id_rid_s,
("moldable_job_id", "resource_id"),
binary=True,
Expand Down Expand Up @@ -139,7 +139,7 @@ def save_assigns_redis_pipeline_1(jobs, resource_set):

db.session.execute(GanttJobsPrediction.__table__.insert(), mld_id_start_time_s)
pipe.execute()
print("Cumlated mld_id_start_time_s.append time:", t)
print("Cumulated mld_id_start_time_s.append time:", t)


def bench_job_same(nb_job_exp=10, job_size=100, save_assign="default"):
Expand Down
4 changes: 2 additions & 2 deletions oar/kao/kamelot_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
def schedule_cycle(plt, queues=["default"]):
now = plt.get_time()

logger.info("Begin scheduling....", now)
logger.info(f"Begin scheduling.... {now}")

#
# Retrieve waiting jobs
#
waiting_jobs, waiting_jids, nb_waiting_jobs = plt.get_waiting_jobs(queues)

logger.info(waiting_jobs, waiting_jids, nb_waiting_jobs)
logger.info(f"waiting jobs: nb: {nb_waiting_jobs}, jids: {waiting_jids}")

if nb_waiting_jobs > 0:
#
Expand Down
8 changes: 4 additions & 4 deletions oar/kao/meta_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
is_timesharing_for_two_jobs,
remove_gantt_resource_job,
resume_job_action,
save_assigns,
set_gantt_job_start_time,
set_job_message,
set_job_resa_state,
Expand Down Expand Up @@ -100,6 +99,7 @@
# order_part = config['SCHEDULER_RESOURCE_ORDER']



batsim_sched_proxy = None


Expand Down Expand Up @@ -242,7 +242,7 @@ def prepare_job_to_be_launched(job, current_time_sec):
# set start_time an for jobs to launch
set_job_start_time_assigned_moldable_id(job.id, current_time_sec, job.moldable_id)

# fix resource assignement
# freeze resource assignement
add_resource_job_pairs(job.moldable_id)

set_job_state(job.id, "toLaunch")
Expand Down Expand Up @@ -449,7 +449,7 @@ def check_reservation_jobs(

if ar_jobs_scheduled != []:
logger.debug("Save AR jobs' assignements in database")
save_assigns(ar_jobs_scheduled, resource_set)
plt.save_assigns(ar_jobs_scheduled, resource_set)

logger.debug("Queue " + queue_name + ": end processing of new reservations")

Expand Down Expand Up @@ -986,7 +986,7 @@ def extra_metasched_func(*args): # null function

# Update visu gantt tables
update_gantt_visualization()

import pdb; pdb.set_trace()
#
# Manage dynamic node feature for energy saving:
#
Expand Down
57 changes: 52 additions & 5 deletions oar/lib/job_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ def job_message(job, nb_resources=None):

message_list.append("Q={}".format(job.queue_name))

logger.info("save assignements")
#logger.info("save assignements") TOREMOVE !!!

message = ",".join(message_list)
if hasattr(job, "karma"):
Expand All @@ -629,7 +629,7 @@ def job_message(job, nb_resources=None):
return message


def save_assigns(jobs, resource_set):
def save_assigns_(jobs, resource_set):
# http://docs.sqlalchemy.org/en/rel_0_9/core/dml.html#sqlalchemy.sql.expression.Insert.values
if len(jobs) > 0:
logger.debug("nb job to save: " + str(len(jobs)))
Expand Down Expand Up @@ -701,12 +701,56 @@ def save_assigns_bulk(jobs, resource_set):
)
pg_bulk_insert(
cursor,
db["queues"],
db["gantt_job_resource"],
mld_id_rid_s,
("moldable_job_id", "resource_id"),
binary=True,
)

# Active "span" variant of save_assigns: runs of consecutive resource ids are
# collapsed into single (resource_id, span) rows instead of one row per resource.
def save_assigns(jobs, resource_set):  # span version
    """Save scheduled jobs' assignments and predicted start times in the Gantt tables.

    For every job with a valid start time (``start_time > -1``) this records:

    - its predicted start time (``GanttJobsPrediction``),
    - its resources, compacted into (first resource_id, span) runs via
      ``resource_set.riods_to_rid_itvs_spanned`` (``GanttJobsResource``),
    - its recomputed scheduler message (``Job.message``), written with a
      single bulk ``UPDATE`` using a ``CASE`` expression keyed on ``Job.id``.

    :param jobs: jobs to save — either a dict of jobs (values are used)
                 or an iterable of job objects
    :param resource_set: ResourceSet mapping ordered resource ids to DB ids
    """
    # http://docs.sqlalchemy.org/en/rel_0_9/core/dml.html#sqlalchemy.sql.expression.Insert.values
    if len(jobs) > 0:
        logger.debug("nb job to save: " + str(len(jobs)))
        mld_id_start_time_s = []  # rows for GanttJobsPrediction (executemany)
        mld_id_rid_s = []  # rows for GanttJobsResource (executemany)
        message_updates = {}  # job_id -> new message, for the bulk CASE update
        for j in jobs.values() if isinstance(jobs, dict) else jobs:
            # Jobs without an assigned start time (-1) are skipped entirely.
            if j.start_time > -1:
                logger.debug("job_id to save: " + str(j.id))
                mld_id_start_time_s.append(
                    {"moldable_job_id": j.moldable_id, "start_time": j.start_time}
                )
                riods = list(j.res_set)
                # Compact contiguous resource ids into (begin, span) runs.
                mld_id_rid_s.extend(
                    [
                        {
                            "moldable_job_id": j.moldable_id,
                            "resource_id": rid_begin,
                            "span": span
                        }
                        for rid_begin, span in resource_set.riods_to_rid_itvs_spanned(riods)
                    ]
                )
                msg = job_message(j, nb_resources=len(riods))
                message_updates[j.id] = msg

        if message_updates:
            logger.info("save job messages")
            # One UPDATE for all jobs: CASE over Job.id picks each job's message.
            db.session.query(Job).filter(Job.id.in_(message_updates)).update(
                {
                    Job.message: case(
                        message_updates,
                        value=Job.id,
                    )
                },
                synchronize_session=False,
            )

        logger.info("save assignements")
        db.session.execute(GanttJobsPrediction.__table__.insert(), mld_id_start_time_s)
        db.session.execute(GanttJobsResource.__table__.insert(), mld_id_rid_s)
        # NOTE(review): assumes the project's db wrapper exposes commit();
        # confirm this should not be db.session.commit().
        db.commit()

def get_current_jobs_dependencies(jobs):
# retrieve jobs dependencies *)
Expand Down Expand Up @@ -780,6 +824,8 @@ def add_resource_jobs_pairs(tuple_mld_ids): # pragma: no cover


def add_resource_job_pairs(moldable_id):
"""Set, in DB, the resource_ids attributed to job. It used during job's launch preparation.
"""
resources_mld_ids = (
db.query(GanttJobsResource)
.filter(GanttJobsResource.moldable_id == moldable_id)
Expand All @@ -789,9 +835,10 @@ def add_resource_job_pairs(moldable_id):
assigned_resources = [
{
"moldable_job_id": res_mld_id.moldable_id,
"resource_id": res_mld_id.resource_id,
"resource_id": res_mld_id.resource_id + i,
}
for res_mld_id in resources_mld_ids
for i in range(res_mld_id.span)
]

db.session.execute(AssignedResource.__table__.insert(), assigned_resources)
Expand Down Expand Up @@ -2519,7 +2566,7 @@ def get_timer_armed_job():

def archive_some_moldable_job_nodes(moldable_id, hosts):
"""Sets the index fields to LOG in the table assigned_resources"""
# import pdb; pdb.set_trace()

if config["DB_TYPE"] == "Pg":
db.query(AssignedResource).filter(
AssignedResource.moldable_id == moldable_id
Expand Down
6 changes: 3 additions & 3 deletions oar/lib/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class GanttJobsResource(db.Model):
server_default="0",
)
resource_id = db.Column(db.Integer, primary_key=True, server_default="0")

span = db.Column(db.Integer, server_default="1")

class GanttJobsResourcesLog(db.Model):
__tablename__ = "gantt_jobs_resources_log"
Expand All @@ -237,7 +237,7 @@ class GanttJobsResourcesLog(db.Model):
"moldable_job_id", db.Integer, primary_key=True, server_default="0"
)
resource_id = db.Column(db.Integer, primary_key=True, server_default="0")

span = db.Column(db.Integer, server_default="1")

class GanttJobsResourcesVisu(db.Model):
__tablename__ = "gantt_jobs_resources_visu"
Expand All @@ -250,7 +250,7 @@ class GanttJobsResourcesVisu(db.Model):
server_default="0",
)
resource_id = db.Column(db.Integer, primary_key=True, server_default="0")

span = db.Column(db.Integer, server_default="1")

class JobDependencie(db.Model):
__tablename__ = "job_dependencies"
Expand Down
49 changes: 48 additions & 1 deletion oar/lib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,30 @@ def search_idle_nodes(date):


# TODO MOVE TO GANTT
# NOTE(review): dead code — the trailing underscore marks this as a kept-around
# previous version, and a later definition with the same underscored name
# re-binds it at import time; only get_gantt_hostname_to_wake_up() (no
# underscore) is the active implementation. Consider removing.
def get_gantt_hostname_to_wake_up_(date, wakeup_time):
    """Get hostname that we must wake up to launch jobs"""
    hostnames = (
        db.query(Resource.network_address)
        # Join gantt placement -> moldable description -> job.
        .filter(GanttJobsResource.moldable_id == GanttJobsPrediction.moldable_id)
        .filter(MoldableJobDescription.id == GanttJobsPrediction.moldable_id)
        .filter(Job.id == MoldableJobDescription.job_id)
        # Job must start within the wake-up window and still be waiting.
        .filter(GanttJobsPrediction.start_time <= date + wakeup_time)
        .filter(Job.state == "Waiting")
        # Only absent (powered-down) default resources with a known address.
        .filter(Resource.id == GanttJobsResource.resource_id)
        .filter(Resource.state == "Absent")
        .filter(Resource.network_address != "")
        .filter(Resource.type == "default")
        # The job must be able to finish before the resource's availability deadline.
        .filter(
            (GanttJobsPrediction.start_time + MoldableJobDescription.walltime)
            <= Resource.available_upto
        )
        .group_by(Resource.network_address)
        .all()
    )
    # Each row is a 1-tuple (network_address,).
    hosts = [h_tpl[0] for h_tpl in hostnames]
    return hosts

def get_gantt_hostname_to_wake_up_(date, wakeup_time):
"""Get hostname that we must wake up to launch jobs"""
hostnames = (
db.query(Resource.network_address)
Expand All @@ -111,7 +134,31 @@ def get_gantt_hostname_to_wake_up(date, wakeup_time):
hosts = [h_tpl[0] for h_tpl in hostnames]
return hosts

def get_gantt_hostname_to_wake_up(date, wakeup_time):
    """Return network addresses of absent hosts that must be woken up.

    A host qualifies when a still-waiting job is predicted to start within
    ``wakeup_time`` of ``date`` on one of its default resources currently in
    the ``Absent`` state, and the job's walltime fits before the resource's
    ``available_upto`` deadline.
    """
    start_deadline = date + wakeup_time
    rows = (
        db.query(Resource.network_address)
        .filter(
            # Join gantt placement -> moldable description -> job.
            GanttJobsResource.moldable_id == GanttJobsPrediction.moldable_id,
            MoldableJobDescription.id == GanttJobsPrediction.moldable_id,
            Job.id == MoldableJobDescription.job_id,
            # Job starts soon enough and is still waiting.
            GanttJobsPrediction.start_time <= start_deadline,
            Job.state == "Waiting",
            # Absent default resources with a usable network address.
            Resource.id == GanttJobsResource.resource_id,
            Resource.state == "Absent",
            Resource.network_address != "",
            Resource.type == "default",
            # The job must complete before the resource becomes unavailable.
            (GanttJobsPrediction.start_time + MoldableJobDescription.walltime)
            <= Resource.available_upto,
        )
        .group_by(Resource.network_address)
        .all()
    )
    # Rows are 1-tuples (network_address,); unpack them.
    return [network_address for (network_address,) in rows]
def get_next_job_date_on_node(hostname):
result = (
db.query(func.min(GanttJobsPrediction.start_time))
Expand Down
20 changes: 20 additions & 0 deletions oar/lib/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,23 @@ def __init__(self):
default_roids = [self.rid_i2o[i] for i in default_rids]
self.default_itvs = ProcSet(*default_roids)
ResourceSet.default_itvs = self.default_itvs # for Quotas

def riods_to_rid_itvs_spanned(self, riods):
    """Convert ordered resource ids to spanned runs of DB resource ids.

    Each ordered id in *riods* is mapped through ``self.rid_o2i`` to its DB
    resource id; the resulting ids are sorted and contiguous runs are
    collapsed into ``(first_rid, span)`` tuples.

    Example, with ``rid_o2i = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]``::

        riods_to_rid_itvs_spanned([3, 8, 5, 9, 4, 1])
        -> [(1, 2), (5, 3), (9, 1)]

    :param riods: iterable of ordered resource ids
    :return: list of (rid_begin, span) tuples, sorted by rid_begin;
             empty list for empty input
    """
    # Guard: the run-building loop below needs at least one id
    # (the original raised IndexError on an empty input).
    rids = sorted(self.rid_o2i[i] for i in riods)
    if not rids:
        return []
    rids_spanned = []
    rid_begin = rids[0]
    span = 1
    for rid in rids[1:]:
        if rid == rid_begin + span:
            # Still contiguous with the current run: extend it.
            span += 1
        else:
            # Gap found: close the current run and start a new one.
            rids_spanned.append((rid_begin, span))
            rid_begin = rid
            span = 1
    rids_spanned.append((rid_begin, span))
    return rids_spanned
1 change: 0 additions & 1 deletion tests/kao/test_db_all_in_one.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ def test_db_all_in_one_simple_1(monkeypatch):
job = db["Job"].query.one()
print("job state:", job.state)

# pdb.set_trace()
print("fakezmq.num_socket: ", fakezmq.num_socket)
meta_schedule("internal")
print("fakezmq.num_socket: ", fakezmq.num_socket)
Expand Down

0 comments on commit 7c25090

Please sign in to comment.