# "AIOps with Spark"
> "Forecasting metrics values for Cloud resources, to find anomalies, and optimize costs"

- toc: true
- branch: master
- badges: false
- comments: true
- categories: [data-analysis]

## Introduction

Every day, people run computational workloads that consume large amounts of online resources. The volume of data produced per day is massive, so it is important to know how resources are used over time in order to make them available to users as and when required, which improves efficiency.
In this project, we take DevOps metrics data and analyze how performance varies throughout the day by running a time series analysis on the data with Spark RDDs.
Apache Spark is a unified analytics engine for big data and machine learning, originally developed at UC Berkeley in 2009. The creators of Spark founded Databricks in 2013, and we use the Databricks platform here to run our program.


## Architecture

![](my_icons/Bigdataintro.png)

## Importing required libraries

In [0]:

import numpy as np
import pandas as pd
from fbprophet import Prophet
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct
from pyspark.sql.types import FloatType, StructField, StructType, StringType, TimestampType
from sklearn.metrics import mean_squared_error
from pathlib import Path

Defining path where data is stored.

In [0]:
path = Path("/FileStore/tables/")
file_name = "i_0a1c7dc126cb6ac8b_CPUUtilization.csv"

In [0]:
import pyarrow
pyarrow.__version__

## Defining Helper Functions

Retrieves the data from path and cleans it.

In [None]:
def retrieve_data(file_name):
    """Load the CSV at file_name as a pyspark.sql.DataFrame and clean it."""
#     df = (spark.read
#           .option("header", "true")
#           .option("inferSchema", value=True)
#           .csv("./data/input.csv"))
    df = spark.read.option("header", "true").option("inferSchema", value=True).format("csv").load(file_name)

    # Drop any rows with null values in case they exist
    df = df.dropna()

    # Rename timestamp to ds and value to y for fbprophet
    df = df.select(
        df['timestamp'].alias('ds'),
        df['service_name'],
        df['value'].cast(FloatType()).alias('y'),
        df['metric_name']
    )

    return df

Retrieves the data as a stream from path.

In [None]:
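def retrieve_datastream(file_name):
  """Read file_name as a stream and count rows per value in 1-hour windows."""
  # Imported here so the function does not depend on a later notebook cell
  from pyspark.sql.functions import window

  # Streaming sources need an explicit schema, so infer it from a batch read first
  schema = spark.read.option("header", "true").option("inferSchema", value=True).format("csv").load(file_name).schema

  streamingInputDF = (
    spark
      .readStream
      .schema(schema)
      .option("header", "true").format("csv").load(file_name))

  streamingCountsDF = (
    streamingInputDF
      .groupBy(
        streamingInputDF.value,
        window(streamingInputDF.timestamp, "1 hour"))
      .count()
  )

  # Return the streaming aggregate so the caller can start a query on it
  return streamingCountsDF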

Function to convert each grouped SQL row into a Python dictionary. We convert every element of the row's data into a dictionary, collect them in a list, and turn that list into a pandas dataframe.

In [None]:
def transform_data(row):
    """Transform data from pyspark.sql.Row to python dict to be used in rdd."""
    data = row['data']
    app = row['service_name']
    mt = row['metric_name']

    # Transform [pyspark.sql.Dataframe.Row] -> [dict]
    data_dicts = []
    for d in data:
        data_dicts.append(d.asDict())

    # Convert into pandas dataframe for fbprophet
    data = pd.DataFrame(data_dicts)
    data['ds'] = pd.to_datetime(data['ds'])

    return {
        'service_name': app,
        'metric_name': mt,
        'data': data
    }

Function used to split the data into a training dataset and a testing dataset based on the timestamp. Rows before the hard-coded `start_datetime` threshold form the training set; rows from that threshold up to `max_datetime`, the latest timestamp in the data, form the test set.

In [None]:
def partition_data(d):
    """Split data into training and testing based on timestamp."""
    # Extract data from pd.Dataframe
    data = d['data']

    # Find the latest timestamp in the data
    max_datetime = pd.to_datetime(max(data['ds']))
#     start_datetime = max_datetime.replace(hour=00, minute=00, second=00)
    # Hard-coded split point: midnight on 10 October 2020
    start_datetime = pd.Timestamp('2020-10-10 00:00')
    # Extract training data
    train_data = data[data['ds'] < start_datetime]

    # Account for zeros in data while still applying uniform transform
#     train_data['y'] = train_data['y'].apply(lambda x: np.log(x + 1))

    # Assign train/test split
    d['test_data'] = data.loc[(data['ds'] >= start_datetime)
                              & (data['ds'] <= max_datetime)]
    d['train_data'] = train_data

    return d
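
The time-based split above can be illustrated without pandas or Spark. The following is a minimal sketch in plain Python; the helper name `split_by_time` and the sample rows are hypothetical and only mirror the idea of a fixed cutoff timestamp.

```python
from datetime import datetime, timedelta


def split_by_time(rows, cutoff):
    """Split (timestamp, value) rows into train (< cutoff) and test (>= cutoff)."""
    train = [r for r in rows if r[0] < cutoff]
    test = [r for r in rows if r[0] >= cutoff]
    return train, test


# Hypothetical 5-minute samples around the cutoff
start = datetime(2020, 10, 9, 23, 50)
rows = [(start + timedelta(minutes=5 * i), float(i)) for i in range(4)]
cutoff = datetime(2020, 10, 10, 0, 0)

train, test = split_by_time(rows, cutoff)
print(len(train), len(test))
```

Splitting on time rather than at random keeps every test point later than every training point, which is what a forecasting evaluation needs.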

Function that creates a Prophet model for each group of data. Here Prophet is instantiated with its default arguments; the model is fitted to the data in the next step.

In [None]:
def create_model(d):
    """Create a Prophet model with default parameters."""
    m = Prophet()
    d['model'] = m

    return d

Once we have the Prophet model initialized we can fit the training dataset from our previous function and train the model.

In [None]:
def train_model(d):
    """Fit the model using the training data."""
    model = d['model']
    train_data = d['train_data']
    model.fit(train_data)
    d['model'] = model

    return d

Function used to check the trained model by predicting values for the timestamps in the test dataset.

In [None]:
def test_model(d):
    """Predict values for the test timestamps using the fitted model."""
    test_data = d['test_data']
    model = d['model']
    t = test_data['ds']
    t = pd.DataFrame(t)
    t.columns = ['ds']

    predictions = model.predict(t)
    d['predictions'] = predictions

    return d

Function to forecast values for the next 576 five-minute intervals (48 hours), with the forecast timestamps shifted forward by one day.

In [None]:
def make_forecast(d):
    """Execute the forecast method on the model to make future predictions."""
    model = d['model']
    future = model.make_future_dataframe(
        periods=576, freq='5min', include_history=False)
    # Shift every forecast timestamp forward by one day
    future['ds'] = future['ds'] + pd.DateOffset(days=1)
    forecast = model.predict(future)
    d['forecast'] = forecast
    

    return d
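
To make the forecast horizon concrete, here is a small sketch in plain Python of the timestamps the function above is expected to produce. The helper `future_timestamps` and the value of `last_seen` are assumptions for illustration; the sketch assumes the future frame starts one step after the last training timestamp.

```python
from datetime import datetime, timedelta


def future_timestamps(last_seen, periods=576, step=timedelta(minutes=5),
                      shift=timedelta(days=1)):
    """Return `periods` timestamps after `last_seen`, spaced by `step`
    and moved forward by `shift`."""
    return [last_seen + step * (i + 1) + shift for i in range(periods)]


# Hypothetical last training timestamp (just before the split point)
last_seen = datetime(2020, 10, 9, 23, 55)
future = future_timestamps(last_seen)
print(future[0], future[-1], len(future))
```

With 576 periods of 5 minutes each, the horizon covers 2,880 minutes, which is exactly 48 hours.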

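Function intended to map the predictions back to the original scale. The `np.exp` inverse transform is currently commented out, because the matching log transform in `partition_data` is also disabled, so the function returns the predictions unchanged.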

In [None]:
def normalize_predictions(d):
    """Normalize predictions using np.exp()."""
    predictions = d['predictions']
#     predictions['yhat'] = np.exp(predictions['yhat']) - 1
    d['predictions'] = predictions
    return d

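Function intended to map the forecasted values back to the original scale. Since `np.exp` overflows to infinity for inputs above roughly 709.782, the commented-out code replaces such values with None using lambda functions. With the transform disabled, the forecast is returned unchanged.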

In [None]:
def normalize_forecast(d):
    """Normalize predictions using np.exp().
    Note:  np.exp(>709.782) = inf, so replace value with None
    """
    forecast = d['forecast']
#     forecast['yhat'] = forecast['yhat'].apply(
#         lambda x: np.exp(x) - 1 if x < 709.782 else None)
#     forecast['yhat_lower'] = forecast['yhat_lower'].apply(
#         lambda x: np.exp(x) - 1 if x < 709.782 else None)
#     forecast['yhat_upper'] = forecast['yhat_upper'].apply(
#         lambda x: np.exp(x) - 1 if x < 709.782 else None)
    d['forecast'] = forecast
    return d
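
The commented-out transform pairs `log(x + 1)` on the training data with `exp(x) - 1` on the output. A minimal sketch of that round trip in plain Python follows; the function names are hypothetical, and `math.log1p` / `math.expm1` are used here in place of the NumPy calls.

```python
import math

EXP_LIMIT = 709.782  # threshold quoted in the docstring above


def to_log_scale(y):
    """Forward transform: log(y + 1), which is defined for y = 0."""
    return math.log1p(y)


def from_log_scale(x):
    """Inverse transform: exp(x) - 1, or None where exp(x) would overflow."""
    return math.expm1(x) if x < EXP_LIMIT else None


original = 3.57
restored = from_log_scale(to_log_scale(original))
print(restored)
print(from_log_scale(800.0))
```

Adding 1 before taking the logarithm is what lets the transform accept zero readings, and the guard keeps overflowing values out of the result.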

Function used to calculate the mean squared error of the given test data with respect to the predicted outcomes.

In [None]:
def calc_error(d):
    """Calculate error using mse (mean squared error)."""
    test_data = d['test_data']
    predictions = d['predictions']
    results = mean_squared_error(test_data['y'], predictions['yhat'])
    d['mse'] = results

    return d
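
For reference, mean squared error is the average of the squared differences between actual and predicted values. The sketch below computes it in plain Python on made-up numbers; `mean_squared_error_py` is a hypothetical stand-in for the scikit-learn function used above.

```python
def mean_squared_error_py(actual, predicted):
    """Average of squared differences between two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


# Made-up values: the errors are 1, 0 and -2, so the squares are 1, 0 and 4
actual = [3.0, 4.0, 5.0]
predicted = [2.0, 4.0, 7.0]
mse = mean_squared_error_py(actual, predicted)
print(mse)
```

Because the errors are squared, a few large misses dominate the score, which is useful when large deviations matter most.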

Function used to reduce each record to a tuple of a `service_name,metric_name` key and a dictionary holding the previously calculated forecast and mean squared error.

In [None]:
def reduce_data_scope(d):
    """Return a tuple (service_name + , + metric_type, {})."""
    return (
        d['service_name'] + ',' + d['metric_name'],
        {
            'forecast': d['forecast'],
            'mse': d['mse'],
        },
    )

Function to flatten each RDD record into a list of tuples, each of which will be converted into a DataFrame Row. Each float is checked to see whether it is a NumPy datatype, since it could also be None. A NumPy value is converted to a scalar Python datatype so that it can be persisted into a database, since most databases do not know how to interpret NumPy datatypes.

In [0]:
def expand_predictions(d):
    """Flatten rdd into tuple which will be converted into a dataframe.Row.
    Checks each float to see if it is a np datatype, since it could be None.
    If it is an np datatype then it will convert to scalar python datatype
    so that it can be persisted into a database, since most databases don't
    know how to interpret np python datatypes.
    """
    service_metric, data = d
    service, metric = service_metric.split(',')
    return [
        (
            service,
            metric,
            p['ds'].to_pydatetime(),
            # .item() replaces np.asscalar, which was deprecated and later removed from NumPy
            p['yhat'].item() if isinstance(
                p['yhat'], np.generic) else p['yhat'],
            p['yhat_lower'].item() if isinstance(
                p['yhat_lower'], np.generic) else p['yhat_lower'],
            p['yhat_upper'].item() if isinstance(
                p['yhat_upper'], np.generic) else p['yhat_upper'],
            data['mse'].item() if isinstance(
                data['mse'], np.generic) else data['mse'],
        ) for i, p in data['forecast'].iterrows()
    ]
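
The flattening step turns one record per service-metric pair into one row per forecast point. The sketch below shows the same idea in plain Python, with `itertools.chain.from_iterable` playing the role of `flatMap`; the `expand` helper, the `rds` service and all values are made up for illustration.

```python
from itertools import chain


def expand(record):
    """Turn (key, {'forecast': [...], 'mse': x}) into one flat tuple per point."""
    key, data = record
    service, metric = key.split(',')
    return [(service, metric, ts, yhat, data['mse'])
            for ts, yhat in data['forecast']]


# Made-up records shaped like the (key, dict) tuples described above
records = [
    ('ec2,CPUUtilization',
     {'forecast': [('00:00', 4.6), ('00:05', 4.8)], 'mse': 14.7}),
    ('rds,CPUUtilization',
     {'forecast': [('00:00', 1.2)], 'mse': 0.3}),
]

# Flatten the per-record lists into a single list of rows
rows = list(chain.from_iterable(expand(r) for r in records))
print(len(rows))
```

Note that the single `mse` value of a record is repeated on every row produced from it, which matches the constant `mse` column in the output table.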

In [0]:
file_name = "/FileStore/tables/i_0a1c7dc126cb6ac8b_CPUUtilization.csv"

Reading the CSV file from the Databricks File System (DBFS).

In [0]:
df1 = spark.read.format("csv").load(file_name, header="true", inferSchema="true")

The call below registers the dataframe as a temporary view so that it can be queried with SQL, as shown in the following cell.

In [0]:
df1.createOrReplaceTempView("data_ec2")

In [None]:
%sql
select timestamp, value from data_ec2 where metric_name = 'CPUUtilization';

timestamp,value
2020-09-08 00:00:00,3.569324812430564
2020-09-08 00:05:00,3.43524590166937
2020-09-08 00:10:00,4.18107344632663
2020-09-08 00:15:00,4.4329860145721245
2020-09-08 00:20:00,4.668259701784014
2020-09-08 00:25:00,5.538135593236124
2020-09-08 00:30:00,5.103107344648038
2020-09-08 00:35:00,6.304621654133174
2020-09-08 00:40:00,6.843243493578017
2020-09-08 00:45:00,7.64604982864135


In [0]:
df1

## Training the model

Initializing the Spark configuration and Spark session.

In [0]:
conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("EC2 Training"))

spark = (SparkSession
         .builder
         .config(conf=conf)
         .getOrCreate())

In [0]:
sc = spark.sparkContext
sc.setLogLevel("INFO")

Retrieve the data from DBFS.

In [0]:
df = retrieve_data(file_name)

Group data by service_name and metric_name to aggregate the data for each service-metric combination.

In [None]:
df = df.groupBy('service_name', 'metric_name')
df = df.agg(collect_list(struct('ds', 'y')).alias('data'))
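
Conceptually, `groupBy` followed by `collect_list(struct(...))` gathers all `(ds, y)` pairs of a group into one list. A rough plain-Python equivalent, using made-up rows and a `defaultdict`, could look like this:

```python
from collections import defaultdict

# Made-up rows with the same column names as the dataframe above
rows = [
    {'service_name': 'ec2', 'metric_name': 'CPUUtilization',
     'ds': '2020-09-08 00:00:00', 'y': 3.57},
    {'service_name': 'ec2', 'metric_name': 'CPUUtilization',
     'ds': '2020-09-08 00:05:00', 'y': 3.44},
    {'service_name': 'ec2', 'metric_name': 'NetworkIn',
     'ds': '2020-09-08 00:00:00', 'y': 120.0},
]

# One list of {'ds', 'y'} points per (service_name, metric_name) key
groups = defaultdict(list)
for row in rows:
    key = (row['service_name'], row['metric_name'])
    groups[key].append({'ds': row['ds'], 'y': row['y']})

for key, data in groups.items():
    print(key, len(data))
```

Each resulting group is one unit of work, so a separate model can be trained per service-metric pair.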

We chain the previously defined functions as RDD transformations to build, train and evaluate a Prophet model for each service-metric group.

In [0]:
df = (df.rdd
      .map(lambda r: transform_data(r))
      .map(lambda d: partition_data(d))
      # Prophet cannot fit fewer than 2 rows; keep groups with more than 2 training examples
      .filter(lambda d: len(d['train_data']) > 2)
      .map(lambda d: create_model(d))
      .map(lambda d: train_model(d))
      .map(lambda d: test_model(d))
      .map(lambda d: make_forecast(d))
      .map(lambda d: normalize_forecast(d))
      .map(lambda d: normalize_predictions(d))
      .map(lambda d: calc_error(d))
      .map(lambda d: reduce_data_scope(d))
      .flatMap(lambda d: expand_predictions(d)))

Defining the schema used to convert the RDD into a dataframe.

In [0]:
schema = StructType([
    StructField("service_name", StringType(), True),
    StructField("metric_name", StringType(), True),
    StructField("ds", TimestampType(), True),
    StructField("yhat", FloatType(), True),
    StructField("yhat_lower", FloatType(), True),
    StructField("yhat_upper", FloatType(), True),
    StructField("mse", FloatType(), True)
])

In [0]:
df = spark.createDataFrame(df, schema)

In [0]:
df

In [None]:
%sql
select ds, yhat from forecasts

![](my_icons/qqplot.png)

ds,yhat
2020-10-11T00:00:00.000+0000,4.619425
2020-10-11T00:05:00.000+0000,4.8191323
2020-10-11T00:10:00.000+0000,5.02556
2020-10-11T00:15:00.000+0000,5.238224
2020-10-11T00:20:00.000+0000,5.4566207
2020-10-11T00:25:00.000+0000,5.6802278
2020-10-11T00:30:00.000+0000,5.908505
2020-10-11T00:35:00.000+0000,6.1408997
2020-10-11T00:40:00.000+0000,6.376847
2020-10-11T00:45:00.000+0000,6.6157713


![](my_icons/sqltable1.png)

In [None]:
df.show(n=5)

![](my_icons/dfshow.png)

In [0]:
df.createOrReplaceTempView("forecasts")

We create a temporary variable to store the pandas dataframe and slice it to show the output.

In [0]:
temp = df.toPandas()

In [0]:
temp[:30]

index,service_name,metric_name,ds,yhat,yhat_lower,yhat_upper,mse
0,ec2,CPUUtilization,2020-10-11 00:00:00,4.619425,2.152688,7.399229,14.689586
1,ec2,CPUUtilization,2020-10-11 00:05:00,4.819132,2.049825,7.313528,14.689586
2,ec2,CPUUtilization,2020-10-11 00:10:00,5.02556,2.346648,7.663989,14.689586
3,ec2,CPUUtilization,2020-10-11 00:15:00,5.238224,2.60047,8.062807,14.689586
4,ec2,CPUUtilization,2020-10-11 00:20:00,5.456621,2.759909,8.302505,14.689586
5,ec2,CPUUtilization,2020-10-11 00:25:00,5.680228,3.041043,8.384025,14.689586
6,ec2,CPUUtilization,2020-10-11 00:30:00,5.908505,3.247892,8.670483,14.689586
7,ec2,CPUUtilization,2020-10-11 00:35:00,6.1409,3.661118,8.664343,14.689586
8,ec2,CPUUtilization,2020-10-11 00:40:00,6.376847,3.758875,8.888179,14.689586
9,ec2,CPUUtilization,2020-10-11 00:45:00,6.615771,3.907142,9.169059,14.689586


Writing the output forecasts to a Parquet file stored in DBFS.

In [0]:
df.write.options(header=True).parquet(f'{file_name}_output.parquet', mode='overwrite')

## Streaming data

New data files are added under the input path defined below. Structured Streaming is used to read them as a stream and count records in one-hour windows, which lets us test our model setup on incoming data.

In [0]:
# Import only what is needed; a wildcard import would shadow Python builtins such as max
from pyspark.sql.functions import window

inputPath = "/FileStore/tables/"
name = "i_0a1c7dc126cb6ac8b_CPUUtilization.csv"

schema = spark.read.option("header", "true").option("inferSchema", value=True).format("csv").load(inputPath+name).schema

streamingInputDF = (
  spark
    .readStream
    .schema(schema)
    .option("header", "true").format("csv").load(inputPath+'*.csv'))


streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.value,
      window(streamingInputDF.timestamp, "1 hour"))
    .count()
)
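
The `window(..., "1 hour")` call assigns each record to a fixed one-hour bucket. The following plain-Python sketch shows that bucketing on made-up timestamps; `window_start` is a hypothetical helper, and unlike the query above it groups by window only, not by `value` as well.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts, size=timedelta(hours=1)):
    """Floor a timestamp to the start of its fixed-size (tumbling) window."""
    return EPOCH + ((ts - EPOCH) // size) * size


# Made-up event timestamps
events = [
    datetime(2020, 9, 8, 0, 5),
    datetime(2020, 9, 8, 0, 55),
    datetime(2020, 9, 8, 1, 0),
]

# Count events per one-hour window
counts = Counter(window_start(ts) for ts in events)
for start, n in sorted(counts.items()):
    print(start, n)
```

Windows are half-open, so an event at exactly 01:00 falls into the window starting at 01:00 rather than the one starting at 00:00.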

In [0]:
query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("count")      # count = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

![](my_icons/dag.png)

In [0]:
#hide
%sql select value, date_format(window.end, "MMM-dd HH:mm") as time, count from count order by time

value,time,count
3.614851347628106,Dec-01 00:00,1
3.114328054119905,Dec-01 00:00,1
3.2558071686745165,Dec-01 00:00,1
3.583333333333333,Dec-01 00:00,1
3.14051125309327,Dec-01 00:00,1
4.015573770445236,Dec-01 00:00,1
3.0732240437153395,Dec-01 00:00,1
3.5,Dec-01 00:00,1
3.01190608502388,Dec-01 00:00,1
3.049999999968958,Dec-01 00:00,1


![](my_icons/streamtable.png)