#Anomaly Detection model for Predictive Maintenance

In [0]:
dbutils.library.installPyPI('azureml-sdk')
dbutils.library.installPyPI('keras')
dbutils.library.installPyPI('tensorflow', version='2.2.1')
dbutils.library.installPyPI('statsmodels')
dbutils.library.installPyPI('joblib')
dbutils.library.installPyPI('seaborn')
dbutils.library.installPyPI('h5py', version='2.10.0')

#best practice is to restart python after installing libraries
dbutils.library.restartPython()

Import the necessary libraries/namespaces.

In [0]:
import requests
import os
import uuid
import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import seaborn as sns
sns.set(color_codes=True)
import matplotlib.pyplot as plt
import joblib
import h5py

from numpy.random import seed
import tensorflow as tf

from keras.layers import Input, Dropout
from keras.layers.core import Dense 
from keras.models import Model, Sequential, load_model
from keras import regularizers
from keras.models import model_from_json

import azureml
from azureml.core import Run
from azureml.core import Workspace
from azureml.core.model import Model as AmlModel
from azureml.core.run import Run
from azureml.core.experiment import Experiment

Read the raw data that will be used to showcase the anomaly detection models. The following data sets are available:
- Normal: contains sensor readings that correspond to operation under normal conditions
- Gradual failure: contains sensor readings that correspond to an initial period of normal operation followed by a period of gradual deterioration and then by one of failure
- Immediate failure: contains sensor readings that correspond to an initial period of normal operation followed by a sudden transition to failure

In [0]:
pdNormal = pd.read_csv('https://databricksdemostore.blob.core.windows.net/data/mcw-predictive-maintenance-for-remote-field-devices/TelemetryNormal.csv')
pdGradualFailure = pd.read_csv('https://databricksdemostore.blob.core.windows.net/data/mcw-predictive-maintenance-for-remote-field-devices/TelemetryWithGradualFailures.csv')
pdImmediateFailure = pd.read_csv('https://databricksdemostore.blob.core.windows.net/data/mcw-predictive-maintenance-for-remote-field-devices/TelemetryWithImmediateFailures.csv')

Let's look at the timeseries for the five sensors when they operate under normal conditions.

In [0]:
fig, ax = plt.subplots(3,2, figsize=(30,10))

ax[0][0].set_title('MotorPowerkW')
ax[0][1].set_title('MotorSpeed')
ax[1][0].set_title('PumpRate')
ax[1][1].set_title('TimePumpOn')
ax[2][0].set_title('CasingFriction')

ax[0][0].plot(pdNormal['MotorPowerkW'])
ax[0][1].plot(pdNormal['MotorSpeed'])
ax[1][0].plot(pdNormal['PumpRate'])
ax[1][1].plot(pdNormal['TimePumpOn'])
ax[2][0].plot(pdNormal['CasingFriction'])
fig.suptitle('Sensor data for operations under normal conditions', fontsize=16)
display(fig)
plt.close()

Let's look at the timeseries for the five sensors when they operate initially under normal conditions and then the conditions start to gradually deteriorate until they reach a failure state.

In [0]:
fig, ax = plt.subplots(3,2, figsize=(30,10))

ax[0][0].set_title('MotorPowerkW')
ax[0][1].set_title('MotorSpeed')
ax[1][0].set_title('PumpRate')
ax[1][1].set_title('TimePumpOn')
ax[2][0].set_title('CasingFriction')

ax[0][0].plot(pdGradualFailure['MotorPowerkW'])
ax[0][1].plot(pdGradualFailure['MotorSpeed'])
ax[1][0].plot(pdGradualFailure['PumpRate'])
ax[1][1].plot(pdGradualFailure['TimePumpOn'])
ax[2][0].plot(pdGradualFailure['CasingFriction'])
fig.suptitle('Sensor data for operations when gradual failure occurs', fontsize=16)
display(fig)
plt.close()

Let's look at the timeseries for the five sensors when they operate initially under normal conditions and then the conditions suddenly deteriorate reaching a state of failure.

In [0]:
fig, ax = plt.subplots(3,2, figsize=(30,10))

