**PySpark** is the Python API for Apache Spark.

- Apache Spark is a powerful open-source engine for big data processing, real-time stream processing, and machine learning at scale.

- PySpark lets you use Python to work with Spark’s capabilities, so you can write Python code to run fast parallel processing jobs across large datasets.

In [1]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.





##### from pyspark.sql import SparkSession
This line imports the SparkSession class from the pyspark.sql module.

- SparkSession is the main entry point for working with DataFrames in PySpark.

- Think of SparkSession as the control center where you start, configure, and run Spark commands from Python.

- The cell below imports SparkSession and builds a session named "Financial Fraud Detection". getOrCreate() returns the session that is already running, if there is one; otherwise it creates a new one.

- Without a SparkSession you can't use PySpark features such as spark.read.csv(), DataFrames, Spark SQL, or MLlib models.

Creating the session is the first thing you do in any PySpark program, much like import pandas at the top of pandas-based code.

In [2]:
from pyspark.sql import SparkSession

# Create or get Spark session
spark = SparkSession.builder \
    .appName("Financial Fraud Detection") \
    .getOrCreate()


🔹 **header=True**
This tells Spark: “The first row in the file contains the column names.”

Without this, Spark would read the header line as an ordinary data row and name the columns _c0, _c1, and so on.

🔹 **inferSchema=True**
This tells Spark: “Try to guess the correct data type for each column (such as int, double, or string).”

Without this, Spark would treat every column as a string, so numeric columns could not be used directly as model features. The trade-off is one extra pass over the file to infer the types.
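To make "guess the data type" concrete, here is a deliberately simplified sketch of the idea: try to parse every value in a column as an integer, then as a floating-point number, and fall back to string. This is an illustration only, under the assumption that the values are non-empty strings; Spark's real inference also handles nulls, longs, booleans, timestamps and more. The helper name infer_column_type is hypothetical.

```python
def infer_column_type(values):
    """Simplified, hypothetical type inference for one CSV column of strings."""

    def all_parse(parser):
        # True only if every value in the column can be parsed by `parser`
        try:
            for v in values:
                parser(v)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    return "string"


# Raw values as they might appear in the CSV file
print(infer_column_type(["1", "1", "2"]))          # integer
print(infer_column_type(["9839.64", "181.0"]))     # double
print(infer_column_type(["PAYMENT", "TRANSFER"]))  # string
```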

In [3]:
df = spark.read.csv("fraudTrain.csv", header=True, inferSchema=True)


In [4]:
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [5]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [6]:
df.select("type").distinct().show()

+--------+
|    type|
+--------+
|TRANSFER|
| CASH_IN|
|CASH_OUT|
| PAYMENT|
|   DEBIT|
+--------+



In [7]:
df.groupBy("isFraud").count().show()

+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   8213|
|      0|6354407|
+-------+-------+



We have a classic class imbalance problem:

Only about 0.13% of the rows are fraud (8,213 out of 6,362,620), which means:

A model that always predicts "Not Fraud" would still be about 99.9% accurate, yet it would catch zero fraud cases, which makes it useless.

We’ll handle this later with class weighting, but first, let’s continue exploring the data.
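The arithmetic behind those percentages can be checked in plain Python, using the two counts copied from the groupBy("isFraud") output above. The "always predicts Not Fraud" baseline is a hypothetical model used only for comparison.

```python
# Counts copied from the groupBy("isFraud").count() output
fraud = 8213
non_fraud = 6354407
total = fraud + non_fraud

fraud_rate = fraud / total
# Accuracy of a hypothetical model that answers "not fraud" for every row
always_not_fraud_accuracy = non_fraud / total

print(f"Total rows:                   {total}")
print(f"Fraud rate:                   {fraud_rate:.4%}")
print(f"'Always not fraud' accuracy:  {always_not_fraud_accuracy:.4%}")
# That baseline's recall on the fraud class is 0: it never flags anything.
```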

In [8]:
df.groupBy("type", "isFraud").count().orderBy("type", "isFraud").show()


+--------+-------+-------+
|    type|isFraud|  count|
+--------+-------+-------+
| CASH_IN|      0|1399284|
|CASH_OUT|      0|2233384|
|CASH_OUT|      1|   4116|
|   DEBIT|      0|  41432|
| PAYMENT|      0|2151495|
|TRANSFER|      0| 528812|
|TRANSFER|      1|   4097|
+--------+-------+-------+



**Observation**
Fraud occurs only in:

- TRANSFER

- CASH_OUT

The other transaction types (PAYMENT, CASH_IN, DEBIT) contain no fraud cases in this dataset.
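A quick way to quantify this observation is the fraud rate within each type. The sketch below uses the counts copied from the groupBy("type", "isFraud") output; types with no fraud row in that output are given a fraud count of 0.

```python
# (non-fraud, fraud) counts per type, copied from the output above
counts = {
    "CASH_IN": (1399284, 0),
    "CASH_OUT": (2233384, 4116),
    "DEBIT": (41432, 0),
    "PAYMENT": (2151495, 0),
    "TRANSFER": (528812, 4097),
}

# Share of fraudulent transactions within each type
fraud_rate_by_type = {
    t: fraud / (legit + fraud) for t, (legit, fraud) in counts.items()
}

for t, rate in sorted(fraud_rate_by_type.items()):
    print(f"{t:<9} {rate:.4%}")
```

TRANSFER turns out to have a noticeably higher fraud rate than CASH_OUT, even though both contain roughly the same number of fraud cases.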

In [9]:
from pyspark.sql.functions import col, sum as Fsum

# Sum of nulls in each column
df.select([Fsum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()


+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



Next, we keep only the rows where type is TRANSFER or CASH_OUT, because:

- These are the only transaction types where fraud occurs in this data

- The other types add millions of rows that are all non-fraud, which makes the class imbalance worse and dilutes the fraud signal
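The effect of the filter on the class balance can be estimated before running it, using the counts for the two kept types copied from the groupBy("type", "isFraud") output:

```python
# (non-fraud, fraud) counts for the two kept types, copied from the earlier output
kept = {"CASH_OUT": (2233384, 4116), "TRANSFER": (528812, 4097)}

kept_rows = sum(legit + fraud for legit, fraud in kept.values())
kept_fraud = sum(fraud for _, fraud in kept.values())

print(kept_rows)                         # rows left after the filter
print(f"{kept_fraud / kept_rows:.4%}")   # fraud rate among the kept rows
```

All 8,213 fraud cases survive the filter, while the fraud rate rises from about 0.13% to about 0.30%.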

In [10]:
df_filtered = df.filter(df.type.isin("TRANSFER", "CASH_OUT"))
df_filtered.groupBy("type", "isFraud").count().show()


+--------+-------+-------+
|    type|isFraud|  count|
+--------+-------+-------+
|CASH_OUT|      0|2233384|
|TRANSFER|      1|   4097|
|CASH_OUT|      1|   4116|
|TRANSFER|      0| 528812|
+--------+-------+-------+



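Some columns are not useful for machine learning, so we drop them:

- nameOrig and nameDest are high-cardinality account IDs; they identify accounts rather than describe the transaction, so they won’t help the model generalize
- isFlaggedFraud is already a fraud flag rather than a raw transaction attribute, so it may leak target information
- step (a time index) is also left out of this baseline
- What remains are the numerical balance and amount features, the transaction type, and the label (isFraud)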

In [11]:
# Drop unnecessary columns
df_model = df_filtered.drop("nameOrig", "nameDest", "isFlaggedFraud", "step")

# View the first few rows
df_model.show(5)


+--------+---------+-------------+--------------+--------------+--------------+-------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|
+--------+---------+-------------+--------------+--------------+--------------+-------+
|TRANSFER|    181.0|        181.0|           0.0|           0.0|           0.0|      1|
|CASH_OUT|    181.0|        181.0|           0.0|       21182.0|           0.0|      1|
|CASH_OUT|229133.94|      15325.0|           0.0|        5083.0|      51513.44|      0|
|TRANSFER| 215310.3|        705.0|           0.0|       22425.0|           0.0|      0|
|TRANSFER|311685.89|      10835.0|           0.0|        6267.0|    2719172.89|      0|
+--------+---------+-------------+--------------+--------------+--------------+-------+
only showing top 5 rows


- Machine learning models can’t work with string categories like "TRANSFER" or "CASH_OUT", so we need to convert them into numbers.

In [13]:
from pyspark.ml.feature import StringIndexer

# Convert 'type' to numeric using StringIndexer
indexer = StringIndexer(inputCol="type", outputCol="type_index")
df_indexed = indexer.fit(df_model).transform(df_model)

# Show the transformed result
df_indexed.select("type", "type_index").distinct().show()


+--------+----------+
|    type|type_index|
+--------+----------+
|CASH_OUT|       0.0|
|TRANSFER|       1.0|
+--------+----------+
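Why is CASH_OUT mapped to 0.0 and TRANSFER to 1.0? By default (stringOrderType="frequencyDesc"), StringIndexer gives index 0 to the most frequent label. The sketch below reproduces that ordering in plain Python with the per-type row counts from the earlier output. The helper name frequency_desc_index is hypothetical, and the alphabetical tie-breaking is an assumption about Spark 3.x behaviour.

```python
def frequency_desc_index(label_counts):
    """Mimic StringIndexer's default ordering: most frequent label -> 0.0.

    Ties are broken alphabetically (assumed to match Spark 3.x).
    """
    ordered = sorted(label_counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return {label: float(i) for i, (label, _) in enumerate(ordered)}


# Rows per type after filtering (non-fraud + fraud, from the earlier output)
type_counts = {"CASH_OUT": 2233384 + 4116, "TRANSFER": 528812 + 4097}
mapping = frequency_desc_index(type_counts)
print(mapping)  # {'CASH_OUT': 0.0, 'TRANSFER': 1.0}
```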



**Combine All Features into a Single Feature Vector**
- MLlib models in PySpark expect all input features packed together in a single vector column, named "features" by default (the name can be changed with the featuresCol parameter).

- We’ll use *VectorAssembler* to build this column.
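Conceptually, VectorAssembler takes the listed columns of each row, in the listed order, and packs them into one vector. A plain-Python sketch of that idea for a single row, using the first row of df_model shown earlier (a TRANSFER, so type_index is 1.0); the helper name assemble is hypothetical:

```python
# Column order used by the assembler; this order defines the vector positions
example_cols = ["amount", "oldbalanceOrg", "newbalanceOrig",
                "oldbalanceDest", "newbalanceDest", "type_index"]


def assemble(row, cols):
    """Pack the named columns of one row into a list, in the given order."""
    return [float(row[c]) for c in cols]


# First row from the df_model output, plus its type_index
row = {"type": "TRANSFER", "amount": 181.0, "oldbalanceOrg": 181.0,
       "newbalanceOrig": 0.0, "oldbalanceDest": 0.0, "newbalanceDest": 0.0,
       "isFraud": 1, "type_index": 1.0}

print(assemble(row, example_cols))  # [181.0, 181.0, 0.0, 0.0, 0.0, 1.0]
```

Because positions in the vector follow the column list, the same order has to be used whenever you interpret the model later (for example, when reading feature importances).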

In [14]:
from pyspark.ml.feature import VectorAssembler

# Define the feature columns
feature_cols = ["amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "type_index"]

# Create the assembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the data
df_final = assembler.transform(df_indexed)

# Show the final DataFrame with features
df_final.select("features", "isFraud").show(5, truncate=False)


+---------------------------------------------+-------+
|features                                     |isFraud|
+---------------------------------------------+-------+
|[181.0,181.0,0.0,0.0,0.0,1.0]                |1      |
|[181.0,181.0,0.0,21182.0,0.0,0.0]            |1      |
|[229133.94,15325.0,0.0,5083.0,51513.44,0.0]  |0      |
|[215310.3,705.0,0.0,22425.0,0.0,1.0]         |0      |
|[311685.89,10835.0,0.0,6267.0,2719172.89,1.0]|0      |
+---------------------------------------------+-------+
only showing top 5 rows


**Train a Classification Model (Logistic Regression)**

We’ll use *Logistic Regression* for now because:

- It’s a standard model for binary classification
- It works well as a baseline
- It outputs a fraud probability for every transaction, which lets us rank alerts and tune the decision threshold
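For binary logistic regression, Spark's rawPrediction column holds [-m, m], where m = w·x + b is the model's margin, and the probability column applies the logistic (sigmoid) function to it. The prediction is 1.0 when the fraud probability exceeds the threshold parameter, which defaults to 0.5. A plain-Python sketch of that last step; the margins below are made-up values, not taken from the fitted model:

```python
import math


def sigmoid(z):
    """Logistic function: maps a raw margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def predict(margin, threshold=0.5):
    """Return ([p_not_fraud, p_fraud], predicted label) for one row."""
    p1 = sigmoid(margin)
    return [1.0 - p1, p1], 1.0 if p1 > threshold else 0.0


# Hypothetical margins, for illustration only
print(predict(-3.9))       # fraud probability about 0.02 -> label 0.0
print(predict(0.4))        # fraud probability about 0.60 -> label 1.0
print(predict(-1.0, 0.2))  # about 0.27, flagged only because the threshold is 0.2
```

Lowering the threshold flags more transactions: recall goes up and precision usually goes down.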

In [15]:
from pyspark.ml.classification import LogisticRegression

# Create the Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="isFraud")

# Fit the model to the data
lr_model = lr.fit(df_final)

# Make predictions
predictions = lr_model.transform(df_final)

# Show predictions
predictions.select("features", "isFraud", "prediction", "probability").show(5, truncate=False)


+---------------------------------------------+-------+----------+-------------------------------------------+
|features                                     |isFraud|prediction|probability                                |
+---------------------------------------------+-------+----------+-------------------------------------------+
|[181.0,181.0,0.0,0.0,0.0,1.0]                |1      |0.0       |[0.9808727271585423,0.019127272841457654]  |
|[181.0,181.0,0.0,21182.0,0.0,0.0]            |1      |0.0       |[0.9958119075800348,0.004188092419965206]  |
|[229133.94,15325.0,0.0,5083.0,51513.44,0.0]  |0      |0.0       |[0.9999385873232427,6.141267675729978E-5]  |
|[215310.3,705.0,0.0,22425.0,0.0,1.0]         |0      |0.0       |[0.9996283155591873,3.716844408127029E-4]  |
|[311685.89,10835.0,0.0,6267.0,2719172.89,1.0]|0      |0.0       |[0.9999999999501343,4.9865667151038906E-11]|
+---------------------------------------------+-------+----------+-------------------------------------------+
only showing top 5 rows

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Use the built-in evaluator
evaluator = BinaryClassificationEvaluator(
    labelCol="isFraud",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# Evaluate using AUC-ROC (common for binary classification)
auc = evaluator.evaluate(predictions)

print(f"🚀 Area Under ROC Curve (AUC): {auc:.4f}")


🚀 Area Under ROC Curve (AUC): 0.9820
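AUC-ROC can be read as the probability that a randomly chosen fraud case gets a higher fraud score than a randomly chosen legitimate one (ties count as half). The brute-force sketch below computes exactly that on a toy example. Spark computes the area from the ROC curve instead, but the quantity is the same. Keep in mind that the 0.9820 above was measured on the same rows the model was trained on, so it is likely optimistic.

```python
def auc_pairwise(scores, labels):
    """AUC as the share of (fraud, non-fraud) pairs ranked correctly.

    O(P * N) in time, so only suitable for small illustrative inputs.
    """
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy fraud probabilities and labels (hypothetical)
scores = [0.90, 0.80, 0.85, 0.70]
labels = [1, 1, 0, 0]
print(auc_pairwise(scores, labels))  # 0.75: 3 of the 4 pairs are ranked correctly
```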


In [17]:
# Count total fraud and non-fraud
fraud_count = df_final.filter(df_final.isFraud == 1).count()
nonfraud_count = df_final.filter(df_final.isFraud == 0).count()

# Total rows
total_count = fraud_count + nonfraud_count

# Calculate balancing ratio
balance_ratio = nonfraud_count / fraud_count
print(f"⚖️ Class Weight (0 vs 1): {nonfraud_count} / {fraud_count} = {balance_ratio:.2f}")


⚖️ Class Weight (0 vs 1): 2762196 / 8213 = 336.32


This means:

- Non-fraud transactions outnumber fraud ones about 336 to 1, so during training we give each fraud case a weight of about 336 (and each non-fraud case a weight of 1.0). That way both classes contribute roughly the same total weight to the loss.
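A quick arithmetic check of the "same total weight" claim, using the counts printed above:

```python
fraud_count = 8213
nonfraud_count = 2762196
balance_ratio = nonfraud_count / fraud_count  # about 336.32

# Total weight each class contributes once every row gets its class weight
fraud_total_weight = fraud_count * balance_ratio
nonfraud_total_weight = nonfraud_count * 1.0

print(f"{balance_ratio:.2f}")
print(fraud_total_weight, nonfraud_total_weight)
```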

In [18]:
from pyspark.sql.functions import when, lit

# Create new column 'classWeight' based on label
df_weighted = df_final.withColumn(
    "classWeight",
    when(df_final.isFraud == 1, lit(balance_ratio)).otherwise(lit(1.0))
)

# Show example rows
df_weighted.select("isFraud", "classWeight").distinct().show()


+-------+-----------------+
|isFraud|      classWeight|
+-------+-----------------+
|      1|336.3199805186899|
|      0|              1.0|
+-------+-----------------+



In [19]:
from pyspark.ml.classification import LogisticRegression

# Create a new logistic regression model using weight column
lr_weighted = LogisticRegression(
    featuresCol="features",
    labelCol="isFraud",
    weightCol="classWeight"  # This is new!
)

# Train the model on the weighted dataset
lr_weighted_model = lr_weighted.fit(df_weighted)

# Make predictions
predictions_weighted = lr_weighted_model.transform(df_weighted)

# Show the first few predictions
predictions_weighted.select("isFraud", "prediction", "probability").show(5, truncate=False)


+-------+----------+-------------------------------------------+
|isFraud|prediction|probability                                |
+-------+----------+-------------------------------------------+
|1      |1.0       |[0.3947840478627993,0.6052159521372007]    |
|1      |0.0       |[0.7197118142070917,0.28028818579290826]   |
|0      |0.0       |[0.8235703680978783,0.1764296319021217]    |
|0      |1.0       |[0.3638526816945722,0.6361473183054278]    |
|0      |0.0       |[0.9999999999999891,1.0880185641326534E-14]|
+-------+----------+-------------------------------------------+
only showing top 5 rows


In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create evaluator for binary classification
evaluator_weighted = BinaryClassificationEvaluator(
    labelCol="isFraud",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# Evaluate the weighted model
auc_weighted = evaluator_weighted.evaluate(predictions_weighted)

print(f"🚀 New AUC with Class Weighting: {auc_weighted:.4f}")


🚀 New AUC with Class Weighting: 0.9756


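Why this helps:
AUC dropped slightly (0.9820 → 0.9756), but AUC measures ranking quality across all thresholds. Class weighting mostly pushes predicted probabilities toward the fraud class, so at the default 0.5 threshold the weighted model flags more transactions and should catch more actual frauds (higher recall). The first five rows already hint at this: the weighted model flags the first fraud case, which the unweighted model missed.

💡 In real-world fraud detection:

- Catching more fraud usually matters more than a marginally higher AUC.

- False positives still have a cost (manual review, customer friction), but a missed fraud is typically far more expensive.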

In [21]:
# Convert Spark predictions to Pandas DataFrame
preds_pd = predictions_weighted.select("isFraud", "prediction").toPandas()

# Import scikit-learn metrics
from sklearn.metrics import classification_report

# Generate and print classification report
print(classification_report(preds_pd['isFraud'], preds_pd['prediction'], digits=4))


              precision    recall  f1-score   support

           0     0.9996    0.9348    0.9662   2762196
           1     0.0388    0.8852    0.0744      8213

    accuracy                         0.9347   2770409
   macro avg     0.5192    0.9100    0.5203   2770409
weighted avg     0.9968    0.9347    0.9635   2770409



**Observation**

✅ Recall = 88.52% for Fraud
The model catches almost 9 out of 10 fraud cases, largely thanks to the class weighting.

⚠️ Precision = 3.88% for Fraud
This is very low: most transactions the model flags as fraud are actually legitimate. For every 100 fraud alerts, only about 4 are real fraud.

🧠 This is common in fraud detection, and often acceptable, because:

- Catching fraud is the top priority ✅

- False alerts can be reviewed manually or auto-flagged in production

🤝 F1-Score = 7.44% for Fraud
F1 is the harmonic mean of precision and recall, so the low precision pulls it down even though recall is high.

Keep in mind that these numbers were measured on the same data the model was trained on; the next step evaluates the model on a held-out test split.

Improving the F1-score is the next goal.
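Two small sketches may help here. First, the F1 formula applied to the rounded precision and recall from the report. Second, one possible way to improve F1: sweep the decision threshold and keep the one with the best F1. The helper best_f1_threshold and the toy data are hypothetical; in practice the sweep should run on a validation split rather than on the test set. (Spark's LogisticRegression exposes the cut-off through its threshold parameter.)

```python
def f1(precision, recall):
    """Harmonic mean of precision and recall."""
    return 2 * precision * recall / (precision + recall)


# Fraud-class values from the classification report above
print(f"{f1(0.0388, 0.8852):.4f}")  # about 0.074; the report uses unrounded inputs


def best_f1_threshold(probs, labels, thresholds):
    """Return (threshold, f1) with the highest F1 over the candidate thresholds."""
    best = (None, -1.0)
    for t in thresholds:
        tp = sum(1 for p, y in zip(probs, labels) if p > t and y == 1)
        fp = sum(1 for p, y in zip(probs, labels) if p > t and y == 0)
        fn = sum(1 for p, y in zip(probs, labels) if p <= t and y == 1)
        if tp == 0:
            continue  # F1 is 0 (or undefined) when nothing fraudulent is caught
        score = f1(tp / (tp + fp), tp / (tp + fn))
        if score > best[1]:
            best = (t, score)
    return best


# Hypothetical fraud probabilities and true labels
toy_probs = [0.95, 0.70, 0.60, 0.55, 0.30]
toy_labels = [1, 1, 0, 0, 0]
print(best_f1_threshold(toy_probs, toy_labels, [0.5, 0.65, 0.9]))  # (0.65, 1.0)
```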

In [22]:
# Split the data into training (80%) and testing (20%)
train_data, test_data = df_weighted.randomSplit([0.8, 0.2], seed=42)

# Show how many records in each set
print(f"Training set count: {train_data.count()}")
print(f"Testing set count : {test_data.count()}")


Training set count: 2216340
Testing set count : 554069


In [23]:
from pyspark.ml.classification import LogisticRegression

# Create logistic regression model with class weighting
lr = LogisticRegression(
    featuresCol="features",
    labelCol="isFraud",
    weightCol="classWeight"  # still using our weight column
)

# Train the model on training data
lr_model = lr.fit(train_data)

# Predict on test data
test_predictions = lr_model.transform(test_data)

# Show example predictions
test_predictions.select("isFraud", "prediction", "probability").show(5, truncate=False)


+-------+----------+----------------------------------------+
|isFraud|prediction|probability                             |
+-------+----------+----------------------------------------+
|0      |0.0       |[0.8122403802484877,0.18775961975151234]|
|0      |0.0       |[0.7887736886997534,0.2112263113002466] |
|0      |0.0       |[0.8066613595422412,0.19333864045775884]|
|0      |0.0       |[0.7711090960153837,0.2288909039846163] |
|0      |0.0       |[0.9598815611459302,0.04011843885406985]|
+-------+----------+----------------------------------------+
only showing top 5 rows


In [24]:
# AUC evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator_test = BinaryClassificationEvaluator(
    labelCol="isFraud",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc_test = evaluator_test.evaluate(test_predictions)
print(f"🚀 AUC on Test Data: {auc_test:.4f}")


🚀 AUC on Test Data: 0.9767


In [25]:
# Convert to Pandas for sklearn metrics
test_preds_pd = test_predictions.select("isFraud", "prediction").toPandas()

from sklearn.metrics import classification_report

# Print the classification report
print(classification_report(test_preds_pd['isFraud'], test_preds_pd['prediction'], digits=4))


              precision    recall  f1-score   support

           0     0.9997    0.9346    0.9660    552433
           1     0.0387    0.8906    0.0742      1636

    accuracy                         0.9344    554069
   macro avg     0.5192    0.9126    0.5201    554069
weighted avg     0.9968    0.9344    0.9634    554069



### Train a Random Forest Classifier (With Class Weights)

In [26]:
from pyspark.ml.classification import RandomForestClassifier

# Create Random Forest model with class weights
rf = RandomForestClassifier(
    labelCol="isFraud",
    featuresCol="features",
    weightCol="classWeight",  # still using classWeight column
    numTrees=50,              # number of decision trees
    maxDepth=10               # maximum depth of trees
)

# Train on training data
rf_model = rf.fit(train_data)

# Predict on test data
rf_predictions = rf_model.transform(test_data)

# Show example predictions
rf_predictions.select("isFraud", "prediction", "probability").show(5, truncate=False)


+-------+----------+----------------------------------------+
|isFraud|prediction|probability                             |
+-------+----------+----------------------------------------+
|0      |0.0       |[0.8780176652799115,0.1219823347200885] |
|0      |0.0       |[0.8915183833835644,0.10848161661643556]|
|0      |0.0       |[0.8780176652799115,0.1219823347200885] |
|0      |0.0       |[0.934160468095669,0.0658395319043311]  |
|0      |0.0       |[0.7301419271947941,0.2698580728052059] |
+-------+----------+----------------------------------------+
only showing top 5 rows


In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create evaluator for Random Forest
evaluator_rf = BinaryClassificationEvaluator(
    labelCol="isFraud",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# Evaluate AUC
auc_rf = evaluator_rf.evaluate(rf_predictions)
print(f"🌲 AUC with Random Forest: {auc_rf:.4f}")


🌲 AUC with Random Forest: 0.9967


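**Observation**: This AUC is close to perfect.

- The model separates fraud from non-fraud very effectively on this test split

- This is an excellent AUC, but with such a rare positive class a high AUC can coexist with low precision, as the report below shows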

In [28]:
# Convert to pandas
rf_preds_pd = rf_predictions.select("isFraud", "prediction").toPandas()

from sklearn.metrics import classification_report

# Generate report
print(classification_report(rf_preds_pd['isFraud'], rf_preds_pd['prediction'], digits=4))


              precision    recall  f1-score   support

           0     1.0000    0.9809    0.9904    552433
           1     0.1335    0.9933    0.2354      1636

    accuracy                         0.9809    554069
   macro avg     0.5667    0.9871    0.6129    554069
weighted avg     0.9974    0.9809    0.9881    554069



🔍 **Class-wise Interpretation**

✅ Class 0 (Non-Fraud)

- Precision = 100.00% (rounded) → Transactions predicted as legitimate are almost never fraud; only about 11 of the 1,636 test frauds were missed
- Recall = 98.09% → About 98% of legitimate transactions are correctly left alone; the remaining 1.91% become false alarms

⚠️ Class 1 (Fraud)

- Precision = 13.35% → About 3.4x better than Logistic Regression (was 3.87%), although most alerts are still false
- Recall = 99.33% → Almost every fraud was caught
- F1-Score = 23.54% → A big jump from Logistic Regression (was 7.42%)
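Why is precision still only about 13% when recall and AUC are so high? The test set has about 338 legitimate transactions per fraud, so even a 1.91% false-alarm rate on legitimate traffic outnumbers the frauds caught. The sketch below recomputes fraud precision from the report's supports and recalls. If you want a metric that reflects this effect directly, BinaryClassificationEvaluator also accepts metricName="areaUnderPR".

```python
# Supports and recalls copied from the Random Forest classification report
n_fraud, n_legit = 1636, 552433
fraud_recall = 0.9933  # share of frauds caught
legit_recall = 0.9809  # share of legitimate transactions left alone

true_positives = fraud_recall * n_fraud           # about 1,625 frauds caught
false_positives = (1 - legit_recall) * n_legit    # about 10,551 false alarms
precision = true_positives / (true_positives + false_positives)

print(round(true_positives), round(false_positives), f"{precision:.4f}")
```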



In [30]:
# Sample transaction: a TRANSFER (type_index 1.0) that empties the sender's account
new_data = spark.createDataFrame(
    [(10000.0, 10000.0, 0.0, 0.0, 0.0, 1.0)],
    ["amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "type_index"],
)


In [31]:
from pyspark.ml.feature import VectorAssembler

# Assemble the new row exactly like the training data: same columns, same order
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
new_data = assembler.transform(new_data)


In [32]:
prediction = rf_model.transform(new_data)
prediction.select("prediction", "probability").show()


Py4JJavaError: An error occurred while calling o2091.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 302.0 failed 1 times, most recent failure: Lost task 0.0 in stage 302.0 (TID 974) (DESKTOP-C0TRA0G executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:252)
	...
Caused by: java.net.SocketTimeoutException: Timed out while waiting for the Python worker to connect back
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:234)
	... 33 more
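This error is raised by the Spark setup, not by the model. The stack trace goes through PythonRDD, which suggests that creating a DataFrame from a local Python list is the first step in the notebook that needs a separate Python worker process; the earlier steps ran inside the JVM. A common cause, especially on Windows, is that Spark starts a different (or missing) Python executable for its workers. A possible fix, offered as an assumption rather than a confirmed diagnosis, is to point the PYSPARK_PYTHON environment variable at the interpreter running the notebook before the SparkSession is created. The helper name pin_pyspark_python is hypothetical.

```python
import os
import sys


def pin_pyspark_python():
    """Make Spark's Python workers use the same interpreter as this notebook.

    Assumption: the "Python worker failed to connect back" error comes from a
    mismatched or missing worker interpreter. This must run before the
    SparkSession is created; it has no effect on a session that is already running.
    """
    os.environ["PYSPARK_PYTHON"] = sys.executable
    return sys.executable


print(pin_pyspark_python())
```

To apply it, restart the kernel, run this cell first, then rerun the SparkSession cell and the steps that follow.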


In [33]:
import pandas as pd
# Define your feature names (order matters!)
features = ["amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "type_index"]
# Get importance
importances = rf_model.featureImportances.toArray()
# Combine and sort
fi_df = pd.DataFrame(list(zip(features, importances)), columns=["Feature", "Importance"])
fi_df.sort_values(by="Importance", ascending=False, inplace=True)
print(fi_df)


          Feature  Importance
1   oldbalanceOrg    0.502758
4  newbalanceDest    0.156525
0          amount    0.147381
2  newbalanceOrig    0.097380
3  oldbalanceDest    0.064319
5      type_index    0.031637


In [34]:
rf_predictions.select("isFraud", "prediction", "probability") \
              .toPandas() \
              .to_csv("rf_predictions_output.csv", index=False)


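📌 What it does:

Converts the Spark DataFrame (rf_predictions) into a Pandas DataFrame. This collects every row into the driver's memory, which is fine for the roughly 554k test rows here but will not scale to much larger outputs.

Saves it to the current working directory as rf_predictions_output.csv.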

