In [1]:
import pandas as pd
import pandas_profiling as pp
import plotly.express as px
import numpy as np

from sklearn.model_selection import train_test_split

from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from imblearn.ensemble import BalancedRandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from catboost import CatBoostClassifier
from xgboost import XGBClassifier

from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.ensemble import RandomForestClassifier

from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Zeiss_task").getOrCreate()
spark

In [4]:
type(spark)

pyspark.sql.session.SparkSession

In [5]:
df = spark.read.csv('./data/insurance_claims.csv', header=True, sep=',', inferSchema=True)

In [6]:
type(df)

pyspark.sql.dataframe.DataFrame

In [40]:
df.show(2,truncate=False)

+------------------+---+-------------+----------------+------------+----------+-----------------+---------------------+--------------+-----------+-----------+-----------------------+------------------+---------------+--------------------+-------------+------------+-------------+------------------------+--------------+-----------------+---------------------+--------------+-------------+-----------------+------------------------+---------------------------+---------------+---------------+---------+-----------------------+------------------+------------+--------------+-------------+---------+----------+---------+--------------+
|months_as_customer|age|policy_number|policy_bind_date|policy_state|policy_csl|policy_deductable|policy_annual_premium|umbrella_limit|insured_zip|insured_sex|insured_education_level|insured_occupation|insured_hobbies|insured_relationship|capital-gains|capital-loss|incident_date|incident_type           |collision_type|incident_severity|authorities_contacted|incident

In [41]:
cols = ('policy_number','policy_bind_date','policy_state','insured_zip','incident_location','incident_date','auto_make','auto_model','insured_occupation')

df = df.drop(*cols)

df.printSchema()

root
 |-- months_as_customer: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- policy_csl: string (nullable = true)
 |-- policy_deductable: integer (nullable = true)
 |-- policy_annual_premium: double (nullable = true)
 |-- umbrella_limit: integer (nullable = true)
 |-- insured_sex: string (nullable = true)
 |-- insured_education_level: string (nullable = true)
 |-- insured_hobbies: string (nullable = true)
 |-- insured_relationship: string (nullable = true)
 |-- capital-gains: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- incident_type: string (nullable = true)
 |-- collision_type: string (nullable = true)
 |-- incident_severity: string (nullable = true)
 |-- authorities_contacted: string (nullable = true)
 |-- incident_state: string (nullable = true)
 |-- incident_city: string (nullable = true)
 |-- incident_hour_of_the_day: integer (nullable = true)
 |-- number_of_vehicles_involved: integer (nullable = true)
 |-- property_damage: strin

In [42]:
cols = ('age', 'total_claim_amount')

df = df.drop(*cols)

df.printSchema()

root
 |-- months_as_customer: integer (nullable = true)
 |-- policy_csl: string (nullable = true)
 |-- policy_deductable: integer (nullable = true)
 |-- policy_annual_premium: double (nullable = true)
 |-- umbrella_limit: integer (nullable = true)
 |-- insured_sex: string (nullable = true)
 |-- insured_education_level: string (nullable = true)
 |-- insured_hobbies: string (nullable = true)
 |-- insured_relationship: string (nullable = true)
 |-- capital-gains: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- incident_type: string (nullable = true)
 |-- collision_type: string (nullable = true)
 |-- incident_severity: string (nullable = true)
 |-- authorities_contacted: string (nullable = true)
 |-- incident_state: string (nullable = true)
 |-- incident_city: string (nullable = true)
 |-- incident_hour_of_the_day: integer (nullable = true)
 |-- number_of_vehicles_involved: integer (nullable = true)
 |-- property_damage: string (nullable = true)
 |-- bodily_inju

In [43]:
to_drop = ['policy_number','policy_bind_date','policy_state','insured_zip','incident_location','incident_date','auto_make','auto_model','insured_occupation']
df.drop(to_drop, inplace = True, axis = 1)
df.head()

TypeError: drop() got an unexpected keyword argument 'inplace'

In [44]:
df['insured_hobbies']=df['insured_hobbies'].apply(lambda x :'Other' if x!='chess' and x!='cross-fit' else x)


TypeError: 'Column' object is not callable

In [112]:
type(cols)

tuple

In [65]:
from pyspark.sql.functions import when, lit

df = df.withColumn('insured_hobbies', when(df['insured_hobbies'] =='chess',df['insured_hobbies'])\
    .when(df['insured_hobbies'] =='cross-fit',df['insured_hobbies'])\
    .otherwise('Others'))


# when(df.address.endswith('Rd'),regexp_replace(df.address,'Rd','Road')) \
#    .when(df.address.endswith('St'),regexp_replace(df.address,'St','Street')) \
#    .when(df.address.endswith('Ave'),regexp_replace(df.address,'Ave','Avenue')) \
#    .otherwise(df.address)) \
#    .show(truncate=False)
     

