In [1]:
import pandas as pd
import numpy as np
from tqdm import tqdm

from sklearn.decomposition import FactorAnalysis, PCA
from sklearn.cluster import KMeans
from sklearn.preprocessing import OrdinalEncoder, MinMaxScaler, KBinsDiscretizer
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF

from sklearn.metrics import pairwise_distances_argmin_min, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.base import clone

In [2]:
def split_train_val(offline_path, save_path='./data/split_data/'):
    '''
    Splits original offline workload into train and val sets
    Random configs of each workload are split into train (offline) and val (online)
    Random configs of val are further split into val and test (just the knobs).
    NOTE: Messy, will clean up later
    '''
    offline = pd.read_csv(offline_path)
    X, y = offline, offline['latency']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=645)
    X_train.sort_values('workload id').to_csv(save_path + 'offline_workload.csv', index=False)
    XT_train, XT_test, yT_train, yT_test = train_test_split(X_test, y_test, test_size=0.30, random_state=645)
    XT_train.sort_values('workload id').to_csv(save_path + 'online_workload.csv', index=False)
    XT_test.iloc[:, :13].sort_values('workload id').to_csv(save_path + 'test_knobs.csv', index=False)
    XT_test.iloc[:, [0,13]].sort_values('workload id').iloc[:, 1].to_csv(save_path + 'true_latency.csv', index=False, header=False)
    
    # Of the 58 workloads, how many are represented in test workloads
    # Making sure we test at least one config in each workload
    print(XT_train['workload id'].nunique(), XT_test['workload id'].nunique())
    
# Only run once, will directly load after
# split_train_val('./data/orig/offline_workload.csv')

In [3]:
class DataRepo:
    '''
    Data repository. Contains methods to prune metrics and preprocess knobs.
    '''
    def __init__(self, offline_path, n_factors=2, n_clusters=5, int_enc=OrdinalEncoder(), cont_enc=MinMaxScaler()):
        self.OFFLINE_WL_PATH = offline_path
        self.METRICS_START_IDX = 14
        self.LATENCY_IDX = 13
        self.INT_KNOBS_IDXS = [9, 10, 11, 12]
        self.CONT_KNOBS_IDXS = [1, 2, 3, 4, 5, 6, 8]
        self.BOOL_KNOS_IDX = 7
        
        self.N_FACTORS = n_factors # no. of factors for Factor Analysis
        self.N_CLUSTERS = n_clusters # k in KMeans
        
        self.pruned_metrics_idxs = None
        self.pruned_metrics_names = None
        self.int_encoder = int_enc # Encoder for integer knobs
        self.cont_encoder = cont_enc # Encoder for continuous knobs
    
    def _build(self):
        '''
        Run only once at the beginning of DataRepo creation.
        Prunes metrics and preprocesses knobs in offline workloads.
        Final processed data is not saved, rather returned to 
        OtterTune to create Workload objects.
        '''
        pruned_data = self.__prune_offline_metrics(self.OFFLINE_WL_PATH)
        processed_data = self.__preprocess_workload_knobs(pruned_data)
        return processed_data
    
    def process_online_workload(self, raw_workload):
        '''
        Prune metrics and preprocess knobs of online workloads.
        '''
        pruned_data = self.__prune_online_metrics(raw_workload)
        return self.__preprocess_workload_knobs(pruned_data, online=True)
    
    def process_test_knobs(self, test_knobs):
        '''
        Preprocess test knobs.
        '''
        return self.__preprocess_workload_knobs(test_knobs, online=True, only_knobs=True)
          
    def __prune_offline_metrics(self, file_path=None):
        '''
        Prune offline workloads metrics using FA + KMeans.
        NOTE: Modularize to use any technique.
        '''
        data = pd.read_csv(file_path)
        metrics = data.to_numpy()[:, self.METRICS_START_IDX:].T

        fa = FactorAnalysis(n_components=self.N_FACTORS)
        metric_factors = fa.fit_transform(metrics)
        km = KMeans(n_clusters=self.N_CLUSTERS).fit(metric_factors)
        closest_idxs, _ = pairwise_distances_argmin_min(km.cluster_centers_, metric_factors)
        self.pruned_metrics_idxs = closest_idxs
        closest_idxs_raw = [self.METRICS_START_IDX + idx for idx in closest_idxs]
        self.pruned_metrics_names = data.columns[closest_idxs_raw].tolist()
        
        pruned_metrics = metrics[self.pruned_metrics_idxs].T
        n_cols = data.shape[1]
        metric_cols = np.linspace(self.METRICS_START_IDX, n_cols - 1, n_cols - self.METRICS_START_IDX, dtype=int)
        data.drop(data.columns[metric_cols], axis=1, inplace=True)
        pruned_data = pd.concat([data, pd.DataFrame(pruned_metrics)], axis=1)
        return pruned_data
        
    def __prune_online_metrics(self, raw_workload):
        '''
        Prune online workloads metrics using identified
        non-redundant metrics from offline workloads.
        '''
        data = raw_workload.reset_index(drop=True)
        metrics = data.to_numpy()[:, self.METRICS_START_IDX:].T
        pruned_metrics = metrics[self.pruned_metrics_idxs].T
        
        n_cols = data.shape[1]
        metric_cols = np.linspace(self.METRICS_START_IDX, n_cols - 1, n_cols - self.METRICS_START_IDX, dtype=int)
        data.drop(data.columns[metric_cols], axis=1, inplace=True)
        pruned_data = pd.concat([data, pd.DataFrame(pruned_metrics)], axis=1)
        return pruned_data
    
    def __preprocess_workload_knobs(self, pruned_data, online=False, only_knobs=False):
        '''
        Preprocess knobs.
        If online is True, transform using fitted encoders (online knobs)
        Otherwise, fit and then transform (offline knobs)
        For test knobs, only_knobs is True.
        '''
        col_names = pruned_data.columns.tolist()
        pruned_n = pruned_data.to_numpy()
        int_knobs = self.INT_KNOBS_IDXS
        cont_knobs = self.CONT_KNOBS_IDXS
        bool_knob = self.BOOL_KNOS_IDX
        
        if only_knobs:
            int_knobs = [idx - 1 for idx in int_knobs]
            cont_knobs = [idx - 1 for idx in cont_knobs]
            bool_knob = bool_knob - 1
            online = True
        
        
        if not online:
            pruned_n[:, int_knobs] = self.int_encoder.fit_transform(pruned_n[:, int_knobs])
            pruned_n[:, cont_knobs] = self.cont_encoder.fit_transform(pruned_n[:, cont_knobs])
        else:
            pruned_n[:, int_knobs] = self.int_encoder.transform(pruned_n[:, int_knobs])
            pruned_n[:, cont_knobs] = self.cont_encoder.transform(pruned_n[:, cont_knobs])
        
        pruned_n[:, bool_knob] = pruned_n[:, bool_knob].astype(int)
        return pd.DataFrame(pruned_n, columns=col_names)
    

