This notebook is exploring using Facebook Prophet while also trying to determine when to retrain the model using the volatility measurement inspired by the DRAIN service

In [92]:
from prophet import Prophet
import pandas as pd
from datetime import datetime, timedelta
import altair as alt
from collections import deque
import numpy as np
import math
import time
from orbit.utils.simulation import make_trend, make_seasonality, make_regression
import random
random.seed()

fit_predict_model is a function which will take in a dataframe, fit the Prophet model to it and then give the forecasted results back for the next N minutes

In [109]:
def fit_predict_model(dataframe,periods=15,changepoint_prior_scale=0.10,daily_seasonality=False, weekly_seasonality=False, yearly_seasonality=False):
    prophet_model = Prophet(growth="linear",changepoint_prior_scale=changepoint_prior_scale,daily_seasonality=daily_seasonality, weekly_seasonality=weekly_seasonality, yearly_seasonality=yearly_seasonality)
    prophet_model.fit(dataframe)
    future = prophet_model.make_future_dataframe(periods=periods,freq="1MIN",include_history=False,)
    future["cap"] = 1.0
    future_forecast = prophet_model.predict(future)
    future_forecast["timestamp"] = future_forecast["ds"]
    future_forecast = future_forecast.set_index("timestamp")
    return future_forecast

detect_anomalies determines if each of the actual values is within the lower and upper bound.

In [3]:
def detect_anomalies(forecast):
    forecasted = forecast[['ds','trend', 'yhat', 'yhat_lower', 'yhat_upper', 'fact', 'upper_std', 'lower_std']].copy()
    #forecast['fact'] = df['y']
    forecasted.loc[forecasted['fact'] > forecasted['upper_std'], 'anomaly'] = 1
    forecasted.loc[forecasted['fact'] < forecasted['lower_std'], 'anomaly'] = 1

    #anomaly importances
    forecasted['importance'] = 0
    forecasted.loc[forecasted['anomaly'] ==1, 'importance'] = \
        (forecasted['fact'] - forecasted['yhat_upper'])/forecast['fact']
    forecasted.loc[forecasted['anomaly'] ==-1, 'importance'] = \
        (forecasted['yhat_lower'] - forecasted['fact'])/forecast['fact']
    
    return forecasted

plot_anomalies is a function that will plot the lower and upper bounds for each data point in addition to the actual value. Anomalies are marked by red meaning the data point is outside of the lower and upper bound range.

In [4]:
def plot_anomalies(forecasted):
    interval = alt.Chart(forecasted).mark_area(interpolate="basis", color = '#7FC97F').encode(
    x=alt.X('ds:T', axis=alt.Axis(format='%H:%M'), title ='date'),
    y='upper_std',
    y2='lower_std',
    tooltip=['ds', 'fact', 'lower_std', 'upper_std']
    ).interactive().properties(
        title='Anomaly Detection'
    )

    fact = alt.Chart(forecasted[forecasted.anomaly==0]).mark_circle(size=15, opacity=0.7, color = 'Black').encode(
        x='ds:T',
        y=alt.Y('fact', title='CPU Utilization Percentage'),    
        tooltip=['ds', 'fact', 'lower_std', 'upper_std']
    ).interactive()

    anomalies = alt.Chart(forecasted[forecasted.anomaly!=0]).mark_circle(size=30, color = 'Red').encode(
        x='ds:T',
        y=alt.Y('fact', title='CPU Utilization Percentage'),    
        tooltip=['ds', 'fact', 'lower_std', 'upper_std']
    ).interactive()

    return alt.layer(interval, fact, anomalies)\
              .properties(width=870, height=450)\
              .configure_title(fontSize=20)

Read in 600 data points from clusterwide_cpu_usage_percentage.txt which contains 600 minutes worth of CPU usage within a cluster

In [117]:
dataset_values = []
date_time_vals = []
with open('clusterwide_cpu_usage_percentage.txt','r') as cmd:
    for line in cmd:
        line = line.rstrip().split(",")
        date_time = datetime.fromtimestamp(float(line[0]))
        date_time_vals.append(float(line[0]))
        metric_dict = {"ds": date_time.strftime("%Y-%m-%d %H:%M:%S"), "y": float(line[1]) / 100.0}
        dataset_values.append(metric_dict)

Inject some outlier data at the end of this dataset to see whether or not these data points are marked as an anomaly or if model can adjust its forecasts over time

