#📁 1. Setup Spark Session



In [None]:
# PySpark is not preinstalled everywhere (e.g. Colab); uncomment to install it.
# !pip install pyspark

from pyspark.sql import SparkSession

# Reuse an already-running session if there is one, otherwise start a new one
# under this application name.
spark = (
    SparkSession.builder
    .appName("HealthcareClaimsFraudDetection")
    .getOrCreate()
)


#📁 2. Simulate Claims Data



In [None]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
import random

# Schema of the synthetic claims table; every column is declared nullable.
# `date` is stored as a "YYYY-MM-DD" string rather than a DateType column.
claims_schema = StructType([
    StructField("claim_id", IntegerType(), True),
    StructField("patient_id", IntegerType(), True),
    StructField("procedure_code", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("claim_type", StringType(), True),
    StructField("date", StringType(), True)
])

# Simulation knobs. 1 million rows keeps the demo fast; raise N_CLAIMS
# (e.g. to 10_000_000) for a heavier run. Rows are built on the driver first,
# so driver memory grows with N_CLAIMS.
N_CLAIMS = 1_000_000
RANDOM_SEED = 42

procedures = ["P100", "P200", "P300", "P400", "P500", "P600", "P700"]
claim_types = ["inpatient", "outpatient"]

# A dedicated, seeded generator makes the simulated table identical on every
# Restart & Run All, and leaves the global `random` state untouched.
rng = random.Random(RANDOM_SEED)

data = [
    (
        claim_id,                                      # claim_id: 1..N_CLAIMS
        rng.randint(1, 50000),                         # patient_id
        rng.choice(procedures),                        # procedure_code
        round(rng.uniform(100, 5000), 2),              # amount
        rng.choice(claim_types),                       # claim_type
        # Day is capped at 28 so the string is a valid date in every month.
        f"2025-{rng.randint(1,12):02d}-{rng.randint(1,28):02d}"  # date
    )
    for claim_id in range(1, N_CLAIMS + 1)
]

claims_df = spark.createDataFrame(data, claims_schema)

print(f"Total claims rows: {claims_df.count()}")
claims_df.show(5)


Total claims rows: 1000000
+--------+----------+--------------+-------+----------+----------+
|claim_id|patient_id|procedure_code| amount|claim_type|      date|
+--------+----------+--------------+-------+----------+----------+
|       1|     23072|          P100|3414.35| inpatient|2025-05-06|
|       2|      1078|          P700|4512.98|outpatient|2025-08-02|
|       3|     25652|          P400|1248.88| inpatient|2025-11-08|
|       4|       268|          P700|1356.94|outpatient|2025-02-08|
|       5|     15091|          P300|4683.09|outpatient|2025-10-09|
+--------+----------+--------------+-------+----------+----------+
only showing top 5 rows



#📁 3. Simulate Fraud Codes Lookup (Small Table)




In [None]:
# Small lookup table: procedure codes treated as suspicious, each flagged 1.
fraud_codes_data = [(code, 1) for code in ("P100", "P400", "P600")]

# Two nullable columns: the join key and the fraud flag.
fraud_codes_schema = (
    StructType()
    .add("procedure_code", StringType(), True)
    .add("is_fraud", IntegerType(), True)
)

fraud_codes_df = spark.createDataFrame(fraud_codes_data, fraud_codes_schema)
fraud_codes_df.show()


+--------------+--------+
|procedure_code|is_fraud|
+--------------+--------+
|          P100|       1|
|          P400|       1|
|          P600|       1|
+--------------+--------+



#📁 4. Broadcast Join to Label Suspicious Claims


In [None]:
from pyspark.sql.functions import broadcast

# Mark the lookup table for broadcast so the join is planned without
# shuffling the large claims table.
fraud_lookup = broadcast(fraud_codes_df)

# Left join keeps every claim; codes missing from the lookup get a null flag.
labelled_claims = claims_df.join(fraud_lookup, on="procedure_code", how="left")

# Replace the null flags with 0 so is_fraud is always 0 or 1.
joined_df = labelled_claims.na.fill(0, subset=["is_fraud"])

joined_df.show(5)


+--------------+--------+----------+-------+----------+----------+--------+
|procedure_code|claim_id|patient_id| amount|claim_type|      date|is_fraud|
+--------------+--------+----------+-------+----------+----------+--------+
|          P100|       1|     23072|3414.35| inpatient|2025-05-06|       1|
|          P700|       2|      1078|4512.98|outpatient|2025-08-02|       0|
|          P400|       3|     25652|1248.88| inpatient|2025-11-08|       1|
|          P700|       4|       268|1356.94|outpatient|2025-02-08|       0|
|          P300|       5|     15091|4683.09|outpatient|2025-10-09|       0|
+--------------+--------+----------+-------+----------+----------+--------+
only showing top 5 rows



#📁 5. Repartition by Claim Type



In [None]:
# Hash-partition the labelled claims on claim_type so rows of the same type
# land in the same partition (useful before type-wise writes or queries).
# NOTE(review): no later cell shown here reads repartitioned_df.
repartitioned_df = joined_df.repartition("claim_type")

print("Repartitioned by claim_type:")
num_partitions = repartitioned_df.rdd.getNumPartitions()
print(num_partitions)


Repartitioned by claim_type:
2


#📁 6. Cache High-Risk Claims



In [None]:
# A claim is "high-risk" when it is both expensive (amount > 3000) and its
# procedure code is flagged in the fraud lookup (is_fraud == 1).
is_expensive = F.col("amount") > 3000
is_flagged = F.col("is_fraud") == 1

# cache() only marks the DataFrame for caching and returns the same DataFrame.
high_risk_df = joined_df.where(is_expensive & is_flagged).cache()

# The first action materialises the cache; the count is shown as cell output.
high_risk_df.count()


175283

#📁 7. Apply MLlib KMeans for Anomaly Detection


In [None]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.clustering import KMeans

# Pack the numeric inputs into one vector column.
# NOTE(review): patient_id looks like an identifier rather than a measurement;
# it is kept as a feature to preserve the original demo -- confirm intent.
assembler = VectorAssembler(
    inputCols=["amount", "patient_id"],
    outputCol="raw_features"
)
assembled_df = assembler.transform(high_risk_df)

# KMeans uses Euclidean distance, so a wide-range column dominates a
# narrow-range one. In this notebook patient_id is simulated over 1..50000
# while the filtered amount only spans 3000..5000, so without scaling the
# clusters split on patient_id alone. Standardising each feature to zero mean
# and unit variance lets both contribute equally.
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withMean=True,
    withStd=True
)
features_df = scaler.fit(assembled_df).transform(assembled_df)

