Copyright (c) Microsoft Corporation. 
Licensed under the MIT license. 

## Anomaly Detection
Load data and apply the Spark anomaly detection model:

1. Define variables
2. Load data
3. Get the access key for the Cognitive Service
4. Apply anomaly detection model
5. Store results


In [None]:
import re

import mmlspark

# Oldest mmlspark release this notebook accepts.
MIN_MMLSPARK_VERSION = "1.0.0-rc3"


def _mmlspark_version_key(version):
    """Return a sortable key for an mmlspark version string such as ``1.0.0-rc3``.

    Comparing raw strings is wrong for versions: lexicographically both
    ``"1.0.0" < "1.0.0-rc3"`` and ``"1.0.0-rc10" < "1.0.0-rc3"`` are True.
    This key compares the numeric release components first. Within the same
    release, an ``rcN`` pre-release ranks below the final release, and
    release candidates are ordered by N. Anything after ``rcN`` (for example
    ``-148-87ec5f74-SNAPSHOT`` build suffixes) is ignored.
    """
    release, _, suffix = version.partition("-")
    numbers = tuple(int(part) for part in re.findall(r"\d+", release))
    rc = re.match(r"rc(\d+)", suffix)
    if rc:
        # Pre-release: sorts below the final release with the same numbers.
        return numbers, 0, int(rc.group(1))
    return numbers, 1, 0


if _mmlspark_version_key(mmlspark.__spark_package_version__) < _mmlspark_version_key(MIN_MMLSPARK_VERSION):
    raise Exception("This notebook is not compatible with the current version of mmlspark: {}. Please upgrade to 1.0.0-rc3 or higher.".format(
        mmlspark.__spark_package_version__))

In [None]:
from mmlspark.cognitive import *
from notebookutils import mssparkutils
from pyspark.sql.functions import col, lit
from pyspark.sql.types import DoubleType

## Define variables


In [None]:
# --- Azure Data Lake Storage location ---------------------------------------
# Fill in the storage account name and the file system (container) to use.
data_lake_account_name = ""
file_system_name = ""

# abfss:// URIs let Spark read the input and write the scored output directly
# against the data lake.
adls_path = "abfss://{}@{}.dfs.core.windows.net/CommodityAggrData".format(
    file_system_name, data_lake_account_name)
scored_data_path = "abfss://{}@{}.dfs.core.windows.net/Result/Anomalies".format(
    file_system_name, data_lake_account_name)

# --- Cognitive Services credentials (resolved through Azure Key Vault) -------
linked_service = ""    # name of the Azure Key Vault linked service
akv_name = ""          # name of the Azure Key Vault
secret_name = ""       # name of the Key Vault secret holding the service key
anomaly_location = ""  # Azure region of the Cognitive Service

## Load data


In [None]:
# Read the parquet data under adls_path into a Spark DataFrame and prepare it
# for the detector:
#   * `date` is cast to a string column (it is used as the timestamp column below);
#   * every row gets the same constant `group` value, presumably so the detector
#     treats the whole frame as a single series (it is used as the group-by column).
df = (
    spark.read.parquet(adls_path)
    .withColumn("date", col("date").cast("string"))
    .withColumn("group", lit("serie1"))
)
display(df)

## Retrieve Access Keys


In [None]:
# Look up the Cognitive Services key (a subscription key or a general
# Cognitive Service key) in Azure Key Vault through the linked service, so the
# key itself is never written into the notebook.
service_key = mssparkutils.credentials.getSecret(
    akvName=akv_name,
    secret=secret_name,
    linkedService=linked_service,
)

## Instantiate and apply model


In [None]:
# Configure the anomaly detector.
anomalyDetector = (
    SimpleDetectAnomalies()
    # Cognitive Service region and key.
    .setLocation(anomaly_location)
    .setSubscriptionKey(service_key)
    # Columns that receive the service response and any per-row error.
    .setOutputCol("output")
    .setErrorCol("error")
    # Series layout: monthly points, timestamped by `date`, valued by
    # `average_value`, grouped by `group`.
    .setGranularity("monthly")
    .setTimestampCol("date")
    .setValueCol("average_value")
    .setGroupbyCol("group")
)

# Score the data and show the results.
df_results = anomalyDetector.transform(df)
display(df_results)

## Store results


In [None]:
# Store results: flatten the detector's `output` struct next to the input
# columns, add the expected-value band, then persist the data both as parquet
# in the data lake and as the Spark table `Anomalies`.
df_results = df_results.select("date", "average_value", "output.*")

# The Anomaly Detector reports upperMargin / lowerMargin as distances from
# expectedValue. The band is therefore
# [expectedValue - lowerMargin, expectedValue + upperMargin];
# adding lowerMargin would put the "lower" bound above the expected value.
df_results = (
    df_results
    .withColumn("upperValue", col("expectedValue") + col("upperMargin"))
    .withColumn("lowerValue", col("expectedValue") - col("lowerMargin"))
)

df_results.write.mode("overwrite").save(scored_data_path, format="parquet")
df_results.write.mode("overwrite").saveAsTable("Anomalies")