<h1>Detección de anomalías: Isolation Forest<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Librerias-y-configuración" data-toc-modified-id="Librerias-y-configuración-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Librerias y configuración</a></span></li><li><span><a href="#Funcionalidad" data-toc-modified-id="Funcionalidad-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Funcionalidad</a></span></li><li><span><a href="#Isolation-Forest" data-toc-modified-id="Isolation-Forest-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Isolation Forest</a></span><ul class="toc-item"><li><span><a href="#Predict" data-toc-modified-id="Predict-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Predict</a></span></li></ul></li></ul></div>

# Librerias y configuración

In [26]:
from pyspark.sql import SparkSession
from onesaitplatform.iotbroker import DigitalClient
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

spark = SparkSession.builder.master("local[*]").appName("") \
    .getOrCreate()

cliente = DigitalClient(host="dev-datalake.onesaitplatform.com", port=443, 
                        iot_client="MeteringIntelligence", iot_client_token="cb629e550b31465885bc2e8d00cc0431")
cliente.protocol = "https"
cliente.avoid_ssl_certificate = False
ok_join, res_join = cliente.join()

# Funcionalidad

In [27]:
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import explode, col
import json, logging, sys

def readMongo(spark, cliente, ontology, query, query_type= "SQL"):
    ok_query, res_query = cliente.query_batch(query=query, ontology=ontology, query_type=query_type, batch_size=5000)
    if ok_query:
        ds2 = [json.dumps(item) for item in res_query]
        df = spark.read.json(spark.sparkContext.parallelize(ds2))         
    return df

def readMetering(spark, cliente, ontology, signal, start_date, end_date, 
                 key_vars=['timestamp','assetId', 'location.coordinates',
                           'hourly_measurements[*].measure'], 
                 additional_vars=[], assetids=None):
    # Se definen los parametros de la query
    ## VARIABLES QUE VA A DEVOLVER - No se esta considerando el 'hourly_weather'
    query_vars = ','.join(key_vars+additional_vars)
    ## CONDICION DE LA QUERY
    query_cond = "hourly_measurements[*].name = '{s}' AND timestamp >= '{s_dt}' AND timestamp <= '{e_dt}'".\
                    format(s=signal, s_dt=start_date, e_dt=end_date)
    if bool(assetids):
        query_cond = query_cond + " and assetId in ({})".format(','.join(assetids))
    ## SE MONTA LA QUERY COMPLETA
    query = "SELECT {v} from {ont} where {cond}".format(v=query_vars, ont=ontology, cond=query_cond)
    # Hacemos la consulta en mongo
    df = readMongo(spark, cliente, ontology, query)
    
    ### Transformaciones para darle la estructura que nos interesa
    # Primero para las coordenadas y los ancestors
    df = (df.withColumn('latitude', col('coordinates').getItem(0))
            .withColumn('longitude', col('coordinates').getItem(1))
            .drop('coordinates')
            )
    # Obtenemos las columnas que son array y que son string
    str_cols = [f.name for f in df.schema.fields if not isinstance(f.dataType, ArrayType)]
    join_cols = str_cols.copy() 
    array_dict = {f.name:[f.dataType.elementType.fields[i].name 
                          for i in range(len(f.dataType.elementType.fields))] 
                  for f in df.schema.fields if isinstance(f.dataType, ArrayType)}
    # Creamos el dataframe de resultados principales
    result_df = df.select(*str_cols)
    # Iteramos sobre las variables que son array para obtener todos sus campos
    for column, variables in array_dict.items():
        aux_df = (df.select(str_cols + [column])
                    .withColumn('aux', explode(col(column)))
                    .select(str_cols + [col('aux').getItem(var).alias(var) for var in variables]))
    
        result_df = result_df.join(aux_df, list(set(join_cols)), how = 'left')
        join_cols += ['period']
        
    result_df.withColumnRenamed('value', signal)
    return result_df.toPandas()

df = readMetering(spark, cliente, "MeteringUSADEV2", "Electricity_Facility_kWh", "", "2013-09-30")

# Isolation Forest

In [28]:
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
from datetime import timedelta, datetime
import plotly
import plotly.graph_objects as go
import plotly.express as px
plotly.offline.init_notebook_mode(connected=True)

point = '701330'
df_point = df[df.assetId == point].sort_values(by=['timestamp','period']).reset_index(drop=True)

def isolation_forest_model(data_input, value, n_estimators, pct_anomalies, timestamp_index, title):
    model = IsolationForest(n_estimators=n_estimators, contamination=pct_anomalies)
    model = model.fit(data_input)
    pred = model.predict(data_input)
    score = model.decision_function(data_input)
    summary_df = pd.DataFrame({'value': value, 'anomaly_score': score, 'anomaly_pred': pred}, 
                              index = timestamp_index)
    summary_df['anomaly_value'] = summary_df[['value','anomaly_pred']].apply(lambda x: x[0] if x[1] == -1 else None, axis=1)
    summary_df = summary_df.reset_index()
    summary_df = summary_df.rename(columns={'index':'timestamp'})
    fig = go.Figure()
    fig.add_traces(go.Scatter(x=summary_df.index, y=summary_df['value'], mode='lines', name='normal data'))
    fig.add_traces(go.Scatter(x=summary_df.index, y=summary_df['anomaly_value'], mode='markers', name='anomaly data'))
    fig.update_layout(title='Anomaly Detection Isolation Forest ' + title +
                      ' (' + str(len(summary_df[summary_df.anomaly_pred==-1])) + ')', 
                      xaxis_title='Date', yaxis_title='Electricity_Facility_kWh')
    fig.show()
    return summary_df, model

