In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import col, from_json, regexp_replace, to_timestamp, lit, count, max, avg, min
# -----------------------------------------------
# STEP 1: Definir esquemas
# -----------------------------------------------

# Esquema para los datos planos dentro de 'current'
pago_schema = StructType([
    StructField("Id", IntegerType()),
    StructField("ClienteId", StringType()),
    StructField("Monto", StringType()),  # money como string para evitar problemas
    StructField("MetodoPago", StringType()),
    StructField("FechaPago", StringType()),  # datetime como string para parsear luego
    StructField("Estado", StringType())
])

# Esquema para el CloudEvent recibido
cloud_event_schema = StructType([
    StructField("specversion", StringType()),
    StructField("type", StringType()),
    StructField("source", StringType()),
    StructField("id", StringType()),
    StructField("time", StringType()),
    StructField("datacontenttype", StringType()),
    StructField("operation", StringType()),
    StructField("data", StringType())  # JSON anidado como string
])

# Esquema para el campo 'data' dentro del CloudEvent
data_schema = StructType([
    StructField("eventsource", StructType([
        StructField("db", StringType()),
        StructField("schema", StringType()),
        StructField("tbl", StringType()),
        StructField("cols", ArrayType(StructType([
            StructField("name", StringType()),
            StructField("type", StringType()),
            StructField("index", StringType())
        ]))),
        StructField("pkkey", ArrayType(StructType([
            StructField("columnname", StringType()),
            StructField("value", StringType())
        ]))),
        StructField("transaction", StructType([
            StructField("commitlsn", StringType()),
            StructField("beginlsn", StringType()),
            StructField("sequencenumber", StringType()),
            StructField("committime", StringType())
        ]))
    ])),
    StructField("eventrow", StructType([
        StructField("old", StringType()),
        StructField("current", StringType())
    ]))
])

# Esquema para el mensaje CDC de Confluent Cloud (ajustado al ejemplo)
cdc_schema = StructType([
    StructField("before", StringType()),
    StructField("after", StructType([
        StructField("Id", IntegerType()),
        StructField("ClienteId", StringType()),
        StructField("Monto", StringType()),         # Está codificado en Base64, manejar según necesidad
        StructField("MetodoPago", StringType()),
        StructField("FechaPago", StringType()),     # Timestamp en formato numérico string
        StructField("Estado", StringType())
    ])),
    StructField("source", StructType([
        StructField("version", StringType()),
        StructField("connector", StringType()),
        StructField("name", StringType()),
        StructField("ts_ms", LongType()),            # Timestamp en ms de la fuente (inserción)
        StructField("snapshot", StringType()),
        StructField("db", StringType()),
        StructField("schema", StringType()),
        StructField("table", StringType()),
        StructField("commit_lsn", StringType())
    ])),
    StructField("op", StringType()),
    StructField("ts_ms", LongType()),               # Timestamp en ms del evento en Kafka
    StructField("transaction", StringType())
])

# -----------------------------------------------
# STEP 2: Configuración para leer desde Azure Event Hub
# -----------------------------------------------

event_hub_conn_str = "Endpoint=sb://arquitecturadatosdemoces.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Oa8J/Zenf+jtntTTW0CZbgj3dow0WLaz8+AEhDgfUf0=;EntityPath=pagos_ces"

eh_conf = {
    'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(event_hub_conn_str)
}

raw_eh_df = (spark.readStream
    .format("eventhubs")
    .options(**eh_conf)
    .load()
)

# -----------------------------------------------
# STEP 3: Configuración para conectarse a Confluent Cloud Kafka
# -----------------------------------------------
kafka_bootstrap_servers = "pkc-619z3.us-east1.gcp.confluent.cloud:9092"  # Cambia por tu bootstrap server real
kafka_topic = "bdd_cdc.bdd_cdc.dbo.Pagos"
kafka_api_key = "T3USLKCFIX4ACKDS"       # Cambia por tu API key real
kafka_api_secret = "wQP2H5QjtN1geUto7gyBcP3o69ecwItMz7Qfn+HohwUiiDeY6iAuakRoMBt0xA6R" # Cambia por tu API secret real

kafka_conf = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";',
    "startingOffsets": "earliest"
}

raw_kafka_df = (spark.readStream
    .format("kafka")
    .options(**kafka_conf)
    .load()
)

# -----------------------------------------------
# STEP 4: Decodificación y parseo de los mensajes
# -----------------------------------------------
# Parsear CloudEvent
parsed_eh_df = raw_eh_df.withColumn("body_str", col("body").cast("string"))
parsed_eh_df = parsed_eh_df.withColumn("cloud_event", from_json("body_str", cloud_event_schema))
# Convertir 'value' de binary a string y Parsear JSON del mensaje Kafka
kafka_df = raw_kafka_df.selectExpr("CAST(value AS STRING) as json_str", "timestamp as kafka_timestamp")
parsed_kafka_df = kafka_df.withColumn("cdc", from_json(col("json_str"), cdc_schema))

