## Locating the PC Boundaries for Anomalies

* Create a simplified Isolation Forest algorithm that works within PySpark

In [1]:
### Import libraries
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.colors

import ipyleaflet as ipy

from pyspark import SparkContext
import pyspark.sql.functions as F
import pyspark.ml.functions as M
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import Window as W
from pyspark.sql.types import *
from pyspark.ml.feature import PCA, VectorAssembler, MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

import time
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

Matplotlib created a temporary cache directory at /tmp/matplotlib-23qlritx because the default path (/home/jovyan/.cache/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


In [2]:
### Notes from prof's example notebook
# #Make sure you allocate enough memory per core. if you chose 3 cores you should select 6GB in your per Node setting. (presumably 2GB per core)

### For server
# 32 nodes
# 64g

## Start Spark context
total_nodes = 8
memory_per_node = 64

driver_memory = f"{memory_per_node}g"
executor_memory = f"{int(memory_per_node/total_nodes)}g"
n_executors = total_nodes - 1
print(f"Driver memory: {driver_memory}\nExecutor memory: {executor_memory}\nNumber of executors: {n_executors}\n")
try:
    print("Initializing SparkContext")
    sc = SparkSession.builder.config("spark.driver.memory", driver_memory) \
                             .config("spark.executor.memory", executor_memory) \
                             .config('spark.local.dir', "test_dir/") \
                             .config("spark.driver.maxResultSize", "16g") \
                             .config("spark.executor.instances", n_executors) \
                             .getOrCreate()
    
                             # .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.9.4") \
                             # .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
except:
    print("Starting new SparkContext")
    sc.stop()
    sc = SparkSession.builder.config("spark.driver.memory", driver_memory) \
                             .config("spark.executor.memory", executor_memory) \
                             .config('spark.local.dir', "test_dir/") \
                             .config("spark.driver.maxResultSize", "16g") \
                             .config("spark.executor.instances", n_executors) \
                             .appName("MyApp") \
                             .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.9.4") \
                             .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
                             .getOrCreate()
print(sc)

# Start SQL Context
sqlContext = SQLContext(sc)

# Add sc parameters
sc.getActiveSession()
# sc.builder.appName("Read CSV").getOrCreate()
# sc.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Driver memory: 64g
Executor memory: 8g
Number of executors: 7

Initializing SparkContext
<pyspark.sql.session.SparkSession object at 0x155551ca2910>




In [3]:
%%time
### Load datasets

### Open the preprocessed dataset
df = sqlContext.read.load("preprocessed_df_subset/preprocessed_df_subset.parquet") \
               .select("id", "ss_id", "stamp_date", "power_kW_values", "reconstructions", "recon_PC1", "recon_PC2")
# df_count = df.count()

# metadata
meta_filename = "metadata_preprocessed.csv"
df_meta = sc.read.csv(meta_filename, inferSchema=True, header=True)

### Since metadata table is so small, convert to Pandas
df_meta = df_meta.toPandas()

df.show(1)


+---+-----+----------+--------------------+--------------------+--------------------+--------------------+
| id|ss_id|stamp_date|     power_kW_values|     reconstructions|           recon_PC1|           recon_PC2|
+---+-----+----------+--------------------+--------------------+--------------------+--------------------+
|  0| 2405|2012-01-21|[0.0, 0.0, 0.0, 0...|[0.00410907621307...|-0.41782595826965074|-0.02458836598737569|
+---+-----+----------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row

CPU times: user 82.1 ms, sys: 5.73 ms, total: 87.8 ms
Wall time: 3.99 s


## Initial filtering of close outliers

Set initial close-outlier cutoffs at PC1 > 2 and PC2 > 2

In [4]:
%%time
### Add a condition to filter out these outliers
cond = (F.col("recon_PC1")>1) | (F.col("recon_PC2")>1)
df2 = df.where(~cond)

CPU times: user 367 µs, sys: 922 µs, total: 1.29 ms
Wall time: 11.3 ms


# Isolation Forest

In [5]:
### Get n splits on m columns randomly chosen

def get_min_max_values(data, inputCols):
    # Get the min and max value of columns in inputCols, output dictionary of dict[col] = (min, max)
    return {i:data.select(F.min(col), F.max(col)).collect()[0] for i,col in enumerate(inputCols)}

def get_line_params(data, inputCols, num_splits, offset_size, min_max_dict=False):
    # Get a list of line parameters to randomly split values

    def randomLine_parameters(data, min_max_dict, offset_size):
        # Pick two random points within the boundaries of inputCols, then determine the parameters for the resulting line
        # y-y1 = (y2-y1)/(x2-x1) * (x-x1)
        # --> (y1-y2)*x + (x2-x1)*y + (x1-x2)*y1 - (y2-y1)*x1 = 0
    
        # Get offsets --> This ensures the selected points are closer to the mean (and away from boundaries)
        # Having higher offsets is better when the data is more concentrated
        col1_offset = (min_max_dict[0][1] - min_max_dict[0][0]) * offset_size
        col2_offset = (min_max_dict[1][1] - min_max_dict[1][0]) * offset_size
        
        x1 = np.random.uniform(min_max_dict[0][0] + col1_offset, min_max_dict[0][1] - col1_offset)
        x2 = np.random.uniform(min_max_dict[0][0] + col1_offset, min_max_dict[0][1] - col1_offset)
        y1 = np.random.uniform(min_max_dict[1][0] + col2_offset, min_max_dict[1][1] - col2_offset)
        y2 = np.random.uniform(min_max_dict[1][0] + col2_offset, min_max_dict[1][1] - col2_offset)
    
        # Get line parameters in standardized form
        A = y2 - y1
        B = x1 - x2
        C = y1*(x2-x1) + x1*(y1-y2)
        line_params = (A, B, C)
        
        return line_params
    ############################################################################
    # Variables
    output = []
    if min_max_dict is False:
        min_max_dict = get_min_max_values(data, inputCols)
    
    ### Get a list of (A,B,C) tuples
    for i in np.arange(0, num_splits):
        output.append(randomLine_parameters(data, min_max_dict, offset_size))

    return output

def build_isolation_forest_simple(data, inputCols, outputCol, num_trees, num_splits, min_max_dict=False, offset_size = 0.001):
    ### Build a custom isolation forest (simplified without sampling or recursive structure)
    # Inputs:
    #    Data --> Input dataframe
    #    inputCols --> The columns to split on (recon_PC1 and recon_PC2 in this code)
    #    num_splits --> The number of splits to run on each tree
    #    random_offset --> An offset from the edges of the column split ranges to ensure the split stays within the range
    #
    # Determine a list of tuple-pairings of randomly selected columns and respective values to split on
    # For each split:
    #     Determine whether each point falls to the left (group 0) or right (group 1) of the split
    #     Count the number of points in group 0 and group 1
    #     Record the resulting fraction for each point and add the fraction to the running sum (tree_scoreSum)
    # After all splits are complete, take the average splitScore as scoreSum
    # After all trees are complete, take the average scoreSum and output as score
    #
    # Points that are outliers should have smaller scores than normal points

    # Record the columns and get row count
    print("Model Start")
    print()
    model_start = time.time()
    df_columns = data.schema.names

    # Initialize the score column and necessary columns to handle the sampling, id column
    output = data.withColumn(outputCol, F.lit(0))  \
                 .withColumn("scoreSum", F.lit(0))

    # Get the number of rows in the data
    rowCount = output.count()

    ### Start the trees
    for j in np.arange(0, num_trees):
        print(f"Tree {j+1}/{num_trees} start:")
        tree_start = time.time()
        # Find the columns and respective values to split on
        line_param_list = get_line_params(output, inputCols, num_splits, offset_size, min_max_dict)
        # Instantiate the tree_scoreSum column
        output = output.withColumn("tree_scoreSum", F.lit(0))
        
        ### Start the splits
        split_start = time.time()
        for i,(line_params) in enumerate(line_param_list):
            if (((i-1)%10 == 0) & (i!=0)):
                split_start = time.time()
            A,B,C = line_params
            # Mark each selected row as being on one side of the split
            output = output.withColumn("which_side", F.when(F.lit(A)*F.col(inputCols[0]) + F.lit(B)*F.col(inputCols[1]) + C >=0, 1).otherwise(0))
            # Get the fraction of rows on either side of the split
            group1 = output.select(F.sum("which_side")).collect()[0][0] / rowCount
            group0 = 1 - group1
            # Add 1 to the score of the larger group
            if group1 > group0:
                group1 += 1
            else:
                group0 += 1
    
            # For each selected row, assign group0 or group1 to splitScore
            output = output.withColumn("splitScore", F.when(F.col("which_side")==1, F.lit(group1)).otherwise(F.lit(group0))) \
    
            # Add the splitScore to the running total tree_scoreSum
            output = output.withColumn("tree_scoreSum", F.col("tree_scoreSum") + F.col("splitScore"))

            # if ((i+1)%10) == 0:
                # print(f"  Split {i+1}/{num_splits}: Time elapsed --> {time.time() - split_start}")
        ### Add the score to the overall running score total (scoreSum)
        # 2*num_splits because of the extra weight added to the bigger group
        output = output.withColumn("scoreSum", F.col("scoreSum") + (F.col("tree_scoreSum") / F.lit(2*num_splits)))
        
        print(f"Tree {j+1}:  Time elapsed --> {time.time() - tree_start}")
        print()

    # Add the average score
    output = output.withColumn(outputCol, F.round(F.col("scoreSum") / F.lit(num_trees), 5))

    ### Calculate the overall score and output it
    output = output.select(df_columns + [outputCol]) \
                   .orderBy(outputCol, ascending=True)
    
    print(f"Model complete: Time elapsed --> {time.time() - model_start}")
    return output


In [6]:
### Optionally Scale the PC variables
assembler1 = VectorAssembler(inputCols=["recon_PC1"], outputCol="recon_PC1_vec")
assembler2 = VectorAssembler(inputCols=["recon_PC2"], outputCol="recon_PC2_vec")
df3 = assembler1.transform(assembler2.transform(df2))

mmScaler_pc1 = MinMaxScaler(inputCol="recon_PC1_vec", outputCol="recon_PC1_scaled")
model1 = mmScaler_pc1.fit(df3)

mmScaler_pc2 = MinMaxScaler(inputCol="recon_PC2_vec", outputCol="recon_PC2_scaled")
model2 = mmScaler_pc2.fit(df3)

df3 = model2.transform(model1.transform(df3))

In [7]:
### Select only the necessary variables
df3 = df3.select("id", "ss_id", "stamp_date", "recon_PC1", "recon_PC2",
                   M.vector_to_array(F.col("recon_PC1_scaled"))[0].alias("recon_PC1_scaled"),
                   M.vector_to_array(F.col("recon_PC2_scaled"))[0].alias("recon_PC2_scaled"))

df3.show(1)

+---+-----+----------+--------------------+--------------------+------------------+------------------+
| id|ss_id|stamp_date|           recon_PC1|           recon_PC2|  recon_PC1_scaled|  recon_PC2_scaled|
+---+-----+----------+--------------------+--------------------+------------------+------------------+
|  0| 2405|2012-01-21|-0.41782595826965074|-0.02458836598737569|0.8253311997502791|0.4839477701942617|
+---+-----+----------+--------------------+--------------------+------------------+------------------+
only showing top 1 row



In [8]:
%%time
### Run the model on a number of splits
### Setting parameters
#################################################################################
inputCols = ["recon_PC1_scaled", "recon_PC2_scaled"]
outputCol = "score"
#################################################################################

# Get input columns and corresponding min/max values
min_max_dict = get_min_max_values(df3, inputCols)

for item in min_max_dict.items():
    print(item)

(0, Row(min(recon_PC1_scaled)=0.0, max(recon_PC1_scaled)=1.0))
(1, Row(min(recon_PC2_scaled)=0.0, max(recon_PC2_scaled)=1.0))
CPU times: user 7.02 ms, sys: 2.19 ms, total: 9.21 ms
Wall time: 36.5 s


In [18]:
%%time
# Run a series of tests at differing numbers of trees and splits

n_runs = 3
run_vec = ((1,10), (3,10), (5,10), (5,30), (3,30), (1,30))

for run_num in np.arange(0,len(run_vec)):
    num_trees = run_vec[run_num][0]
    num_splits = run_vec[run_num][1]
    
    print(run_num)
    
    # Initialize df_iforest
    df_iforest = df3.cache()
    df_iforest.count()
    select_columns = df_iforest.schema.names
    output_columns = []
    # df_iforest = df2.select(select_columns).cache()
    
    for i in np.arange(0,3):
        run_start = time.time()
        print(f"\n\nRun {i+1} --> Start")
        outputCol = f"score_{i}_{num_trees}t_{num_splits}s"
        df_iforest = build_isolation_forest_simple(data=df_iforest, 
                                                   inputCols=inputCols, 
                                                   outputCol=outputCol,
                                                   num_trees=num_trees,
                                                   num_splits=num_splits,
                                                   min_max_dict=min_max_dict)
        select_columns = select_columns + [outputCol]
        output_columns.append(outputCol)
        print(f"Done --> Total runtime elapsed: {time.time() - run_start}")
    
    run_config = f"{num_trees}t_{num_splits}s"
    score_cols = [f"\'score_{j}_{run_config}\'" for j in np.arange(0,n_runs)]
    score_columns = [f"F.col(\'score_{j}_{run_config}\')" for j in np.arange(0,n_runs)]
    mean_func = eval(" + ".join(score_columns)) / len(score_columns)
    std_string = f"F.col(\'{run_config}_mean_score\'))**2"
    std_func =  eval(f"((F.col({score_cols[0]})-{std_string} + (F.col({score_cols[1]})-{std_string} + (F.col({score_cols[2]})-{std_string})")  ### Change this as needed

    print(f"{run_config}_std_score", std_func)
    
    df_iforest = df_iforest.withColumn(f"{run_config}_mean_score", mean_func).withColumn(f"{run_config}_std_score", std_func)
    
    print(f"\nAll Runs complete --> Saving model to outliers_df_iforest/df_iforest_{num_trees}trees_{num_splits}splits")
    df_iforest.repartition(20).write.mode("overwrite").parquet(f"datasets/df_iforest_{num_trees}trees_{num_splits}splits")

print("Complete\n")

0
1t_10s_std_score Column<'((POWER((score_0_1t_10s - 1t_10s_mean_score), 2) + POWER((score_1_1t_10s - 1t_10s_mean_score), 2)) + POWER((score_2_1t_10s - 1t_10s_mean_score), 2))'>
1
3t_10s_std_score Column<'((POWER((score_0_3t_10s - 3t_10s_mean_score), 2) + POWER((score_1_3t_10s - 3t_10s_mean_score), 2)) + POWER((score_2_3t_10s - 3t_10s_mean_score), 2))'>
2
5t_10s_std_score Column<'((POWER((score_0_5t_10s - 5t_10s_mean_score), 2) + POWER((score_1_5t_10s - 5t_10s_mean_score), 2)) + POWER((score_2_5t_10s - 5t_10s_mean_score), 2))'>
3
5t_30s_std_score Column<'((POWER((score_0_5t_30s - 5t_30s_mean_score), 2) + POWER((score_1_5t_30s - 5t_30s_mean_score), 2)) + POWER((score_2_5t_30s - 5t_30s_mean_score), 2))'>
4
3t_30s_std_score Column<'((POWER((score_0_3t_30s - 3t_30s_mean_score), 2) + POWER((score_1_3t_30s - 3t_30s_mean_score), 2)) + POWER((score_2_3t_30s - 3t_30s_mean_score), 2))'>
5
1t_30s_std_score Column<'((POWER((score_0_1t_30s - 1t_30s_mean_score), 2) + POWER((score_1_1t_30s - 1t_30s_m

In [19]:
df_iforest.unpersist().count()