In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("API_Enhancement_Tests_Spark2") \
    .getOrCreate()

print(f"Spark version: {spark.version}")

Spark version: 2.4.8


## API Benchmark Test Design

In [2]:
# 1. DataFrame Operations Tests
# For both Spark 2.x and 3.x notebooks
import time
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Performance measurement function
def measure_execution(operation_name, func):
    start_time = time.time()
    result = func()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"Operation: {operation_name}, Execution time: {execution_time:.4f} seconds")
    return result, execution_time

# Create sample data
def create_test_data(spark):
    # Create a larger dataset for meaningful measurements
    data = [(i, f"name_{i}", i % 100, i % 5) for i in range(10**6)]
    df = spark.createDataFrame(data, ["id", "name", "value", "category"])
    return df

# Test operations
test_df = create_test_data(spark)
results = []

# Test 1: Basic filtering
filter_op, filter_time = measure_execution(
    "Basic filtering",
    lambda: test_df.filter(F.col("value") > 50).count()
)
results.append(("Basic filtering", filter_time))

# Test 2: Grouping and aggregation
group_op, group_time = measure_execution(
    "Grouping and aggregation",
    lambda: test_df.groupBy("category").agg(F.avg("value").alias("avg_value")).collect()
)
results.append(("Grouping and aggregation", group_time))

# Test 3: Window functions
window_op, window_time = measure_execution(
    "Window functions",
    lambda: test_df.withColumn(
        "rank",
        F.rank().over(Window.partitionBy("category").orderBy(F.desc("value")))
    ).collect()
)
results.append(("Window functions", window_time))

# Store results
performance_results = pd.DataFrame(results, columns=["Operation", "Execution Time (s)"])
performance_results

Operation: Basic filtering, Execution time: 10.8832 seconds
Operation: Grouping and aggregation, Execution time: 9.7304 seconds
Operation: Window functions, Execution time: 16.1734 seconds


Unnamed: 0,Operation,Execution Time (s)
0,Basic filtering,10.883168
1,Grouping and aggregation,9.730412
2,Window functions,16.173417


In [3]:
# 3. Type Hints and API Ergonomics Tests
# For Spark 2.x
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema explicitly in Spark 2.x
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

data = [("Alice", 25), ("Bob", 30)]
df_manual_schema = spark.createDataFrame(data, schema)

In [4]:
# 4. API Usage Metrics
# Track API usage metrics
api_metrics = []

import ast

# Spark 2.x Code (More Verbose)
spark2_code = """
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

data = [("Alice", 25), ("Bob", 30)]
df_manual_schema = spark.createDataFrame(data, schema)
"""

# Spark 3.x Code (With Type Hints, Optimized for Fewer Lines)
spark3_code = """
from pyspark.sql import SparkSession

data = [("Alice", 25), ("Bob", 30)]
df_typed = spark.createDataFrame(data, schema=["name: string", "age: int"])
"""

def count_code_lines(code_string):
    """Counts the number of executable statements in a code string."""
    tree = ast.parse(code_string)  # Parse code
    return sum(isinstance(node, ast.stmt) for node in ast.walk(tree))

# Count lines again
code_lines_spark2 = count_code_lines(spark2_code)
code_lines_spark3 = count_code_lines(spark3_code)

# Store API Metrics
api_metrics = [{
    "Operation": "Type Hints & Schema Definition",
    "Code Lines Spark 2.x": code_lines_spark2,
    "Code Lines Spark 3.x": code_lines_spark3,
    "Reduction (%)": round((1 - code_lines_spark3/code_lines_spark2) * 100, 2)
}]

# Create DataFrame
df_api_metrics = pd.DataFrame(api_metrics)

# Display the table properly in Jupyter Notebook
display(df_api_metrics)


Unnamed: 0,Operation,Code Lines Spark 2.x,Code Lines Spark 3.x,Reduction (%)
0,Type Hints & Schema Definition,4,3,25.0


In [5]:
# 5. Dataset API Tests (Spark 2.x)
# ------------------ Spark 2.x Dataset API Test (100M Rows) ------------------
import time
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Dataset API Test - Spark 2.x") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Define Schema for Dataset API
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Generate Large Dataset Using Row Objects (Simulating Dataset API)
NUM_RECORDS = 10**8  # 100 million rows
data = [Row(name=f"Person_{i}", age=i % 100) for i in range(10**6)]  # Generate 1M at a time

# Convert to Dataset (Using DataFrame in PySpark)
df_spark2 = spark.createDataFrame(data, schema=schema)

# Apply Dataset-style Filtering Operation
start_time = time.time()
df_spark2_filtered = df_spark2.filter(col("age") > 30).count()
end_time = time.time()

# Print Execution Time
spark2_time = round(end_time - start_time, 4)
print(f"Spark 2.x Execution Time: {spark2_time} seconds")

# Stop Spark Session
spark.stop()


