# <center> Procesamiento de datos a trusted </center>

## Librerías

In [1]:
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F, Window
from pyspark.sql.types import IntegerType, DoubleType

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1761950954788_0003,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Lectura de datos

In [2]:
spk = spark.builder \
    .appName("raw-to-trusted-spark") \
    .getOrCreate()

print("Spark iniciado:", spk)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Spark iniciado: <pyspark.sql.session.SparkSession object at 0x7f8ba4d98e50>

In [4]:
# Rutas de los archivos en s3
path_poliza = "s3://s3-pi-perfilamiento-20252/raw/polizas/detalle_poliza.csv"
path_producto = "s3://s3-pi-perfilamiento-20252/raw/productos/detalle_producto.csv"
path_clientes = "s3://s3-pi-perfilamiento-20252/raw/clientes/detalle_cliente.csv"

# Leer con inferencia de esquema y header
df_poliza = spark.read.option("header", "true").option("inferSchema", "true").csv(path_poliza)
df_producto = spark.read.option("header", "true").option("inferSchema", "true").csv(path_producto)
df_clientes = spark.read.option("header", "true").option("inferSchema", "true").csv(path_clientes)

# Mostrar confirmación
print("Poliza:", df_poliza.count(), "filas,", len(df_poliza.columns), "columnas")
print("Producto:", df_producto.count(), "filas,", len(df_producto.columns), "columnas")
print("Clientes:", df_clientes.count(), "filas,", len(df_clientes.columns), "columnas")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Poliza: 226254 filas, 28 columnas
Producto: 562 filas, 11 columnas
Clientes: 253062 filas, 8 columnas

## Transformaciones

### Unión tablas

In [5]:
# Unión de poliza con clientes
df_poliza_clientes = df_poliza.join(
    df_clientes,
    on="codCliente",  # clave común
    how="left"
)

# Unión del resultado anterior con producto
df_final = df_poliza_clientes.join(
    df_producto,
    on="codProducto",  # clave común
    how="left"
)

