In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import threading

# Crear la sesión de Spark con los JARs necesarios
spark = SparkSession.builder \
    .appName("KafkaConsumerWithLimit") \
    .config("spark.sql.shuffle.partitions", "50") \
    .getOrCreate()

# Definir el esquema para el JSON
schema = StructType([
    StructField("uuid", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("workplace_uuid", StringType(), True),
    StructField("sensor_id", StringType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("light", DoubleType(), True),
    StructField("pressure", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("co2", DoubleType(), True),
    StructField("s0", DoubleType(), True),
    StructField("no2", DoubleType(), True),
    StructField("o3", DoubleType(), True),
    StructField("position_x", DoubleType(), True),
    StructField("position_y", DoubleType(), True),
    StructField("position_z", DoubleType(), True)
])

In [2]:
spark.sparkContext._jvm.scala.util.Properties.versionString()


'version 2.12.18'

In [6]:
# Variable global para contar el número total de registros procesados
total_records_processed = 0
max_records = 150000

# Leer el stream de Kafka
df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "invernadero") \
    .option("startingOffsets", "earliest").option("maxOffsetsPerTrigger", "1000").load()


# Convertir el valor de los mensajes de Kafka de binario a cadena
df_string = df_raw.selectExpr("CAST(value AS STRING) as json_string")

# Parsear el JSON y obtener un DataFrame estructurado
df_parsed = df_string.select(from_json(col("json_string"), schema).alias("data")).select("data.*")

# Convertir el campo timestamp a tipo Timestamp y agregar columna de fecha
df_parsed = df_parsed \
    .withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("fecha", to_date("timestamp"))

# Función para procesar cada micro-batch
def process_batch(df, epoch_id):
    global total_records_processed, max_records, query

    batch_count = df.count()
    total_records_processed += batch_count

    # Procesar el batch (por ejemplo, escribir en almacenamiento)
    df.write \
        .format("parquet") \
        .mode("append") \
        .partitionBy("workplace_uuid", "fecha") \
        .save("/home/jovyan/work/data")

    print(f"Batch {epoch_id}: Procesados {batch_count} registros, total acumulado: {total_records_processed}")

    if total_records_processed >= max_records:
        print("Se ha alcanzado el límite de registros. Deteniendo la consulta...")
        # Detener la consulta de streaming
        query.stop()

In [7]:
# Iniciar la consulta de streaming con foreachBatch
query = df_parsed.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/home/jovyan/work/checkpoints") \
    .start()

# Mantener el stream en ejecución
query.awaitTermination()

Batch 10154: Procesados 6 registros, total acumulado: 6
Batch 10155: Procesados 1000 registros, total acumulado: 1006
Batch 10156: Procesados 1000 registros, total acumulado: 2006
Batch 10157: Procesados 1000 registros, total acumulado: 3006
Batch 10158: Procesados 1000 registros, total acumulado: 4006
Batch 10159: Procesados 1000 registros, total acumulado: 5006
Batch 10160: Procesados 1000 registros, total acumulado: 6006
Batch 10161: Procesados 1000 registros, total acumulado: 7006
Batch 10162: Procesados 1000 registros, total acumulado: 8006
Batch 10163: Procesados 1000 registros, total acumulado: 9006
Batch 10164: Procesados 1000 registros, total acumulado: 10006
Batch 10165: Procesados 1000 registros, total acumulado: 11006
Batch 10166: Procesados 1000 registros, total acumulado: 12006
Batch 10167: Procesados 1000 registros, total acumulado: 13006
Batch 10168: Procesados 1000 registros, total acumulado: 14006
Batch 10169: Procesados 1000 registros, total acumulado: 15006
Batch 10