In [74]:
# Set up the configuration for accessing the storage account.
#
# The account key is a secret and must not be stored in the notebook. It is
# read from the AZURE_STORAGE_KEY environment variable instead; swap this for
# your platform's secret store if one is available.
# NOTE(review): the key that was previously hard-coded in this cell has been
# exposed in the notebook source and should be rotated.
import os

storage_account_name = "ds562team9datalake"
storage_account_key = os.environ.get("AZURE_STORAGE_KEY")
if not storage_account_key:
    # Fail here with a clear message rather than later with an opaque
    # authentication error on the first read.
    raise RuntimeError(
        "AZURE_STORAGE_KEY is not set; export the storage account key for "
        f"'{storage_account_name}' before running this notebook."
    )
container = "silver"

# Set Spark config to access the storage account
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key,
)

In [75]:
# Locations of the silver-layer inputs read by this notebook.
tweets_path = "abfss://silver@ds562team9datalake.dfs.core.windows.net/historical_crypto_tweet_data/english_crypto_tweets_2014_2019.parquet"
news_path = "abfss://silver@ds562team9datalake.dfs.core.windows.net/historical_crypto_news_data/crypto_news_filtered_2013_2018.parquet"
prices_path = "abfss://silver@ds562team9datalake.dfs.core.windows.net/historical_crypto_prices/crypto_prices_2014_2021.parquet"
sentiment_path = "abfss://silver@ds562team9datalake.dfs.core.windows.net/historical_sentiment/Spearman_Pearson_correlation.csv"

# Tweets, news and prices are read as Parquet.
tweets_df = spark.read.parquet(tweets_path)
news_df = spark.read.parquet(news_path)
prices_df = spark.read.parquet(prices_path)

# The sentiment table is a CSV: the first row supplies the column names and
# Spark is asked to infer the column types.
sentiment_df = spark.read.csv(sentiment_path, header=True, inferSchema=True)

In [76]:
from pyspark.sql.functions import to_date, col, lit, concat_ws
from pyspark.sql.types import DateType

def _with_date_type(df):
    """Return ``df`` with its 'date' column cast to DateType and passed through ``to_date``."""
    return df.withColumn("date", to_date(col("date").cast(DateType())))


# Normalise the 'date' column of every frame that already has one.
tweets_df = _with_date_type(tweets_df)
prices_df = _with_date_type(prices_df)
sentiment_df = _with_date_type(sentiment_df)

# news_df only has a 'year', so build a placeholder date from the string
# "<year>-01-01" (1 January of that year).
year_start = concat_ws("-", col("year").cast("string"), lit("01"), lit("01"))
news_df = news_df.withColumn("date", to_date(year_start))

In [77]:
# Print a heading followed by the schema of each input frame, in load order.
for heading, frame in (
    ("✅ tweets_df schema:", tweets_df),
    ("✅ news_df schema:", news_df),
    ("✅ prices_df schema:", prices_df),
    ("✅ sentiment_df schema:", sentiment_df),
):
    print(heading)
    frame.printSchema()

In [78]:
from pyspark.sql.types import IntegerType

# Replace 'year' with an integer-typed version of the same column.
year_as_int = col("year").cast(IntegerType())
news_df = news_df.withColumn("year", year_as_int)

In [79]:
from pyspark.sql.types import DoubleType

# Columns of the sentiment table that are cast to double. 'date' is selected
# unchanged, and any column not listed here is dropped by the select.
sentiment_double_columns = [
    "Close",
    "tweet_sentiment",
    "news_sentiment",
    "Close_lag1",
    "return_1d",
    "tweet_sent_roll3",
    "news_sent_roll3",
    "volatility_3d",
    "tweet_sent_roll7",
    "news_sent_roll7",
    "volatility_7d",
    "sma_5",
    "sma_10",
    "rsi_14",
]

sentiment_df = sentiment_df.select(
    "date",
    *[col(name).cast(DoubleType()) for name in sentiment_double_columns]
)

# table aggregation

In [80]:
from pyspark.sql.functions import count

# Number of tweet rows per 'date'.
tweets_by_date = tweets_df.groupBy("date")
agg_crypto_tweets = tweets_by_date.agg(count("*").alias("tweet_count"))

