In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

# Session-wide settings applied to the builder below.
spark_settings = {
    # NOTE(review): this key is documented for the legacy DStream API; confirm
    # it has any effect on Structured Streaming queries.
    "spark.streaming.stopGracefullyOnShutdown": True,
    # Kafka connector for Structured Streaming (Scala 2.12 / 3.3.0 build).
    # NOTE(review): must match the installed Spark version — confirm.
    "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
    # Presumably kept small to limit shuffle overhead on a local[*] run.
    "spark.sql.shuffle.partitions": 4,
}

session_builder = SparkSession.builder.appName("Streaming from Kafka").master("local[*]")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)

# getOrCreate() reuses a session already running in this kernel, if any.
spark = session_builder.getOrCreate()

spark

In [2]:
# Source stream: subscribe to the "house-data" topic, starting from the earliest
# offset. Each row is a raw Kafka record (binary key/value plus
# topic/partition/offset/timestamp metadata).
kafka_source_options = {
    "kafka.bootstrap.servers": "ed-kafka:29092",
    "subscribe": "house-data",
    "startingOffsets": "earliest",
}

kafka_df = spark.readStream.format("kafka").options(**kafka_source_options).load()

In [3]:
# View schema for raw kafka_df: key/value arrive as binary columns alongside
# Kafka metadata (topic, partition, offset, timestamp, timestampType).
kafka_df.printSchema()
# NOTE: kafka_df is a streaming DataFrame, so batch actions such as show() are
# rejected by Spark; inspect rows through a writeStream sink instead.

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Decode the Kafka payload from binary to string, keeping the column name "value"
# (the other Kafka columns are left untouched).
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", kafka_df["value"].cast("string"))

In [5]:
from pyspark.sql.functions import from_json, struct, to_json
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Expected layout of each Kafka JSON message: one flat object per house with
# the (nullable) fields below, in this order. The order determines the column
# order that "data.*" expands to when the messages are parsed.
#
# NOTE(review): Alley, FireplaceQu, PoolQC, Fence and MiscFeature are declared
# FloatType, although their names suggest categorical features. If the producer
# ever sends text for them, from_json cannot convert those values and they will
# come back null. Confirm against the producer and the input types the model was
# trained on before changing them.
# NOTE(review): IntegerType fields (e.g. LotFrontage, MasVnrArea, GarageYrBlt)
# presumably will not accept JSON numbers with a fractional part such as 65.0.
# Confirm how the producer serialises these values.
_int, _str, _float = IntegerType(), StringType(), FloatType()

