In [ ]:
%%configure -f
{
"conf": {
    "spark.sql.autoBroadcastJoinThreshold": -1,
    "spark.dynamicAllocation.disableIfMinMaxNotSpecified.enabled": true,
    "spark.dynamicAllocation.enabled": true,
    "spark.dynamicAllocation.minExecutors": 2,
    "spark.dynamicAllocation.maxExecutors": 8
   }
}

In [ ]:
import pyspark.sql.functions as F
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import FloatType
import numpy as np
from io import BytesIO
import joblib
import pandas as pd
import h2o
import os

In [ ]:
# Notebook parameters. Every default is an empty string; the "Casting parameters"
# cell converts them to their working types.
# NOTE(review): presumably this is the Synapse parameters cell whose values are
# overridden at run time by pipeline parameters — confirm.
batch_id = ''  # logged with the run-time parameters
prepped_data_path = ''  # input parquet; time_slice_folder is inserted before its last path segment
iFor_data_prefix = ''  # parquet of trained models read by the prediction cell (columns id, tree_size, subsample_size, model)
overhead_data_path = ''  # output parquet for the overhead sample; time_slice_folder is inserted as above
overhead_results_prefix = ''  # output parquet for the predictions; time_slice_folder is inserted as above
subsample_list = ''  # comma-separated ints
trees_list = ''  # comma-separated ints
train_size = ''  # float; multiplied by the record count to size the number of groups
id_feat = ''  # comma-separated column names selected alongside the unpacked features
id_feat_types = ''  # comma-separated; cast to a list in the Casting parameters cell
seed = ''  # int (see the Casting parameters cell)
overhead_size = ''  # float; sampling fraction for the overhead dataset
time_slice_folder = ''  # folder name inserted into the four path parameters above
extension_level = ''  # int; < 0 loads models with joblib, otherwise models are passed as raw bytes to the H2O path

In [ ]:
# Initiate logging
import logging
from opencensus.ext.azure.log_exporter import AzureLogHandler
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace import config_integration
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer

instrumentation_connection_string = mssparkutils.credentials.getSecretWithLS("keyvault", "AppInsightsConnectionString")
config_integration.trace_integrations(['logging'])

logger = logging.getLogger(__name__)
# Loggers are process-wide: re-running this cell would otherwise attach a second
# Azure exporter and send every record twice.
if not any(isinstance(handler, AzureLogHandler) for handler in logger.handlers):
    logger.addHandler(AzureLogHandler(connection_string=instrumentation_connection_string))
logger.setLevel(logging.INFO)

tracer = Tracer(
    exporter=AzureExporter(
        connection_string=instrumentation_connection_string
    ),
    sampler=AlwaysOnSampler()
)

notebook_name = mssparkutils.runtime.context['notebookname']

# Spool parameters: every notebook parameter is attached to the INITIALISED
# record as custom dimensions so a run can be reconstructed from the logs.
run_time_parameters = {'custom_dimensions': {
    'batch_id': batch_id,
    'prepped_data_path': prepped_data_path,
    'iFor_data_prefix': iFor_data_prefix,
    'overhead_data_path': overhead_data_path,
    'overhead_results_prefix': overhead_results_prefix,
    'subsample_list': subsample_list,
    'trees_list': trees_list,
    'train_size': train_size,
    'id_feat': id_feat,
    'id_feat_types': id_feat_types,
    'seed': seed,
    'overhead_size': overhead_size,
    'time_slice_folder': time_slice_folder,
    'extension_level': extension_level,
    'notebook_name': notebook_name,
}}

logger.info(f"{notebook_name}: INITIALISED", extra=run_time_parameters)

In [ ]:
def _insert_time_slice(path, folder):
    """Return *path* with *folder* inserted as a directory before its last segment.

    ``"abfss://c@a/x/data.parquet"`` with folder ``"2023"`` becomes
    ``"abfss://c@a/x/2023/data.parquet"``. A path without any ``/`` gains a
    leading one (``"data"`` -> ``"/2023/data"``), exactly as the previous
    split/join expression produced.
    """
    head, _, tail = path.rpartition("/")
    return f"{head}/{folder}/{tail}"


# Redirect each non-empty path parameter into the time-slice folder.
if prepped_data_path != "":
    prepped_data_path = _insert_time_slice(prepped_data_path, time_slice_folder)
    logger.info(f'prepped_data_path = {prepped_data_path}')
