In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

from pyspark.sql.functions import col, sum, round, countDistinct, max, variance, log10

from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import lit
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator




In [2]:
# Initialize Spark Session
spark = SparkSession.builder.appName("HealthCareModel").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/11 01:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
part_d_data_path = "../DataSet/Prescribers - by Provider and Drug/MUP_DPR_RY23_P04_V10_DY21_NPIBN.csv"
df = spark.read.csv(part_d_data_path, header=True, inferSchema=True)



23/12/11 01:05:47 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [4]:
df.printSchema()


root
 |-- Prscrbr_NPI: integer (nullable = true)
 |-- Prscrbr_Last_Org_Name: string (nullable = true)
 |-- Prscrbr_First_Name: string (nullable = true)
 |-- Prscrbr_City: string (nullable = true)
 |-- Prscrbr_State_Abrvtn: string (nullable = true)
 |-- Prscrbr_State_FIPS: string (nullable = true)
 |-- Prscrbr_Type: string (nullable = true)
 |-- Prscrbr_Type_Src: string (nullable = true)
 |-- Brnd_Name: string (nullable = true)
 |-- Gnrc_Name: string (nullable = true)
 |-- Tot_Clms: integer (nullable = true)
 |-- Tot_30day_Fills: double (nullable = true)
 |-- Tot_Day_Suply: integer (nullable = true)
 |-- Tot_Drug_Cst: double (nullable = true)
 |-- Tot_Benes: integer (nullable = true)
 |-- GE65_Sprsn_Flag: string (nullable = true)
 |-- GE65_Tot_Clms: integer (nullable = true)
 |-- GE65_Tot_30day_Fills: double (nullable = true)
 |-- GE65_Tot_Drug_Cst: double (nullable = true)
 |-- GE65_Tot_Day_Suply: integer (nullable = true)
 |-- GE65_Bene_Sprsn_Flag: string (nullable = true)
 |-- GE65_T

In [5]:
part_d_data_t = df.select(
    col("Prscrbr_NPI").alias("npi"),
    col("Prscrbr_City").alias("city"),
    col("Prscrbr_State_Abrvtn").alias("state"),
    col("Prscrbr_Last_Org_Name").alias("last_name"),
    col("Prscrbr_First_Name").alias("first_name"),
    col("Prscrbr_Type").alias("specialty"),
    col("Brnd_Name").alias("drug_name"),
    col("Gnrc_Name").alias("generic_name"),
    col("Tot_Drug_Cst").alias("total_drug_cost"),
    col("Tot_Clms").alias("total_claim_count"),
    col("Tot_Day_Suply").alias("total_day_supply")
)

In [6]:
# Step 1: Assign one DataFrame to another
part_d_pd1 = part_d_data_t

# Step 2: Select specific columns
part_d_drug_df = part_d_data_t.select("npi", "drug_name", "total_drug_cost", "total_claim_count", "total_day_supply", "specialty")

# Step 3: Change the data type of 'npi' to StringType
part_d_drug_df = part_d_drug_df.withColumn("npi", col("npi").cast(StringType()))

# Step 4: Select specific columns from another DataFrame
part_d_spec_df1 = part_d_data_t.select("npi", "specialty")

# Step 5: Show the first few rows (equivalent to head(0) in Pandas)
part_d_spec_df1.show()
part_d_drug_df.show()

+----------+-----------------+
|       npi|        specialty|
+----------+-----------------+
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
|1003000126|Internal Medicine|
+----------+-----------------+
only showing top 20 rows

+----------+--------------------+---------------+-----------------+----------------+-----------------+
|       npi|           drug_name|total_drug_cost|total_claim_count|total_day_supply|        specialty|
+----------+------------

In [7]:
part_d_pd2 = part_d_data_t.select('npi',
                                  'city',
                                  'state',
                                  'last_name',
                                  'first_name',
                                  'specialty')

part_d_pd2.show()

+----------+--------+-----+---------+----------+-----------------+
|       npi|    city|state|last_name|first_name|        specialty|
+----------+--------+-----+---------+----------+-----------------+
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medicine|
|1003000126|Bethesda|   MD|Enkeshafi|   Ardalan|Internal Medic

In [8]:
part_d_pd_u = part_d_pd2.dropDuplicates()

# Show the result
part_d_pd_u.show()



