In [5]:
# imports needed

import pyspark
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import date_format
from pyspark.sql.functions import col
from pyspark.sql.functions import rand
from pyspark.sql.functions import udf

from matplotlib import pyplot as plt
%matplotlib inline

import datetime
import math
from collections import defaultdict

from functools import reduce
from pprint import pprint

In [3]:
# Pipe-separated raw accounting dump to analyse; the commented-out path is an alternative input.
#log_file = "/homes/scotthutch/beocat_slurm_raw.csv"
log_file = "/homes/scotthutch/beocat_2021_raw.csv"

# Every column of the dump, in file order.
_job_field_names = [
    "Account", "AdminComment", "AllocCPUS", "AllocGRES", "AllocNodes",
    "AllocTRES", "AssocID", "AveCPU", "AveCPUFreq", "AveDiskRead",
    "AveDiskWrite", "AvePages", "AveRSS", "AveVMSize", "BlockID",
    "Cluster", "Comment", "ConsumedEnergy", "ConsumedEnergyRaw", "CPUTime",
    "CPUTimeRAW", "DerivedExitCode", "Elapsed", "ElapsedRaw", "Eligible",
    "End", "ExitCode", "GID", "Group", "JobID",
    "JobIDRaw", "JobName", "Layout", "MaxDiskRead", "MaxDiskReadNode",
    "MaxDiskReadTask", "MaxDiskWrite", "MaxDiskWriteNode", "MaxDiskWriteTask", "MaxPages",
    "MaxPagesNode", "MaxPagesTask", "MaxRSS", "MaxRSSNode", "MaxRSSTask",
    "MaxVMSize", "MaxVMSizeNode", "MaxVMSizeTask", "McsLabel", "MinCPU",
    "MinCPUNode", "MinCPUTask", "NCPUS", "NNodes", "NodeList",
    "NTasks", "Priority", "Partition", "QOS", "QOSRAW",
    "ReqCPUFreq", "ReqCPUFreqMin", "ReqCPUFreqMax", "ReqCPUFreqGov", "ReqCPUS",
    "ReqGRES", "ReqMem", "ReqNodes", "ReqTRES", "Reservation",
    "ReservationId", "Reserved", "ResvCPU", "ResvCPURAW", "Start",
    "State", "Submit", "Suspended", "SystemCPU", "SystemComment",
    "Timelimit", "TimelimitRaw", "TotalCPU", "TRESUsageInAve", "TRESUsageInMax",
    "TRESUsageInMaxNode", "TRESUsageInMaxTask", "TRESUsageInMin", "TRESUsageInMinNode", "TRESUsageInMinTask",
    "TRESUsageInTot", "TRESUsageOutAve", "TRESUsageOutMax", "TRESUsageOutMaxNode", "TRESUsageOutMaxTask",
    "TRESUsageOutMin", "TRESUsageOutMinNode", "TRESUsageOutMinTask", "TRESUsageOutTot", "UID",
    "User", "UserCPU", "WCKey", "WCKeyID", "WorkDir",
]

# Columns read as integers.
# NOTE(review): IntegerType is 32-bit; confirm no value in these columns (e.g. Priority, CPUTimeRAW)
# can exceed that range in the raw data.
_integer_fields = {
    "AllocCPUS", "AllocNodes", "AssocID", "CPUTimeRAW", "ElapsedRaw", "GID",
    "NCPUS", "NNodes", "Priority", "QOSRAW", "ReqCPUS", "ReqNodes",
    "ResvCPURAW", "TimelimitRaw", "UID", "WCKeyID",
}

# Columns read as timestamps; every remaining column is read as a string.
_timestamp_fields = {"Eligible", "End", "Start", "Submit"}

# All fields are nullable.
job_schema = StructType([
    StructField(
        field_name,
        IntegerType() if field_name in _integer_fields
        else TimestampType() if field_name in _timestamp_fields
        else StringType(),
        True,
    )
    for field_name in _job_field_names
])

all_jobs = spark.read.csv(log_file, schema=job_schema, header=True, sep="|")

# Keep only rows whose JobID contains neither "_" nor ".".
job_id_has_underscore = all_jobs.JobID.contains("_")
job_id_has_dot = all_jobs.JobID.contains(".")
base_jobs = all_jobs.filter(~job_id_has_underscore & ~job_id_has_dot)

