In [56]:
spark.version

'3.5.3'

In [57]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
import pyspark.sql.functions as F

### Lectura datos

In [58]:
# Leer CSV desde Cloud Storage (GCS) con inferencia de tipos y encabezado
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://dataproc_spark_tfm/Base.csv")

                                                                                

### Selección de tantos casos de fraude como no fraude

In [59]:
# Step 1: Separate the fraud and non-fraud transactions
fraud_df = df.filter(df.fraud_bool == 1)
nonfraud_df = df.filter(df.fraud_bool == 0)

# Step 2: Count frauds
fraud_count = fraud_df.count()

# Step 3: Take same number of non-fraud rows randomly
nonfraud_sample = nonfraud_df.sample(withReplacement=False, fraction=1.0).limit(fraud_count)

# Step 4: Union both
balanced_df = fraud_df.union(nonfraud_sample)

# Optional: Shuffle the data
df = balanced_df.orderBy(F.rand())

# Show result
df.groupBy("fraud_bool").count().show()



+----------+-----+
|fraud_bool|count|
+----------+-----+
|         1|11029|
|         0|11029|
+----------+-----+



                                                                                

### Visualización de los datos

In [60]:
df.show(2)



+----------+------------------+---------------------+-------------------------+----------------------------+------------+------------------+----------------------+------------+------------+-----------------+------------------+-----------------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+--------+-------------------------+---------+------------------+-------------------------+------------------+-----+
|fraud_bool|            income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|days_since_request|intended_balcon_amount|payment_type|zip_count_4w|      velocity_6h|      velocity_24h|      velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|email_is_free|housing_status|phone_home_valid|phone_mobile_valid|bank_months_co

                                                                                

In [61]:
df.columns

['fraud_bool',
 'income',
 'name_email_similarity',
 'prev_address_months_count',
 'current_address_months_count',
 'customer_age',
 'days_since_request',
 'intended_balcon_amount',
 'payment_type',
 'zip_count_4w',
 'velocity_6h',
 'velocity_24h',
 'velocity_4w',
 'bank_branch_count_8w',
 'date_of_birth_distinct_emails_4w',
 'employment_status',
 'credit_risk_score',
 'email_is_free',
 'housing_status',
 'phone_home_valid',
 'phone_mobile_valid',
 'bank_months_count',
 'has_other_cards',
 'proposed_credit_limit',
 'foreign_request',
 'source',
 'session_length_in_minutes',
 'device_os',
 'keep_alive_session',
 'device_distinct_emails_8w',
 'device_fraud_count',
 'month']

In [62]:
df.printSchema()

