### Useful links:
NOTE: Java error seems dependant on worker dying. Therefore recommended that we use python v3.10-3.12. Will need to setup venv and run again 
- [local pyspark installation, windows 11 notebook](https://www.youtube.com/watch?v=yslubfE8IQU)
- [Markdown guide](https://www.markdownguide.org/basic-syntax/)

In [2]:
import os, sys, platform, subprocess, shutil, pathlib, json

# --- 0) HARDEN ENV BEFORE SparkSession IS CREATED ---
# Make sure driver & executors use this exact Python
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Put Spark temp on a non-synced, short path (OneDrive can cause grief)
local_tmp = r"C:\spark-tmp" if os.name == "nt" else "/tmp/spark-tmp"
pathlib.Path(local_tmp).mkdir(parents=True, exist_ok=True)
os.environ["SPARK_LOCAL_DIRS"] = local_tmp  # used by Spark core
os.environ["TMPDIR"] = local_tmp             # used by Python workers

# --- 1) PRINT VERSIONS & JAVA ---
print("Python:", sys.version)
print("Platform:", platform.platform())
print("Executable:", sys.executable)
print("JAVA_HOME:", os.environ.get("JAVA_HOME"))
try:
    out = subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT, text=True)
    print(out)
except Exception as e:
    print("java -version failed:", e)

# --- 2) START SPARK IN LOCAL MODE WITH SAFE SETTINGS ---
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("pyspark-eof-diagnose")
    .master("local[*]")  # explicit local
    .config("spark.local.dir", local_tmp)
    .config("spark.python.worker.reuse", "false")   # avoid stale workers
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")  # rule out Arrow crashes
    .config("spark.ui.showConsoleProgress", "true")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print("Spark version:", spark.version)


Python: 3.12.11 (main, Aug  8 2025, 17:05:04) [MSC v.1944 64 bit (AMD64)]
Platform: Windows-11-10.0.26200-SP0
Executable: c:\Users\Tony\OneDrive - Ulster University\Teaching\COM739\2025-26\.venv\Scripts\python.exe
JAVA_HOME: C:\Program Files\Eclipse Adoptium\jdk-17.0.16.8-hotspot\
openjdk version "17.0.16" 2025-07-15
OpenJDK Runtime Environment Temurin-17.0.16+8 (build 17.0.16+8)
OpenJDK 64-Bit Server VM Temurin-17.0.16+8 (build 17.0.16+8, mixed mode, sharing)

Spark version: 4.0.1


In [6]:
spark.stop()

In [3]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

In [None]:
# --- 3) SMOKE TEST (should ALWAYS work) ---
print("Basic range collect:")
print(spark.range(5).collect())

# --- 4) YOUR DF: try ultra-small read path ---
print("df.limit(1).collect() test:")
print(df.limit(1).collect())

# --- 5) Narrow: single column, schema, plan ---
print("Schema:")
df.printSchema()
print("\nFirst plan:")
df.explain(True)

first_col = df.columns[0] if df.columns else None
if first_col:
    print("\nSingle column limit(5):")
    print(df.select(first_col).limit(5).collect())

# --- 6) If you use UDFs, try stripping them: select only native cols ---
# Heuristic: find obviously safe primitive columns
safe_types = {"IntegerType","LongType","DoubleType","FloatType","StringType","BooleanType","ShortType","ByteType","DateType","TimestampType"}
from pyspark.sql.types import StructType
schema = df.schema
safe_cols = [f.name for f in schema.fields if f.dataType.simpleString().split("(")[0] in {t.lower().replace("type","") for t in safe_types}]
if safe_cols:
    print("\nSafe cols sample collect:")
    print(df.select(*safe_cols[:min(5,len(safe_cols))]).limit(10).collect())

# --- 7) RDD path test (bypasses some SQL codegen paths) ---
print("\nRDD take(1) test:")
print(df.rdd.take(1))

In [5]:
from pyspark.sql import functions as F, types as T

def simple_dtype(t):
    # normalize type name for easy checks
    return t.simpleString().lower()

schema = df.schema

problem_cols = []
ok_cols = []
for f in schema.fields:
    col = f.name
    try:
        df.select(col).limit(5).collect()
        ok_cols.append((col, simple_dtype(f.dataType)))
    except Exception as e:
        problem_cols.append((col, simple_dtype(f.dataType), repr(e)))

print("OK columns:", ok_cols[:10], "... total:", len(ok_cols))
print("Problem columns:", problem_cols)

# Heuristic: try safe string-casts for problematic ones to confirm it's a serializer/type issue
cast_exprs = []
for f in schema.fields:
    col = f.name
    if any(col == p[0] for p in problem_cols):
        cast_exprs.append(F.col(col).cast("string").alias(f"{col}__as_str"))
    else:
        cast_exprs.append(F.col(col))

print("Try collecting with problematic columns cast to string:")
print(df.select(*cast_exprs).limit(5).collect())


OK columns: [] ... total: 0
Problem columns: [('firstname', 'string', "Py4JJavaError('An error occurred while calling o73.collectToPython.\\n', JavaObject id=o74)"), ('middlename', 'string', "Py4JJavaError('An error occurred while calling o81.collectToPython.\\n', JavaObject id=o82)"), ('lastname', 'string', "Py4JJavaError('An error occurred while calling o89.collectToPython.\\n', JavaObject id=o90)"), ('dob', 'string', "Py4JJavaError('An error occurred while calling o97.collectToPython.\\n', JavaObject id=o98)"), ('gender', 'string', "Py4JJavaError('An error occurred while calling o105.collectToPython.\\n', JavaObject id=o106)"), ('salary', 'bigint', "Py4JJavaError('An error occurred while calling o113.collectToPython.\\n', JavaObject id=o114)")]
Try collecting with problematic columns cast to string:


Py4JJavaError: An error occurred while calling o158.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 55) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed). Consider setting 'spark.sql.execution.pyspark.udf.faulthandler.enabled' or'spark.python.worker.faulthandler.enabled' configuration to 'true' for the better Python traceback.
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:599)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:945)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:925)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:386)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:933)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:544)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:497)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:58)
	at org.apache.spark.sql.classic.Dataset.$anonfun$collectToPython$1(Dataset.scala:2057)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
	at org.apache.spark.sql.classic.Dataset.collectToPython(Dataset.scala:2054)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed). Consider setting 'spark.sql.execution.pyspark.udf.faulthandler.enabled' or'spark.python.worker.faulthandler.enabled' configuration to 'true' for the better Python traceback.
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:599)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:945)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:925)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:386)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:933)
	... 26 more


