In [None]:
import nltk

# Initialize NLTK's Vader and download stopwords
nltk.download('vader_lexicon')
nltk.download('stopwords')

In [None]:
references = df.select(
                explode(split(col("filtered_title"), " ")).alias("word")
            ).filter(
                (col("word").like("/u/%")) |
                (col("word").like("/r/%")) |
                (col("word").like("http%"))
            )
            print("-----------------------------")
            print(references.collect())
            print("-----------------------------")
            # User references
            user_refs = references.filter(col("word").like("/u/%")).groupBy("word").count().withColumnRenamed("count", "user_references_count")
            user_refs.createOrReplaceTempView("user_references")
            user_refs.write.json("/data/output/user_references", mode="append")
            print("User references table created and saved.")

            # Post references
            post_refs = references.filter(col("word").like("/r/%")).groupBy("word").count().withColumnRenamed("count", "post_references_count")
            post_refs.createOrReplaceTempView("post_references")
            post_refs.write.json("/data/output/post_references", mode="append")
            print("Post references table created and saved.")
            
            # URL references
            url_refs = references.filter(col("word").like("http%")).groupBy("word").count().withColumnRenamed("count", "url_references_count")
            url_refs.createOrReplaceTempView("url_references")
            url_refs.write.json("/data/output/url_references", mode="append")
            print("URL references table created and saved.")

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode, split, col, udf, min, max, lit
from pyspark.sql.types import StringType, DoubleType
from pyspark.ml.feature import HashingTF, IDF
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.corpus import stopwords
import json
import os

sid = SentimentIntensityAnalyzer()
stop_words = set(stopwords.words('english'))

# Define the host and port
host = "127.0.0.1"
port = 9998

# Initialize SparkContext and SparkSession
sc = SparkContext("local[2]", "RedditConsumer")
sc.setLogLevel("ERROR")
spark = SparkSession(sc)

# Initialize StreamingContext with a batch interval of 10 seconds
ssc = StreamingContext(sc, 60)

# Ensure the necessary directories exist
os.makedirs("/data/raw/reddit_ask", exist_ok=True)
os.makedirs("/data/output/user_references", exist_ok=True)
os.makedirs("/data/output/post_references", exist_ok=True)
os.makedirs("/data/output/url_references", exist_ok=True)
os.makedirs("/data/output/top_words", exist_ok=True)
os.makedirs("/data/output/metrics", exist_ok=True)
os.makedirs("/data/output/sentiment_analysis", exist_ok=True)

def get_sentiment(text):
    scores = sid.polarity_scores(text)
    return scores['compound']

def classify_sentiment(polarity):
    if polarity > 0.05:
        return 'positive'
    elif polarity < -0.05:
        return 'negative'
    else:
        return 'neutral'

def remove_stopwords(text):
    words = text.split()
    filtered_words = [word for word in words if word.lower() not in stop_words]
    return ' '.join(filtered_words)

get_sentiment_udf = udf(get_sentiment, DoubleType())
classify_sentiment_udf = udf(classify_sentiment, StringType())
remove_stopwords_udf = udf(remove_stopwords, StringType())
def new_columns(df):
    # Remove stopwords from titles
    df = df.withColumn("filtered_title", remove_stopwords_udf(col("title")))
    df = df.withColumn("polarity", get_sentiment_udf(col("filtered_title")))
    df = df.withColumn("sentiment", classify_sentiment_udf(col("polarity")))
    return df
def tf_idf(df):
    words_data = df.select(explode(split(col("filtered_title"), " ")).alias("words"))
    hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
    idf = IDF(inputCol="rawFeatures", outputCol="features")
    tf = hashing_tf.transform(words_data)
    idf_model = idf.fit(tf)
    return idf_model.transform(tf)
def time_optional(df):
    # Get the time range of the data
    time_range = df.select(min("date").alias("start_time"), max("date").alias("end_time")).collect()
    if time_range:
        start_time = time_range[0]["start_time"]
        end_time = time_range[0]["end_time"]
        print(f"Time range: {start_time} to {end_time}")
def metric(user_refs):
    #optional
        # Store metrics in a temporary table
    metrics_df = user_refs.join(post_refs, "word", "outer") \
                          .join(url_refs, "word", "outer")
    metrics_df = metrics_df.withColumn("average_sentiment", lit(average_sentiment))
    metrics_df = metrics_df.withColumn("start_time", lit(start_time))
    metrics_df = metrics_df.withColumn("end_time", lit(end_time))
    metrics_df.createOrReplaceTempView("metrics")
    metrics_df.write.json("/data/output/metrics", mode="append")
    print("Metrics table created and saved.")
    metrics_df.show(5, truncate=False)

def placeholder()
    # Calculate TF-IDF
            
    tfidf_data = tf_idf(df)
    print(tfidf_data)
    # Find the top words and their TF-IDF scores
    top_words = tfidf_data.select("words", "features").rdd.flatMap(
        lambda row: [(row.words, float(v)) for v in row.features.toArray()]
    ).sortBy(lambda x: -x[1]).take(10)
    print(top_words.collect())
    # Convert top words and their TF-IDF scores to DataFrame and persist it
    top_words_df = spark.createDataFrame(top_words, ["word", "tfidf"])
    top_words_df.createOrReplaceTempView("top_words")
    #top_words_df.write.json("/data/output/top_words", mode="append")
    print(spark.sql("select * from top_words"))
    print("TF-IDF table created and saved.")
    
    # Sentiment Analysis using NLTK's Vader
    sentiment_df = df.select("title", "polarity", "sentiment")
    sentiment_df.createOrReplaceTempView("sentiment_analysis")
    sentiment_df.write.json("/data/output/sentiment_analysis", mode="append")
    print("Sentiment analysis table created and saved.")
    
    sentiment_rdd = df.select("polarity").rdd.map(lambda row: row.polarity)
    average_sentiment = sentiment_rdd.mean()
    print(f"Average sentiment: {average_sentiment}")

