Using PySpark Structured Streaming, process new tweets as soon as they arrive, and identify and count the hashtags in each tweet. Also, use the sentiment feature of each tweet to analyze emotions, and calculate and display the average sentiment of each hashtag in real time. Explain how to process the tweets, calculate the sentiment averages, and report the up-to-date results. (Negative sentiment is 0, positive is 1, and neutral is 0.5.)

## Spark initialization

In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession as ss

# Session settings, applied to the builder in the order listed below.
_session_settings = (
    ("spark.executor.memory", "2g"),
    ("spark.driver.memory", "2g"),
    ("spark.hadoop.native.lib", "false"),
    ("spark.sql.files.ignoreCorruptFiles", "true"),
    ("spark.sql.files.ignoreMissingFiles", "true"),
    ("spark.hadoop.io.nativeio.NativeIO", "false"),
)

# Create (or reuse) the session for this notebook, running on a local master.
_builder = ss.builder.appName("MDA2024-Project").master("local[*]")
for _key, _value in _session_settings:
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

# Keep a handle on the underlying SparkContext.
sc = spark.sparkContext

# Bare expression so the notebook displays the context summary.
sc

## Batching

In [None]:
import os
import math
import pandas as pd
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType
from pyspark.sql.functions import explode, col

# Minimal schema: only the fields this notebook uses are declared
# (timestamp, nlp.sentiment and entities.hashtags[].text).
_hashtag_type = StructType([StructField("text", StringType(), True)])
_entities_type = StructType([
    StructField("hashtags", ArrayType(_hashtag_type), True),
])
_nlp_type = StructType([StructField("sentiment", StringType(), True)])
schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("nlp", _nlp_type, True),
    StructField("entities", _entities_type, True),
])

# Load the JSONL file as a static (non-streaming) DataFrame.
df = spark.read.schema(schema).json("/content/twitter_sample.jsonl")

# One output row per hashtag: the hashtags array is exploded while the
# tweet's timestamp and sentiment are carried along.
flat_df = df.select(
    "timestamp",
    col("nlp.sentiment").alias("sentiment"),
    explode(col("entities.hashtags.text")).alias("hashtag"),
)

# Bring every flattened row to the driver (fine for a small dataset).
rows = flat_df.collect()

# Rows per CSV file, and the directory the files are written to.
batch_size = 10
output_dir = "streaming_csv"
os.makedirs(output_dir, exist_ok=True)

# Slice the rows into consecutive chunks of `batch_size` and write each
# chunk to its own CSV file (pandas writes a header line by default).
num_batches = math.ceil(len(rows) / batch_size)
for i, start in enumerate(range(0, len(rows), batch_size)):
    batch_rows = rows[start:start + batch_size]
    # Each Row becomes a plain dict so pandas can build the frame.
    batch_pd_df = pd.DataFrame([row.asDict() for row in batch_rows])
    csv_file = os.path.join(output_dir, f"batch_{i}.csv")
    batch_pd_df.to_csv(csv_file, index=False)
    print(f"Wrote {csv_file}")


Wrote streaming_csv/batch_0.csv
Wrote streaming_csv/batch_1.csv
Wrote streaming_csv/batch_2.csv
Wrote streaming_csv/batch_3.csv
Wrote streaming_csv/batch_4.csv
Wrote streaming_csv/batch_5.csv
Wrote streaming_csv/batch_6.csv
Wrote streaming_csv/batch_7.csv
Wrote streaming_csv/batch_8.csv
Wrote streaming_csv/batch_9.csv
Wrote streaming_csv/batch_10.csv
Wrote streaming_csv/batch_11.csv
Wrote streaming_csv/batch_12.csv
Wrote streaming_csv/batch_13.csv
Wrote streaming_csv/batch_14.csv
Wrote streaming_csv/batch_15.csv
Wrote streaming_csv/batch_16.csv
Wrote streaming_csv/batch_17.csv
Wrote streaming_csv/batch_18.csv
Wrote streaming_csv/batch_19.csv
Wrote streaming_csv/batch_20.csv
Wrote streaming_csv/batch_21.csv
Wrote streaming_csv/batch_22.csv
Wrote streaming_csv/batch_23.csv
Wrote streaming_csv/batch_24.csv
Wrote streaming_csv/batch_25.csv
Wrote streaming_csv/batch_26.csv
Wrote streaming_csv/batch_27.csv
Wrote streaming_csv/batch_28.csv
Wrote streaming_csv/batch_29.csv
Wrote streaming_csv/

## Stream Data analysis

In [None]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql.functions import to_timestamp, window, when, col, count, avg