if iFor_data_prefix != "":
    iFor_data_prefix = _insert_time_slice(iFor_data_prefix, time_slice_folder)
    logger.info(f'iFor_data_prefix = {iFor_data_prefix}')
if overhead_data_path != "":
    overhead_data_path = _insert_time_slice(overhead_data_path, time_slice_folder)
    logger.info(f'overhead_data_path = {overhead_data_path}')
if overhead_results_prefix != "":
    overhead_results_prefix = _insert_time_slice(overhead_results_prefix, time_slice_folder)
    logger.info(f'overhead_results_prefix = {overhead_results_prefix}')

In [ ]:
# Casting parameters: pipeline parameters arrive as strings and are converted
# to the types used below. List parameters are comma-separated; whitespace
# around each item is ignored and empty items (e.g. a trailing comma) dropped.
def _split_csv(value):
    """Split a comma-separated parameter string into stripped, non-empty items."""
    return [item.strip() for item in value.split(",") if item.strip()]


subsample_list = [int(i) for i in _split_csv(subsample_list)]
trees_list = [int(i) for i in _split_csv(trees_list)]
train_size = float(train_size)
# Stripping matters here: a stray space would otherwise become part of the column name.
id_feat = _split_csv(id_feat)
id_feat_types = _split_csv(id_feat_types)
seed = int(seed)
overhead_size = float(overhead_size)
extension_level = int(extension_level)

In [ ]:
# Load the prepared dataset; its record count `m` sizes the model groups below.
df = spark.read.parquet(prepped_data_path)
m = df.count()
logger.info(f"Number of records: {m:,}")

In [ ]:
# Number of groups: ceil of the training-share record count over the largest
# subsample size.
max_subsample_size = max(subsample_list)
expected_train_records = m * train_size
num_groups = int(np.ceil(expected_train_records / max_subsample_size))
logger.info(f'Num groups: {num_groups}')

In [ ]:
# Creation of overhead sample: draw a fraction `overhead_size` of the prepared
# records, seeded with the `seed` parameter so reruns select the same rows.
df_W = df.sample(withReplacement=False, fraction=overhead_size, seed=seed)
df_W.write.mode('overwrite').parquet(overhead_data_path)
# Read the persisted sample back so head()/count() and the scoring cells reuse
# the written files instead of re-running the sample over the full dataset.
df_W = spark.read.parquet(overhead_data_path)

first_row = df_W.head(1)
if not first_row:
    raise ValueError(
        f"Overhead sample written to {overhead_data_path} is empty "
        f"(overhead_size={overhead_size}); nothing to score."
    )
# `scaled` is a vector column; unpack it into one column per feature next to the id columns.
num_feats = len(first_row[0]['scaled'])
df_W_unassembled = df_W.withColumn('f', vector_to_array("scaled")).select(id_feat + [F.col("f")[i] for i in range(num_feats)])
logger.info("Number of records of overhead dataset: {:,}".format(df_W.count()))