Spark 2.x Execution Time: 9.8884 seconds


## Specific Tests

In [6]:
# Ensure findspark is initialized (only needed if running outside a Spark environment)
import findspark
findspark.init()

# Import Spark modules
from pyspark.sql import SparkSession

# Stop any existing Spark session (Avoid conflicts)
try:
    spark.stop()
except:
    pass  # Ignore errors if Spark was not running

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DataFrame Transformations Test") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print(f"Spark version: {spark.version}")


Spark version: 2.4.8


In [7]:
# 1. DataFrame Transformations (Spark 2.x)
import time
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Initialize Spark Session
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Stop existing session & start a new one
try:
    spark.stop()
except:
    pass

spark = SparkSession.builder \
    .appName("DataFrame Transformations Test - Spark 2.x") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print(f"Spark version: {spark.version}")

# Measure execution function
def measure_execution(operation_name, func):
    start_time = time.time()
    result = func()
    end_time = time.time()
    execution_time = end_time - start_time

    print(f"Operation: {operation_name}")
    print(f"  Execution time: {execution_time:.4f} seconds")

    return {
        "operation": operation_name,
        "execution_time": execution_time
    }

# Create test data function
def create_test_data(spark, size=100000):
    data = [(i, f"name_{i}", i % 100, i % 5) for i in range(size)]
    df = spark.createDataFrame(data, ["id", "name", "value", "category"])
    return df

# Run test
test_df = create_test_data(spark, size=1000000)
print("Test DataFrame created successfully!")


Spark version: 2.4.8
Test DataFrame created successfully!


In [9]:
# 2. Specific DataFrame Transformation Tests
# Test: Column operations
results.append(measure_execution(
    "Column operations", 
    lambda: test_df.withColumn("value_squared", F.col("value") * F.col("value"))
             .withColumn("category_str", F.concat(F.lit("Category: "), F.col("category").cast("string")))
             .select("id", "name", "value_squared", "category_str")
             .cache()
             .count()
))

# Test: Complex aggregations
results.append(measure_execution(
    "Complex aggregations", 
    lambda: test_df.groupBy("category")
             .agg(
                 F.count("id").alias("count"),
                 F.sum("value").alias("total_value"),
                 F.avg("value").alias("avg_value"),
                 F.min("value").alias("min_value"),
                 F.max("value").alias("max_value"),
                 F.expr("percentile(value, 0.5)").alias("median_value")
             )
             .orderBy("category")
             .collect()
))

# Test: Window functions
results.append(measure_execution(
    "Window functions", 
    lambda: test_df.withColumn(
                "rank", F.rank().over(Window.partitionBy("category").orderBy(F.desc("value")))
             )
             .withColumn(
                "running_total", F.sum("value").over(Window.partitionBy("category").orderBy("id").rowsBetween(Window.unboundedPreceding, 0))
             )
             .where(F.col("rank") <= 10)
             .orderBy("category", "rank")
             .cache()
             .count()
))

# Test: Join operations
results.append(measure_execution(
    "Join operations", 
    lambda: spark.createDataFrame(
            [(i % 5, f"cat_{i % 5}") for i in range(5)],
            ["category", "category_name"]
        ).join(
            test_df,
            on="category",
            how="inner"
        )
        .groupBy("category_name")
        .agg(F.count("*").alias("count"), F.avg("value").alias("avg_value"))
        .collect()
))

Operation: Column operations
  Execution time: 0.7132 seconds
Operation: Complex aggregations
  Execution time: 1.7049 seconds
Operation: Window functions
  Execution time: 1.9442 seconds
Operation: Join operations
  Execution time: 9.7936 seconds


In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
import time

# ✅ Reduce dataset size
NUM_RECORDS = 50000  # Reduced to 50,000 rows
data = [(i, f"Person_{i}", i % 100, i % 5) for i in range(NUM_RECORDS)]
test_df = spark.createDataFrame(data, ["id", "name", "value", "category"])

# ✅ Optimize Spark Config
spark.conf.set("spark.sql.shuffle.partitions", "50")  # Reduce shuffle partitions
spark.conf.set("spark.sql.execution.arrow.enabled", "false")  # Disable Arrow to prevent conflicts

# ✅ Define Standard UDFs
standard_double_udf = udf(lambda x: x * 2 if x is not None else 0, IntegerType())
standard_categorize_udf = udf(lambda x: "high" if x > 50 else "low", StringType())

# ✅ Apply UDFs efficiently
test_df = test_df.withColumn("doubled", standard_double_udf(F.col("value"))) \
                 .withColumn("category_label", standard_categorize_udf(F.col("value")))

# ✅ Persist (instead of cache) to optimize memory
test_df.persist()

# ✅ Perform aggregation separately (fetch only required results)
def safe_measure_execution(operation_name, func):
    try:
        start_time = time.time()
        result = func()
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"✅ {operation_name}: {execution_time:.4f} sec")
        return execution_time
    except Exception as e:
        print(f"❌ {operation_name} Failed: {str(e)}")
        return None

