In [1]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.sql.functions import col

In [3]:
file_path = 'gs://6893_waj/FraudLabelledData7.csv/part-00000-b2e16053-fdf5-4a84-a7fc-726293a2769e-c000.csv'

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FraudDetection").getOrCreate()

#FraudLabelledData is our labelled data , the criteria for labelling is specified in FraudLabelling.py
#input_path = "../data/ProcessedFraudData.parquet"
df = spark.read.option("header",True).csv(file_path,)

24/12/13 07:27:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [5]:

input_path = "gs://6893_waj/FraudLabelledData.parquet"
df = spark.read.parquet(input_path)


                                                                                

In [6]:
df.head()

24/12/13 07:27:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Row(Provider_ID=1043272834, Provider_Last_Name='lindgren', Provider_First_Name='cherrylene', Provider_Credentials='d.o.', Provider_Type='family practice', Provider_State='TX', Provider_RUCA_Code=1.0, Provider_RUCA_Description='Metropolitan area core: primary flow within an urbanized area of 50,000 and greater', Medicare_Participation='Y', Procedure_Code='87804', Procedure_Description='Detection test by immunoassay with direct visual observation for influenza virus', Total_Beneficiaries=12.0, Total_Services=24.0, Total_Beneficiary_Days=12.0, Average_Submitted_Charge=53.0, Average_Medicare_Allowed=16.48, Average_Medicare_Payment=16.48, Average_Standardized_Payment=16.22, Z_Outlier_Flag_Total_Beneficiaries=0, IQR_Outlier_Flag_Total_Beneficiaries=0, Z_Outlier_Flag_Total_Services=0, IQR_Outlier_Flag_Total_Services=0, Z_Outlier_Flag_Total_Beneficiary_Days=0, IQR_Outlier_Flag_Total_Beneficiary_Days=0, Z_Outlier_Flag_Average_Submitted_Charge=0, IQR_Outlier_Flag_Average_Submitted_Charge=0, Z_Ou

In [7]:
print(df.show(5))

[Stage 3:>                                                          (0 + 1) / 1]

+-----------+------------------+-------------------+--------------------+-------------------+--------------+------------------+-------------------------+----------------------+--------------+---------------------+-------------------+--------------+----------------------+------------------------+------------------------+------------------------+----------------------------+----------------------------------+------------------------------------+-----------------------------+-------------------------------+-------------------------------------+---------------------------------------+---------------------------------------+-----------------------------------------+---------------------------------------+-----------------------------------------+---------------------------------------+-----------------------------------------+-------------------------------------------+---------------------------------------------+--------------------------------------+---------------------------------+----

                                                                                

In [8]:
#Define Categorical and Numerical Columns.
categorical_columns = ["Provider_Credentials", "Provider_Type", "Provider_State", "Medicare_Participation"]
numerical_columns = [
    "Total_Beneficiaries", "Total_Services", "Total_Beneficiary_Days",
    "Average_Submitted_Charge", "Average_Medicare_Allowed",
    "Average_Medicare_Payment", "Average_Standardized_Payment"
]
outlier_columns = [col for col in df.columns if col.startswith("Mixed_Outlier_Flag_")]
target_column = "is_fraud"

In [9]:
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_Index").setHandleInvalid("keep")
    for col in categorical_columns
]
encoders = [
    OneHotEncoder(inputCol=f"{col}_Index", outputCol=f"{col}_OHE")
    for col in categorical_columns
]

In [10]:
#Feature Assemler
feature_columns = numerical_columns + outlier_columns + [f"{col}_OHE" for col in categorical_columns]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

#Scaling Numerical Features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")


In [11]:
#Target Variable
df = df.withColumn("label", col(target_column).cast("double"))

#Build the Transformation Pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

In [12]:
transformed_df = pipeline.fit(df).transform(df)


24/12/13 07:29:18 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/12/13 07:29:48 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
                                                                                

In [13]:
final_dataset = transformed_df.select("scaled_features", "label")

In [None]:
# Function just to convert to help build data frame
def extract(row):
  return tuple(row.scaled_features.toArray().tolist())

extracted_df = final_dataset.rdd.map(extract).toDF(feature_columns)


In [None]:
final_dataset.show(5)

In [None]:
extracted_df.show(5)

In [None]:
final_dataset.show(5)

In [14]:
# Split the data into training and testing sets
from pyspark.ml.classification import LogisticRegression

(trainingData, testData) = final_dataset.randomSplit([0.7, 0.3])

# Create a LogisticRegression model
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")

# Fit the model on the training data
lrModel = lr.fit(trainingData)



24/12/13 07:29:59 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:31:03 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:06 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:08 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:09 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:10 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:11 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:12 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:13 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:14 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:15 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/13 07:32:16 WARN DAGScheduler: Broadcasting larg

24/12/13 07:33:50 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
                                                                                

In [15]:
# Make predictions on the test data
predictions = lrModel.transform(testData)

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="label")
accuracy = evaluator.evaluate(predictions)

print("Accuracy:", accuracy)

24/12/13 07:35:14 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/12/13 07:36:44 WARN YarnAllocator: Container from a bad node: container_1734074618391_0001_01_000001 on host: fraud-detection-eecs6893-project-cluster-m.c.projectc-gv2359.internal. Exit status: 143. Diagnostics: [2024-12-13 07:36:44.469]Container killed on request. Exit code is 143
[2024-12-13 07:36:44.469]Container exited with a non-zero exit code 143. 
[2024-12-13 07:36:44.469]Killed by external signal
.
24/12/13 07:36:44 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container from a bad node: container_1734074618391_0001_01_000001 on host: fraud-detection-eecs6893-project-cluster-m.c.projectc-gv2359.internal. Exit status: 143. Diagnostics: [2024-12-13 07:36:44.469]Container killed on request. Exit code is 143
[2024-12-13 07:36:44.469]Container exited with a non-zero exit code 143. 
[2024-12-13 07:36:44.469]Killed by external signal
.
24/12/13 

Accuracy: 0.9999985371364924


                                                                                