In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, TimestampType
import json
import datetime


# 1. Load the configuration file
# JSON is UTF-8 by specification; pinning the encoding avoids depending on the
# platform's locale default when the file holds non-ASCII characters.
with open("config.json", encoding="utf-8") as f:
    config = json.load(f)

# 2. Initialize variables from the loaded configuration
# Kafka CDC source settings (section "kafkaCDC").
kafka_settings = config["kafkaCDC"]
kafka_bootstrap_servers = kafka_settings["bootstrap_servers"]
kafka_topic = kafka_settings["topic"]
kafka_sasl_username = kafka_settings["username"]
kafka_sasl_password = kafka_settings["password"]

# MongoDB sink settings (section "mongodb").
mongo_settings = config["mongodb"]
mongo_uri = mongo_settings["uri"]
mongo_db = mongo_settings["database"]
mongo_collection = mongo_settings["collection"]

# 3. Create the Spark session
# Maven coordinates of the Kafka source and MongoDB connector, passed to Spark
# as one comma-separated string.
spark_packages = ",".join(
    [
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0",
        "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1",
    ]
)
spark = (
    SparkSession.builder
    .appName("KafkaToMongoBatchJob")
    .config("spark.jars.packages", spark_packages)
    .getOrCreate()
)

# 4. Define the schema of the Kafka message
# Row image carried in the 'after' field of each message.
after_schema = (
    StructType()
    .add("Id", IntegerType())
    .add("ClienteId", StringType())
    .add("Monto", StringType())
    .add("MetodoPago", StringType())
    .add("FechaPago", StringType())  # Arrives as a string
    .add("Estado", StringType())
)

# Top-level message layout; only 'after' is given a nested structure.
main_schema = (
    StructType()
    .add("before", StringType())  # or a StructType if its fields are needed
    .add("after", after_schema)
    .add("source", StringType())  # or a StructType if its fields are needed
    .add("op", StringType())
    .add("ts_ms", LongType())
    .add("transaction", StringType())
)

# Convert SQL Server ticks to a timestamp
def ticks_to_timestamp(ticks_str):
    """Convert a tick count into a naive ``datetime``.

    A tick is treated as one tenth of a microsecond (100 ns), counted from
    0001-01-01 00:00:00. Any remainder below one microsecond is truncated.

    Args:
        ticks_str: The tick count, as a string or any value ``int()`` accepts.

    Returns:
        The corresponding ``datetime.datetime``, or ``None`` when the input is
        missing, not numeric, or falls outside the range ``datetime`` supports.
    """
    try:
        ticks = int(ticks_str)
        # Floor division keeps the arithmetic in exact integers. True division
        # would go through a float, which cannot represent present-day tick
        # counts (~6e17) to microsecond precision.
        elapsed = datetime.timedelta(microseconds=ticks // 10)
        return datetime.datetime(1, 1, 1) + elapsed
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / unsupported type; ValueError: non-numeric text;
        # OverflowError: result outside the representable datetime range.
        return None

# Wrap the converter as a Spark UDF whose result column is a timestamp
ticks_to_timestamp_udf = udf(ticks_to_timestamp, returnType=TimestampType())

# 5. Read messages from Kafka (streaming mode)
# JAAS login entry for SASL/PLAIN, built from the configured credentials.
kafka_jaas_config = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_sasl_username}" password="{kafka_sasl_password}";'
)
kafka_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": kafka_jaas_config,
    "startingOffsets": "latest",
}
kafka_reader = spark.readStream.format("kafka")
df_kafka_streaming = kafka_reader.options(**kafka_options).load()

# 6. Cast the Kafka 'value' to a string and parse the JSON with the schema
df_json = df_kafka_streaming.selectExpr("CAST(value AS STRING) as json_value")
df_envelope = df_json.select(from_json(col("json_value"), main_schema).alias("data"))
# Skip records that carry no 'after' image (null or unparseable values, and
# change events whose 'after' is null). Expanding them would produce rows made
# only of nulls, including a null '_id', that would then reach the sink.
df_parsed = df_envelope.where(col("data.after").isNotNull()).select("data.after.*")

# 7. Convert FechaPago from a tick string to a timestamp
df_final = df_parsed.withColumn("FechaPago", ticks_to_timestamp_udf(col("FechaPago")))

# 8. Rename the 'Id' column to '_id'
df_mongo = df_final.withColumnRenamed("Id", "_id")
# NOTE(review): display() is not part of the open-source PySpark DataFrame API;
# presumably supplied by the notebook environment — confirm before running elsewhere.
df_mongo.display()

# 9. Stream the transformed rows into MongoDB Atlas
(
    df_mongo.writeStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", mongo_uri)
    .option("spark.mongodb.database", mongo_db)
    .option("spark.mongodb.collection", mongo_collection)
    # Streaming progress is checkpointed at this location.
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/pagoscdc")
    .outputMode("append")
    .start()
)

#.option("checkpointLocation", "/tmp/checkpoint_pagos") \