## Load Silver data

In [0]:
import os

# The storage account key is a credential and must not be stored in the
# notebook source. It is read from the TOXICITYLAKE7032_ACCOUNT_KEY environment
# variable of the driver process instead.
# NOTE(review): if this runs on a platform with a secrets manager (e.g. a
# Databricks secret scope), prefer that over an environment variable.
_storage_account_key = os.environ.get("TOXICITYLAKE7032_ACCOUNT_KEY")
if not _storage_account_key:
    # Fail here with a clear message rather than later with an opaque
    # authentication error on the first read from the lake.
    raise RuntimeError(
        "TOXICITYLAKE7032_ACCOUNT_KEY is not set; "
        "cannot authenticate against the toxicitylake7032 storage account."
    )

# Register the account key so abfss:// paths on this storage account resolve.
spark.conf.set(
    "fs.azure.account.key.toxicitylake7032.dfs.core.windows.net",
    _storage_account_key,
)


In [0]:
from pyspark.sql import functions as F

# Location of the Silver-layer Reddit data in the lake.
silver_path = "abfss://lakehouse@toxicitylake7032.dfs.core.windows.net/silver/reddit/"

# Read the Silver Parquet dataset.
silver_reader = spark.read
df_silver = silver_reader.parquet(silver_path)

# Quick inspection: column types first, then a few untruncated sample rows.
df_silver.printSchema()
df_silver.show(n=5, truncate=False)


root
 |-- post_id: string (nullable = true)
 |-- publish_date: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- parent_user_id: string (nullable = true)
 |-- content: string (nullable = true)
 |-- url: string (nullable = true)
 |-- language: string (nullable = true)
 |-- interaction_type: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- community: string (nullable = true)
 |-- strict_filter: integer (nullable = true)
 |-- sentiment_vader: double (nullable = true)
 |-- sentiment_textblob: double (nullable = true)
 |-- subjectivity_textblob: double (nullable = true)
 |-- toxicity_toxigen: double (nullable = true)

+------------------------------------+------------+------------------------------------+---------+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Filter + clean basic columns

In [0]:

# Keep only rows with usable text and derive timestamp/date columns.
df = (
    df_silver
    # Drop rows whose text is missing or has 3 characters or fewer.
    .filter(F.col("content").isNotNull() & (F.length("content") > 3))
    # Convert the epoch value straight to a timestamp. Going through
    # from_unixtime() first would format it as a local-time string and parse
    # it back, which is ambiguous during a DST fall-back hour.
    # Assumes publish_date holds epoch *seconds*, as the previous
    # from_unixtime() call did — TODO confirm. Requires Spark 3.1+.
    .withColumn("publish_ts", F.timestamp_seconds(F.col("publish_date")))
    # Calendar date of the post, used later for the daily aggregation.
    .withColumn("publish_date_only", F.to_date("publish_ts"))
)


## Feature Engineering

In [0]:
# Length features.
#
# Leading/trailing whitespace is stripped before splitting: splitting the raw
# text on \s+ yields empty tokens for the padding, which would be counted as
# extra words.
_content_trimmed = F.regexp_replace(F.col("content"), r"^\s+|\s+$", "")

df = (
    df
    # Character count of the raw text (whitespace included).
    .withColumn("content_length_chars", F.length("content"))
    # Word count: whitespace-separated tokens of the trimmed text. Text that is
    # empty after trimming has 0 words (splitting "" would otherwise give 1).
    .withColumn(
        "content_length_words",
        F.when(_content_trimmed == "", F.lit(0))
         .otherwise(F.size(F.split(_content_trimmed, r"\s+"))),
    )
)

In [0]:
# Binary toxicity label: 1 when the ToxiGen score is at least 0.5, otherwise 0.
# A missing score yields a missing label, because the comparison is null.
toxigen_score = F.col("toxicity_toxigen")
is_toxic = toxigen_score >= 0.5

