# Background and Objective

In Data Centers, once an anomaly is detected, the main challenge of Site Reliability Engineers is to identify and understand the root cause of that anomaly as soon as possible, by analyzing the sequence of events that led to the anomaly. The task of root cause analysis (RCA) is generally done manually, and as a result, takes significant time and effort.

The objective of this project was to reduce the human effort by automating anomaly detection (AD) and RCA, using state-of-the-art Machine Learning and Causal Inference methods.

The dataset is multivariate time series data collected from different machines and applications in hybrid data centers.

One of the challenges was that, by design, the AD model had only access to the last 120 observations (equivalent to 10 minutes). The reason for this was to cap the latency so that an anomaly can be detected and acted upon within 10 minutes of occurrence at the latest.

In [None]:
import numpy as np 
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import plotly.express as px
from sklearn.feature_selection import VarianceThreshold
from fbprophet import Prophet
import bnlearn as bn    # Bayesian networks causal inference
import warnings
warnings.filterwarnings('ignore')
import os
sns.set_style('white')
sns.set_context('talk', font_scale = 1)
mpl.rcParams['figure.figsize'] = (9, 6)

In [None]:
!pip install --upgrade pip

!pip install pystan==2.19
!pip install fbprophet

!pip install bnlearn

!pip install ipywidgets
!jupyter nbextension enable --py widgetsnbextension

!pip install pyvis

Collecting pyvis
  Downloading pyvis-0.1.9-py3-none-any.whl (23 kB)
Collecting jsonpickle>=1.4.1
  Downloading jsonpickle-2.0.0-py2.py3-none-any.whl (37 kB)
Installing collected packages: jsonpickle, pyvis
Successfully installed jsonpickle-2.0.0 pyvis-0.1.9


# Data Import &amp; Feature Reduction

In [None]:
df = pd.read_csv('/datasets/goolge-drive/Colab_Notebooks/poomData_agent-100-magento2_07-11-2021.csv')

# =====================================
# select an index for creating the 120-point chunk size
index = 1429
df = df.loc[index-120:index, :]
# ======================================

# sort "timestamp" to ensure they are chronological
df.sort_values('timestamp', inplace=True)

