In [None]:
# Start a Spark session (or reuse one already running in this kernel);
# every later cell talks to Spark through `spark`.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("SyntheticFraudDetection")
    .getOrCreate()
)

In [None]:
#Create features and prepare data
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Index categorical columns
indexer_type = StringIndexer(inputCol="transaction_type", outputCol="type_index")
indexer_country = StringIndexer(inputCol="country", outputCol="country_index")

# OneHot encoding
encoder = OneHotEncoder(
    inputCols=["type_index", "country_index"],
    outputCols=["type_vec", "country_vec"]
)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["amount", "time_since_last_transaction", "type_vec", "country_vec"],
    outputCol="features"
)

# Pipeline
pipeline = Pipeline(stages=[indexer_type, indexer_country, encoder, assembler])
data_prepared = pipeline.fit(data).transform(data)

In [None]:
#Split into train and test sets
train, test = data_prepared.randomSplit([0.7, 0.3], seed=42)

In [None]:
#Train models
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression

# Both classifiers read the assembled "features" vector and predict "fraud".
# Each is fitted on `train`; scoring `test` is lazy until evaluated.

# Decision Tree: fit, then score the held-out split
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="fraud",
)
model_dt = dt.fit(train)
predictions_dt = model_dt.transform(test)

# Logistic Regression: fit, then score the held-out split
lr = LogisticRegression(
    featuresCol="features",
    labelCol="fraud",
)
model_lr = lr.fit(train)
predictions_lr = model_lr.transform(test)

In [None]:
#Evaluate performance
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Both evaluators score the "rawPrediction" column (the evaluator default),
# which both fitted models produce; the metrics are threshold-independent.
evaluator = BinaryClassificationEvaluator(labelCol="fraud", metricName="areaUnderROC")
# Fraud labels are usually rare (check the class balance of `fraud`). In that
# regime ROC AUC can look optimistic, so also report the area under the
# precision-recall curve. A random classifier's AUPR is roughly the positive rate.
evaluator_pr = BinaryClassificationEvaluator(labelCol="fraud", metricName="areaUnderPR")

auc_dt = evaluator.evaluate(predictions_dt)
auc_lr = evaluator.evaluate(predictions_lr)
aupr_dt = evaluator_pr.evaluate(predictions_dt)
aupr_lr = evaluator_pr.evaluate(predictions_lr)

print(f"AUC - Decision Tree: {auc_dt:.3f}")
print(f"AUC - Logistic Regression: {auc_lr:.3f}")
print(f"AUPR - Decision Tree: {aupr_dt:.3f}")
print(f"AUPR - Logistic Regression: {aupr_lr:.3f}")