In [0]:
%pip install textblob

Collecting textblob
  Obtaining dependency information for textblob from https://files.pythonhosted.org/packages/02/07/5fd2945356dd839974d3a25de8a142dc37293c21315729a41e775b5f3569/textblob-0.18.0.post0-py3-none-any.whl.metadata
  Downloading textblob-0.18.0.post0-py3-none-any.whl.metadata (4.5 kB)
Downloading textblob-0.18.0.post0-py3-none-any.whl (626 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/626.3 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.2/626.3 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/626.3 kB[0m [31m791.1 kB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.4/626.3 kB[0m [31m554.7 kB/s[0m eta [36m0:00:02[0m
[2K   [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/626.3 kB[0m [31m720.3 kB/s[0m eta [36m0:

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from textblob import TextBlob
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml import Pipeline

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Reddit Sentiment & Emotion Analysis") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse") \
    .getOrCreate()


### Define UDFs for Sentiment and Emotion Analysis
Next, define functions for sentiment polarity (using TextBlob) and emotion detection based on sentiment polarity.

In [0]:
# Define UDFs for sentiment and emotion analysis
def get_sentiment(text):
    if text:
        blob = TextBlob(text)
        return blob.sentiment.polarity
    return 0.0  # Return 0.0 for empty or null texts

def get_emotion(text):
    sentiment = get_sentiment(text)
    if sentiment > 0.1:
        return "Positive"
    elif sentiment < -0.1:
        return "Negative"
    else:
        return "Neutral"

# Register UDFs for sentiment and emotion analysis
spark.udf.register("get_sentiment", get_sentiment)
spark.udf.register("get_emotion", get_emotion)

<function __main__.get_emotion(text)>

### Load Data from Silver Layer

In [0]:
# Load data from the Silver layer (assuming it is stored in Delta format)
silver_df = spark.read.format("delta").table("big_data_analytics_v.big_data_analytics_sesssion_v.silver_reddit_posts")

### Apply Sentiment and Emotion Analy sis  on 'title' and 'description'
This part involves applying the sentiment and emotion analysis functions (get_sentiment and get_emotion) to the title and description columns of the Reddit posts.

In [0]:
# Perform sentiment and emotion analysis on 'title' and 'description'
transformed_df = silver_df.withColumn("title_polarity", F.expr("get_sentiment(title)")) \
                          .withColumn("title_emotion", F.expr("get_emotion(title)")) \
                          .withColumn("description_polarity", F.expr("get_sentiment(description)")) \
                          .withColumn("description_emotion", F.expr("get_emotion(description)"))

### TF-IDF Feature Extraction
This part involves applying the sentiment and emotion analysis functions (get_sentiment and get_emotion) to the title and description columns of the Reddit posts.

In [0]:
# Replace null values in 'title' and 'description' columns with empty strings
transformed_df = transformed_df.fillna({'title': '', 'description': ''})

# Perform TF-IDF Feature Extraction for 'title' and 'description'
tokenizer_title = Tokenizer(inputCol="title", outputCol="title_words")
tokenizer_description = Tokenizer(inputCol="description", outputCol="description_words")

hashing_tf_title = HashingTF(inputCol="title_words", outputCol="title_tfidf")
hashing_tf_description = HashingTF(inputCol="description_words", outputCol="description_tfidf")

idf_title = IDF(inputCol="title_tfidf", outputCol="title_tfidf_features")
idf_description = IDF(inputCol="description_tfidf", outputCol="description_tfidf_features")

# Create a pipeline for TF-IDF feature extraction
pipeline = Pipeline(stages=[tokenizer_title, tokenizer_description, hashing_tf_title, 
                            hashing_tf_description, idf_title, idf_description])

# Fit and transform the data to extract features
pipeline_model = pipeline.fit(transformed_df)
final_df = pipeline_model.transform(transformed_df)

# Display the final DataFrame
display(final_df)

In [0]:
final_df.printSchema()


root
 |-- post_id: string (nullable = true)
 |-- title: string (nullable = false)
 |-- description: string (nullable = false)
 |-- subreddit: string (nullable = true)
 |-- author: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- url: string (nullable = true)
 |-- title_polarity: string (nullable = true)
 |-- title_emotion: string (nullable = true)
 |-- description_polarity: string (nullable = true)
 |-- description_emotion: string (nullable = true)
 |-- title_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_tfidf: vector (nullable = true)
 |-- description_tfidf: vector (nullable = true)
 |-- title_tfidf_features: vector (nullable = true)
 |-- description_tfidf_features: vector (nullable = true)



### Save the Final Data

In [0]:
# Create a temporary view with only necessary columns for the gold layer
final_df_gold = final_df.select(
    "post_id",
    "title",
    "description",
    "subreddit",
    "author",
    "score",
    "created_at",
    "url",
    "title_polarity",
    "title_emotion",
    "description_polarity",
    "description_emotion"
)

# Replace the temporary view with the cleaned DataFrame
final_df_gold.createOrReplaceTempView("silver_reddit_posts_temp_gold")

# Insert data into the gold layer table from the cleaned DataFrame
spark.sql("""
    INSERT INTO big_data_analytics_v.big_data_analytics_sesssion_v.gold_reddit_posts
    SELECT 
        post_id,
        title,
        description,
        subreddit,
        author,
        score,
        created_at,
        url,
        title_polarity,
        title_emotion,
        description_polarity,
        description_emotion
    FROM silver_reddit_posts_temp_gold
""")

# Log the completion of the insert process
print("Data inserted into Gold table successfully!")


Data inserted into Gold table successfully!