root
 |-- fraud_bool: integer (nullable = true)
 |-- income: double (nullable = true)
 |-- name_email_similarity: double (nullable = true)
 |-- prev_address_months_count: integer (nullable = true)
 |-- current_address_months_count: integer (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- days_since_request: double (nullable = true)
 |-- intended_balcon_amount: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- zip_count_4w: integer (nullable = true)
 |-- velocity_6h: double (nullable = true)
 |-- velocity_24h: double (nullable = true)
 |-- velocity_4w: double (nullable = true)
 |-- bank_branch_count_8w: integer (nullable = true)
 |-- date_of_birth_distinct_emails_4w: integer (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- credit_risk_score: integer (nullable = true)
 |-- email_is_free: integer (nullable = true)
 |-- housing_status: string (nullable = true)
 |-- phone_home_valid: integer (nullable = true)
 |-- phone_mobil

In [63]:
numeric_cols = [
    "income",
    "name_email_similarity",
    "prev_address_months_count",
    "current_address_months_count",
    "customer_age",
    "days_since_request",
    "intended_balcon_amount",
    "zip_count_4w",
    "velocity_6h",
    "velocity_24h",
    "velocity_4w",
    "bank_branch_count_8w",
    "date_of_birth_distinct_emails_4w",
    "credit_risk_score",
    "email_is_free",
    "phone_home_valid",
    "phone_mobile_valid",
    "bank_months_count",
    "has_other_cards",
    "proposed_credit_limit",
    "foreign_request",
    "session_length_in_minutes",
    "keep_alive_session",
    "device_distinct_emails_8w",
    "device_fraud_count",
    "month"
]

In [64]:
df = df.dropna()

### VectorAssembler

In [65]:
# recibe varias columnas y las concatena en una sola de tipo vector, de longitud igual al número de columnas que se quieran ensamblar.
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")

### Modelo

In [66]:
lr = LogisticRegression(
    featuresCol="features",
    labelCol="fraud_bool",
    maxIter=100,    # max number of iterations
    regParam=0.01,  # regularization parameter
    elasticNetParam=0.0  # type of regularization (0=L2, 1=L1)
)

### Pipeline
Secuencia de etapas que se ejecutan en un cierto orden

In [67]:
pipeline = Pipeline(stages=[assembler, lr])

### Division de los datos para entrenamiento y testeo

In [68]:
# Division de datos
train, test = df.randomSplit([0.8, 0.2], seed=42)

### Fit
Entrenamiento de los datos

In [69]:
model = pipeline.fit(train)



### Transform
Predicción del modelo

In [70]:
predictions = model.transform(test)
predictions.filter("fraud_bool = 0") \
    .select("prediction", "fraud_bool", "probability") \
    .show(5, truncate=False)



+----------+----------+-----------------------------------------+
|prediction|fraud_bool|probability                              |
+----------+----------+-----------------------------------------+
|0.0       |0         |[0.7944344960205908,0.2055655039794092]  |
|0.0       |0         |[0.9497949623082953,0.0502050376917047]  |
|0.0       |0         |[0.8835071831458258,0.11649281685417423] |
|0.0       |0         |[0.9581510509511154,0.04184894904888459] |
|0.0       |0         |[0.9608475558824348,0.039152444117565244]|
+----------+----------+-----------------------------------------+
only showing top 5 rows



                                                                                

In [71]:
predictions.filter("fraud_bool = 1") \
    .select("prediction", "fraud_bool", "probability") \
    .show(5, truncate=False)



+----------+----------+-----------------------------------------+
|prediction|fraud_bool|probability                              |
+----------+----------+-----------------------------------------+
|1.0       |1         |[0.04495718411819828,0.9550428158818017] |
|0.0       |1         |[0.802094026410367,0.19790597358963302]  |
|1.0       |1         |[0.10574801619463516,0.8942519838053649] |
|1.0       |1         |[0.026993492781650554,0.9730065072183495]|
|1.0       |1         |[0.012263292208461177,0.9877367077915388]|
+----------+----------+-----------------------------------------+
only showing top 5 rows



                                                                                

### Saving model to feed dataproc with flink

In [72]:
model.write().overwrite().save("gs://dataproc_spark_tfm/modelo_fraude_csv2")
# model.save("gs://dataproc_spark_tfm/modelo_autoencoder.h5")

                                                                                

### Saving coefficients

In [73]:
# Obtener la última etapa del pipeline (el modelo entrenado)
lr_model = model.stages[-1]

# Verifica que es un LogisticRegressionModel
print(type(lr_model))

<class 'pyspark.ml.classification.LogisticRegressionModel'>


In [74]:
import json

coefficients = lr_model.coefficients.toArray()
intercept = lr_model.intercept

model_dict = {
    "coefficients": coefficients.tolist(),
    "intercept": intercept
}

In [75]:
# Mostrar por pantalla
print(json.dumps(model_dict, indent=2))

{
  "coefficients": [
    0.8614079855782318,
    -0.5120002106288041,
    -0.004811517073836162,
    0.0017975213474542811,
    0.02143566699047804,
    0.014672130836650112,
    -0.007942170267935903,
    -3.6536220394025224e-06,
    -2.449173754443497e-05,
    -0.0001072310108661983,
    -0.0005162204622894853,
    -0.00012623182756095835,
    -0.03548309162011287,
    0.0026267305198610586,
    0.394023327748726,
    -0.7793561537871726,
    -0.541193104879032,
    -0.0007248703370028166,
    -0.9752028014785024,
    0.0001949947098751543,
    0.17664277708003343,
    0.000637584111118107,
    -0.8239110093824827,
    0.5703388959167144,
    0.0,
    0.9857382000216143
  ],
  "intercept": 1.7671751987160402
}


In [76]:
# Guardar localmente
with open("/tmp/lr_model2.json", "w") as f:
    json.dump(model_dict, f, indent=2)

In [77]:
# Subir a GCS
!gsutil cp /tmp/lr_model2.json gs://dataproc_spark_tfm/lr_model2.json

Copying file:///tmp/lr_model2.json [Content-Type=application/json]...
/ [1 files][  722.0 B/  722.0 B]                                                
Operation completed over 1 objects/722.0 B.                                      


### Individual prediction

In [78]:
from pyspark.sql.functions import rand

test_sample = test.orderBy(rand()).limit(1)
test_sample.show(truncate=False)
individual_prediction = model.transform(test_sample)
individual_prediction.select("prediction", "fraud_bool", "probability").show(truncate=False)

                                                                                

+----------+------------------+---------------------+-------------------------+----------------------------+------------+------------------+----------------------+------------+------------+-----------------+-----------------+-----------------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+--------+-------------------------+---------+------------------+-------------------------+------------------+-----+
|fraud_bool|income            |name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|days_since_request|intended_balcon_amount|payment_type|zip_count_4w|velocity_6h      |velocity_24h     |velocity_4w      |bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|email_is_free|housing_status|phone_home_valid|phone_mobile_valid|bank_months_coun



+----------+----------+----------------------------------------+
|prediction|fraud_bool|probability                             |
+----------+----------+----------------------------------------+
|1.0       |1         |[0.05954203233273887,0.9404579676672611]|
+----------+----------+----------------------------------------+



                                                                                

### Métricas para cuantificar la predicción del modelo

In [79]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [80]:
# 'fraud_bool' debe ser numérico (0 o 1)
predictionAndLabels = predictions.select("prediction", "fraud_bool") \
                                 .rdd.map(lambda row: (float(row["prediction"]), float(row["fraud_bool"])))




In [81]:
metrics = MulticlassMetrics(predictionAndLabels)

                                                                                

In [82]:
# Matriz de confusión
confusion_matrix = metrics.confusionMatrix().toArray()
print("Matriz de confusión:\n", confusion_matrix)

# Métricas principales
print("Accuracy:", metrics.accuracy)
print("Precision para clase 1:", metrics.precision(1.0))
print("Recall para clase 1:", metrics.recall(1.0))
print("F1 Score para clase 1:", metrics.fMeasure(1.0))



Matriz de confusión:
 [[2073.   66.]
 [ 280. 1885.]]
Accuracy: 0.9196096654275093
Precision para clase 1: 0.9661711942593542
Recall para clase 1: 0.8706697459584296
F1 Score para clase 1: 0.9159378036929058


                                                                                