## Lucid: Workload Estimator

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from sklearn import preprocessing, metrics
from interpret.glassbox import ExplainableBoostingRegressor
# Matplotlib: select the SimHei font for sans-serif text and turn off the
# Unicode minus glyph (the cells below print and may plot CJK text).
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})

# Seaborn: "ticks" style first, then a font-size override on top of it.
sns.set_style("ticks")
font = {
    # "font.family": "Roboto" is intentionally left disabled.
    "font.size": 12,
}
sns.set_style(font)

# "paper" context, scaled up, with thicker lines and larger markers.
paper_rc = {"lines.linewidth": 3, "lines.markersize": 10}
sns.set_context("paper", font_scale=1.8, rc=paper_rc)
current_palette = sns.color_palette()

# Never truncate columns when a DataFrame is displayed.
pd.set_option("display.max_columns", None)

# Experiment selection: `idx` picks the matching entry from both lists below.
idx = 0
save = False  # NOTE(review): not read by any of the visible cells.
experiment_list = ["Venus_Sept", "Saturn_Sept", "Philly"]
cluster_list = ["Venus", "Saturn", "Philly"]
cluster, experiment = cluster_list[idx], experiment_list[idx]

# Directory holding the selected cluster's trace files.
datapath = f"../data/{cluster}"


# Score table filled in by later cells (indexed by metric name, one column per cluster).
result = pd.DataFrame()

# Columns read for every cluster.
shared_cols = [
    "job_id",
    "user",
    "vc",
    "gpu_num",
    "submit_time",
    "amp",
    "gpu_util",
    "gmem_util",
    "gmem",
    "duration",
]
# Additional columns read for every cluster except Philly.
# ("jobname" is deliberately not loaded here.)
extra_cols = ["cpu_num", "month", "day", "hour", "dayofweek"]
wanted_cols = shared_cols if cluster == "Philly" else shared_cols + extra_cols

df = pd.read_csv(
    f"{datapath}/cluster_full_log.csv",
    parse_dates=["submit_time"],
    usecols=wanted_cols,
)

# Validation window is [trace_range[0], trace_range[1]], both ends inclusive.
# Philly trains on jobs submitted AFTER the window; the other clusters train on
# jobs submitted BEFORE it.
if cluster == "Philly":
    trace_range = ("2017-10-01 00:00:00", "2017-10-07 23:59:00")
    train_mask = df["submit_time"] > trace_range[1]
else:
    # Earlier window end: "2020-09-26 23:59:59"; extended slightly so that a
    # few more jobs are available for prediction.
    trace_range = ("2020-09-01 00:00:00", "2020-09-27 00:10:00")
    train_mask = df["submit_time"] < trace_range[0]
val_mask = (df["submit_time"] >= trace_range[0]) & (df["submit_time"] <= trace_range[1])

# Order both splits chronologically and give them a fresh 0..n-1 index.
train_df = df[train_mask].sort_values(by="submit_time").reset_index(drop=True)
val_df = df[val_mask].sort_values(by="submit_time").reset_index(drop=True)

# Features are everything except the target and the raw submission timestamp;
# labels are kept as single-column DataFrames.
non_feature_cols = ["duration", "submit_time"]
train_data = train_df.drop(columns=non_feature_cols)
test_data = val_df.drop(columns=non_feature_cols)
train_label = train_df[["duration"]]
test_label = val_df[["duration"]]

Duplicate key in file PosixPath('/home/lihe/Software/Anaconda3/envs/lucid/lib/python3.9/site-packages/matplotlib/mpl-data/matplotlibrc'), line 271 ('font.sans-serif: DejaVu Sans, Bitstream Vera Sans, Computer Modern Sans Serif, Lucida Grande, Verdana, Geneva, Lucid, Arial, Helvetica, Avant Garde, sans-serif')


Weekly Update Lucid Model

