In [1]:
import simulus
import time
import re
import sys
from loguru import logger
from datetime import datetime
from functools import cmp_to_key
from statistics import mean
import gymnasium as gym
from enum import Enum
import json
import pathlib
import pandas as pd

# Sentinel returned by Schedulus.get_peek_name when the simulator has no pending events.
NO_NAME = "SIMULUS_EVENT_NO_NAME"

In [3]:
from clusterConfig.cluster import *
from clusterConfig.job import *

In [5]:
class Schedulus:
    """Event-driven FCFS job scheduler over one processor pool, with optional
    EASY backfilling.

    Jobs are queued in submission order. The job at the head of the queue is
    started as soon as the pool can allocate its processors. When
    ``backfill`` is anything other than ``"none"``, later jobs may jump ahead
    of a blocked head job following the EASY rules described at
    https://www.cse.huji.ac.il/~perf/ex11.html. Event timing is driven by a
    ``simulus`` simulator.

    Attributes:
        jobs: job id -> Job.
        schedule: ids of queued (not yet started) jobs, head first.
        waiting: ids of submitted, not yet started jobs; kept in the same
            order as ``schedule``.
        running: ids of started, not yet finished jobs.
        stats: simulation time -> {"utilization", "num_running_jobs"}.
            ``utilization`` keeps the highest value seen at that time;
            ``num_running_jobs`` comes from the first sample at that time.
    """

    # Per-event debug dumps are only written when the workload has at most
    # this many jobs, to keep log volume manageable on large traces.
    DEBUG_LOG_MAX_JOBS = 100

    def __init__(self, num_proc, backfill, path):
        """
        Args:
            num_proc: processor count, passed to ``Cluster`` for all three of
                its constructor arguments.
            backfill: backfilling policy; ``"none"`` disables backfilling,
                any other value enables EASY backfilling.
            path: stored as ``self.path``; not used by this class.
        """
        self.jobs = {}
        self.schedule = []
        self.waiting = []
        self.running = []
        self.backfill = backfill
        self.path = path
        self.stats = {}
        self.cluster = Cluster(num_proc, num_proc, num_proc)
        self.sim = simulus.simulator()

    def __log(self, submitted, started, finished):
        """Record a utilization sample for the current time and, for small
        workloads, dump the scheduler state to the debug log.

        Args:
            submitted: ids of jobs submitted by the current event.
            started: ids of jobs started by the current event.
            finished: ids of jobs finished by the current event.
        """
        current_utilization = (
            self.cluster.total - self.cluster.idle
        ) / self.cluster.total
        sample = self.stats.get(self.sim.now)
        if sample is None:
            self.stats[self.sim.now] = {
                "utilization": current_utilization,
                "num_running_jobs": len(self.running),
            }
        elif current_utilization > sample["utilization"]:
            # Several events can share a timestamp; keep the peak utilization.
            sample["utilization"] = current_utilization

        if len(self.jobs) <= self.DEBUG_LOG_MAX_JOBS:
            logger.debug("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
            logger.debug("Time: " + str(self.sim.now))
            logger.debug("Utilization: " + str(current_utilization))
            logger.debug("------------------------------")
            logger.debug("Wait: " + str(self.waiting))
            logger.debug("Run: " + str(self.running))
            logger.debug("Schedule: " + str(self.schedule))
            logger.debug("------------------------------")
            logger.debug(
                "Total: " + str(self.cluster.total) + " Idle: " + str(self.cluster.idle)
            )
            logger.debug("------------------------------")
            if submitted:
                logger.debug("[Submit] " + str(submitted))
            if started:
                logger.debug("[Start] " + str(started))
            if finished:
                logger.debug("[Finish] " + str(finished))
            logger.debug(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n")

    def __process_submit(self, job_id):
        """Simulator callback: a job arrives and is enqueued.

        A job that requests more processors than the cluster has can never
        run. It is dropped with a warning instead of being enqueued, because
        it would otherwise block the head of the queue forever.
        """
        job = self.jobs[job_id]

        if job.req_proc > self.cluster.total:
            logger.warning(
                "Job {} requests {} processors but the cluster only has {}; dropping it",
                job_id,
                job.req_proc,
                self.cluster.total,
            )
            return

        job.submit()
        self.waiting.append(job_id)
        self.schedule.append(job_id)

        started = []
        head_id = self.schedule[0]
        if self.cluster.allocate(self.jobs[head_id].req_proc):
            self.__initiate_job(head_id)
            started.append(head_id)
        elif self.backfill != "none":
            started.extend(self.__backfill(head_id))

        self.__log([job_id], started, [])

    def __process_end(self, job_id):
        """Simulator callback: a running job completes.

        This releases the job's processors and starts queued jobs strictly in
        order while they fit. If a head job is still blocked afterwards, it
        then tries EASY backfilling around that head.
        """
        job = self.jobs[job_id]

        job.finish(self.sim.now)
        self.cluster.release(job.req_proc)
        self.running.remove(job_id)

        started = []
        while self.schedule and self.cluster.allocate(
            self.jobs[self.schedule[0]].req_proc
        ):
            next_job_id = self.schedule[0]
            self.__initiate_job(next_job_id)  # also pops it from the queue
            started.append(next_job_id)

        # Whatever is now at the head could not be allocated. Backfill around
        # it even if other jobs were just started.
        if self.backfill != "none" and self.schedule:
            started.extend(self.__backfill(self.schedule[0]))

        self.__log([], started, [job_id])

    # https://www.cse.huji.ac.il/~perf/ex11.html
    def __backfill(self, job_id):
        """EASY backfilling around the blocked head job ``job_id``.

        The shadow time is the earliest expected time (start_time + req_time
        of running jobs) at which enough processors are free for the head
        job. The extra processors are those free at the shadow time beyond
        what the head job needs. A waiting job may start now if it fits in
        the currently idle processors and either (1) is expected to end
        before the shadow time, or (2) needs no more than the remaining
        extra processors.

        Returns:
            list of ids of the jobs started by this call, in start order.
        """
        head = self.jobs[job_id]

        # Expected completion of each running job, earliest first.
        expected_ends = sorted(
            (
                (self.jobs[r_job_id].start_time + self.jobs[r_job_id].req_time, r_job_id)
                for r_job_id in self.running
            ),
            key=lambda pair: pair[0],
        )

        free_procs = self.cluster.idle
        shadow_time = None
        for end_time, r_job_id in expected_ends:
            # Stop at the first end time that satisfies the head job. Jobs
            # ending at exactly that time are still counted as freed capacity.
            if shadow_time is not None and end_time > shadow_time:
                break
            free_procs += self.jobs[r_job_id].req_proc
            if shadow_time is None and free_procs >= head.req_proc:
                shadow_time = end_time
        extra_procs = free_procs - head.req_proc if shadow_time is not None else 0

        started = []
        # Snapshot the candidates: __initiate_job mutates self.waiting.
        for w_job_id in [w for w in self.waiting if w != job_id]:
            w_job = self.jobs[w_job_id]
            ends_before_shadow = (
                shadow_time is not None
                and self.sim.now + w_job.req_time < shadow_time
            )
            # Condition 1: fits in the currently available processors and is
            # expected to terminate before the shadow time.
            if ends_before_shadow and self.cluster.allocate(w_job.req_proc):
                self.__initiate_job(w_job_id)
                started.append(w_job_id)
            # Condition 2: fits in the currently available processors and uses
            # no more than the extra processors.
            elif w_job.req_proc <= extra_procs and self.cluster.allocate(
                w_job.req_proc
            ):
                self.__initiate_job(w_job_id)
                extra_procs -= w_job.req_proc
                started.append(w_job_id)

        return started

    def __initiate_job(self, job_id):
        """Mark a job as started now and schedule its completion event after
        ``job.run`` time units.

        The caller must already have allocated the job's processors.
        """
        job = self.jobs[job_id]

        job.start(self.sim.now)
        self.sim.sched(self.__process_end, job_id, offset=job.run)
        self.waiting.remove(job_id)
        self.schedule.remove(job_id)
        self.running.append(job_id)

    def add_job(self, job: "Job"):
        """Register a job and schedule its submission at ``job.submit_time``.

        The id is normalized with ``int(job.id)``. The same key is used for
        the duplicate check, for storage, and for the scheduled event.

        Raises:
            ValueError: if a job with the same id is already registered.
        """
        job_id = int(job.id)
        if job_id in self.jobs:
            raise ValueError("Job ID already exists in workload")
        self.jobs[job_id] = job
        self.sim.sched(self.__process_submit, job_id, until=job.submit_time)

    def read_jobs(self, path):
        """Replace ``self.jobs`` with the jobs parsed from a whitespace-separated
        trace file.

        Text after ';' on a line is ignored. Lines whose 4th field (run time)
        equals -1 are skipped. A requested-processor value of -1 falls back to
        the used-processor field.

        Note: unlike ``add_job``, this does not schedule any submission
        events on the simulator.
        """
        self.jobs = {}

        with open(path) as job_file:
            for trace_line in job_file:
                # Find string before ';', then split that on whitespace to find trace fields
                job_fields = trace_line.split(";", 1)[0].split()

                if job_fields and float(job_fields[3]) != -1:
                    self.jobs[int(job_fields[0])] = Job(
                        id=int(job_fields[0]),
                        submit_time=float(job_fields[1]),
                        wait=float(job_fields[2]),
                        run=float(job_fields[3]),
                        used_proc=int(job_fields[4]),
                        used_ave_cpu=float(job_fields[5]),
                        used_mem=float(job_fields[6]),
                        req_proc=int(job_fields[7])
                        if int(job_fields[7]) != -1
                        else int(job_fields[4]),
                        req_time=float(job_fields[8]),
                        req_mem=float(job_fields[9]),
                        status=int(job_fields[10]),
                        user_id=int(job_fields[11]),
                        group_id=int(job_fields[12]),
                        num_exe=int(job_fields[13]),
                        num_queue=int(job_fields[14]),
                        num_part=int(job_fields[15]),
                        num_pre=int(job_fields[16]),
                        think_time=int(job_fields[17]),
                        start_time=-1,
                        end_time=-1,
                        score=0,
                        state=0,
                        happy=-1,
                        est_start=-1,
                    )

    def get_peek_name(self):
        """Name of the next pending simulator event, or NO_NAME if none.

        NOTE(review): this reads simulus' private ``_eventlist``; confirm it
        against the installed simulus version.
        """
        if len(self.sim._eventlist) > 0:
            x, e = self.sim._eventlist.pqueue.peek()
            return e.name
        else:
            return NO_NAME

    def get_peek_time(self):
        """Time of the next pending simulator event (delegates to ``sim.peek``)."""
        return self.sim.peek()

    def get_state(self):
        """Snapshot of the queues as plain tuples.

        Returns:
            dict with keys:
              "running": (job_id, req_proc, remaining actual run time),
              "waiting": (job_id, req_proc, run),
              "schedule": (job_id, req_proc, run).
        """
        now = self.sim.now
        running_jobs = [
            (job_id, self.jobs[job_id].req_proc,
             self.jobs[job_id].run - (now - self.jobs[job_id].start_time))
            for job_id in self.running
        ]
        waiting_job = [
            (job_id, self.jobs[job_id].req_proc, self.jobs[job_id].run)
            for job_id in self.waiting
        ]
        schedule_job = [
            (job_id, self.jobs[job_id].req_proc, self.jobs[job_id].run)
            for job_id in self.schedule
        ]
        return {"running": running_jobs, "waiting": waiting_job, "schedule": schedule_job}

    def one_step(self):
        """Process exactly one pending simulator event."""
        self.sim.step()

    def get_now(self):
        """Current simulation time."""
        return self.sim.now

In [6]:
class GPUType(Enum):
    A40 = 0
    RTX_2080Ti = 1
    RTX_TITAN = 2
    TITAN_Xp = 3
    TITAN_V = 4
    GTX_1080 = 5
    TITAN_X = 6
    M40 = 7

class NextEventType(Enum):
    """Kind of simulation event the environment expects to handle next.

    Used for ClusterEnv.next_step_is; presumably Submit = a job arrival,
    Finish = a job completion, Other = anything else (TODO confirm once
    the stepping logic is written).
    """

    Submit = 0
    Finish = 1
    Other = 2
    

In [7]:
class ClusterEnv(gym.Env):
    """Gymnasium environment that routes jobs to one Schedulus per GPU type.

    Action space: ``Discrete(2 * len(GPUType))``. Actions ``0..n-1`` submit
    the next job to the corresponding GPU type's cluster. Actions
    ``n..2n-1`` advance that GPU type's simulator by one event.

    Work in progress (TODO): the job trace is not loaded into ``self.jobs``
    yet, and ``step`` does not return the ``(obs, reward, terminated,
    truncated, info)`` tuple that the Gymnasium API expects.
    """

    def __init__(self, job_file_path: str, cluster_config_path: str):
        """
        Args:
            job_file_path: path to the workload trace (stored, not read yet).
            cluster_config_path: JSON file whose "proc_num" object maps every
                GPUType name to that type's processor count.

        Raises:
            KeyError: if "proc_num" is missing or lacks a GPUType name.
        """
        self.job_file_path = pathlib.Path(job_file_path)
        self.cluster_config_path = pathlib.Path(cluster_config_path)

        with open(self.cluster_config_path) as f:
            self.cluster_config = json.load(f)

        # Processor count per GPU type, keyed by GPUType name. This must be a
        # mapping because the cluster construction below indexes it by name.
        proc_num = self.cluster_config["proc_num"]
        self.gpu_type_proc_num = {
            gpu_type.name: proc_num[gpu_type.name] for gpu_type in GPUType
        }

        # TODO: read job_file_path (swf a.k.a. csv) into self.jobs / self.job_id_in_order.
        self.jobs = {}
        self.job_id_in_order = []
        self.next_step_is = NextEventType.Submit

        # One independent scheduler (EASY backfilling) per GPU type.
        self.cluster = {
            gpu_type: Schedulus(self.gpu_type_proc_num[gpu_type.name], "easy", "")
            for gpu_type in GPUType
        }

        # Actions i and i + n both refer to GPU type i: the first half
        # submits, the second half advances the simulation.
        self.gpu_type_num = len(GPUType)
        self.action_space = gym.spaces.Discrete(2 * self.gpu_type_num)
        self._action_to_gpu_type = {}
        for i, gpu_type in enumerate(GPUType):
            self._action_to_gpu_type[i] = gpu_type
            self._action_to_gpu_type[i + self.gpu_type_num] = gpu_type

        # next submit job timestamp
        self.next_submit_t = 0
        self.next_submit_job_id = 0

    def _get_obs(self):
        """Per-GPU-type queue snapshot: {GPUType: Schedulus.get_state()}."""
        return {gpu_type: self.cluster[gpu_type].get_state() for gpu_type in GPUType}

    def step(self, action):
        """Apply one action (see the class docstring for the encoding).

        TODO: advance until the next decision point and return
        ``(obs, reward, terminated, truncated, info)``.
        """
        action_gpu = self._action_to_gpu_type[action]
        # if next step is submit the job
        if action < self.gpu_type_num:
            self.cluster[action_gpu].add_job(self.jobs[self.next_submit_job_id])
            if self.job_id_in_order:
                self.next_submit_job_id = self.job_id_in_order.pop(0)
        # if next step in finish the job
        else:
            self.cluster[action_gpu].one_step()

        # make steps until next step is finish before next submit time
        

In [None]:
# Keep a handle on the environment instead of relying on Out[N]; the last
# expression still displays it.
env = ClusterEnv(
    job_file_path="data/cluster/workload_1.csv",
    cluster_config_path="example_cluster.json",
)
env

: 