In [1]:
import os
import shutil
# Alternative jar locations (relative ./libs and /opt/spark/jars) kept for
# reference; only the /app/libs variant further down is active.
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars ./libs/spark-sql-kafka-0-10_2.11-2.4.5.jar,./libs/kafka-clients-2.4.1.jar pyspark-shell'
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/spark/jars/spark-sql-kafka-0-10_2.11-2.4.5.jar,/opt/spark/jars/kafka-clients-2.4.1.jar pyspark-shell'
# Pass the Spark-Kafka connector and Kafka client jars to PySpark. This is set
# before findspark.init() and before any SparkSession is created below.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /app/libs/spark-sql-kafka-0-10_2.11-2.4.5.jar,/app/libs/kafka-clients-2.4.1.jar pyspark-shell'

# NOTE(review): presumably findspark locates the Spark installation and makes
# `pyspark` importable for the cells that follow — confirm.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import logging


# Build (or reuse) the notebook's SparkSession, attached to the cluster's
# Spark master at spark://master:7077.
spark = (
    SparkSession.builder
    .appName("SparkDemo")
    .master("spark://master:7077")
    .getOrCreate()
)

In [None]:

## Kafka configs
# Both regional sources share the same broker and read options; only the
# subscribed topic differs, so the two dicts are built from one template.
kafka_samborondon_config, kafka_daule_config = (
    {
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": source_topic,
        "startingOffsets": "latest",
        "failOnDataLoss": "false",
    }
    for source_topic in ("consumo-en-samborondon", "consumo-en-daule")
)

# Start every run from an empty checkpoint directory: wipe whatever a previous
# run left behind, then recreate the directory.
checkpoint_dir = "/app/checkpoint"

# ignore_errors=True makes this a silent no-op when the directory is missing,
# so no separate existence check is needed.
shutil.rmtree(checkpoint_dir, ignore_errors=True)
os.makedirs(checkpoint_dir, exist_ok=True)

# Options for the Kafka sink that receives the aggregated results.
kafka_output_config = {
    "kafka.bootstrap.servers": "kafka:9092",
    "topic": "output",
    # Absolute path that must already exist. Reuses `checkpoint_dir` so the
    # directory that is cleaned/created above is the one the query uses.
    "checkpointLocation": checkpoint_dir,
}

# Schema of the JSON payload carried by each incoming Kafka message.
# Every field is declared nullable.
consumo_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("idMedidor", IntegerType()),
        ("latitud", DoubleType()),
        ("longitud", DoubleType()),
        ("region", StringType()),
        ("consumo", IntegerType()),
        ("tiempo", StringType()),  # ISO timestamp string
    )
])

# Function to read from Kafka and parse JSON
def read_kafka_stream(topic_config):
    """Open a streaming read on Kafka and return the parsed readings.

    Args:
        topic_config: dict of options forwarded to the Kafka source via
            ``options(**topic_config)``.

    Returns:
        A streaming DataFrame with one column per field of ``consumo_schema``
        plus a ``timestamp`` column derived from the ``tiempo`` string.
    """
    # The Kafka payload is read from `value`; cast it to text so it can be
    # parsed as JSON.
    raw_messages = (
        spark.readStream
        .format("kafka")
        .options(**topic_config)
        .load()
        .selectExpr("CAST(value AS STRING) as json_str")
    )

    # Parse the JSON text into a struct, then flatten the struct's fields into
    # top-level columns.
    parsed_struct = F.from_json(F.col("json_str"), consumo_schema).alias("data")
    readings = raw_messages.select(parsed_struct).select("data.*")

    # Add a `timestamp` column converted from the `tiempo` string.
    return readings.withColumn("timestamp", F.to_timestamp("tiempo"))

# One parsed stream per region (Samborondon first, then Daule) ...
df_samborondon, df_daule = (
    read_kafka_stream(source_config)
    for source_config in (kafka_samborondon_config, kafka_daule_config)
)

# ... combined into a single stream of readings.
df_union = df_samborondon.union(df_daule)

# Size of each aggregation window and the watermark delay applied to the
# `timestamp` column.
window_duration = "1 minute"
watermark_duration = "2 minutes"

# Per-window, per-region consumption statistics.
agg_df = (
    df_union
    .withWatermark("timestamp", watermark_duration)
    .groupBy(F.window("timestamp", window_duration), "region")
    .agg(
        F.avg("consumo").alias("avg_consumo"),
        F.max("consumo").alias("max_consumo"),
        F.min("consumo").alias("min_consumo"),
        F.count("consumo").alias("count_consumo"),
        F.stddev("consumo").alias("stddev_consumo"),
    )
)

# Detect peaks: a (window, region) bucket is flagged as a peak when its
# *average* consumption exceeds this threshold.
PEAK_AVG_CONSUMO_THRESHOLD = 1400

processed_df = agg_df.withColumn(
    "is_peak",
    # when/otherwise (instead of the bare comparison) yields False rather than
    # null when avg_consumo is null.
    F.when(F.col("avg_consumo") > PEAK_AVG_CONSUMO_THRESHOLD, True).otherwise(False),
).select(
    # stddev_consumo from the aggregation is intentionally not selected here.
    "window",
    "region",
    "avg_consumo",
    "max_consumo",
    "min_consumo",
    "count_consumo",
    "is_peak",
)

# Serialise each aggregated row into a single JSON document stored in a
# column named "value"; the window struct is flattened into start/end fields.
window_col = F.col("window")
output_struct = F.struct(
    window_col.start.alias("window_start"),
    window_col.end.alias("window_end"),
    "region",
    "avg_consumo",
    "max_consumo",
    "min_consumo",
    "count_consumo",
    "is_peak",
)
output_df = processed_df.select(F.to_json(output_struct).alias("value"))

# Publish the JSON rows to the Kafka output topic (for visualization), using
# the "update" output mode.
query = (
    output_df.writeStream
    .format("kafka")
    .options(**kafka_output_config)
    .outputMode("update")
    .start()
)

# Blocks this cell until the query terminates.
query.awaitTermination()

In [None]:
# Stop every streaming query that is still active on this session, announcing
# each one by its id first.
for active_query in spark.streams.active:
    print(f"Stopping stream: {active_query.id}")
    active_query.stop()


In [None]:
import os
import shutil

# Best-effort removal of the leftover './check.txt' artifact.
# shutil.rmtree() only removes directories: called on a regular file it fails,
# and with ignore_errors=True that failure is swallowed, leaving the file in
# place. Handle both cases so the path is gone whichever kind it is.
stale_path = './check.txt'
if os.path.isdir(stale_path) and not os.path.islink(stale_path):
    shutil.rmtree(stale_path, ignore_errors=True)
elif os.path.lexists(stale_path):
    try:
        os.remove(stale_path)
    except OSError:
        # Keep the original best-effort contract: never fail the cell.
        pass