In [2]:
import csv
import datetime
import errno
from msilib.schema import Error
from time import time
from turtle import pd
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import random

from skforecast.ForecasterAutoreg import ForecasterAutoreg
from sklearn.metrics import mean_absolute_percentage_error as mape
from sklearn.metrics import mean_absolute_error as mae
from sklearn.ensemble import RandomForestRegressor

def calc_avg(lst):
    total = 0
    for element in lst:
        total += element
    if len(lst) < 1:
        return 0
    else:
        return total/len(lst)
    
def read_data(filename):
    nprocs = []
    runtime = []
    total_jobs = []
    submit_time = []
    
    core_count = []
    r = []
    
    
    with open(filename) as file:
        tsv_file = csv.reader(file, delimiter="\t")
        field_count = 0
        while int(field_count) < 20:
            field_count = int(len(next(tsv_file)))

        last_time = -1
        last_hour = -1
        job_count = 0
        last_submitted = -1
        i = 0
        for line in tsv_file:
            i += 1
            #if i > 30000:
                # break
            if (float(line.__getitem__(3)) > -0.5) and (float(line.__getitem__(4)) > -0.5):
                submitted = int(line.__getitem__(1))
                dt = datetime.datetime.fromtimestamp(submitted)
                time_hour = dt.hour

                if last_hour == -1:
                    last_hour = time_hour
                    last_time = dt
                
                if last_hour != time_hour:
                    runtime.append(calc_avg(r))
                    nprocs.append(calc_avg(core_count))
                    total_jobs.append(job_count)
                    submit_time.append(dt.replace(minute=0, second=0, microsecond=0))
                    last_time = dt
                    job_count = 0
                    core_count.clear
                    r.clear
                    last_submitted = -1
                
                core_count.append(float(line.__getitem__(4))) # number of allocated processors
                r.append(float(line.__getitem__(3))) # runtime of the job
                job_count += 1
                last_submitted = submitted
                last_hour = time_hour
    return submit_time, runtime, nprocs, total_jobs

def read_dataframe():
    submit_time, runtime, nprocs, total_jobs = read_data('anon_jobs.gwf') #SharcNet
    df = pd.DataFrame(list(zip(submit_time, runtime, nprocs, total_jobs)), columns=['ds', 'RunTime', 'NProcs', 'TotalJobs'])
    # df.to_pickle('total_jobs_dataframe')
    return df

def load_dataframe():
    return pd.read_pickle('total_jobs_dataframe')

def generate_plot(x_axis, y_axis, title, x_label, y_label):
    fig = plt.figure()
    fig.canvas.manager.set_window_title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.title(title)
    plt.plot(x_axis, y_axis)
    plt.show()

def getCpuUtilization(df: pd.DataFrame):
    cpu = []
    for i in range(0,24):
        cores = df.iloc[i]['NProcs']
        jobs = df.iloc[i]['TotalJobs']
        cpu.append(cores*jobs/400)
    return cpu

def generate_workload(df: pd.DataFrame):
    critical_job_rate = 0.6
    start_time = 17

    f = open('workload.csv', 'w', newline='')
    writer = csv.writer(f, lineterminator="\n")  # use linux style line endings
    cpuUsage = getCpuUtilization(df)
    time_counter = 0
    while time_counter < 86400:  # generate for whole day
        # calculating adapted values
        current_hour = int(time_counter / 3600)
        adapted_hour = (current_hour + start_time) % 24         
        #print("current hour: " + str(adapted_hour))
        #print("job interval adapted: " + str(adapted_frame['InterArrivalTime']))

        label = ""
        if random.random() > critical_job_rate:
            label = "not-critical"
        else:
            label = "critical"
        print("cpu percent", cpuUsage[adapted_hour])
        total_cpu_usage = int(cpuUsage[adapted_hour] * 4000) # 4000 millicores rounded
        print("cpu total", total_cpu_usage)
        total_jobs = df.iloc[adapted_hour]['TotalJobs']
        print("jobs total", total_jobs)
        cpu_usage_per_job = total_cpu_usage/total_jobs
        runtime = df.iloc[adapted_hour]['RunTime']
        job_interval = int(3600/total_jobs)
        write_data = [str(int(cpu_usage_per_job)), str(int(runtime)),
                    str(int(job_interval)), label]
        #print(write_data)
        writer.writerow(write_data)
        time_counter = int(time_counter) + int(job_interval)
    f.close()

