diff --git a/bench/save_assignments.py b/bench/save_assignments.py
index 63c59cec..e6e72025 100644
--- a/bench/save_assignments.py
+++ b/bench/save_assignments.py
@@ -70,7 +70,7 @@ def save_assigns_bulk_0(jobs, resource_set):
             )
             pg_bulk_insert(
                 cursor,
-                db["queues"],
+                db["gantt_jobs_resources"],
                 mld_id_rid_s,
                 ("moldable_job_id", "resource_id"),
                 binary=True,
@@ -139,7 +139,7 @@ def save_assigns_redis_pipeline_1(jobs, resource_set):
         db.session.execute(GanttJobsPrediction.__table__.insert(), mld_id_start_time_s)
         pipe.execute()
 
-    print("Cumlated mld_id_start_time_s.append time:", t)
+    print("Cumulated mld_id_start_time_s.append time:", t)
 
 
 def bench_job_same(nb_job_exp=10, job_size=100, save_assign="default"):
diff --git a/oar/kao/kamelot_basic.py b/oar/kao/kamelot_basic.py
index f992155c..136afd47 100644
--- a/oar/kao/kamelot_basic.py
+++ b/oar/kao/kamelot_basic.py
@@ -12,14 +12,14 @@ def schedule_cycle(plt, queues=["default"]):
     now = plt.get_time()
 
-    logger.info("Begin scheduling....", now)
+    logger.info(f"Begin scheduling.... {now}")
 
     #
     # Retrieve waiting jobs
     #
     waiting_jobs, waiting_jids, nb_waiting_jobs = plt.get_waiting_jobs(queues)
 
-    logger.info(waiting_jobs, waiting_jids, nb_waiting_jobs)
+    logger.info(f"waiting jobs: nb: {nb_waiting_jobs}, jids: {waiting_jids}")
 
     if nb_waiting_jobs > 0:
 
         #
diff --git a/oar/kao/meta_sched.py b/oar/kao/meta_sched.py
index e3177e3c..c0590a34 100644
--- a/oar/kao/meta_sched.py
+++ b/oar/kao/meta_sched.py
@@ -57,7 +57,6 @@
     is_timesharing_for_two_jobs,
     remove_gantt_resource_job,
     resume_job_action,
-    save_assigns,
    set_gantt_job_start_time,
     set_job_message,
     set_job_resa_state,
@@ -100,6 +99,7 @@
 
 # order_part = config['SCHEDULER_RESOURCE_ORDER']
 
+
 batsim_sched_proxy = None
 
 
@@ -242,7 +242,7 @@ def prepare_job_to_be_launched(job, current_time_sec):
     # set start_time an for jobs to launch
     set_job_start_time_assigned_moldable_id(job.id, current_time_sec, job.moldable_id)
 
-    # fix resource assignement
+    # freeze the resource assignment
     add_resource_job_pairs(job.moldable_id)
 
     set_job_state(job.id, "toLaunch")
@@ -449,7 +449,7 @@ def check_reservation_jobs(
 
     if ar_jobs_scheduled != []:
         logger.debug("Save AR jobs' assignements in database")
-        save_assigns(ar_jobs_scheduled, resource_set)
+        plt.save_assigns(ar_jobs_scheduled, resource_set)
 
     logger.debug("Queue " + queue_name + ": end processing of new reservations")
 
diff --git a/oar/lib/job_handling.py b/oar/lib/job_handling.py
index 091fa83a..533af88e 100644
--- a/oar/lib/job_handling.py
+++ b/oar/lib/job_handling.py
@@ -620,7 +620,6 @@ def job_message(job, nb_resources=None):
 
     message_list.append("Q={}".format(job.queue_name))
 
-    logger.info("save assignements")
     message = ",".join(message_list)
 
     if hasattr(job, "karma"):
@@ -629,7 +628,7 @@ def job_message(job, nb_resources=None):
     return message
 
 
-def save_assigns(jobs, resource_set):
+def save_assigns_(jobs, resource_set):
     # http://docs.sqlalchemy.org/en/rel_0_9/core/dml.html#sqlalchemy.sql.expression.Insert.values
     if len(jobs) > 0:
         logger.debug("nb job to save: " + str(len(jobs)))
@@ -701,12 +700,57 @@ def save_assigns_bulk(jobs, resource_set):
             )
             pg_bulk_insert(
                 cursor,
-                db["queues"],
+                db["gantt_jobs_resources"],
                 mld_id_rid_s,
                 ("moldable_job_id", "resource_id"),
                 binary=True,
             )
+
+
+def save_assigns(jobs, resource_set):  # span-encoded variant of save_assigns_
+    # http://docs.sqlalchemy.org/en/rel_0_9/core/dml.html#sqlalchemy.sql.expression.Insert.values
+    if len(jobs) > 0:
+        logger.debug("nb job to save: " + str(len(jobs)))
+        mld_id_start_time_s = []
+        mld_id_rid_s = []
+        message_updates = {}
+        for j in jobs.values() if isinstance(jobs, dict) else jobs:
+            if j.start_time > -1:
+                logger.debug("job_id to save: " + str(j.id))
+                mld_id_start_time_s.append(
+                    {"moldable_job_id": j.moldable_id, "start_time": j.start_time}
+                )
+                riods = list(j.res_set)
+                mld_id_rid_s.extend(
+                    [
+                        {
+                            "moldable_job_id": j.moldable_id,
+                            "resource_id": rid_begin,
+                            "span": span,
+                        }
+                        for rid_begin, span in resource_set.riods_to_rid_itvs_spanned(riods)
+                    ]
+                )
+                msg = job_message(j, nb_resources=len(riods))
+                message_updates[j.id] = msg
+
+        if message_updates:
+            logger.info("save job messages")
+            db.session.query(Job).filter(Job.id.in_(message_updates)).update(
+                {
+                    Job.message: case(
+                        message_updates,
+                        value=Job.id,
+                    )
+                },
+                synchronize_session=False,
+            )
+
+        logger.info("save assignments")
+        db.session.execute(GanttJobsPrediction.__table__.insert(), mld_id_start_time_s)
+        db.session.execute(GanttJobsResource.__table__.insert(), mld_id_rid_s)
+        db.commit()
 
 
 def get_current_jobs_dependencies(jobs):
     # retrieve jobs dependencies *)
@@ -780,6 +824,8 @@ def add_resource_jobs_pairs(tuple_mld_ids):  # pragma: no cover
 
 
 def add_resource_job_pairs(moldable_id):
+    """Record in DB the resources assigned to a job (used when preparing its launch).
+    """
     resources_mld_ids = (
         db.query(GanttJobsResource)
         .filter(GanttJobsResource.moldable_id == moldable_id)
@@ -789,9 +835,10 @@ def add_resource_job_pairs(moldable_id):
     assigned_resources = [
         {
             "moldable_job_id": res_mld_id.moldable_id,
-            "resource_id": res_mld_id.resource_id,
+            "resource_id": res_mld_id.resource_id + i,
         }
         for res_mld_id in resources_mld_ids
+        for i in range(res_mld_id.span)
     ]
 
     db.session.execute(AssignedResource.__table__.insert(), assigned_resources)
@@ -2519,7 +2566,7 @@ def get_timer_armed_job():
 
 def archive_some_moldable_job_nodes(moldable_id, hosts):
     """Sets the index fields to LOG in the table assigned_resources"""
-    # import pdb; pdb.set_trace()
+
     if config["DB_TYPE"] == "Pg":
         db.query(AssignedResource).filter(
             AssignedResource.moldable_id == moldable_id
diff --git a/oar/lib/models.py b/oar/lib/models.py
index d6a52898..af2439ff 100644
--- a/oar/lib/models.py
+++ b/oar/lib/models.py
@@ -225,7 +225,8 @@ class GanttJobsResource(db.Model):
         server_default="0",
     )
     resource_id = db.Column(db.Integer, primary_key=True, server_default="0")
+    span = db.Column(db.Integer, server_default="1")
 
 
 class GanttJobsResourcesLog(db.Model):
     __tablename__ = "gantt_jobs_resources_log"
@@ -237,7 +238,8 @@ class GanttJobsResourcesLog(db.Model):
         "moldable_job_id", db.Integer, primary_key=True, server_default="0"
     )
     resource_id = db.Column(db.Integer, primary_key=True, server_default="0")
+    span = db.Column(db.Integer, server_default="1")
 
 
 class GanttJobsResourcesVisu(db.Model):
     __tablename__ = "gantt_jobs_resources_visu"
@@ -250,7 +252,8 @@ class GanttJobsResourcesVisu(db.Model):
         server_default="0",
     )
     resource_id = db.Column(db.Integer, primary_key=True, server_default="0")
+    span = db.Column(db.Integer, server_default="1")
 
 
 class JobDependencie(db.Model):
     __tablename__ = "job_dependencies"
diff --git a/oar/lib/node.py b/oar/lib/node.py
index 18010e8f..06ab1372 100644
--- a/oar/lib/node.py
+++ b/oar/lib/node.py
@@ -88,7 +88,7 @@ def search_idle_nodes(date):
 
 
 # TODO MOVE TO GANTT
-def get_gantt_hostname_to_wake_up(date, wakeup_time):
+def get_gantt_hostname_to_wake_up_(date, wakeup_time):
     """Get hostname that we must wake up to launch jobs"""
     hostnames = (
         db.query(Resource.network_address)
@@ -111,7 +111,32 @@ def get_gantt_hostname_to_wake_up(date, wakeup_time):
     hosts = [h_tpl[0] for h_tpl in hostnames]
     return hosts
 
+
+def get_gantt_hostname_to_wake_up(date, wakeup_time):
+    """Get hostname that we must wake up to launch jobs"""
+
+    # get saved assignments
+    hostnames = (
+        db.query(Resource.network_address)
+        .filter(GanttJobsResource.moldable_id == GanttJobsPrediction.moldable_id)
+        .filter(MoldableJobDescription.id == GanttJobsPrediction.moldable_id)
+        .filter(Job.id == MoldableJobDescription.job_id)
+        .filter(GanttJobsPrediction.start_time <= date + wakeup_time)
+        .filter(Job.state == "Waiting")
+        .filter(Resource.id == GanttJobsResource.resource_id)
+        .filter(Resource.state == "Absent")
+        .filter(Resource.network_address != "")
+        .filter(Resource.type == "default")
+        .filter(
+            (GanttJobsPrediction.start_time + MoldableJobDescription.walltime)
+            <= Resource.available_upto
+        )
+        .group_by(Resource.network_address)
+        .all()
+    )
+    hosts = [h_tpl[0] for h_tpl in hostnames]
+    return hosts
+
 
 def get_next_job_date_on_node(hostname):
     result = (
         db.query(func.min(GanttJobsPrediction.start_time))
diff --git a/oar/lib/resource.py b/oar/lib/resource.py
index 8c50e6c7..7c5d6872 100644
--- a/oar/lib/resource.py
+++ b/oar/lib/resource.py
@@ -123,3 +123,24 @@ def __init__(self):
         default_roids = [self.rid_i2o[i] for i in default_rids]
         self.default_itvs = ProcSet(*default_roids)
         ResourceSet.default_itvs = self.default_itvs  # for Quotas
+
+    def riods_to_rid_itvs_spanned(self, riods):
+        # Convert ordered resource ids (roids) to spanned intervals of DB resource ids.
+        # Example, with rid_o2i = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]:
+        #   riods_to_rid_itvs_spanned([3, 8, 5, 9, 4, 1]) -> [(1, 2), (5, 3), (9, 1)]
+        if not riods:
+            return []
+        rids = [self.rid_o2i[i] for i in riods]
+        rids_spanned = []
+        rids.sort()
+        rid_begin = rids[0]
+        span = 1
+        for rid in rids[1:]:
+            if rid == rid_begin + span:
+                span += 1
+            else:
+                rids_spanned.append((rid_begin, span))
+                rid_begin = rid
+                span = 1
+        rids_spanned.append((rid_begin, span))
+        return rids_spanned
diff --git a/tests/kao/test_db_all_in_one.py b/tests/kao/test_db_all_in_one.py
index 1ef1aae0..f5e73b51 100644
--- a/tests/kao/test_db_all_in_one.py
+++ b/tests/kao/test_db_all_in_one.py
@@ -157,7 +157,6 @@ def test_db_all_in_one_simple_1(monkeypatch):
 
     job = db["Job"].query.one()
     print("job state:", job.state)
-    # pdb.set_trace()
     print("fakezmq.num_socket: ", fakezmq.num_socket)
     meta_schedule("internal")
     print("fakezmq.num_socket: ", fakezmq.num_socket)
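
Note (reviewer sketch, not part of the patch): the change set above switches gantt_jobs_resources from one row per resource to a span encoding. save_assigns compresses each job's resource ids into (resource_id, span) intervals via ResourceSet.riods_to_rid_itvs_spanned, and add_resource_job_pairs re-expands those intervals into individual assigned_resources rows when the job is prepared for launch. The standalone snippet below illustrates both directions with plain Python; the helpers to_spanned/expand and the sample ids are illustrative only and do not exist in the OAR code base.

# Minimal sketch of the span encoding and its expansion (hypothetical helpers;
# the real code lives in ResourceSet.riods_to_rid_itvs_spanned and
# add_resource_job_pairs).

def to_spanned(rids):
    # Compress DB resource ids into (first_id, span) runs of consecutive ids.
    rids = sorted(rids)
    if not rids:
        return []
    spanned = []
    begin, span = rids[0], 1
    for rid in rids[1:]:
        if rid == begin + span:
            span += 1
        else:
            spanned.append((begin, span))
            begin, span = rid, 1
    spanned.append((begin, span))
    return spanned


def expand(spanned):
    # Inverse operation, mirroring the "resource_id + i" expansion
    # performed by add_resource_job_pairs.
    return [begin + i for begin, span in spanned for i in range(span)]


rids = [1, 2, 5, 6, 7, 9]  # made-up resource ids
assert to_spanned(rids) == [(1, 2), (5, 3), (9, 1)]
assert expand(to_spanned(rids)) == rids

Storing one row per contiguous run instead of one row per resource keeps gantt_jobs_resources small when jobs are allocated large consecutive blocks of resources, at the cost of re-expanding the runs where individual resource ids are needed (as add_resource_job_pairs now does).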