**Table of contents**<a id='toc0_'></a>    
- [Importing Libraries and Constants](#toc1_)    
  - [Libraries](#toc1_1_)    
  - [Constants](#toc1_2_)    
- [Extracting Essential Tables For The Simulator](#toc2_)    
  - [Read Dataset Files](#toc2_1_)    
  - [Creating Training/Test set of VMs](#toc2_2_)    
  - [Creating an Initial Probability Distribution for Each VM Configuration (Used by Audible Algorithm)](#toc2_3_)    
  - [Creating a Conservative Initial VM Usage Model Based on Configuration (Used by Gaussian Algorithm)](#toc2_4_)    
  - [Storing VMIDs with Their Sizes for Simulation](#toc2_5_)    
  - [Create vmid to trace dataframe](#toc2_6_)    
    - [Departure Rate](#toc2_6_1_)    
    - [Create a vmid to trace file](#toc2_6_2_)    
  - [Store tables of vmid to predicted average/variance for both Oracle and Estimation Approaches](#toc2_7_)    
  - [Look-up table for Standard Normal](#toc2_8_)    
  - [Reserved Values per VM for Resource Central](#toc2_9_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

# <a id='toc1_'></a>[Importing Libraries and Constants](#toc0_)

## <a id='toc1_1_'></a>[Libraries](#toc0_)

In [1]:
import pandas as pd
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from scipy.stats import norm
import ipywidgets as widgets

## <a id='toc1_2_'></a>[Constants](#toc0_)

In [2]:
# Constants
# BVM tuple: (baseline, num_cpus, ram, init_credits, credit_cap, price, name')
bvm_configs = [
    (3.37, 20,  80, 600, 4860, 0.992, 'Standard_B20ms'),        # B20MS
    (2.70, 16,  64, 480, 3888, 0.794, 'Standard_B16ms'),        # B16MS
    (2.02, 12,  48, 360, 2909, 0.595, 'Standard_B12ms'),        # B12MS
    (1.35,  8,  32, 240, 1944, 0.397, 'Standard_B8ms'),        # B8MS
    (0.90,  4,  16, 120, 1296, 0.198, 'Standard_B4ms'),        # B4MS
    (0.60,  2,   8,  60,  864, 0.0992, 'Standard_B2ms'),        # B2MS
    (0.40,  2,   4,  60,  576, 0.0496, 'Standard_B2s'),        # B2S
    (0.20,  1,   2,  30,  288, 0.0248, 'Standard_B1ms'),        # B1MS
    (0.10,  1,   1,  30,  144, 0.0124, 'Standard_B1s'),        # B1S
    (0.05,  1, 0.5,  30,   72, 0.0062, 'Standard_B1ls')        # B1LS
]

In [3]:
baseline_to_corecount = {c[0]:c[1] for c in bvm_configs}
baseline_to_corecount

{3.37: 20,
 2.7: 16,
 2.02: 12,
 1.35: 8,
 0.9: 4,
 0.6: 2,
 0.4: 2,
 0.2: 1,
 0.1: 1,
 0.05: 1}

# <a id='toc2_'></a>[Extracting Essential Tables For The Simulator](#toc0_)

## <a id='toc2_1_'></a>[Read Dataset Files](#toc0_)

In [4]:
trace_dataframes, real_avg_cpu_utils_per_df = {}, {}
for fn in ['2021_burstable']:#['2021_burstable', '2021_regular', '2019_burstable']:
    # download the large 
    df = pd.read_parquet('data/{}VMs_{}_Data.parquet'.format(fn.split('_')[1].title(), fn.split('_')[0]))
    # df.set_index('vmid', inplace = True)
    trace_dataframes[fn] = df
    real_avg_cpu_utils_per_df[fn] = df['trace'].to_dict()
    print(f'{fn} dataframe has {df.shape[0]} VM records')
    df.head(3)

2019_burstable dataframe has 721341 VM records


## <a id='toc2_2_'></a>[Creating Training/Test set of VMs](#toc0_)
We divide the virtual machines into training and test sets. We use the training set to build the initial model and the test set for conducting simulations.

In [5]:
training_vmids_per_df, test_vmids_per_df = {}, {}
for ds_name in trace_dataframes:
    vmids = trace_dataframes[ds_name].index.values
    # split the vmids to test and training
    training_vmids_per_df[ds_name], test_vmids_per_df[ds_name] = train_test_split(trace_dataframes[ds_name].index.values, test_size=int( 0.2*len(trace_dataframes[ds_name]) ),random_state=1)


## <a id='toc2_3_'></a>[Creating an Initial Probability Distribution for Each VM Configuration (Used by Audible Algorithm)](#toc0_)

In [7]:
for ds_name in trace_dataframes:
    temp_training_df = trace_dataframes[ds_name].loc[training_vmids_per_df[ds_name]]

    # for training, get the 95 percentile of the pdf for each vmcorecount
    temp_training_df['avg_plus_std_first_hour'] = temp_training_df.index.map(lambda vmid: np.mean(real_avg_cpu_utils_per_df[ds_name][vmid][:12]) + np.std(real_avg_cpu_utils_per_df[ds_name][vmid][:12])) #['real_avg_avg_cpu'] + temp_training_df['real_std_avg_cpu']**(0.5)
    # the cpu usage array for each vmi is in real_avg_cpu_utils_per_df['2021_regular'][vmid] and create pmf for each of them with accuracy of 1 decimal point from 0 to vmcorecount associated to vmid (inclusive)

    # vmid_to_vmcorecount is a dictionary that include all the corecounts associated to a vmid
    # take the top 95 percentile of the pmfs from the trianing vms
    size_str = 'baseline' if 'burstable' in ds_name else 'corecount'
    all_usage_per_corecount = {}
    for corecount in tqdm(np.sort(temp_training_df[size_str].unique())):
        temp = temp_training_df[(temp_training_df[size_str] == corecount)]['avg_plus_std_first_hour'].dropna()
        # temp = vmid_to_f_per_df[name][(vmid_to_f_per_df[name]['corecount'] == core) & (vmid_to_f_per_df[name]['dram'] == ram)]['avg_plus_std_first_hour']
        vmids = temp[temp >= temp.quantile(0.95)].index.values
        all_usages = np.concatenate([real_avg_cpu_utils_per_df[ds_name][vmid][:12] for vmid in vmids])
        all_usage_per_corecount[corecount] = all_usages

    probability_density_per_corecount = {}
    for corecount in np.sort(temp_training_df[size_str].unique()):
        coef = corecount if 'regular' in ds_name else baseline_to_corecount[corecount]
        y, x = np.histogram(all_usage_per_corecount[corecount], bins = np.array(range(int(coef)*100 + 2))/100, density = True)
        probability_density_per_corecount[corecount] = y/100
    # store the probability_density_training_vms
    np.save(f'data/probability_density_training_vms_per_{size_str}_{ds_name}.npy', probability_density_per_corecount, allow_pickle = True)

100%|██████████| 7/7 [00:00<00:00,  7.04it/s]


## <a id='toc2_4_'></a>[Creating a Conservative Initial VM Usage Model Based on Configuration (Used by Gaussian Algorithm)](#toc0_)
This model represents the 95th percentile of initial CPU usage averages and variances, grouped by each VM configuration within the training set VMs.

In [6]:
for ds_name in trace_dataframes:
    feature_names = ['baseline' if 'burstable' in ds_name else 'corecount']# in a larger data set, this value could be set to ['corecount', 'dram']
    try:
        feature_to_avgs_training_arrivals = np.load( 'data/' + '{}_to_avgs_training_arrivals_{}.npy'.format('_'.join(feature_names), ds_name), allow_pickle=True)
        feature_to_vars_training_arrivals = np.load( 'data/' + '{}_to_vars_training_arrivals_{}.npy'.format('_'.join(feature_names), ds_name), allow_pickle=True)
    except:
        temp_training_df = trace_dataframes[ds_name].loc[training_vmids_per_df[ds_name]]
        feature_to_vmids = (temp_training_df[temp_training_df.vmcreated != 0].groupby(feature_names).apply(lambda x: x.index.tolist())).to_dict()
        feature_to_avgs_training_arrivals, feature_to_vars_training_arrivals = {}, {}
        for feature_value in tqdm(feature_to_vmids):
            feature_to_avgs_training_arrivals[feature_value], feature_to_vars_training_arrivals[feature_value] = [], []
            for vmid in tqdm(feature_to_vmids[feature_value]):
                feature_to_avgs_training_arrivals[feature_value].append(np.mean(real_avg_cpu_utils_per_df[ds_name][vmid][:12]))
                if len(real_avg_cpu_utils_per_df[ds_name][vmid]) > 5:
                    feature_to_vars_training_arrivals[feature_value].append(np.var(real_avg_cpu_utils_per_df[ds_name][vmid][:12]))
        np.save( 'data/' + '{}_to_avgs_training_arrivals_{}.npy'.format('_'.join(feature_names), ds_name), feature_to_avgs_training_arrivals)
        np.save( 'data/' + '{}_to_vars_training_arrivals_{}.npy'.format('_'.join(feature_names), ds_name), feature_to_vars_training_arrivals)


In [11]:
# generate first model quantiles 95
def get_avg_var_per_core_ram(feature_names, ds_name):
    p = 'data/'
    feature_to_avgs = np.load(p + '{}_to_avgs_training_arrivals_{}.npy'.format('_'.join(feature_names), ds_name), allow_pickle = True).reshape(1, )[0]
    feature_to_vars = np.load(p + '{}_to_vars_training_arrivals_{}.npy'.format('_'.join(feature_names),ds_name), allow_pickle = True).reshape(1, )[0]
    avg_var_per_feature = {}
    for feature_value in feature_to_avgs:
            a = np.quantile(feature_to_avgs[feature_value], 0.95)
            b = np.quantile(feature_to_vars[feature_value], 0.95)
            avg_var_per_feature[feature_value] = (a, b)
    return avg_var_per_feature



In [12]:
avg_var_per_feature_per_df = {}
for ds_name in trace_dataframes:
    feature_names = ['baseline' if 'burstable' in ds_name else 'corecount']
    avg_var_per_feature_per_df[ds_name] = get_avg_var_per_core_ram(feature_names, ds_name)
    

## <a id='toc2_5_'></a>[Storing VMIDs with Their Sizes for Simulation](#toc0_)
Store VMIDs from the test set that arrive during the monitoring window to ensure their initial usage is captured. In addition, store the baseline core performance for these VMs, which is the baseline value for burstable VMs and the peak core count for regular VMs.

In [6]:
# List of test arrival vmids
test_arrival_vmids_per_df, vmid_to_baseline_per_df = {}, {}
for ds_name in trace_dataframes:
    size_str = 'baseline' if 'burstable' in ds_name else 'corecount'
    try:
        test_arrival_vmids_per_df[ds_name] = np.load('data/test_arrival_vmids_' + ds_name + '.npy')
        vmid_to_baseline_per_df[ds_name] = np.load( 'data/' + 'vmid_to_{}_{}.npy'.format(size_str, ds_name), allow_pickle=True).reshape(1, )[0]
    except:
        # in-progress VMs should not be used in simulation because their arrival usage in unknow and they have truncated data.
        test_arrival_vmids_per_df[ds_name] = np.intersect1d(test_vmids_per_df[ds_name], trace_dataframes[ds_name][trace_dataframes[ds_name].vmcreated != 0].index.values)
        np.save( 'data/test_arrival_vmids_' + ds_name + '.npy', test_arrival_vmids_per_df[ds_name])
        # vmid to corecount
        vmid_to_baseline_per_df[ds_name] = trace_dataframes[ds_name].loc[test_arrival_vmids_per_df[ds_name]][size_str].to_dict()
        np.save( 'data/' + 'vmid_to_{}_{}.npy'.format(size_str, ds_name), vmid_to_baseline_per_df[ds_name])
    


## <a id='toc2_6_'></a>[Create vmid to trace dataframe](#toc0_)
Our monitoring captures only a limited period of VM activities, which is likely to be shorter than the actual lifespan of some of the VMs. This situation leads to incomplete data. To address this, we analyzed VM departure rates to predict their lifespans beyond our observation and adjusted their operational periods in our simulations.

### <a id='toc2_6_1_'></a>[Departure Rate](#toc0_)

In [8]:
departure_rates_per_df = {}
for ds_name in trace_dataframes:
    temp = trace_dataframes[ds_name]
    departure_rates = []
    len_tamp1 = []
    for i in range(2, 8):
        # long-running Arrival VMS that at least i days of them could have been observed
        if i == 7:
            cond1 = (temp.vmcreated >= 0) # arrival VMs        
            cond2 = (i*24*3600 <= (temp.vmdeleted.max() + 300 - temp.vmcreated))
        else:
            cond1 = (temp.vmcreated > 0) # arrival VMs
            cond2 = (i*24*3600 <= (temp.vmdeleted.max() - temp.vmcreated)) # at least i days after creation exists in monitoring
        cond3 = ( (i-1)*24*3600 < (temp.vmdeleted - temp.vmcreated)) # at least (i-1) day have been alive
        temp1 = temp[cond1 & cond2 & cond3]
        # VMs that lived i days
        temp2 = temp1[(temp1.vmdeleted >= (temp1.vmcreated + (i-1)*24*3600) ) & (temp1.vmdeleted < (temp1.vmcreated + (i)*24*3600)) & ( temp1.vmdeleted != temp.vmdeleted.max()) ]
        len_tamp1.append(len(temp1))
        departure_rates.append(len(temp2)/len(temp1))
    departure_rates_per_df[ds_name] = departure_rates

### <a id='toc2_6_2_'></a>[Create a vmid to trace file](#toc0_)

In [19]:
def determine_lr_num_samples(seen_num_samples, departure_rates):
    """
    the seen_nu_samples are always greater or equal 288
    """
    np.random.seed(seen_num_samples)
    i = min( int(np.floor(seen_num_samples/288)), 6) - 1
    alive = True
    while alive:
        alive = np.random.choice([True, False], p = [(1- departure_rates[min(5,i)]), departure_rates[min(5,i)]])
        i += 1
    new_num_samples = min(i * 288, 365*288) # to ensure at most one year worth of samples get added
    return new_num_samples

def gen_trace(trace, num_samples, revisions = [72, 288]):
    """ 
    return a new trace which is real values of trace that has been concatnated with same array excluding the first interval in revisions to match num_samples  
    precondition
    len(trace)>revisions[0]
    """
    first_revision = revisions[0]
    # determine the new trace based on the given num_samples
    if len(trace) < num_samples:
        new_trace = np.zeros(num_samples)
        num_rep = int(np.ceil(num_samples/len(trace[first_revision:])))
        new_trace[:first_revision] = trace[:first_revision]
        new_trace[first_revision:] = np.tile(trace[first_revision:], num_rep)[:len(new_trace)-first_revision]
    else:
        new_trace = np.copy(trace)
    return new_trace

new_traces_per_df = {}
for ds_name in trace_dataframes:
    new_traces_per_df[ds_name] = {}
    vmids_unknown_end = trace_dataframes[ds_name][trace_dataframes[ds_name].vmdeleted == trace_dataframes[ds_name].vmdeleted.max()].index.values
    for vmid in tqdm(test_arrival_vmids_per_df[ds_name]):
        trace_len = len(real_avg_cpu_utils_per_df[ds_name][vmid])
        if trace_len >= 288 and vmid in vmids_unknown_end: 
            new_traces_per_df[ds_name][vmid] = gen_trace(real_avg_cpu_utils_per_df[ds_name][vmid], determine_lr_num_samples(trace_len, departure_rates_per_df[ds_name]))
        else:
            new_traces_per_df[ds_name][vmid] = real_avg_cpu_utils_per_df[ds_name][vmid]

    # save the datafile as feather
    temp = pd.DataFrame(list(new_traces_per_df[ds_name].items()), columns = ['vmid', 'trace'])
    temp.to_feather('data/' + 'test_arrival_vmid_to_trace_{}.feather'.format(ds_name))


100%|██████████| 503384/503384 [00:12<00:00, 39774.96it/s]


## <a id='toc2_7_'></a>[Store tables of vmid to predicted average/variance for both Oracle and Estimation Approaches](#toc0_)
Each VM's predicted average/variance are calculated by combining the initial conservative model with lifespan prediction. These predictions are generated both from an oracle perspective and by estimating according to the VM's early-stage CPU usage traces.

In [14]:
def gen_avgs_vars(trace, vmsize, avg_or_var = 'avg' ,prediction = 'oracle'):
    if prediction == 'oracle' and avg_or_var == 'avg':
        return np.concatenate([np.ones(min(72, len(trace)))*avg_var_per_feature_per_df[ds_name][vmsize][0], np.mean(trace[72:])*np.ones(len(trace[72:]))])
    if prediction == 'oracle' and avg_or_var == 'var':
        return np.concatenate([np.ones(min(72, len(trace)))*avg_var_per_feature_per_df[ds_name][vmsize][1], np.var(trace[72:])*np.ones(len(trace[72:]))])
    if prediction == 'est' and avg_or_var == 'avg':
        return np.concatenate([np.ones(min(72, len(trace)))*avg_var_per_feature_per_df[ds_name][vmsize][0], np.mean(trace[:72])*np.ones(len(trace[72:]))])
    if prediction == 'est' and avg_or_var == 'var':
        return np.concatenate([np.ones(min(72, len(trace)))*avg_var_per_feature_per_df[ds_name][vmsize][1], np.var(trace[:72])*np.ones(len(trace[72:]))])
    

In [15]:
for ds_name in trace_dataframes:
    # read vmid to their associated traces
    temp = pd.read_feather('data/' + 'test_arrival_vmid_to_trace_{}.feather'.format(ds_name))
    # read vmid to their base performance
    size_str = 'baseline' if 'burstable' in ds_name else 'corecount'
    # Generate and save a table that has vmid to the predicted average/variance of the VM for each point in their lifetime 
    for prediction_type in ['oracle', 'est']: # Oracle and estimation prediction
        temp['trace_pred_avgs'] = temp.apply(lambda row:  gen_avgs_vars(row['trace'], vmid_to_baseline_per_df[ds_name][row['vmid']], avg_or_var = 'avg' ,prediction = prediction_type), axis = 1)
        temp['trace_pred_vars'] = temp.apply(lambda row:  gen_avgs_vars(row['trace'], vmid_to_baseline_per_df[ds_name][row['vmid']], avg_or_var = 'var' ,prediction = prediction_type), axis = 1)
        temp[['vmid', 'trace_pred_avgs', 'trace_pred_vars']].to_feather('data/' + '0.95_first_model_{}_rest_{}.feather'.format(prediction_type, ds_name))

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  arrmean = um.true_divide(arrmean, div, out=arrmean, casting='unsafe',
  ret = ret.dtype.type(ret / rcount)


## <a id='toc2_8_'></a>[Look-up table for Standard Normal](#toc0_)
For faster CDF computation within the Gaussian algorithm, we create data ahead of time offline

In [None]:
precision = 6
lookup_table = {}
for desired_cdf in tqdm(np.arange(0 + 1/(10**precision), 1 , 1/(10**precision))):
    corresponding_value = norm.ppf(desired_cdf)
    lookup_table[round(desired_cdf, precision)] = corresponding_value
np.save('data/' + 'standard_lookup_table_precision_{}.npy'.format(precision), lookup_table, allow_pickle = True)

  0%|          | 1176/999999 [00:00<02:46, 6012.20it/s]

100%|██████████| 999999/999999 [02:17<00:00, 7271.71it/s]


## <a id='toc2_9_'></a>[Reserved Values per VM for Resource Central](#toc0_)
- Resource Central uses bucket-based CPU usage forecasts.
- More buckets are created for burstable VMs, which usually stay below baseline but sometimes exceed it.
- Oracle predictions identify the bucket representing the 95th percentile CPU usage for each VM.

In [None]:
def get_buckets_per_baseline(PEAK_CPU_PER_BASELINE):
    # The buckets for predicting in the resource central method
    BUCKET_RC_PER_BASELINE = {}
    for baseline in PEAK_CPU_PER_BASELINE:
        values1 = np.linspace(0, baseline, num = 5, endpoint=False)
        values2 = np.linspace(baseline, PEAK_CPU_PER_BASELINE[baseline], num = 4, endpoint=True)
        BUCKET_RC_PER_BASELINE[baseline] = np.concatenate((values1, values2), axis=0)[1:]
    return BUCKET_RC_PER_BASELINE

def get_allocated_cores(BUCKET_RC_PER_BASELINE, vmid_to_baseline, vmid_to_trace, prediction_quantile):
    # for all the traces, calculate the bucket value that bound their usage
    bucketized_predictions_per_vmid = {}
    for vmid in vmid_to_trace:
        baseline, trace = vmid_to_baseline[vmid], vmid_to_trace[vmid]
        idx = np.argmin(BUCKET_RC_PER_BASELINE[baseline] <= np.quantile(trace, prediction_quantile))
        bucketized_predictions_per_vmid[vmid] = BUCKET_RC_PER_BASELINE[baseline][idx]
    return bucketized_predictions_per_vmid

In [None]:
prediction_quantile = 0.95 # In resource central, we picked a bucket that bounds the 95 percentile of the CPU usage
for ds_name in trace_dataframes:
    size_str = 'baseline' if 'burstable' in ds_name else 'corecount'
    vmid_to_trace = pd.read_feather('data/' + 'test_arrival_vmid_to_trace_{}.feather'.format(ds_name)).set_index('vmid')['trace'].to_dict()
    try:
        vmid_to_baseline = vmid_to_baseline_per_df[ds_name]
    except:
        vmid_to_baseline = np.load( 'data/' + 'vmid_to_{}_{}.npy'.format(size_str, ds_name), allow_pickle=True).reshape(1, )[0]
    if 'burstable' in ds_name:
        PEAK_CPU_PER_BASELINE = {3.37: 20, 2.7: 16, 2.02: 12, 1.35: 8, 0.9: 4, 0.6: 2, 0.4: 2, 0.2: 1, 0.1: 1, 0.05: 1}
    else:
        unique_baselines = np.array(list(set(vmid_to_baseline.values())))
        PEAK_CPU_PER_BASELINE = dict(zip(unique_baselines, unique_baselines))# we extend resource central to improve accuracy since there is more chance the VM's 95 percentile fall in the first 20% of Peak CPU


    BUCKET_RC_PER_BASELINE = get_buckets_per_baseline(PEAK_CPU_PER_BASELINE)
    allocated_cores_per_vmid = get_allocated_cores(BUCKET_RC_PER_BASELINE, vmid_to_baseline, vmid_to_trace, prediction_quantile)
    
    temp = pd.DataFrame(data = list(allocated_cores_per_vmid.values()), index = list(allocated_cores_per_vmid.keys()), columns = [f'first_rc_{prediction_quantile}'])
    temp.index.name = 'vmid'
    temp.reset_index().to_feather('data/' + f"first_models_quantile_{prediction_quantile}_{ds_name}.feather")
        
