In [0]:
spark.conf.set("spark.mongodb.input.uri", "mongodb://admin:admin@34.27.126.27:27017")
spark.conf.set("spark.mongodb.output.uri", "mongodb://admin:admin@34.27.126.27:27017")

In [0]:
from pyspark.sql import types as T, functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [0]:
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "34.27.126.27:9092")
    .option("subscribe","tickers")
    .option("startingOffsets","latest")
    .option("failOnDataLoss","false")
    .load()
)

In [0]:
SCHEMA = T.StructType([
    T.StructField('data', T.ArrayType(T.StructType([
        T.StructField('c', T.StringType(), True),
        T.StructField('p', T.FloatType(), True),
        T.StructField('s', T.StringType(), True),
        T.StructField('t', T.LongType(), True),
        T.StructField('v', T.FloatType(), True)
    ]))),
    T.StructField('type', T.StringType(), True)
])

In [0]:
stream_df = stream_df.select(
    F.from_json(
        F.col("value").cast(T.StringType()),
        SCHEMA
    ).alias("raw")
)

In [0]:
parsed_stream = stream_df\
    .selectExpr("explode (raw.data) as data")\
    .selectExpr(
        "data.c as TradeConditions", 
        "data.p as LastPriceUSD", 
        "data.s as Symbol", 
        "data.t as UNIXTimestamp", 
        "data.v as Volume"
    )

In [0]:
parsed_stream = parsed_stream.withColumn(
    "Timestamp", 
    F.current_timestamp()
)

### Predictions

In [0]:
def use_batches(batch, _):
    # Creando el assembler de características
    assembler = VectorAssembler(
        inputCols=["UNIXTimestamp"],
        outputCol="features"
    )
    # Definiendo modelo de ML
    lr_model = LinearRegression(
        featuresCol="features",
        labelCol="LastPriceUSD",
        predictionCol="prediction"
    )
    # Creando Pipeline para definir etapas
    pipeline = Pipeline(stages=[assembler, lr_model])

    # Entrenar el modelo
    model = pipeline.fit(batch)

    # Predecir
    predictions = model.transform(batch)
    predictions = predictions.drop("features")

    """predictions.selectExpr("CAST(UNIXTimestamp AS STRING) AS key", "to_json(struct(*)) AS value") \
        .write \
        .format("kafka")\
        .option("kafka.bootstrap.servers", "34.27.126.27:9092")\
        .option("topic", "tickers_parsed")\
        .save()"""
    
    predictions.show()

    predictions.write\
        .format("com.mongodb.spark.sql.DefaultSource")\
        .mode("append")\
        .option("uri", "mongodb://admin:admin@34.27.126.27")\
        .option("database", "telematics_playgroud")\
        .option("collection", "tickers_test")\
        .save()



In [0]:
# send to kafka topic
parsed_stream\
    .writeStream\
    .option("checkpointLocation", "dbfs:/FileStore/tables/tickers_check/")\
    .foreachBatch(use_batches)\
    .start()\
    .awaitTermination()

+---------------+------------+---------------+-------------+-------+--------------------+-----------------+
|TradeConditions|LastPriceUSD|         Symbol|UNIXTimestamp| Volume|           Timestamp|       prediction|
+---------------+------------+---------------+-------------+-------+--------------------+-----------------+
|           NULL|    37549.02|BINANCE:BTCUSDT|1700507217898|0.00357|2023-11-20 19:07:...|32918.73788850204|
|           NULL|    37549.03|BINANCE:BTCUSDT|1700507218049| 6.3E-4|2023-11-20 19:07:...|32918.73788850204|
|           NULL|    37549.03|BINANCE:BTCUSDT|1700507218117| 7.1E-4|2023-11-20 19:07:...|32918.73788850204|
|           NULL|    37549.02|BINANCE:BTCUSDT|1700507218141|0.00589|2023-11-20 19:07:...|32918.73788850204|
|           NULL|    37549.03|BINANCE:BTCUSDT|1700507218146| 5.9E-4|2023-11-20 19:07:...|32918.73788850204|
|           NULL|    37549.03|BINANCE:BTCUSDT|1700507218292| 3.2E-4|2023-11-20 19:07:...|32918.73788850204|
|           NULL|    37549.1

[0;31m---------------------------------------------------------------------------[0m
[0;31mStreamingQueryException[0m                   Traceback (most recent call last)
File [0;32m<command-1945624411563795>, line 7[0m
[1;32m      1[0m [38;5;66;03m# send to kafka topic[39;00m
[1;32m      2[0m [43mparsed_stream[49m[43m\[49m
[1;32m      3[0m [43m    [49m[38;5;241;43m.[39;49m[43mwriteStream[49m[43m\[49m
[1;32m      4[0m [43m    [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mcheckpointLocation[39;49m[38;5;124;43m"[39;49m[43m,[49m[43m [49m[38;5;124;43m"[39;49m[38;5;124;43mdbfs:/FileStore/tables/tickers_check/[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      5[0m [43m    [49m[38;5;241;43m.[39;49m[43mforeachBatch[49m[43m([49m[43muse_batches[49m[43m)[49m[43m\[49m
[1;32m      6[0m [43m    [49m[38;5;241;43m.[39;49m[43mstart[49m[43m([49m[43m)[49m[43m\[49m
[0;32m----> 