## SparkStreaming Hackathon
### Course: Real-time Data Analysis
### Authors: Ruben Tak, Nils Jennissen, David Landeo
This task involves setting up a data streaming pipeline to extract and process posts and comments from Reddit. The data will be structured and sent through a socket, then received and processed by another process. References to users, posts, and external sites will be extracted and counted, and the top 10 important words will be identified using TF-IDF. Optional features include sentiment analysis, additional metrics, saving results to a database, creating a Jupyter Notebook dashboard, and visualizing the results on a web page. The deliverables include Python code, instructions, output data files, and optional Docker setup.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import from_json, regexp_extract, split, window, count, min, max, avg, udf
from pyspark.ml.feature import CountVectorizer, IDF
from textblob import TextBlob
from pyspark.sql.functions import explode, to_date
from pyspark.sql.types import TimestampType
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import from_unixtime
from typing import Iterator
import pandas as pd

def preprocess_comment(df):
    """Tokenize each comment into a ``words`` array column.

    Args:
        df: DataFrame that has a ``comment`` column.

    Returns:
        ``df`` with an additional ``words`` column holding the comment text
        split on single space characters.
    """
    comment_tokens = split(df["comment"], " ")
    return df.withColumn("words", comment_tokens)

def calculate_tfidf(df):
    """Fit a CountVectorizer + IDF pipeline on ``words`` and transform ``df``.

    Rows whose ``words`` value is null are dropped before fitting. A null
    token array (e.g. from a record whose ``comment`` is null) is not valid
    input for CountVectorizer and would otherwise be expected to fail the
    fit/transform for the whole batch.

    Args:
        df: DataFrame with a ``words`` column (array of strings).

    Returns:
        A ``(tfidf, vectorizer_model, idf_model)`` tuple, where ``tfidf`` is
        the input rows (minus null-``words`` rows) with ``raw_features`` and
        ``features`` columns added. Returns ``(None, None, None)`` when no
        usable rows remain.
    """
    usable = df.filter(df["words"].isNotNull())
    if usable.rdd.isEmpty():
        return None, None, None

    # Term counts per document.
    vectorizer = CountVectorizer(inputCol="words", outputCol="raw_features")
    vectorizer_model = vectorizer.fit(usable)
    count_vectorized = vectorizer_model.transform(usable)

    # Re-weight the raw counts by inverse document frequency.
    idf = IDF(inputCol="raw_features", outputCol="features")
    idf_model = idf.fit(count_vectorized)
    tfidf = idf_model.transform(count_vectorized)

    return tfidf, vectorizer_model, idf_model

def extract_references(df):
    """Select the comment fields plus the first reference of each kind.

    For every row, capture group 1 of each pattern is extracted from
    ``comment`` with ``regexp_extract``:

    * ``user_reference`` - text following ``/u/``
    * ``post_reference`` - text following ``/r/``
    * ``site_reference`` - host part following ``http://`` or ``https://``

    Args:
        df: DataFrame with ``comment``, ``prev_comment``, ``post`` and
            ``created_utc`` columns.

    Returns:
        A DataFrame with those four columns and the three reference columns.
    """
    reference_patterns = (
        ('user_reference', '/u/([^\\s/]+)'),
        ('post_reference', '/r/([^\\s/]+)'),
        ('site_reference', 'http[s]?://([^\\s/]+)'),
    )
    reference_columns = [
        regexp_extract("comment", pattern, 1).alias(column_name)
        for column_name, pattern in reference_patterns
    ]
    return df.select(
        "comment", "prev_comment", "post", "created_utc", *reference_columns
    )

def calculate_time_range(df):
    """Return the earliest and latest ``created_utc`` values in ``df``.

    ``min`` and ``max`` here are the Spark SQL aggregate functions imported
    from ``pyspark.sql.functions``, not the Python builtins. They skip null
    values, and a global aggregation always yields exactly one row, so no
    separate null filter or emptiness check (each an extra Spark job) is
    needed: with no non-null timestamps both results are simply ``None``.

    Args:
        df: DataFrame with a ``created_utc`` column.

    Returns:
        A ``(min_date, max_date)`` tuple, or ``(None, None)`` when ``df`` has
        no non-null ``created_utc`` values.
    """
    bounds = df.agg(
        min("created_utc").alias("min_date"),
        max("created_utc").alias("max_date"),
    ).first()
    return bounds["min_date"], bounds["max_date"]