ax[0][0].set_title('MotorPowerkW')
ax[0][1].set_title('MotorSpeed')
ax[1][0].set_title('PumpRate')
ax[1][1].set_title('TimePumpOn')
ax[2][0].set_title('CasingFriction')

ax[0][0].plot(pdImmediateFailure['MotorPowerkW'])
ax[0][1].plot(pdImmediateFailure['MotorSpeed'])
ax[1][0].plot(pdImmediateFailure['PumpRate'])
ax[1][1].plot(pdImmediateFailure['TimePumpOn'])
ax[2][0].plot(pdImmediateFailure['CasingFriction'])
fig.suptitle('Sensor data for operations when immediate failure occurs', fontsize=16)
display(fig)
plt.close()

We are scaling all three data sets to make sure all values are properly normalized. For this, we are using the MinMax scaler approach.

In [0]:
scaler = preprocessing.MinMaxScaler()

X_train = pd.DataFrame(scaler.fit_transform(pdNormal),
                      columns=pdNormal.columns,
                      index=pdNormal.index)
joblib.dump(scaler, 'scaler.pkl')
X_test1 = pd.DataFrame(scaler.transform(pdGradualFailure),
                      columns=pdGradualFailure.columns,
                      index=pdGradualFailure.index)
X_test2 = pd.DataFrame(scaler.transform(pdImmediateFailure),
                      columns=pdImmediateFailure.columns,
                      index=pdImmediateFailure.index)

The approach we use for anomaly detection is based on a deep learning model that is trained to recognize normal operating conditions. 
We are using a special type of deep learning model called an autoencoder. It basically trains to produce outputs that are as similar as possible to the inputs it gets (hence the name autoencoder). As you will see when we actually train the model, we use the same dataset (X_train - sensors under normal conditions) as both training and test data. 

Notice the middle layer of the neural network - the one we named 'feature_vector'. When data reaches this layer it is in the for of a 20-dimensional vector. The layer is very important because this is where we 'split' the trained model. Splitting the trained model here and using this layer as an output means that we have a model that is capable of encoding a combination of five sensor readings into a 20-dimensional vector. This piece of the model is called an encoder (for obvious reasons) and we will use it later to better understand conditions that classify as anomalies.

Also notice that we are using MSE (Mean Squared Error) as the loss function of the model.

In [0]:
seed(10)
tf.random.set_seed(10)
act_func = 'elu'

input = Input(shape=(X_train.shape[1],))
x = Dense(100,activation=act_func, kernel_initializer='glorot_uniform', kernel_regularizer=regularizers.l2(0.0))(input)
x = Dense(50,activation=act_func, kernel_initializer='glorot_uniform')(x)
encoder = Dense(20,activation=act_func, kernel_initializer='glorot_uniform', name='feature_vector')(x)
x = Dense(50,activation=act_func, kernel_initializer='glorot_uniform')(encoder)
x = Dense(100,activation=act_func, kernel_initializer='glorot_uniform')(x)
output = Dense(X_train.shape[1],activation=act_func, kernel_initializer='glorot_uniform')(x)

model = Model(input, output)
model.compile(loss='mse',optimizer='adam')

encoder_model = Model(inputs=model.input, outputs=model.get_layer('feature_vector').output)
encoder_model.compile(loss='mse',optimizer='adam')

Let's train the autoencoder model using the X_train dataset (sensors operating under normal conditions) as both training and test data.

For model validation, we are using a 95% / 5% split.

In [0]:
epochs = 100
batch_size = 10

history=model.fit(np.array(X_train),np.array(X_train),
                  batch_size=batch_size, 
                  epochs=epochs,
                  validation_split=0.05,
                  verbose = 1)

Next, let's take a look at how the loss function (MSE) evolved for both training and validation data during the advancing of the epochs.

In [0]:
fig, ax = plt.subplots(1,1, figsize=(20,5))

ax.set_title('Training/validation loss')

plt.plot(history.history['loss'],
         'b',
         label='Training loss')
plt.plot(history.history['val_loss'],
         'r',
         label='Validation loss')
plt.legend(loc='upper right')
plt.xlabel('Epochs')
plt.ylabel('Loss, [mse]')
plt.ylim([0,.002])
display(fig)
plt.close()

Given the trained model, we are using it to make predictions on the same data it trained and then we calculate the loss in the form of MAE (Mean Average Error) between the predicted values and the actual values.