def process_rdd(rdd):
    """
    Process each RDD:
    1. Convert RDD to DataFrame and persist it.
    2. Extract references to users, posts, and URLs.
    3. Calculate TF-IDF.
    4. Find the top words and their TF-IDF scores.
    5. Perform sentiment analysis.
    6. Store metrics in a temporary table.
    7. Get the time range of the data.
    """
    print("Processing RDD...")
    if not rdd.isEmpty():
        # Parse each record as JSON
        records = rdd.map(lambda record: json.loads(record)).collect()
        
        # Check if records are valid and not empty
        if records:
            # Convert RDD to DataFrame
            df = spark.createDataFrame(records)
            df = new_columns(df)
          
            # Save raw data
            df.createOrReplaceTempView("raw")
            df.write.json("/data/raw/reddit_ask", mode="append")
            
            # Print DataFrame schema and some data
            #df.printSchema()
            df.show(5, truncate=False)
            
            
            
            
            
            
            

            
        else:
            print("No valid records found in this RDD.")

# Define the socket stream
lines = ssc.socketTextStream(host, port)

# Apply the processing function to each RDD in the DStream
lines.foreachRDD(lambda rdd: process_rdd(rdd))

# Print received lines (for debugging)
#lines.pprint()

# Start the streaming context and wait for termination
ssc.start()
print("Streaming started...")
ssc.awaitTermination()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/09 15:34:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Streaming started...


24/06/09 15:34:51 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
24/06/09 15:34:54 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
24/06/09 15:34:56 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
24/06/09 15:34:59 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data


Processing RDD...


24/06/09 15:35:02 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
24/06/09 15:35:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
[Stage 0:>                                                          (0 + 1) / 1]

+-----------+-------------------+---------+----------------------------------------------------------------------+--------------------------------------------+--------+---------+
|author     |date               |subreddit|title                                                                 |filtered_title                              |polarity|sentiment|
+-----------+-------------------+---------+----------------------------------------------------------------------+--------------------------------------------+--------+---------+
|usrnmthisis|2024-06-09 15:08:13|AskReddit|why do you think cats would rather starve than eating what they hunt ?|think cats would rather starve eating hunt ?|-0.4404 |negative |
|usrnmthisis|2024-06-09 15:08:13|AskReddit|why do you think cats would rather starve than eating what they hunt ?|think cats would rather starve eating hunt ?|-0.4404 |negative |
|usrnmthisis|2024-06-09 15:08:13|AskReddit|why do you think cats would rather starve than eating what the

24/06/09 15:35:06 ERROR JobScheduler: Error running job streaming job 1717947300000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/opt/bitnami/spark/python/pyspark/streaming/util.py", line 71, in call
    r = self.func(t, *rdds)
        ^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/spark/python/pyspark/streaming/dstream.py", line 236, in func
    return old_func(rdd)  # type: ignore[call-arg, arg-type]
           ^^^^^^^^^^^^^
  File "/tmp/ipykernel_38846/3316594128.py", line 158, in <lambda>
    lines.foreachRDD(lambda rdd: process_rdd(rdd))
                                 ^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_38846/3316594128.py", line 123, in process_rdd
    tfidf_data = tf_idf(df)
                 ^^^^^^^^^^
  File "/tmp/ipykernel_38846/3316594128.py", line 66, in tf_idf
    tf = hashing_tf.transform(words_data)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/spark/python/pyspark/ml/base.py", line 26

Py4JJavaError: An error occurred while calling o30.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/opt/bitnami/spark/python/pyspark/streaming/util.py", line 71, in call
    r = self.func(t, *rdds)
        ^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/spark/python/pyspark/streaming/dstream.py", line 236, in func
    return old_func(rdd)  # type: ignore[call-arg, arg-type]
           ^^^^^^^^^^^^^
  File "/tmp/ipykernel_38846/3316594128.py", line 158, in <lambda>
    lines.foreachRDD(lambda rdd: process_rdd(rdd))
                                 ^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_38846/3316594128.py", line 123, in process_rdd
    tfidf_data = tf_idf(df)
                 ^^^^^^^^^^
  File "/tmp/ipykernel_38846/3316594128.py", line 66, in tf_idf
    tf = hashing_tf.transform(words_data)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/spark/python/pyspark/ml/base.py", line 262, in transform
    return self._transform(dataset)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/spark/python/pyspark/ml/wrapper.py", line 398, in _transform
    return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sparkSession)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/python/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/opt/bitnami/spark/python/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.IllegalArgumentException: requirement failed: The input column must be array, but got string.

	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1$adapted(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)


Processing RDD...
+-----------+-------------------+---------+----------------------------------------------------------------------+--------------------------------------------+--------+---------+
|author     |date               |subreddit|title                                                                 |filtered_title                              |polarity|sentiment|
+-----------+-------------------+---------+----------------------------------------------------------------------+--------------------------------------------+--------+---------+
|usrnmthisis|2024-06-09 15:08:13|AskReddit|why do you think cats would rather starve than eating what they hunt ?|think cats would rather starve eating hunt ?|-0.4404 |negative |
|usrnmthisis|2024-06-09 15:08:13|AskReddit|why do you think cats would rather starve than eating what they hunt ?|think cats would rather starve eating hunt ?|-0.4404 |negative |
|usrnmthisis|2024-06-09 15:08:13|AskReddit|why do you think cats would rather starve th