def calculate_sentiment(df):
    """Add a ``sentiment`` column with the TextBlob polarity of each comment.

    Args:
        df: DataFrame with a ``comment`` column (nullable string).

    Returns:
        ``df`` unchanged when it is empty; otherwise ``df`` with a float
        ``sentiment`` column. Rows whose comment is null get a missing
        sentiment instead of failing the whole batch.
    """
    if df.rdd.isEmpty():
        return df

    def polarity(text):
        # Null comments arrive as None; TextBlob only accepts strings.
        if not isinstance(text, str):
            return None
        return TextBlob(text).sentiment.polarity

    @pandas_udf(FloatType())
    def sentiment_score(series: pd.Series) -> pd.Series:
        # Force a float dtype so missing scores are NaN rather than objects.
        return series.apply(polarity).astype("float64")

    return df.withColumn("sentiment", sentiment_score(df["comment"]))

def most_common_words(df, n=10):
    """Return the ``n`` most frequent words in the ``post`` column.

    Titles are split on runs of whitespace and empty tokens are discarded,
    so consecutive spaces, tabs or leading/trailing whitespace do not make
    the empty string show up as a "word".

    Args:
        df: DataFrame with a ``post`` column.
        n: Maximum number of words to return.

    Returns:
        A DataFrame with ``word`` and ``count`` columns, ordered by ``count``
        descending and limited to ``n`` rows.
    """
    tokens = df.select(explode(split(df["post"], "\\s+")).alias("word"))
    tokens = tokens.filter(tokens["word"] != "")
    word_counts = (
        tokens.groupBy("word")
        .agg(count("*").alias("count"))
        .orderBy("count", ascending=False)
    )
    return word_counts.limit(n)

def _top_tfidf_words(tfidf, vectorizer_model, n=10):
    """Return up to ``n`` vocabulary terms with the highest summed TF-IDF weight, best first."""
    from pyspark.ml.stat import Summarizer

    totals = tfidf.select(
        Summarizer.sum(tfidf["features"]).alias("total")
    ).first()["total"]
    vocab = vectorizer_model.vocabulary
    # argsort is ascending; reverse it so the most important term comes first.
    best_first = totals.toArray().argsort()[::-1][:n]
    return [vocab[idx] for idx in best_first]


def _show_reference_counts(references_df, column, label):
    """Print sliding-window occurrence counts for one reference column, if any exist."""
    # regexp_extract produces "" when the pattern does not match; without
    # this filter every window would be dominated by an empty-string group.
    found = references_df.filter(references_df[column] != "")
    counts = (
        found.groupBy(window("created_utc", "60 seconds", "5 seconds"), column)
        .agg(count("*").alias("count"))
        .orderBy("window", "count", ascending=False)
    )
    if not counts.rdd.isEmpty():
        print(label)
        counts.show()


