In [None]:
# Similar to the lab setup, feature preparation lives in its own notebook.
#
# The storage account key must never be committed to the repository, so it is
# read from the environment at run time. Previously the value argument was
# simply deleted, which left spark.conf.set() with only the key argument and
# made this cell fail with a TypeError for the missing `value`.
# NOTE(review): if this runs on Databricks, a secret scope
# (dbutils.secrets.get) is the preferred source for the key - confirm.
import os

storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    # Fail early with a clear message instead of an opaque auth error on first read.
    raise RuntimeError(
        "AZURE_STORAGE_ACCOUNT_KEY is not set; export the storage account key "
        "before running this notebook."
    )

spark.conf.set(
    "fs.azure.account.key.project60300347.dfs.core.windows.net",
    storage_account_key,
)

In [None]:
# Read the cleaned tweets Delta table from the lakehouse "processed" folder.
delta_reader = spark.read.format("delta")
tweets_clean = delta_reader.load(
    "abfss://lakehouse@project60300347.dfs.core.windows.net/processed/tweets_cleaned/"
)

# Sanity look: column types, then the first 5 rows without truncating the text.
tweets_clean.printSchema()
tweets_clean.show(n=5, truncate=False)

root
 |-- id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- date: date (nullable = true)
 |-- query: string (nullable = true)
 |-- text: string (nullable = true)
 |-- sentiment: integer (nullable = true)

+----------+---------------+----------+-----+----------------------------------------------------------------------------------------------------------------------+---------+
|id        |user           |date      |query|text                                                                                                                  |sentiment|
+----------+---------------+----------+-----+----------------------------------------------------------------------------------------------------------------------+---------+
|1467825003|leslierosales  |2009-04-07|NULL | not forever see you soon                                                                                             |0        |
|1467853135|andrewofthediaz|2009-04-07|NULL |oh just got all my macheist ap

In [None]:
# I will create a few aggregations
from pyspark.sql.functions import count, sum as _sum, col, round

# Daily sentiment counts.
# Step 1: one row per date with the number of tweets and the sum of the
# sentiment label. The sum is labelled "positive_tweets", which is a count of
# positive tweets as long as the label only takes the values 0 and 1.
tweets_per_day = tweets_clean.groupBy("date").agg(
    count("*").alias("total_tweets"),
    _sum(col("sentiment")).alias("positive_tweets"),
)

# Step 2: derive the remaining per-day columns from the two aggregates.
negative_expr = col("total_tweets") - col("positive_tweets")
positive_ratio_expr = round(col("positive_tweets") / col("total_tweets"), 3)

daily_sentiment = (
    tweets_per_day
    .withColumn("negative_tweets", negative_expr)
    .withColumn("positive_ratio", positive_ratio_expr)
)
daily_sentiment.show(n=5, truncate=False)

+----------+------------+---------------+---------------+--------------+
|date      |total_tweets|positive_tweets|negative_tweets|positive_ratio|
+----------+------------+---------------+---------------+--------------+
|2009-06-25|25905       |0              |25905          |0.0           |
|2009-06-15|82883       |48767          |34116          |0.588         |
|2009-06-07|111201      |67577          |43624          |0.608         |
|2009-05-03|26436       |15446          |10990          |0.584         |
|2009-06-24|2086        |0              |2086           |0.0           |
+----------+------------+---------------+---------------+--------------+
only showing top 5 rows


In [None]:
# Tweet count per user, most active accounts first.
tweets_per_user = tweets_clean.groupBy("user").agg(
    count("*").alias("user_tweet_count")
)
top_users = tweets_per_user.orderBy("user_tweet_count", ascending=False)
top_users.show(n=5, truncate=False)

+---------------+----------------+
|user           |user_tweet_count|
+---------------+----------------+
|lost_dog       |549             |
|webwoke        |345             |
|tweetpet       |310             |
|SallytheShizzle|281             |
|VioletsCRUK    |279             |
+---------------+----------------+
only showing top 5 rows


In [None]:
# avg tweet length by sentiment 
from pyspark.sql.functions import length, avg, round, col

# Character length of every tweet's text, added as a helper column.
tweets_with_length = tweets_clean.withColumn("tweet_length", length(col("text")))

# Per sentiment label: mean tweet length (rounded to 2 decimals) and row count.
mean_length_expr = round(avg("tweet_length"), 2).alias("sentiment_avg_length")
row_count_expr = count("*").alias("sentiment_tweet_count")

tweet_length_stats = (
    tweets_with_length
    .groupBy("sentiment")
    .agg(mean_length_expr, row_count_expr)
    .orderBy("sentiment")
)
tweet_length_stats.show(n=5, truncate=False)

+---------+--------------------+---------------------+
|sentiment|sentiment_avg_length|sentiment_tweet_count|
+---------+--------------------+---------------------+
|0        |64.21               |796985               |
|1        |60.64               |794484               |
+---------+--------------------+---------------------+



In [None]:
# With all aggregations built, attach each one to the tweet-level rows of
# tweets_clean; the result is what gets saved as features_v1.
# Every join is a left join, so each tweet row is kept even if a lookup has no
# matching key. The joins run in the order date -> user -> sentiment.
aggregate_lookups = (
    (daily_sentiment, "date"),
    (top_users, "user"),
    (tweet_length_stats, "sentiment"),
)