+----------+-------------+-----+---------+----------+-------------------+
|       npi|         city|state|last_name|first_name|          specialty|
+----------+-------------+-----+---------+----------+-------------------+
|1003028002|      Durango|   CO|   Haynes|      Kent|            Dentist|
|1003041476|     Oak Park|   IL| Lindgren|     Kevin|Allergy/ Immunology|
|1003043555|   Washington|   PA|  Orlosky|     Julie|    Family Practice|
|1003046939|      Spencer|   IA|  Heckert|     Kathi| Nurse Practitioner|
|1003055781|     Hartford|   CT|     Rice|     Jenny|Physician Assistant|
|1003064825|        Parma|   OH| Phillips|    Cherie|   Vascular Surgery|
|1003091539|    Fullerton|   CA|      Woo|      Kiho|  Internal Medicine|
|1003095167|      El Paso|   IL|    Tyner|Jean-Marie| Nurse Practitioner|
|1003101932|   Cumberland|   MD|     Hong|     Feiyu|  Internal Medicine|
|1003119009|     Bellevue|   WA|  Swenson|   Jessica| Nurse Practitioner|
|1003127002|       Laredo|   TX|  Rami

                                                                                

In [9]:
group_cols = ['npi']

part_d_pd3 = (part_d_pd1.groupBy(group_cols)
             .agg(
                 F.sum("total_drug_cost").alias("sum_total_drug_cost"),
                 F.mean("total_drug_cost").alias("mean_total_drug_cost"),
                 F.max("total_drug_cost").alias("max_total_drug_cost"),
                 F.sum("total_claim_count").alias("sum_total_claim_count"),
                 F.mean("total_claim_count").alias("mean_total_claim_count"),
                 F.max("total_claim_count").alias("max_total_claim_count"),
                 F.sum("total_day_supply").alias("sum_total_day_supply"),
                 F.mean("total_day_supply").alias("mean_total_day_supply"),
                 F.max("total_day_supply").alias("max_total_day_supply")
             ))

# Cast to float if necessary (optional)
for col_name in part_d_pd3.columns:
    part_d_pd3 = part_d_pd3.withColumn(col_name, F.col(col_name).cast("float"))

# Show the result
part_d_pd3.show()



+------------+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+
|         npi|sum_total_drug_cost|mean_total_drug_cost|max_total_drug_cost|sum_total_claim_count|mean_total_claim_count|max_total_claim_count|sum_total_day_supply|mean_total_day_supply|max_total_day_supply|
+------------+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+
| 1.0030432E9|           18613.12|             930.656|            7077.18|                355.0|                 17.75|                 44.0|             21040.0|               1052.0|              3000.0|
|1.00307283E9|          282157.47|           3399.4878|           45438.26|               4100.0|              49.39759|                244.0|            203448.0|         

                                                                                

In [10]:
part_d_pd3.count()

                                                                                

1017417

In [11]:
part_d_all_pd = part_d_pd3.join(part_d_pd_u, on='npi', how='left')

# Show the result
part_d_all_pd.show()



+------------+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+--------------+-----+--------------+----------+--------------------+
|         npi|sum_total_drug_cost|mean_total_drug_cost|max_total_drug_cost|sum_total_claim_count|mean_total_claim_count|max_total_claim_count|sum_total_day_supply|mean_total_day_supply|max_total_day_supply|          city|state|     last_name|first_name|           specialty|
+------------+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+--------------+-----+--------------+----------+--------------------+
| 1.0030124E9|             577.08|              288.54|             373.53|                 70.0|                  35.0|                 39.0|               478.0|            

                                                                                

In [12]:
payment_data_path="../DataSet/PaymentDataSet/OP_DTL_OWNRSHP_PGYR2021_P06302023.csv"
pds = spark.read.csv(payment_data_path, header=True, inferSchema=True)

In [13]:
pds.printSchema()

root
 |-- Change_Type: string (nullable = true)
 |-- Physician_Profile_ID: integer (nullable = true)
 |-- Physician_NPI: integer (nullable = true)
 |-- Physician_First_Name: string (nullable = true)
 |-- Physician_Middle_Name: string (nullable = true)
 |-- Physician_Last_Name: string (nullable = true)
 |-- Physician_Name_Suffix: string (nullable = true)
 |-- Recipient_Primary_Business_Street_Address_Line1: string (nullable = true)
 |-- Recipient_Primary_Business_Street_Address_Line2: string (nullable = true)
 |-- Recipient_City: string (nullable = true)
 |-- Recipient_State: string (nullable = true)
 |-- Recipient_Zip_Code: string (nullable = true)
 |-- Recipient_Country: string (nullable = true)
 |-- Recipient_Province: string (nullable = true)
 |-- Recipient_Postal_Code: string (nullable = true)
 |-- Physician_Primary_Type: string (nullable = true)
 |-- Physician_Specialty: string (nullable = true)
 |-- Record_ID: integer (nullable = true)
 |-- Program_Year: integer (nullable = true)

