In [1]:
from spark.spark_tuner import SparkTuner
from spark.config.config_set import UniversalConfigSet
from spark.config.config_set import ConfigSet
from spark.config.parameter import Parameter
from spark.config.domain import IntRangeDomain

In [2]:
config_set = UniversalConfigSet(10, 1024 * 10)
tuner = SparkTuner(config_set)

training_sample_1 = {
    "spark.executor.memory": 1024 * 5,
    "spark.sql.shuffle.partitions": 100,
    "spark.executor.cores": 4,
    "spark.driver.memory": 1024
}
training_sample_2 = {
    "spark.executor.memory": 1024 * 10,
    "spark.sql.shuffle.partitions": 400,
    "spark.executor.cores": 8,
    "spark.driver.memory": 1024 * 3
}

tuner.add_sample_to_train_data(training_sample_1, 12)
tuner.add_sample_to_train_data(training_sample_2, 4)

tuner.get_next_best_config()


OrderedDict([('spark.sql.shuffle.partitions', 1610.0),
             ('spark.executor.memory', 9113.0),
             ('spark.driver.memory', 1024.0),
             ('spark.executor.cores', 7.0)])

### Example with TPC-DS Q17


|conf|value|
|:-|:-|
|spark.driver.memory|2g|
|spark.executor.cores|3|
|spark.executor.memory|2g|
|spark.sql.shuffle.partitions|400|


<h5 align="center">Timing for the query: q17 - 806750 </h5>

================================================================================================================================================
<br>


|conf|value|
|:-|:-|
|spark.driver.memory|4g|
|spark.executor.cores|8|
|spark.executor.memory|5g|
|spark.sql.shuffle.partitions|600|


<h4 align="center">Timing for the query: q17 - 1191319 </h4> 

================================================================================================================================================
<br>


|conf|value|
|:-|:-|
|spark.driver.memory|1g|
|spark.executor.cores|2|
|spark.executor.memory|2g|
|spark.sql.shuffle.partitions|100|


<h4 align="center">Timing for the query: q17 - 1138390 </h4> 


In [4]:
# config_set = UniversalConfigSet(20, 152500)
config_set = ConfigSet()
config_set.add_param(Parameter('spark.sql.shuffle.partitions', IntRangeDomain(10, 2000, 50)))\
    .add_param(Parameter('spark.executor.memory',
                         IntRangeDomain(1000,  # min executor memory
                                        28672,  # max executor memory
                                        512)))\
    .add_param(Parameter('spark.driver.memory',
                         IntRangeDomain(512, 15200, 256))) \
    .add_param(Parameter('spark.executor.cores',
                         IntRangeDomain(1, 4, 1)))

tuner = SparkTuner(config_set)
training_sample_1 = {
    "spark.executor.memory": 2000,
    "spark.sql.shuffle.partitions": 400,
    "spark.executor.cores": 3,
    "spark.driver.memory": 2000
}
# training_sample_2 = {
#     "spark.executor.memory": 4000,
#     "spark.sql.shuffle.partitions": 600,
#     "spark.executor.cores": 8,
#     "spark.driver.memory": 4000
# }
training_sample_2 = {
    "spark.executor.memory": 4000,
    "spark.sql.shuffle.partitions": 600,
    "spark.executor.cores": 3,
    "spark.driver.memory": 4000
}
training_sample_3 = {
    "spark.executor.memory": 1000,
    "spark.sql.shuffle.partitions": 100,
    "spark.executor.cores": 2,
    "spark.driver.memory": 1000
}
tuner.add_sample_to_train_data(training_sample_1, 806750)
tuner.add_sample_to_train_data(training_sample_2, 1191319)
tuner.add_sample_to_train_data(training_sample_3, 1138390)

In [5]:
tuner.get_next_best_config()

OrderedDict([('spark.sql.shuffle.partitions', 1910.0),
             ('spark.executor.memory', 10728.0),
             ('spark.driver.memory', 10752.0),
             ('spark.executor.cores', 4.0)])

In [6]:
# Output with the predicted config - 
1066135

1066135

In [None]:
from hyperopt import fmin, tpe, hp, Trials
import numpy as np
import math

