# Import the libraries

In [47]:
from pyspark.sql import SparkSession
import numpy as np
import io
import numpy as np
import os


import scipy.stats as stats
import numpy as np
from sklearn.ensemble import IsolationForest
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler,OneHotEncoder
import pickle

# Initialize the Spark session and Spark context

In [48]:
# Local Spark session with Hive support; `sc` is its SparkContext, used later
# to write the trained models with saveAsPickleFile.
spark = (
    SparkSession.builder
    .appName("hospital-profile-sklearn-isolationforest-batch-train")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext
sc.version

'2.1.1'

In [3]:
# Run configuration: which tenant / entity / anomaly type / time window / model this notebook trains.
tenant_name = "claim"
time_window = "day"
entity_type = "hospital"
anomaly_type = "profile"
model_type = "sklearn"
model_name = "isolationforest"

# Root directory for this tenant's data and models. The machine-specific default can be
# overridden with the SSTECH_BASE_PATH environment variable instead of editing the notebook.
BASE_PATH = os.environ.get("SSTECH_BASE_PATH", "/Users/tuhinsharma/Documents/sstech") + "/" + tenant_name
ANOMALY_DATA_REPOSITORY = BASE_PATH + "/models_data/data"

USER_PROFILE_DATA_PATH = ANOMALY_DATA_REPOSITORY + "/{entity_type}/{anomaly_type}/{time_window}.json"
# Use the configured anomaly_type (not a hard-coded "profile") so data and model paths stay in sync.
data_path = USER_PROFILE_DATA_PATH.format(entity_type=entity_type, anomaly_type=anomaly_type, time_window=time_window)

ANOMALY_MODEL_REPOSITORY = BASE_PATH + "/models_data/model"
# Template for the model output directory; filled in with the run configuration before saving.
PROFILE_ANOMALY_MODEL_PATH = ANOMALY_MODEL_REPOSITORY + "/{entity_type}/{anomaly_type}/{time_window}/{model_type}/{model_name}"



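# Query Druid for the hospital profile data used for model training: the window at the second most recent `datetime` value in the `hospital_profile_two_minute` datasource. (An earlier note cited a fixed interval, "2019-06-21T12:40:00.000Z" to "2019-06-21T12:41:20.000Z"; the query below instead selects the window dynamically.)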

In [49]:
import requests

# Step 1: list the distinct `datetime` values in the datasource, newest first, limited to 2.
query = {
  "queryType": "groupBy",
  "dataSource": "hospital_profile_two_minute",
  "dimensions": [
    "datetime"
  ],
  "aggregations": [],
  "granularity": "all",
  "postAggregations": [],
  "intervals": "1901-01-01T00:00:00+00:00/2101-01-01T00:00:00+00:00",
  "limitSpec": {
    "type": "default",
    "limit": 2,
    "columns": [
      {
        "dimension": "datetime",
        "direction": "descending"
      }
    ]
  }
}

r = requests.post('http://0.0.0.0:28082/druid/v2/', json=query)
# Surface HTTP errors from the Druid broker here instead of failing later with an
# opaque KeyError/IndexError while parsing the response body.
r.raise_for_status()
latest_datetime_rows = r.json()
# Index 1 = the second most recent window. NOTE(review): presumably the newest window is
# skipped because it may still be receiving data — confirm with the ingestion owner.
if len(latest_datetime_rows) < 2:
    raise ValueError(
        "Expected at least 2 distinct 'datetime' values from Druid, got %d"
        % len(latest_datetime_rows)
    )
# NOTE: this variable shares its name with the stdlib `datetime` module; the name is kept
# because the select query that follows filters on it.
datetime = latest_datetime_rows[1]["event"]["datetime"]


import warnings

# Step 2: fetch every hospital's profile metrics for the selected `datetime` window.
# (`requests` is imported earlier in this cell.)
profile_columns = ['hospital_id', 'maximum_claim_amount', 'maximum_patient_age', 'minimum_claim_amount',
                   'minimum_patient_age', 'total_claim_amount', 'total_claim_count']
query = {
   "queryType": "select",
   "dataSource": "hospital_profile_two_minute",
   # Druid's `descending` is a boolean; send a JSON boolean rather than the string "false".
   "descending": False,
   "metrics": list(profile_columns),
   "granularity": "all",
   "intervals": "1901-01-01T00:00:00+00:00/2101-01-01T00:00:00+00:00",
   "filter": {
     "type": "selector",
     "dimension": "datetime",
     "value": datetime
   },
   "pagingSpec": {"pagingIdentifiers": {}, "threshold": 1000}
}

r = requests.post('http://0.0.0.0:28082/druid/v2/', json=query)
r.raise_for_status()
select_response = r.json()
if not select_response:
    raise ValueError("Druid select query returned no rows for datetime %r" % (datetime,))
# Keep only the event payloads; each one is a flat dict of the row's fields.
query_result = [row["event"] for row in select_response[0]["result"]["events"]]

# Only the first page is requested. If the page is full, more hospitals may exist
# beyond it, so warn rather than silently training on a truncated set.
page_threshold = query["pagingSpec"]["threshold"]
if len(query_result) >= page_threshold:
    warnings.warn(
        "Druid select returned %d events (the page threshold); results may be truncated"
        % len(query_result)
    )

hospital_profile_df = pd.DataFrame.from_records(query_result)[profile_columns].copy()

from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType

# Explicit Spark schema for the hospital profile frame.
# Claim amounts use DoubleType: FloatType is 32-bit and would round the amounts fetched
# from Druid, and those rounded values are what training later reads back via .toPandas().
schema = StructType([
    StructField("hospital_id", StringType(), True),
    StructField("maximum_claim_amount", DoubleType(), True),
    StructField("maximum_patient_age", LongType(), True),
    StructField("minimum_claim_amount", DoubleType(), True),
    StructField("minimum_patient_age", LongType(), True),
    StructField("total_claim_amount", DoubleType(), True),
    StructField("total_claim_count", LongType(), True),
])
hospital_profile_sdf = spark.createDataFrame(hospital_profile_df, schema=schema)
hospital_profile_sdf.show(3)

+-----------+--------------------+-------------------+--------------------+-------------------+------------------+-----------------+
|hospital_id|maximum_claim_amount|maximum_patient_age|minimum_claim_amount|minimum_patient_age|total_claim_amount|total_claim_count|
+-----------+--------------------+-------------------+--------------------+-------------------+------------------+-----------------+
|      HOSP1|             31990.0|                 69|              1660.0|                 21|          154150.0|               11|
|     HOSP10|             43390.0|                 61|              2730.0|                 17|          139230.0|                7|
|    HOSP100|             39280.0|                 64|              6790.0|                 19|          166680.0|               10|
+-----------+--------------------+-------------------+--------------------+-------------------+------------------+-----------------+
only showing top 3 rows



In [52]:
import json

# Render the first raw Druid event as JSON, including fields (e.g. the timestamps)
# that are not kept in hospital_profile_df.
first_event = query_result[0]
json.dumps(first_event)

'{"timestamp": "2019-07-05T15:28:00.000Z", "datetime": "2019-07-05T15:28:00.000Z", "timestamp_start": "1562340360000", "timestamp_end": "1562340480000", "hospital_id": "HOSP1", "total_claim_amount": 154150.0, "total_claim_count": 11, "minimum_claim_amount": 1660.0, "maximum_claim_amount": 31990.0, "minimum_patient_age": 21, "maximum_patient_age": 69}'

In [38]:
# Column labels of the pandas profile frame (DataFrame.keys() returns the column Index).
hospital_profile_df.keys()

Index(['hospital_id', 'maximum_claim_amount', 'maximum_patient_age',
       'minimum_claim_amount', 'minimum_patient_age', 'total_claim_amount',
       'total_claim_count'],
      dtype='object')

In [44]:
# Rebuild the pandas frame from the Spark DataFrame so the training data carries the
# column types declared in the Spark schema.
spark_round_trip = hospital_profile_sdf.toPandas()
hospital_profile_df = spark_round_trip
del spark_round_trip

## Define the columns on which the model is trained

In [86]:
# Feature configuration: categorical columns (none at present) and numeric columns.
cat_colnames = []
num_colnames = [
    'maximum_claim_amount',
    'maximum_patient_age',
    'minimum_claim_amount',
    'minimum_patient_age',
    'total_claim_amount',
    'total_claim_count',
]

# Numeric features as a float64 matrix; categorical values only when any are configured.
num_data = hospital_profile_df[num_colnames].values.astype(np.float64)
if cat_colnames:
    cat_data = hospital_profile_df[cat_colnames].values

# Batch Training

### IsolationForest pipeline model

In [87]:
# Standardize the numeric features; the scaler stays None when no numeric columns are configured.
standard_scaler = StandardScaler() if num_colnames else None
if standard_scaler is not None:
    num_data_normalized = standard_scaler.fit_transform(num_data)

In [88]:
# One-hot encode categorical features into a dense array; the encoder stays None
# when no categorical columns are configured.
one_hot_encoder = OneHotEncoder(categories='auto') if cat_colnames else None
if one_hot_encoder is not None:
    cat_data_encoded = one_hot_encoder.fit_transform(cat_data).toarray()

In [89]:
# Build the model input: normalized numeric features followed by one-hot categorical
# features, column-wise. Fail explicitly when no feature columns are configured, rather
# than leaving `data` undefined (or stale from an earlier run).
feature_blocks = []
if len(num_colnames) > 0:
    feature_blocks.append(num_data_normalized)
if len(cat_colnames) > 0:
    feature_blocks.append(cat_data_encoded)
if not feature_blocks:
    raise ValueError("No feature columns configured: set num_colnames and/or cat_colnames")
data = np.concatenate(feature_blocks, axis=1)

In [90]:
import inspect

# IsolationForest hyperparameters. max_features is capped at 4 and at the number of
# configured feature columns; random_state fixes the result for reproducibility.
isolation_forest_params = dict(
    n_estimators=1000,
    max_samples=0.3,
    max_features=min(4, len(num_colnames + cat_colnames)),
    bootstrap=True,
    contamination="auto",
    random_state=42,
)
# `behaviour` was deprecated in scikit-learn 0.22 and removed in 0.24 (passing it there
# raises TypeError). Only pass it when the installed version still accepts the parameter.
if "behaviour" in inspect.signature(IsolationForest.__init__).parameters:
    isolation_forest_params["behaviour"] = "new"

isolation_forest = IsolationForest(**isolation_forest_params)
isolation_forest_model = isolation_forest.fit(data)

In [91]:
# scikit-learn's decision_function is lower for more abnormal samples; negate it so a
# higher "score" means more anomalous. It already returns one value per row, so it can
# be stored as a column directly without reshaping.
hospital_profile_df["score"] = -isolation_forest_model.decision_function(data)

### Scoring pipeline model

In [92]:
# Scoring pipeline: min-max rescale the raw anomaly score onto the range [0, 100].
minmax_scaler = MinMaxScaler(feature_range=(0, 100))
scoring_pipeline = Pipeline(steps=[("MinMaxScaler", minmax_scaler)])

# The scaler is fitted on a single-column 2-D array, hence the reshape.
score = hospital_profile_df["score"].values.reshape(-1, 1)
scoring_pipeline_model = scoring_pipeline.fit(score)

In [93]:
import shutil
import subprocess

model_path = PROFILE_ANOMALY_MODEL_PATH.format(
    entity_type=entity_type, anomaly_type=anomaly_type, time_window=time_window,
    model_type=model_type, model_name=model_name,
)

# Best-effort removal of any previous model at this path, on HDFS and on the local
# filesystem (the save cells use a scheme-less path). Argument lists are used instead
# of concatenated shell strings, so the path is never interpreted by a shell.
try:
    subprocess.call(["hdfs", "dfs", "-rm", "-r", model_path])
except OSError:
    # `hdfs` CLI not available on this machine; nothing to remove there.
    pass

if os.path.isdir(model_path) and not os.path.islink(model_path):
    shutil.rmtree(model_path, ignore_errors=True)
elif os.path.lexists(model_path):
    os.remove(model_path)




0

### Save the IsolationForest pipeline model

In [94]:
# Bundle the column lists, the fitted preprocessors (either may be None when its column
# list is empty) and the fitted forest into a single pickled record.
if_pipeline_bundle = {
    "cat_colnames": cat_colnames,
    "num_colnames": num_colnames,
    "standard_scaler": standard_scaler,
    "one_hot_encoder": one_hot_encoder,
    "isolation_forest_model": isolation_forest_model,
}
if_output_path = model_path + "/if_pipeline_model"
isolation_forest_model_rdd = sc.parallelize([if_pipeline_bundle])
isolation_forest_model_rdd.saveAsPickleFile(if_output_path)

### Save the scoring pipeline model

In [95]:
# Persist the fitted score-rescaling pipeline under the same model directory.
scoring_output_path = model_path + "/scoring_pipeline_model"
scoring_pipeline_model_rdd = sc.parallelize([scoring_pipeline_model])
scoring_pipeline_model_rdd.saveAsPickleFile(scoring_output_path)