In [118]:
last_one = date_time_vals[-1] + 60
icr = 0.70
for i in range(500):
    if i % 50 == 0:
        icr += 0.01
    new_dict = {"ds": (datetime.fromtimestamp(last_one)).strftime("%Y-%m-%d %H:%M:%S"), "y": icr}
    last_one += 60
    dataset_values.append(new_dict)
    

In [141]:
xs = pd.date_range("00:00", "23:59", freq="1min")
dataset_values = []
date_time_vals = []
rw = make_trend(len(xs), rw_loc=0.01, rw_scale=1, seed=random.randint(1, 2000))
# normalize [0, 1.0]
rw= (1.0*(rw - np.min(rw))/np.ptp(rw))
for w in range(0,len(rw)):
    metric_dict = {"ds": xs[w].strftime("%Y-%m-%d %H:%M:%S"), "y": rw[w]}
    dataset_values.append(metric_dict)

In [168]:
xs = pd.date_range("00:00", "23:59", freq="1min")
dataset_values = []
date_time_vals = []
fs = make_seasonality(len(xs), seasonality=random.randint(100, 1000), method='fourier', order=random.randint(4, 10), seed=random.randint(1, 2000))
fs = (1.0*(fs - np.min(fs))/np.ptp(fs))
for w in range(0,len(rw)):
    metric_dict = {"ds": xs[w].strftime("%Y-%m-%d %H:%M:%S"), "y": rw[w]}
    dataset_values.append(metric_dict)

Define data_range to determine how much of the dataset you would want to work with

In [169]:
data_range = 1500

Define time_interval for how often FB Prophet model will be retrained.

In [170]:
time_interval = 15

Define the maximum amount of training data which the FB prophet model will be trained on

In [171]:
training_metric_limit = 100

Define the changepoint_range for the Prophet model

In [172]:
changepoint_prior_scale = 0.05

Retrieve the first data_range values of the dataset

In [173]:
data_values = dataset_values[:data_range]

For the initial training of the FB Prophet model, retrieve the first time_interval data points from the dataset

In [174]:
training_dataset = data_values[:time_interval]

Turn the training_dataset list into a Pandas dataframe so it can be taken as input into Prophet model

In [175]:
training_dataset_df = pd.DataFrame(training_dataset)

weighted_avg_and_std function which computes the weighted mean and standard deviation of a numpy array.

In [176]:
def weighted_avg_and_std(values, weights):
    average = np.average(values, weights=weights)
    # Fast and numerically precise:
    variance = np.average((values - average) ** 2, weights=weights)
    return average, math.sqrt(variance)

In [177]:
def compute_vol(metrics_ds):
    cumulative_metrics_data = np.array(metrics_ds)
    vol = np.std(cumulative_metrics_data) / np.mean(cumulative_metrics_data[:10])
    time_steps = cumulative_metrics_data.shape[0]
    weights = np.flip(np.true_divide(np.arange(1, time_steps + 1), time_steps))
    weighted_mean, weighted_std = weighted_avg_and_std(cumulative_metrics_data, weights)
    weighted_vol = weighted_std / np.mean(cumulative_metrics_data[:10])
    return weighted_vol
    

Initial variables declaration

In [178]:
metrics_data_queue = deque([],training_metric_limit)
normal_periods = []
training_dataset = []
num_anomalies = 0
metric_data_obtained = dict()
quarantined_data = []
column_names =  ['trend', 'yhat', 'yhat_lower', 'yhat_upper']
train_on_next_chance = True
stable = False
training_start_ts_ns = 0
very_first_ts_ns = 0
future_forecast = None
metrics_db = dict()
prediction_results = []
weighted_vols = []
max_training_waittime = time_interval
minutes_elapsed_since_last_trained = 0
in_quarantine = False
quarantine_counter = 10
quarantine_timestamp = 0

