In [1]:
# Standard Library
import os
import ast
import zipfile
import tarfile
import itertools

# Data Manipulation and File I/O
import pandas as pd
import numpy as np
import pyarrow.parquet as pq

# Machine Learning and Preprocessing
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from xgboost import XGBRegressor
import xgboost as xgb

# Visualization
import matplotlib.pyplot as plt
from matplotlib import cm
import seaborn as sns

import sys
sys.path.append('/projectnb/peaclab-mon/boztop/resource-allocation/python_scripts')

from ml_model_training import train_xgboost
from kmeans_clustering import cluster_create_sub_dataframes
from plot_functions import plot_everything
from feature_selection import (
    xgboost_feature_selection, 
    random_forest_feature_importance, 
    correlation_feature_selection
)


In [2]:
file_path = '/projectnb/peaclab-mon/boztop/resource-allocation/datasets/borg_traces_data.csv'

df = pd.read_csv(file_path)

df.columns

Index(['Unnamed: 0', 'time', 'instance_events_type', 'collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'resource_request', 'constraint', 'collections_events_type', 'user',
       'collection_name', 'collection_logical_name',
       'start_after_collection_ids', 'vertical_scaling', 'scheduler',
       'start_time', 'end_time', 'average_usage', 'maximum_usage',
       'random_sample_usage', 'assigned_memory', 'page_cache_memory',
       'cycles_per_instruction', 'memory_accesses_per_instruction',
       'sample_rate', 'cpu_usage_distribution', 'tail_cpu_usage_distribution',
       'cluster', 'event', 'failed'],
      dtype='object')

In [3]:
df['user']

0         fn8Ve4Tdl/FVVvwXFGIKe4+Wo4zLjUL/557qdFVYu5M=
1         DrrEIEWkWuW7RrZwpHLCN0k0A2J0usJeyt3wtqzZ7Kk=
2         /ivQBmewiFcXfGJdCUsEKx47NiRE29Tjiq3gw+zR2Cg=
3         8qRmTJas/6XEBaA0l4Wt1+/qSLgc6p7u7JzoMSuT/M8=
4         +cz+wEXttqDjld+YWLmrgqExjdqiHMZ2WZODThyR9jE=
                              ...                     
405889    jW2GJpOlCheoSzGcwfmRjkjV6mYZo4DC43ej/X/OyjM=
405890    DrrEIEWkWuW7RrZwpHLCN0k0A2J0usJeyt3wtqzZ7Kk=
405891    +tvHHpU79T6KNqPFO5KYRfjslYLDrIq4QS+CQ98QCEY=
405892    jW2GJpOlCheoSzGcwfmRjkjV6mYZo4DC43ej/X/OyjM=
405893    DrrEIEWkWuW7RrZwpHLCN0k0A2J0usJeyt3wtqzZ7Kk=
Name: user, Length: 405894, dtype: object

In [4]:
df['start_time'] = df['start_time'].astype(int)
df['end_time'] = df['end_time'].astype(int)

df['time'] = df['time'].astype(int)
df['run_time'] = df['end_time'] - df['start_time'] 

df = df.sort_values(by='start_time').reset_index(drop=True)

df

