In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StringType, DoubleType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.linalg import DenseVector
import mlflow
import mlflow.spark
from mlflow.models.signature import infer_signature
import time

In [2]:
# Initialize Spark Session with Kafka package
spark = SparkSession.builder \
    .appName("YelpKafkaSentiment") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint") \
    .getOrCreate()

# Define schema
schema = StructType() \
    .add("text", StringType()) \
    .add("stars", DoubleType())

# Ingest from Kafka
df_raw = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "yelp_reviews") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse JSON
df = df_raw.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("label", col("stars").cast("int")) \
    .drop("stars")

# Show the results
query = df.writeStream.format("console").outputMode("append").start()
time.sleep(30)  # run for 30 seconds
query.stop()

25/04/20 18:25:17 WARN Utils: Your hostname, sriganesh-Inspiron-14-Plus-7440 resolves to a loopback address: 127.0.1.1; using 172.31.82.137 instead (on interface wlp0s20f3)
25/04/20 18:25:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/sriganesh/conda_root/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/sriganesh/.ivy2/cache
The jars for the packages stored in: /home/sriganesh/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e4c3f2a3-5f45-4c7f-88b3-0a88f285efa4;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.4 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in local-m2-cache
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 398ms :: artifact

-------------------------------------------
Batch: 0
-------------------------------------------


25/04/20 18:25:29 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+--------------------+-----+
|                text|label|
+--------------------+-----+
|If you decide to ...|    3|
|I've taken a lot ...|    5|
|Family diner. Had...|    3|
|Wow!  Yummy, diff...|    5|
|Cute interior and...|    4|
|I am a long term ...|    1|
|Loved this tour! ...|    5|
|Amazingly amazing...|    5|
|This easter inste...|    3|
|Had a party of 6 ...|    3|
|My experience wit...|    5|
|Locals recommende...|    4|
|Love going here f...|    4|
|Good food--loved ...|    4|
|The bun makes the...|    4|
|Great place for b...|    5|
|Tremendous servic...|    5|
|The hubby and I h...|    4|
|I go to blow bar ...|    5|
|My absolute favor...|    5|
+--------------------+-----+
only showing top 20 rows



In [4]:
df.writeStream \
    .format("memory") \
    .queryName("training_buffer") \
    .outputMode("append") \
    .start()

print("⌛ Buffering for 60s...")
time.sleep(60)
training_df = spark.sql("SELECT * FROM training_buffer")
print(f"Training on {training_df.count()} records...")

25/04/20 18:28:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/20 18:28:33 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


⌛ Buffering for 60s...


25/04/20 18:29:35 WARN TaskSetManager: Stage 2 contains a task of very large size (5285 KiB). The maximum recommended task size is 1000 KiB.


Training on 200000 records...


In [5]:
# Preprocessing pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [None]:
# Define Models
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=50)
# Create a pipelinea
base_stages = [tokenizer, remover, hashingTF, idf]
lr_pipeline = Pipeline(stages=base_stages + [lr])
rf_pipeline = Pipeline(stages=base_stages + [rf])

In [13]:
# Sample input DataFrame for logging models (replace with your actual input DataFrame)
input_example = training_df.limit(5).toPandas()  # Spark DataFrame -> pandas DataFrame

# Function to log models with input example and signature
def log_model_with_signature(model, model_name):
    # Get predictions (or output) for input example
    predictions = model.transform(training_df)
    
    # Infer the model signature from input and output
    signature = infer_signature(input_example, predictions.select("prediction").limit(5).toPandas())
    
    # Log the model with input example and signature
    mlflow.spark.log_model(
        model,
        model_name,
        input_example=input_example,
        signature=signature
    )

In [None]:
mlflow.set_experiment("KafkaYelpSentiment")

with mlflow.start_run(run_name="EnsembleModels"):
    lr_model = lr_pipeline.fit(training_df)
    rf_model = rf_pipeline.fit(training_df)
    log_model_with_signature(lr_model, "lr_model")
    log_model_with_signature(rf_model, "rf_model")

    mlflow.log_param("model_type", "Ensemble of LR + RF")# Re-parse stream
