In [1]:
import os

# PySpark needs a Java runtime. Fall back to /usr only when JAVA_HOME is not already
# set, so an existing (possibly different) Java configuration is not clobbered.
os.environ.setdefault("JAVA_HOME", "/usr")

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Reuse the active SparkSession if one exists; otherwise build a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Smoke test: a trivial one-row query confirms the session can run a job.
df = spark.sql("Select 'spark' as hello ")
df.show()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/27 12:25:20 WARN Utils: Your hostname, Corei9-13900K-64GB, resolves to a loopback address: 127.0.1.1; using 143.107.145.69 instead (on interface enp3s0)
25/07/27 12:25:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/27 12:25:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
# Load the raw dataset: first line is the header, column types are inferred by Spark.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("../data/raw/Base.csv")
)
df.show()

[Stage 2:=>                                                       (1 + 31) / 32]

+----------+------------------+---------------------+-------------------------+----------------------------+------------+------------------+----------------------+------------+------------+------------------+------------------+------------------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+--------+-------------------------+---------+------------------+-------------------------+------------------+-----+
|fraud_bool|            income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|days_since_request|intended_balcon_amount|payment_type|zip_count_4w|       velocity_6h|      velocity_24h|       velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|email_is_free|housing_status|phone_home_valid|phone_mobile_valid|bank_month

25/07/27 12:25:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [4]:
# Summary statistics (count, mean, stddev, min, max) for every column of the raw data.
df.describe().show(n=20, truncate=True)



+-------+-----------------+-------------------+---------------------+-------------------------+----------------------------+-----------------+--------------------+----------------------+------------+------------------+-------------------+------------------+-----------------+--------------------+--------------------------------+-----------------+-----------------+------------------+--------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+--------+-------------------------+---------+-------------------+-------------------------+------------------+------------------+
|summary|       fraud_bool|             income|name_email_similarity|prev_address_months_count|current_address_months_count|     customer_age|  days_since_request|intended_balcon_amount|payment_type|      zip_count_4w|        velocity_6h|      velocity_24h|      velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credi

                                                                                

As noted in the EDA, the column `device_fraud_count` has the same value in every row, so it carries no information. We drop it first.

In [5]:
# device_fraud_count holds a single value in every row, so it carries no signal.
constant_cols = ["device_fraud_count"]
df = df.drop(*constant_cols)

### Missing Values

Columns that encode missing values with a negative sentinel (sentinel in parentheses):
- prev_address_months_count (-1)
- current_address_months_count (-1)
- intended_balcon_amount (any negative value, i.e. the range [-16, 0))
- bank_months_count (-1)
- session_length_in_minutes (-1)
- device_distinct_emails_8w (-1)

In [6]:
from pyspark.sql.functions import col, when

# -1 marks a missing previous-address duration; impute it as 0 months.
prev_months = col("prev_address_months_count")
df = df.withColumn("prev_address_months_count", when(prev_months == -1, 0).otherwise(prev_months))

In [7]:
# -1 marks a missing current-address duration; impute it as 0 months.
cur_months = col("current_address_months_count")
df = df.withColumn("current_address_months_count", when(cur_months == -1, 0).otherwise(cur_months))

In [8]:
from pyspark.sql.functions import lit

# Every negative intended_balcon_amount is a missing-value sentinel (documented range
# [-16, 0)), not just -1. Impute all of them with the median of the observed,
# non-negative amounts; matching only == -1 let the other negative sentinels through.
positive_df = df.filter(col("intended_balcon_amount") >= 0)
median_value = positive_df.approxQuantile("intended_balcon_amount", [0.5], 0.01)[0]

df = df.withColumn(
    "intended_balcon_amount",
    when(col("intended_balcon_amount") < 0, lit(median_value))
    .otherwise(col("intended_balcon_amount"))
)

In [9]:
# -1 marks a missing bank-account age; impute it as 0 months.
bank_months = col("bank_months_count")
df = df.withColumn("bank_months_count", when(bank_months == -1, 0).otherwise(bank_months))

In [10]:
# The previous cell already replaced the -1 sentinel in bank_months_count. Repeating
# that replacement here was a copy-paste no-op, so verify the invariant instead.
assert df.filter(col("bank_months_count") == -1).count() == 0, "bank_months_count still contains -1"

In [11]:
# Median of the valid (non-negative) session lengths, used to fill the -1 sentinel.
positive_df = df.filter(col("session_length_in_minutes") >= 0)
median_value = positive_df.approxQuantile("session_length_in_minutes", [0.5], 0.01)[0]

session_len = col("session_length_in_minutes")
df = df.withColumn(
    "session_length_in_minutes",
    when(session_len == -1, lit(median_value)).otherwise(session_len),
)

We impute with the median rather than the mean, because the median is less sensitive to extreme values.

In [12]:
from pyspark.sql.functions import count as spark_count

# This cell used to repeat the bank_months_count imputation (already done above), which
# was a no-op. Instead, check in a single pass that no -1 sentinel survives in any of
# the columns whose -1 values were imputed so far.
imputed_cols = [
    "prev_address_months_count",
    "current_address_months_count",
    "bank_months_count",
    "session_length_in_minutes",
]
# count() skips nulls, and when() without otherwise() yields null for non-matching rows,
# so each alias holds the number of rows still equal to -1.
sentinel_counts = (
    df.select([spark_count(when(col(c) == -1, True)).alias(c) for c in imputed_cols])
    .first()
    .asDict()
)
assert all(n == 0 for n in sentinel_counts.values()), f"-1 sentinels remain: {sentinel_counts}"

In [13]:
# -1 marks a missing distinct-email count for the device; impute it as 1.
emails_8w = col("device_distinct_emails_8w")
df = df.withColumn("device_distinct_emails_8w", when(emails_8w == -1, 1).otherwise(emails_8w))

In [14]:
# Re-check summary statistics after imputation (count, mean, stddev, min, max).
df.describe().show(n=20, truncate=True)



+-------+-----------------+-------------------+---------------------+-------------------------+----------------------------+-----------------+--------------------+----------------------+------------+------------------+-------------------+------------------+-----------------+--------------------+--------------------------------+-----------------+-----------------+------------------+--------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+--------+-------------------------+---------+-------------------+-------------------------+------------------+
|summary|       fraud_bool|             income|name_email_similarity|prev_address_months_count|current_address_months_count|     customer_age|  days_since_request|intended_balcon_amount|payment_type|      zip_count_4w|        velocity_6h|      velocity_24h|      velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|     e

                                                                                

### Splits

The `month` column is intended for splitting the data: months before 6 are used for training, and month 6 onward for testing.

In [15]:
# Temporal split on `month`: earlier months train the model, later months test it.
in_train_months = col('month') < 6
# The test split keeps its raw columns (including month) so it can simulate new data for the api.
test_df = df.filter(~in_train_months)
train_df = df.filter(in_train_months).drop('month')

train_df_processed = train_df

### Processing pipeline

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType, LongType, NumericType
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Target column: excluded from the features so the label does not leak into them.
LABEL_COL = "fraud_bool"

# String columns are treated as categorical (indexed and one-hot encoded below).
categorical_cols = [
    field.name
    for field in train_df_processed.schema.fields
    if isinstance(field.dataType, StringType)
]
# Any numeric Spark type (byte, short, int, long, float, double, decimal) is a numeric
# feature. The previous Integer/Double/Float/Long whitelist silently dropped the others.
numeric_cols = [
    field.name
    for field in train_df_processed.schema.fields
    if isinstance(field.dataType, NumericType) and field.name != LABEL_COL
]

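First, we define the preprocessing pipeline. It indexes and one-hot encodes the categorical (string) columns, assembles all features into a single vector, and standardizes that vector.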

In [17]:
stages = []

# Categorical columns: StringIndexer -> OneHotEncoder.
# handleInvalid="keep" sends categories not seen during fitting (and nulls) to an extra
# "unknown" index instead of raising at transform time. That matters because this
# pipeline is saved and re-applied to the test split and to new API data.
# NOTE: this changes the encoded vector layout compared with the default "error"
# setting, so any model trained on features from the old pipeline must be retrained.
for categorical_col in categorical_cols:
    index_col = categorical_col + "_index"
    string_indexer = StringIndexer(inputCol=categorical_col, outputCol=index_col, handleInvalid="keep")
    encoder = OneHotEncoder(inputCol=index_col, outputCol=categorical_col + "_encoded")
    stages += [string_indexer, encoder]

# Gather the numeric columns into a single vector.
numeric_assembler = VectorAssembler(inputCols=numeric_cols, outputCol="numeric_features")
stages.append(numeric_assembler)

# Concatenate the numeric vector with the one-hot encoded categorical vectors.
encoded_categorical_cols = [c + "_encoded" for c in categorical_cols]
assembler_inputs = ["numeric_features"] + encoded_categorical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler)

# Centre (withMean) and scale to unit standard deviation (withStd) every feature.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
stages.append(scaler)

pipeline = Pipeline(stages=stages)

In [18]:
# Learn the indexers' vocabularies and the scaler's mean/std from the training split only.
pipeline_model = pipeline.fit(train_df)

# Apply the fitted transformations and keep only the model inputs: the scaled feature
# vector (renamed to "features") and the target (renamed to "label").
train_df_processed = (
    pipeline_model
    .transform(train_df_processed)
    .select(
        col('scaled_features').alias('features'),
        col('fraud_bool').alias('label'),
    )
)

train_df_processed.show(1, truncate=False)

                                                                                

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                             

We also keep the unprocessed training frame (with `fraud_bool` renamed to `label`) for further analysis, and add an `id` column to both training frames.

In [19]:
from pyspark.sql.functions import monotonically_increasing_id

# Keep the unprocessed training frame for later analysis. Rename its target to "label"
# to match the processed frame, and tag every row with an id.
train_df = (
    train_df
    .withColumnRenamed("fraud_bool", "label")
    .withColumn("id", monotonically_increasing_id())
)
train_df.show(1)

# Derive the processed frame from the id-tagged frame so both carry the *same* id column.
# Calling monotonically_increasing_id() separately on each frame only lines the ids up
# if both frames share the same partition layout. Carrying the column through the
# pipeline removes that dependency. The pipeline does not read the label column.
train_df_processed = pipeline_model.transform(train_df).select(
    col("scaled_features").alias("features"),
    col("label"),
    col("id"),
)
train_df_processed.show(1)

+-----+------+---------------------+-------------------------+----------------------------+------------+------------------+----------------------+------------+------------+------------------+-----------------+-----------------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+--------+-------------------------+---------+------------------+-------------------------+---+
|label|income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|days_since_request|intended_balcon_amount|payment_type|zip_count_4w|       velocity_6h|     velocity_24h|      velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|email_is_free|housing_status|phone_home_valid|phone_mobile_valid|bank_months_count|has_other_cards|proposed_credit_limit|foreign_reque

In [20]:
# Tag each raw test row with an id column, as done for the training frames.
id_column = monotonically_increasing_id()
test_df = test_df.withColumn("id", id_column)
test_df.show(n=1)

+----------+------+---------------------+-------------------------+----------------------------+------------+------------------+----------------------+------------+------------+------------------+-----------------+------------------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+--------+-------------------------+---------+------------------+-------------------------+-----+------------+
|fraud_bool|income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|days_since_request|intended_balcon_amount|payment_type|zip_count_4w|       velocity_6h|     velocity_24h|       velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|email_is_free|housing_status|phone_home_valid|phone_mobile_valid|bank_months_count|has_other_cards|proposed

Let's save the fitted pipeline model so we can apply it to future data.

In [21]:
# Persist the fitted pipeline (replacing any earlier copy) so new data can be
# transformed exactly like the training data.
pipeline_writer = pipeline_model.write().overwrite()
pipeline_writer.save("../models/processing_pipeline")

Now, when we have new data, all we need to do is load the processing pipeline, drop the `month` column, apply the pipeline, and select `scaled_features` as `features` and `fraud_bool` as `label`.

Or, in code:

```python
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col

pipe = PipelineModel.load("../models/processing_pipeline")
df_processed = test_df.drop("month")

df_processed = pipe.transform(df_processed)

df_processed = df_processed.select(col('scaled_features').alias('features'), col('fraud_bool').alias('label'))
df_processed.show(1, truncate=False)

```

### Wrapping Up

In [None]:
from pathlib import Path

# Output directory for the processed Parquet datasets.
SAVE_DIR = Path("../data/processed")
# Create SAVE_DIR itself (plus any missing parents). The previous
# `SAVE_DIR.parent.mkdir(...)` only created ../data, not the processed directory.
SAVE_DIR.mkdir(parents=True, exist_ok=True)

In [23]:
# Persist the three datasets as Parquet, replacing any previous output.
outputs = {
    "Base_train.parquet": train_df_processed,   # model-ready training data
    "Base_train_feats.parquet": train_df,       # unprocessed training columns
    "Base_test.parquet": test_df,               # raw test split
}
for file_name, frame in outputs.items():
    frame.write.mode("overwrite").parquet(str(SAVE_DIR / file_name))

25/07/27 12:25:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/07/27 12:25:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/07/27 12:25:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/07/27 12:25:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/07/27 12:25:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/07/27 12:25:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
25/07/27 12:25:34 WARN MemoryManager: Total allocation exceeds 95.

In [24]:
def _load_and_preview(file_name):
    """Read one saved Parquet dataset back from SAVE_DIR, show its first row, and return it."""
    frame = spark.read.parquet(str(SAVE_DIR / file_name))
    frame.show(1)
    return frame


# Read every saved dataset back to confirm it round-trips.
new_train_df = _load_and_preview("Base_train.parquet")
new_feats_df = _load_and_preview("Base_train_feats.parquet")
new_test_df = _load_and_preview("Base_test.parquet")

+--------------------+-----+----------+
|            features|label|        id|
+--------------------+-----+----------+
|[0.18046796286410...|    0|8589934592|
+--------------------+-----+----------+
only showing top 1 row
+-----+------------------+---------------------+-------------------------+----------------------------+------------+------------------+----------------------+------------+------------+----------------+-----------------+-----------------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+--------+-------------------------+---------+------------------+-------------------------+----------+
|label|            income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|days_since_request|intended_balcon_amount|payment_type|zip_count_4w|     velocity_6h|     veloc