# WordCount: RDD vs DataFrame Comparison

Comparing the RDD, DataFrame, and Spark SQL approaches to the classic WordCount problem in PySpark.

## Setup and Sample Data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re
import time

# Start a Spark session (or reuse an existing one via getOrCreate) with
# adaptive query execution enabled.
spark = (
    SparkSession.builder
    .appName("WordCount RDD vs DataFrame")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# Keep notebook output readable: only WARN and above from Spark's logs.
spark.sparkContext.setLogLevel("WARN")

# Input corpus shared by every WordCount variant below.
sample_text = [
    "Apache Spark is a unified analytics engine for large-scale data processing",
    "Spark provides built-in modules for streaming SQL machine learning and graph processing",
    "You can use Spark interactively from the Scala Python R and SQL shells",
    "Spark runs on Hadoop Kubernetes Apache Mesos or standalone",
    "It can access diverse data sources including HDFS Alluxio Apache Cassandra",
    "Apache Spark achieves high performance for both batch and streaming data",
    "Spark uses advanced DAG execution engine that supports acyclic data flow",
    "The engine optimizes arbitrary operator graphs and supports in-memory computing"
]

# Preview the first three lines, numbered from 1.
print(f"Sample data: {len(sample_text)} lines")
for number, sentence in enumerate(sample_text[:3], start=1):
    print(f"{number}: {sentence}")
print("...")

## Method 1: RDD Approach

In [None]:
def wordcount_rdd(text_data):
    """Count words with the low-level RDD API.

    Each line is lowercased and split into runs of word characters using
    Python's ``re`` module. Tokens of two characters or fewer are discarded,
    and the remaining tokens are summed per word.

    Note: Python's ``re`` treats word characters as Unicode-aware for ``str``
    patterns, while the JVM regex used by the DataFrame/SQL variants is
    ASCII-only by default, so results can differ on non-ASCII text.

    Args:
        text_data: sequence of text lines, distributed with ``parallelize``.

    Returns:
        An RDD of ``(word, count)`` tuples sorted by count, highest first.
    """
    sc = spark.sparkContext

    def tokenize(line):
        # Lowercase first so "Spark" and "spark" are counted together.
        return re.findall(r'\b\w+\b', line.lower())

    pairs = (
        sc.parallelize(text_data)
        .flatMap(tokenize)
        .filter(lambda token: len(token) > 2)
        .map(lambda token: (token, 1))
    )
    totals = pairs.reduceByKey(lambda left, right: left + right)
    return totals.sortBy(lambda pair: pair[1], ascending=False)

# Execute RDD WordCount.
# RDD transformations are lazy: without an action inside the timed region the
# timer would mostly measure lineage construction, not the word count itself.
# collect() forces the full job (tokenize, reduce, sort) to run. perf_counter
# is used because it is monotonic and intended for interval timing.
start_time = time.perf_counter()
rdd_result = wordcount_rdd(sample_text)
rdd_counts = rdd_result.collect()
rdd_time = time.perf_counter() - start_time

print("RDD WordCount Results (Top 10):")
# collect() preserves the sortBy order, so the first 10 entries are the top 10.
# 'freq' avoids shadowing pyspark.sql.functions.count from the star import.
for word, freq in rdd_counts[:10]:
    print(f"{word}: {freq}")

print(f"\nRDD execution time: {rdd_time:.4f} seconds")

## Method 2: DataFrame Approach

In [None]:
def wordcount_dataframe(text_data):
    """Count words with the DataFrame API.

    Builds a one-column DataFrame (``line``) and lowercases each line. It then
    splits each line on runs of non-word characters (JVM regex). Empty tokens
    and tokens of two characters or fewer are dropped. Finally, occurrences
    are counted per word.

    Args:
        text_data: sequence of text lines.

    Returns:
        A lazy DataFrame with columns ``word`` and ``count``, ordered by
        ``count`` descending.
    """
    lines_df = spark.createDataFrame([(text,) for text in text_data], ["line"])

    # One output row per token produced by the split.
    tokens = explode(split(lower(col("line")), "\\W+")).alias("word")
    # Leading punctuation yields an empty first token; the length check
    # already excludes it, the explicit != "" just states the intent.
    is_kept = (length(col("word")) > 2) & (col("word") != "")

    return (
        lines_df.select(tokens)
        .where(is_kept)
        .groupBy("word")
        .count()
        .orderBy(col("count").desc())
    )

# Execute DataFrame WordCount.
# wordcount_dataframe only builds a lazy query plan, so the timed region must
# also include an action. Otherwise df_time measures plan construction rather
# than the aggregation. collect() runs the full group-by and sort.
# perf_counter is monotonic and intended for interval timing.
start_time = time.perf_counter()
df_result = wordcount_dataframe(sample_text)
df_result.collect()
df_time = time.perf_counter() - start_time

print("DataFrame WordCount Results (Top 10):")
df_result.show(10)

print(f"DataFrame execution time: {df_time:.4f} seconds")

## Method 3: SQL Approach

In [None]:
def wordcount_sql(text_data):
    """Count words with Spark SQL.

    Registers the input as the temporary view ``text_data``, replacing any
    existing view of that name. It then runs a tokenize / filter / count
    query: lowercase each line, split on runs of non-word characters, keep
    tokens longer than two characters, and count occurrences per word.

    Args:
        text_data: sequence of text lines.

    Returns:
        A lazy DataFrame with columns ``word`` and ``count``, ordered by
        ``count`` descending.
    """
    # Create DataFrame and register as temp view
    df = spark.createDataFrame([(line,) for line in text_data], ["line"])
    df.createOrReplaceTempView("text_data")

    # Spark's SQL parser unescapes backslash sequences inside string literals
    # (default spark.sql.parser.escapedStringLiterals=false). A pattern written
    # as '\W+' in the query therefore reaches the regex engine as 'W+'. That
    # never matches lowercased text, so whole lines were counted as "words".
    # The bracket class below contains no backslashes and is exactly the JVM
    # regex's (ASCII) \W, i.e. the same split used by the DataFrame API
    # version.
    word_counts = spark.sql("""
        SELECT word, COUNT(*) as count
        FROM (
            SELECT EXPLODE(SPLIT(LOWER(line), '[^a-zA-Z0-9_]+')) as word
            FROM text_data
        ) words
        WHERE LENGTH(word) > 2 AND word != ''
        GROUP BY word
        ORDER BY count DESC
    """)

    return word_counts

# Execute SQL WordCount.
# spark.sql() returns a lazy DataFrame, so the timed region must include an
# action. Otherwise sql_time measures query parsing/planning rather than the
# aggregation. collect() runs the full query. perf_counter is monotonic and
# intended for interval timing.
start_time = time.perf_counter()
sql_result = wordcount_sql(sample_text)
sql_result.collect()
sql_time = time.perf_counter() - start_time

print("SQL WordCount Results (Top 10):")
sql_result.show(10)

print(f"SQL execution time: {sql_time:.4f} seconds")

## Performance Comparison

In [None]:
# Side-by-side timings. With only a handful of input lines these numbers
# presumably reflect fixed per-job overhead more than throughput, so treat
# them as illustrative rather than as a benchmark.
print("Performance Comparison:")
timings = (
    ("RDD approach:", rdd_time),
    ("DataFrame approach:", df_time),
    ("SQL approach:", sql_time),
)
for label, seconds in timings:
    # Pad labels to a 21-character column so the times line up.
    print(f"{label:<21}{seconds:.4f} seconds")

# Print the physical plan that Spark chose for each declarative approach.
for title, frame in (("DataFrame", df_result), ("SQL", sql_result)):
    print(f"\n=== {title} Execution Plan ===")
    frame.explain()

## Analysis and Insights

In [None]:
# Verify that all three approaches agree on every (word, count) pair.
# Each collect() pulls the full result to the driver, which is fine for this
# small sample.
rdd_words = dict(rdd_result.collect())
df_words = {row['word']: row['count'] for row in df_result.collect()}
sql_words = {row['word']: row['count'] for row in sql_result.collect()}

print(f"Total unique words (RDD): {len(rdd_words)}")
print(f"Total unique words (DataFrame): {len(df_words)}")
print(f"Total unique words (SQL): {len(sql_words)}")

# Chained comparison: True only if rdd == df and df == sql.
results_match = rdd_words == df_words == sql_words
print(f"\nAll results match: {results_match}")

if not results_match:
    print("Differences detected - checking common words...")
    common_words = set(rdd_words) & set(df_words) & set(sql_words)
    print(f"Common words: {len(common_words)}")
    # A bare count of shared words cannot tell which approach diverged, so
    # compare each one against the RDD baseline. Show a sample of words that
    # are missing, unexpected, or counted differently.
    for method, words in (("DataFrame", df_words), ("SQL", sql_words)):
        missing = sorted(rdd_words.keys() - words.keys())
        extra = sorted(words.keys() - rdd_words.keys())
        miscounted = sorted(
            w for w in rdd_words.keys() & words.keys() if rdd_words[w] != words[w]
        )
        if missing or extra or miscounted:
            print(f"  {method} vs RDD: {len(missing)} missing, "
                  f"{len(extra)} extra, {len(miscounted)} miscounted")
            print(f"    missing (sample): {missing[:5]}")
            print(f"    extra (sample): {extra[:5]}")
            print(f"    miscounted (sample): {miscounted[:5]}")

print("\n=== Key Insights ===")
print("1. RDD: Lower-level, more control, functional programming style")
print("2. DataFrame: Higher-level API, Catalyst optimizer, better performance")
print("3. SQL: Most readable, familiar syntax, same optimization as DataFrame")
print("4. For large datasets, DataFrame/SQL typically outperform RDD due to optimization")

## Working with Files

In [None]:
# Example: Reading from file (uncomment if you have input files)
# file_rdd = spark.sparkContext.textFile("hdfs://path/to/input.txt")
# file_df = spark.read.text("hdfs://path/to/input.txt")

# Save results to different formats
print("Saving results to different formats...")

# Save DataFrame as Parquet (overwrite mode makes re-runs safe)
df_result.write.mode("overwrite").parquet("/tmp/wordcount_parquet")

# Save DataFrame as JSON
df_result.write.mode("overwrite").json("/tmp/wordcount_json")

# Save RDD as text.
# Unlike DataFrameWriter, RDD.saveAsTextFile has no overwrite mode. It fails
# if the output directory already exists, so re-running this cell would
# raise. Remove any previous output first through Hadoop's FileSystem API.
# That resolves the path against the same default filesystem Spark writes to
# (local, HDFS, ...). delete() returns False, without raising, when the path
# is absent.
# NOTE(review): _jvm/_jsc are PySpark internals (a widely used idiom).
rdd_output_path = "/tmp/wordcount_rdd"
spark_context = spark.sparkContext
hadoop_path = spark_context._jvm.org.apache.hadoop.fs.Path(rdd_output_path)
hadoop_fs = hadoop_path.getFileSystem(spark_context._jsc.hadoopConfiguration())
hadoop_fs.delete(hadoop_path, True)
rdd_result.map(lambda x: f"{x[0]}\t{x[1]}").saveAsTextFile(rdd_output_path)

print("Results saved to /tmp/wordcount_*")

## Cleanup

In [None]:
# Shut down the session and release its resources; the `spark` object should
# not be used after this cell.
spark.stop()
print("Spark session stopped.")