# Parsear campo 'data' JSON anidado
parsed_eh_df = parsed_eh_df.withColumn("data_json", from_json(col("cloud_event.data"), data_schema))
# Desescapar el string JSON en 'eventrow.old' para poder parsearlo correctamente
clean_eh_current = regexp_replace(col("data_json.eventrow.current"), r'\\', '')
# Parsear el JSON limpio con el esquema final
final_eh_df = parsed_eh_df.withColumn("current_data", from_json(clean_eh_current, pago_schema))
# 3. Seleccionar los campos deseados y convertir tipos
result_eh_df = final_eh_df.select(
    col("current_data.Id").cast(IntegerType()).alias("Id"),
    col("current_data.ClienteId").alias("ClienteId"),
    col("current_data.Monto").cast("double").alias("Monto"),
    col("current_data.MetodoPago"),
    to_timestamp(col("current_data.FechaPago"), "yyyy-MM-dd HH:mm:ss.SSSSSSS").alias("FechaPago"),
    col("current_data.Estado"),
    col("cloud_event.time").alias("eventhub_time"),
    col("enqueuedTime"),
    lit("eventhub").alias("source")
)

# Extraer campos relevantes
processed_kafka_df = parsed_kafka_df.select(
    col("cdc.after.Id").alias("Id"),
    col("cdc.after.ClienteId").alias("ClienteId"),
    # Si 'Monto' está en Base64, decodificarlo si es necesario, aquí se deja como string
    col("cdc.after.Monto").alias("Monto"),
    col("cdc.after.MetodoPago").alias("MetodoPago"),
    # Convertir FechaPago de string numérico a timestamp (nanosegundos desde epoch)
    # Ajustar según formato real, aquí se convierte de nanosegundos a timestamp
    (col("cdc.after.FechaPago").cast("long") / 1e9).alias("FechaPagoEpochSec"),
    col("cdc.after.Estado").alias("Estado"),
    col("cdc.source.ts_ms").alias("insercion_ts_ms"),
    col("cdc.ts_ms").alias("evento_ts_ms"),
    col("kafka_timestamp").alias("kafka_ingest_time"),
    lit("confluent").alias("source")
)


# Convertir FechaPagoEpochSec a timestamp
processed_kafka_df = processed_kafka_df.withColumn("FechaPago", to_timestamp(col("FechaPagoEpochSec")))
# Calcular diferencia de tiempo en segundos entre inserción y llegada al tópico
processed_kafka_df = processed_kafka_df.withColumn("tiempo_diferencia_segundos",(col("evento_ts_ms")/1000 - col("insercion_ts_ms")/1000))
# Seleccionar columnas finales
result_kafka_df = processed_kafka_df.select(
    "Id", "ClienteId", "Monto", "MetodoPago", "FechaPago", "Estado",
    "insercion_ts_ms", "evento_ts_ms", "kafka_ingest_time", "source", "tiempo_diferencia_segundos"
)
result_kafka_df = result_kafka_df.filter(col("FechaPago").isNotNull())

# 4. Calcular diferencia de tiempo
result_eh_df = result_eh_df.withColumn(
    "tiempo_diferencia_segundos",
    (col("enqueuedTime").cast("long") - col("FechaPago").cast("long"))
)
result_eh_df = result_eh_df.filter(col("FechaPago").isNotNull())
# Mostrar resultado
#result_eh_df.display()
# Mostrar resultados (o escribir en Delta Lake)
#result_kafka_df.display()

# Columnas que tiene result_eh_df pero no result_kafka_df
cols_eh = set(result_eh_df.columns)
cols_kafka = set(result_kafka_df.columns)
# Columnas faltantes en result_kafka_df
missing_in_kafka = cols_eh - cols_kafka
for c in missing_in_kafka:
    result_kafka_df = result_kafka_df.withColumn(c, lit(None))
result_kafka_df = result_kafka_df.withColumn("kafka_ingest_time_adj", col("kafka_ingest_time") - expr("INTERVAL 0 HOURS"))
result_kafka_df = result_kafka_df.withColumn(
    "tiempo_diferencia_segundos",
    (col("kafka_ingest_time_adj").cast("long") - col("FechaPago").cast("long"))
)

# Eliminar columnas no deseadas
result_kafka_df = result_kafka_df.drop("evento_ts_ms", "insercion_ts_ms", "Id", "kafka_ingest_time").withColumnRenamed("kafka_ingest_time_adj", "kafka_ingest_time")

# Columnas faltantes en result_eh_df
missing_in_eh = cols_kafka - cols_eh
for c in missing_in_eh:
    result_eh_df = result_eh_df.withColumn(c, lit(None))
result_eh_df = result_eh_df.withColumn("enqueuedTime_adj", col("enqueuedTime") - expr("INTERVAL 0 HOURS"))
result_eh_df = result_eh_df.withColumn(
    "tiempo_diferencia_segundos",
    (col("enqueuedTime_adj").cast("long") - col("FechaPago").cast("long"))
)# Eliminar columnas no deseadas
result_eh_df = result_eh_df.drop("evento_ts_ms", "insercion_ts_ms", "Id", "enqueuedTime").withColumnRenamed("enqueuedTime_adj", "enqueuedTime")



result_kafka_df = result_kafka_df.select(sorted(result_kafka_df.columns))
result_eh_df = result_eh_df.select(sorted(result_eh_df.columns))
result_all_df = result_eh_df.unionByName(result_kafka_df)


result_all_df = result_all_df.groupBy("source").agg(
    count("*").alias("numero_registros"),
    max("tiempo_diferencia_segundos").alias("duracion_maxima_segundos"),
    avg("tiempo_diferencia_segundos").alias("duracion_promedio_segundos"),
    min("tiempo_diferencia_segundos").alias("duracion_minima_segundos")
)

result_all_df.display()

