In [1]:
# %%
import nengo_os
from nengo_os import  ConventScheduler, NemoNengoInterface


from nengo_os import SimpleProc, NemoNengoInterfaceBase

# @functools.total_ordering
# class CompareableRRProc(nengo_os.Process):
#
#     def __eq__(self, other):
#         if self.status !=
#         if self.status is other.status:
#             return True
#
from nengo_os.rr_full import SimpleProc
from operator import itemgetter


def nengo_sim_init(procs):
    iface = nengo_os.NemoNengoInterfaceBase(True, mode=1, rr_time_slice=50)
    for p in procs:
        iface.add_process(p.id, p.cores, p.compute, p.arrival)
    return iface


def nengo_sim_run(iface, end_time):
    iface.init_model()
    iface.generate_full_results(end_time)
    return iface


def interleave_procs():
    procs = [SimpleProc("TEST1", 0, 0, 0, 10, 50),
             SimpleProc("TEST2", 0, 2, 2, 10, 50),
             SimpleProc("TEST3", 0, 5, 5, 10, 50)]
    return procs


def interrupt_procs():
    return [SimpleProc("TEST1", 0, 0, 0, 10, 4000),
            SimpleProc("TEST2", 1, 2, 2, 10, 4000)]


def schedule(processes, mode="FCFS", total_cores=4096, time_slice=50, multiplexing=True):
    return ConventScheduler(processes, mode, total_cores, time_slice, multiplexing)


import pytest
import tqdm


def test_scheduler(pl=None):
    proc_list = interleave_procs() if pl is None else pl
    sch = schedule(proc_list, mode="RR")
    proc_0_run_times = []
    proc_1_run_times = []
    proc_1_pre_wait_times = []
    proc_1_wait_times = []
    assert (sch.current_time == 0)
    for i in range(1, 36):
        sch.scheduler_run_tick()
        assert (sch.current_time == i)
        proc_0_run_times.append(sch.queue.run_q[0].run_time)
        if len(sch.queue.run_q) >= 2:
            proc_1_run_times.append(sch.queue.run_q[1].run_time)
        if i < 2:
            assert (len(sch.queue.run_q) == 1)

        elif i < 5:
            assert (sch.queue.run_q[1].pre_wait_time == 2)
            assert (len(sch.queue.run_q) == 2)

        elif i == 50:
            for p in sch.queue.run_q:
                if p.current_state() != "DONE":
                    assert (p.current_state() == "DONE")

        if pl is None and len(sch.queue.run_q) > 0:
            test_time = 10
            assert (sch.queue.run_q[0].needed_time == test_time)

    return sch





class ProcSQL:
    job_cols = ["model_id", "arrival_time", "requested_run_time"]
    job_stat_cols = ["job_id","run_id","pre_wait_time","current_wait_time","actual_start_time","current_run_time","current_scheduler_time"]
    #(job_id, run_id, pre_wait_time, current_wait_time, actual_start_time, current_run_time,current_scheduler_time)
    #VALUES (null, null, null, null, null, null, null);

    def __insert_job(self):
        query = f"INSERT INTO jobs (model_id, arrival_time, requested_run_time, task_id) values (?,?,?,?);"
        self.cur.execute(query,self.job_info)
        self.job_id = self.cur.lastrowid
        q1 = "INSERT INTO run_job_list (job_id, run_id) VALUES (?,?);"
        self.cur.execute(q1,(self.job_id,self.runid))


    def job_stats(self):
        p = self.p
        values =   [self.job_id, self.runid ,p.pre_wait_time, p.wait_time, p.process_began_run, p.run_time, p.tick_time]
        query = f"INSERT INTO run_job_stats (job_id, run_id, pre_wait_time, current_wait_time, actual_start_time, current_run_time,current_scheduler_time)" \
                f"values (?,?,?,?,?,?,?);"
        self.cur.execute(query,values)

    def __init__(self,p,db_cur,task_id = 0, rid = 0):
        self.job_info = [p.model_id, p.start_time, p.needed_time,task_id]
        self.p = p
        self.task_id = task_id
        self.runid = rid
        self.cur = db_cur
        self.__insert_job()