In [2]:
# Weekly retraining: for each evaluation window, fit a fresh EBM on every job
# submitted before the window starts, predict the duration of the jobs inside
# the window, and store the (integer) prediction as that job's `priority`.
# NOTE(review): the windows are hard-coded to September 2020, so this cell
# presumably targets the Venus/Saturn traces rather than Philly -- confirm.
trace_range_list = [
    ("2020-09-01 00:00:00", "2020-09-07 00:00:00"), # Week 1
    ("2020-09-07 00:00:00", "2020-09-14 00:00:00"), # Week 2
    ("2020-09-14 00:00:00", "2020-09-21 00:00:00"), # Week 3
    ("2020-09-21 00:00:00", "2020-09-27 00:10:00"), # Week 4
]
last_week_no = len(trace_range_list) - 1
weekly_frames = []  # one validated frame per week; concatenated once after the loop
for week_no, trace_range in enumerate(trace_range_list):
    week_start, week_end = trace_range
    submit_time = df["submit_time"]

    train_df = df[submit_time < week_start]
    # Consecutive windows share their boundary timestamp. Use a half-open
    # interval [start, end) for every window but the last so a job submitted
    # exactly on a boundary is evaluated once instead of appearing in two
    # weeks; the final window keeps its inclusive end.
    if week_no == last_week_no:
        in_week = (submit_time >= week_start) & (submit_time <= week_end)
    else:
        in_week = (submit_time >= week_start) & (submit_time < week_end)
    val_df = df[in_week]

    # Chronological order with a fresh 0..n-1 index (these are new frames,
    # so adding a column below does not touch `df`).
    train_df = train_df.sort_values(by="submit_time").reset_index(drop=True)
    val_df = val_df.sort_values(by="submit_time").reset_index(drop=True)

    train_data = train_df.drop(columns=["duration", "submit_time"])
    test_data = val_df.drop(columns=["duration", "submit_time"])
    train_label = train_df[["duration"]]
    test_label = val_df[["duration"]]

    print(f"训练数据长度: {len(train_data)}")
    # Optional alternative: pass binning="uniform" to the regressor.
    ebm = ExplainableBoostingRegressor(learning_rate=0.01, interactions=20)
    ebm.fit(train_data, train_label)
    print('训练结束')
    pred = ebm.predict(test_data)

    mae_score = metrics.mean_absolute_error(test_label, pred)
    mape_score = metrics.mean_absolute_percentage_error(test_label, pred)
    r2_score = metrics.r2_score(test_label, pred)
    # NOTE: the same cell of `result` is overwritten every iteration, so only
    # the last week's R2 is kept.
    result.at["ebm_r2", cluster] = r2_score
    print(f"平均绝对误差分数: {mae_score:.2f}, 平均绝对误差百分比分数: {mape_score:.2f}, R2分数: {r2_score:.4f}")

    # Predicted duration, truncated to an integer, becomes the job priority.
    val_df["priority"] = pred.astype(int)
    weekly_frames.append(val_df)

# Single concat instead of growing a frame inside the loop; per-week indices
# are preserved as before.
week_df = pd.concat(weekly_frames) if weekly_frames else pd.DataFrame()
# week_df.to_csv(f"ebm/{experiment}_Sept_ebm_weekly_updated.csv", index=False)

训练数据长度: 89556
训练结束
平均绝对误差分数: 11573.96, 平均绝对误差百分比分数: 121.86, R2分数: 0.4459
训练数据长度: 94471
训练结束
平均绝对误差分数: 13560.14, 平均绝对误差百分比分数: 345.90, R2分数: 0.4224
训练数据长度: 101093
训练结束
平均绝对误差分数: 13327.78, 平均绝对误差百分比分数: 102.34, R2分数: 0.2240
训练数据长度: 107707
训练结束
平均绝对误差分数: 11814.87, 平均绝对误差百分比分数: 222.77, R2分数: 0.5189


# Job Name Affinity Propagation

The scripts below require the original job-name information, which cannot be released.