In [67]:
df = df.withColumn('fraud_reported', when(df['fraud_reported'] =='Y',1)\
    .otherwise(0))

In [69]:
df=df.where("umbrella_limit>=0")


In [6]:
df.agg({'umbrella_limit': 'min'}).first()[0]

-1000000

In [81]:
mode =  df.groupby("collision_type").count().orderBy("count", ascending=False).first()[0]

In [82]:
df = df.withColumn('collision_type', when(df['collision_type'] =='?',mode)\
    .otherwise(df['collision_type']))

In [85]:
df = df.withColumn('property_damage', when(df['property_damage'] =='?','NO')\
    .otherwise(df['property_damage']))

In [88]:
df = df.withColumn('police_report_available', when(df['police_report_available'] =='?','NO')\
    .otherwise(df['police_report_available']))

In [None]:
df['collision_type'].fillna(df['collision_type'].mode()[0], inplace = True)

df['property_damage'].fillna('NO', inplace = True)

df['police_report_available'].fillna('NO', inplace = True)

In [89]:
# new_df.select("insured_hobbies").sample(False, 0.1, seed=9).limit(20).show()
df.select('police_report_available').distinct().collect()

[Row(police_report_available='YES'), Row(police_report_available='NO')]

In [9]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

In [98]:
from pyspark.ml.feature import StringIndexer

insured_hobbies_indexer = StringIndexer(inputCol="insured_hobbies", outputCol="insured_hobbiesIndex")
df = insured_hobbies_indexer.fit(df).transform(df)



IllegalArgumentException: requirement failed: Output column insured_hobbiesIndex already exists.

In [99]:
onehotencoder_age_vector = OneHotEncoder(inputCol="insured_hobbiesIndex", outputCol="insured_hobbies_vec")
df = onehotencoder_age_vector.fit(df).transform(df)

In [56]:


name = 'insured_hobbies'
udf = UserDefinedFunction(lambda x :'Other' if x!='chess' or x!='cross-fit' else x)
new_df = df.withColumn('insured_hobbies', udf(df.insured_hobbies))

In [101]:
# new_df.select("insured_hobbies").sample(False, 0.1, seed=9).limit(20).show()
df.select('insured_hobbies_vec').distinct().collect()

[Row(insured_hobbies_vec=SparseVector(2, {0: 1.0})),
 Row(insured_hobbies_vec=SparseVector(2, {1: 1.0})),
 Row(insured_hobbies_vec=SparseVector(2, {}))]

In [24]:
df.schema.fields