def generate_proc_sql(run_id, scheduler,cur):
    job_data = []
    i = 1
    for p in scheduler.queue.wait_q:
        job_stats_sql = ProcSQL(p,cur,i,run_id)
        i += 1
        job_stats_sql.job_stats()
        job_data.append(job_stats_sql)
        p.job_id = job_stats_sql.job_id
    return job_data

In [2]:
### JOB EVENT GENERATION ###
class EventTracker:

    def state_change(self,old_state, state):
        stx = old_state+state
        if stx not in self.change_lookup.keys():
            print(f"{self.initial_state_list}")
        return self.id_from_typemap[self.change_lookup[stx]]

    def no_chg(self,v):
        nc_tv = self.no_chg_typemap[v]
        sql_tp = self.id_from_typemap[nc_tv]
        return sql_tp

    def __init__(self, scheduler,run_id,cursor):
        self.run_id = run_id
        self.cur = cursor

        self.proc_list = [proc for proc in scheduler.queue.wait_q]
        self.initial_state_list = {proc.job_id:proc.current_state() for proc in self.proc_list}
        self.updated_state_list = [s for s in self.initial_state_list]

        done = "done"
        interrupted = "interrupted"
        pre_waiting = "pre_waiting"
        proc_complete = "proc_complete"
        running = "running"
        start_running = "start_running"
        start_waiting = "start_waiting"
        waiting = "waiting"

        sql_typemap = {15:interrupted,12:pre_waiting,16:proc_complete,10:running,14:start_running,13:start_waiting,11:waiting}
        self.id_from_typemap = {v:k for k,v in sql_typemap.items()}
        no_chg_typemap = {"DONE":done,"WAITING":waiting,"RUNNING":running,"PRE_WAIT":pre_waiting}
        change_lookup = {
            #old, new, type
            "PRE_WAIT" "WAITING" : "start_waiting",
            "PRE_WAIT" "RUNNING" : "start_running",
            "WAITING"  "RUNNING" : "start_running",
            "RUNNING"  "WAITING" : "interrupted",
            "RUNNING"  "DONE"    : "proc_complete"
        }
        self.sql_typemap = sql_typemap
        self.no_chg_typemap = no_chg_typemap
        self.change_lookup = change_lookup



    def update(self, scheduler):
        new_procs = [proc for proc in scheduler.queue.wait_q] + [proc for proc in scheduler.queue.run_q]
        new_states = {proc.job_id:proc.current_state() for proc in new_procs}
        event_time = scheduler.current_time
        event_updates = []
        for job_id, state in new_states.items():
            old_state = self.initial_state_list[job_id]
            if old_state != state:
                job_event_id = self.state_change(old_state, state)
                query = "INSERT INTO run_job_events (event_type, run, job, sched_event_time) " \
                    "values (?,?,?,?)"
                self.cur.execute(query,(job_event_id,self.run_id,job_id,event_time))
        self.initial_state_list = new_states
            # else:
            #
            #     job_event_id = self.no_chg(state)



#### Paper Proc Test with 50 procs


Data schema:
- Table of Models
- Table of Processes:
- - (model_id, etc....)
- Table of Process Events:
- - (model_id, run_id, scheduler_time, event_id)
- table of Event Types:
- -0 event_id, name
- table of proc_runs:
- - run_id, model_id
- table of run info
- - run_id, sched_mode, rr_slice_value




In [3]:
import pandas as pd
import numpy as np
import sqlite3
from pathlib import Path
import os
import shutil




result_path = Path("./results")
template_db = result_path / "run_result_template.sqlite"
result_db = result_path / "run_result.sqlite"
backup_tp = "./run_result_backup-*-.sqlite"

INIT_TEMPLATE = True


In [4]:
if not result_db.exists():
    shutil.copy(template_db, result_db)
else:
    num_bkps = 0
    backup_list = []
    for f in result_path.glob(backup_tp):
        bpl = str(f.absolute().name).split("-")
        bpl[1] = int( bpl[1] )
        backup_list.append( bpl)
        #run_result_backup,*,.sqlite
    if len(backup_list) > 0:
        backup_list = sorted(backup_list, key=itemgetter(1))
        num_bkps = max(map(lambda x: int(x[1]), backup_list))
        num_bkps += 1

    new_backup_name = backup_tp.replace("*",str(num_bkps))
    new_path = result_path / new_backup_name
    shutil.move(result_db,new_path)
    shutil.copy(new_path, result_db)



