In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

In [2]:
# 1. Start (or reuse, via getOrCreate) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("CryptoMonitor")
    .getOrCreate()
)
# Keep Spark's own logging down to warnings and errors so cell output stays readable.
spark.sparkContext.setLogLevel("WARN")

print("Spark iniciado correctamente")

Spark iniciado correctamente


In [3]:
# 2. Streaming source: the `crypto_prices` Kafka topic on `kafka:29092`,
#    read from the earliest available offset.
df_kafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "crypto_prices")
    .option("startingOffsets", "earliest")
    .load()
)

In [4]:
# 3. Schema of the JSON payload carried in each Kafka message; every field is nullable.
# NOTE(review): if the producer sends `priceUsd` as a JSON string (e.g. "27000.5")
# rather than a number, DoubleType parsing may yield null — confirm against the producer.
esquema_crypto = (
    StructType()
    .add("id", StringType(), True)
    .add("symbol", StringType(), True)
    .add("priceUsd", DoubleType(), True)
    .add("last_updated", StringType(), True)
)

In [5]:
# 4. Turn raw Kafka records into typed columns: cast the message `value` to a
#    string, parse it as JSON with `esquema_crypto`, then flatten the parsed
#    struct into top-level columns.
# NOTE(review): payloads that do not match the schema are not filtered out here;
# confirm whether rows with null fields should be dropped downstream.
df_limpio = (
    df_kafka
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), esquema_crypto).alias("data"))
    .select("data.*")
)

print("Conectado a Kafka. Esperando datos...")

Conectado a Kafka. Esperando datos...


In [6]:
# 5. Print the parsed records to the console in real time.
query = (
    df_limpio.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

try:
    # Blocks this cell until the query terminates; interrupting the kernel
    # raises KeyboardInterrupt here.
    query.awaitTermination()
finally:
    # An interrupt only aborts the Python-side wait. Without this explicit
    # stop(), nothing in the notebook stops the console query, so it would
    # keep consuming the topic in the background.
    query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# 1. Start (or reuse, via getOrCreate) the Spark session, logging warnings and errors only.
spark = (
    SparkSession.builder
    .appName("CryptoMonitor")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# 2. Streaming source: the `crypto_prices` Kafka topic, read from the earliest offset.
df_kafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "crypto_prices")
    .option("startingOffsets", "earliest")
    .load()
)

# 3. Apply the schema to the JSON payload (all fields nullable).
# NOTE(review): if `priceUsd` arrives as a JSON string rather than a number,
# DoubleType parsing may yield null — confirm against the producer.
esquema_crypto = (
    StructType()
    .add("id", StringType(), True)
    .add("symbol", StringType(), True)
    .add("priceUsd", DoubleType(), True)
    .add("last_updated", StringType(), True)
)

# Cast the Kafka `value` to text, parse it with the schema, and flatten the struct.
df_limpio = (
    df_kafka
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), esquema_crypto).alias("data"))
    .select("data.*")
)

# 4. foreachBatch callback that appends each micro-batch to Postgres.
def guardar_en_postgres(df_lote, id_lote):
    """Append one Structured Streaming micro-batch to the `precios_cripto` table.

    Used as the `foreachBatch` callback of the Postgres streaming query.

    Credentials are read from the POSTGRES_USER / POSTGRES_PASSWORD environment
    variables so they are not stored in the notebook; the original placeholder
    strings remain as fallbacks when the variables are unset.

    Args:
        df_lote: the micro-batch DataFrame supplied by `foreachBatch`.
        id_lote: the micro-batch id supplied by `foreachBatch`; only used in log lines.

    Raises:
        Exception: any error from the JDBC write is printed and then re-raised.
            Swallowing it would let Spark treat the batch as processed and
            silently drop its rows; re-raising makes the query fail visibly instead.
    """
    import os

    usuario = os.environ.get("POSTGRES_USER", "PON_AQUI_TU_USUARIO")
    password = os.environ.get("POSTGRES_PASSWORD", "PON_AQUI_TU_PASSWORD")

    # count() and the JDBC write are two separate Spark actions; caching the
    # batch keeps the second one from recomputing it from the Kafka source.
    df_lote.persist()
    try:
        cantidad = df_lote.count()
        print(f"\n---> 🔍 Iniciando Lote {id_lote} | Registros recibidos: {cantidad}")

        if cantidad == 0:
            return

        try:
            (
                df_lote.write
                .format("jdbc")
                .option("url", "jdbc:postgresql://postgres_db:5432/crypto_db")
                .option("dbtable", "precios_cripto")
                .option("user", usuario)
                .option("password", password)
                .option("driver", "org.postgresql.Driver")
                .mode("append")
                .save()
            )
            print(f"✅ Lote {id_lote} guardado en Postgres con éxito.")
        except Exception as e:
            print(f"❌ ERROR al guardar el lote {id_lote}: {e}")
            # Propagate so the streaming query does not mark this batch as done.
            raise
    finally:
        # Release the cached batch whether the write succeeded, was skipped, or failed.
        df_lote.unpersist()

# 5. Start the continuous write to the database: every micro-batch of
#    `df_limpio` is handed to `guardar_en_postgres`, and Spark records this
#    query's progress under `checkpointLocation`.
# NOTE(review): re-running this cell while `query_postgres` is still active will
# likely fail because the same checkpoint is in use — stop the old query first.
query_postgres = (
    df_limpio.writeStream
    .foreachBatch(guardar_en_postgres)
    .outputMode("append")
    .option("checkpointLocation", "/home/jovyan/work/checkpoint_final_2")
    .start()
)

print("Streaming hacia Postgres iniciado.")

Streaming hacia Postgres iniciado.