In [None]:
def feature_reduc_filter(df,
                         miss_thresh_pct=20,
                         dup_thresh_pct=10,
                         variance_thresh=0.10,
                         iqr_coeff=1.75,
                         corr_thresh=0.95):
    
    print(f">> {df.shape} is the initial number of rows and features.")

    # filter out null values by dropping a feature if ratio of its nulls is above miss_thresh_pct
    null_before = df.shape[1]
    for c in df.columns:
        if df[c].isna().mean() * 100 > miss_thresh_pct:
            df.drop(c, axis=1, inplace=True)
    # impute null values
    df = df.bfill().ffill()
    print(f"| {null_before - df.shape[1]} features removed by NaN threshold ...")

    # convert unix timestamp string to readable string
    df['timestamp'] = pd.to_datetime(df['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S')
    # get number of duplicates for each timestamp
    time_col = pd.DataFrame(df['timestamp'].value_counts())
    time_col_duplicates = time_col[time_col['timestamp'] > 1]
    # drop duplicate rows if below dup_thresh_pct; else show an ALERT message
    if time_col_duplicates['timestamp'].sum() / len(df) * 100 < dup_thresh_pct:
        df.drop_duplicates(['timestamp'], inplace=True)
    else:
        print(f"ALERT: Total number of duplicates for 'timestamp' above {dup_thresh_pct}%! Dataset needs revision")
    
    # filter out feature if its variance is below variance_thresh_pct
    from sklearn.feature_selection import VarianceThreshold
    vt = VarianceThreshold(threshold=variance_thresh)
    time_col = df.pop('timestamp')  # Return timestamp and drop it from df
    
    non_num_before = df.shape[1]
    df = df.loc[:, (df.dtypes == 'float') | (df.dtypes == 'int')]  # from the df with no timestamp, remove non-numerical columns
    print(f"| {non_num_before - df.shape[1]} non-numerical features removed ...")
    
    var_before = df.shape[1]
    _ = vt.fit(df)   
    mask = vt.get_support()   # boolean array giving the features selected
    df = df.loc[:, mask]
    print(f"| {var_before - df.shape[1]} features removed by variance threshold ...")
    df = pd.concat([df, time_col], axis=1)   # adding back 'timestamp'
    
    # outside of IQR filter
    iqr_before = df.shape[1]
    for c in df.select_dtypes(exclude='object').columns:
        Q1 = df[c].quantile(0.25)
        Q3 = df[c].quantile(0.75)
        IQR = Q3 - Q1
        # drop feature if it does not have any data points outside of "iqr_coeff x IQR"
        if len(df[(df[c] < Q1 - iqr_coeff * IQR) | (df[c] > Q3 + iqr_coeff *IQR)]) == 0:
            df.drop(c, axis = 1, inplace = True)
    print(f"| {iqr_before - df.shape[1]} features removed using {iqr_coeff}xIQR ...")
    print(f">> {df.shape} is the final number of rows and features.")
    print()

    # convert object to datetime for timestamp column => easier to convert at the end after all filters    
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    
    # reset index to get a clean counter for index in case it is used for any plotting
    df.reset_index(drop = True, inplace = True)
    
    return df

In [None]:
df = feature_reduc_filter(df)

print(df.dtypes)
print()
print(df.timestamp.head())  # to check timestamp data type and format

>> (121, 810) is the initial number of rows and features.
| 0 features removed by NaN threshold ...
| 30 non-numerical features removed ...
| 565 features removed by variance threshold ...
| 141 features removed using 1.75xIQR ...
>> (121, 74) is the final number of rows and features.

host.disk.read.bytes                              float64
host.disk.write.bytes                             float64
host.network.egress.bytes                         float64
host.network.egress.packets                       float64
host.network.ingress.bytes                        float64
                                                ...      
system.socket.summary.tcp.all.orphan              float64
system.socket.summary.tcp.all.syn_recv            float64
system.socket.summary.tcp.all.syn_sent            float64
system.socket.summary.tcp.all.time_wait           float64
timestamp                                  datetime64[ns]
Length: 74, dtype: object

0   2021-11-04 09:30:04
1   2021-11-04 09:30:09


# Unsupervised AD: fbprohet

In [None]:
# function that does fb modeling on a df that has 2 columns ('timestamp', one feature)
def fbprohet_single_feature(df_fb):
    import os
    import sys
    from fbprophet import Prophet
    mpl.rcParams['figure.figsize'] = (16, 8)

    # from https://stackoverflow.com/questions/11130156/suppress-stdout-stderr-print-from-python-functions
    class suppress_stdout_stderr(object):
        '''
           A context manager for doing a "deep suppression" of stdout and stderr in Python, i.e. will suppress all print 
        (e.g. model output stats), even if the print originates in a compiled C/Fortran sub-function.
           This will not suppress raised exceptions, since exceptions are printed to stderr just before a script exits, 
        and after the context manager has exited (at least, I think that is why it lets exceptions through).
        '''
        def __init__(self):
            # Open a pair of null files
            self.null_fds = [os.open(os.devnull, os.O_RDWR) for x in range(2)]
            # Save the actual stdout (1) and stderr (2) file descriptors.
            self.save_fds = (os.dup(1), os.dup(2))
        def __enter__(self):
            # Assign the null pointers to stdout and stderr.
            os.dup2(self.null_fds[0], 1)
            os.dup2(self.null_fds[1], 2)
        def __exit__(self, *_):
            # Re-assign the real stdout/stderr back to (1) and (2)
            os.dup2(self.save_fds[0], 1)
            os.dup2(self.save_fds[1], 2)
            # Close the null files
            os.close(self.null_fds[0])
            os.close(self.null_fds[1])
    
    # make "forecast" and "df_anomaly" global for later use in our loop
    global forecast
    global df_anomaly
    
    feature_name = df_fb.columns[1]   # name of our feature
    df_fb.columns = ['ds', 'y']

    # set up the model 'm' with 99% confidence interval (CI) => "interval_width" in model => wider CI for safter anomaly detection
    m = Prophet(seasonality_mode='additive',
                interval_width=0.99,
                daily_seasonality=False,
                yearly_seasonality=False,
                weekly_seasonality=False,
                uncertainty_samples=120)   # my tests show that the majority of calc time drop happens from 1000 to 500 (36% time reduction)
    
    # fit the model to 'df_fb' while suppressing the textual model output by using our 'suppress_stdout_stderr'
    with suppress_stdout_stderr():
        m = m.fit(df_fb)
    
    # predict
    forecast = m.predict(df_fb)   # forecast is a df
    
    # fbprophet integrated plot
    # plot = m.plot(forecast)
    # plt.title(feature_name)
    
    # Create an anomaly detection metric:
    
    # 1) adding "y" values to the "forcast" df
    forecast = forecast.merge(df_fb, on = 'ds')
    # 2) creating anomaly column with 3 levels: 0, 1, -1
    forecast['anomaly'] = 0
    forecast.loc[forecast['y'] > forecast['yhat_upper'], 'anomaly'] = 1
    forecast.loc[forecast['y'] < forecast['yhat_lower'], 'anomaly'] = -1
    # 3) creating Anomaly Importance (AI) metric based on the distance of actual y from CI lower bound or from CI upper bound
        # higher AI values mean higher probability of anomaly
    forecast[f"ai_{feature_name}"] = 0
    forecast.loc[forecast['anomaly'] == 1, f"ai_{feature_name}"] = abs((forecast['y'] - forecast['yhat_upper']) / forecast['y'])
    forecast.loc[forecast['anomaly'] == -1, f"ai_{feature_name}"] = abs((forecast['yhat_lower'] - forecast['y']) / forecast['y'])
    # At this stage, we can add code to keep the features with AI values above a certain  threshold (e.g. max > threshold)
        # placeholder for code
    # 4) Min-Max Normalizing of the Anomaly Importance (AI) metric to ensure all values are between 0 to 1
    forecast[f"ai_{feature_name}"] = (forecast[f"ai_{feature_name}"] - forecast[f"ai_{feature_name}"].min()) / (
                                          forecast[f"ai_{feature_name}"].max() - forecast[f"ai_{feature_name}"].min())
    
    
    # forecast.plot(x = 'ds', y = f"ai_{feature_name}")
    # plt.title(f"ai_{feature_name}")
    # plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    # plt.gca().yaxis.grid(True, linewidth = 0.25)
    
    df_anomaly['ds'] = forecast['ds']
    # define a threshold from 0 to 1 for anomaly detection; here we select all non-zero values
    df_anomaly = df_anomaly.merge(forecast[forecast[f"ai_{feature_name}"] != 0][['ds', f"ai_{feature_name}"]], on = 'ds', how = 'outer')

In [None]:
# series is a list; each member of "series" list is a df with 2 columns ('timestamp', one feature)
series = [df[['timestamp', col]] for col in df.loc[:, (df.columns != 'timestamp') & (df.columns != 'scenario')]]

df_anomaly = pd.DataFrame()
for df_fb in series:
    fbprohet_single_feature(df_fb)

Snapshot of the fbprophet result, shown for two features as examples:
![Picture title](image-20211227-222126.png)

In [None]:
import plotly.express as px

fig = px.line(
    df_anomaly.set_index('ds'),
    y=df_anomaly.columns[(df_anomaly.columns.str[0:3] == 'ai_')
                         | (df_anomaly.columns.isin(['scenario']))],
    template='plotly_dark')
fig.update_layout(hovermode="x unified")  # shows all column names upon hover
fig.show()

# Unsupervised RCA: Bayesian Networks Causal Inference

In [None]:
def Bayes_causal_inf_RCA(df=df_anomaly):
    """
    >>> Inputs: fbprophet df_anomaly (DataFrame of AI values for each metric)
    >>> Output: 
            i) At each timestamp, it returns the number of active metrics with AI values > ai_threshold (nb_metrics_above_AIthresh column)
            ii) Unsupervised determination of Anomaly start
            iii) Dynamic plot (plotly) of all anomalous features
            iv) DAG (Directed Acyclic Graph) via Bayesian Networks Causal Inference
    """

    global df_anomaly

    # estimating ai_threshold (0-1) by looking at the 25th percentile of all metrics AI values and selecting the min of those 25th percentile values
    # to avoid extremely small numbers, we will cap the lower bound at an experimental value (e.g. 0.005)
    excl_cols = ['ds', 'timestamp', 'scenario']
    ai_threshold = max(0.005, df_anomaly.loc[:, ~df_anomaly.columns.isin(excl_cols)].quantile(0.25).min())
    
    cols = ['ds', 'timestamp', 'scenario', 'nb_metrics_above_thesh']
    # for each row, we want to count the number of metrics with AI values > threshold => df[df > 0.1].count(axis=1)
    df_anomaly['nb_metrics_above_AIthresh'] = df_anomaly.loc[:, ~df_anomaly.columns.isin(cols) >
                                                             ai_threshold].count(axis=1)
    
    # moving avg to smooth out the 'nb_metrics_above_AIthresh' column
    df_anomaly['nb_metrics_above_AIthresh'] = df_anomaly[
        'nb_metrics_above_AIthresh'].rolling(3, min_periods=1).mean()

    # detecting peaks in "nb_metrics_above_AIthresh"
    from scipy.signal import find_peaks
    peak_index, peak_height = find_peaks(
        df_anomaly['nb_metrics_above_AIthresh'],
        height=3)  # height=3 or prominence=1
    # === why 3? because Bayesian Networks Causal Inference needs a min of 3 variables ===
    
    # if one or more peaks are present in peak_index
    if len(peak_index) >= 1:
        # Anomaly start time: set as the first peak found
        anomaly_start_time = df_anomaly.loc[peak_index[0], 'ds']
        anomaly_start_index = peak_index[0]
        # Refine Anomaly start time: if all 3 prior observations have "nb_metrics_above_AIthresh" >= 3, then that is an anomaly start (based on my observation)
        for index in peak_index:
            if (df_anomaly.loc[index-3:index, 'nb_metrics_above_AIthresh'] >= 3).all():
                anomaly_start_time = df_anomaly.loc[index, 'ds']
                anomaly_start_index = index
                break
        
        print('\n=== CAUTION: Potential anomaly detected! ===')
        print(f'\n=== Anomaly start time: {anomaly_start_time} ===')
        
        # anomalous_features at start
        df_anomaly_peaks = df_anomaly.iloc[[anomaly_start_index]]
        anomalous_features = list(df_anomaly_peaks.iloc[[0]].columns[(df_anomaly_peaks.iloc[[0]].notna().all())
                   & (df_anomaly_peaks.iloc[[0]].columns.str[0:3] == 'ai_')])
        # removing "_ai" from the beginning of metrics' name
        anomalous_features = list(map(lambda st: str.replace(st, 'ai_', ''), anomalous_features))
        print(f'\n=== Anomalous Features at start time: {anomalous_features} ===')
        
        # dynamic plot
        import plotly.express as px
        import plotly.graph_objects as go
        fig = go.Figure([
            go.Scatter(name=f'Nb of Metrics above Anomaly Importance Threshold ({round(ai_threshold, 3)})',
                       x=df_anomaly['ds'],
                       y=df_anomaly['nb_metrics_above_AIthresh'],
                       mode='lines+markers',
                       marker=dict(size=4),
                       showlegend=True),
            go.Scatter(name='Anomalies (Peak Height >= 3)',
                       x=df_anomaly.iloc[peak_index]['ds'],
                       y=df_anomaly.iloc[peak_index]['nb_metrics_above_AIthresh'],
                       mode='markers',
                       marker=dict(color='#910707', size=10),
                       showlegend=True)
        ]); fig.show()
        
        # cleaning df_anomaly for causal inference below
        df_anomaly = df_anomaly.fillna(0)
        df_anomaly.drop('nb_metrics_above_AIthresh', axis=1, inplace=True)

        # Bayesian networks causal inference
        import bnlearn as bn
        t_s = df_anomaly.loc[peak_index[0]-3, 'ds']   # 3 points before anomaly start
        t_s_plus = str(anomaly_start_time)
        df_anomaly = df_anomaly.set_index('ds')
        DAG = bn.structure_learning.fit(df_anomaly[t_s:t_s_plus], methodtype='cl', scoretype='bic')
        # G = bn.plot(DAG)

        G = bn.plot(DAG,
                    scale=1,
                    node_color=None,
                    node_size=None,
                    params_interactive={
                        'directed': True,
                        'height': '800px',
                        'width': '70%',
                        'notebook': False,
                        'heading': 'Bayesian Causal Inference at Anomaly Start',
                        'layout': None,
                        'font_color': False,
                        'bgcolor': '#ffffff'
                    },
                    params_static={
                        'width': 25,
                        'height': 15,
                        'font_size': 10,
                        'font_family': 'times new roman',
                        'alpha': 0
                    },
                    interactive=True)
    
    # if peak_index is empty
    else:   
        print('\n=== No Anomaly')
        import plotly.express as px
        import plotly.graph_objects as go
        fig = go.Figure([
            go.Scatter(name=f'Nb of Metrics above Anomaly Importance Threshold ({round(ai_threshold, 3)})',
                       x=df_anomaly['ds'],
                       y=df_anomaly['nb_metrics_above_AIthresh'],
                       mode='lines+markers',
                       marker=dict(size=4),
                       showlegend=True)
        ]); fig.show()

In [None]:
Bayes_causal_inf_RCA()


=== CAUTION: Potential anomaly detected! ===

=== Anomaly start time: 2021-11-04 09:38:04 ===

=== Anomalous Features at start time: ['mysql.status.threads.connected', 'mysql.status.threads.created', 'redis.info.memory.used.dataset', 'redis.info.memory.used.rss', 'system.load.norm.1', 'system.network_summary.tcp.DelayedACKs', 'system.network_summary.tcp.PAWSEstab', 'system.network_summary.tcp.TCPDSACKIgnoredNoUndo', 'system.network_summary.tcp.TCPHystartTrainCwnd', 'system.network_summary.tcp.TCPHystartTrainDetect', 'system.network_summary.tcp.TCPLossProbeRecovery', 'system.process.summary.sleeping', 'system.socket.summary.tcp.all.syn_recv', 'system.socket.summary.tcp.all.time_wait'] ===


[bnlearn] >Computing best DAG using [chow-liu]


Building tree:   0%|          | 0/2628.0 [00:00<?, ?it/s]

[bnlearn]> Set node properties.
[bnlearn]> Set edge properties.
[bnlearn] >Plot based on Bayesian model


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=78d3ccee-0bf4-46c4-a7db-d216d9128005' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>