In [None]:
# Notebook 03: Feature Engineering (Spark ML)

**Objetivo**: Construir variables (features) listas para modelado, siguiendo un enfoque incremental:
- Primero se valida el flujo con un subset pequeño (1.000–2.000 registros).
- Luego se ejecuta el mismo pipeline sobre todo el dataset (≥57.000).

**Salida**:
- Dataset transformado con columna `features` (Vector) y `label` (objetivo).
- Pipeline guardado para reutilización.


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, year, month, dayofmonth,
    datediff, regexp_replace, log1p
)

from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
)

import os

In [2]:
spark = (SparkSession.builder
         .appName("SECOP_FeatureEngineering")
         .master("spark://spark-master:7077")
         .config("spark.executor.memory", "1g")
         .config("spark.executor.cores", "1")
         .getOrCreate())

spark.sparkContext.setLogLevel("WARN")
print("Spark Version:", spark.version)
print("Spark Master:", spark.sparkContext.master)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/15 01:01:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 3.5.0
Spark Master: spark://spark-master:7077


26/02/15 01:02:02 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
parquet_path = "/opt/spark-data/processed/secop_eda_q4_2025.parquet"
print("Leyendo:", parquet_path)

df = spark.read.parquet(parquet_path)
print("Registros:", df.count())
print("Columnas:", len(df.columns))


Leyendo: /opt/spark-data/processed/secop_eda_q4_2025.parquet




Registros: 60000
Columnas: 92


                                                                                

In [11]:
DEV_N = 2000  
df_dev = df.limit(DEV_N)

print("DEV registros:", df_dev.count())




DEV registros: 2000


                                                                                