In [14]:
pds_df = pds.select(
    col("Physician_First_Name").alias("first_name"),
    col("Physician_Last_Name").alias("last_name"),
    col("Recipient_State").alias("state"),
    col("Recipient_City").alias("city"),
    col("Total_Amount_Invested_USDollars").alias("total_amount_of_payment_usd")
)

pds_df.show()

+----------+----------+-----+----------------+---------------------------+
|first_name| last_name|state|            city|total_amount_of_payment_usd|
+----------+----------+-----+----------------+---------------------------+
|     Faith|    Brosch|   DE|          NEWARK|                        0.0|
|      Troy|  Brothers|   FL|          Naples|                        0.0|
|      Anne|     Brown|   VA|        Leesburg|                        0.0|
|     David|     Brown|   FL|      FORT MYERS|                        0.0|
|    Ingrid|     Brown|   TX|      Round Rock|                        0.0|
|     Kevin|     Brown|   TX|      Round Rock|                        0.0|
|    Steven|     Brown|   FL|      Palm Coast|                        0.0|
|   Michael|   Buckley|   NC|         Raleigh|                        0.0|
|  Christin|Richardson|   NC|         Raleigh|                        0.0|
|     Kathy|Richardson|   NC|      Greensboro|                        0.0|
|    Kellyn|    Rielly|  

In [15]:
pds_df1 = (pds_df
           .groupBy('first_name', 'last_name', 'state', 'city')
           .agg(F.sum('total_amount_of_payment_usd').alias('sum_total_amount_of_payment_usd'))
           .withColumn('total_amount_of_payment_usd', F.col('sum_total_amount_of_payment_usd').cast('float')))

# Show the result
pds_df1.show()

+-----------+----------+-----+---------------+-------------------------------+---------------------------+
| first_name| last_name|state|           city|sum_total_amount_of_payment_usd|total_amount_of_payment_usd|
+-----------+----------+-----+---------------+-------------------------------+---------------------------+
|       Mary|   Goodwin|   NC|      Asheville|                            0.0|                        0.0|
|Christopher|      Ames|   CA|  SAN FRANCISCO|                            0.0|                        0.0|
|   Rajeshri|     Patel|   NY|     Long Beach|                        38348.0|                    38348.0|
|     George|    Ferzli|   NY|  Staten Island|                        70000.0|                    70000.0|
|    Michael| Ziebelman|   FL|   Winter Haven|                            0.0|                        0.0|
|    STEPHEN|     RAMEE|   LA|    NEW ORLEANS|                            0.0|                        0.0|
|       JOHN|    VUKICH|   WI|      W

In [16]:
# Convert all string columns to uppercase
for col_name in pds_df1.columns:
    # Check if the column type is string
    if isinstance(pds_df1.schema[col_name].dataType, StringType):
        pds_df1 = pds_df1.withColumn(col_name, F.upper(F.col(col_name)))

# Show the result
pds_df1.show()

+-----------+----------+-----+---------------+-------------------------------+---------------------------+
| first_name| last_name|state|           city|sum_total_amount_of_payment_usd|total_amount_of_payment_usd|
+-----------+----------+-----+---------------+-------------------------------+---------------------------+
|       MARY|   GOODWIN|   NC|      ASHEVILLE|                            0.0|                        0.0|
|CHRISTOPHER|      AMES|   CA|  SAN FRANCISCO|                            0.0|                        0.0|
|   RAJESHRI|     PATEL|   NY|     LONG BEACH|                        38348.0|                    38348.0|
|     GEORGE|    FERZLI|   NY|  STATEN ISLAND|                        70000.0|                    70000.0|
|    MICHAEL| ZIEBELMAN|   FL|   WINTER HAVEN|                            0.0|                        0.0|
|    STEPHEN|     RAMEE|   LA|    NEW ORLEANS|                            0.0|                        0.0|
|       JOHN|    VUKICH|   WI|      W

In [17]:
pay_part_d_fpd = part_d_all_pd.join(pds_df1, on=['last_name', 'first_name', 'city', 'state'], how='left')

# Show the result
pay_part_d_fpd.show()