results.append(safe_measure_execution(
    "Standard UDF processing",
    lambda: test_df.groupBy("category_label")
             .agg(F.avg("doubled").alias("avg_doubled"))
             .collect()  # No `.limit()` before collect
))

# ✅ Code Complexity Measurement
standard_udf_code = """
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType

# Define Standard UDFs
standard_double_udf = udf(lambda x: x * 2 if x is not None else 0, IntegerType())
standard_categorize_udf = udf(lambda x: "high" if x > 50 else "low", StringType())

# Apply UDFs efficiently
test_df = test_df.withColumn("doubled", standard_double_udf(F.col("value"))) \
                 .withColumn("category_label", standard_categorize_udf(F.col("value")))

test_df.persist()

# Perform aggregation separately
results.append(measure_execution(
    "Standard UDF processing",
    lambda: test_df.groupBy("category_label")
             .agg(F.avg("doubled").alias("avg_doubled"))
             .collect()
))
"""

# ✅ Measure line count dynamically
line_count = len(standard_udf_code.strip().split('\n'))
print(f"📏 Standard UDF Code Complexity: {line_count} lines")


✅ Standard UDF processing: 18.0084 sec
📏 Standard UDF Code Complexity: 19 lines


In [11]:
import time
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Track metrics without sparkmeasure
def track_metrics(test_name, operation_func, code_snippet):
    # Time measurement
    start_time = time.time()
    result = operation_func()
    end_time = time.time()
    execution_time = end_time - start_time
    
    # Count lines in code snippet
    line_count = len(code_snippet.strip().split('\n'))
    
    print(f"Test: {test_name}")
    print(f"  Execution time: {execution_time:.4f} seconds")
    print(f"  Code lines: {line_count}")
    
    return {
        "test_name": test_name,
        "spark_version": "2.x",
        "execution_time": execution_time,
        "code_lines": line_count
    }

# Create sample data
def create_test_data(size=50000):
    data = [(i, f"name_{i}", i % 100, i % 5) for i in range(size)]
    return spark.createDataFrame(data, ["id", "name", "value", "category"])

# Create test dataset
test_df = create_test_data()
metrics = []

# Test 1: DataFrame filtering and aggregation
test_1_code = """
filtered_df = test_df.filter(F.col("value") > 50)
result = filtered_df.groupBy("category") \
                   .agg(F.count("*").alias("count"), 
                        F.avg("value").alias("avg_value"),
                        F.sum("value").alias("sum_value")) \
                   .orderBy("category")
"""

metrics.append(track_metrics(
    "DataFrame filtering and aggregation",
    lambda: eval(compile(test_1_code, "<string>", "exec"), 
                {"test_df": test_df, "F": F}),
    test_1_code
))

# Test 2: Window functions
test_2_code = """
window_spec = Window.partitionBy("category").orderBy(F.desc("value"))
result = test_df.withColumn("rank", F.rank().over(window_spec)) \
               .filter(F.col("rank") <= 10) \
               .orderBy("category", "rank")
"""

metrics.append(track_metrics(
    "Window functions",
    lambda: eval(compile(test_2_code, "<string>", "exec"), 
                {"test_df": test_df, "F": F, "Window": Window}),
    test_2_code
))

# Test 3: Standard UDF in Spark 2.x
test_3_code = """
from pyspark.sql.functions import udf

double_value = udf(lambda x: x * 2, IntegerType())
categorize = udf(lambda x: "high" if x > 50 else "low", StringType())

result = test_df.withColumn("doubled", double_value(F.col("value"))) \
               .withColumn("category_text", categorize(F.col("value"))) \
               .groupBy("category_text") \
               .agg(F.avg("doubled").alias("avg_doubled"))
"""

metrics.append(track_metrics(
    "Standard UDF processing",
    lambda: eval(compile(test_3_code, "<string>", "exec"), 
                {"test_df": test_df, "F": F, "IntegerType": IntegerType, 
                 "StringType": StringType}),
    test_3_code
))

# Save results
metrics_df = pd.DataFrame(metrics)
print("\nSpark 2.x Metrics Summary:")
print(metrics_df)
metrics_df.to_csv("spark2_metrics.csv", index=False)

Test: DataFrame filtering and aggregation
  Execution time: 0.0532 seconds
  Code lines: 4
Test: Window functions
  Execution time: 0.0396 seconds
  Code lines: 2
Test: Standard UDF processing
  Execution time: 0.0581 seconds
  Code lines: 6

Spark 2.x Metrics Summary:
                             test_name spark_version  execution_time  \
0  DataFrame filtering and aggregation           2.x        0.053166   
1                     Window functions           2.x        0.039643   
2              Standard UDF processing           2.x        0.058070   

   code_lines  
0           4  
1           2  
2           6  