In [29]:
timestamp_index = [datetime(2013,1,1) + timedelta(hours=h) for h in range(len(df_point))]
xx = df_point['value'].values
summary_historical_consumption, model_historical = isolation_forest_model(xx.reshape(-1,1), xx, 300, 0.05, 
                                                                          timestamp_index, 'Historical Consumption')

In [30]:
data = df_point[['timestamp','period','value']]
data['timestamp'] = data['timestamp'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d"))
data['month'] = data['timestamp'].apply(lambda x: x.month)
data['dayofweek'] = data['timestamp'].apply(lambda x: x.dayofweek)

def create_temporary_dataset(data, n_periods_lags, n_similar_lags):
    cols, names = list(), list()
    for i in range(n_periods_lags, 0, -1):
        cols.append(abs(data[['value']] - data[['value']].shift(i)))
        names += ['value_period(t-{0})'.format(i)]
    for i in range(n_similar_lags, 0, -1):
        cols.append(abs(data[['value']] - data[['value']].shift(7*24*i)))
        names += ['value_similarDays(t-{0})'.format(i)]
    group_month = data.groupby('month')['value'].mean()
    group_month = group_month.reset_index()
    group_month = group_month.rename(columns={'value': 'value_month'})
    group_month = group_month.merge(data, on=['month'])
    cols.append(group_month['value_month'])
    names += ['value_month']
    for col in ['timestamp','period','value']:
        cols.append(data[col])
        names += [col]
    agg = pd.concat(cols, axis=1)
    agg.columns = names
    agg.dropna(inplace=True)
    agg.reset_index(drop=True, inplace=True)
    return agg


data_lags = create_temporary_dataset(data, 3, 5)

timestamp_index = [datetime(2013,1,15) + timedelta(hours=h) for h in range(len(data_lags))]
summary_similar_lags, model_lags = isolation_forest_model(data_lags.iloc[:,:-3], data_lags['value'].values, 300, 0.01, 
                                                          timestamp_index, 'Lags Similar Days')

## Predict

In [72]:
import pickle
with open('mymodel','wb') as f:
    pickle.dump(model_lags,f)

In [73]:
with open('mymodel', 'rb') as file:
    pickle_model = pickle.load(file)

In [74]:
df_predict = readMetering(spark, cliente, ontology = "MeteringUSADEV2", signal = "Electricity_Facility_kWh",
                          start_date = "2013-08-15", end_date = "2013-10-01")
point = '701330'
df_predict_point = df_predict[df_predict.assetId == point].sort_values(by=['timestamp','period']).reset_index(drop=True)

In [75]:
df_predict_point = df_predict_point[['timestamp','period','value']]
df_predict_point['timestamp'] = df_predict_point['timestamp'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d"))
df_predict_point['month'] = df_predict_point['timestamp'].apply(lambda x: x.month)
df_predict_point['dayofweek'] = df_predict_point['timestamp'].apply(lambda x: x.dayofweek)

df_predict_lags = create_temporary_dataset(df_predict_point, 3, 5)
df_predict_lags = df_predict_lags[df_predict_lags.timestamp == '2013-10-01']

pred_lags_october1 = pickle_model.predict(df_predict_lags.iloc[:,:-3])
score_lags_october1 = pickle_model.decision_function(df_predict_lags.iloc[:,:-3])

summary_pred = pd.DataFrame({'value': df_predict_lags['value'].values, 'anomaly_score': score_lags_october1, 
                            'anomaly_pred': pred_lags_october1}, 
                            index = [datetime(2013,10,1) + timedelta(hours=h) for h in range(24)])
summary_pred['anomaly_pred'] = summary_pred[['value', 'anomaly_pred']].apply(lambda x: x[0] if x[1] == -1 else None, axis=1)

fig = go.Figure()
fig.add_traces(go.Scatter(x=summary_pred.index, y=summary_pred['value'], mode='lines', name='normal data'))
fig.add_traces(go.Scatter(x=summary_pred.index, y=summary_pred['anomaly_pred'], mode='markers', name='anomaly data'))
fig.update_layout(title='Anomaly Detection Isolation Forest', xaxis_title='Date', yaxis_title='Electricity_Facility_kWh')
fig.show()

In [83]:
pos_anomalies = list(summary_pred[summary_pred['anomaly_pred'].notnull()].index)
pos_anomalies = [x.hour for x in pos_anomalies]
colors = ['lightgreen'] * len(summary_pred)
for x in pos_anomalies:
    colors[x] = 'indianred'

fig = go.Figure()
fig.add_trace(go.Bar(x=summary_pred.index, y=summary_pred['value'], marker_color=colors, showlegend=False))
fig.add_trace(go.Scatter(x=summary_pred.index, y=summary_pred['value'], 
                         mode='lines', marker=dict(color='green'), showlegend=False))
fig.add_trace(go.Scatter(x=summary_pred.index, y=summary_pred['anomaly_pred'], 
                         mode='markers', marker=dict(color='red'), showlegend=False))
fig.update_layout(title='Anomaly Detection Isolation Forest', yaxis_title='Consumption', xaxis_title='Date')
fig.show()