In [4]:
class OtterTune:
    '''
    Main OtterTune system. Contains methods to perform workload mapping and predicting latency.
    '''
    def __init__(self, repo, metric_model=GaussianProcessRegressor(kernel=RBF(length_scale=1.0))):
        self.repo = repo
        self.metric_model = metric_model # Used to model each metric in each workload
        self.N_METRICS = None
        
        self.workloads = []        
        self.__build_workloads()
        
        
    def __build_workloads(self):
        '''
        Run only once at OtterTune object creation.
        Creates Workload objects and build metric models on each.
        '''
        print('Building data repository...')
        data = self.repo._build()        
        latency_idx = self.repo.LATENCY_IDX
        wl_ids = data['workload id'].unique()
        
        for wl_id in tqdm(wl_ids, desc='Building Offline Workloads'):
            wl_data = data[data['workload id'] == wl_id].to_numpy()
            knobs = wl_data[:, 1:latency_idx]
            metrics = wl_data[:, latency_idx:]
            if not self.N_METRICS:
                self.N_METRICS = metrics.shape[1]
            workload = Workload(wl_id, knobs, metrics, self.metric_model)
            workload.build_metric_models()
            self.workloads.append(workload)    
    
    def predict(self, raw_workload, test_knobs):
        '''
        Predicts latency for test knobs given online workload.
        Uses helper functions for workload mapping and to
        augment online workload with matched offline workload.
        '''
        processed_wl = self.repo.process_online_workload(raw_workload)
        processed_wl_metrics = processed_wl.iloc[:, 13:]
        processed_wl_knobs = processed_wl.iloc[:, 1:13]
        processed_test_knobs = self.repo.process_test_knobs(test_knobs)

        best_wl_idx = self.__get_best_workload(processed_wl_knobs, processed_wl_metrics)
        aug_wl = self.__get_augmented_workload(best_wl_idx, processed_wl)
        
        gpr = GaussianProcessRegressor(kernel=RBF(length_scale=1.0))
        gpr.fit(aug_wl[:, :-1], aug_wl[:, -1])
        preds = gpr.predict(processed_test_knobs)
        return preds, self.workloads[best_wl_idx].wl_id
    
    def __get_augmented_workload(self, best_wl_idx, processed_wl):
        '''
        Given matched workload, augment current online workload data.
        '''
        w = self.workloads[best_wl_idx]
        w_knobs, w_latency = w.knobs, w.metrics[:, 0].reshape(-1, 1)
        offline = np.concatenate((w_knobs, w_latency), 1)
        
        online = processed_wl.iloc[:, 1:14].to_numpy()
        aug_wl = np.concatenate((offline, online), 0)
        return aug_wl
        
    def __get_best_workload(self, wl_knobs, wl_metrics):
        '''
        Performs workload mapping given online workload (knobs, metrics).
        '''
        n_wls, n_configs = len(self.workloads), len(wl_knobs)
        S = self.__build_distance_matrix(wl_knobs)
        
        binned_S, transf = self.__bin_metrics(S)
        online_metrics = self.__bin_online_metrics(wl_metrics, transf)
        
        best_wl_idx = np.argmin(np.mean(np.sqrt(np.sum((binned_S - online_metrics)**2, axis=2)), axis=0))
        return best_wl_idx
    
    def __build_distance_matrix(self, train_knobs):
        '''
        Build distance matrix S (paper section 6.1).
        Helps efficiently calculate closest offline workload.
        '''
        n_wls, n_configs = len(self.workloads), len(train_knobs)
        S = np.zeros((self.N_METRICS, n_wls, n_configs))
        for metric_idx in range(self.N_METRICS):
            for wl_idx, w in enumerate(self.workloads):
                row = w.predict_metric(metric_idx, train_knobs)
                S[metric_idx, wl_idx, :] = row
        return S

    def __bin_metrics(self, S):
        '''
        Normalizes metrics with bin number using deciles.
        Needed to perform accurate distance comparisons.
        '''
        n_metrics, n_wls, n_configs = S.shape
        sr = S.reshape(n_wls*n_configs, n_metrics)
        transf = KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='uniform')
        sr = transf.fit_transform(sr)
        S = sr.reshape(n_metrics, n_wls, n_configs)
        return S, transf
        
    def __bin_online_metrics(self, wl_metrics, transf):
        '''
        Normalizes online metrics with bin number using deciles.
        Uses previsouly used encoder (transf).
        '''
        online_metrics = transf.transform(wl_metrics).T
        online_metrics = np.repeat(online_metrics[:, np.newaxis, :], len(self.workloads), axis=1)
        return online_metrics

