In [1]:
from pyspark.sql import SparkSession

# 1) Arrancar SparkSession apuntando a Glue como metastore (Hive)
spark = SparkSession.builder \
    .appName("EDA_SparkSQL_proyecto1db") \
    .enableHiveSupport() \
    .getOrCreate()


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1748274109137_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
spark.sql("SHOW DATABASES").show(truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+
|namespace  |
+-----------+
|default    |
|proyecto1db|
+-----------+

In [3]:
# 1) Mostrar tablas en la base proyecto1db
spark.sql("SHOW TABLES IN proyecto1db").show(truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------------------+-----------+
|namespace  |tableName             |isTemporary|
+-----------+----------------------+-----------+
|proyecto1db|base_entrenamiento_csv|false      |
+-----------+----------------------+-----------+

In [4]:
# 1) Mostrar el esquema detallado (columnas y tipos)
spark.sql("DESCRIBE proyecto1db.base_entrenamiento_csv").show(truncate=False)

# 2) Ver un ejemplo de fila para confirmar nombres exactos de encabezados
spark.table("proyecto1db.base_entrenamiento_csv") \
     .limit(1) \
     .toDF(*spark.table("proyecto1db.base_entrenamiento_csv").columns) \
     .show(truncate=False)

# 1) Cargar la tabla tal cual está en Glue  
df_raw = spark.table("proyecto1db.base_entrenamiento_csv")

# 2) Ver nombres exactos y tipos  
spark.sql("DESCRIBE proyecto1db.base_entrenamiento_csv").show(truncate=False)


df_raw.limit(1).show(truncate=False)

# 4) Renombrar columnas: espacios → guión bajo, todo en minúsculas
clean_cols = [c.strip().lower().replace(" ", "_") for c in df_raw.columns]
df = df_raw.toDF(*clean_cols)

# 5) Ver esquema limpio y una muestra
df.printSchema()
df.show(1, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+---------+-------+
|col_name           |data_type|comment|
+-------------------+---------+-------+
|id_uc              |string   |NULL   |
|actividad economica|string   |NULL   |
|consumo promedio   |double   |NULL   |
|ultimo valor leido |double   |NULL   |
|diferencia lectura |string   |NULL   |
|total puntaje      |double   |NULL   |
|evento             |string   |NULL   |
|evento_cat         |bigint   |NULL   |
|observacion_ok_str |string   |NULL   |
|bigrams_str        |string   |NULL   |
+-------------------+---------+-------+

+-----+-------------------+----------------+------------------+------------------+-------------+------+----------+------------------+-----------+
|id_uc|actividad economica|consumo promedio|ultimo valor leido|diferencia lectura|total puntaje|evento|evento_cat|observacion_ok_str|bigrams_str|
+-----+-------------------+----------------+------------------+------------------+-------------+------+----------+------------------+-----------+
|

In [5]:
df.groupBy("id_uc") \
  .count() \
  .orderBy("count", ascending=False) \
  .show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----+
|   id_uc|count|
+--------+-----+
|12408022|    2|
|12553767|    2|
|12361370|    2|
|12353261|    2|
|12303103|    2|
|12364706|    2|
|12530854|    2|
|12318869|    2|
|12469543|    2|
|12327528|    2|
+--------+-----+
only showing top 10 rows

In [6]:
df.groupBy("actividad_economica") \
  .count() \
  .orderBy("count", ascending=False) \
  .show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+
| actividad_economica|count|
+--------------------+-----+
|Residencial estra...|  517|
|Residencial estra...|  494|
|Residencial estra...|  370|
|           Comercial|  364|
|          Industrial|  125|
|             Oficial|  118|
|Residencial estra...|  102|
|              Exenta|   69|
|Residencial estra...|   58|
|Expendio de comid...|   51|
+--------------------+-----+
only showing top 10 rows

In [10]:
base_path = "s3://proyectointegrador1017/refined/"


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# 3) Estadísticas descriptivas vía spark.sql y guardado en S3
stats_df = spark.sql("""
  SELECT
    ROUND(AVG(`consumo promedio`), 2)           AS avg_consumo,
    ROUND(STDDEV(`consumo promedio`), 2)        AS sd_consumo,
    MIN(`consumo promedio`)                    AS min_consumo,
    PERCENTILE_APPROX(`consumo promedio`, 0.5)  AS med_consumo,
    MAX(`consumo promedio`)                    AS max_consumo
  FROM proyecto1db.base_entrenamiento_csv
""")

stats_df.show(truncate=False)

stats_df.coalesce(1) \
        .write \
        .mode("overwrite") \
        .option("header", True) \
        .csv(base_path + "estadisticas_descriptivas/")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+-----------+-----------+-----------+
|avg_consumo|sd_consumo|min_consumo|med_consumo|max_consumo|
+-----------+----------+-----------+-----------+-----------+
|21.03      |102.49    |0.0        |5.0        |3292.0     |
+-----------+----------+-----------+-----------+-----------+

In [12]:
# 6a) Correlaciones entre consumo_promedio, ultimo_valor_leido y diferencia_lectura
corr_val = spark.sql("""
  SELECT 
    corr(`consumo promedio`, `ultimo valor leido`)  AS corr_consumo_valor,
    corr(`consumo promedio`, `diferencia lectura`) AS corr_consumo_diferencia
  FROM proyecto1db.base_entrenamiento_csv
""")
corr_val.show()

# Guardar correlaciones en S3
corr_val.coalesce(1) \
        .write \
        .mode("overwrite") \
        .option("header", True) \
        .csv(base_path + "correlaciones/")

# 6b) Histograma binned de diferencia_lectura
hist_diff = spark.sql("""
  SELECT
    FLOOR(`diferencia lectura`/5)*5 AS diff_bin,
    COUNT(*)                  AS cnt
  FROM proyecto1db.base_entrenamiento_csv
  WHERE `diferencia lectura` IS NOT NULL
  GROUP BY FLOOR(`diferencia lectura`/5)*5
  ORDER BY diff_bin
""")
hist_diff.show(20)

# Guardar histograma de diferencia lectura en S3
hist_diff.coalesce(1) \
         .write \
         .mode("overwrite") \
         .option("header", True) \
         .csv(base_path + "histograma_diferencia/")

print("✅ Correcciones aplicadas: correlaciones e histogramas con columnas válidas.")  


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+-----------------------+
|corr_consumo_valor|corr_consumo_diferencia|
+------------------+-----------------------+
|0.2488259648802437|   -0.00227867894441...|
+------------------+-----------------------+

+--------+----+
|diff_bin| cnt|
+--------+----+
|    NULL| 292|
|       0|1344|
|       5| 297|
|      10| 158|
|      15| 108|
|      20|  58|
|      25|  48|
|      30|  39|
|      35|  30|
|      40|  25|
|      45|  22|
|      50|  15|
|      55|  12|
|      60|  13|
|      65|  16|
|      70|   4|
|      75|   8|
|      80|  18|
|      85|  10|
|      90|   7|
+--------+----+
only showing top 20 rows

? Correcciones aplicadas: correlaciones e histogramas con columnas v?lidas.