[StructField(months_as_customer,StringType,true),
 StructField(policy_csl,StringType,true),
 StructField(policy_deductable,StringType,true),
 StructField(policy_annual_premium,StringType,true),
 StructField(umbrella_limit,StringType,true),
 StructField(insured_sex,StringType,true),
 StructField(insured_education_level,StringType,true),
 StructField(insured_hobbies,StringType,true),
 StructField(insured_relationship,StringType,true),
 StructField(capital-gains,StringType,true),
 StructField(capital-loss,StringType,true),
 StructField(incident_type,StringType,true),
 StructField(collision_type,StringType,true),
 StructField(incident_severity,StringType,true),
 StructField(authorities_contacted,StringType,true),
 StructField(incident_state,StringType,true),
 StructField(incident_city,StringType,true),
 StructField(incident_hour_of_the_day,StringType,true),
 StructField(number_of_vehicles_involved,StringType,true),
 StructField(property_damage,StringType,true),
 StructField(bodily_injuries

In [109]:
import pyspark.ml.tree 

In [111]:
from pyspark.ml.classification import RandomForestClassifier

In [142]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

def onehot(df,col):
    indexer = StringIndexer(inputCol=col, outputCol=col+'Index')

    df = indexer.fit(df).transform(df)

    onehotencoder = OneHotEncoder(inputCol=col+'Index', outputCol=col+'Vector')

    df = onehotencoder.fit(df).transform(df)

    cols_drop = (col,col+'Index')
    df = df.drop(*cols_drop)

    return df
      

In [144]:
df = onehot(df,'incident_severity')

In [146]:
df.select('incident_severityVector').distinct().collect()

[Row(incident_severityVector=SparseVector(3, {2: 1.0})),
 Row(incident_severityVector=SparseVector(3, {1: 1.0})),
 Row(incident_severityVector=SparseVector(3, {0: 1.0})),
 Row(incident_severityVector=SparseVector(3, {}))]

In [2]:
import findspark

In [5]:
print(findspark.init())

None


In [1]:
import json
import time
import schedule
import http.client
import pandas as pd




In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("test").getOrCreate()


In [4]:
df = spark.read.csv('./data/insurance_claims.csv', header=True, sep=',', inferSchema=True)


In [5]:
df_pandas = df.toPandas()

In [6]:
df_json = df_pandas.to_json()

In [10]:
# Serializing json 
json_object = json.dumps(df_json, indent = 4)
  
# Writing to sample.json
with open("sample.json", "w") as outfile:
    outfile.write(json_object)

In [None]:
c = http.client.HTTPConnection('127.0.0.1', 8000)
data = pd.read_csv('Data_repository/Data/test.csv')
data = data.to_json()
c.request('POST', '/get_prediction_results', json.dumps(data))
result = c.getresponse().read()
prediction_result = json.loads(result)
prediction_result_df = pd.read_json(prediction_result)
print(prediction_result_df.head())
return prediction_result_df
    

if __name__ == '__main__':
    schedule.every(0).minutes.do(get_prediction_result)
    while True:
        schedule.run_pending()
        time.sleep(1)

In [7]:
data = '{"months_as_customer":{"0":328},"age":{"0":48},"policy_number":{"0":521585},"policy_bind_date":{"0":1413504000000},"policy_state":{"0":"OH"},"policy_csl":{"0":"250\\/500"},"policy_deductable":{"0":1000},"policy_annual_premium":{"0":1406.91},"umbrella_limit":{"0":0},"insured_zip":{"0":466132},"insured_sex":{"0":"MALE"},"insured_education_level":{"0":"MD"},"insured_occupation":{"0":"craft-repair"},"insured_hobbies":{"0":"sleeping"},"insured_relationship":{"0":"husband"},"capital-gains":{"0":53300},"capital-loss":{"0":0},"incident_date":{"0":1422144000000},"incident_type":{"0":"Single Vehicle Collision"},"collision_type":{"0":"Side Collision"},"incident_severity":{"0":"Major Damage"},"authorities_contacted":{"0":"Police"},"incident_state":{"0":"SC"},"incident_city":{"0":"Columbus"},"incident_location":{"0":"9935 4th Drive"},"incident_hour_of_the_day":{"0":5},"number_of_vehicles_involved":{"0":1},"property_damage":{"0":"YES"},"bodily_injuries":{"0":1},"witnesses":{"0":2},"police_report_available":{"0":"YES"},"total_claim_amount":{"0":71610},"injury_claim":{"0":6510},"property_claim":{"0":13020},"vehicle_claim":{"0":52080},"auto_make":{"0":"Saab"},"auto_model":{"0":"92x"},"auto_year":{"0":2004},"fraud_reported":{"0":"Y"}}'


In [8]:
a_json = json.loads(data)


In [9]:
df_pandas = pd.DataFrame.from_dict(a_json)
df_spark = spark.createDataFrame(df_pandas)



In [10]:
import sys
import argparse

def get_args():

    parser = argparse.ArgumentParser(description='Zeiss Task')


    parser.add_argument("--app_name", type=str, default="Zeiss_classification_task")
    parser.add_argument("--data_filename", type=str, default="./data/insurance_claims.csv",
                                    help="data file in csv format")
    parser.add_argument("--columns_to_drop", type=list, 
                        default=['policy_number','policy_bind_date','policy_state','insured_zip','incident_location',
                        'incident_date','auto_make','auto_model','insured_occupation','age', 'total_claim_amount'])
    parser.add_argument("--store_schema", type=bool, default=False)
    parser.add_argument("--schema_path", type=str, default="/src/data/feature_store/")
    parser.add_argument("--columns_to_encode", type=list, 
                        default=['policy_csl', 'insured_sex', 'insured_education_level','insured_hobbies', 'insured_relationship',
                        'incident_type', 'incident_severity','authorities_contacted', 'incident_state', 'incident_city','collision_type'])
    parser.add_argument("--preprocess_hobbies", type=bool, default=True)
    parser.add_argument("--tune_hyper_params", type=bool, default=False)
    parser.add_argument("--best_hyper_params_filepath", type=str, default='/src/data/best_hyper_params')
    parser.add_argument("--model_path", type=str, default='/src/model')
    parser.add_argument("--hyper_params", type=dict, 
                        default={'criterion': 'gini', 
                                    'max_depth': 5, 
                                    'min_samples_leaf': 2, 
                                    'min_samples_split': 2
                                    })
    parser.add_argument("--target", type=str, default='fraud_reported')

    return parser.parse_args()



In [11]:
sys.argv = sys.argv[10:]

In [12]:
args = get_args()

In [14]:
from src.utils.utils import  data_preprocessing

df_processed = data_preprocessing(df_spark, args)



AttributeError: 'OneHotEncoder' object has no attribute 'fit'

In [29]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer.load(os.getcwd() + args.schema_path + '/schemaData/' + 'incident_city')

pyspark.ml.feature.StringIndexer

In [33]:
import pickle