In [81]:
# Number of news rows per ('date', 'source') pair.
news_by_date_and_source = news_df.groupBy("date", "source")
agg_news_by_source = news_by_date_and_source.agg(count("*").alias("news_count"))

In [82]:
from pyspark.sql.functions import avg

# Source price column -> name of its per-date average in the output.
price_avg_aliases = {
    "Open": "avg_open",
    "Close": "avg_close",
    "High": "avg_high",
    "Low": "avg_low",
    "Volume": "avg_volume",
}

# Average of each price column per 'date'.
agg_crypto_prices = prices_df.groupBy("date").agg(
    *[avg(source).alias(alias) for source, alias in price_avg_aliases.items()]
)

In [83]:
from pyspark.sql.functions import avg

# Per-date averages of the tweet and news sentiment columns.
avg_tweet_sentiment = avg("tweet_sentiment").alias("avg_tweet_sentiment")
avg_news_sentiment = avg("news_sentiment").alias("avg_news_sentiment")

agg_sentiment_over_time = sentiment_df.groupBy("date").agg(
    avg_tweet_sentiment, avg_news_sentiment
)

In [84]:
# Source column -> name of its per-date average in the output
# (the *_roll3 / *_3d columns first, then the *_roll7 / *_7d columns).
rolling_avg_aliases = {
    "tweet_sent_roll3": "avg_tweet_sent_roll3",
    "news_sent_roll3": "avg_news_sent_roll3",
    "volatility_3d": "avg_volatility_3d",
    "tweet_sent_roll7": "avg_tweet_sent_roll7",
    "news_sent_roll7": "avg_news_sent_roll7",
    "volatility_7d": "avg_volatility_7d",
}

agg_rolling_sentiment_volatility = sentiment_df.groupBy("date").agg(
    *[avg(source).alias(alias) for source, alias in rolling_avg_aliases.items()]
)

In [85]:
# Plain projection (no aggregation): keep only the date, rsi_14 and Close columns.
rsi_vs_price_columns = ["date", "rsi_14", "Close"]
agg_rsi_vs_price = sentiment_df.select(*rsi_vs_price_columns)

In [86]:
# Source column -> name of its per-date average in the output.
# NOTE(review): "volatility_3d" is the only alias here without the "avg_"
# prefix; it is kept as-is because it is the written column name - confirm
# whether consumers expect "avg_volatility_3d" instead.
summary_avg_aliases = {
    "Close": "avg_close",
    "return_1d": "avg_return_1d",
    "tweet_sentiment": "avg_tweet_sentiment",
    "news_sentiment": "avg_news_sentiment",
    "volatility_3d": "volatility_3d",
    "rsi_14": "avg_rsi_14",
}

agg_summary_metrics = sentiment_df.groupBy("date").agg(
    *[avg(source).alias(alias) for source, alias in summary_avg_aliases.items()]
)

# save & load

In [87]:
# Set Gold layer base path
gold_path = "abfss://gold@ds562team9datalake.dfs.core.windows.net"

# Output folder under gold_path -> DataFrame written there.
# Aggregated tables come first, then the processed input frames; the dict
# preserves this order when iterated.
gold_tables = {
    "agg_crypto_tweets": agg_crypto_tweets,
    "agg_crypto_news_by_source": agg_news_by_source,
    "agg_crypto_prices": agg_crypto_prices,
    "agg_sentiment_over_time": agg_sentiment_over_time,
    "agg_rolling_sentiment_volatility": agg_rolling_sentiment_volatility,
    "agg_rsi_vs_price": agg_rsi_vs_price,
    "agg_summary_metrics": agg_summary_metrics,
    "processed_crypto_tweets": tweets_df,
    "processed_crypto_news": news_df,
    "processed_crypto_prices": prices_df,
    "processed_sentiment": sentiment_df,
}

for table_name, table_df in gold_tables.items():
    # coalesce(1) reduces each frame to a single partition before writing;
    # mode("overwrite") replaces whatever already exists at the target path.
    table_df.coalesce(1).write.mode("overwrite").parquet(f"{gold_path}/{table_name}")