In [179]:
%%time
for i in range(0,len(data_values)):
    metric_data_obtained = dict()
    current_metric_datetime = data_values[i]["ds"]
    element = datetime.strptime(current_metric_datetime,"%Y-%m-%d %H:%M:%S")
    element_datetime_tuple = element.timetuple()
    current_ts = int(time.mktime(element_datetime_tuple))
    if training_start_ts_ns == 0:
        training_start_ts_ns = current_ts
        very_first_ts_ns = current_ts
    current_metric_value = data_values[i]["y"]
    metrics_db[current_ts] = data_values[i]
    if len(metrics_db) == training_metric_limit:
        all_ts_keys = list(metrics_db.keys())
        all_ts_keys.sort()
        metrics_db.pop(all_ts_keys[0],None)
    current_weighted_vol = 0.0
    
    if len(metrics_data_queue) > 0:
        #copy_queue = metrics_data_queue.copy()
        previous_weighted_vol = compute_vol(metrics_data_queue)
        metrics_data_queue.appendleft(current_metric_value)
        current_weighted_vol = compute_vol(metrics_data_queue)
        if abs(current_weighted_vol - previous_weighted_vol) > 0.07 and len(metrics_data_queue) >= time_interval:
            in_quarantine = True
            quarantine_timestamp = current_ts
        if in_quarantine:
            print("currently in quarantine")
            quarantined_data.append(current_metric_value)
            if len(quarantined_data) == quarantine_counter + 1:
                quarantined_vol = compute_vol(quarantined_data)
                in_quarantine = False                
                if quarantined_vol < 0.155:
                    print(f"SENDING TRAIN SIGNAL on iteration {i}")
                    training_end_ts_ns = current_ts
                    normal_periods = []
                    normal_periods.append({"start_ts": quarantine_timestamp, "end_ts": training_end_ts_ns})
                    train_on_next_chance = False
                    stable = True
                    training_start_ts_ns = current_ts
                    training_ds = []
                    for normal in normal_periods:
                        start_ts, end_ts = normal["start_ts"], normal["end_ts"]
                        for ts in range(start_ts,end_ts,60):
                            if ts in metrics_db:
                                training_ds.append(metrics_db[ts])
                    training_ds_df = pd.DataFrame(training_ds)
                    training_ds_df['cap'] = 1.0
                    future_forecast = fit_predict_model(training_ds_df,periods=time_interval)
                    minutes_elapsed_since_last_trained = 0
                    metrics_data_queue = deque(quarantined_data,training_metric_limit)
                else:
                    print("The quarantine data will be thrown out")
                    current_normal_dataset = list(metrics_data_queue)[:len(metrics_data_queue) - quarantine_counter - 1]
                    metrics_data_queue = deque(current_normal_dataset, training_metric_limit)
                    
                quarantined_data = []
    else:
        metrics_data_queue.appendleft(current_metric_value)
            
    if future_forecast is not None:
        nearest_index = future_forecast.index.get_loc(current_metric_datetime, method="nearest")
        nearest_index_value = future_forecast.iloc[[nearest_index]]
        for c in column_names:
            metric_data_obtained[c] = nearest_index_value[c].values[0]
        yhat_lower = nearest_index_value['yhat_lower'].values[0]
        yhat_upper = nearest_index_value['yhat_upper'].values[0]
        yhat_value = nearest_index_value['yhat'].values[0]
        yhat_lower = max(0, yhat_lower)
        upper_std = abs(yhat_upper - yhat_value)
        lower_std = abs(yhat_value - yhat_lower)
        metric_data_obtained = {"fact": current_metric_value, "ds": current_metric_datetime}
        metric_data_obtained['lower_std'] = max(0, yhat_lower - 2 * lower_std)
        metric_data_obtained['upper_std'] = min(1.0, yhat_upper + (2 * upper_std))
        metric_data_obtained['yhat_lower'] = yhat_lower
        metric_data_obtained['yhat_upper'] = yhat_upper
        metric_data_obtained['anomaly'] = 0
        if (current_metric_value < yhat_lower - (2 * lower_std)) or (current_metric_value > yhat_upper + (2 * upper_std)):
            metric_data_obtained['anomaly'] = 1
            num_anomalies += 1
        prediction_results.append(metric_data_obtained)
    minutes_elapsed_since_last_trained += 1
    if len(metrics_data_queue) >= time_interval and not in_quarantine:
        if current_weighted_vol >= 0.199:
            train_on_next_chance = True

        if (current_weighted_vol < 0.199 and training_start_ts_ns != very_first_ts_ns and train_on_next_chance):
            training_start_ts_ns = current_ts

        if current_weighted_vol > 0.155 and not train_on_next_chance and stable:
            training_end_ts_ns = current_ts
            normal_periods.append({"start_ts": training_start_ts_ns, "end_ts": training_end_ts_ns})
            stable = False
            training_start_ts_ns = -1.0

        if (current_weighted_vol <= 0.15 and train_on_next_chance) or (minutes_elapsed_since_last_trained >= time_interval):                
            print(f"SENDING TRAIN SIGNAL on iteration {i}")
            if training_start_ts_ns != -1.0:
                training_end_ts_ns = current_ts
                normal_periods.append({"start_ts": training_start_ts_ns, "end_ts": training_end_ts_ns})
            train_on_next_chance = False
            stable = True
            training_start_ts_ns = current_ts
            training_ds = []
            for normal in normal_periods:
                start_ts, end_ts = normal["start_ts"], normal["end_ts"]
                for ts in range(start_ts,end_ts,60):
                    if ts in metrics_db:
                        training_ds.append(metrics_db[ts])
            training_ds_df = pd.DataFrame(training_ds)
            training_ds_df['cap'] = 1.0
            future_forecast = fit_predict_model(training_ds_df,periods=time_interval)
            minutes_elapsed_since_last_trained = 0