In [ ]:
def clf_predict(clf, ex_level, time_slice, group_num, tree_size, subsample_size, id_cols=None):
    """Build a ``mapInPandas`` function that scores pandas batches with one model.

    Parameters
    ----------
    clf
        When ``ex_level < 0``: an object exposing ``score_samples`` (the caller
        loads it with joblib). Otherwise: the raw bytes of a saved H2O model.
    ex_level
        Negative selects the ``score_samples`` path; any other value the H2O path.
    time_slice, group_num, tree_size, subsample_size
        Only used to build a recognisable prefix for the temporary H2O model file.
    id_cols
        Columns passed through unchanged and excluded from the features. Defaults
        to ``["issuer_id_indexed", "issued_date", "tree_size", "subsample_size",
        "group_num"]``.

    Returns
    -------
    callable
        Maps an iterator of pandas DataFrames to an iterator of DataFrames holding
        the ``id_cols`` columns plus ``predict``. On the H2O path ``predict`` is
        the negated ``anomaly_score`` (to align with sklearn's formalism). Input
        batches are not modified.
    """
    if id_cols is None:
        id_cols = ["issuer_id_indexed", "issued_date", "tree_size", "subsample_size", "group_num"]
    id_cols = list(id_cols)

    def _load_h2o_model():
        # A unique file per task: several Python workers on one executor may load
        # the same model concurrently, and a shared fixed path let one task
        # truncate or delete the file while another was still reading it.
        import tempfile
        prefix = "ijungle_{}_{}_{}_{}_".format(
            time_slice, group_num, tree_size, subsample_size
        ).replace(os.sep, "_")
        fd, model_filename = tempfile.mkstemp(prefix=prefix)
        try:
            with os.fdopen(fd, 'wb') as model_file:
                model_file.write(clf)
            h2o.init()
            return h2o.load_model(model_filename)
        finally:
            # Always clean up, even when init/load fails.
            os.remove(model_filename)

    def _fun(iterator):
        saved_model = _load_h2o_model() if ex_level >= 0 else None
        for pdf in iterator:
            features = pdf.drop(columns=id_cols)
            if ex_level < 0:
                _predict = clf.score_samples(features)
            else:
                hf = h2o.H2OFrame(features)
                # Factor of -1 to align with sklearn formalism
                scores = saved_model.predict(hf)['anomaly_score']
                _predict = -1.0 * scores.as_data_frame().to_numpy().reshape(-1)
            # Explicit copy: assigning into a column slice of `pdf` is a chained assignment.
            pdf_out = pdf[id_cols].copy()
            pdf_out["predict"] = _predict
            yield pdf_out

    return _fun

In [ ]:
import functools
import itertools

# Trained models: one row per (id, tree_size, subsample_size) with the serialized model in `model`.
df_iFor = spark.read.parquet(iFor_data_prefix)  # TODO: add select by partition
schema = "issuer_id_indexed integer, issued_date timestamp, tree_size integer, subsample_size integer, group_num integer, predict float"

# Collect every model this run needs in a single Spark job; the previous
# per-combination `where(...).collect()` launched one job per model.
_model_rows = (
    df_iFor
    .where(
        F.col('tree_size').isin(trees_list)
        & F.col('subsample_size').isin(subsample_list)
        & F.col('id').isin(list(range(num_groups)))
    )
    .select('id', 'tree_size', 'subsample_size', 'model')
    .collect()
)
_models = {}
for _row in _model_rows:
    # Keep the first row per key, as the previous `.collect()[0]` lookup did.
    _models.setdefault(
        (int(_row['id']), int(_row['tree_size']), int(_row['subsample_size'])),
        _row['model'],
    )


def _load_model(group_num, trees, subsample_size):
    """Return the model object to hand to ``clf_predict`` for one combination.

    With a negative ``extension_level`` the stored bytes are deserialized with
    ``joblib.load`` (pickle-based: only load files written by this pipeline).
    Otherwise the raw bytes are returned for ``clf_predict`` to load with H2O.

    Raises ValueError when the model table has no row for the combination.
    """
    try:
        model_bytes = _models[(group_num, trees, subsample_size)]
    except KeyError:
        raise ValueError(
            f"No model in {iFor_data_prefix} for id={group_num}, "
            f"tree_size={trees}, subsample_size={subsample_size}"
        ) from None
    if extension_level < 0:
        return joblib.load(BytesIO(model_bytes))
    return model_bytes


# Score the overhead sample once per model, tagging rows with the model's keys.
_scored_frames = []
for trees, subsample_size, group_num in itertools.product(trees_list, subsample_list, range(num_groups)):
    clf = _load_model(group_num, trees, subsample_size)
    _scored_frames.append(
        df_W_unassembled
        .withColumn("tree_size", F.lit(trees))
        .withColumn("subsample_size", F.lit(subsample_size))
        .withColumn("group_num", F.lit(group_num))
        .mapInPandas(
            clf_predict(clf, extension_level, time_slice_folder, group_num, trees, subsample_size),
            schema=schema,
        )
    )

if not _scored_frames:
    raise ValueError(
        "Nothing to score: trees_list and subsample_list must be non-empty and num_groups positive."
    )

# Stack all per-model predictions in the same order as the nested loops.
df_predict = functools.reduce(lambda left, right: left.union(right), _scored_frames)

In [ ]:
# Persist the stacked per-model scores for the overhead sample, replacing any previous output.
df_predict.write.parquet(overhead_results_prefix, mode='overwrite')