df = df.withColumn("toxicity_label", is_toxic.cast("int"))


In [0]:
# Sentiment bucket derived from the VADER score:
#   score <= -0.2 -> "negative", score >= 0.2 -> "positive", else "neutral".
# Rows with no score match neither condition and therefore get "neutral".
vader_score = F.col("sentiment_vader")

sentiment_bucket_expr = (
    F.when(vader_score <= -0.2, "negative")
    .when(vader_score >= 0.2, "positive")
    .otherwise("neutral")
)

df = df.withColumn("sentiment_bucket", sentiment_bucket_expr)


In [0]:
# Risk scoring, evaluated in priority order:
#   toxic label set         -> "high"
#   strict filter triggered -> "medium"
#   anything else           -> "low"
flagged_toxic = F.col("toxicity_label") == 1
flagged_strict = F.col("strict_filter") == 1

risk_level_expr = (
    F.when(flagged_toxic, "high")
    .when(flagged_strict, "medium")
    .otherwise("low")
)

df = df.withColumn("risk_level", risk_level_expr)


## Save Gold layer to Data Lake

In [0]:
# Columns published to the Gold layer, in output order.
gold_columns = [
    # identifiers
    "post_id",
    "user_id",
    "parent_id",
    "parent_user_id",
    # where the interaction happened and what kind it was
    "platform",
    "community",
    "interaction_type",
    # when
    "publish_ts",
    "publish_date_only",
    # raw payload
    "content",
    "url",
    # scores carried over from Silver
    "sentiment_vader",
    "sentiment_textblob",
    "subjectivity_textblob",
    "toxicity_toxigen",
    "strict_filter",
    # engineered features
    "content_length_chars",
    "content_length_words",
    "toxicity_label",
    "sentiment_bucket",
    "risk_level",
]

df_gold = df.select(*gold_columns)

# Preview a few rows without truncating long text values.
df_gold.show(n=5, truncate=False)


+------------------------------------+------------------------------------+---------+--------------+--------+-------------+----------------+-------------------+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Sanity check: list the distinct communities present in the Silver data.
silver_communities = df_silver.select("community").distinct()
silver_communities.show()

+--------------+
|     community|
+--------------+
| CringeAnarchy|
|    MensRights|
|greatawakening|
| fatpeoplehate|
+--------------+



In [0]:
# Tell Spark's file-based sources to tolerate corrupt files.
# NOTE(review): per the Spark documentation, with this option enabled jobs keep
# running when they hit a corrupted file and return only what could be read, so
# rows from such files are dropped silently. It is set after the Silver
# previews above and before the Gold writes below — confirm this is intentional
# and that the resulting data loss is acceptable.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")


In [0]:
# Destination of the row-level Gold feature table.
gold_path = "abfss://lakehouse@toxicitylake7032.dfs.core.windows.net/gold/reddit_features"

# "overwrite" replaces whatever a previous run left at this path.
gold_writer = df_gold.write.mode("overwrite")
gold_writer.parquet(gold_path)

## Daily aggregated table for Power BI graphs

In [0]:
# One row per (day, community) with volume and toxicity metrics.
daily_keys = ["publish_date_only", "community"]

daily_metrics = [
    # number of rows in the group
    F.count("*").alias("num_interactions"),
    # mean raw ToxiGen score
    F.avg("toxicity_toxigen").alias("avg_toxicity"),
    # mean of the 0/1 label, i.e. the share of rows labelled toxic
    F.avg("toxicity_label").alias("toxicity_rate"),
]

df_daily = df_gold.groupBy(*daily_keys).agg(*daily_metrics)

# Destination of the daily aggregate table.
daily_path = "abfss://lakehouse@toxicitylake7032.dfs.core.windows.net/gold/reddit_daily_stats"

# "overwrite" replaces whatever a previous run left at this path.
daily_writer = df_daily.write.mode("overwrite")
daily_writer.parquet(daily_path)