In [1]:
import sys, os
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [2]:
""" Configuration of environment for PySpark applications. """
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkTest").getOrCreate()

print(f"findspark version: {findspark.__version__}")
print(f"pyspark version: {spark.version}")

findspark version: 2.0.1
pyspark version: 3.5.7


In [4]:
# read in json file
df1 = spark.read.format("json").load("people.json") # this works although more verbose
df2 = spark.read.json("people.json") # this is a direct way to read json
df2.show()
df1.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
df1.printSchema() # print the data schema 
df2.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [6]:
df1.collect()
df2.collect() # collect the data as a list of rows

[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [7]:
df1.count()

3

In [8]:
df1.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   NULL|
| stddev|7.7781745930520225|   NULL|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



## Week 3 pyspark

In [9]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

In [10]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [11]:
df.collect()

[Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000),
 Row(firstname='Michael', middlename='Rose', lastname='', dob='2000-05-19', gender='M', salary=4000),
 Row(firstname='Robert', middlename='', lastname='Williams', dob='1978-09-05', gender='M', salary=4000),
 Row(firstname='Maria', middlename='Anne', lastname='Jones', dob='1967-12-01', gender='F', salary=4000),
 Row(firstname='Jen', middlename='Mary', lastname='Brown', dob='1980-02-17', gender='F', salary=-1)]