In [3]:
df = read_dataframe()


In [4]:
df = df.drop_duplicates(subset=['ds'])
df = df.set_index('ds')
df = df.asfreq('H')
df = df.interpolate('ffill')
print(df)

                          RunTime     NProcs  TotalJobs
ds                                                     
2006-01-24 17:00:00     13.600000   5.400000        5.0
2006-01-24 18:00:00     77.000000  19.500000        9.0
2006-01-24 19:00:00    148.625000  27.062500        2.0
2006-01-24 20:00:00    119.105263  48.947368       22.0
2006-01-24 21:00:00    171.384615  73.948718        1.0
...                           ...        ...        ...
2007-01-15 21:00:00  31829.883378   3.013483      254.0
2007-01-15 22:00:00  31828.379609   3.013501       58.0
2007-01-15 23:00:00  31826.919224   3.013525       57.0
2007-01-16 00:00:00  31825.394948   3.013849       59.0
2007-01-16 01:00:00  31825.058342   3.013981       13.0

[8553 rows x 3 columns]


In [5]:

#Params
start_offset = 1
offset = 1
steps = 24
#Build data frames for prediction
nprocs = df[['NProcs']]
nprocs = nprocs.rename(columns={"NProcs": "y"})

runtime = df[['RunTime']]
runtime = runtime.rename(columns={"RunTime": "y"})

totaljobs = df[['TotalJobs']]
totaljobs = totaljobs.rename(columns={"TotalJobs": "y"})

In [6]:
#Offsetting and train splitting
nprocs_offset = nprocs[start_offset:-offset]
runtime_offset = runtime[:-offset]
totaljobs_offset = totaljobs[:-offset]

nprocs_train = nprocs_offset[:-steps]
nprocs_test = nprocs_offset[-steps:]

runtime_train = runtime_offset[:-steps]
runtime_test = runtime_offset[-steps:]

totaljobs_train = totaljobs_offset[:-steps]
totaljobs_test = totaljobs_offset[-steps:]

In [11]:
from skforecast.model_selection import grid_search_forecaster
forecaster = ForecasterAutoreg(
                regressor = RandomForestRegressor(random_state=123),
                lags      = 10 # Placeholder, the value will be overwritten
             )

# Lags used as predictors
lags_grid = [[10], [20], [30], [60], [90]]

# Regressor hyperparameters
param_grid = {'n_estimators': [100, 150, 200, 250],
              'max_depth': [5]}

results_grid = grid_search_forecaster(
                        forecaster  = forecaster,
                        y           = totaljobs_train['y'],
                        param_grid  = param_grid,
                        lags_grid   = lags_grid,
                        steps       = steps,
                        refit       = False,
                        metric      = 'mean_squared_error',
                        initial_train_size = int(len(totaljobs_train['y'])*0.5),
                        fixed_train_size   = False,
                        return_best = True,
                        verbose     = False
               )

Number of models compared: 20.


loop lags_grid:   0%|                                               | 0/5 [00:00<?, ?it/s]
loop param_grid:   0%|                                              | 0/4 [00:00<?, ?it/s][A
loop param_grid:  25%|█████████▌                            | 1/4 [00:23<01:11, 23.99s/it][A
loop param_grid:  50%|███████████████████                   | 2/4 [00:59<01:01, 30.51s/it][A
loop param_grid:  75%|████████████████████████████▌         | 3/4 [01:45<00:37, 37.74s/it][A
loop param_grid: 100%|██████████████████████████████████████| 4/4 [02:43<00:00, 45.76s/it][A
loop lags_grid:  20%|███████▌                              | 1/5 [02:43<10:53, 163.47s/it][A
loop param_grid:   0%|                                              | 0/4 [00:00<?, ?it/s][A
loop param_grid:  25%|█████████▌                            | 1/4 [00:23<01:10, 23.55s/it][A
loop param_grid:  50%|███████████████████                   | 2/4 [00:58<01:00, 30.29s/it][A
loop param_grid:  75%|████████████████████████████▌         | 3

`Forecaster` refitted using the best-found lags and parameters, and the whole data set: 
  Lags: [30] 
  Parameters: {'max_depth': 5, 'n_estimators': 200}
  Backtesting metric: 403471.54051222146

