In [None]:
# Import necessary PySpark libraries for data processing
from collections import Counter
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructField, StructType

In [None]:
# Initialize Spark session with MinIO S3 configuration
# MinIO acts as local S3-compatible object storage.
# Credentials are taken from the MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment
# variables when set, so real keys never have to be written into the notebook;
# "minioadmin" is kept only as the local-development fallback.
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

spark = (
    SparkSession.builder
    .appName("Merge Sentiment Data")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.shuffle.partitions", "20")
    # -1 disables automatic broadcast joins; passed as a string like every other value
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()
)

In [None]:
# Quiet Spark's logging: from here on only ERROR-level messages are emitted
spark_context = spark.sparkContext
spark_context.setLogLevel("ERROR")

In [None]:
# Read the cleaned news articles (silver layer) from MinIO as Parquet
df = spark.read.load('s3a://fnf-bucket/silver/news_data_clean', format='parquet')

In [None]:
# Sanity check: total number of news rows loaded
print("News data count:", df.count())

News data count: 729144


In [None]:
# Read the per-article sentiment predictions (silver layer) as Parquet
df_sentiment = spark.read.load('s3a://fnf-bucket/silver/sentiment_data', format='parquet')

In [None]:
# Sanity check: number of sentiment rows, expected to line up with the news rows
print("Sentiment data count:", df_sentiment.count())

Sentiment data count: 729144


In [None]:
# Attach sentiment scores to each article, matching on the article number "No".
# Inner join: articles without a sentiment row (and vice versa) are dropped.
merged_df = df.join(df_sentiment, "No", "inner")

In [None]:
# Spot-check a single joined row to confirm news and sentiment columns lined up
merged_df.show(n=1)

+---+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------------------+------------------+-------------------+
| No|      Date|       Article_title|                 Url|             Article|         Lsa_summary|Stock_symbol|            negative|            neutral|          positive|predicted_sentiment|
+---+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------------------+------------------+-------------------+
| 95|2023-05-24|B of A Securities...|https://www.nasda...|Fintel reports th...|A payout ratio gr...|           A|0.034918103367090225|0.02807900495827198|0.9370028376579285|           positive|
+---+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------------------+------------------+-------------------+
only showing top 1 row



In [None]:
# Number of articles per predicted sentiment label across the whole merged dataset
merged_df.groupBy("predicted_sentiment").count().show()

+-------------------+------+
|predicted_sentiment| count|
+-------------------+------+
|           positive|286696|
|            neutral|191732|
|           negative|250716|
+-------------------+------+



In [None]:
# Keep only articles dated 2020-01-01 through 2023-12-31 (both bounds inclusive)
filtered_df = merged_df.filter(
    (F.col("Date") >= "2020-01-01") & (F.col("Date") <= "2023-12-31")
)

In [None]:
# Preview the first five rows that survived the date filter
filtered_df.show(n=5)

+---+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+-------------------+-------------------+
| No|      Date|       Article_title|                 Url|             Article|         Lsa_summary|Stock_symbol|            negative|             neutral|           positive|predicted_sentiment|
+---+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+-------------------+-------------------+
| 95|2023-05-24|B of A Securities...|https://www.nasda...|Fintel reports th...|A payout ratio gr...|           A|0.034918103367090225| 0.02807900495827198| 0.9370028376579285|           positive|
|148|2023-05-10|Agilent Technolog...|https://www.nasda...|Agilent Technolog...|Investors should ...|           A| 0.07616138458251953| 0.01637493632733822| 0.9074636697769165|           positive|
|229|2023-02-16|Trip

In [None]:
# How many articles remain after restricting to 2020-2023
print("Filtered data count:", filtered_df.count())

Filtered data count: 392675


In [None]:
# Sentiment label counts within the 2020-2023 window
filtered_df.groupBy("predicted_sentiment").count().show()

+-------------------+------+
|predicted_sentiment| count|
+-------------------+------+
|           positive|156761|
|            neutral|106196|
|           negative|129718|
+-------------------+------+



In [None]:
# Examine data structure: print column names, types and nullability of the
# date-filtered news + sentiment DataFrame
filtered_df.printSchema()

root
 |-- No: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Article_title: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Article: string (nullable = true)
 |-- Lsa_summary: string (nullable = true)
 |-- Stock_symbol: string (nullable = true)
 |-- negative: double (nullable = true)
 |-- neutral: double (nullable = true)
 |-- positive: double (nullable = true)
 |-- predicted_sentiment: string (nullable = true)



In [None]:
# Collapse article-level sentiment to one row per (Date, Stock_symbol):
# each class probability is averaged over that stock's articles for the day,
# and article_count is the number of non-null article ids ("No") in the group.
# Output column order: Date, Stock_symbol, negative, neutral, positive, article_count.
merged_df = filtered_df.groupBy("Date", "Stock_symbol").agg(
    *[F.avg(prob_col).alias(prob_col) for prob_col in ("negative", "neutral", "positive")],
    F.count("No").alias("article_count"),
)

In [None]:
# Label each (Date, Stock_symbol) group with the class whose average probability
# is highest. Ties are resolved in favour of "positive", then "neutral";
# "negative" is the fallback, which also absorbs rows where a comparison
# evaluates to null (e.g. a null average).
pos_prob = F.col("positive")
neu_prob = F.col("neutral")
neg_prob = F.col("negative")

positive_is_max = (pos_prob >= neu_prob) & (pos_prob >= neg_prob)
neutral_is_max = (neu_prob >= pos_prob) & (neu_prob >= neg_prob)

merged_df = merged_df.withColumn(
    "predicted_sentiment",
    F.when(positive_is_max, "positive")
     .when(neutral_is_max, "neutral")
     .otherwise("negative"),
)

In [None]:
# Verify aggregated record count: number of distinct (Date, Stock_symbol)
# groups produced by the aggregation; displayed as the cell's output
merged_df.count()

161387

In [None]:
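# NOTE(review): the expected record count was noted as 381,680 (date-stock combinations),
# but the aggregation above returned 161,387 — TODO reconcile the expectation with the data.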

In [None]:
# Persist the per-day, per-stock sentiment aggregate to the silver layer,
# replacing any previous output, as snappy-compressed Parquet
merged_df.write.parquet(
    's3a://fnf-bucket/silver/sentiment_data_agg',
    mode='overwrite',
    compression='snappy',
)

In [None]:
# Read daily stock prices (silver layer) as Parquet
df = spark.read.load("s3a://fnf-bucket/silver/stock_price_data", format="parquet")

In [None]:
# Read back the per-day, per-stock sentiment aggregate written above
sentiment_df = spark.read.load("s3a://fnf-bucket/silver/sentiment_data_agg", format="parquet")

In [None]:
# Examine stock price data structure: print column names, types and nullability
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Stock_symbol: string (nullable = true)



In [None]:
# Restrict prices to the same window used for the news data:
# 2020-01-01 through 2023-12-31, both bounds inclusive
df = df.filter(
    (F.col("Date") >= "2020-01-01") & (F.col("Date") <= "2023-12-31")
)

In [None]:
# Examine sentiment data structure: print the schema of the aggregated
# per-day, per-stock sentiment data read back from the silver layer
sentiment_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Stock_symbol: string (nullable = true)
 |-- negative: double (nullable = true)
 |-- neutral: double (nullable = true)
 |-- positive: double (nullable = true)
 |-- article_count: long (nullable = true)
 |-- predicted_sentiment: string (nullable = true)



In [None]:
# Attach sentiment to every price row on (Date, Stock_symbol).
# Left join: trading days with no news keep their price row, with null sentiment columns.
joined_df = df.join(sentiment_df, ["Date", "Stock_symbol"], "left")

In [None]:
# Add binary indicator for whether news exists for that date-stock combination.
# article_count comes from a count aggregation, so it is null only where the left
# join found no sentiment row; the null check itself is the indicator.
# This must run before article_count is null-filled with 0.
joined_df = joined_df.withColumn(
    'has_news',
    F.col('article_count').isNotNull()
)

In [None]:
# Price rows with no matching news have nulls in every sentiment column after the
# left join. Treat them as fully neutral (probabilities 0 / 1 / 0, label 'neutral')
# with zero articles. Any other nulls in these columns are filled the same way.
no_news_defaults = {
    'article_count': 0,
    'negative': 0.0,
    'neutral': 1.0,
    'positive': 0.0,
    'predicted_sentiment': 'neutral',
}
joined_df = joined_df.na.fill(no_news_defaults)

In [None]:
# Verify final schema: sentiment columns and has_news should now be
# non-nullable after the fill step
joined_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Stock_symbol: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- negative: double (nullable = false)
 |-- neutral: double (nullable = false)
 |-- positive: double (nullable = false)
 |-- article_count: long (nullable = false)
 |-- predicted_sentiment: string (nullable = false)
 |-- has_news: boolean (nullable = false)



In [None]:
# Persist the combined price + sentiment dataset to the silver layer,
# replacing any previous output, as snappy-compressed Parquet
joined_df.write.parquet(
    's3a://fnf-bucket/silver/news_price_with_sentiment',
    mode='overwrite',
    compression='snappy',
)

In [None]:
# Define stock categorization by industry sector.
# Maps each of 12 sector names to the ticker symbols assigned to it
# (352 symbols in total across all sectors).
# NOTE(review): some assignments may not match standard sector classifications
# (e.g. "A" is Agilent Technologies and is listed under Industrials) — confirm intended.
stock_data = dict(
    Technology=["AAPL", "MSFT", "GOOG", "NVDA", "AMD", "INTC", "ORCL", "CRM", "ADBE", "QCOM", "IBM", "AVGO", "MU", "NOW", "PANW", "CRWD", "ANET", "FTNT", "TXN", "ADI", "KLAC", "LRCX", "AMAT", "SWKS", "ON", "DELL", "HPE", "HPQ", "CDNS", "ADSK", "WDAY", "DDOG", "AKAM", "NTAP", "SMCI", "EPAM", "VRSN", "GDDY", "FICO", "TYL", "BR", "FFIV", "FDS", "ZBRA", "TER", "MPWR", "GLW", "TRMB"],
    Healthcare=["ABT", "LLY", "REGN", "GILD", "BIIB", "VRTX", "ABBV", "AMGN", "MRK", "BSX", "DXCM", "ZTS", "MDT", "BAX", "EW", "ALGN", "HOLX", "PODD", "SYK", "TMO", "DHR", "RMD", "WST", "CI", "HUM", "HCA", "UHS", "DVA", "MCK", "CAH", "LH", "DGX", "CRL", "INCY"],
    Financial_Services=["V", "PYPL", "GS", "MS", "WFC", "C", "BX", "KKR", "APO", "COF", "USB", "PNC", "AIG", "BK", "KEY", "MTB", "FITB", "HBAN", "TFC", "RF", "CFG", "NTRS", "RJF", "IBKR", "AXP", "FIS", "GPN", "ICE", "CME", "NDAQ", "CBOE", "MCO", "MSCI", "AON", "AJG", "ACGL", "PGR", "ALL", "TRV", "AFL", "HIG", "PRU", "PFG", "CINF", "AIZ", "BRO", "ERIE"],
    Consumer_Discretionary=["AMZN", "TSLA", "NKE", "SBUX", "CMG", "DPZ", "TGT", "COST", "BBY", "TJX", "ROST", "DLTR", "ULTA", "DRI", "MCD", "YUM", "BKNG", "EXPE", "MGM", "RCL", "NCLH", "LYV", "ORLY", "AZO", "GM", "F", "DHI", "LEN", "PHM", "NVR", "TPR", "RL", "DECK", "WSM", "CVNA", "DAL", "UAL", "LUV", "UBER"],
    Consumer_Staples=["WMT", "KO", "PEP", "PM", "GIS", "KHC", "CAG", "TSN", "CPB", "CL", "CLX", "KMB", "CHD", "MKC", "HRL", "TAP", "MNST", "EL", "SYY", "ADM"],
    Energy=["XOM", "CVX", "COP", "OXY", "SLB", "HAL", "DVN", "EOG", "FANG", "VLO", "PSX", "APA", "BKR", "EQT", "OKE", "KMI", "TRGP", "CTRA"],
    Industrials=["BA", "CAT", "GE", "DE", "UPS", "FDX", "NOC", "LMT", "UNP", "CSX", "NSC", "ODFL", "EMR", "ETN", "PCAR", "SWK", "CMI", "DOV", "ROK", "PH", "CARR", "OTIS", "LII", "ALLE", "APH", "WAB", "APTV", "PWR", "AME", "HUBB", "EME", "FTV", "TT", "LHX", "LDOS", "MSI", "GWW", "FAST", "SNA", "ROL", "CTAS", "URI", "CPRT", "CHRW", "VRSK", "ROP", "WAT", "A", "MTD", "XYL", "GNRC", "NDSN", "PNR", "AOS", "DOC", "TECH", "CRH"],
    Materials=["LIN", "APD", "NEM", "NUE", "DOW", "DD", "LYB", "CF", "MOS", "ALB", "VMC", "MLM", "SHW", "ECL", "CTVA", "BG", "PKG", "AVY", "WY"],
    Real_Estate=["AMT", "PLD", "PSA", "O", "VICI", "CCI", "EQR", "AVB", "VTR", "ARE", "WELL", "EXR", "ESS", "KIM", "FRT", "REG", "HST", "BXP", "CPT"],
    Utilities=["NEE", "DUK", "SO", "D", "AEP", "EXC", "PCG", "ED", "PEG", "EIX", "XEL", "WEC", "ES", "DTE", "AES", "AEE", "ETR", "CMS", "CNP", "ATO", "NRG", "PNW", "EVRG", "LNT", "AWK"],
    Communication_Services=["DIS", "CMCSA", "TMUS", "CHTR", "EA", "TTWO", "MTCH", "NWSA", "NWS", "FOXA", "FOX"],
    Miscellaneous=["HAS", "TSCO", "GPC", "L", "LW", "COO", "GL", "FIX", "VST", "GEN", "COR", "ARES", "TPL", "CSGP", "POOL"],
)

In [None]:
# Convert nested dictionary to flat list of (symbol, category) pairs,
# keeping the sector order of stock_data and the symbol order within each sector.
flattened_data = [
    (symbol, category)
    for category, symbols in stock_data.items()
    for symbol in symbols
]

# Guard: every symbol must belong to exactly one category. A symbol listed under
# two sectors would produce two rows in the categories DataFrame, and the left
# join on Stock_symbol would then silently duplicate that stock's price rows.
_symbol_counts = Counter(symbol for symbol, _ in flattened_data)
_duplicate_symbols = sorted(s for s, n in _symbol_counts.items() if n > 1)
assert not _duplicate_symbols, (
    f"Symbols assigned to more than one category: {_duplicate_symbols}"
)

In [None]:
# Define schema for stock categories DataFrame.
# Both columns are non-nullable: every (symbol, category) pair comes from the
# hand-written stock_data mapping. StructType / StructField / StringType are
# imported explicitly in the first cell (no star import).
schema = StructType([
    StructField("Stock_symbol", StringType(), False),
    StructField("category", StringType(), False),
])

In [None]:
# Build a Spark DataFrame of (Stock_symbol, category) rows from the flattened mapping
categories_df = spark.createDataFrame(data=flattened_data, schema=schema)

In [None]:
# Verify total number of categorized stocks: one row per (symbol, category)
# pair in the mapping; displayed as the cell's output
categories_df.count()

352

In [None]:
# Read back the combined price + sentiment dataset persisted earlier
df = spark.read.load('s3a://fnf-bucket/silver/news_price_with_sentiment', format='parquet')

In [None]:
# Tag every price/sentiment row with its industry sector.
# Left join keeps stocks absent from the mapping; their null category is then
# filled with "Miscellaneous", the same bucket used for explicitly listed misc stocks.
merged_df = (
    df.join(categories_df, on="Stock_symbol", how="left")
    .na.fill({"category": "Miscellaneous"})
)

In [None]:
# Persist the categorized dataset, one Parquet directory per sector
# (partitioned on "category") so sector-level queries only scan their own files.
# Existing output is replaced; files are snappy-compressed.
merged_df.write.parquet(
    's3a://fnf-bucket/silver/categorical_stocks_data',
    mode='overwrite',
    partitionBy='category',
    compression='snappy',
)