The histogram representation of this result (MAE loss) enables us to understand what is the reasonable value for the value that identifies 'normal conditions'. Looking at the right end of the bell shape, we can safely assume that 0.01 is a good value for the threshold.

This result is remarkable! We have found a limit of the loss function that, if exceeded, means we are moving into anomaly territory. We will use this in the next steps to detect anomalies.

In [0]:
X_pred = model.predict(np.array(X_train))
X_pred = pd.DataFrame(X_pred, 
                      columns=X_train.columns)
X_pred.index = X_train.index

scored = pd.DataFrame(index=X_train.index)
scored['Loss_mae'] = np.mean(np.abs(X_pred-X_train), axis = 1)

fig, ax = plt.subplots(1,1)
sns.distplot(scored['Loss_mae'],
             bins = 50, 
             kde= True,
            color = 'blue');
plt.xlim([0.0,.02])
display(fig)
plt.close()

Save both models (the full autoencoder as well as its half, the encoder) for future use.

In [0]:
model.save('anomaly_detection_full_model.h5')
encoder_model.save('anomaly_detection_encoder_model.h5')

For here on, we suppose we have a trained model that we use to monitor and detect anomalies in the functioning of the system.

We are now taking the other two data sets (the one containing gradual failure and the one containing immediate failure) and running them through our full model to get the predicted values. We then compare those predicted values with the input values and calculate for each point in the time series the MAE loss. Finally, we mark each point in the time series as an anomaly or not depending on whether MAE exceeds of not the threshold we determined earlier.

In [0]:
model = load_model('anomaly_detection_full_model.h5')

# Batch scoring for gradual failure
X_pred1 = model.predict(np.array(X_test1))
X_pred1 = pd.DataFrame(X_pred1, 
                      columns=X_test1.columns)
X_pred1.index = X_test1.index

scored1 = pd.DataFrame(index=X_test1.index)
scored1['Loss_mae'] = np.mean(np.abs(X_pred1-X_test1), axis = 1)
scored1['Threshold'] = 0.01
scored1['Anomaly'] = scored1['Loss_mae'] > scored1['Threshold']

# Batch scoring for immediate failure
X_pred2 = model.predict(np.array(X_test2))
X_pred2 = pd.DataFrame(X_pred2, 
                      columns=X_test2.columns)
X_pred2.index = X_test2.index

scored2 = pd.DataFrame(index=X_test2.index)
scored2['Loss_mae'] = np.mean(np.abs(X_pred2-X_test2), axis = 1)
scored2['Threshold'] = 0.01
scored2['Anomaly'] = scored2['Loss_mae'] > scored2['Threshold']

We are finally ready to plot the results. The blue lines are the actual values of the MAE loss function and the orange line is the threshold we have determined earlier.

Notice how in the case of the gradual failure data set we are detecting the abnormal state quite early, before it reaches the final failed state. In real life, this is extremely important as it allows an automatic shutdown of the system before major failure occurs.

In [0]:
fig, ax = plt.subplots(1,2, figsize=(30,5))

ax[0].set_title('Failure chart for gradual failure')
ax[0].plot(scored1[['Loss_mae', 'Threshold']])

ax[1].set_title('Failure chart for immediate failure')
ax[1].plot(scored2[['Loss_mae', 'Threshold']])

ax[0].set_ylabel('Loss (MAE)')
ax[1].set_ylabel('Loss (MAE)')

ax[0].legend(('Pump Health Score', 'Failure Threshold'), loc='upper left')
ax[1].legend(('Pump Health Score', 'Failure Threshold'), loc='upper left')

ax[0].set_yscale('log')
ax[1].set_yscale('log')

display(fig)
plt.close()

Now that we have a working model for detecting anomalies in the working of the system, let's try to gain a better understanding of what's happening.

To do this we will use the encoder portion of our model (the one that 'encodes' each individual state of the 5 sensors into a 20-dimensional vector) and a technique called PCA (Principal Component Analysis).

First, we will encode both data sets with anomalies (gradual failure and immediate failure).

In [0]:
model = load_model('anomaly_detection_encoder_model.h5')
X_enc1 = encoder_model.predict(np.array(X_test1))
X_enc2 = encoder_model.predict(np.array(X_test2))

