In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,from_json
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [2]:
# Create (or reuse) the Spark session. The spark-sql-kafka package is what
# provides the "kafka" streaming source used below.
# NOTE(review): the connector version should match the installed PySpark
# (`pyspark.__version__`); confirm 3.2.4 is the right one for this cluster.
spark = (
    SparkSession.builder
    .appName("KafkaSparkStreamingExample")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")
    .getOrCreate()
)


In [3]:
# Expected structure of the JSON payload carried in each Kafka message value;
# every field is read as a string.
schema = StructType(
    [StructField(field_name, StringType()) for field_name in ("name", "location", "phone")]
)

In [4]:

# Kafka read configuration
kafka_bootstrap_servers = "kafka:9092"  # Change to your Kafka cluster's host:port
kafka_topic = "test"  # Kafka topic to read from

# Options for spark.readStream.format("kafka"). Options prefixed with "kafka."
# are forwarded to the underlying Kafka consumer.
# No key/value deserializer options are set: the Spark Kafka source always
# returns `key`/`value` as binary and does not support custom deserializers,
# so the columns are cast to strings with DataFrame operations instead.
kafkaParams = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    # Only used when a query starts without an existing checkpoint; a query
    # restarted from its checkpoint resumes from the committed offsets.
    "startingOffsets": "earliest",  # or "latest" to read only new messages
}



In [5]:
# Streaming DataFrame over the Kafka topic, configured by kafkaParams.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**kafkaParams)
    .load()
)


In [6]:
# Debug sink: write the raw Kafka rows to Spark's console sink
# (typically printed on the driver's stdout rather than in the notebook).
console_writer = kafka_df.writeStream.format("console")
query = console_writer.start()

In [7]:
def wait_events(df_streaming, poll_interval=2, timeout=None):
    """Poll a streaming query, printing its status message, until it stops.

    Parameters
    ----------
    df_streaming : StreamingQuery
        The running query to watch (the object returned by ``.start()``).
    poll_interval : float, default 2
        Seconds to sleep between status polls.
    timeout : float or None, default None
        If given, stop the query once this many seconds have elapsed since the
        call began. ``None`` waits until the query stops on its own.

    TODO: stopping after N seconds *without new data* (an idle timeout) is not
    implemented here; ``timeout`` is a plain wall-clock limit.
    """
    started = time.monotonic()
    # Looping on isActive (instead of `while True`) lets the function return
    # once the query has stopped or failed.
    while df_streaming.isActive:
        print(df_streaming.status['message'])

        if timeout is not None and time.monotonic() - started >= timeout:
            df_streaming.stop()
            break

        time.sleep(poll_interval)

    print('has been stop the transmition')

In [8]:
# Snapshot of the query's status dict (message / isDataAvailable / isTriggerActive).
status_snapshot = query.status
status_snapshot

{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [9]:
# True while the streaming query is still running.
is_running = query.isActive
is_running

True

In [None]:
# Watch the console query started above, printing its status while it runs.
wait_events(df_streaming=query)

In [None]:
# Guard cell: deliberately halt "Run All" here so the experimental cells below
# (which start further streaming queries) are not executed automatically.
raise RuntimeError("Execution stopped intentionally; run the cells below manually.")

In [13]:
# Cast the binary Kafka key/value columns to strings.
messages_df = kafka_df.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
)

# Parse the JSON payload into the columns declared in `schema`; bound to a
# name so the structured view can be inspected or streamed later.
parsed_df = messages_df.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Print each micro-batch's key/value pairs using foreachBatch.
def print_messages(batch_df, batch_id):
    """Print every key/value row of a micro-batch to stdout.

    Called by Spark's ``foreachBatch`` with the micro-batch DataFrame and its id.
    ``collect()`` brings the whole batch to the driver, so this is meant for
    small debugging streams.
    """
    print(f"--- Batch ID: {batch_id} ---")
    for row in batch_df.collect():  # Iterate over the micro-batch rows
        print(f"Key: {row['key']}, Value: {row['value']}")

# Send every micro-batch to print_messages, triggering every 2 seconds.
# The checkpoint directory records processed Kafka offsets, so re-running this
# cell resumes after the last committed batch instead of re-reading from
# `startingOffsets`.
query = (
    messages_df.writeStream
    .foreachBatch(print_messages)
    .trigger(processingTime="2 seconds")
    .option('checkpointLocation', './spark_kafka_checkpoint')
    .outputMode("update")
    .start()
)

try:
    # Blocks until the query stops or fails; interrupt the kernel to end it.
    query.awaitTermination()
