In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, from_json, col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json

import os

# --- Spark / Kafka connector setup -------------------------------------------
# Structured Streaming's Kafka source/sink is provided by `spark-sql-kafka-0-10`.
# (`spark-streaming-kafka-0-10` is the legacy DStream connector, which has no
# PySpark API, so it is not requested.)
# The Scala suffix and Spark version must match the installed Spark/PySpark
# build to avoid binary-incompatibility errors when the connector is loaded.
SCALA_BINARY_VERSION = "2.12"
SPARK_VERSION = "3.2.0"
KAFKA_CONNECTOR = f"org.apache.spark:spark-sql-kafka-0-10_{SCALA_BINARY_VERSION}:{SPARK_VERSION}"

# PYSPARK_SUBMIT_ARGS is only consulted when the JVM is launched, i.e. on the
# first `getOrCreate()` in this kernel. If a session already exists (e.g. this
# cell is re-run), restart the kernel for a changed package list to apply.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {KAFKA_CONNECTOR} pyspark-shell'

# Set up PySpark
spark = SparkSession.builder \
    .appName("KafkaConsumer") \
    .getOrCreate()

# Kafka configurations
kafka_bootstrap_servers = "localhost:9092"
kafka_topics = ["turbine1", "turbine2", "turbine3", "turbine4", "turbine5"]

# One output directory per Kafka topic, e.g. "data/turbine1".
output_paths = {topic: f"data/{topic}" for topic in kafka_topics}

In [None]:
# Stream each turbine's Kafka topic into that topic's own CSV directory.
#
# One streaming query is started per topic, each subscribed to only that topic.
# The alternative, a single query over all topics writing into one `data/`
# folder, would mix turbines together in the output and ignore `output_paths`.
# Every query gets its own checkpoint directory, because a checkpoint records
# the source offsets and sink commits of exactly one query and must not be shared.
CHECKPOINT_ROOT = "checkpoints"


def start_topic_query(topic):
    """Start a streaming query that copies one Kafka topic's message values to CSV.

    Parameters
    ----------
    topic : str
        Kafka topic to subscribe to; rows are written to ``output_paths[topic]``
        with the checkpoint in ``f"{CHECKPOINT_ROOT}/{topic}"``.

    Returns
    -------
    pyspark.sql.streaming.StreamingQuery
        The started query (``start()`` does not block).
    """
    messages = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("subscribe", topic)
        .option("failOnDataLoss", "false")
        .option("startingOffsets", "earliest")
        .load()
        # The Kafka source always delivers `value` as binary (deserializer
        # options are not honoured), so decode it explicitly here.
        .selectExpr("CAST(value AS STRING)")
    )
    return (
        messages.writeStream
        .outputMode("append")
        .format("csv")
        .option("checkpointLocation", f"{CHECKPOINT_ROOT}/{topic}")
        .option("path", output_paths[topic])
        .option("header", "true")
        .start()
    )


queries = {topic: start_topic_query(topic) for topic in kafka_topics}

# Block until any of the queries stops; if one failed, its exception is raised here.
spark.streams.awaitAnyTermination()