In [5]:
class Workload:
    '''
    Models each workload. Contains methods to train GPR models on each metric.
    Predicts latency (metric index 0).
    '''
    def __init__(self, wl_id, knobs, metrics, metric_model):
        self.wl_id = wl_id
        self.knobs = knobs
        self.metrics = metrics
        self.metric_model = metric_model
        self.models = {}
        self.N_METRICS = metrics.shape[1]
        
    def build_metric_models(self):
        '''
        Train GPR models on each metric.
        '''
        for metric_idx in range(self.N_METRICS):
            model = clone(self.metric_model)
            model.fit(self.knobs, self.metrics[:, metric_idx])
            self.models[metric_idx] = model
        
    def predict_metric(self, metric_idx, knobs):
        '''
        Predict a metric using existing model.
        '''
        return self.models[metric_idx].predict(knobs)
    

In [6]:
class Tester:
    '''
    Driver class to run val/test workloads and report performance.
    Each workload required 3 files.
    online_path - Online workloads file
    test_path - Test Knobs file
    true_path - True Latency for test knobs file
    '''
    def __init__(self, ottertune, online_path, test_path, true_path):
        self.ONLINE_PATH = online_path
        self.TEST_PATH = test_path
        self.TRUE_PATH = true_path
        self.o = ottertune
        
        self.online_workloads = {}
        self.test_knobs = {}
        self.true_preds = None
        self.wl_ids = None
        self.__load_data()
        
    def __load_data(self):
        '''
        Run only once at creation of Tester.
        Loads all 3 required files.
        '''
        online = pd.read_csv(self.ONLINE_PATH)
        knobs = pd.read_csv(self.TEST_PATH)
        self.true_preds = pd.read_csv(self.TRUE_PATH, header=None).to_numpy().reshape(-1)
        wl_ids = online['workload id'].unique().tolist()
        self.wl_ids = wl_ids
        for wl_id in tqdm(wl_ids, desc='Loading Online Workloads'):
            w = online[online['workload id'] == wl_id]
            k = knobs[knobs['workload id'] == wl_id].iloc[:, 1:]
            self.online_workloads[wl_id] = w
            self.test_knobs[wl_id] = k
                
    def run(self, out_file):
        '''
        Runs each workload to predict latency for each test knob.
        Saves result file with true/pred workload id and latency.
        Prints MSE across all workloads.
        '''
        preds_arr = []
        pi = 0
        for wl_id in tqdm(self.wl_ids, desc='Running Target Workloads'):
            online_wl = self.online_workloads[wl_id]
            test_knobs = self.test_knobs[wl_id]
            preds, best_wl_id = self.o.predict(online_wl, test_knobs)
            for p in preds: 
                preds_arr.append([wl_id, best_wl_id, self.true_preds[pi], p])
                pi += 1
        
        df = pd.DataFrame(preds_arr, columns=['true_wl_id', 'pred_wl_id', 'true_latency', 'latency_pred'])
        df.to_csv(out_file)
        print('MSE:', mean_squared_error(self.true_preds, df.iloc[:, -1].to_numpy()))
        