features_v1 = tweets_clean
for lookup_df, join_key in aggregate_lookups:
    features_v1 = features_v1.join(lookup_df, on=join_key, how="left")


In [None]:
# Inspect the joined feature table: column types first, then a 5-row sample
# printed without truncating long text values.
features_v1.printSchema()
features_v1.show(n=5, truncate=False)

root
 |-- sentiment: integer (nullable = true)
 |-- user: string (nullable = true)
 |-- date: date (nullable = true)
 |-- id: string (nullable = true)
 |-- query: string (nullable = true)
 |-- text: string (nullable = true)
 |-- total_tweets: long (nullable = true)
 |-- positive_tweets: long (nullable = true)
 |-- negative_tweets: long (nullable = true)
 |-- positive_ratio: double (nullable = true)
 |-- user_tweet_count: long (nullable = true)
 |-- sentiment_avg_length: double (nullable = true)
 |-- sentiment_tweet_count: long (nullable = true)

+---------+---------------+----------+----------+-----+----------------------------------------------------------------------------------------------------------------------+------------+---------------+---------------+--------------+----------------+--------------------+---------------------+
|sentiment|user           |date      |id        |query|text                                                                                              

In [None]:
# before saving the data i will perform quick checks to make sure the data is valid 
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum as _sum, current_date

# 1. Basic structure & row count
total_rows = features_v1.count()
print("Row count:", total_rows)
features_v1.printSchema()

# 2. Null & empty value checks
# Each column is turned into a 0/1 "is null" flag and summed, giving the
# number of NULLs per column. Only NULLs are counted here; empty strings are
# not detected by this check.
print("\nChecking for null values in each column:")
null_flag_sums = []
for column_name in features_v1.columns:
    is_null_flag = F.col(column_name).isNull().cast("int")
    null_flag_sums.append(F.sum(is_null_flag).alias(column_name))
null_counts = features_v1.select(null_flag_sums)
null_counts.show(truncate=False)

# 3. Range & logical checks
# Min/max of the joined aggregate columns, printed as a single row.
print("\nChecking value ranges and logical limits:")
range_exprs = [
    F.min("total_tweets").alias("min_total_tweets"),
    F.max("total_tweets").alias("max_total_tweets"),
    F.min("positive_ratio").alias("min_positive_ratio"),
    F.max("positive_ratio").alias("max_positive_ratio"),
    F.min("user_tweet_count").alias("min_user_tweet_count"),
    F.max("user_tweet_count").alias("max_user_tweet_count"),
    F.min("sentiment_avg_length").alias("min_avg_length"),
    F.max("sentiment_avg_length").alias("max_avg_length"),
]
features_v1.select(*range_exprs).show()

# 4. Descriptive statistics overview
print("\nDescriptive statistics overview:")
described_columns = [
    "total_tweets",
    "positive_tweets",
    "negative_tweets",
    "positive_ratio",
    "user_tweet_count",
    "sentiment_avg_length",
]
features_v1.describe(described_columns).show()

# 5. Date sanity check
# Rows whose date is later than the current date at execution time.
print("\nChecking for any future-dated rows:")
is_future_dated = F.col("date") > F.current_date()
future_dates = features_v1.filter(is_future_dated)
print("Future-dated rows:", future_dates.count())

# 6. Duplicate check based on unique tweet ID
# `dupes` holds one row per id that occurs more than once, so its row count is
# the number of duplicated ids (not the number of surplus rows).
print("\nChecking for duplicate tweet IDs:")
rows_per_id = features_v1.groupBy("id").count()
dupes = rows_per_id.filter("count > 1")
print("Duplicate tweet_id count:", dupes.count())

# 7. Sentiment distribution check
print("\nSentiment distribution:")
rows_per_sentiment = features_v1.groupBy("sentiment").count()
rows_per_sentiment.orderBy("sentiment").show()

Row count: 1591469
root
 |-- sentiment: integer (nullable = true)
 |-- user: string (nullable = true)
 |-- date: date (nullable = true)
 |-- id: string (nullable = true)
 |-- query: string (nullable = true)
 |-- text: string (nullable = true)
 |-- total_tweets: long (nullable = true)
 |-- positive_tweets: long (nullable = true)
 |-- negative_tweets: long (nullable = true)
 |-- positive_ratio: double (nullable = true)
 |-- user_tweet_count: long (nullable = true)
 |-- sentiment_avg_length: double (nullable = true)
 |-- sentiment_tweet_count: long (nullable = true)


Checking for null values in each column:
+---------+----+----+---+-------+----+------------+---------------+---------------+--------------+----------------+--------------------+---------------------+
|sentiment|user|date|id |query  |text|total_tweets|positive_tweets|negative_tweets|positive_ratio|user_tweet_count|sentiment_avg_length|sentiment_tweet_count|
+---------+----+----+---+-------+----+------------+---------------+--

In [None]:
# The checks looked fine, so the feature table is ready for the gold layer.
# It is written as a Delta table under curated/features_v1/; "overwrite" mode
# replaces whatever is already stored at that path.
delta_writer = features_v1.write.format("delta").mode("overwrite")
delta_writer.save(
    "abfss://lakehouse@project60300347.dfs.core.windows.net/curated/features_v1/"
)