In [1]:
pip install imbalanced-learn

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [211]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import IntegerType

In [87]:
spark = SparkSession.builder \
    .appName("Insurance Fraud Detection") \
    .getOrCreate()


In [125]:
data = spark.read.csv("/home/jb-admin/data/insurance_fraud_claims.csv", header=True, inferSchema=True)

In [126]:
data = data.withColumn("policy_annual_premium", col("policy_annual_premium").cast("integer"))


In [None]:
categorical_cols = [
    'insured_sex', 'insured_education_level', 'insured_occupation',
    'incident_type', 'collision_type', 'incident_severity',
    'authorities_contacted', 'property_damage', 'police_report_available'
]
numerical_cols = ['age', 'policy_deductable',  'umbrella_limit', 'incident_hour_of_the_day', 'number_of_vehicles_involved', 'bodily_injuries', 'witnesses', 'total_claim_amount']

In [212]:
data = data.withColumn("fraud_reported", col("fraud_reported").cast(IntegerType()))

In [213]:
def get_mode(data, column_name):
    mode_value = data.groupBy(column_name).count().orderBy(col("count").desc()).first()[0]
    return mode_value

mode_values = {}
for col_name in categorical_cols:
    mode_values[col_name] = get_mode(data, col_name)

# Fill missing values with the mode
data = data.fillna(mode_values)

In [214]:
data = data.drop(
    'policy_number', 'policy_bind_date', 'policy_state', 'policy_csl', 'policy_annual_premium', 'insured_zip',
    'insured_hobbies', 'insured_relationship', 'capital-gains', 'capital-loss', 'incident_date', 'incident_state',
    'incident_city', 'incident_location', 'injury_claim', 'property_claim', 'vehicle_claim', 'auto_make', 'auto_model',
    'auto_year', '_c39'
)


In [215]:
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid='keep') for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_encoded") for col in categorical_cols]
assembler = VectorAssembler(inputCols=[f"{col}_encoded" for col in categorical_cols] + numerical_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)


In [216]:
lr = LogisticRegression(labelCol="fraud_reported", featuresCol="scaled_features", maxIter=10)

In [217]:
pipeline_lr = Pipeline(stages=indexers + encoders + [assembler, scaler, lr])

In [218]:

def stratified_split(data, label_col, test_size=0.3, seed=None):
    total_count = data.count()
    class_counts = data.groupBy(label_col).count().collect()
    fractions = {row[label_col]: (1 - test_size) if row["count"] / total_count > test_size else test_size for row in class_counts}
    
    train_data = data.sampleBy(label_col, fractions, seed=seed)
    test_data = data.subtract(train_data)
    
    return train_data, test_data

train_data, test_data = stratified_split(data, 'fraud_reported', test_size=0.3, seed=123)



TypeError: key must be float, int, or string, but got <class 'NoneType'>

In [None]:
# Define the grid search parameters
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.05, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()


In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="fraud_reported")

In [None]:
# Fit cross-validated model
cv_lr = CrossValidator(estimator=pipeline_lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)
cv_lr_model = cv_lr.fit(train_data)

In [None]:
# Print out the performance metrics
print("Logistic Regression")
print("Area under ROC curve (train): ", evaluator.evaluate(cv_lr_model.transform(train_data)))
print("Area under ROC curve (test): ", evaluator.evaluate(cv_lr_model.transform(test_data)))
print("Best params: ", cv_lr_model.bestModel.stages[-1]._java_obj.parent().getEstimatorParamMaps().values()[cv_lr_model.bestModel.getEstimatorParamMaps().keys().index(cv_lr_model.bestModel.getEstimatorParamMap())])