In [5]:
conn = sqlite3.connect(result_db)
cur = conn.cursor()


In [6]:
#load run data
model_data_file = Path( "/Users/plaggm/dev/nemo-codes/config/paper_models.json")
sched_type = "RR"
rr_ts = 100
scheduler = ConventScheduler(mode=sched_type,total_cores=4096,time_slice=rr_ts,proc_js_file=str(model_data_file.absolute()))

Loading procs from file: /Users/plaggm/dev/nemo-codes/config/paper_models.json
LOADING /Users/plaggm/dev/nemo-codes/config/paper_models.json
Debug Print Mode is disabled


In [7]:
def init_run_in_sql(sched_type = sched_type,ts = rr_ts, nemo_stats = ""):
    query = "INSERT INTO runs (sched_type, rr_time_slice, nemo_stats) VALUES (?, ?, ?);"
    cur.execute(query, (sched_type,ts,nemo_stats))
    id = cur.lastrowid
    conn.commit()
    return id

In [8]:
run_id = init_run_in_sql(sched_type,rr_ts,"")
run_id

1

In [9]:
p = scheduler.queue.wait_q[0]

In [10]:
## Start of system, create awesome stat objects
job_data = generate_proc_sql(run_id,scheduler,cur)

In [11]:

conn.commit()

In [12]:
### RUN SCHEDULER AND UPDATE STATS!!!
## RUN A COUPLE TIMES TO TEST
evt_tracker = EventTracker(scheduler,run_id,cur)

In [13]:
evt_tracker.update(scheduler)
conn.commit()

In [14]:
## RUN 1:
import tqdm.notebook

In [15]:
sttl = sum(pv.needed_time for pv in scheduler.queue.wait_q)

In [16]:
sttl
import json
"{time:1, wait:[1,2,3..] run:[]},..."
class Instruct:
    def __init__(self):
        self.ts = []
        self.waits = []
        self.runs = []
    def add_instruction(self, t, wait, run):
        self.ts.append(t)
        self.waits.append(wait)
        self.runs.append(run)

    def to_json(self):
        def entry(t,ws,rs):
            itm = {"time":t,"waits":ws,"runs":rs}
            return itm
        entries = []
        for i in range(len(self.ts)):
            t = self.ts[i]
            w = self.waits[i]
            r = self.runs[i]
            entries.append(entry(t,w,r))
        return json.dumps(entries)

def save_scheduler_instructions(sched,run_procs,wait_procs):
    rp = []
    wp = []
    for p in sched.queue.run_q:
        if p.current_state() == "RUNNING":
            rp.append(p.job_id)
    for p in sched.queue.wait_q:
        wp.append(p.job_id)
    rrb = run_procs
    wwb = wait_procs
    rrb.append(rp)
    wwb.append(wp)
    return (rrb,wwb)

In [19]:
t = []
wp = []
rp = []
for i in tqdm.tqdm(range(sttl)):
    rp,wp = save_scheduler_instructions(scheduler, rp,wp)
    t.append(i)
    scheduler.scheduler_run_tick()
    evt_tracker.update(scheduler)
    for jd in job_data:
        jd.job_stats()
conn.commit()

  0%|          | 0/19725 [00:15<?, ?it/s]


{3: 'PRE_WAIT', 4: 'PRE_WAIT', 5: 'PRE_WAIT', 6: 'PRE_WAIT', 7: 'PRE_WAIT', 8: 'PRE_WAIT', 9: 'PRE_WAIT', 10: 'PRE_WAIT', 1: 'WAITING', 2: 'RUNNING'}


KeyError: 'RUNNINGPRE_WAIT'

In [47]:
inst = Instruct()
for tt in t:
    wwp = wp[tt]
    rrp = rp[tt]
    inst.add_instruction(t,wwp,rrp)

In [48]:
jdt = inst.to_json()

In [132]:
conn.commit()