In [12]:
# Parse robusto de fecha (viene tipo: 2025-10-09T00:00:00.000)
df_feat = (df_dev
           .withColumn("fecha_firma_ts", to_timestamp(col("fecha_de_firma"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
           .withColumn("anio_firma", year(col("fecha_firma_ts")))
           .withColumn("mes_firma", month(col("fecha_firma_ts")))
           .withColumn("dia_firma", dayofmonth(col("fecha_firma_ts")))
)

# Asegurar valor numérico 
df_feat = (df_feat
           .withColumn("valor_del_contrato_clean",
                       regexp_replace(col("valor_del_contrato"), r"[^0-9,\.]", ""))
           .withColumn("valor_del_contrato_num",
                       regexp_replace(col("valor_del_contrato_clean"), ",", ".").cast("double"))
)

# Label (objetivo) para regresión: log(1+valor) para reducir sesgo por outliers
df_feat = df_feat.withColumn("label", log1p(col("valor_del_contrato_num")))


In [9]:
cols = set(df_feat.columns)

if ("fecha_de_inicio_del_contrato" in cols) and ("fecha_de_fin_del_contrato" in cols):
    df_feat = (df_feat
               .withColumn("fecha_inicio_ts", to_timestamp(col("fecha_de_inicio_del_contrato"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
               .withColumn("fecha_fin_ts", to_timestamp(col("fecha_de_fin_del_contrato"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
               .withColumn("duracion_dias", datediff(col("fecha_fin_ts"), col("fecha_inicio_ts")))
              )
else:
  
    df_feat = df_feat.withColumn("duracion_dias", col("valor_del_contrato_num") * 0)  # 0 como placeholder


In [7]:
categorical_cols = [
    "departamento",
    "tipo_de_contrato",
    "estado_contrato",
    "modalidad_de_contratacion"
]

numeric_cols = [
    "anio_firma",
    "mes_firma",
    "dia_firma",
    "duracion_dias"
]
available_cat = [c for c in categorical_cols if c in df_feat.columns]
available_num = [c for c in numeric_cols if c in df_feat.columns]

print("Categóricas:", available_cat)
print("Numéricas:", available_num)


Categóricas: ['departamento', 'tipo_de_contrato', 'estado_contrato', 'modalidad_de_contratacion']
Numéricas: ['anio_firma', 'mes_firma', 'dia_firma', 'duracion_dias']


In [8]:
df_clean = df_feat.dropna(subset=["label"] + available_cat + available_num)
print("Registros después de dropna:", df_clean.count())




Registros después de dropna: 1989


                                                                                

In [13]:
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in available_cat
]

encoders = [
    OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_ohe")
    for c in available_cat
]

feature_cols = available_num + [f"{c}_ohe" for c in available_cat]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")

scaler = StandardScaler(inputCol="features_raw", outputCol="features", withMean=False, withStd=True)

pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

print("Features usadas:", feature_cols)


Features usadas: ['anio_firma', 'mes_firma', 'dia_firma', 'duracion_dias', 'departamento_ohe', 'tipo_de_contrato_ohe', 'estado_contrato_ohe', 'modalidad_de_contratacion_ohe']


In [14]:
print("Entrenando pipeline (DEV)...")
pipeline_model = pipeline.fit(df_clean)

df_transformed = pipeline_model.transform(df_clean)

df_transformed.select("label", "features").show(5, truncate=False)
print("Dimensión vector:", len(df_transformed.select("features").first()[0]))


Entrenando pipeline (DEV)...


                                                                                

+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|label             |features                                                                                                                                            |
+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|16.334818714984902|(67,[0,1,2,3,11,38,51,56],[0.0,0.0,2.105792065767273,0.3595536840551413,5.292500109115442,2.4761070103460034,4.411209203389229,2.092911665785985])  |
|16.797819844969908|(67,[0,1,2,3,8,38,49,57],[0.0,0.0,2.105792065767273,0.32848114345778345,4.372899895825201,2.4761070103460034,2.2451519194022547,2.4538669099744217])|
|17.191595272992377|(67,[0,1,2,3,5,38,50,56],[0.0,0.0,2.105792065767273,1.3449971144284916,3.670271436144235,2.4761070103460034,2.541425645951468,2.09

                                                                                

Dimensión vector: 67


In [15]:
# Repetimos las mismas transformaciones sobre df completo
df_full = df

df_full_feat = (df_full
                .withColumn("fecha_firma_ts", to_timestamp(col("fecha_de_firma"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
                .withColumn("anio_firma", year(col("fecha_firma_ts")))
                .withColumn("mes_firma", month(col("fecha_firma_ts")))
                .withColumn("dia_firma", dayofmonth(col("fecha_firma_ts")))
                .withColumn("valor_del_contrato_clean",
                            regexp_replace(col("valor_del_contrato"), r"[^0-9,\.]", ""))
                .withColumn("valor_del_contrato_num",
                            regexp_replace(col("valor_del_contrato_clean"), ",", ".").cast("double"))
                .withColumn("label", log1p(col("valor_del_contrato_num")))
               )

cols_full = set(df_full_feat.columns)
if ("fecha_de_inicio_del_contrato" in cols_full) and ("fecha_de_fin_del_contrato" in cols_full):
    df_full_feat = (df_full_feat
                    .withColumn("fecha_inicio_ts", to_timestamp(col("fecha_de_inicio_del_contrato"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
                    .withColumn("fecha_fin_ts", to_timestamp(col("fecha_de_fin_del_contrato"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
                    .withColumn("duracion_dias", datediff(col("fecha_fin_ts"), col("fecha_inicio_ts")))
                   )
else:
    df_full_feat = df_full_feat.withColumn("duracion_dias", col("valor_del_contrato_num") * 0)

# Limpieza mínima
df_full_clean = df_full_feat.dropna(subset=["label"] + available_cat + available_num)

print("FULL registros:", df_full_clean.count())


[Stage 36:>                                                         (0 + 1) / 1]

FULL registros: 59125


                                                                                

In [16]:
print("Entrenando pipeline (FULL)...")
pipeline_model_full = pipeline.fit(df_full_clean)
df_full_transformed = pipeline_model_full.transform(df_full_clean)

out_dir = "/opt/spark-data/processed"
os.makedirs(out_dir, exist_ok=True)

pipeline_path = f"{out_dir}/feature_pipeline_q4_2025"
features_path = f"{out_dir}/secop_features_q4_2025.parquet"

# Guardar pipeline y dataset
pipeline_model_full.write().overwrite().save(pipeline_path)
df_full_transformed.select("label", "features").write.mode("overwrite").parquet(features_path)

print("Pipeline guardado en:", pipeline_path)
print("Dataset features guardado en:", features_path)



Entrenando pipeline (FULL)...


                                                                                

Pipeline guardado en: /opt/spark-data/processed/feature_pipeline_q4_2025
Dataset features guardado en: /opt/spark-data/processed/secop_features_q4_2025.parquet


In [17]:
spark.stop()
print("SparkSession finalizada")


SparkSession finalizada