def minimize_training_loss(params):
    try:
        # ToDo: Tune each dimension of beta, gamma and theta individually
        loss = 0.0
#         alpha = params['alpha']
#         beta = np.ones((1, tuner.model.config_set.get_size()), float).transpose() * params['beta']
#         gamma = np.ones(tuner.model.config_set.get_size(), float) * params['gamma']
#         theta = np.ones(tuner.model.config_set.get_size(), float) * params['theta'] * 0.01
        alpha = params['alpha']
        beta_choice = params['beta']
        beta = np.array([[beta_choice['beta1'], beta_choice['beta2'], beta_choice['beta3'], beta_choice['beta4']]])
        gamma_choice = params['gamma']
        gamma = np.array([[gamma_choice['gamma1'], gamma_choice['gamma2'], gamma_choice['gamma3'], gamma_choice['gamma4']]]).transpose()
        theta_choice = params['theta']
        theta = np.array([[theta_choice['theta1'], theta_choice['theta2'], theta_choice['theta3'], theta_choice['theta4']]]).transpose()
#         gamma = np.ones(tuner.model.config_set.get_size(), float) * params['gamma']
#         theta = np.ones(tuner.model.config_set.get_size(), float) * params['theta'] * 0.01

        for config_value, actual_out in zip(tuner.model.get_sampled_configs(), tuner.model.training_out):
#             out = tuner.model.predict(config_value, alpha, beta, gamma, theta)
            mean = tuner.model.get_mean(config_value, beta, gamma, theta)
            variance = tuner.model.get_variance(config_value, alpha, gamma, theta)
            if variance < 0:
                return {'loss': 0, 'status': 'fail'}
#             out = tuner.model.get_variance(config_value, alpha, gamma, theta)
#             loss = loss + abs(out - actual_out)
            loss = loss + abs(tuner.model.best_out - mean)

        return {'loss': loss, 'status': 'ok'}
    except Exception as e:
#         print(e)
        return {'loss': 0, 'status': 'fail'}

# space={'alpha': hp.uniform('alpha', 0, 1000),
#        'beta': hp.uniform('beta', pow(10, 2), pow(10, 7)),
#        'gamma': hp.uniform('gamma', 0, 10),
#        'theta': hp.uniform('theta', 0, 10),
#       }

# space={'alpha': hp.uniform('alpha', 0, 10),
#        'beta': hp.uniform('beta', 1, 10),
#        'gamma': hp.uniform('gamma', 1, 10),
#        'theta': hp.uniform('theta', 1, 10),
#       }

space={'alpha': hp.uniform('alpha', 0, 10),
       'beta': hp.choice('beta', [{
           'beta1': hp.uniform('beta1', 1, 10),
           'beta2': hp.uniform('beta2', 1, 10),
           'beta3': hp.uniform('beta3', 1, 10),
           'beta4': hp.uniform('beta4', 1, 10)
       }]),
       'gamma': hp.choice('gamma', [{
           'gamma1': hp.uniform('gamma1', 1, 10),
           'gamma2': hp.uniform('gamma2', 1, 10),
           'gamma3': hp.uniform('gamma3', 1, 10),
           'gamma4': hp.uniform('gamma4', 1, 10)
       }]),
       'theta': hp.choice('theta', [{
           'theta1': hp.uniform('theta1', 1, 10),
           'theta2': hp.uniform('theta2', 1, 10),
           'theta3': hp.uniform('theta3', 1, 10),
           'theta4': hp.uniform('theta4', 1, 10)
       }]),
      }

number_of_experiments = 100
best = fmin(minimize_training_loss,
            space=space,
            algo=tpe.suggest,
            max_evals=number_of_experiments)

 47%|████▋     | 467/1000 [00:12<00:18, 28.88it/s, best loss: ?]

In [47]:
print best

{'alpha': 6.597183214118218, 'beta': 4.146450677849254, 'gamma': 9.995838685984289, 'theta': 1.0020003126618147}


In [45]:
# OrderedDict([('spark.sql.shuffle.partitions', 1910.0),
#              ('spark.executor.memory', 10728.0),
#              ('spark.driver.memory', 10752.0),
#              ('spark.executor.cores', 4.0)])