def process_batch(df, epoch_id):
    """Analyse one micro-batch of parsed Reddit comments and print the results.

    For a non-empty batch this tokenizes the comments, prints the top TF-IDF
    words, registers the batch as the ``raw`` temp view, appends it as JSON
    under ``output/raw``, prints windowed counts of user/post/site
    references, the time range, the average sentiment and the most common
    words in post titles.

    Args:
        df: The micro-batch DataFrame passed in by ``foreachBatch``.
        epoch_id: Batch identifier passed in by ``foreachBatch`` (unused).
    """
    if df.rdd.isEmpty():
        return

    # The batch feeds many independent actions below; cache it so the source
    # is not re-evaluated for each one.
    preprocessed_comments = preprocess_comment(df).persist()
    try:
        # Print schema and sample data for debugging
        preprocessed_comments.printSchema()
        preprocessed_comments.show(5)

        # Top 10 important words, ranked by total TF-IDF weight in the batch
        tfidf, vectorizer_model, idf_model = calculate_tfidf(preprocessed_comments)
        if tfidf is not None and vectorizer_model is not None:
            top_10_words = _top_tfidf_words(tfidf, vectorizer_model)
            if top_10_words:
                print("Top 10 important words:")
                print(top_10_words)

        # Extract references from the data
        references_df = extract_references(preprocessed_comments)

        # Expose the raw data as a temporary view and append it to disk
        preprocessed_comments.createOrReplaceTempView("raw")
        preprocessed_comments.write.json("output/raw", mode="append")

        # Windowed occurrence counts per reference kind
        _show_reference_counts(references_df, "user_reference", "User references:")
        _show_reference_counts(references_df, "post_reference", "Post references:")
        _show_reference_counts(references_df, "site_reference", "Site references:")

        # Time range covered by the batch
        min_date, max_date = calculate_time_range(preprocessed_comments)
        if min_date is not None and max_date is not None:
            print(f"Time range of the data: {min_date} - {max_date}")

        # Average sentiment; avg() is null when no row produced a score
        sentiment_df = calculate_sentiment(preprocessed_comments)
        avg_sentiment = sentiment_df.agg(
            avg("sentiment").alias("average_sentiment")
        ).first()["average_sentiment"]
        if avg_sentiment is not None:
            print(f"Average sentiment: {avg_sentiment}")
        else:
            print("No sentiment data available.")

        # Most common words in post titles
        common_words_df = most_common_words(preprocessed_comments)
        print("Most common words in post titles:")
        common_words_df.show()
    finally:
        preprocessed_comments.unpersist()

def calculate_sentiment(df):
    """Add a ``sentiment`` column with the TextBlob polarity of each comment.

    NOTE: this definition rebinds ``calculate_sentiment`` and therefore
    replaces the earlier definition of the same name in this file; it is the
    one ``process_batch`` resolves at call time.

    The pandas UDF is declared through its type hints only; passing
    ``PandasUDFType.SCALAR`` as well is the deprecated spelling of the same
    scalar UDF.

    Args:
        df: DataFrame with a ``comment`` column (nullable string).

    Returns:
        ``df`` unchanged when it is empty; otherwise ``df`` with a float
        ``sentiment`` column. Rows whose comment is null get a missing
        sentiment instead of failing the whole batch.
    """
    if df.rdd.isEmpty():
        return df

    def polarity(text):
        # Null comments arrive as None; TextBlob only accepts strings.
        if not isinstance(text, str):
            return None
        return TextBlob(text).sentiment.polarity

    @pandas_udf(FloatType())
    def sentiment_score(series: pd.Series) -> pd.Series:
        # Force a float dtype so missing scores are NaN rather than objects.
        return series.apply(polarity).astype("float64")

    return df.withColumn("sentiment", sentiment_score(df["comment"]))

# Create (or reuse) the SparkSession for this application
spark = SparkSession.builder.appName("reddit").getOrCreate()

# Schema of the incoming JSON records: four nullable string fields
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("comment", "prev_comment", "post", "created_utc")
])

# Streaming DataFrame fed by the socket on localhost:9999
raw_data = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

# Parse each incoming line as JSON and flatten the struct into columns
parsed_data = (
    raw_data
    .select(from_json(raw_data.value, schema).alias("data"))
    .select("data.*")
)

# Replace the created_utc string with a TimestampType column
parsed_data = parsed_data.withColumn(
    "created_utc",
    from_unixtime(parsed_data["created_utc"]).cast(TimestampType()),
)

# Hand every micro-batch to process_batch and block until the query stops
query = parsed_data.writeStream.foreachBatch(process_batch).start()

query.awaitTermination()

23/06/21 21:37:26 WARN Utils: Your hostname, Nilss-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.108 instead (on interface en0)
23/06/21 21:37:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Exception in thread "main" java.lang.ExceptionInInitializerError
	at org.apache.spark.unsafe.array.ByteArrayMethods.<clinit>(ByteArrayMethods.java:54)
	at org.apache.spark.internal.config.package$.<init>(package.scala:1006)
	at org.apache.spark.internal.config.package$.<clinit>(package.scala)
	at org.apache.spark.deploy.SparkSubmitArguments.$anonfun$loadEnvironmentArguments$3(SparkSubmitArguments.scala:157)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:157)
	at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:115)
	at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.<init>(SparkSubmit.scala:990)
	at org.apache.spark

Exception: Java gateway process exited before sending its port number