Skip to content

Commit

Permalink
Merge pull request #89 from zqzten/algorithm
Browse files Browse the repository at this point in the history
Algo: Introduce estimation threshold control to replicas estimation
  • Loading branch information
dayko2019 committed Apr 15, 2024
2 parents 32eaf29 + 53ddab4 commit 1a33534
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 9 deletions.
16 changes: 14 additions & 2 deletions algorithm/kapacity/portrait/horizontal/predictive/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def parse_args():
parser.add_argument('--re-test-dataset-size-in-seconds',
help='size of test dataset in seconds for replicas estimation model',
required=False, default=86400)
parser.add_argument('--re-min-correlation-allowed',
help='minimum allowed correlation of replicas estimation model,'
'the estimation would fail if the model\'s correlation is lower than this threshold,'
'this arg should be a float number within range [0, 1]',
required=False, default=0.9)
parser.add_argument('--re-max-mse-allowed',
help='maximum allowed MSE of replicas estimation model,'
'the estimation would fail if the model\'s MSE is larger than this threshold,'
'this arg should be a float number within range [0, +∞)',
required=False, default=10.0)
parser.add_argument('--scaling-freq', help='frequency of scaling, the duration should be larger than the frequency'
'of the time series forecasting model',
required=True)
Expand Down Expand Up @@ -131,12 +141,14 @@ def predict_replicas(args, metric_ctx, pred_traffics):
traffic_col,
metric_ctx.resource_target,
int(args.re_time_delta_hours),
int(args.re_test_dataset_size_in_seconds))
int(args.re_test_dataset_size_in_seconds),
float(args.re_min_correlation_allowed),
float(args.re_max_mse_allowed))
if 'NO_RESULT' in pred['rule_code'].unique():
raise RuntimeError('there exist points that no replica number would meet the resource target, please consider setting a more reasonable resource target')
return pred
except estimator.EstimationException as e:
raise RuntimeError("replicas estimation failed, this may be caused by insufficient or irregular history data") from e
raise RuntimeError(f'replicas estimation failed, this may be caused by insufficient or irregular history data, detailed estimation info: {e.info}') from e


def merge_history_dict(history_dict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ def preprocess_data(self):
df.sort_values(by=self.time_col, inplace=True)
df = df.reset_index(drop=True)

# scale resource to 0~100
resource_max = df[self.resource_col].max()
resource_scaling_factor = 1 if resource_max <= 100 else 10**np.ceil(np.log10(resource_max / 100))
self.logger.info(f'resource scaling factor: {resource_scaling_factor}')
df[self.resource_col] = df[self.resource_col] / resource_scaling_factor
self.resource_target = self.resource_target / resource_scaling_factor

features = self.traffic_cols

self.logger.info(f'checkout before filtering NaN: '
Expand Down Expand Up @@ -628,7 +635,12 @@ def bin2str(x):


class EstimationException(Exception):
    """Raised when replicas estimation cannot produce a result.

    Attributes:
        message: Human-readable reason for the failure; also what ``str()``
            of the exception returns.
        info: Optional diagnostic details supplied by the raiser. ``None``
            when the raiser did not provide any.
    """

    def __init__(self, message, info=None):
        # Forward the message to Exception so that ``args``, ``repr()`` and
        # pickling/copying behave like any other exception. ``info`` is
        # optional so the one-argument call form keeps working.
        super().__init__(message)
        self.message = message
        self.info = info

    def __str__(self):
        return self.message


def estimate(data: pd.DataFrame,
Expand All @@ -639,7 +651,9 @@ def estimate(data: pd.DataFrame,
traffic_cols: list[str],
resource_target: float,
time_delta_hours: int,
test_dataset_size_in_seconds: int = 86400) -> pd.DataFrame:
test_dataset_size_in_seconds: int = 86400,
min_correlation_allowed: float = 0.9,
max_mse_allowed: float = 10.0) -> pd.DataFrame:
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger()
Expand All @@ -660,19 +674,30 @@ def estimate(data: pd.DataFrame,
estimator.test()
logger.info(f'********* testing cost time: {time.time() - st10} *********')

if (estimator.pearsonr[0] >= 0.9 and estimator.pearsonr[1] < 0.01
and estimator.big_e_10 == 0 and estimator.mse < 10):
logger.info(f'********* [linear] correlation: {estimator.pearsonr[0]}, significance: {estimator.pearsonr[1]}, big_e_10: {estimator.big_e_10}, mse: {estimator.mse} *********')
logger.info(f'********* [residual] correlation: {estimator.pearsonr_rf[0]}, significance: {estimator.pearsonr_rf[1]}, big_e_10: {estimator.big_e_10_rf}, mse: {estimator.mse_rf} *********')

if (estimator.pearsonr[0] >= min_correlation_allowed and estimator.pearsonr[1] < 0.01
and estimator.big_e_10 == 0 and estimator.mse <= max_mse_allowed):
st10 = time.time()
estimator.policy_linear()
logger.info(f'********* linear policy cost time: {time.time() - st10} *********')
return estimator.output

elif (estimator.pearsonr_rf[0] >= 0.9 and estimator.pearsonr_rf[1] < 0.01 and estimator.big_e_10_rf == 0
and estimator.mse_rf < 10 and estimator.pearsonr[0] >= 0.6 and estimator.pearsonr[1] < 0.01):
elif (estimator.pearsonr_rf[0] >= min_correlation_allowed and estimator.pearsonr_rf[1] < 0.01 and estimator.big_e_10_rf == 0
and estimator.mse_rf <= max_mse_allowed and estimator.pearsonr[0] >= 0.6 and estimator.pearsonr[1] < 0.01):
st10 = time.time()
estimator.policy_residual()
logger.info(f'********* residual policy cost time: {time.time() - st10} *********')
return estimator.output

else:
raise EstimationException("no policy fits")
raise EstimationException('no policy fits',
{'linear': {'correlation': estimator.pearsonr[0],
'significance': estimator.pearsonr[1],
'big_e_10': estimator.big_e_10,
'mse': estimator.mse},
'residual': {'correlation': estimator.pearsonr_rf[0],
'significance': estimator.pearsonr_rf[1],
'big_e_10': estimator.big_e_10_rf,
'mse': estimator.mse_rf}})

0 comments on commit 1a33534

Please sign in to comment.