In [4]:
import warnings

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import (
    DoubleType,
    FloatType,
    StringType,
    StructField,
    StructType,
)

# Build SparkSession with Kafka integration.
# The Kafka connector artifact must match the running Spark version; a
# hard-coded version drifts out of sync with the installed pyspark, so it is
# derived from pyspark.__version__ instead.
# NOTE(review): assumes a Scala 2.12 Spark build (the default for PySpark 3.x
# wheels) — use the _2.13 suffix for Scala 2.13 builds (e.g. Spark 4.x).
KAFKA_PACKAGE = (
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"
)

spark = (
    SparkSession.builder
    .appName("CryptoStream")
    .config("spark.jars.packages", KAFKA_PACKAGE)
    .getOrCreate()
)

# spark.jars.packages only takes effect when the SparkContext/JVM is first
# launched. If an earlier notebook cell already started one, getOrCreate()
# reuses it and the package never reaches the context's conf. The failure
# would otherwise only surface later as
# "Failed to find data source: kafka", so flag it here explicitly.
_loaded_packages = spark.sparkContext.getConf().get("spark.jars.packages", "")
if KAFKA_PACKAGE not in _loaded_packages:
    warnings.warn(
        f"{KAFKA_PACKAGE} is not in the active SparkContext's "
        "spark.jars.packages (a SparkContext was probably already running). "
        "The 'kafka' source will be unavailable; restart the Python "
        "process/kernel and run this cell before any other Spark code.",
        RuntimeWarning,
    )

# Schema of the JSON document carried in each Kafka message value.
# Numeric fields use DoubleType (64-bit). FloatType is 32-bit single
# precision (~7 significant digits), so values with more significant
# digits — plausibly market_cap and total_volume — would be silently rounded
# when parsed by from_json.
# Every field is declared nullable (third StructField argument True).
schema = StructType([
    StructField("id", StringType(), True),
    StructField("current_price", DoubleType(), True),
    StructField("market_cap", DoubleType(), True),
    StructField("total_volume", DoubleType(), True),
])

# Stream records from the "crypto-topic" Kafka topic, decode each record's
# binary value to a string, parse it as JSON against `schema`, and flatten
# the parsed struct into top-level columns.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "crypto-topic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

# Running mean of current_price per coin id. The dict form of agg() yields
# a result column named "avg(current_price)".
df_agg = df.groupBy("id").agg({"current_price": "avg"})

# Write the aggregated data to a CSV file.
# The file sink (format("csv") etc.) supports only the "append" output mode,
# so combining it with outputMode("complete") is rejected when the query
# starts. A streaming aggregation without a watermark needs "complete" (or
# "update") mode, so keep "complete" and persist each micro-batch's full
# result table through foreachBatch, overwriting the previous snapshot.
OUTPUT_PATH = "processed_crypto_data"


def _write_snapshot(batch_df, batch_id):
    """Overwrite OUTPUT_PATH with the current aggregate result table.

    In complete mode batch_df holds the entire result table for this
    micro-batch. batch_id is the micro-batch sequence number and is unused;
    overwriting makes a replayed batch harmless.
    """
    batch_df.write.mode("overwrite").option("header", True).csv(OUTPUT_PATH)


query = (
    df_agg.writeStream
    .outputMode("complete")
    .foreachBatch(_write_snapshot)
    .option("checkpointLocation", "checkpoints")
    .start()
)

query.awaitTermination()


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.