# Train KMeans on the standardised "features" column (its default input).
# model.clusterCenters() is therefore expressed in standardised units.
kmeans = KMeans(k=2, seed=42)
model = kmeans.fit(features_df)

predictions = model.transform(features_df)

# Show cluster assignments next to the original, unscaled values.
predictions.select("claim_id", "amount", "patient_id", "prediction").show(10)


+--------+-------+----------+----------+
|claim_id| amount|patient_id|prediction|
+--------+-------+----------+----------+
|       1|3414.35|     23072|         1|
|       7|3502.39|     26250|         0|
|      10|3491.61|     32202|         0|
|      14|3070.75|     15097|         1|
|      21|3388.29|     33785|         0|
|      34| 4623.3|     47375|         0|
|      35|4557.71|     35195|         0|
|      36|4429.94|     22586|         1|
|      38|3695.54|     15748|         1|
|      39|3033.51|      2821|         1|
+--------+-------+----------+----------+
only showing top 10 rows



#📁 8. Analyze Cluster Centers



In [None]:
# Print one centroid per line, in the order the fitted model returns them.
print("Cluster Centers:")
centroids = model.clusterCenters()
for idx in range(len(centroids)):
    print(centroids[idx])


Cluster Centers:
[ 3997.81570898 37490.00311001]
[ 3999.34773079 12537.70936664]


#📁 9. Done!



In [None]:
# Shut down the Spark session created in the setup cell. Re-run the setup
# cell before running any other cell after this one.
spark.stop()