## Testing a sample workloads (./data/sample/)
Offline Workloads - first 3 workloads from original offline workloads  
Online Workloads - 5 random configs of only 1 workload (the first one)  
Test Knobs - 2 random configs to predict latency on  
True Latency - True latency of test knobs  

In [7]:
dataset = 'sample' # one of ['sample', 'split_data']
path = './data/' + dataset + '/'
out_file = './data/out/' + dataset + '_results.csv' # To write out test results
offline_path = path + 'offline_workload.csv' # offline workload
online_path = path + 'online_workload.csv' # online workload
test_path = path + 'test_knobs.csv' # test knobs (like test.csv)
true_path = path + 'true_latency.csv' # true latency (to measure performance)

### Create DataRepo, OtterTune, Tester

In [8]:
repo = DataRepo(offline_path)
o = OtterTune(repo)
t = Tester(o, online_path, test_path, true_path)

Building data repository...


Building Offline Workloads: 100%|██████████| 3/3 [00:00<00:00,  6.73it/s]
Loading Online Workloads: 100%|██████████| 1/1 [00:00<00:00, 310.09it/s]


#### Predict test knobs and report MSE
Very small MSE as test knobs are also present in offline set.  
Easy workload mapping for the same reason.

In [9]:
t.run(out_file)

Running Target Workloads: 100%|██████████| 1/1 [00:00<00:00, 14.49it/s]

MSE: 1.8649540383401883e-19





## Testing original workloads (./data/split_data/) 

In [10]:
dataset = 'split_data' # one of ['sample', 'split_data']
path = './data/' + dataset + '/'
out_file = './data/out/' + dataset + '_results.csv' # To write out test results
offline_path = path + 'offline_workload.csv' # offline workload
online_path = path + 'online_workload.csv' # online workload
test_path = path + 'test_knobs.csv' # test knobs (like test.csv)
true_path = path + 'true_latency.csv' # true latency (to measure performance)

### Create DataRepo, OtterTune, Tester

In [11]:
repo = DataRepo(offline_path)
o = OtterTune(repo)
t = Tester(o, online_path, test_path, true_path)

Building data repository...


Building Offline Workloads: 100%|██████████| 58/58 [00:06<00:00,  9.25it/s]
Loading Online Workloads: 100%|██████████| 58/58 [00:00<00:00, 401.03it/s]


#### Predict test knobs and report MSE
Huge MSE.  
"results.csv" (./data/out/) from the run shows lots of incorrect workload mapping and value "0" predictions for latency. Could be due to a combination of hyperparams within pruning metrics, choice of normalization of various knobs, GPR kernels etc.  
NOTE: Will push update after making sure its only because of hyperparams and not a programming issue.

In [12]:
t.run(out_file)

  "replaced with 0." % jj)
  "replaced with 0." % jj)
  "replaced with 0." % jj)
Running Target Workloads: 100%|██████████| 58/58 [00:19<00:00,  3.02it/s]

MSE: 14808.391568475066