# Schema of the CSV batch files: one row per (tweet, hashtag) pair with the
# columns timestamp, sentiment, hashtag.
csv_schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("sentiment", StringType(), True),
    StructField("hashtag", StringType(), True)
])

# Read the CSV files as a streaming source.
# - "header": the batch files are written by pandas `to_csv`, which emits a
#   header line; without this option that line is parsed as a data row and a
#   bogus hashtag named "hashtag" shows up in every aggregate.
# - "maxFilesPerTrigger": process one file per trigger to simulate a new
#   batch of tweets arriving.
# NOTE(review): the batching cell writes to the relative directory
# "streaming_csv", so this absolute path only matches when the notebook's
# working directory is /content - confirm for non-Colab environments.
csv_stream = spark.readStream \
    .schema(csv_schema) \
    .option("header", True) \
    .option("maxFilesPerTrigger", 1) \
    .csv("/content/streaming_csv")

# Derive the event time from the numeric timestamp and map the sentiment
# label to the score required by the task statement:
# negative -> 0, positive -> 1, anything else (neutral) -> 0.5.
# NOTE(review): assumes the labels in the data are exactly "Neg" and "Pos";
# any other value, including null, is scored as neutral - verify against the
# dataset.
tweets = csv_stream.withColumn("event_time", to_timestamp(col("timestamp"))) \
                   .withColumn("sentiment_value",
                               when(col("sentiment") == "Neg", 0.0)
                               .when(col("sentiment") == "Pos", 1.0)
                               .otherwise(0.5))

# **Query 1: Hashtag counts over a 2-second window.**
# NOTE(review): with a 2-second watermark, rows whose event time lags far
# behind the latest event time already seen may be discarded as late data.
# Confirm the batch files arrive in event-time order, or widen the watermark.
hashtag_counts = tweets.withWatermark("event_time", "2 seconds") \
    .groupBy(
        window(col("event_time"), "2 seconds"),
        col("hashtag")
    ).agg(count("*").alias("hashtag_count"))

# "update" mode emits only the windows whose count changed in a trigger.
# The results land in the in-memory table `hashtag_counts_table`.
query_counts = hashtag_counts.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("hashtag_counts_table") \
    .trigger(processingTime="1 second") \
    .start()


# **Query 2: Average sentiment per hashtag.**
# A running (un-windowed) average, so "complete" mode rewrites the whole
# result table `sentiment_table` on every trigger.
hashtag_sentiment_avg = tweets.groupBy("hashtag") \
    .agg(avg("sentiment_value").alias("avg_sentiment"))

query_sentiment = hashtag_sentiment_avg.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("sentiment_table") \
    .trigger(processingTime="1 second") \
    .start()

try:
    # Instead of blocking on .awaitTermination(), poll for a fixed amount of
    # time: let the streams run for up to 30 seconds, leaving early if either
    # query has stopped.
    for _ in range(30):
        if not (query_counts.isActive and query_sentiment.isActive):
            break
        time.sleep(1)

    # Report the up-to-date results. Truncation is controlled by show(),
    # so disable it here to print full window and hashtag values.
    spark.sql("SELECT * FROM hashtag_counts_table").show(truncate=False)
    spark.sql("SELECT * FROM sentiment_table").show(truncate=False)
finally:
    # Stop the streaming queries so they do not keep running in the
    # background and so the cell can be re-run with the same query names.
    query_counts.stop()
    query_sentiment.stop()


+--------------------+--------------------+-------------+
|              window|             hashtag|hashtag_count|
+--------------------+--------------------+-------------+
|{2023-12-01 06:14...|         پرستو_معینی|            1|
|{2023-11-22 11:18...|        دلیران_میدان|            1|
|{2023-12-01 06:14...|          زهرا_صفایی|            1|
|{2023-11-10 07:10...|         درمان_سرطان|            1|
|{2023-11-10 07:10...|       سرطان_پروستات|            1|
|{2023-11-10 07:10...|          رادیوتراپی|            1|
|{2023-11-10 07:10...|               سرطان|            1|
|{2023-11-10 07:10...|    متخصص_انکولوژیست|            1|
|{2023-11-22 11:18...|           بسیج_مردم|            1|
|{2023-11-10 07:10...|          انکولوژیست|            1|
|{2023-12-01 06:14...|     محمدمسعود_معینی|            1|
|{2023-11-13 10:14...|           میدان_دار|            1|
|{2023-12-01 06:14...|            حقوق_بشر|            1|
|{2023-12-01 06:14...|     اعتراضات_سراسرى|            1|
|{2023-12-01 0