print("Found {} jobs".format(base_jobs.count()))
base_jobs.head()

Found 645834 jobs


Row(Account='beodefault', AdminComment=None, AllocCPUS=8, AllocGRES=None, AllocNodes=1, AllocTRES='billing=8,cpu=8,mem=8G,node=1', AssocID=363, AveCPU=None, AveCPUFreq=None, AveDiskRead=None, AveDiskWrite=None, AvePages=None, AveRSS=None, AveVMSize=None, BlockID=None, Cluster='beocat', Comment=None, ConsumedEnergy=None, ConsumedEnergyRaw=None, CPUTime='1802-13:07:04', CPUTimeRAW=155740024, DerivedExitCode='0:0', Elapsed='225-07:38:23', ElapsedRaw=19467503, Eligible=datetime.datetime(2020, 5, 26, 16, 7, 20), End=datetime.datetime(2021, 1, 6, 22, 46, 1), ExitCode='0:0', GID=2547, Group='gowri_users', JobID='9579230', JobIDRaw='9579230', JobName='Den_10_32Ehz_1200fs_with_field', Layout=None, MaxDiskRead=None, MaxDiskReadNode=None, MaxDiskReadTask=None, MaxDiskWrite=None, MaxDiskWriteNode=None, MaxDiskWriteTask=None, MaxPages=None, MaxPagesNode=None, MaxPagesTask=None, MaxRSS=None, MaxRSSNode=None, MaxRSSTask=None, MaxVMSize=None, MaxVMSizeNode=None, MaxVMSizeTask=None, McsLabel=None, MinC

In [4]:
# From OSG log data, we know the number of jobs in each 30-minute time window.
# Unfortunately, the OSG job logs do not contain all the information needed to simulate
# (namely the amount of memory each job used), so we need to construct a job set
# from Beocat log data that looks like OSG jobs.
# Target: 3% of jobs request a GPU, and the vast majority of those request only one GPU.

def unionAll(*dfs):
    """Combine the given DataFrames into one.

    Folds ``DataFrame.unionAll`` over the arguments from left to right.
    A single argument is returned unchanged.
    """
    return reduce(lambda combined, nxt: DataFrame.unionAll(combined, nxt), dfs)

@udf
def get_gres(tres_string):
    """Return the GPU count found in a comma-separated TRES string.

    The first comma-separated element containing "gpu" is used; its count is
    the integer after the first "=". Returns 0 when the input is None or no
    element mentions "gpu".

    The UDF is declared without a return type; callers cast the result to int.
    """
    if tres_string is None:
        return 0
    gpu_elements = [entry for entry in tres_string.split(",") if "gpu" in entry]
    if not gpu_elements:
        return 0
    return int(gpu_elements[0].split("=")[1])

@udf
def get_mem(tres_string):
    """Return the memory amount found in a comma-separated TRES string.

    The first element containing "mem" is used. Its value (the text after the
    first "=") is scaled by its unit suffix: 1M -> 1,000; 1G -> 1,000,000;
    1T -> 1,000,000,000.

    Fractional amounts are scaled *before* being converted to an integer, so
    "1.5G" yields 1500000. Truncating first would silently turn it into 1G.

    Returns None when the input is None or has no "mem" element.
    Raises Exception when the memory value carries none of the G/M/T suffixes.

    The UDF is declared without a return type; callers cast the result to int.
    """
    if tres_string is None:
        return None
    # Suffixes are tested in this order, matching the original precedence.
    unit_scale = (("G", 1000000), ("M", 1000), ("T", 1000000000))
    for element in tres_string.split(","):
        if "mem" in element:
            mem_str = element.split("=")[1]
            for suffix, scale in unit_scale:
                if suffix in mem_str:
                    # round() guards against float artefacts such as 299999.99999999994.
                    return int(round(float(mem_str.strip(suffix)) * scale))
            raise Exception("What? {}".format(mem_str))
    return None


# Number of OSG jobs in each half hour.
# Entry i is the job count for the window from i*0.5 to (i+1)*0.5 hours;
# the 48 entries cover 0 to 24 hours.
jobs_per_half_hour =   [168283603,
                        28643797,
                        15714596,
                        10900766,
                        8188928,
                        6908098,
                        5401470,
                        4327811,
                        3760481,
                        3059451,
                        3384589,
                        2394489,
                        2141512,
                        1978759,
                        1803829,
                        1727789,
                        1666278,
                        1378629,
                        1307098,
                        1222586,
                        1195983,
                        1108240,
                        1053850,
                        1034004,
                        1060313,
                        982037,
                        905671,
                        909243,
                        841967,
                        867482,
                        794127,
                        697510,
                        706359,
                        659095,
                        652969,
                        620483,
                        578792,
                        538649,
                        537734,
                        527385,
                        576537,
                        497013,
                        492077,
                        502074,
                        504790,
                        525113,
                        481785,
                        491481]

total_jobs = sum(jobs_per_half_hour)
start = 0

required_jobs = 24250
print("Constructing dataset with roughly {} jobs.".format(required_jobs))

# Add some columns.
# ReqMem is overwritten with the amount parsed out of ReqTRES, and ReqGPUs is added.
# Both UDFs are declared without a return type, so their results are cast to int.
base_jobs_cols_added = base_jobs.withColumn("ReqMem", get_mem(col("ReqTRES"))) \
                                .withColumn("ReqMem", col("ReqMem").cast("int")) \
                                .withColumn("ReqGPUs", get_gres(col("ReqTRES"))) \
                                .withColumn("ReqGPUs", col("ReqGPUs").cast("int"))

# Keep only jobs that request at most 8GB of memory and no GPUs.
# GPU jobs get added later.
low_mem_jobs = base_jobs_cols_added.filter((col("ReqMem") <= 8000000) & (col("ReqGPUs") == 0))
print("Have {} jobs with less than 8GB of memory".format(low_mem_jobs.count()))

# One sampled DataFrame per half-hour window that had enough candidate jobs.
job_groups = []

for num_jobs in jobs_per_half_hour:
    print("="*40)
    end = start + 0.5
    start_sec = start * 60 * 60
    end_sec = end * 60 * 60

    # Share of the OSG jobs in this window, rounded to two decimals.
    percent_jobs_in_window = round(num_jobs/total_jobs * 100, 2)

    # Rounded up per window, so the windows can sum to slightly more than required_jobs.
    jobs_needed = math.ceil(percent_jobs_in_window/100 * required_jobs)

    print("Jobs from {:4} to {:4} hours were {}% of OSG data.".format(start,
                                                                      end,
                                                                      percent_jobs_in_window))
    print("I need to find {} jobs in this window".format(jobs_needed))

    # Half-open window (start, end]: a job sitting exactly on a half-hour boundary
    # belongs to exactly one window instead of being dropped from all of them.
    # Jobs with CPUTimeRAW == 0 are still excluded.
    # NOTE(review): windows are selected on CPUTimeRAW, while the trace-generation cell
    # emits ElapsedRaw as the job duration — confirm CPUTimeRAW is the intended measure.
    jobs_in_window = low_mem_jobs.filter((col("CPUTimeRAW") > start_sec) & (col("CPUTimeRAW") <= end_sec))
    num_jobs_in_window = jobs_in_window.count()
    print("Found {} jobs with this duration".format(num_jobs_in_window))
    if num_jobs_in_window >= jobs_needed:
        # Seeded random order, then take the first jobs_needed rows.
        chosen_jobs = jobs_in_window.orderBy(rand(seed=42)).limit(jobs_needed)
        print("Selected {} at random".format(chosen_jobs.count()))
        job_groups.append(chosen_jobs)
    else:
        # The window is skipped entirely; nothing is added to job_groups for it.
        print("Not enough jobs")

    start += 0.5

Constructing dataset with roughly 24250 jobs.
Have 332585 jobs with less than 8GB of memory
Jobs from    0 to  0.5 hours were 57.13% of OSG data.
I need to find 13855 jobs in this window
Found 154623 jobs with this duration
Selected 13855 at random
Jobs from  0.5 to  1.0 hours were 9.73% of OSG data.
I need to find 2360 jobs in this window
Found 8480 jobs with this duration
Selected 2360 at random
Jobs from  1.0 to  1.5 hours were 5.34% of OSG data.
I need to find 1295 jobs in this window
Found 81070 jobs with this duration
Selected 1295 at random
Jobs from  1.5 to  2.0 hours were 3.7% of OSG data.
I need to find 898 jobs in this window
Found 31366 jobs with this duration
Selected 898 at random
Jobs from  2.0 to  2.5 hours were 2.78% of OSG data.
I need to find 675 jobs in this window
Found 2104 jobs with this duration
Selected 675 at random
Jobs from  2.5 to  3.0 hours were 2.35% of OSG data.
I need to find 570 jobs in this window
Found 4869 jobs with this duration
Selected 570 at ran

Selected 42 at random
Jobs from 21.5 to 22.0 hours were 0.17% of OSG data.
I need to find 42 jobs in this window
Found 329 jobs with this duration
Selected 42 at random
Jobs from 22.0 to 22.5 hours were 0.17% of OSG data.
I need to find 42 jobs in this window
Found 266 jobs with this duration
Selected 42 at random
Jobs from 22.5 to 23.0 hours were 0.18% of OSG data.
I need to find 44 jobs in this window
Found 277 jobs with this duration
Selected 44 at random
Jobs from 23.0 to 23.5 hours were 0.16% of OSG data.
I need to find 39 jobs in this window
Found 221 jobs with this duration
Selected 39 at random
Jobs from 23.5 to 24.0 hours were 0.17% of OSG data.
I need to find 42 jobs in this window
Found 222 jobs with this duration
Selected 42 at random


In [5]:
# Shape the GPU requests like OSG GPU jobs.
# num_gpus[i] is a GPU count; gpu_targets[i] is how many jobs with that count to select.
num_gpus = [1,2,3,4]
gpu_targets = [725,10,2,10]

# Candidate pool: jobs requesting at least one GPU and at most 8000000 of memory.
gpu_jobs = base_jobs_cols_added.filter((col("ReqGPUs") > 0) & (col("ReqMem") <= 8000000))

# NOTE(review): this appends to job_groups, which was created in an earlier cell,
# so re-running this cell alone adds the GPU groups a second time.
for gpu_count, num_gpu_jobs in zip(num_gpus, gpu_targets):
    print("="*40)
    print("Looking for {} jobs requesting {} GPU".format(num_gpu_jobs, gpu_count))
    filtered_gpu_jobs = gpu_jobs.filter(col("ReqGPUs") == gpu_count)
    num_filtered_gpu_jobs = filtered_gpu_jobs.count()
    print("Found {} jobs requesting {} GPU".format(num_filtered_gpu_jobs, gpu_count))

    if num_filtered_gpu_jobs < num_gpu_jobs:
        print("Not enough jobs")
        continue

    # Seeded random order, then take the first num_gpu_jobs rows.
    chosen_jobs = filtered_gpu_jobs.orderBy(rand(seed=42)).limit(num_gpu_jobs)
    print("Selected {} at random".format(chosen_jobs.count()))
    job_groups.append(chosen_jobs)

Looking for 725 jobs requesting 1 GPU
Found 11870 jobs requesting 1 GPU
Selected 725 at random
Looking for 10 jobs requesting 2 GPU
Found 3756 jobs requesting 2 GPU
Selected 10 at random
Looking for 2 jobs requesting 3 GPU
Found 2 jobs requesting 3 GPU
Selected 2 at random
Looking for 10 jobs requesting 4 GPU
Found 45 jobs requesting 4 GPU
Selected 10 at random


In [6]:
# Merge every selected group (the per-window samples plus the GPU samples) into one DataFrame.
job_set = unionAll(*job_groups)
print("{} jobs found".format(job_set.count()))

25031 jobs found


In [7]:
# Sanity check: number of selected jobs for each requested GPU count.
job_set.groupBy("ReqGPUs").count().sort("ReqGPUs").show()

+-------+-----+
|ReqGPUs|count|
+-------+-----+
|      0|24284|
|      1|  725|
|      2|   10|
|      3|    2|
|      4|   10|
+-------+-----+



In [8]:
# Need to shuffle these jobs to mix up their order.
# Sorts on a random column generated with a fixed seed (42).
# NOTE(review): this rebinds job_set to a sorted version of itself, so re-running the cell
# sorts the already-shuffled frame again. Presumably row order is not guaranteed to survive
# a parquet write/read round trip — confirm if downstream order matters.
job_set = job_set.orderBy(rand(seed=42))

In [9]:
# Show the first row of the shuffled job set.
job_set.head()

Row(Account='beodefault', AdminComment=None, AllocCPUS=4, AllocGRES=None, AllocNodes=1, AllocTRES='billing=4,cpu=4,mem=8000M,node=1', AssocID=2553, AveCPU=None, AveCPUFreq=None, AveDiskRead=None, AveDiskWrite=None, AvePages=None, AveRSS=None, AveVMSize=None, BlockID=None, Cluster='beocat', Comment=None, ConsumedEnergy='0', ConsumedEnergyRaw='0', CPUTime='01:23:04', CPUTimeRAW=4984, DerivedExitCode='0:0', Elapsed='00:20:46', ElapsedRaw=1246, Eligible=datetime.datetime(2021, 2, 19, 17, 38, 50), End=datetime.datetime(2021, 2, 19, 17, 59, 36), ExitCode='1:0', GID=3838, Group='osgsubmit_users', JobID='276864', JobIDRaw='276864', JobName='bl_AOrhRb', Layout=None, MaxDiskRead=None, MaxDiskReadNode=None, MaxDiskReadTask=None, MaxDiskWrite=None, MaxDiskWriteNode=None, MaxDiskWriteTask=None, MaxPages=None, MaxPagesNode=None, MaxPagesTask=None, MaxRSS=None, MaxRSSNode=None, MaxRSSTask=None, MaxVMSize=None, MaxVMSizeNode=None, MaxVMSizeTask=None, McsLabel=None, MinCPU=None, MinCPUNode=None, MinCPU

In [10]:
# Output the job set to a parquet folder for later use.
# NOTE(review): no write mode is set, so presumably this raises if the folder already
# exists (Spark's default save mode) — confirm before re-running the notebook.
save_path = "job_set_25000"
job_set.write.parquet(save_path)
print("DataFrame saved to {} folder".format(save_path))

DataFrame saved to job_set_25000 folder


In [22]:
# Read a previously saved job set.
# NOTE(review): this loads "job_set_50000", not the "job_set_25000" folder written by the
# save cell above, so it depends on a folder this notebook does not show being created.
job_set = spark.read.parquet("job_set_50000")
job_set.show()

+----------+------------+---------+---------+----------+--------------------+-------+------+----------+-----------+------------+--------+------+---------+-------+-------+-------+--------------+-----------------+-----------+----------+---------------+----------+----------+-------------------+-------------------+--------+----+--------------------+-------+--------+--------------------+------+-----------+---------------+---------------+------------+----------------+----------------+--------+------------+------------+------+----------+----------+---------+-------------+-------------+--------+------+----------+----------+-----+------+--------------------+------+--------+------------------+------+------+----------+-------------+-------------+-------------+-------+-------+-------+--------+--------------------+-----------+-------------+--------+----------+----------+-------------------+-------------+-------------------+---------+---------+-------------+-----------+------------+-----------+-----

In [23]:
# Pull every row of the job set to the driver as a Python list; the cells below iterate over it.
jobs = job_set.collect()

In [24]:
# Need to modify the submit dates to pack them into the same time period.
# Also generate new unique jobIDs for each job.
# Writes trace.R, an R script that builds the job trace with RSlurmSimTools.
job_id = 1000  # incremented before use, so the first job gets 1001
start_date = datetime.datetime(year=2022,
                               month=1,
                               day=1,
                               hour=0,
                               minute=0,
                               second=0
                              )
time_shift = datetime.timedelta(seconds=3)  # spacing between consecutive submit times
current_time = start_date

# Where the generated R script will write the trace.
trace_path = "/slurm_sim_ws/sim/micro/baseline/input_files/test.trace"

entry_indent = " "*4
arg_indent = " "*8

# Build one sim_job(...) string per job and join once at the end.
# Stripping and re-copying the whole accumulated script on every iteration is quadratic in the number of jobs.
sim_job_entries = []
for job in jobs:
    job_id += 1
    current_time += time_shift

    # sacct reports TimelimitRaw in minutes already, so it is used as-is for wclimit.
    # Dividing by 60 (treating it as seconds) made every limit 60 times too small.
    req_minutes = int(job.TimelimitRaw)

    sim_job_args = [
        "job_id = " + str(job_id),
        "submit = \"" + str(current_time) + "\"",
        "wclimit = " + str(req_minutes) + "L",
        "duration = " + str(job.ElapsedRaw) + "L",
        "tasks = " + str(job.ReqCPUS) + "L",
        # NOTE(review): tasks_per_node is filled with the requested node count — confirm this is intended.
        "tasks_per_node = " + str(job.ReqNodes) + "L",
        "username = \"" + job.User + "\"",
        "qosname = \"" + job.QOS + "\"",
        "account = \"" + job.Account + "\"",
        "req_mem = " + str(job.ReqMem) + "L",
        # NOTE(review): req_mem_per_cpu receives the same value as req_mem — confirm against sim_job's contract.
        "req_mem_per_cpu = " + str(job.ReqMem) + "L",
    ]

    # Truthiness test: neither 0 nor a null GPU count emits a gres argument.
    if job.ReqGPUs:
        sim_job_args.append("gres = \"gpu:{}\"".format(job.ReqGPUs))

    sim_job_entries.append(
        entry_indent + "sim_job(\n"
        + ",\n".join(arg_indent + arg for arg in sim_job_args)
        + "\n" + entry_indent + ")"
    )

R_string = (
    "library(RSlurmSimTools)\n\ntrace <- list(\n"
    + ",\n".join(sim_job_entries)
    + "\n)\n\ntrace <- do.call(rbind, lapply(trace,data.frame))\n\n"
    + "write_trace(\"" + trace_path + "\",trace)"
)

with open("trace.R", "w") as f_out:
    f_out.write(R_string)
print("R file written to trace.R")

R file written to trace.R


In [9]:
# Generate the sacctmgr_commands.txt and the users.sim file needed for this job set.

#production
users_file_name = r"users.sim"
sacctmgr_file_name = r"sacctmgr_commands.txt"
#testing
#users_file_name = r"/homes/scotthutch/singularity_slurm/run_folders/etc/users.sim"
#sacctmgr_file_name = r"/homes/scotthutch/singularity_slurm/run_folders/input_files/sacctmgr_commands.txt"

# Account name -> set of users that submitted jobs under it.
accounts_with_users = defaultdict(set)

for job in jobs:
    accounts_with_users[job.Account].add(job.User)

# Every distinct user across all accounts.
users = set().union(*accounts_with_users.values())

# Context managers close both files even if a write fails part-way.
with open(sacctmgr_file_name, "w") as sacctmgr_file:
    # Disabled QOS/cluster setup commands, kept for reference.
    # NOTE(review): the final command assigns the "supporters" QOS, which these lines would create.
    #sacctmgr_file.write("modify QOS set normal Priority=0\n")
    #sacctmgr_file.write("add QOS Name=supporters Priority=100\n")
    #sacctmgr_file.write("add cluster Name=beocat Fairshare=1 QOS=normal,supporters\n")

    for account, user_set in accounts_with_users.items():
        sacctmgr_file.write("add account name={} Fairshare=100\n".format(account))

        # Sorted so the generated file is identical from run to run (set order is not).
        # NOTE(review): a user seen under several accounts gets one "add user" line per account.
        for user in sorted(user_set, key=str):
            sacctmgr_file.write("add user name={} DefaultAccount={} MaxSubmitJobs=10000\n".format(user, account))

    sacctmgr_file.write("modify user set qoslevel=\"normal,supporters\"\n")

# UIDs are handed out sequentially from 2001 in sorted user order,
# so each user keeps the same UID every time the file is regenerated.
UID = 2001
with open(users_file_name, "w") as users_file:
    for user in sorted(users, key=str):
        users_file.write("{}:{}\n".format(user, UID))
        UID += 1

print("Users file created:  {}".format(users_file_name))
print("sacctmgr directives created: {}".format(sacctmgr_file_name))


Users file created:  users.sim
sacctmgr directives created: sacctmgr_commands.txt