We apply several PCA models (with 2, 5, and 10 components respectively) to the state vectors that we obtained in the previous step.

For the gradual failure dataset, we find out that 2 out of the 20 components of the embedding vectors explain 99.48% of the entire variation found in the data.
For the immediate failure dataset, we find out that 2 out of the 20 components of the embedding vectors explain 99.81% of the entire variation found in the data.

As we move to 5 and 10 components for the PCA, we get some marginal increase in those percentages.

In [0]:
pca_components = [2, 5, 10]
gradual_failure_pca = []
immediate_failure_pca = []

def pca_analysis(input, results, failure_type):
  for comp in pca_components:
    pca = PCA(n_components = comp)
    pca_result = pca.fit_transform(input)
    print('{} failure - Cumulative explained variation for {} principal components: {}'.format(failure_type, comp, np.sum(pca.explained_variance_ratio_)))
    results.append(pca_result)
    
pca_analysis(X_enc1, gradual_failure_pca, 'Gradual')
pca_analysis(X_enc2, immediate_failure_pca, 'Immediate')

To get a visual representation of the 2 components mentioned above, we correlate those 2 components from the embedding vectors with the anomaly flag we've already determined.

We apply some transformations to improve the readability of the gradual failure dat set.

In [0]:
X_embedded1 = pd.DataFrame(gradual_failure_pca[0], columns=['X','Y'])
X_embedded1['State'] = np.where(scored1['Anomaly'], 'Failure', 'Normal')
X_embedded1['X_Norm'] = np.log(X_embedded1['X'] * -1 + 20)
X_embedded1['Y_Norm'] = np.log(X_embedded1['Y'] + 2)

X_embedded2 = pd.DataFrame(immediate_failure_pca[0], columns=['X','Y'])
X_embedded2['State'] = np.where(scored2['Anomaly'], 'Failure', 'Normal')

Finally, we plot the results.

In each of the two plots, each point represents one individual state of the system (corresponding to one point in time in the time series). The blue points indicate a normal state, and the orange ones indicate a state of anomaly (failure).

It is quite remarkable how different the two charts are. In the one corresponding to the gradually failing system, most blue points are concentrated into the leftmost part (normal operating conditions). Then we see an arch of orange points that define the gradual failure of the system. The chart corresponding to the immediately failing system, all points are concentrated in two bands - one for normal conditions and one for abnormal conditions. The sudden transition of the system from a normal condition to an abnormal one is very clearly visible.

In [0]:
fig, ax = plt.subplots(1, 2, figsize=(30,5))

ax[0].set_title('Gradual failure readings')
ax[1].set_title('Immediate failure readings')

sns.scatterplot(x='X_Norm', y='Y_Norm', hue='State', data=X_embedded1, ax=ax[0])
sns.scatterplot(x='X', y='Y', hue='State', data=X_embedded2, ax=ax[1])

display(fig)
plt.close()

## Save the model to disk

In preparation for deploying the model, you need to save the model and scaler to a new `models` directory that will be uploaded to the deployment image later on.

In [0]:
import uuid
import os

# Create a temporary folder to store locally relevant content for this notebook
tempFolderName = '/FileStore/mcw_predmaint_{0}'.format(uuid.uuid4())
dbutils.fs.mkdirs(tempFolderName)
print('Content files will be saved to {0}'.format(tempFolderName))

In [0]:
import os
models_dir = tempFolderName + "/models/"
if not os.path.exists(models_dir):
    os.makedirs(models_dir)

In [0]:
model = load_model('anomaly_detection_full_model.h5')
model.save(models_dir + 'anomaly_score.h5')
scaler = joblib.load('scaler.pkl')
joblib.dump(scaler, models_dir + 'scaler.pkl')

## Test loading the model

All the examples above performed batched predictions. Let's take a look now to an approach for real time scoring. We start with the assumption that we have a previously trained model and scaler.

We start by loading the models from their persisted state.

Next, we scale the values of one set of sensor readings and run the scaled values through the anomaly detection model.

We calculate the MAE value (between the predicted values array and the original readings array).

Finally, we compare the MAE value with the threshold to decide whether we have a normal or abnormal condition.
The example below detects a normal condition.