+--------------+----------+--------------+-----+------------+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+--------------------+-------------------------------+---------------------------+
|     last_name|first_name|          city|state|         npi|sum_total_drug_cost|mean_total_drug_cost|max_total_drug_cost|sum_total_claim_count|mean_total_claim_count|max_total_claim_count|sum_total_day_supply|mean_total_day_supply|max_total_day_supply|           specialty|sum_total_amount_of_payment_usd|total_amount_of_payment_usd|
+--------------+----------+--------------+-----+------------+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+--------------------+-------------------------------+------------------------

                                                                                

In [18]:
pay_part_d_fpd.count()

                                                                                

3160505

In [19]:
leie_data_path = "../DataSet/LEIE.csv"
leie_df = spark.read.csv(leie_data_path, header=True, inferSchema=True)
leie_df.show()

+--------+---------+-------+--------------------+------------------+------------------+----+----------+----+--------------------+----------+-----+-----+--------+--------+--------+----------+--------+
|LASTNAME|FIRSTNAME|MIDNAME|             BUSNAME|           GENERAL|         SPECIALTY|UPIN|       NPI| DOB|             ADDRESS|      CITY|STATE|  ZIP|EXCLTYPE|EXCLDATE|REINDATE|WAIVERDATE|WVRSTATE|
+--------+---------+-------+--------------------+------------------+------------------+----+----------+----+--------------------+----------+-----+-----+--------+--------+--------+----------+--------+
|    NULL|     NULL|   NULL|#1 MARKETING SERV...|    OTHER BUSINESS|        SOBER HOME|NULL|         0|NULL|239 BRIGHTON BEAC...|  BROOKLYN|   NY|11235|  1128a1|20200319|       0|         0|    NULL|
|    NULL|     NULL|   NULL|    1 BEST CARE, INC|    OTHER BUSINESS|HOME HEALTH AGENCY|NULL|         0|NULL|2161 UNIVERSITY A...|SAINT PAUL|   MN|55114|  1128b5|20230518|       0|         0|    NULL|


In [20]:
leie_df.printSchema()

