### ***Test Notebook with pyspark-env***

In [1]:
!conda env list

# conda environments:
#
base                     /opt/conda
pyspark-env           *  /opt/conda/envs/pyspark-env



In [2]:
!python --version

Python 3.12.10


In [3]:
!conda list

# packages in environment at /opt/conda/envs/pyspark-env:
#
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                 conda_forge    conda-forge
_openmp_mutex             4.5                       2_gnu    conda-forge
alabaster                 1.0.0                    pypi_0    pypi
anyio                     4.9.0                    pypi_0    pypi
argon2-cffi               23.1.0                   pypi_0    pypi
argon2-cffi-bindings      21.2.0                   pypi_0    pypi
arrow                     1.3.0                    pypi_0    pypi
asttokens                 3.0.0                    pypi_0    pypi
async-lru                 2.0.5                    pypi_0    pypi
attrs                     25.3.0                   pypi_0    pypi
aws-c-auth                0.9.0                h0a147a0_3    conda-forge
aws-c-cal                 0.9.0                hada3f3f_0    conda-forge
aws-c-common              0.12.2               hb9d3c

In [4]:
try:
    import pandas as pd
    import debugpy
    import pyspark
    print("Pandas version:", pd.__version__)
    print("Debugpy version:", debugpy.__version__)
    print("Pyspark version:", pyspark.__version__)
except Exception as e:
    print("An unexpected error occurred:", e)
    print("Please check your conda environment and package installations.")
    raise

Pandas version: 2.2.3
Debugpy version: 1.8.14
Pyspark version: 3.5.5


In [5]:
# Prueba de funcionamiento básico de PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.functions as F


def test_pyspark_environment():
    """
    Prueba básica para verificar que PySpark está funcionando correctamente.
    Crea una sesión, genera datos de prueba y ejecuta operaciones básicas.
    """    
    # 1. Inicializar sesión de Spark
    try:
        spark = SparkSession.builder \
            .appName("PySparkTest") \
            .getOrCreate()
        
        print("✅ SparkSession creada exitosamente")
        print(f"Versión de PySpark: {spark.version}")
    except Exception as e:
        print(f"❌ Error al crear SparkSession: {str(e)}")
        return False

    # 2. Crear DataFrame de prueba
    test_data = [("PVC_001", 150), 
                 ("PVC_002", 200),
                 ("PVC_003", 75)]
    
    try:
        df = spark.createDataFrame(test_data, ["product_id", "demand"])
        print("\n✅ DataFrame creado exitosamente:")
        df.show()
    except Exception as e:
        print(f"❌ Error al crear DataFrame: {str(e)}")
        spark.stop()
        return False

    # 3. Probar transformaciones básicas
    try:
        # Calcular demanda total
        total_demand = df.agg(F.sum("demand").alias("total_demand")).first()[0]
        print(f"\n📊 Demanda total calculada: {total_demand} (Valor esperado: 425)")
        
        # Filtrar productos con demanda > 100
        high_demand = df.filter(col("demand") > 100).count()
        print(f"🔍 Productos con demanda >100: {high_demand} (Valor esperado: 2)")
        
        # Agregar columna calculada
        df = df.withColumn("demand_category", 
                          F.when(col("demand") > 150, "Alta")
                           .otherwise("Media/Baja"))
        print("\n🎛️ DataFrame con categorías de demanda:")
        df.show()
    except Exception as e:
        print(f"❌ Error en transformaciones: {str(e)}")
        spark.stop()
        return False

    # 4. Probar escritura temporal (modo seguro)
    try:
        temp_path = "/tmp/pyspark_test_output"
        df.write.mode("overwrite").parquet(temp_path)
        print(f"\n💾 Datos escritos temporalmente en: {temp_path}")
        
        # Leer datos guardados para verificación
        df_read = spark.read.parquet(temp_path)
        if df_read.count() == df.count():
            print("🔄 Lectura de datos verificada exitosamente")
        else:
            raise ValueError("Conteo de registros no coincide")
    except Exception as e:
        print(f"❌ Error en escritura/lectura: {str(e)}")
        spark.stop()
        return False

    # 5. Limpieza
    spark.stop()
    print("\n🧹 Sesión de Spark cerrada correctamente")
    return True

# Ejecutar prueba
if __name__ == "__main__":
    success = test_pyspark_environment()
    if success:
        print("\n🎉 ¡Todas las pruebas de PySpark se completaron exitosamente!")
    else:
        print("\n🔴 Se encontraron problemas en la configuración de PySpark")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/02 08:03:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ SparkSession creada exitosamente
Versión de PySpark: 3.5.5

✅ DataFrame creado exitosamente:


                                                                                

+----------+------+
|product_id|demand|
+----------+------+
|   PVC_001|   150|
|   PVC_002|   200|
|   PVC_003|    75|
+----------+------+


📊 Demanda total calculada: 425 (Valor esperado: 425)
🔍 Productos con demanda >100: 2 (Valor esperado: 2)

🎛️ DataFrame con categorías de demanda:
+----------+------+---------------+
|product_id|demand|demand_category|
+----------+------+---------------+
|   PVC_001|   150|     Media/Baja|
|   PVC_002|   200|           Alta|
|   PVC_003|    75|     Media/Baja|
+----------+------+---------------+



                                                                                


💾 Datos escritos temporalmente en: /tmp/pyspark_test_output
🔄 Lectura de datos verificada exitosamente

🧹 Sesión de Spark cerrada correctamente

🎉 ¡Todas las pruebas de PySpark se completaron exitosamente!
