# Sentiment Data Analysis - Parquet Files

This notebook demonstrates how to read and analyze sentiment data stored in partitioned Parquet files located in `/data/sentiment/`.

## Data Structure
- **Location**: `/data/sentiment/`
- **Partitioning**: Hive-style partitioning by `ticker`, `year`, `month`, `day`
- **Format**: Parquet with Snappy compression
- **Schema**: 8 columns containing tweet/post sentiment analysis results
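
Because the layout is Hive-style, every partition column appears in the path as a `key=value` directory, so partition values can be recovered without opening a file. A minimal stdlib sketch; the file name `part-00000.snappy.parquet` and the unpadded `month=1`/`day=8` values are assumptions, not taken from the actual directory:

```python
from pathlib import PurePosixPath


def parse_hive_partitions(path: str) -> dict:
    """Return the key=value directory segments of a Hive-style path as a dict."""
    partitions = {}
    # Only directory segments carry partition values; the file name is skipped
    for segment in PurePosixPath(path).parent.parts:
        if "=" in segment:
            key, _, value = segment.partition("=")
            partitions[key] = value
    return partitions


# Hypothetical file path following the ticker/year/month/day layout
example = "/data/sentiment/ticker=AAPL/year=2026/month=1/day=8/part-00000.snappy.parquet"
print(parse_hive_partitions(example))
```

The values come back as strings; Spark additionally infers column types (for example, integers for `year`) when it discovers partitions.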

## Analysis Goals
1. Explore the partitioned data structure
2. Perform aggregations on sentiment scores
3. Analyze trends across tickers and time periods
4. Visualize sentiment distributions
5. Compare different data sources (Twitter vs Reddit)

## 1. Setup - Import Required Libraries

Import necessary libraries for data processing, analysis, and visualization.

In [1]:
# PySpark for distributed data processing
from pyspark.sql import SparkSession
# Note: importing min and max this way shadows Python's built-in min()/max() in this notebook
from pyspark.sql.functions import col, count, avg, min, max, desc, from_unixtime, to_date
from pyspark.sql.functions import round as spark_round

# Pandas for data manipulation
import pandas as pd

# PyArrow for low-level Parquet operations
import pyarrow.parquet as pq

# Visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns

# File operations
import glob
import os
from datetime import datetime

# Configure plotting style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 6)

print("✓ Libraries imported successfully")


## 2. Initialize PySpark Environment

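Create a local SparkSession. The vectorized Parquet reader (already enabled by default in recent Spark versions) speeds up scans; the Snappy codec and dynamic partition-overwrite settings only affect Parquet writes.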

In [None]:
# Create Spark session with optimized configuration
spark = SparkSession.builder \
    .appName("SentimentDataAnalysis") \
    .master("local[*]") \
    .config("spark.sql.parquet.enableVectorizedReader", "true") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .getOrCreate()

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

print(f"✓ Spark {spark.version} session created successfully")
print(f"  Master: {spark.sparkContext.master}")
print(f"  App Name: {spark.sparkContext.appName}")

## 3. Read Partitioned Parquet Data

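Load all sentiment data from `/data/sentiment/`. Spark discovers the Hive partition directories automatically and exposes `ticker`, `year`, `month`, and `day` as columns, inferring numeric types for the date parts.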

In [None]:
# Read all partitioned Parquet files
data_path = "/data/sentiment"
df = spark.read.parquet(data_path)

# Cache for reuse across cells; cache() is lazy, so the count() below materializes it
df.cache()

print(f"✓ Loaded data from: {data_path}")
print(f"  Total records: {df.count():,}")

### Display Schema

The schema includes 8 data columns plus 4 partition columns (ticker, year, month, day).

In [None]:
# Display complete schema
df.printSchema()

### Sample Data

Display the first 20 records, untruncated, to see what each column contains.

In [None]:
# Show sample records
df.show(20, truncate=False)

## 4. Explore Partitions

Analyze the partition structure to understand data distribution.

In [None]:
# Show distinct values for partition columns
print("=== Partition Column Values ===\n")

print("Tickers:")
df.select("ticker").distinct().orderBy("ticker").show(truncate=False)

print("\nYears:")
df.select("year").distinct().orderBy("year").show(truncate=False)

print("\nMonths:")
df.select("month").distinct().orderBy("month").show(truncate=False)

print("\nDays:")
df.select("day").distinct().orderBy("day").show(truncate=False)

In [None]:
# Count records per ticker
print("=== Records per Ticker ===\n")
df.groupBy("ticker") \
    .agg(count("*").alias("record_count")) \
    .orderBy(desc("record_count")) \
    .show(truncate=False)

In [None]:
# Count records per day
print("=== Records per Day ===\n")
df.groupBy("year", "month", "day") \
    .agg(count("*").alias("record_count")) \
    .orderBy("year", "month", "day") \
    .show(truncate=False)

## 5. Efficient Filtering with Partition Pruning

Demonstrate how to filter data efficiently using partition columns. Spark will skip reading irrelevant partitions.

In [None]:
# Filter for AAPL in January 2026
aapl_jan = df.filter(
    (col("ticker") == "AAPL") &
    (col("year") == 2026) &
    (col("month") == 1)
)

print(f"AAPL records in January 2026: {aapl_jan.count()}")
aapl_jan.show(10, truncate=False)

In [None]:
# Filter for specific date (January 8, 2026)
jan8_data = df.filter(
    (col("year") == 2026) &
    (col("month") == 1) &
    (col("day") == 8)
)

print(f"Records on January 8, 2026: {jan8_data.count()}")

# Show the physical plan; pruning appears as PartitionFilters on the FileScan node
print("\n=== Query Execution Plan (shows partition pruning) ===")
jan8_data.explain()
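
Conceptually, partition pruning is a filter over directory names that runs before any data file is opened. The sketch below illustrates only that idea with plain string matching; it is not how Spark implements pruning (Spark also casts partition values to their inferred types, which this comparison skips, so `month=01` would not match `month=1` here). The partition paths are hypothetical:

```python
def prune_partitions(paths, **predicate):
    """Keep only paths whose Hive partition values match every key=value in predicate."""
    selected = []
    for path in paths:
        # Extract {"ticker": "AAPL", "year": "2026", ...} from the directory names
        values = dict(segment.split("=", 1) for segment in path.split("/") if "=" in segment)
        if all(values.get(key) == str(expected) for key, expected in predicate.items()):
            selected.append(path)
    return selected


# Hypothetical partition directories; no file contents are read
partitions = [
    "/data/sentiment/ticker=AAPL/year=2026/month=1/day=8",
    "/data/sentiment/ticker=AAPL/year=2026/month=1/day=9",
    "/data/sentiment/ticker=MSFT/year=2026/month=1/day=8",
]
jan8_dirs = prune_partitions(partitions, year=2026, month=1, day=8)
print(jan8_dirs)
```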

## 6. Sentiment Aggregations

Perform various aggregations to understand sentiment patterns across tickers.

In [None]:
# Sentiment distribution by ticker
print("=== Sentiment Distribution by Ticker ===\n")
sentiment_by_ticker = df.groupBy("ticker", "sentiment") \
    .agg(count("*").alias("count")) \
    .orderBy("ticker", "sentiment")

sentiment_by_ticker.show(50, truncate=False)

In [None]:
# Average sentiment score by ticker
print("=== Average Sentiment Score by Ticker ===\n")
avg_scores = df.groupBy("ticker") \
    .agg(
        count("*").alias("total_records"),
        spark_round(avg("score"), 4).alias("avg_score"),
        spark_round(min("score"), 4).alias("min_score"),
        spark_round(max("score"), 4).alias("max_score")
    ) \
    .orderBy(desc("avg_score"))

avg_scores.show(truncate=False)

In [None]:
# Calculate sentiment percentages per ticker
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum

# Count by ticker and sentiment
counts = df.groupBy("ticker", "sentiment").agg(count("*").alias("count"))

# Calculate total per ticker
window = Window.partitionBy("ticker")
sentiment_pct = counts.withColumn("total", spark_sum("count").over(window)) \
    .withColumn("percentage", spark_round((col("count") / col("total")) * 100, 2)) \
    .orderBy("ticker", "sentiment")

print("=== Sentiment Percentages by Ticker ===\n")
sentiment_pct.show(50, truncate=False)
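
For readers more familiar with pandas, the same per-ticker percentage calculation can be sketched with `groupby(...).transform("sum")`, which plays the role of the window sum over `Window.partitionBy("ticker")`. The rows below are made-up sample data:

```python
import pandas as pd

# Made-up rows using the notebook's column names
demo = pd.DataFrame({
    "ticker": ["AAPL", "AAPL", "AAPL", "MSFT"],
    "sentiment": ["POSITIVE", "POSITIVE", "NEGATIVE", "NEUTRAL"],
})

# Count per (ticker, sentiment), like groupBy(...).agg(count("*"))
counts_pd = demo.groupby(["ticker", "sentiment"]).size().rename("count").reset_index()
# transform("sum") broadcasts each ticker's total back onto its rows,
# mirroring spark_sum("count").over(Window.partitionBy("ticker"))
counts_pd["total"] = counts_pd.groupby("ticker")["count"].transform("sum")
counts_pd["percentage"] = (counts_pd["count"] / counts_pd["total"] * 100).round(2)
print(counts_pd)
```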

## 7. Time-Series Analysis

Convert timestamps and analyze sentiment trends over time.

In [None]:
# Convert processedAt (epoch milliseconds) to a date; from_unixtime uses Spark's session time zone
df_with_date = df.withColumn(
    "processed_date",
    to_date(from_unixtime(col("processedAt") / 1000))
)

# Show sample with converted dates
print("=== Sample Data with Converted Dates ===\n")
df_with_date.select("ticker", "sentiment", "score", "processedAt", "processed_date") \
    .show(10, truncate=False)
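
Because `processedAt` is stored in epoch milliseconds, it has to be divided by 1000 before conversion, and the resulting calendar date depends on the time zone applied. `from_unixtime` uses Spark's session time zone, so the same instant can land on different dates. A stdlib sketch with a hypothetical timestamp (2026-01-08 00:00:00 UTC) and an arbitrary UTC-5 offset:

```python
from datetime import datetime, timedelta, timezone

processed_at_ms = 1_767_830_400_000  # hypothetical value: 2026-01-08 00:00:00 UTC

# Epoch milliseconds -> seconds, interpreted in UTC
utc_dt = datetime.fromtimestamp(processed_at_ms / 1000, tz=timezone.utc)

# The same instant viewed from a UTC-5 time zone falls on the previous day
utc_minus_5 = timezone(timedelta(hours=-5))
local_dt = datetime.fromtimestamp(processed_at_ms / 1000, tz=utc_minus_5)

print(utc_dt.date(), local_dt.date())
```

Setting the Spark SQL config `spark.sql.session.timeZone` to `UTC` makes `processed_date` independent of the machine's local zone.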

In [None]:
# Daily sentiment trends by ticker
print("=== Daily Sentiment Trends ===\n")
daily_trends = df_with_date.groupBy("ticker", "processed_date", "sentiment") \
    .agg(count("*").alias("count")) \
    .orderBy("processed_date", "ticker", "sentiment")

daily_trends.show(30, truncate=False)

In [None]:
# Daily average sentiment scores by ticker
print("=== Daily Average Sentiment Scores ===\n")
daily_avg = df_with_date.groupBy("ticker", "processed_date") \
    .agg(
        count("*").alias("record_count"),
        spark_round(avg("score"), 4).alias("avg_score")
    ) \
    .orderBy("processed_date", "ticker")

daily_avg.show(30, truncate=False)

## 8. Source Platform Analysis

Compare sentiment across different source platforms (Twitter vs Reddit).

In [None]:
# Count records by source platform
print("=== Records by Source Platform ===\n")
df.groupBy("source") \
    .agg(count("*").alias("count")) \
    .orderBy(desc("count")) \
    .show(truncate=False)

In [None]:
# Compare average scores between sources
print("=== Average Sentiment Score by Source ===\n")
df.groupBy("source") \
    .agg(
        count("*").alias("count"),
        spark_round(avg("score"), 4).alias("avg_score")
    ) \
    .orderBy("source") \
    .show(truncate=False)

In [None]:
# Top publishers by record count
print("=== Top 10 Publishers ===\n")
df.groupBy("publisher") \
    .agg(count("*").alias("post_count")) \
    .orderBy(desc("post_count")) \
    .show(10, truncate=False)
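
Raw counts per source mostly reflect posting volume. One way to compare the platforms on equal footing is the share of each sentiment label within a source; in pandas this is a cross-tabulation normalized by row (in Spark, `df.groupBy("source").pivot("sentiment").count()` produces the unnormalized counts). A sketch on made-up rows, assuming lower-case `twitter`/`reddit` labels:

```python
import pandas as pd

# Made-up rows; the source labels are assumptions
demo = pd.DataFrame({
    "source": ["twitter", "twitter", "reddit", "reddit", "reddit"],
    "sentiment": ["POSITIVE", "NEGATIVE", "POSITIVE", "POSITIVE", "NEUTRAL"],
})

# normalize="index" divides each row (source) by its total, so rows sum to 1
share = pd.crosstab(demo["source"], demo["sentiment"], normalize="index")
print(share.round(3))
```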

## 9. Visualizations

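Create charts to visualize sentiment patterns. Matplotlib and seaborn work on local data, so the Spark DataFrame is first collected into Pandas with `toPandas()`, which loads every selected row into driver memory.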

In [None]:
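# Collect only the columns the charts below use; toPandas() loads all rows into driver memory
pdf = df.select("ticker", "sentiment", "score", "source").toPandas()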

print(f"✓ Converted {len(pdf):,} records to Pandas DataFrame")
print(f"  Memory usage: {pdf.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
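
If the collected frame is large, one common way to reduce its footprint is to store low-cardinality string columns such as `ticker`, `sentiment`, and `source` as pandas `category` dtype. A sketch on synthetic data (the actual savings depend on the data and the pandas version):

```python
import pandas as pd

# Synthetic frame with repetitive string labels
pdf_demo = pd.DataFrame({
    "ticker": ["AAPL", "MSFT", "TSLA"] * 10_000,
    "sentiment": ["POSITIVE", "NEGATIVE", "NEUTRAL"] * 10_000,
})

before = pdf_demo.memory_usage(deep=True).sum()
# Categoricals store each distinct label once plus small integer codes per row
pdf_demo = pdf_demo.astype({"ticker": "category", "sentiment": "category"})
after = pdf_demo.memory_usage(deep=True).sum()
print(f"{before / 1024**2:.2f} MB -> {after / 1024**2:.2f} MB")
```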

### Sentiment Counts by Ticker

In [None]:
plt.figure(figsize=(12, 6))
ticker_counts = pdf['ticker'].value_counts().sort_index()
ticker_counts.plot(kind='bar', color='steelblue', edgecolor='black')
plt.title('Sentiment Records by Ticker', fontsize=16, fontweight='bold')
plt.xlabel('Ticker', fontsize=12)
plt.ylabel('Number of Records', fontsize=12)
plt.xticks(rotation=45)
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

### Distribution of Sentiment Scores

In [None]:
plt.figure(figsize=(12, 6))
plt.hist(pdf['score'], bins=50, color='teal', edgecolor='black', alpha=0.7)
plt.title('Distribution of Sentiment Scores', fontsize=16, fontweight='bold')
plt.xlabel('Sentiment Score', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.axvline(pdf['score'].mean(), color='red', linestyle='--', linewidth=2, label=f'Mean: {pdf["score"].mean():.4f}')
plt.legend()
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

### Score Distribution by Ticker (Box Plot)

In [None]:
plt.figure(figsize=(12, 6))
pdf_sorted = pdf.sort_values('ticker')
sns.boxplot(data=pdf_sorted, x='ticker', y='score', palette='Set2')
plt.title('Sentiment Score Distribution by Ticker', fontsize=16, fontweight='bold')
plt.xlabel('Ticker', fontsize=12)
plt.ylabel('Sentiment Score', fontsize=12)
plt.xticks(rotation=45)
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

### Sentiment Categories Distribution

In [None]:
plt.figure(figsize=(10, 6))
sentiment_colors = {'POSITIVE': 'green', 'NEGATIVE': 'red', 'NEUTRAL': 'gray'}
sentiment_counts = pdf['sentiment'].value_counts()
ax = sentiment_counts.plot(kind='bar', color=[sentiment_colors.get(x, 'blue') for x in sentiment_counts.index], edgecolor='black')
plt.title('Sentiment Categories Distribution', fontsize=16, fontweight='bold')
plt.xlabel('Sentiment', fontsize=12)
plt.ylabel('Count', fontsize=12)
plt.xticks(rotation=0)
plt.grid(axis='y', alpha=0.3)

# Add value labels on bars
for i, v in enumerate(sentiment_counts):
    ax.text(i, v + 0.5, str(v), ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.show()

### Records by Source Platform

In [None]:
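plt.figure(figsize=(10, 6))
source_counts = pdf['source'].value_counts()
# Map colors by source name, since value_counts() orders bars by count rather than by platform;
# assumes labels such as "twitter"/"reddit" in any letter case (anything else is drawn gray)
platform_colors = {'twitter': '#1DA1F2', 'reddit': '#FF4500'}  # Twitter blue, Reddit orange
colors = [platform_colors.get(str(s).lower(), 'gray') for s in source_counts.index]
ax = source_counts.plot(kind='bar', color=colors, edgecolor='black')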
plt.title('Records by Source Platform', fontsize=16, fontweight='bold')
plt.xlabel('Source', fontsize=12)
plt.ylabel('Number of Records', fontsize=12)
plt.xticks(rotation=0)
plt.grid(axis='y', alpha=0.3)

# Add value labels on bars
for i, v in enumerate(source_counts):
    ax.text(i, v + 0.5, str(v), ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.show()

### Time Series: Average Daily Sentiment Score by Ticker

In [None]:
# Convert daily average data to Pandas
daily_avg_pdf = daily_avg.toPandas()
daily_avg_pdf['processed_date'] = pd.to_datetime(daily_avg_pdf['processed_date'])

plt.figure(figsize=(14, 7))
for ticker in daily_avg_pdf['ticker'].unique():
    ticker_data = daily_avg_pdf[daily_avg_pdf['ticker'] == ticker]
    plt.plot(ticker_data['processed_date'], ticker_data['avg_score'], 
             marker='o', label=ticker, linewidth=2)

plt.title('Average Daily Sentiment Score by Ticker', fontsize=16, fontweight='bold')
plt.xlabel('Date', fontsize=12)
plt.ylabel('Average Sentiment Score', fontsize=12)
plt.legend(title='Ticker', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
plt.axhline(y=0, color='black', linestyle='--', linewidth=1, alpha=0.5)
plt.tight_layout()
plt.show()
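
The per-ticker loop above can also be expressed by pivoting the long table into one column per ticker; `DataFrame.plot()` on the pivoted frame then draws one line per column. A sketch with made-up daily averages in the same long format as `daily_avg_pdf`:

```python
import pandas as pd

# Made-up daily averages in long format (one row per ticker and date)
daily = pd.DataFrame({
    "ticker": ["AAPL", "MSFT", "AAPL", "MSFT"],
    "processed_date": pd.to_datetime(["2026-01-08", "2026-01-08", "2026-01-09", "2026-01-09"]),
    "avg_score": [0.12, -0.05, 0.20, 0.01],
})

# One row per date, one column per ticker
wide = daily.pivot(index="processed_date", columns="ticker", values="avg_score")
print(wide)
```

Calling `wide.plot(marker='o')` would produce a chart equivalent to the loop, with gaps wherever a ticker has no value for a date.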

## 10. Alternative Reading Methods

Demonstrate other ways to read Parquet files depending on your use case.

### Method A: Using Pandas with Glob Pattern

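Best for analyzing a single ticker or a smaller dataset without Spark. Files read one at a time contain only the columns stored inside them: `ticker`, `year`, `month`, and `day` live in the directory names and are not added back.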

In [None]:
# Read all AAPL parquet files using glob
aapl_pattern = "/data/sentiment/ticker=AAPL/**/*.parquet"
# Sort so the file order (and the concatenated row order) is deterministic
aapl_files = sorted(glob.glob(aapl_pattern, recursive=True))

print(f"Found {len(aapl_files)} parquet files for AAPL:")
for f in aapl_files:
    print(f"  - {f}")

# Read and concatenate all files
if aapl_files:
    aapl_df = pd.concat([pd.read_parquet(f) for f in aapl_files], ignore_index=True)
    
    print(f"\nLoaded {len(aapl_df)} AAPL records")
    print("\nBasic Statistics:")
    print(aapl_df['score'].describe())
    
    print("\nSentiment Distribution:")
    print(aapl_df['sentiment'].value_counts())
else:
    print("No AAPL files found")

### Method B: Using PyArrow for Schema Inspection

Best for examining file structure and metadata without loading all data.

In [None]:
# Find a sample parquet file
sample_files = glob.glob("/data/sentiment/ticker=AAPL/**/*.parquet", recursive=True)

if sample_files:
    sample_file = sample_files[0]
    print(f"Inspecting: {sample_file}\n")
    
    # Read file with PyArrow
    parquet_file = pq.ParquetFile(sample_file)
    
    print("=== Schema ===")
    print(parquet_file.schema)
    
    print("\n=== Metadata ===")
    print(f"Number of rows: {parquet_file.metadata.num_rows}")
    print(f"Number of row groups: {parquet_file.metadata.num_row_groups}")
    print(f"Created by: {parquet_file.metadata.created_by}")
    
    print("\n=== Row Group 0 Details ===")
    rg = parquet_file.metadata.row_group(0)
    print(f"Number of rows: {rg.num_rows}")
    print(f"Total byte size: {rg.total_byte_size:,} bytes")
    
    # Read as PyArrow Table
    table = pq.read_table(sample_file)
    print("\n=== Table Info ===")
    print(f"Shape: {table.num_rows} rows x {table.num_columns} columns")
    print(f"Column names: {table.column_names}")
    
    # Convert to Pandas and show sample
    sample_df = table.to_pandas()
    print("\n=== Sample Data ===")
    print(sample_df.head())
else:
    print("No parquet files found for inspection")

## 11. Export Examples

Export aggregated results for further processing or reporting.

In [None]:
# Export average scores to CSV
output_dir = "/investpulse.net/try/output"
os.makedirs(output_dir, exist_ok=True)

csv_path = f"{output_dir}/sentiment_summary.csv"
avg_scores.toPandas().to_csv(csv_path, index=False)
print(f"✓ Exported summary to: {csv_path}")

In [None]:
# Write filtered data back to Parquet (example: only positive sentiment)
positive_sentiment = df.filter(col("sentiment") == "POSITIVE")

parquet_output = f"{output_dir}/positive_sentiment"
positive_sentiment.write.mode("overwrite").parquet(parquet_output)

print(f"✓ Exported {positive_sentiment.count():,} positive sentiment records to: {parquet_output}")

## 12. Summary and Cleanup

Key findings and cleanup operations.

In [None]:
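# Display summary statistics
from pyspark.sql.functions import format_string

total_records = df.count()
total_tickers = df.select("ticker").distinct().count()

# Build a real date from the partition columns; min/max of "day" alone ignores month and year.
# Assumes year/month/day were inferred as integers (Spark's default partition type inference).
partition_date = to_date(format_string("%04d-%02d-%02d", col("year"), col("month"), col("day")))
date_range = df.agg(
    min(partition_date).alias("min_date"),
    max(partition_date).alias("max_date")
).collect()[0]

print("=" * 60)
print("SENTIMENT DATA ANALYSIS SUMMARY")
print("=" * 60)
print(f"Total Records: {total_records:,}")
print(f"Unique Tickers: {total_tickers}")
print(f"Date Range: {date_range['min_date']} to {date_range['max_date']}")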
print(f"Data Path: {data_path}")
print("\nKey Insights:")
print("- Sentiment data is partitioned by ticker/year/month/day")
print("- Multiple sources: Twitter and Reddit")
print("- Scores range from -1.0 (negative) to 1.0 (positive)")
print("- Data includes timestamps for both processing and original post times")
print("\nFiles exported to:", output_dir)
print("=" * 60)
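
Taking the minimum and maximum of `day` on its own ignores `month` and `year`, so a dataset spanning a month boundary reports a misleading range. A stdlib illustration with hypothetical partition values:

```python
from datetime import date

# Hypothetical (year, month, day) partition values spanning a year boundary
partitions = [(2025, 12, 30), (2025, 12, 31), (2026, 1, 1), (2026, 1, 2)]

# sorted() is used instead of min()/max(), which this notebook shadows with PySpark imports
days = sorted(d for _, _, d in partitions)
dates = sorted(date(y, m, d) for y, m, d in partitions)

print("day column alone:", days[0], "to", days[-1])
print("actual range:", dates[0], "to", dates[-1])
```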

In [None]:
# Stop Spark session
spark.stop()
print("\n✓ Spark session stopped successfully")