In [0]:
scaler = joblib.load(models_dir + 'scaler.pkl')
model = load_model(models_dir + 'anomaly_score.h5')

sensor_readings = np.array([70, 200, 60.6, 0, 1448.17])
scaled_sensor_readings = scaler.transform(sensor_readings.reshape(1,-1))

pred_sensor_readings = model.predict(scaled_sensor_readings)
score = np.mean(np.abs(scaled_sensor_readings - pred_sensor_readings[0]))

if score > 0.01:
  print('WARNING! Abnormal conditions detected')
else:
  print('Everything is ok')

The following example shows how an abnormal condition is detected.

In [0]:
sensor_readings = np.array([14.23, 41, 14.4, 318.50, 601.95])
scaled_sensor_readings = scaler.transform(sensor_readings.reshape(1,-1))

pred_sensor_readings = model.predict(scaled_sensor_readings)
score = np.mean(np.abs(scaled_sensor_readings - pred_sensor_readings[0]))

if score > 0.01:
  print('WARNING! Abnormal conditions detected')
else:
  print('Everything is ok')

## Deploy the model

Now you will use the Azure Machine Learning service SDK to programmatically register your model and create a container image for the web service that uses it and deploy that image on to an Azure Container Instance.

Run the following cells to create some helper functions that you will use for deployment.

In [0]:
import azureml
from azureml.core import Workspace
from azureml.core.model import Model as AmlModel

In [0]:
def getOrCreateWorkspace(subscription_id, resource_group, workspace_name, workspace_region):
    # By using the exist_ok param, if the worskpace already exists we get a reference to the existing workspace instead of an error
    ws = Workspace.create(
        name = workspace_name,
        subscription_id = subscription_id,
        resource_group = resource_group, 
        location = workspace_region,
        exist_ok = True)
    return ws

In [0]:
def deployModelAsWebService(ws, model_paths, model_names, 
                scoring_script_filename="score.py", 
                conda_packages=['scikit-learn','numpy','joblib','pandas','keras','h5py==2.10.0'],
                conda_file="dependencies.yml", runtime="python",
                cpu_cores=1, memory_gb=1, tags={'name':'scoring'},
                description='Scoring web service.',
                service_name = "scoringservice"
               ):

    print("Registering and uploading models...")
    registered_models = []
    
    for index, path in enumerate(model_paths):
      
      print('Registering model {} with name {}'.format(path, model_names[index]))
    
      registered_models.append(AmlModel.register(model_path=path, 
                                      model_name=model_names[index], 
                                      workspace=ws))
    
    print('Successfully registered {} models.'.format(len(registered_models)))

    # create a Conda dependencies environment file
    print("Creating conda dependencies file locally...")
    from azureml.core.conda_dependencies import CondaDependencies 
    mycondaenv = CondaDependencies.create(conda_packages=conda_packages)
    mycondaenv.add_pip_package('tensorflow==2.2.1')
    
    #dbutils.fs.put(tempFolderName + "/" + conda_file, mycondaenv.serialize_to_string(), overwrite=True)
    with open(conda_file,"w") as f:
        f.write(mycondaenv.serialize_to_string())
    
    # create inference configuration
    print("Creating inference configuration...")
    from azureml.core.model import InferenceConfig
    inference_config = InferenceConfig(runtime=runtime, 
                                       entry_script=scoring_script_filename,
                                       conda_file=conda_file)
    
    # create ACI configuration
    print("Creating ACI configuration...")
    from azureml.core.webservice import AciWebservice, Webservice
    aci_config = AciWebservice.deploy_configuration(
        cpu_cores = cpu_cores, 
        memory_gb = memory_gb, 
        tags = tags, 
        description = description)

    # deploy the webservice to ACI
    print("Deploying webservice to ACI...")
    webservice = AmlModel.deploy(workspace=ws,
                              name=service_name,
                              models=registered_models,
                              inference_config=inference_config,
                              deployment_config=aci_config)
    webservice.wait_for_deployment(show_output=True)
    
    return webservice

Your web service which knows how to load the model and use it for scoring needs to be saved out to a file for the Azure Machine Learning service SDK to deploy it. Run the following cell to create this file.

