# Spark Pi Estimation - Assignment 3

This notebook demonstrates distributed Pi estimation using Apache Spark.

## Original Setup:
- **Local Cluster:** 1 master + 1 worker (2 cores, 4GB RAM each)
- **OCI Cluster:** 1 master + 1 worker (2 cores, 12GB RAM each)

## Current Setup:
- **Single PC:** 12 cores, 32GB RAM using Spark local mode

In [1]:
import time
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import numpy as np

## Constants and Configuration

In [2]:
FIXED_STEPS = 1000000
STEP_COUNTS = [1000, 10000, 100000, 1000000]

## Pi Estimation Function

Uses Riemann sum to approximate π through the integral ∫₀¹ 4/(1+x²) dx = π
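
Written out, the midpoint Riemann sum used in this notebook looks as follows. The symbols $N$, $\Delta x$ and $x_i$ are notation introduced here; they correspond to `total_steps_n`, `delta_x` and `x` in the code, with the index running from 1 to $N$.

```latex
\pi = \int_0^1 \frac{4}{1+x^2}\,dx
\;\approx\; \sum_{i=1}^{N} \frac{4}{1+x_i^2}\,\Delta x,
\qquad \Delta x = \frac{1}{N},
\quad x_i = \left(i - \tfrac{1}{2}\right)\Delta x
```

For a smooth integrand like this one, the error of the midpoint rule shrinks roughly in proportion to $\Delta x^2$, so each tenfold increase in steps should gain about two decimal digits until floating-point rounding takes over.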

In [3]:
def pi_estimate_riemann(index, total_steps_n):
    """
    Calculate one rectangle's contribution to the Riemann sum.
    
    Args:
        index: Current step index
        total_steps_n: Total number of steps
    
    Returns:
        Area of rectangle at this index
    """
    delta_x = 1.0 / total_steps_n  # Width of rectangle
    x = delta_x * (index - 0.5)    # Midpoint
    area = 4.0 / (1.0 + x * x) * delta_x  # Height * width
    return area

def pi_mapper(index_total_steps):
    index, total_steps = index_total_steps
    return pi_estimate_riemann(index, total_steps)

def attach_total_steps_mapper(total_steps):
    def mapper(index):
        return index, total_steps
    return mapper
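
As a sanity check that does not need Spark, the same midpoint rule can be run serially. This is a minimal sketch; `pi_midpoint_serial` is a hypothetical helper name that is not part of the notebook, and it repeats the arithmetic of `pi_estimate_riemann` in a plain loop.

```python
import math


def pi_midpoint_serial(total_steps):
    """Serial midpoint Riemann sum of 4 / (1 + x^2) on [0, 1]."""
    delta_x = 1.0 / total_steps
    total = 0.0
    for index in range(1, total_steps + 1):
        x = delta_x * (index - 0.5)             # midpoint of rectangle `index`
        total += 4.0 / (1.0 + x * x) * delta_x  # height * width
    return total


estimate = pi_midpoint_serial(1000)
abs_error = abs(estimate - math.pi)
print(f"estimate={estimate:.10f}, abs_error={abs_error:.2e}")
```

Comparing this value with the Spark result for the same step count gives a quick way to separate numerical issues from cluster issues.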

## Experiment Runner

Distributes the rectangle indices across RDD partitions. Each partition maps its indices to rectangle areas, and `sum()` combines the partial results on the driver.

In [None]:
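def run_spark_experiment(spark, total_steps, num_partitions=None):
    """Estimate pi with `total_steps` rectangles; return (runtime_seconds, estimate)."""
    # If partitions are not specified, use the default parallelism (all available cores)
    if num_partitions is None:
        num_partitions = spark.sparkContext.defaultParallelism

    # perf_counter is monotonic, so it is safer than time.time() for measuring intervals
    start_time = time.perf_counter()

    # Indices 1..total_steps, split into num_partitions slices
    rdd = spark.sparkContext.range(
        1,
        total_steps + 1,
        numSlices=num_partitions
    )

    # Pair each index with total_steps so pi_mapper receives both values
    paired = rdd.map(attach_total_steps_mapper(total_steps))

    # sum() is the action that actually triggers the job
    estimated_pi = paired.map(pi_mapper).sum()

    runtime = time.perf_counter() - start_time
    return runtime, estimated_pi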
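
The partition-then-sum idea can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's internal slicing logic: `split_range` and `partial_sum` are hypothetical helpers, and the chunks are processed one after another rather than in parallel.

```python
import math


def split_range(start, stop, num_slices):
    """Split [start, stop) into num_slices contiguous, near-equal chunks.

    Hypothetical helper that only illustrates the idea of slicing a range.
    """
    total = stop - start
    bounds = []
    for i in range(num_slices):
        lo = start + (i * total) // num_slices
        hi = start + ((i + 1) * total) // num_slices
        bounds.append((lo, hi))
    return bounds


def partial_sum(lo, hi, total_steps):
    """Sum the rectangle areas for indices lo..hi-1 (one 'partition')."""
    delta_x = 1.0 / total_steps
    return sum(
        4.0 / (1.0 + (delta_x * (i - 0.5)) ** 2) * delta_x
        for i in range(lo, hi)
    )


total_steps = 100_000
chunks = split_range(1, total_steps + 1, 4)
partials = [partial_sum(lo, hi, total_steps) for lo, hi in chunks]
estimate = sum(partials)  # combining partial results, as sum() does on the driver
print(chunks)
print(f"estimate={estimate:.10f}")
```

Because addition is associative up to floating-point rounding, the combined estimate should match the serial result closely regardless of how many chunks are used.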

## Initialize Spark Session

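The session below uses `local[*]`, which runs Spark on every available core (12 on this machine). The experiments can approximate the original 4-core setup (2 cores per node) by limiting the number of partitions to 4.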

In [5]:
import os
import sys

# Set critical environment variables BEFORE the SparkSession (and its JVM) is created
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["HADOOP_USER_NAME"] = "spark"
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession

# Comprehensive Java options for Java 17/21/23
# The critical flag is -Djava.security.manager=allow
java_opts = (
    "-Djava.security.manager=allow "
    "--add-opens=java.base/java.lang=ALL-UNNAMED "
    "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED "
    "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED "
    "--add-opens=java.base/java.io=ALL-UNNAMED "
    "--add-opens=java.base/java.net=ALL-UNNAMED "
    "--add-opens=java.base/java.nio=ALL-UNNAMED "
    "--add-opens=java.base/java.util=ALL-UNNAMED "
    "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED "
    "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED "
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED "
    "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED "
    "--add-opens=java.base/sun.security.action=ALL-UNNAMED "
    "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED "
    "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED "
)

spark = SparkSession.builder \
    .appName("SparkPiEstimation") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.ui.enabled", "false") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.extraJavaOptions", java_opts) \
    .config("spark.executor.extraJavaOptions", java_opts) \
    .getOrCreate()

print(f"Spark Version: {spark.version}")
print(f"Master: {spark.sparkContext.master}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")

Spark Version: 4.0.1
Master: local[*]
Default Parallelism: 12


## Experiment 1: Core Scaling

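Compare performance with 1 partition vs 4 partitions (fixed steps = 1M). On the `local[*]` session the partition count caps how many tasks run at once, so it stands in for the core count.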

In [6]:
"""print("=" * 80)
print("Core Scaling Experiment")
print("=" * 80)

core_results = []

# Test with 1 core
spark.stop()
spark_1 = SparkSession.builder \
    .appName("Spark_1Core") \
    .master("local[1]") \
    .getOrCreate()

runtime, pi = run_spark_experiment(spark_1, FIXED_STEPS)
core_results.append({'cores': 1, 'time': runtime, 'pi': pi})
print(f"1 Core: Time={runtime:.4f}s, Pi={pi:.6f}")
spark_1.stop()

# Test with 4 cores
spark_4 = SparkSession.builder \
    .appName("Spark_4Cores") \
    .master("local[4]") \
    .getOrCreate()

runtime, pi = run_spark_experiment(spark_4, FIXED_STEPS)
core_results.append({'cores': 4, 'time': runtime, 'pi': pi})
print(f"4 Cores: Time={runtime:.4f}s, Pi={pi:.6f}")

# Restore 4-core session for next experiments
spark = spark_4"""

print("=" * 80)
print("Core Scaling Experiment (Option A)")
print("=" * 80)

core_results = []

for cores in [1, 4]:
    runtime, pi = run_spark_experiment(
        spark,
        FIXED_STEPS,
        num_partitions=cores
    )

    core_results.append({
        "cores": cores,
        "time": runtime,
        "pi": pi
    })

    print(f"{cores} Core(s): Time={runtime:.4f}s, Pi={pi:.6f}")

Core Scaling Experiment (Option A)


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (kubernetes.docker.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed). Consider setting 'spark.sql.execution.pyspark.udf.faulthandler.enabled' or'spark.python.worker.faulthandler.enabled' configuration to 'true' for the better Python traceback.
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:599)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:945)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:925)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.mutable.Growable.addAll(Growable.scala:61)
	at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
	at scala.collection.mutable.ArrayBuilder.addAll(ArrayBuilder.scala:75)
	at scala.collection.IterableOnceOps.toArray(IterableOnce.scala:1505)
	at scala.collection.IterableOnceOps.toArray$(IterableOnce.scala:1498)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1057)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2524)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1575)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:210)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:385)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:933)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:203)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1575)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed). Consider setting 'spark.sql.execution.pyspark.udf.faulthandler.enabled' or'spark.python.worker.faulthandler.enabled' configuration to 'true' for the better Python traceback.
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:599)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:945)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:925)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.mutable.Growable.addAll(Growable.scala:61)
	at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
	at scala.collection.mutable.ArrayBuilder.addAll(ArrayBuilder.scala:75)
	at scala.collection.IterableOnceOps.toArray(IterableOnce.scala:1505)
	at scala.collection.IterableOnceOps.toArray$(IterableOnce.scala:1498)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1057)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2524)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:210)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:385)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:933)
	... 22 more
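
The error message above suggests enabling the Python fault handler to get a real Python traceback from the crashed worker. A minimal sketch of that follows; the two configuration keys are copied from the message itself, while `build_debug_session`, the app name and the `local[1]` master are assumptions made for illustration.

```python
# Config keys taken verbatim from the Spark error message; values are strings.
FAULTHANDLER_CONF = {
    "spark.python.worker.faulthandler.enabled": "true",
    "spark.sql.execution.pyspark.udf.faulthandler.enabled": "true",
}


def build_debug_session(app_name="SparkPiDebug", master="local[1]"):
    """Create a session with the fault handler enabled (hypothetical helper)."""
    # Imported lazily so the config dict can be inspected without PySpark installed
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name).master(master)
    for key, value in FAULTHANDLER_CONF.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Since `getOrCreate()` returns an already running session if there is one, the existing session would need to be stopped with `spark.stop()` before calling this helper; otherwise the new settings may not take effect.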


## Experiment 2: Step Scaling

Test different step counts with 4 partitions, so that at most 4 tasks run concurrently.

In [None]:
print("\n" + "=" * 80)
print("Step Scaling Experiment (4 Cores)")
print("=" * 80)

step_results = []

for steps in STEP_COUNTS:
    # Pin the partition count to 4; the default would use all 12 cores on this machine
    runtime, pi = run_spark_experiment(spark, steps, num_partitions=4)
    step_results.append({'steps': steps, 'time': runtime, 'pi': pi})
    print(f"Steps: {steps:>7}, Time: {runtime:.4f}s, Pi: {pi:.6f}")
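
Single timings of short jobs tend to be noisy, and the first job usually also pays for worker startup. One common remedy is a warm-up run followed by the median of several repeats. The sketch below assumes a hypothetical helper `time_median` and uses a stand-in workload so that it runs without Spark.

```python
import statistics
import time


def time_median(fn, repeats=5, warmup=1):
    """Call fn() `warmup` times untimed, then `repeats` times timed.

    Returns (median_seconds, last_result).
    """
    for _ in range(warmup):
        fn()
    durations = []
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = fn()
        durations.append(time.perf_counter() - start)
    return statistics.median(durations), result


# Stand-in workload; in the notebook this could wrap a call to run_spark_experiment
median_s, value = time_median(lambda: sum(range(10_000)), repeats=3)
print(f"median={median_s:.6f}s, value={value}")
```

The median is less sensitive than the mean to a single slow run, which matters when the step counts are small.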


## Visualization: Core Scaling

In [None]:
core_labels = [f"{r['cores']} Core(s)" for r in core_results]
core_times = [r['time'] for r in core_results]

plt.figure(figsize=(8, 6))
plt.bar(core_labels, core_times, color=['skyblue', 'lightcoral'])
plt.title('Spark: Core Scaling Performance (Steps = 1M)')
plt.xlabel('Number of Cores')
plt.ylabel('Execution Time (seconds)')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

# Calculate speedup
speedup = core_times[0] / core_times[1]
print(f"\nSpeedup (1 core → 4 cores): {speedup:.2f}x")
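
Speedup is often reported together with parallel efficiency, which is speedup divided by the number of workers. The sketch below uses a hypothetical helper `speedup_and_efficiency`; the timings are made up for illustration and are not results from this notebook.

```python
def speedup_and_efficiency(time_baseline, time_parallel, workers):
    """Return (speedup, efficiency) for a run on `workers` cores or partitions."""
    if time_parallel <= 0 or workers <= 0:
        raise ValueError("time_parallel and workers must be positive")
    speedup = time_baseline / time_parallel
    efficiency = speedup / workers  # 1.0 would mean perfect linear scaling
    return speedup, efficiency


# Made-up timings in seconds, for illustration only
example_speedup, example_efficiency = speedup_and_efficiency(2.0, 0.8, 4)
print(f"speedup={example_speedup:.2f}x, efficiency={example_efficiency:.1%}")
```

An efficiency well below 1.0 on a problem this small would point to fixed overhead rather than to the computation itself.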


## Visualization: Step Scaling

In [None]:
step_values = [r['steps'] for r in step_results]
step_times = [r['time'] for r in step_results]

plt.figure(figsize=(10, 6))
plt.plot(step_values, step_times, marker='o', linestyle='-', color='blue', linewidth=2)
plt.xscale('log')
plt.title('Spark: Step Scaling Performance (Cores = 4)')
plt.xlabel('Number of Steps (Log Scale)')
plt.ylabel('Execution Time (seconds)')
plt.grid(True, which="both", ls="--", alpha=0.7)
plt.show()


## Cleanup

In [None]:
spark.stop()
print("Spark session stopped.")


## Analysis

### Key Observations:
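1. **Core Scaling:** Compare the 1-partition and 4-partition runtimes at 1M steps; the speedup is the ratio of the two
2. **Step Scaling:** Fixed overhead dominates small step counts, so runtime is expected to grow more slowly than the step count until the computation itself dominates
3. **Spark Overhead:** The first job also pays for worker startup, which inflates the timings of the smallest step counts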

### Comparison with Original Clusters:
*(Add your original cluster results here)*

### Why Spark May Be Slower for Small Problems:
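- Task scheduling overhead for every job and stage
- Data serialization costs between the JVM and the Python worker processes
- JVM warmup time
- Spark is better suited for large-scale data processing (e.g. 100GB+), so a 1M-step sum does little to amortize these fixed costs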
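
The effect of fixed overhead can be illustrated with a toy model: runtime equals a constant overhead plus compute time divided evenly across cores. This is a deliberately simplified sketch; `modeled_runtime` is a hypothetical helper and both constants are made-up values chosen only to show the shape of the curve, not measurements.

```python
def modeled_runtime(steps, cores, overhead_s, per_step_s):
    """Toy model: fixed overhead plus perfectly divided compute time."""
    return overhead_s + steps * per_step_s / cores


# Hypothetical constants, not measured on any machine
OVERHEAD_S = 0.5    # scheduling, serialization, startup
PER_STEP_S = 1e-6   # cost of one rectangle

for steps in [1_000, 10_000, 100_000, 1_000_000]:
    t1 = modeled_runtime(steps, 1, OVERHEAD_S, PER_STEP_S)
    t4 = modeled_runtime(steps, 4, OVERHEAD_S, PER_STEP_S)
    print(f"steps={steps:>9,}  modeled speedup (1 -> 4 cores) = {t1 / t4:.2f}x")
```

Under these assumptions the modeled speedup stays close to 1x for small step counts and only approaches 4x once compute time far exceeds the overhead.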

### Spark vs Ray:
- Ray: Typically lower per-task overhead, which suits iterative or fine-grained tasks
- Spark: Optimized for batch processing of large datasets