# Mostrar resultado
print("Filas finales:", df_final.count())
print("Columnas finales:", len(df_final.columns))
df_final.show(5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Filas finales: 226254
Columnas finales: 45
+-----------+----------+---------+-------------------+-----------+----------+-------------------+----------+-----------------+------+----------+-------------+---------------------+-------------------+----------------+---------+------------------+----------+-----------+---------------+--------+----------+------------------------+--------------------+----------------+----------------------------+------------------+------------------+----------------------------+----------------+-------------------+----+------------------------------+------------+------------+------------------+-------+----------+-------------+-------------+-------------------------------+-------------------+----------+----------+------------------+
|codProducto|codCliente|codPoliza|estadoProcesamiento|fechaInicio|fechaFin  |origenProcesamiento|estadoDato|formaPagoVigencia|IVA   |valorTotal|IVA_Periodico|Valor_Total_Periodico|fechaInicioVigencia|fechaFinVigencia|estado   |EstadoR

### Eliminación columnas no relevantes

In [7]:
columnas_a_eliminar = [
    "codProducto",
    "estadoProcesamiento",
    "origenProcesamiento",
    "estadoDato",
    "IVA",
    "IVA_Periodico",
    "fechaInicio",
    "fechaFin",
    "EstadoRenovaciones",
    "tipoPoliza",
    "Fecha",
    "valorTotalSinIva",
    "Valor_Total_Periodico_SinIva",
    "TasaIVA",
    "TasaIVA_Periodico",
    "fechaNacCliente",
    "FechaCumple",
    "nomTipoDocumento",
    "Tiene poliza",
    "codRamo",
    "seRenuevaRamo",
    "nomGrupo",
    "nomCompania",
    "GrupoRamo",
    "nomRamo",
    "NegociosEspeciales",
    "aseguradora",
    "tomador"
    "nomProducto"
]
# Eliminar columnas (solo si existen en el dataframe)
df_final = df_final.drop(*[c for c in columnas_a_eliminar if c in df_final.columns])

print("Columnas eliminadas correctamente")
print("Número de columnas restantes:", len(df_final.columns))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Columnas eliminadas correctamente
N?mero de columnas restantes: 19

### Generar nuevas categorías de ramos

In [8]:
df_final = df_final.withColumn(
    "Ramo",
    F.when(F.col("Ramo").isin(
        "Salud Familiar", "Enfermedades Graves", "Salud Colectivo",
        "Asistencia Medica", "Medicina Prepagada"
    ), "Salud")
    .when(F.col("Ramo").isin(
        "Vida Individual", "Vida Grupo", "Educacion", "Pension",
        "Accidentes Personales", "Juvenil", "Exequial"
    ), "Vida")
    .when(F.col("Ramo").isin(
        "Automoviles", "Soat"
    ), "Autos")
    .when(F.col("Ramo").isin(
        "Cumplimiento", "Arrendamiento"
    ), "Cumplimiento")
    .when(F.col("Ramo").isin(
        "Incendio", "Responsabilidad Civil", "Hogar", "Transporte Mercancia",
        "Transporte Valores", "Maquinaria Y Equipo", "Manejo", "Copropiedad",
        "Montaje", "Construcción", "Navegacion",
        "R.C. Parqueaderos Y Talleres", "Agricola"
    ), "Patrimoniales")
    .otherwise("Otros")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Reemplazar caracteres 

In [9]:
# Reemplazamos espacios y caracteres raros por guiones bajos
for old_name in df_final.columns:
    new_name = old_name.strip().replace(" ", "_").replace("-", "_")
    df_final = df_final.withColumnRenamed(old_name, new_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Validar tipos de datos

In [10]:
df_final = df_final.withColumn("Edad", col("Edad").cast(IntegerType()))
df_final = df_final.withColumn("valorTotal", col("valorTotal").cast(DoubleType()))
df_final = df_final.withColumn("fechaInicioVigencia", F.to_date("fechaInicioVigencia", "dd/MM/yyyy"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Filtros con reglas de negocio

In [11]:
# Eliminar filas donde valorTotal sea 0 o nulo
df_final = df_final.filter((F.col("valorTotal") > 0) & (F.col("valorTotal").isNotNull()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Eliminar duplicados

In [12]:
# Eliminar duplicados por las columnas clave
df_final = df_final.dropDuplicates(["codCliente", "codPoliza", "fechaInicioVigencia", "fechaFinVigencia", "Ramo"])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Crear columna de orden

In [13]:
# Crear ventana por cliente y póliza, ordenada por fecha de inicio
window_spec = Window.partitionBy("codCliente", "codPoliza").orderBy(F.asc("fechaInicioVigencia"))

# Agregar columna de orden (opcional, para validar el orden)
df_final = df_final.withColumn("orden_vigencia", F.row_number().over(window_spec))

df_final = df_final.filter(F.col("orden_vigencia") == 1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Verificar datos

In [14]:
# Reordenar para verificar
df_final = df_final.orderBy("codCliente", "codPoliza", "fechaInicioVigencia")

print("Número de registros:", df_final.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

N?mero de registros: 81758

In [15]:
df_final.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------+-----------------+-----------+---------------------+-------------------+----------------+-------+---------------+--------+------------------------+--------+-------------------+----+------------+------------+--------------------------------------------------------+-------------------+-------------+--------------+
|codCliente|codPoliza|formaPagoVigencia|valorTotal |Valor_Total_Periodico|fechaInicioVigencia|fechaFinVigencia|estado |OpracionesGrupo|Vigencia|EstadoRenovacionesinicio|ciudad  |tipoVinculacion    |Edad|Tipo_Empresa|tomador     |nomProducto                                             |nomGrupoEmpresarial|Ramo         |orden_vigencia|
+----------+---------+-----------------+-----------+---------------------+-------------------+----------------+-------+---------------+--------+------------------------+--------+-------------------+----+------------+------------+--------------------------------------------------------+-------------------+-------------+--------

## Guardar datos

In [19]:
path_trusted = "s3://s3-pi-perfilamiento-20252/trusted/"

df_final.coalesce(1) \
        .write \
        .mode("overwrite") \
        .option("header", "true") \
        .csv(path_trusted)

print(f"Datos de la zona trusted guardados exitosamente en: {path_trusted}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Datos de la zona trusted guardados exitosamente en: s3://s3-pi-perfilamiento-20252/trusted/