_house_columns = [
    ("Id", _int),
    ("MSSubClass", _int),
    ("MSZoning", _str),
    ("LotFrontage", _int),
    ("LotArea", _int),
    ("Street", _str),
    ("Alley", _float),
    ("LotShape", _str),
    ("LandContour", _str),
    ("Utilities", _str),
    ("LotConfig", _str),
    ("LandSlope", _str),
    ("Neighborhood", _str),
    ("Condition1", _str),
    ("Condition2", _str),
    ("BldgType", _str),
    ("HouseStyle", _str),
    ("OverallQual", _int),
    ("OverallCond", _int),
    ("YearBuilt", _int),
    ("YearRemodAdd", _int),
    ("RoofStyle", _str),
    ("RoofMatl", _str),
    ("Exterior1st", _str),
    ("Exterior2nd", _str),
    ("MasVnrType", _str),
    ("MasVnrArea", _int),
    ("ExterQual", _str),
    ("ExterCond", _str),
    ("Foundation", _str),
    ("BsmtQual", _str),
    ("BsmtCond", _str),
    ("BsmtExposure", _str),
    ("BsmtFinType1", _str),
    ("BsmtFinSF1", _int),
    ("BsmtFinType2", _str),
    ("BsmtFinSF2", _int),
    ("BsmtUnfSF", _int),
    ("TotalBsmtSF", _int),
    ("Heating", _str),
    ("HeatingQC", _str),
    ("CentralAir", _str),
    ("Electrical", _str),
    ("1stFlrSF", _int),
    ("2ndFlrSF", _int),
    ("LowQualFinSF", _int),
    ("GrLivArea", _int),
    ("BsmtFullBath", _int),
    ("BsmtHalfBath", _int),
    ("FullBath", _int),
    ("HalfBath", _int),
    ("BedroomAbvGr", _int),
    ("KitchenAbvGr", _int),
    ("KitchenQual", _str),
    ("TotRmsAbvGrd", _int),
    ("Functional", _str),
    ("Fireplaces", _int),
    ("FireplaceQu", _float),
    ("GarageType", _str),
    ("GarageYrBlt", _int),
    ("GarageFinish", _str),
    ("GarageCars", _int),
    ("GarageArea", _int),
    ("GarageQual", _str),
    ("GarageCond", _str),
    ("PavedDrive", _str),
    ("WoodDeckSF", _int),
    ("OpenPorchSF", _int),
    ("EnclosedPorch", _int),
    ("3SsnPorch", _int),
    ("ScreenPorch", _int),
    ("PoolArea", _int),
    ("PoolQC", _float),
    ("Fence", _float),
    ("MiscFeature", _float),
    ("MiscVal", _int),
    ("MoSold", _int),
    ("YrSold", _int),
    ("SaleType", _str),
    ("SaleCondition", _str),
    ("SalePrice", _int),
]

house_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _house_columns]
)

# Parse each message with house_schema and flatten the resulting struct, so that
# every house attribute becomes its own top-level column. kafka_json_df already
# holds the payload cast to a string in its "value" column.
# NOTE(review): messages that fail to parse are not dropped here; their fields
# come through as nulls. Confirm how the model handles such rows.
house_struct = from_json(kafka_json_df["value"], house_schema).alias("data")
parsed_df = kafka_json_df.select(house_struct).select("data.*")

In [6]:
from pyspark.ml import PipelineModel

# Location of the fitted Spark ML pipeline. PipelineModel.load reads the
# directory layout written by PipelineModel.save; despite the ".pkl" suffix,
# this is not a Python pickle.
HOUSE_PRICE_MODEL_PATH = "house_price_model.pkl"

model = PipelineModel.load(HOUSE_PRICE_MODEL_PATH)
# Apply the pipeline to the parsed stream; the output includes the
# "prediction" column consumed by the sink cell.
predictions_df = model.transform(parsed_df)

In [None]:
# Serialise each prediction as a JSON object {"Id": ..., "prediction": ...}.
# The Kafka sink takes the message body from the "value" column.
# (To forward every column instead, use struct(*predictions_df.columns).)
prediction_payload = to_json(struct("Id", "prediction")).alias("value")
kafka_output_df = predictions_df.select(prediction_payload)

kafka_sink_options = {
    "kafka.bootstrap.servers": "ed-kafka:29092",  # adjust to your broker address
    "topic": "house-data-predictions",
    # Query progress is checkpointed here so a restarted query can resume.
    "checkpointLocation": "checkpoint_dir_kafka_to_kafka",
}

# The query has no aggregation, so "update" emits each row exactly once,
# behaving the same as "append".
query = (
    kafka_output_df.writeStream.format("kafka")
    .options(**kafka_sink_options)
    .outputMode("update")
    .start()
)

# Blocks this cell until the streaming query is stopped or fails.
query.awaitTermination()

In [None]:
# # Debugging alternative: write the predictions to the console sink to inspect them

# (predictions_df
#  .writeStream
#  .format("console")
#  .outputMode("append")
#  .option("checkpointLocation", "checkpoint_dir_kafka")
#  .start()
#  .awaitTermination())

In [None]:
# After checking the data in the output topic, stop the Spark session when done
# spark.stop()