In [0]:
# write out the file scoring-service.py
scoring_service = """
import os
import numpy as np
import pandas as pd
import tensorflow.keras
import joblib
from tensorflow.keras.models import load_model
import h5py

from azureml.core.model import Model as AmlModel

def init():
  global model
  global scaler
  global init_error
    
  try:
    
    init_error = None
  
    scaler_file_path = AmlModel.get_model_path('anomaly-detection-scaler')
    model_file_path = AmlModel.get_model_path('anomaly-detection-model')

    print('Loading scaler from:', scaler_file_path)
    scaler = joblib.load(scaler_file_path)

    print('Loading model from:', model_file_path)
    model = load_model(model_file_path)

  except Exception as e:
    init_error = e
    print(e)
        
# note you can pass in multiple rows for scoring
def run(raw_data):

  if init_error is not None:
    return 'Init error: {}'.format(str(init_error))

  try:
    print("Received input:", raw_data)
    
    input_df = pd.read_json(raw_data, orient='values')
    
    sensor_readings = np.array(input_df)
    scaled_sensor_readings = scaler.transform(sensor_readings.reshape(1,-1))

    pred_sensor_readings = model.predict(scaled_sensor_readings)
    score = np.mean(np.abs(scaled_sensor_readings - pred_sensor_readings[0]))

    if score > 0.01:
      print('WARNING! Abnormal conditions detected')
      return 1
    else:
      print('Everything is ok')
      return 0

  except Exception as e:
    error = str(e)
    return error
"""

with open(tempFolderName + "/" + "score.py", "w") as file:
    file.write(scoring_service)

Next, create your Workspace (or retrieve the existing one if it already exists) and deploy the model.

Please note that executing the next few cells can take between **7** and **10** minutes.

## Configure access to the Azure Machine Learning resources
To begin, you will need to provide the following information about your Azure Subscription.

**If you are using your own Azure subscription, please provide names for subscription_id, resource_group, workspace_name and workspace_region to use.** Note that the workspace needs to be of type [Machine Learning Workspace](https://docs.microsoft.com/en-us/azure/machine-learning/service/setup-create-workspace).

In the following cell, be sure to set the values for `subscription_id`, `resource_group`, `workspace_name` and `workspace_region` as directed by the comments (*these values can be acquired from the Azure Portal*).

To get these values, do the following:
1. Navigate to the Azure Portal and login with your credentials.
2. From the left-hand menu, under Favorites, select `Resource Groups`.
3. In the list, select the resource group used for the lab.
4. Open your Machine Learning Workspace.
5. From the Overview tab, capture the desired values.

In [0]:
#Provide the Subscription ID of your existing Azure subscription
subscription_id = ""

#Provide values for the existing Resource Group 
resource_group = ""  

#Provide the Workspace Name and Azure Region of the Azure Machine Learning Workspace
workspace_name = ""
workspace_region = "eastus"

Run the following cells to connect to your **Azure Machine Learning Workspace** If the workspace does not already exist, it will be created. Otherwise the code below will connect to the existing workspace without creating a new one.

**Important Note**: You will be prompted to login in the text that is output below the cell. Be sure to navigate to the URL displayed and enter the code that is provided. Once you have entered the code, return to this notebook and wait for the output to read `Workspace configuration succeeded`.

In [0]:
ws =  getOrCreateWorkspace(subscription_id, resource_group, 
                   workspace_name, workspace_region)

In [0]:
# It is important to change the current working directory so that your generated scoring-service.py is at the root of it. 
# This is required by the Azure Machine Learning SDK
os.chdir(tempFolderName)
os.getcwd()

In [0]:
webservice = deployModelAsWebService(ws, 
                                     model_paths=['models/anomaly_score.h5', 'models/scaler.pkl'], 
                                     model_names=['anomaly-detection-model', 'anomaly-detection-scaler'])
print(webservice.state)

In [0]:
result = webservice.run('[[14.23, 41, 14.4, 318.50, 601.95]]')
result
# result of 0 = no anomaly detected
# result of 1 = anomaly detected

## Copy the deployed web service URL

Run the cell below to retrieve the web service URL. Copy the value in the cell's output after running and use it to configure your Azure Function App.

In [0]:
print(webservice.scoring_uri)