In [17]:
import distance
import time
import warnings
import random
import pandas as pd
import numpy as np
from itertools import groupby
from sklearn.cluster import AffinityPropagation
from sklearn.preprocessing import LabelEncoder
# Silence all warnings for the remainder of the session.
warnings.filterwarnings("ignore")

# The `top_high_freq_num` most frequent job names are set aside as-is; only the
# remaining names are grouped for clustering.
top_high_freq_num = 100

idx = 0
experiment_list = ["Venus_Sept", "Saturn_Sept", "Philly"]
cluster_list = ["Venus", "Saturn", "Philly"]
cluster, experiment = cluster_list[idx], experiment_list[idx]

# Load the full trace, keep GPU jobs only, drop the columns that are not needed
# here, and renumber the rows 0..n-1.
df = (
    pd.read_csv(f'../data/{cluster}/cluster_full_log.csv',
                parse_dates=['submit_time', 'start_time', 'end_time'])
    .loc[lambda frame: frame['gpu_num'] > 0]
    .drop(columns=['year', 'nodelist', 'priority', 'minute'])
    .reset_index(drop=True)
)

# Job names ordered from most to least frequent (missing names are not counted).
count = df['jobname'].value_counts()
print(count)
name_list = list(count.index)
print(name_list)

high_freq = name_list[:top_high_freq_num]
to_cluster = sorted(name_list[top_high_freq_num:])

# Bucket the sorted low-frequency names by their first character.
groups = []
for _, same_initial in groupby(to_cluster, key=lambda name: name[0]):
    groups.append(list(same_initial))
print('finished')
print(groups)

0        NaN
1        NaN
2        NaN
3        NaN
4        NaN
          ..
114816   NaN
114817   NaN
114818   NaN
114819   NaN
114820   NaN
Name: jobname, Length: 114821, dtype: float64
Series([], Name: count, dtype: int64)
[]
finished
[]


In [7]:
# Build `label_dict`: job name -> representative ("exemplar") job name.
# Each group of names sharing a first character is clustered separately with
# affinity propagation on negated Levenshtein distances; high-frequency names
# map to themselves.
label_dict = {}

for group in groups:
    if len(group) == 1:
        # A lone name is its own exemplar; nothing to cluster.
        label_dict[group[0]] = group[0]
        continue

    print(f"Processing First Character: {group[0][0]}, Lenth: {len(group)}")
    ts = time.time()

    names = np.asarray(group)
    # Similarity = negative edit distance, so closer names score higher.
    lev_similarity = -1 * np.array([[distance.levenshtein(w1, w2) for w1 in names] for w2 in names])

    affprop = AffinityPropagation(affinity="precomputed", damping=0.9, random_state=6)
    affprop.fit(lev_similarity)

    if len(affprop.cluster_centers_indices_) == 0:
        # No exemplar was produced (the fit did not converge): labels are all
        # -1 and indexing the empty centre array would raise IndexError. Keep
        # every name as its own label instead.
        label_dict.update({name: name for name in group})
    else:
        for cluster_id in np.unique(affprop.labels_):
            exemplar = names[affprop.cluster_centers_indices_[cluster_id]]
            # Named `members`, not `cluster`: the notebook-level `cluster`
            # string (the cluster name used to build file paths) must not be
            # overwritten by this loop.
            members = np.unique(names[np.nonzero(affprop.labels_ == cluster_id)])

            for ori in members:
                label_dict[ori] = exemplar

    print(f"Time Cost: {time.time()-ts} s")

# High-frequency names are never merged.
for i in high_freq:
    label_dict[i] = i
# Every distinct job name must have received exactly one label.
assert len(label_dict) == len(name_list)

In [8]:
"""Replace Name"""
for i in range(len(df)):
    df.at[i, 'jobname'] = label_dict[df.at[i, 'jobname']]

df.to_csv(f"./{cluster}/cluster_full_log.csv", index=None)

KeyError: np.float64(nan)