In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Build (or reuse) a Spark session that runs locally on all available cores.
spark = (
    SparkSession.builder
    .appName("RedditTrumpFilterToCSV")
    .master("local[*]")
    .getOrCreate()
)

# Only surface errors so Spark's INFO/WARN chatter does not flood the output.
spark.sparkContext.setLogLevel("ERROR")

# Open a streaming read from a TCP socket on localhost:9999. The text received
# is exposed in a column named "value", which the steps below operate on.
df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

# Keep only rows that mention "Trump" as a whole word; the (?i) flag makes the
# match case-insensitive. A raw string avoids double-escaping the \b anchors.
filtered_df = df.filter(col("value").rlike(r"(?i).*\bTrump\b.*"))

# Give the single column a meaningful name so the CSV header reads "comment".
final_df = filtered_df.withColumnRenamed("value", "comment")

# Continuously append the matching comments as CSV files under the output
# path, tracking progress in the checkpoint directory.
# NOTE(review): the path has no scheme, so it is resolved against the session's
# default filesystem -- with master "local[*]" that is presumably local disk
# rather than HDFS; confirm against the deployment's configuration.
query = (
    final_df.writeStream
    .outputMode("append")
    .format("csv")
    .option("path", "/tmp/reddit_trump_output_csv")
    .option("checkpointLocation", "/tmp/reddit_trump_checkpoint_csv")
    .option("header", "true")
    .start()
)

print("✅ Streaming started. Saving Trump-related comments as CSV...")

# Block until the query ends. Whatever ends the wait (normal termination, a
# query failure, or a kernel interrupt), stop the query so it does not keep
# running in the background holding the socket and the checkpoint directory.
# The Spark session itself is left running for further use.
try:
    query.awaitTermination()
finally:
    query.stop()


✅ Streaming started. Saving Trump-related comments as CSV...


                                                                                