Unnamed: 0.1,Unnamed: 0,time,instance_events_type,collection_id,scheduling_class,collection_type,priority,alloc_collection_id,instance_index,machine_id,...,cycles_per_instruction,memory_accesses_per_instruction,sample_rate,cpu_usage_distribution,tail_cpu_usage_distribution,cluster,event,failed,wallclock_req,run_time
0,183421,0,2,152917495628,3,1,200,0,47,17377689713,...,,,1.0,[0.01968384 0.02651978 0.0284729 0.03042603 0...,[0.05157471 0.05200195 0.05255127 0.05310059 0...,2,FAIL,1,0,300000000
1,256265,352321459944,3,261561475113,2,1,101,0,425,178160671591,...,,,1.0,[0.00028896 0.00033188 0.00035667 0.00037479 0...,[0.00057983 0.00058651 0.00059414 0.00060177 0...,8,FINISH,0,352321459944,300000000
2,44099,365626782748,3,261561475113,2,1,101,0,257,813863542,...,,,1.0,[0.00028896 0.00033188 0.00035667 0.00037479 0...,[0.00057983 0.00058651 0.00059414 0.00060177 0...,8,FINISH,0,365626782748,300000000
3,145811,1846171586901,3,261561475113,2,1,101,0,263,527532269,...,,,1.0,[0.00028896 0.00033188 0.00035667 0.00037479 0...,[0.00057983 0.00058651 0.00059414 0.00060177 0...,8,FINISH,0,1846171586901,300000000
4,154472,2571380789305,3,261561475113,2,1,101,0,135,559903004,...,,,1.0,[0.00028896 0.00033188 0.00035667 0.00037479 0...,[0.00057983 0.00058651 0.00059414 0.00060177 0...,8,FINISH,0,2571380789305,300000000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
405889,342616,2155563205756,3,21299367638,1,0,119,0,500,128037042591,...,0.911881,0.000878,1.0,[0.00846863 0.02548218 0.03009033 0.03167725 0...,[0.04907227 0.04956055 0.04998779 0.05078125 0...,7,FINISH,0,2155563205756,300000000
405890,105609,232437467236,3,21299367638,1,0,119,0,262,14113087,...,0.911881,0.000878,1.0,[0.00846863 0.02548218 0.03009033 0.03167725 0...,[0.04907227 0.04956055 0.04998779 0.05078125 0...,7,FINISH,0,232437467236,300000000
405891,221122,1323175184874,3,21299367638,1,0,119,0,729,14113283,...,0.911881,0.000878,1.0,[0.00846863 0.02548218 0.03009033 0.03167725 0...,[0.04907227 0.04956055 0.04998779 0.05078125 0...,7,FINISH,0,1323175184874,300000000
405892,64800,2401741772239,3,21299367638,1,0,119,0,756,3194006620,...,0.911881,0.000878,1.0,[0.00846863 0.02548218 0.03009033 0.03167725 0...,[0.04907227 0.04956055 0.04998779 0.05078125 0...,7,FINISH,0,2401741772239,300000000


In [5]:
# --- LET'S MAKE SOME OBSERVATIONS! ---
print(f"Total number of jobs in the dataset: {df.shape[0]}")
print(f"Number of unique clusters: {df['cluster'].nunique()}")
print(f"Number of unique collection_ID: {df['collection_id'].nunique()}")
print(f"Number of unique machine_ID: {df['machine_id'].nunique()}")      
print(f"Number of unique users: {df['user'].nunique()}")
          

Total number of jobs in the dataset: 405894
Number of unique clusters: 8
Number of unique collection_ID: 4057
Number of unique machine_ID: 96174
Number of unique users: 898


In [6]:
print("Failed vs Successful Jobs:")  
      
print(df['failed'].value_counts())
    
    
df_success = df[df['failed'] == 0]
df_failure = df[df['failed'] == 1] 

Failed vs Successful Jobs:
0    313216
1     92678
Name: failed, dtype: int64


In [7]:
df_success['resource_request'] = df_success['resource_request'].apply(
    lambda x: ast.literal_eval(x) if isinstance(x, str) else x if isinstance(x, dict) else None
)
df_success['maximum_usage'] = df_success['maximum_usage'].apply(
    lambda x: ast.literal_eval(x) if isinstance(x, str) else x if isinstance(x, dict) else None
)

df_success['cpu_req'] = df_success['resource_request'].apply(lambda x: x['cpus'] if isinstance(x, dict) else None)
df_success['mem_req'] = df_success['resource_request'].apply(lambda x: x['memory'] if isinstance(x, dict) else None)
df_success['cpu_used'] = df_success['maximum_usage'].apply(lambda x: x['cpus'] if isinstance(x, dict) else None)
df_success['mem_used'] = df_success['maximum_usage'].apply(lambda x: x['memory'] if isinstance(x, dict) else None)



df_success['cpu_req'] = df_success['cpu_req']*100
df_success['mem_req'] = df_success['mem_req']*100
df_success['cpu_used'] = df_success['cpu_used']*100
df_success['mem_used'] = df_success['mem_used']*100

df_success.columns



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs

Index(['Unnamed: 0', 'time', 'instance_events_type', 'collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'resource_request', 'constraint', 'collections_events_type', 'user',
       'collection_name', 'collection_logical_name',
       'start_after_collection_ids', 'vertical_scaling', 'scheduler',
       'start_time', 'end_time', 'average_usage', 'maximum_usage',
       'random_sample_usage', 'assigned_memory', 'page_cache_memory',
       'cycles_per_instruction', 'memory_accesses_per_instruction',
       'sample_rate', 'cpu_usage_distribution', 'tail_cpu_usage_distribution',
       'cluster', 'event', 'failed', 'wallclock_req', 'run_time', 'cpu_req',
       'mem_req', 'cpu_used', 'mem_used'],
      dtype='object')

In [8]:
df_success = df_success.drop(columns=['Unnamed: 0', 'time', 'instance_events_type', 
       'resource_request', 'constraint', 
       'start_after_collection_ids', 'vertical_scaling', 'scheduler',
       'average_usage', 'maximum_usage',
       'random_sample_usage', 'assigned_memory', 'page_cache_memory',
       'cycles_per_instruction', 'memory_accesses_per_instruction',
       'sample_rate', 'cpu_usage_distribution', 'tail_cpu_usage_distribution',
       'event', 'failed'])

df_success

Unnamed: 0,collection_id,scheduling_class,collection_type,priority,alloc_collection_id,instance_index,machine_id,collections_events_type,user,collection_name,collection_logical_name,start_time,end_time,cluster,wallclock_req,run_time,cpu_req,mem_req,cpu_used,mem_used
1,261561475113,2,1,101,0,425,178160671591,3,DrrEIEWkWuW7RrZwpHLCN0k0A2J0usJeyt3wtqzZ7Kk=,jqV4NZYEuP2YWjqfZZcW8MnCJVQgRIABXl/ESAAlU9Y=,mOIaNf8mXWKMT1ODsgfz3yhpFBedgNmcMaskvu55SBc=,600000000,900000000,8,352321459944,300000000,2.246094,0.726318,0.118637,0.466156
2,261561475113,2,1,101,0,257,813863542,3,DrrEIEWkWuW7RrZwpHLCN0k0A2J0usJeyt3wtqzZ7Kk=,jqV4NZYEuP2YWjqfZZcW8MnCJVQgRIABXl/ESAAlU9Y=,mOIaNf8mXWKMT1ODsgfz3yhpFBedgNmcMaskvu55SBc=,600000000,900000000,8,365626782748,300000000,2.246094,1.182556,0.118637,0.466156
3,261561475113,2,1,101,0,263,527532269,3,DrrEIEWkWuW7RrZwpHLCN0k0A2J0usJeyt3wtqzZ7Kk=,jqV4NZYEuP2YWjqfZZcW8MnCJVQgRIABXl/ESAAlU9Y=,mOIaNf8mXWKMT1ODsgfz3yhpFBedgNmcMaskvu55SBc=,600000000,900000000,8,1846171586901,300000000,2.017212,1.182556,0.118637,0.466156
4,261561475113,2,1,101,0,135,559903004,3,DrrEIEWkWuW7RrZwpHLCN0k0A2J0usJeyt3wtqzZ7Kk=,jqV4NZYEuP2YWjqfZZcW8MnCJVQgRIABXl/ESAAlU9Y=,mOIaNf8mXWKMT1ODsgfz3yhpFBedgNmcMaskvu55SBc=,600000000,900000000,8,2571380789305,300000000,2.142334,2.874756,0.118637,0.466156
5,261561475113,2,1,101,0,897,1892065342,3,DrrEIEWkWuW7RrZwpHLCN0k0A2J0usJeyt3wtqzZ7Kk=,jqV4NZYEuP2YWjqfZZcW8MnCJVQgRIABXl/ESAAlU9Y=,mOIaNf8mXWKMT1ODsgfz3yhpFBedgNmcMaskvu55SBc=,600000000,900000000,8,2190254331260,300000000,2.017212,1.182556,0.118637,0.466156
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
405889,21299367638,1,0,119,0,500,128037042591,3,8qRmTJas/6XEBaA0l4Wt1+/qSLgc6p7u7JzoMSuT/M8=,1DO3jDz/DuDOwl3QEV7CroiyNmhiMUgp1QKsAJVy3EE=,7J86abwGzd8YCATmMLG6StG4z18ZcHI1bWj0OSnpaYo=,2678400000000,2678700000000,7,2155563205756,300000000,5.285645,0.611115,6.195068,0.161934
405890,21299367638,1,0,119,0,262,14113087,3,8qRmTJas/6XEBaA0l4Wt1+/qSLgc6p7u7JzoMSuT/M8=,1DO3jDz/DuDOwl3QEV7CroiyNmhiMUgp1QKsAJVy3EE=,7J86abwGzd8YCATmMLG6StG4z18ZcHI1bWj0OSnpaYo=,2678400000000,2678700000000,7,232437467236,300000000,5.285645,0.469971,6.195068,0.161934
405891,21299367638,1,0,119,0,729,14113283,3,8qRmTJas/6XEBaA0l4Wt1+/qSLgc6p7u7JzoMSuT/M8=,1DO3jDz/DuDOwl3QEV7CroiyNmhiMUgp1QKsAJVy3EE=,7J86abwGzd8YCATmMLG6StG4z18ZcHI1bWj0OSnpaYo=,2678400000000,2678700000000,7,1323175184874,300000000,5.285645,0.415039,6.195068,0.161934
405892,21299367638,1,0,119,0,756,3194006620,3,8qRmTJas/6XEBaA0l4Wt1+/qSLgc6p7u7JzoMSuT/M8=,1DO3jDz/DuDOwl3QEV7CroiyNmhiMUgp1QKsAJVy3EE=,7J86abwGzd8YCATmMLG6StG4z18ZcHI1bWj0OSnpaYo=,2678400000000,2678700000000,7,2401741772239,300000000,5.285645,0.551605,6.195068,0.161934


# Finding Optimal Training Feature Set

In [None]:
numerical_submission_features = ['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time', 'cluster',
       'cpu_req', 'mem_req', 'cpu_used', 'mem_used']

categorical_submission_features = ['user', 'collection_name', 'collection_logical_name']
label_encoders = {col: LabelEncoder() for col in categorical_submission_features}

for col in categorical_submission_features:
    df_success[col] = label_encoders[col].fit_transform(df_success[col])

In [None]:
def find_optimal_feature_set(df_success,features, pred_feature, title, xlabel):
    X = df_success[features]
    y = df_success[pred_feature]
    

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    rmse_values = []
    combinations = []
    
    for r in range(6, 15):
        print(f'Iteration: {r}')
        for combo in itertools.combinations(features, r):
            print(f'Feature Combination: {list(combo)}')
            X_train_combo = X_train[list(combo)]
            X_test_combo = X_test[list(combo)]

            model = xgb.XGBRegressor(objective='reg:squarederror', eval_metric='rmse')
            model.fit(X_train_combo, y_train)

            y_pred = model.predict(X_test_combo)

            rmse = np.sqrt(mean_squared_error(y_test, y_pred))

            rmse_values.append(rmse)
            combinations.append(', '.join(combo))
        
    sorted_indices = np.argsort(rmse_values)[::-1]
    sorted_rmse_values = np.array(rmse_values)[sorted_indices]
    sorted_combinations = np.array(combinations)[sorted_indices]
    
    print(f"Best Feature Combination: {sorted_combinations[0]}")
    print(f"Least RMSE Value: {sorted_rmse_values[0]}")

    norm = plt.Normalize(vmin=min(sorted_rmse_values), vmax=max(sorted_rmse_values))
    colors = cm.viridis(norm(sorted_rmse_values)) 

    plt.figure(figsize=(10, 6))
    bars = plt.barh(sorted_combinations, sorted_rmse_values, color=colors)

    cbar = plt.colorbar(cm.ScalarMappable(norm=norm, cmap='viridis'))
    cbar.set_label('RMSE', fontsize=12)

    plt.xlabel(f'{xlabel}', fontsize=14)
    plt.ylabel('Feature Combination', fontsize=14)
    plt.title(f'{title}', fontsize=16)
    plt.tight_layout()
    plt.show()

    
    

In [None]:
find_optimal_feature_set(df_success,['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time', 'cluster',
       'cpu_req', 'mem_req',], 'cpu_used', 'RMSE Values for Each Categorical Input Feature Combination', 'RMSE (# of CPU Cores)')



In [None]:
find_optimal_feature_set(df_success,['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time', 'cluster',
       'cpu_req', 'mem_req'], 'mem_used', 'RMSE Values for Each Categorical Input Feature Combination', 'RMSE (Memory in bytes)')



# Execution Time Prediction

In [15]:
df_success['wallclock_req']

1          352321459944
2          365626782748
3         1846171586901
4         2571380789305
5         2190254331260
              ...      
405889    2155563205756
405890     232437467236
405891    1323175184874
405892    2401741772239
405893    1996383758711
Name: wallclock_req, Length: 313216, dtype: int64

In [27]:
df_success['wallclock_req'] = pd.to_numeric(df_success['wallclock_req'], errors='coerce')
df_success['run_time'] = pd.to_numeric(df_success['run_time'], errors='coerce')

squared_differences = (df_success['run_time'] - df_success['wallclock_req']) ** 2
squared_differences = squared_differences.dropna()
rmse = squared_differences.mean()

print(f"RMSE of user requested execution time: {rmse}")

mae = np.mean(np.abs(df['run_time'] - df['wallclock_req']))
print(f"MAE of user requested execution time: {mae}")

r2 = r2_score(df['run_time'], df['wallclock_req'])
print(f"R^2 of user requested execution time: {r2}")

RMSE of user requested execution time: -7.6827551046697e+16
MAE of user requested execution time: 69274515250665.42
R^2 of user requested execution time: 4.036209336676826


In [28]:
print(df_success[['wallclock_req', 'run_time']].describe())

       wallclock_req      run_time
count   3.132160e+05  3.132160e+05
mean    8.951414e+13  2.019083e+08
std     2.854480e+16  1.327993e+08
min     0.000000e+00  1.000000e+06
25%     4.803803e+11  1.200000e+07
50%     1.159233e+12  3.000000e+08
75%     1.765364e+12  3.000000e+08
max     9.223372e+18  3.000000e+08


# CPU Prediction

In [None]:
rmse = np.sqrt(((df_success['cpu_used'] - df_success['cpu_req']) ** 2).mean())

print(f"RMSE: {rmse}")


In [None]:
rmse, mae = train_xgboost(df_success, 'Google Cluster Dataset', 'cpu_used', ['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time', 'cluster',
       'cpu_req', 'mem_req'], 'cpu_req',lag_value=1)


print(f"RMSE: {rmse}, MAE: {mae}")

In [None]:
sub_dfs = cluster_create_sub_dataframes(df_success, ['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time',
       'cpu_req', 'mem_req'] , ['cpu_used'],7)

rmse_list = []
mae_list = []

for i, sub_df in enumerate(sub_dfs):
    rmse, mae = train_xgboost(sub_df, 'Google Cluster Dataset', 'cpu_used', ['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time', 
       'cpu_req', 'mem_req'], 'cpu_req',lag_value=1)
    rmse_list.append(rmse)
    mae_list.append(mae)

    
mae = np.mean(mae_list)
print(f"Average RMSE for Clustering: {mae:.4f}")




# Memory Prediction

In [None]:
rmse = np.sqrt(((df_success['mem_used'] - df_success['mem_req']) ** 2).mean())

print(f"RMSE: {rmse}")


In [None]:
train_xgboost(df_success, 'Google Cluster Dataset', 'mem_used', ['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time', 'cluster',
       'cpu_req', 'mem_req'], 'mem_req',lag_value=1)



In [None]:
sub_dfs = cluster_create_sub_dataframes(df_success, ['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time',
       'cpu_req', 'mem_req'] , ['mem_used'],7)

rmse_list = []
mae_list = []

for i, sub_df in enumerate(sub_dfs):
    rmse, mae = train_xgboost(sub_df, 'Google Cluster Dataset', 'mem_used', ['collection_id',
       'scheduling_class', 'collection_type', 'priority',
       'alloc_collection_id', 'instance_index', 'machine_id',
       'collections_events_type',
        'start_time', 'end_time',
       'cpu_req', 'mem_req'], 'mem_req',lag_value=1)
    rmse_list.append(rmse)
    mae_list.append(mae)

    
mae = np.mean(mae_list)
print(f"Average RMSE for Clustering: {mae:.4f}")




In [None]:
sampled_df = df_success.iloc[::len(df_success) // 100]

# Plot for CPU
plt.figure(figsize=(10, 5))
plt.plot(sampled_df.index, sampled_df['cpu_req'], linestyle='--', color='purple', label='CPU Requested')
plt.plot(sampled_df.index, sampled_df['cpu_used'], linestyle='-', color='orange', label='CPU Used')
plt.xlabel('Timestamp')
plt.ylabel('CPU')
plt.title('Requested vs. Used CPU')
plt.legend()
plt.show()


In [None]:
# Plot for Memory
plt.figure(figsize=(10, 5))
plt.plot(df_success.index, df_success['assigned_memory'], linestyle='--', color='purple', label='Memory Allocated')
plt.plot(df_success.index, df_success['mem_used'], linestyle='-', color='orange', label='Memory Used')
plt.xlabel('Job index')
plt.ylabel('Memory (NCUs)')
plt.title('Allocated vs. Used Memory in BORG Google Cluster (May, 2019)')
plt.legend()
plt.show()