In [1]:
# Cell 1: Setup and Imports
"""
Phase 1 Validation Notebook
Test and validate all Phase 1 components
"""
import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent))

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from config.spark_config import create_spark_session

# Cell 2: Initialize Spark
spark = create_spark_session("Phase1_Validation")
print(f"Spark UI: http://localhost:4040")

# Cell 3: Load Processed Data
df = spark.read.parquet("../data/processed/pipeline_features")
print(f"Total records: {df.count()}")
df.printSchema()

# Cell 4: Sentiment Distribution
sentiment_dist = df.groupBy("sentiment").count().toPandas()
plt.figure(figsize=(8, 6))
plt.pie(sentiment_dist['count'], labels=['Negative', 'Positive'], autopct='%1.1f%%')
plt.title('Sentiment Distribution')
plt.show()

# Cell 5: Feature Analysis
feature_stats = df.select(
    "text_length", "token_count", "vader_compound",
    "emoji_sentiment", "exclamation_count"
).describe().toPandas()
display(feature_stats)

# Cell 6: VADER vs Actual Sentiment
vader_analysis = df.select("sentiment", "vader_compound").toPandas()
plt.figure(figsize=(10, 6))
sns.boxplot(x='sentiment', y='vader_compound', data=vader_analysis)
plt.title('VADER Compound Score by Actual Sentiment')
plt.show()

# Cell 7: Text Length Distribution
length_data = df.select("sentiment", "text_length").toPandas()
plt.figure(figsize=(10, 6))
sns.histplot(data=length_data, x='text_length', hue='sentiment', bins=50)
plt.title('Text Length Distribution by Sentiment')
plt.show()

# Cell 8: Sample Tweets Analysis
print("Sample positive tweets with high VADER scores:")
df.filter((df.sentiment == 1) & (df.vader_compound > 0.8)) \
  .select("text", "vader_compound", "emoji_sentiment") \
  .show(5, truncate=False)

print("\nSample negative tweets with low VADER scores:")
df.filter((df.sentiment == 0) & (df.vader_compound < -0.8)) \
  .select("text", "vader_compound", "emoji_sentiment") \
  .show(5, truncate=False)

# Cell 9: Feature Correlations
numeric_features = [
    "text_length", "token_count", "emoji_sentiment",
    "exclamation_count", "question_count", "uppercase_ratio",
    "vader_compound", "vader_positive", "vader_negative"
]
corr_data = df.select(numeric_features).toPandas().corr()

plt.figure(figsize=(12, 10))
sns.heatmap(corr_data, annot=True, cmap='coolwarm', center=0)
plt.title('Feature Correlation Matrix')
plt.show()

# Cell 10: Performance Metrics
print("Data Processing Performance:")
print(f"- Total partitions: {df.rdd.getNumPartitions()}")
print(f"- Spark UI for detailed metrics: http://localhost:4040")

INFO:config.spark_config:Creating Spark session with driver memory: 12g
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/05/24 14:12:12 WARN Utils: Your hostname, Alis-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.8.177 instead (on interface en0)
25/05/24 14:12:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/24 14:12:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/24 14:12:13 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in standalone/kubernetes and LOCAL_DIRS in YARN).
INFO:config.spark_config:Spark session created successf

Spark UI: http://localhost:4040


25/05/24 14:12:17 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: ../data/processed/pipeline_features.
java.io.FileNotFoundException: File ../data/processed/pipeline_features does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/ali/Documents/Projects/micap/data/processed/pipeline_features. SQLSTATE: 42K03