INFO:prophet:n_changepoints greater than number of observations. Using 10.


SENDING TRAIN SIGNAL on iteration 14


INFO:prophet:n_changepoints greater than number of observations. Using 22.


SENDING TRAIN SIGNAL on iteration 29
SENDING TRAIN SIGNAL on iteration 44
SENDING TRAIN SIGNAL on iteration 59
SENDING TRAIN SIGNAL on iteration 74
SENDING TRAIN SIGNAL on iteration 89
SENDING TRAIN SIGNAL on iteration 104
SENDING TRAIN SIGNAL on iteration 119
SENDING TRAIN SIGNAL on iteration 134
SENDING TRAIN SIGNAL on iteration 149
SENDING TRAIN SIGNAL on iteration 164
SENDING TRAIN SIGNAL on iteration 179
SENDING TRAIN SIGNAL on iteration 194
SENDING TRAIN SIGNAL on iteration 209
SENDING TRAIN SIGNAL on iteration 224
SENDING TRAIN SIGNAL on iteration 239
SENDING TRAIN SIGNAL on iteration 254
SENDING TRAIN SIGNAL on iteration 269
SENDING TRAIN SIGNAL on iteration 284
SENDING TRAIN SIGNAL on iteration 299
SENDING TRAIN SIGNAL on iteration 314
SENDING TRAIN SIGNAL on iteration 329
SENDING TRAIN SIGNAL on iteration 344
SENDING TRAIN SIGNAL on iteration 359
SENDING TRAIN SIGNAL on iteration 374
SENDING TRAIN SIGNAL on iteration 389
SENDING TRAIN SIGNAL on iteration 404
SENDING TRAIN SIG

In [39]:
num_inflection_points = 0
for i in range(1,len(weighted_vols)):
    if abs(weighted_vols[i] - weighted_vols[i-1]) >= 0.07:
        num_inflection_points += 1
        print(weighted_vols[i])
        print(weighted_vols[i-1])
print(num_inflection_points)

0


In [15]:
weighted_vols

NameError: name 'weighted_vols' is not defined

Create the first Prophet model which will be trained on the initial time_interval data points

In [180]:
pred_result_df = pd.DataFrame(prediction_results)

In [181]:
pred_result_df.tail(500)

Unnamed: 0,fact,ds,lower_std,upper_std,yhat_lower,yhat_upper,anomaly
925,0.337158,2021-07-26 15:40:00,0.292922,0.451776,0.344403,0.397354,0
926,0.315944,2021-07-26 15:41:00,0.289137,0.453093,0.342891,0.397543,0
927,0.315321,2021-07-26 15:42:00,0.281724,0.455131,0.340171,0.397973,0
928,0.330136,2021-07-26 15:43:00,0.284238,0.458044,0.340760,0.398695,0
929,0.316322,2021-07-26 15:44:00,0.284238,0.458044,0.340760,0.398695,0
...,...,...,...,...,...,...,...
1420,0.042483,2021-07-26 23:55:00,0.000000,0.226116,0.042984,0.133310,0
1421,0.016449,2021-07-26 23:56:00,0.000000,0.224211,0.043653,0.130910,0
1422,0.016076,2021-07-26 23:57:00,0.000000,0.209239,0.034621,0.124154,0
1423,0.030169,2021-07-26 23:58:00,0.000000,0.213257,0.036981,0.123728,0


See the number of anomalies predicted for each time_interval. A cold start is expected initially.

In [182]:
plot_anomalies(pred_result_df)

In [None]:
import matplotlib.pyplot as plt
plotting_values = []
for d in data_values:
    plotting_values.append(d['y'])
plt.plot(range(len(plotting_values)), plotting_values)