# Actual Output with the predicted config - 
# 1066135

from spark.config.config import Config
from spark.config.parameter import Parameter

tuner.model.alpha = best['alpha']
tuner.model.beta = np.ones((1, config_set.get_size()), float).transpose() * best['beta']
tuner.model.gamma = np.ones(tuner.model.config_set.get_size(), float) * best['gamma']
tuner.model.theta = np.ones(tuner.model.config_set.get_size(), float) * best['theta'] * 0.01
print tuner.get_next_best_config()
# tuner.model.alpha = 1.0
# tuner.model.beta = np.ones((1, config_set.get_size()), float).transpose()
# tuner.model.gamma = np.ones(tuner.model.config_set.get_size(), float)
# tuner.model.theta = np.ones(tuner.model.config_set.get_size(), float)
config = Config()
params = tuner.model.config_set.get_params()
for param in params:
    if param.get_name() == 'spark.executor.memory':
        config.add_param(param, 10728.0)
    elif param.get_name() == 'spark.sql.shuffle.partitions':
        config.add_param(param, 1910.0)
    elif param.get_name() == 'spark.executor.cores':
        config.add_param(param, 4.0)
    elif param.get_name() == 'spark.driver.memory':
        config.add_param(param, 10752.0)

config_value = tuner.model.normalizer.normalize_config(config.get_all_param_values())
print tuner.model.predict(config_value, tuner.model.alpha, tuner.model.beta, tuner.model.gamma, tuner.model.theta)
# print tuner.get_next_best_config()


ValueError: math domain error

In [44]:
from scipy.stats import norm
print "config_value"
print config_value
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
print "training_out"
print tuner.model.training_out
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
print "get_training_pairwise_correlation"
print tuner.model.get_training_pairwise_correlation(tuner.model.gamma, tuner.model.theta)
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
print "get_correlation_with_train_data"
print tuner.model.get_correlation_with_train_data(config_value, tuner.model.gamma, tuner.model.theta)
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
print "get_training_params"
print tuner.model.get_training_params()
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
print "get_mean"
print tuner.model.get_mean(config_value, tuner.model.beta, tuner.model.gamma, tuner.model.theta)
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
print "get_variance"
print tuner.model.get_variance(config_value, tuner.model.alpha, tuner.model.gamma, tuner.model.theta)
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
print "get_mu"
print tuner.model.get_mu(config_value, tuner.model.alpha, tuner.model.beta, tuner.model.gamma, tuner.model.theta)
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
print "cdf"
print norm.cdf(tuner.model.get_mu(config_value, tuner.model.alpha, tuner.model.beta, tuner.model.gamma, tuner.model.theta) * -1)
print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"

config_value
[0.9743589743589743, 0.35185185185185186, 0.7017543859649122, 1.0]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
training_out
[[ 806750.]
 [1191319.]
 [1138390.]]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
get_training_pairwise_correlation
[[1.         0.99688523 0.99409869]
 [0.99688523 1.         0.9910023 ]
 [0.99409869 0.9910023  1.        ]]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
get_correlation_with_train_data
[[0.9962611 ]
 [0.99689036]
 [0.99791788]]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
get_training_params
[[0.8        0.96383102 0.89802632 0.33333333]
 [0.6974359  0.89149306 0.76096491 0.33333333]
 [0.95384615 1.         0.96655702 0.66666667]]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
get_mean
1251439.8909180947
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

In [30]:
print norm.cdf(-1)
print norm.pdf(-2)

0.15865525393145707
0.05399096651318806


In [35]:
corr_with_train_data = tuner.model.get_correlation_with_train_data(config_value, tuner.model.gamma, tuner.model.theta)
corr_pairwise_train_data = tuner.model.get_training_pairwise_correlation(tuner.model.gamma, tuner.model.theta)
term1 = np.dot(corr_with_train_data.transpose(), np.linalg.inv(corr_pairwise_train_data))
print term1
term2 = np.dot(term1, corr_with_train_data)
print term2
# term3 = 1 - term2
# return np.linalg.det(pow(alpha, 2) * term3)


[[-0.01606481  0.50916356  0.5112562 ]]
[[1.00419232]]
