In [None]:
import os, sys
from pathlib import Path
from pyspark.sql import SparkSession

# === Fuerza el Python correcto (mismo venv) ===
this_python = sys.executable
os.environ["PYSPARK_PYTHON"] = this_python
os.environ["PYSPARK_DRIVER_PYTHON"] = this_python

# === Fuerza IPv4/localhost y temp dir sin espacios ===
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
spark_tmp = r"C:\spark-tmp"
Path(spark_tmp).mkdir(parents=True, exist_ok=True)

spark = (
    SparkSession.builder
    .appName("DiagnosticoSparkLocal")
    .master("local[*]")
    .config("spark.driver.bindAddress","127.0.0.1")
    .config("spark.driver.host","127.0.0.1")
    .config("spark.local.dir", spark_tmp)
    .config("spark.sql.execution.arrow.pyspark.enabled","false")
    .config("spark.network.timeout","300s")
    .getOrCreate()
)

print("✅ Spark iniciado:", spark.version)

# --- PRUEBA JVM pura (no necesita worker Python) ---
print("Cuenta JVM:", spark.range(10).count())

# --- PRUEBA worker Python (RDD con lambda) ---
rdd = spark.sparkContext.parallelize([1,2,3,4], 2).map(lambda x: x*2)
print("RDD collect:", rdd.collect())

# (Diagnóstico) imprime versión de Python dentro del worker
def pyver_in_worker(it):
    import sys
    yield "Worker Python: " + sys.version
print(rdd.mapPartitions(pyver_in_worker).take(1)[0])

# --- PRUEBA DataFrame ---
df = spark.createDataFrame([(1,"hola"), (2,"mundo")], ["id","texto"])
df.show()

print("✅ OK")



def ensure_spark():
    global spark
    try:
        if spark is None or getattr(spark, "_jsc", None) is None:
            raise RuntimeError("Spark not active")
    except NameError:
        pass  # spark no existe
    if 'spark' not in globals() or getattr(spark, "_jsc", None) is None:
        spark = (SparkSession.builder
                 .appName("DiagCheck")
                 .master("local[*]")
                 .config("spark.sql.shuffle.partitions", "8")
                 .getOrCreate())
    return spark

# Usar la función
spark = ensure_spark()

print("Spark OK:s", spark.version)
print("defaultParallelism:", spark.sparkContext.defaultParallelism)
print("particiones shuffle:", spark.conf.get("spark.sql.shuffle.partitions"))


✅ Spark iniciado: 3.5.4
Cuenta JVM: 10
RDD collect: [2, 4, 6, 8]
Worker Python: 3.11.0 (main, Oct 24 2022, 18:26:48) [MSC v.1933 64 bit (AMD64)]
+---+-----+
| id|texto|
+---+-----+
|  1| hola|
|  2|mundo|
+---+-----+

✅ OK
Spark OK: 3.5.4
defaultParallelism: 8
particiones shuffle: 200


In [2]:
from pyspark.sql import SparkSession

def ensure_spark():
    global spark
    try:
        if spark is None or getattr(spark, "_jsc", None) is None:
            raise RuntimeError("Spark not active")
    except NameError:
        pass  # spark no existe
    if 'spark' not in globals() or getattr(spark, "_jsc", None) is None:
        spark = (SparkSession.builder
                 .appName("DiagCheck")
                 .master("local[*]")
                 .config("spark.sql.shuffle.partitions", "8")
                 .getOrCreate())
    return spark

# Usar la función
spark = ensure_spark()

print("Spark OK:", spark.version)
print("defaultParallelism:", spark.sparkContext.defaultParallelism)
print("particiones shuffle:", spark.conf.get("spark.sql.shuffle.partitions"))


Spark OK: 3.5.4
defaultParallelism: 8
particiones shuffle: 8


In [None]:
from pyspark.sql import functions as F


df = spark.range(1, 1_000_000).withColumn("grp", (F.col("id") % 10))
print("Antes", df.rdd.getNumPartitions())

#cambiar particiones de shuffle a 8
spark.conf.set("spark.sql.shuffle.partitions",8)

res = df.groupBy("grp").count()
print("Después del groupBy, paticiones de shuffle:", res.rdd.getNumPartitions())

Antes 8
Después del groupBy, paticiones de shuffle: 1


In [5]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x0000014C38156190>>