In [54]:
# For Spark 3.1
# NOTE(review): IPython cell magics such as %%configure are normally only recognised when they
# are the first line of a cell - confirm this leading comment does not stop the magic from running.
# NOTE(review): the commented-out Spark 3.2 package coordinate further down has leading and
# trailing spaces inside its quotes - confirm they are harmless before enabling that block.
%%configure -f
{
  "name": "synapseml",
  "conf": {
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5-13-d1b51517-SNAPSHOT",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12",
      "spark.yarn.user.classpath.first": "true"
  }
}
# For Spark 3.2
# %%configure -f
# {
#   "name": "synapseml",
#   "conf": {
#       "spark.jars.packages": " com.microsoft.azure:synapseml_2.12:0.9.5 ",
#       "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
#       "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,io.netty:netty-tcnative-boringssl-static",
#       "spark.yarn.user.classpath.first": "true"
#   }
# }

StatementMeta(, , , Waiting, )

# Learning Materials
Check out this [recipe](https://microsoft.github.io/SynapseML/docs/next/features/cognitive_services/CognitiveServices%20-%20Multivariate%20Anomaly%20Detection/) for reference on this tutorial.

Check out this [blog](https://www.aka.ms/mvad-on-spark) to learn about the workflow of MVAD in Synapse: https://www.aka.ms/mvad-on-spark.

# 💾Import modules

In [None]:
from synapse.ml.cognitive import *
from notebookutils import mssparkutils

In [77]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark
import synapse.ml
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType


StatementMeta(, , , Waiting, )

# 📈Load data into a Spark Dataframe

In [69]:

# Read the demo CSV (first row is the header) and cast the three sensor
# columns to doubles in a single chained expression.
df = (
    spark.read.format("csv")
    .option("header", True)
    .load("wasbs://mvadcsvdata@sparkdemostorage.blob.core.windows.net/spark-demo-data.csv")
    .withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Preview the first 10 rows.
df.show(10)

StatementMeta(, , , Waiting, )

+--------------------+-----------+------------+------------+
|           timestamp|   sensor_1|    sensor_2|    sensor_3|
+--------------------+-----------+------------+------------+
|2021-01-01T00:00:00Z|0.029422617|-0.473649498| -0.19224458|
|2021-01-01T00:01:00Z|1.007787393|-1.052367306| 0.262377261|
|2021-01-01T00:02:00Z|0.748065889|-0.566472749|-0.072228041|
|2021-01-01T00:03:00Z|0.969546005|-0.376075739|   0.1906547|
|2021-01-01T00:04:00Z|1.437884759| 0.882075332|-1.007224649|
|2021-01-01T00:05:00Z|0.997356244| 0.328516725|-0.428782042|
|2021-01-01T00:06:00Z|0.972924733| 0.695607892|-1.139777769|
|2021-01-01T00:07:00Z|1.616419945| 0.082182996| 1.447420156|
|2021-01-01T00:08:00Z| 0.73030701| 0.534265584| 1.220560838|
|2021-01-01T00:09:00Z|1.576517163|-2.579702562|-1.474013466|
+--------------------+-----------+------------+------------+
only showing top 10 rows

# 🔧Training
## 1. Training preparation

In [73]:
# --- Secrets ---
# Replace the bracketed placeholders with your key vault name and the names of
# the secrets that hold the anomaly detector key and the storage connection string.
anomalyKey = mssparkutils.credentials.getSecret("[key_vault_name]", "[anomaly_key_secret_name]")
connectionString = mssparkutils.credentials.getSecret("[key_vault_name]", "[connectriong_string_secret_name]")

# --- Description of the training data ---
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
startTime = "2021-01-01T00:00:00Z"
endTime = "2021-01-02T09:18:00Z"

# --- Storage used for intermediate data ---
# Container in your Storage account; a new name may be given here and Synapse
# will create that container automatically.
containerName = "mvadtest"
# Folder in the Storage account where the intermediate data is stored.
intermediateSaveDir = "intermediateData"

# --- Estimator ---
# The location must be written in lowercase, e.g. "eastus".
simpleMultiAnomalyEstimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation("eastus")
    .setStartTime(startTime)
    .setEndTime(endTime)
    .setContainerName(containerName)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
    .setConnectionString(connectionString)
)

StatementMeta(, , , Waiting, )

## 2. Train a model

In [74]:
# Fit the estimator configured in the previous cell on the loaded DataFrame.
model = simpleMultiAnomalyEstimator.fit(df)
# Last expression of the cell: display the type of the fitted model.
type(model)

StatementMeta(, , , Waiting, )

synapse.ml.cognitive.DetectMultivariateAnomaly.DetectMultivariateAnomaly

# 🔍Inference with trained model
## 1. Inference

In [75]:
# Time range to run detection on.
startInferenceTime = "2021-01-02T09:19:00Z"
endInferenceTime = "2021-01-03T01:59:00Z"

# Configure the fitted model for that range, name the output and error
# columns, and apply it to the same DataFrame and columns used for training.
result = (
    model
    .setStartTime(startInferenceTime)
    .setEndTime(endInferenceTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .transform(df)
)

StatementMeta(, , , Waiting, )

## 2. Get inference results

In [80]:
# Keep the timestamp, the sensor values and the detection fields for rows at or
# after the inference start, sorted by timestamp, and collect them into pandas.
rdf = (
    result
    .select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity",
    )
    .filter(col('timestamp') >= lit(startInferenceTime))
    .orderBy('timestamp', ascending=True)
    .toPandas()
)

def parse(x, numSeries=3):
    """Convert one row's contributor list into a ``{series name: score}`` dict.

    Parameters
    ----------
    x : list or any other value
        The ``contributors`` value of a single row. A non-empty list is
        expected to hold 2-item sequences ordered ``(score, series name)``.
        Any other value (for example ``None``) and an empty list are both
        treated as "no contributors".
    numSeries : int, optional
        Number of ``series_<i>`` keys to emit in the zero-filled placeholder
        returned when there are no contributors. Defaults to 3.

    Returns
    -------
    dict
        ``{series name: score}`` built from ``x``, or
        ``{'series_0': 0, ..., 'series_<numSeries-1>': 0}`` when ``x`` carries
        no contributors.
    """
    # isinstance also accepts list subclasses; the truthiness check sends an
    # empty list to the placeholder instead of producing an empty dict, which
    # would turn into NaN / missing series columns after normalisation.
    if isinstance(x, list) and x:
        # Each item is (score, name); reversing it makes the name the dict key.
        return dict(item[::-1] for item in x)
    return {f'series_{i}': 0 for i in range(numSeries)}

# Turn every row's contributor list into a dict, expand those dicts into one
# column per series, and put the new columns in place of 'contributors'.
contributorScores = pd.json_normalize(rdf['contributors'].apply(parse))
rdf = pd.concat([rdf.drop(columns=['contributors']), contributorScores], axis=1)
rdf

StatementMeta(, , , Waiting, )

Unnamed: 0,timestamp,sensor_1,sensor_2,sensor_3,isAnomaly,severity,series_0,series_1,series_2
0,2021-01-02T09:19:00Z,-0.044257,-0.968458,-1.906952,False,0.000000,0.00000,0.000000,0.000000
1,2021-01-02T09:20:00Z,-1.070284,-0.579430,-0.285167,False,0.000000,0.00000,0.000000,0.000000
2,2021-01-02T09:21:00Z,-0.498136,-0.263619,-1.521442,False,0.000000,0.00000,0.000000,0.000000
3,2021-01-02T09:22:00Z,-0.378106,0.949444,0.578883,False,0.000000,0.00000,0.000000,0.000000
4,2021-01-02T09:23:00Z,0.724273,1.211421,2.159496,True,0.305973,0.15597,0.250687,0.593343
...,...,...,...,...,...,...,...,...,...
996,2021-01-03T01:55:00Z,0.572729,-1.318392,0.492164,False,0.000000,0.00000,0.000000,0.000000
997,2021-01-03T01:56:00Z,1.311772,0.408282,0.278034,False,0.000000,0.00000,0.000000,0.000000
998,2021-01-03T01:57:00Z,1.079186,0.528480,0.066745,False,0.000000,0.00000,0.000000,0.000000
999,2021-01-03T01:58:00Z,0.138812,0.657773,-0.456893,False,0.000000,0.00000,0.000000,0.000000


# 👀Visualisation

In [None]:
# Points whose severity is at least this value are highlighted as anomalies.
minSeverity = 0.1

# One entry per plotted sensor: (sensor column, contributor column, colour).
# The contributor column 'series_<i>' is shown under the label of the i-th sensor.
sensorStyles = [
    ('sensor_1', 'series_0', 'tab:orange'),
    ('sensor_2', 'series_1', 'tab:green'),
    ('sensor_3', 'series_2', 'tab:blue'),
]

####### Main Figure #######
mainFig, mainAx = plt.subplots(figsize=(23, 8))
for sensorName, contributorName, colour in sensorStyles:
    mainAx.plot(rdf['timestamp'], rdf[sensorName], color=colour, linestyle='solid', linewidth=2, label=sensorName)
mainAx.grid(axis='y')
mainAx.tick_params(axis='x', which='both', bottom=False, labelbottom=False)

# Read the y-limits once the sensor lines are drawn so the markers span them.
ymin, ymax = mainAx.get_ylim()
# The markers are placed by row position. This matches the x-axis because
# matplotlib puts string x-values at consecutive integer positions
# (assumes the timestamps are unique strings - TODO confirm).
anomalyPositions = np.flatnonzero((rdf['severity'] >= minSeverity).to_numpy())
mainAx.vlines(anomalyPositions, ymin=ymin, ymax=ymax, color='r', alpha=0.8)

mainAx.legend()
mainAx.set_title('A plot of the values from the three sensors with the detected anomalies highlighted in red.')
plt.show()

####### Severity Figure #######
severityFig, severityAx = plt.subplots(figsize=(23, 1))
severityAx.tick_params(axis='x', which='both', bottom=False, labelbottom=False)
severityAx.plot(rdf['timestamp'], rdf['severity'], color='black', linestyle='solid', linewidth=2, label='Severity score')
# Horizontal reference line at the highlighting threshold.
severityAx.axhline(minSeverity, color='red', linestyle='dotted', linewidth=1, label='minSeverity')
severityAx.grid(axis='y')
severityAx.legend()
severityAx.set_ylim([0, 1])
severityAx.set_title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
contributorsFig, contributorsAx = plt.subplots(figsize=(23, 1))
contributorsAx.tick_params(axis='x', which='both', bottom=False, labelbottom=False)
# Stack the bars: each sensor's bar starts where the previous ones ended.
stackBottom = np.zeros(len(rdf))
for sensorName, contributorName, colour in sensorStyles:
    contributorsAx.bar(rdf['timestamp'], rdf[contributorName], width=2, color=colour, label=sensorName, bottom=stackBottom)
    stackBottom = stackBottom + rdf[contributorName].to_numpy()
contributorsAx.grid(axis='y')
contributorsAx.legend()
contributorsAx.set_ylim([0, 1])
contributorsAx.set_title("The contribution of each sensor to the detected anomaly")
plt.show()

![MVAD plot: sensor values with detected anomalies, severity, and per-sensor contributions](https://camo.githubusercontent.com/3917917017cbfcb6b4c062866d6824bbbf264ec46bbd096a2a624e4fc6f98433/68747470733a2f2f6d6d6c737061726b2e626c6f622e636f72652e77696e646f77732e6e65742f67726170686963732f6d7661645f706c6f742e706e67)

For further questions on this tutorial, feel free to contact:

Louise Han | Program Manager of Anomaly Detector | jingruhan@microsoft.com

Mark Hamilton | Software Engineer of Synapse | marhamil@microsoft.com