In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, avg, to_timestamp, lag
from pyspark.sql.window import Window
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

spark = SparkSession.builder \
    .appName("StockMarketPredictionKafka") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .config("spark.jars.repositories", "https://repo.maven.apache.org/maven2") \
    .getOrCreate()

schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Symbol", StringType(), True),
    StructField("Open", FloatType(), True),
    StructField( "High", FloatType(), True),
    StructField("Low", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("Volume", IntegerType(), True),
    StructField("VWAP", FloatType(), True)
])

stock_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "stock_topic") \
    .option("startingOffsets", "latest") \
    .load()

stock_stream = stock_stream.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

stock_stream = stock_stream.withColumn("Date", to_timestamp("Date"))

def process_batch(batch_df, batch_id):
    from pyspark.sql import functions as F
    window_spec = Window.partitionBy("Symbol").orderBy("Date")

    batch_df = batch_df.withColumn("Prev_Close", F.lag("Close", 1).over(window_spec)) \
                       .withColumn("Prev_Volume", F.lag("Volume", 1).over(window_spec))

    window_ma = Window.partitionBy("Symbol").orderBy("Date").rowsBetween(-4, 0)
    batch_df = batch_df.withColumn("MA_Close_5", F.avg("Close").over(window_ma))

    feature_columns = ["Close", "Volume", "VWAP", "MA_Close_5", "Prev_Close", "Prev_Volume"]
    batch_df = batch_df.dropna(subset=feature_columns)

    if "features" in batch_df.columns:
        batch_df = batch_df.drop("features")

    model = PipelineModel.load("model/")

    # Apply the model to make predictions on the streaming data
    predictions = model.transform(batch_df)
    predictions_output = predictions.select("Symbol", "Date", "prediction") \
        .withColumnRenamed("prediction", "Action")
    predictions_output.write.format("console").mode("append").option("truncate", "false").save()

query = stock_stream.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("update") \
    .start()
query.awaitTermination()


24/10/13 11:52:37 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/xs/mhfmsc0n6z71fqjy361gfjx00000gn/T/temporary-d4cd7263-ce2f-4a77-98b5-0a36cd66b282. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/10/13 11:52:37 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/10/13 11:52:37 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

+------+-------------------+------+
|Symbol|Date               |Action|
+------+-------------------+------+
|TITAN |2024-10-13 11:52:35|1.0   |
|TITAN |2024-10-13 11:52:35|1.0   |
|TITAN |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
+------+-------------------+------+

+------+-------------------+------+
|Symbol|Date               |Action|
+------+-------------------+------+
|TITAN |2024-10-13 11:52:35|1.0   |
|TITAN |2024-10-13 11:52:35|1.0   |
|TITAN |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
|ITC   |2024-10-13 11:52:35|1.0   |
+------+-------------------+------+

+------+-------------------+------+
|Symbol|Date               |Action|
+------+-------------------+------+
|TITAN |2024-10-13 11:52:3

                                                                                

+------+----+------+
|Symbol|Date|Action|
+------+----+------+
+------+----+------+

+------+-------------------+------+
|Symbol|Date               |Action|
+------+-------------------+------+
|TITAN |2024-10-13 11:52:36|1.0   |
|TITAN |2024-10-13 11:52:36|1.0   |
|TITAN |2024-10-13 11:52:36|0.0   |
|TITAN |2024-10-13 11:52:36|0.0   |
|TITAN |2024-10-13 11:52:37|1.0   |
|TITAN |2024-10-13 11:52:37|0.0   |
|TITAN |2024-10-13 11:52:37|0.0   |
|TITAN |2024-10-13 11:52:37|0.0   |
|TITAN |2024-10-13 11:52:37|0.0   |
|TITAN |2024-10-13 11:52:37|0.0   |
|TITAN |2024-10-13 11:52:37|1.0   |
|TITAN |2024-10-13 11:52:37|0.0   |
|TITAN |2024-10-13 11:52:38|0.0   |
|TITAN |2024-10-13 11:52:38|0.0   |
|TITAN |2024-10-13 11:52:38|0.0   |
|TITAN |2024-10-13 11:52:39|0.0   |
|TITAN |2024-10-13 11:52:39|1.0   |
|TITAN |2024-10-13 11:52:39|0.0   |
|TITAN |2024-10-13 11:52:39|0.0   |
|TITAN |2024-10-13 11:52:39|0.0   |
+------+-------------------+------+
only showing top 20 rows

+------+-----------------

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

+------+-------------------+------+
|Symbol|Date               |Action|
+------+-------------------+------+
|TITAN |2024-10-13 11:53:19|1.0   |
|TITAN |2024-10-13 11:53:19|1.0   |
|TITAN |2024-10-13 11:53:19|0.0   |
|TITAN |2024-10-13 11:53:19|0.0   |
|TITAN |2024-10-13 11:53:19|1.0   |
|TITAN |2024-10-13 11:53:19|0.0   |
|TITAN |2024-10-13 11:53:19|0.0   |
|TITAN |2024-10-13 11:53:20|1.0   |
|TITAN |2024-10-13 11:53:20|0.0   |
|TITAN |2024-10-13 11:53:20|0.0   |
|TITAN |2024-10-13 11:53:20|0.0   |
|TITAN |2024-10-13 11:53:20|0.0   |
|TITAN |2024-10-13 11:53:20|1.0   |
|TITAN |2024-10-13 11:53:20|1.0   |
|TITAN |2024-10-13 11:53:21|0.0   |
|TITAN |2024-10-13 11:53:21|0.0   |
|TITAN |2024-10-13 11:53:21|0.0   |
|TITAN |2024-10-13 11:53:21|0.0   |
|TITAN |2024-10-13 11:53:22|0.0   |
|TITAN |2024-10-13 11:53:22|1.0   |
+------+-------------------+------+
only showing top 20 rows

+------+-------------------+------+
|Symbol|Date               |Action|
+------+-------------------+------+
|T