root
 |-- LASTNAME: string (nullable = true)
 |-- FIRSTNAME: string (nullable = true)
 |-- MIDNAME: string (nullable = true)
 |-- BUSNAME: string (nullable = true)
 |-- GENERAL: string (nullable = true)
 |-- SPECIALTY: string (nullable = true)
 |-- UPIN: string (nullable = true)
 |-- NPI: integer (nullable = true)
 |-- DOB: integer (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIP: string (nullable = true)
 |-- EXCLTYPE: string (nullable = true)
 |-- EXCLDATE: string (nullable = true)
 |-- REINDATE: integer (nullable = true)
 |-- WAIVERDATE: integer (nullable = true)
 |-- WVRSTATE: string (nullable = true)



In [21]:
leie_df1 = leie_df.select(
    col('NPI').alias("npi"),
    col('EXCLTYPE').alias("is_fraud")
)
leie_df1.show()

+----------+--------+
|       npi|is_fraud|
+----------+--------+
|         0|  1128a1|
|         0|  1128b5|
|1972902351|  1128b8|
|         0|  1128a1|
|         0|  1128b7|
|         0|  1128b6|
|1922348218|  1128a1|
|         0|  1128b5|
|         0|  1128a1|
|         0|  1128b8|
|         0|  1128a1|
|         0|  1128b8|
|         0|  1128b5|
|         0|  1128a1|
|         0|  1128b8|
|         0|  1128a1|
|         0|  1128a1|
|         0|  1128b4|
|         0|  1128a1|
|         0|  1128b8|
+----------+--------+
only showing top 20 rows



In [22]:
leie_df2 = leie_df1.filter(col('npi') != 0)
leie_df2.show()

+----------+--------+
|       npi|is_fraud|
+----------+--------+
|1972902351|  1128b8|
|1922348218|  1128a1|
|1942476080|  1128b8|
|1275600959|  1128a1|
|1891731758|  1128b8|
|1265830335|  1128a1|
|1851631543|  1128b7|
|1902198435|  1128a1|
|1073916631|  1128b7|
|1437510278|  1128a1|
|1073682936|  1128b7|
|1902166028|  1128b8|
|1992906937|  1128b8|
|1104947944|  1128a1|
|1164669479|  1128a1|
|1043302250|  1128a1|
|1801231436|  1128a1|
|1912011800|  1128b8|
|1780812768|  1128b7|
|1447560867|  1128b8|
+----------+--------+
only showing top 20 rows



In [23]:
leie_df2 = leie_df2.withColumn('is_fraud', lit(1))
    

# Show the result
leie_df2.show()

+----------+--------+
|       npi|is_fraud|
+----------+--------+
|1972902351|       1|
|1922348218|       1|
|1942476080|       1|
|1275600959|       1|
|1891731758|       1|
|1265830335|       1|
|1851631543|       1|
|1902198435|       1|
|1073916631|       1|
|1437510278|       1|
|1073682936|       1|
|1902166028|       1|
|1992906937|       1|
|1104947944|       1|
|1164669479|       1|
|1043302250|       1|
|1801231436|       1|
|1912011800|       1|
|1780812768|       1|
|1447560867|       1|
+----------+--------+
only showing top 20 rows



In [24]:
leie_df2.printSchema()

root
 |-- npi: integer (nullable = true)
 |-- is_fraud: integer (nullable = false)



In [25]:
# Features Engineering 
feature_1 = leie_df2.join(pay_part_d_fpd, on='npi', how='left')

In [26]:
feature_1.printSchema()

root
 |-- npi: integer (nullable = true)
 |-- is_fraud: integer (nullable = false)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sum_total_drug_cost: float (nullable = true)
 |-- mean_total_drug_cost: float (nullable = true)
 |-- max_total_drug_cost: float (nullable = true)
 |-- sum_total_claim_count: float (nullable = true)
 |-- mean_total_claim_count: float (nullable = true)
 |-- max_total_claim_count: float (nullable = true)
 |-- sum_total_day_supply: float (nullable = true)
 |-- mean_total_day_supply: float (nullable = true)
 |-- max_total_day_supply: float (nullable = true)
 |-- specialty: string (nullable = true)
 |-- sum_total_amount_of_payment_usd: double (nullable = true)
 |-- total_amount_of_payment_usd: float (nullable = true)



In [27]:
#Filling 0 for NA
feature_1.fillna(0)
feature_1.show()

[Stage 86:>                                                         (0 + 4) / 4]

+----------+--------+----------+----------+----------------+-----+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+--------------------+-------------------------------+---------------------------+
|       npi|is_fraud| last_name|first_name|            city|state|sum_total_drug_cost|mean_total_drug_cost|max_total_drug_cost|sum_total_claim_count|mean_total_claim_count|max_total_claim_count|sum_total_day_supply|mean_total_day_supply|max_total_day_supply|           specialty|sum_total_amount_of_payment_usd|total_amount_of_payment_usd|
+----------+--------+----------+----------+----------------+-----+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+--------------------+-------------------------------+---------

                                                                                

In [28]:
feature_1.count()

                                                                                

51347

In [29]:
feature_1.filter(col('is_fraud') == 1).count()


                                                                                

51347

In [30]:
feature_all_df = feature_1

In [31]:
feature_all_df.printSchema()

root
 |-- npi: integer (nullable = true)
 |-- is_fraud: integer (nullable = false)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sum_total_drug_cost: float (nullable = true)
 |-- mean_total_drug_cost: float (nullable = true)
 |-- max_total_drug_cost: float (nullable = true)
 |-- sum_total_claim_count: float (nullable = true)
 |-- mean_total_claim_count: float (nullable = true)
 |-- max_total_claim_count: float (nullable = true)
 |-- sum_total_day_supply: float (nullable = true)
 |-- mean_total_day_supply: float (nullable = true)
 |-- max_total_day_supply: float (nullable = true)
 |-- specialty: string (nullable = true)
 |-- sum_total_amount_of_payment_usd: double (nullable = true)
 |-- total_amount_of_payment_usd: float (nullable = true)



In [32]:
# Scaling the features
# Apply log transformation and create new columns for differences
feature_all_df = (feature_all_df
                  .withColumn('sum_total_drug_cost', log10(col('sum_total_drug_cost') + 1.0))
                  .withColumn('sum_total_claim_count', log10(col('sum_total_claim_count') + 1.0))
                  .withColumn('sum_total_day_supply', log10(col('sum_total_day_supply') + 1.0))
                  .withColumn('sum_total_amount_of_payment_usd', log10(col('sum_total_amount_of_payment_usd') + 1.0))
                  .withColumn('mean_total_drug_cost', log10(col('mean_total_drug_cost') + 1.0))
                  .withColumn('mean_total_claim_count', log10(col('mean_total_claim_count') + 1.0))
                  .withColumn('mean_total_day_supply', log10(col('mean_total_day_supply') + 1.0))
                  .withColumn('max_total_drug_cost', log10(col('max_total_drug_cost') + 1.0))
                  .withColumn('max_total_claim_count', log10(col('max_total_claim_count') + 1.0))
                  .withColumn('max_total_day_supply', log10(col('max_total_day_supply') + 1.0))
                  .withColumn('claim_max_mean', col('max_total_claim_count') - col('mean_total_claim_count'))
                  .withColumn('supply_max_mean', col('max_total_day_supply') - col('max_total_day_supply'))
                  .withColumn('drug_max_mean', col('max_total_drug_cost') - col('mean_total_drug_cost')))

# Show the result
feature_all_df.show()






+----------+--------+--------------+----------+----------------+-----+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+--------------------+---------------------+--------------------+-------------------+-------------------------------+---------------------------+-------------------+---------------+-------------------+
|       npi|is_fraud|     last_name|first_name|            city|state|sum_total_drug_cost|mean_total_drug_cost|max_total_drug_cost|sum_total_claim_count|mean_total_claim_count|max_total_claim_count|sum_total_day_supply|mean_total_day_supply|max_total_day_supply|          specialty|sum_total_amount_of_payment_usd|total_amount_of_payment_usd|     claim_max_mean|supply_max_mean|      drug_max_mean|
+----------+--------+--------------+----------+----------------+-----+-------------------+--------------------+-------------------+---------------------+----------------------+---------------------+----

                                                                                

In [46]:
feature_all_df = feature_all_df.withColumn("npi", col("npi").cast(StringType()))



In [58]:
categorical_features = ['npi','last_name', 'specialty','first_name','city', 'state']

numerical_features = ['sum_total_drug_cost', 'mean_total_drug_cost','total_amount_of_payment_usd',
       'max_total_drug_cost', 'sum_total_claim_count',
       'mean_total_claim_count', 'max_total_claim_count',
       'sum_total_day_supply', 'mean_total_day_supply', 'max_total_day_supply',
    'claim_max_mean','supply_max_mean', 'drug_max_mean']

target = ['is_fraud']

In [59]:
# Combining all feature names
all_features = categorical_features + numerical_features + target


In [60]:
#Splitting the Data into Training and Validation Sets
train_df, valid_df = feature_all_df.randomSplit([0.8, 0.2], seed=0)

feature_all_df.printSchema()

root
 |-- npi: string (nullable = true)
 |-- is_fraud: integer (nullable = false)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sum_total_drug_cost: double (nullable = true)
 |-- mean_total_drug_cost: double (nullable = true)
 |-- max_total_drug_cost: double (nullable = true)
 |-- sum_total_claim_count: double (nullable = true)
 |-- mean_total_claim_count: double (nullable = true)
 |-- max_total_claim_count: double (nullable = true)
 |-- sum_total_day_supply: double (nullable = true)
 |-- mean_total_day_supply: double (nullable = true)
 |-- max_total_day_supply: double (nullable = true)
 |-- specialty: string (nullable = true)
 |-- sum_total_amount_of_payment_usd: double (nullable = true)
 |-- total_amount_of_payment_usd: float (nullable = true)
 |-- claim_max_mean: double (nullable = true)
 |-- supply_max_mean: double (nullable = true)
 |-- drug_max_mean: double (nullable

In [61]:
#Handling Missing Values

# Fill numerical features with 0
train_df = train_df.fillna(0, subset=numerical_features)
valid_df = valid_df.fillna(0, subset=numerical_features)

# Fill categorical features with 'NA'
train_df = train_df.fillna('NA', subset=categorical_features)
valid_df = valid_df.fillna('NA', subset=categorical_features)


In [None]:
#Feature Transformation

#OneHotEncoding for categorical features
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").setHandleInvalid("keep") for col in categorical_features]
encoders = [OneHotEncoder(inputCol=col+"_index", outputCol=col+"_encoded") for col in categorical_features]

# Assembling combined feature vector
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + numerical_features, outputCol="features")

# Standard Scaler for numerical features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Define a pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

# Fit and transform the training data
train_df_transformed = pipeline.fit(train_df).transform(train_df)

# Transform the validation data
valid_df_transformed = pipeline.fit(train_df).transform(valid_df)


[Stage 310:> (7 + 4) / 25][Stage 311:> (0 + 0) / 25][Stage 312:>  (0 + 0) / 1]5]