finally:
    # Interrupting the kernel only aborts this driver-side wait; without an
    # explicit stop() the query would keep consuming Kafka in the background.
    if query.isActive:
        query.stop()

--- Batch ID: 104 ---
Key: data, Value: {"name": "Michael Pham", "location": "Alvarezborough", "phone": "7458535596"}
--- Batch ID: 105 ---
Key: data, Value: {"name": "Regina Davis", "location": "Fisherburgh", "phone": "990-566-2659"}
Key: data, Value: {"name": "Molly Ford", "location": "Williambury", "phone": "592.311.2153"}
Key: data, Value: {"name": "Samantha Bates", "location": "Tiffanyville", "phone": "+1-818-531-9577x6162"}
Key: data, Value: {"name": "Brooke Brown", "location": "Christinaland", "phone": "+1-603-213-5736x36577"}
Key: data, Value: {"name": "Bryan Owen", "location": "Powersshire", "phone": "001-570-447-6458"}
Key: data, Value: {"name": "Ashley Smith", "location": "Sanderschester", "phone": "(571)688-7027"}
Key: data, Value: {"name": "Mrs. Barbara Thomas", "location": "Shelbystad", "phone": "(671)464-2489x7941"}
Key: data, Value: {"name": "Jeanette Young", "location": "East Karla", "phone": "379.865.9380"}
Key: data, Value: {"name": "Craig Taylor", "location": "East 

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# Run the raw console sink for a fixed window, printing the status before and after.
run_seconds = 100
query = kafka_df.writeStream.format("console").start()
try:
    print(query.status)
    time.sleep(run_seconds)
    print(query.status)
finally:
    # Stop even if the sleep is interrupted, so the query does not keep running.
    query.stop()

In [None]:
# Run the raw console sink for 10 seconds, printing the status before and after.
run_seconds = 10
query = kafka_df.writeStream.format("console").start()
try:
    print(query.status)
    time.sleep(run_seconds)
    print(query.status)
finally:
    # Stop even if the sleep is interrupted, so the query does not keep running.
    query.stop()

In [None]:
# Show only the human-readable status message of the current query.
current_status = query.status
current_status['message']

In [None]:
# Guard cell: deliberately halt "Run All" here so the experimental cells below
# (which start another streaming query) are not executed automatically.
raise RuntimeError("Execution stopped intentionally; run the cells below manually.")

In [None]:
# Cast the binary Kafka key/value columns to strings.
messages_df = kafka_df.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
)

# Parse the JSON payload into the columns declared in `schema`; bound to a
# name so the structured view can be inspected or streamed later.
parsed_df = messages_df.select(from_json(col("value"), schema).alias("data")).select("data.*")
# Print the messages using foreachBatch, and record when data last arrived so
# the monitor loop below can stop the query once the topic goes quiet.
# foreachBatch functions run in the driver's Python process, so they can
# update this notebook-level variable.
last_batch_time = time.time()


def print_messages(batch_df, batch_id):
    """Print every key/value row of a micro-batch and note when data arrived.

    Called by Spark's ``foreachBatch`` with the micro-batch DataFrame and its id.
    Updates the module-level ``last_batch_time`` whenever the batch is non-empty.
    ``collect()`` brings the whole batch to the driver (debugging use only).
    """
    global last_batch_time
    print(f"--- Batch ID: {batch_id} ---")
    rows = batch_df.collect()
    if rows:
        last_batch_time = time.time()
    for row in rows:  # Iterate over the micro-batch rows
        print(f"Key: {row['key']}, Value: {row['value']}")


query = (
    messages_df.writeStream
    .foreachBatch(print_messages)  # must be the callback function, not a DataFrame
    .trigger(processingTime="2 seconds")
    .option('checkpointLocation', './spark_kafka_checkpoint')
    .outputMode("update")
    .start()
)

# Monitor for inactivity instead of blocking in awaitTermination(). The idle
# window must be longer than the 2-second trigger interval, otherwise it trips
# between two consecutive micro-batches.
timeout_seconds = 10
poll_seconds = 1
try:
    while query.isActive:
        # Stop the query if no messages have been received for the timeout period
        if time.time() - last_batch_time > timeout_seconds:
            print(f"No messages received for {timeout_seconds} seconds. Completing the job...")
            query.stop()
            break
        time.sleep(poll_seconds)
finally:
    # Also stop on kernel interrupt so the query does not keep running in the background.
    if query.isActive:
        query.stop()

# Returns immediately for a stopped query, but re-raises the query's exception
# if it terminated with a failure instead of being stopped.
query.awaitTermination()

# Proceed to the next step
print("Streaming job completed. Proceeding to the next step...")