predict_df = df_raw.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.text")

# Make predictions from each model
lr_pred = lr_model.transform(predict_df).select("text", col("prediction").alias("lr_pred"))
rf_pred = rf_model.transform(predict_df).select("text", col("prediction").alias("rf_pred"))

# Join on text
joined = lr_pred.join(rf_pred, "text")

# Majority vote
final_pred = joined.withColumn(
    "prediction",
    expr("int(array(lr_pred, rf_pred)[0] + array(lr_pred, rf_pred)[1]) / 2")
)

# Output to console
query = final_pred.select("text", "lr_pred", "rf_pred", "prediction") \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
time.sleep(30)  # run for 30 seconds
query.stop()

25/04/20 18:55:57 WARN TaskSetManager: Stage 226 contains a task of very large size (5285 KiB). The maximum recommended task size is 1000 KiB.
25/04/20 18:56:01 WARN TaskSetManager: Stage 228 contains a task of very large size (5285 KiB). The maximum recommended task size is 1000 KiB.
25/04/20 18:56:03 WARN TaskSetManager: Stage 230 contains a task of very large size (5285 KiB). The maximum recommended task size is 1000 KiB.
25/04/20 18:56:05 WARN TaskSetManager: Stage 232 contains a task of very large size (5285 KiB). The maximum recommended task size is 1000 KiB.
25/04/20 18:56:05 WARN TaskSetManager: Stage 234 contains a task of very large size (5285 KiB). The maximum recommended task size is 1000 KiB.
25/04/20 18:56:06 WARN TaskSetManager: Stage 236 contains a task of very large size (5285 KiB). The maximum recommended task size is 1000 KiB.
25/04/20 18:56:06 WARN TaskSetManager: Stage 238 contains a task of very large size (5285 KiB). The maximum recommended task size is 1000 KiB.

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+-------+-------+----------+
|                text|lr_pred|rf_pred|prediction|
+--------------------+-------+-------+----------+
|So much to like a...|    4.0|    5.0|       4.5|
|Our 1st visit her...|    5.0|    5.0|       5.0|
|tldr; Huge plates...|    4.0|    5.0|       4.5|
|Three chances and...|    1.0|    5.0|       3.0|
|Yummy food,  they...|    5.0|    5.0|       5.0|
|Very cool bar att...|    4.0|    5.0|       4.5|
|Was taken here to...|    5.0|    5.0|       5.0|
|I have groen up e...|    5.0|    5.0|       5.0|
|I'm not really in...|    5.0|    5.0|       5.0|
|Visited Shaving G...|    5.0|    5.0|       5.0|
|Very good local i...|    4.0|    5.0|       4.5|
|Really like this ...|    4.0|    5.0|       4.5|
|I've only been he...|    5.0|    5.0|       5.0|
|Where do I start....|    5.0|    5.0|       5.0|
|Very cool nostalg...|    5.0|    5.0|       5.0|
|Yu

In [20]:
custom_text = "The food was ok ok!"

# Create a DataFrame for custom input
custom_df = spark.createDataFrame([(custom_text,)], ["text"])

# Transform using models
lr_pred = lr_model.transform(custom_df).select("text", col("prediction").alias("lr_pred"))
rf_pred = rf_model.transform(custom_df).select("text", col("prediction").alias("rf_pred"))

# Join predictions on 'text'
joined = lr_pred.join(rf_pred, "text")

# Ensemble prediction (majority vote)
final_pred = joined.withColumn(
    "prediction",
    expr("int((lr_pred + rf_pred) / 2)")
)

# Show the final result
final_pred.select("text", "lr_pred", "rf_pred", "prediction").show()

+-------------------+-------+-------+----------+
|               text|lr_pred|rf_pred|prediction|
+-------------------+-------+-------+----------+
|The food was ok ok!|    3.0|    5.0|         4|
+-------------------+-------+-------+----------+



25/04/20 19:02:08 WARN DAGScheduler: Broadcasting large task binary with size 1241.0 KiB
                                                                                