# Analisis optimizaciones fichero .Parquet

Para realizar el analisis, despu칠s de arrancar la sesi칩n de Spark hay que ir ejecutando el codigo de las celdas y posteriormente meterse en el Spark UI para comparar los jobs ejecutados. Asi se puede observar las diferencias entre las consultas al fichero .csv y el .parquet.

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.3`


import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window



//Reducir numero logs
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)


//For adding extra dependenies
import $ivy.`org.typelevel::cats-core:1.6.0`

//Plotly
import $ivy.`org.plotly-scala::plotly-almond:0.8.3`
import plotly._, plotly.element._, plotly.layout._, plotly.Almond._


//Iniciamos session de Spark
val spark = {
  NotebookSparkSession.builder()
     .master("local[*]")
    //.config("spark.executor.instances", "4") // N칰mero de ejecutores
    //.config("spark.executor.memory", "4g") // Memoria por ejecutor
    //.config("spark.executor.cores", "2") // N칰cleos por ejecutor
    //.config("spark.driver.memory", "4g") // Memoria del driver
    //.config("spark.sql.shuffle.partitions", "8") // N칰mero de particiones para operaciones de shuffle
    .getOrCreate()
}
import spark.implicits._

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36morg.apache.spark.sql._[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[32mimport [39m[36morg.apache.spark.sql.expressions.Window[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}[39m
[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36mplotly._, plotly.element._, plotly.layout._, plotly.Almond._


//Iniciamos session de Spark
[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@1a099168
[32mimport [39m[36mspark.implicits._[39m

#### Lectura simple de datos

In [59]:
val pathCsv = "../../data/csv/dsBalanceNacionalAnalisis.csv"      
val pathParquet = "../../data/parquet/dsBalanceNacionalParticionado.parquet"

// Medir tiempo de lectura CSV
val startCsv = System.nanoTime()
val dfFromCsv: DataFrame = spark.read
    .option("header", "true") // Para que Spark lea la primera linea como encabezado
    .option("inferSchema", "true") // Para inferir autom치ticamente el esquema
    .csv(pathCsv)
val endCsv = System.nanoTime()
println(s"Tiempo de lectura CSV: ${(endCsv - startCsv) / 1e9} segundos")


// Medir tiempo de lectura Parquet
val startParquet = System.nanoTime()
val dfFromParquet: DataFrame = spark.read.parquet(pathParquet)
val endParquet = System.nanoTime()
println(s"Tiempo de lectura Parquet: ${(endParquet - startParquet) / 1e9} segundos")

// Ver esquema
dfFromCsv.explain()
dfFromParquet.explain()

Tiempo de lectura CSV: 0.3589797 segundos


Tiempo de lectura Parquet: 0.1673615 segundos
== Physical Plan ==
*(1) FileScan csv [Familia#1635,Tipo#1636,Compuesto#1637,Porcentaje#1638,Valor#1639,Fecha#1640,BajasEmisiones#1641,A침o#1642,Mes#1643] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Proyectos/spark-datos-energia/data/csv/dsBalanceNacionalAnalisis.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Familia:string,Tipo:string,Compuesto:boolean,Porcentaje:double,Valor:double,Fecha:timestam...
== Physical Plan ==
*(1) FileScan parquet [Familia#1653,Tipo#1654,Compuesto#1655,Porcentaje#1656,Valor#1657,Fecha#1658,BajasEmisiones#1659,A침o#1660,Mes#1661] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Proyectos/spark-datos-energia/data/parquet/dsBalanceNacionalParticiona..., PartitionCount: 169, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Familia:string,Tipo:string,Compuesto:boolean,Porcentaje:double,Valor:double,Fecha:timestam...


[36mpathCsv[39m: [32mString[39m = [32m"../../data/csv/dsBalanceNacionalAnalisis.csv"[39m
[36mpathParquet[39m: [32mString[39m = [32m"../../data/parquet/dsBalanceNacionalParticionado.parquet"[39m
[36mstartCsv[39m: [32mLong[39m = [32m783512048687500L[39m
[36mdfFromCsv[39m: [32mDataFrame[39m = [Familia: string, Tipo: string ... 7 more fields]
[36mendCsv[39m: [32mLong[39m = [32m783512407667200L[39m
[36mstartParquet[39m: [32mLong[39m = [32m783512407933500L[39m
[36mdfFromParquet[39m: [32mDataFrame[39m = [Familia: string, Tipo: string ... 7 more fields]
[36mendParquet[39m: [32mLong[39m = [32m783512575295000L[39m

#### Lectura parcial (solo algunas columnas)

In [35]:
val columnasFromCsv = dfFromCsv
    .select("Fecha", "Tipo", "Valor")

columnasFromCsv.explain()

val columnasFromParquet = dfFromParquet
    .select("Fecha", "Tipo", "Valor")

columnasFromParquet.explain()

== Physical Plan ==
*(1) Project [Fecha#425, Tipo#421, Valor#424]
+- *(1) FileScan csv [Tipo#421,Valor#424,Fecha#425] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Proyectos/spark-datos-energia/data/csv/dsBalanceNacional11-25.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Tipo:string,Valor:double,Fecha:timestamp>
== Physical Plan ==
*(1) Project [Fecha#439, Tipo#435, Valor#438]
+- *(1) FileScan parquet [Tipo#435,Valor#438,Fecha#439] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Proyectos/spark-datos-energia/data/parquet/dsBalanceNacional11-25.parq..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Tipo:string,Valor:double,Fecha:timestamp>


[36mcolumnasFromCsv[39m: [32mDataFrame[39m = [Fecha: timestamp, Tipo: string ... 1 more field]
[36mcolumnasFromParquet[39m: [32mDataFrame[39m = [Fecha: timestamp, Tipo: string ... 1 more field]

#### Lectura filtrada

In [63]:
val pathCsv = "../../data/csv/dsBalanceNacionalAnalisis.csv"      
val pathParquet = "../../data/parquet/dsBalanceNacionalParticionado.parquet"

//csv
val startCsv = System.nanoTime()
val dfCsvFiltered = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(pathCsv)
    .filter($"A침o" === 2024 && $"Mes" === 1)// Filtro directo en la columna sin transformaciones

val endCsv = System.nanoTime()

println(s"Tiempo de lectura CSV: ${(endCsv - startCsv) / 1e9} segundos")

dfCsvFiltered.explain() // Ver plan de ejecuci칩n
dfCsvFiltered.show(1)

Tiempo de lectura CSV: 0.3754405 segundos
== Physical Plan ==
*(1) Project [Familia#1801, Tipo#1802, Compuesto#1803, Porcentaje#1804, Valor#1805, Fecha#1806, BajasEmisiones#1807, A침o#1808, Mes#1809]
+- *(1) Filter (((isnotnull(A침o#1808) && isnotnull(Mes#1809)) && (A침o#1808 = 2024)) && (Mes#1809 = 1))
   +- *(1) FileScan csv [Familia#1801,Tipo#1802,Compuesto#1803,Porcentaje#1804,Valor#1805,Fecha#1806,BajasEmisiones#1807,A침o#1808,Mes#1809] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Proyectos/spark-datos-energia/data/csv/dsBalanceNacionalAnalisis.csv], PartitionFilters: [], PushedFilters: [IsNotNull(A침o), IsNotNull(Mes), EqualTo(A침o,2024), EqualTo(Mes,1)], ReadSchema: struct<Familia:string,Tipo:string,Compuesto:boolean,Porcentaje:double,Valor:double,Fecha:timestam...


+---------+----------+---------+-------------------+---------+-------------------+--------------+----+---+
|  Familia|      Tipo|Compuesto|         Porcentaje|    Valor|              Fecha|BajasEmisiones| A침o|Mes|
+---------+----------+---------+-------------------+---------+-------------------+--------------+----+---+
|Renovable|Hidr치ulica|    false|0.19718853720380722|58381.148|2024-01-01 01:00:00|          true|2024|  1|
+---------+----------+---------+-------------------+---------+-------------------+--------------+----+---+
only showing top 1 row



[36mpathCsv[39m: [32mString[39m = [32m"../../data/csv/dsBalanceNacionalAnalisis.csv"[39m
[36mpathParquet[39m: [32mString[39m = [32m"../../data/parquet/dsBalanceNacionalParticionado.parquet"[39m
[36mstartCsv[39m: [32mLong[39m = [32m784023217208500L[39m
[36mdfCsvFiltered[39m: [32mDataset[39m[[32mRow[39m] = [Familia: string, Tipo: string ... 7 more fields]
[36mendCsv[39m: [32mLong[39m = [32m784023592649000L[39m

In [58]:
//Parquet
val startParquet = System.nanoTime()

val dfParquetFiltered = spark.read
    .parquet(pathParquet)
    .filter($"A침o" === 2024 && $"Mes" === 1) // Filtro directo en la columna sin transformaciones

val endParquet = System.nanoTime()

println(s"Tiempo de lectura Parquet: ${(endParquet - startParquet) / 1e9} segundos")

dfParquetFiltered.explain() // Ver plan de ejecuci칩n
dfParquetFiltered.show(1)

Tiempo de lectura Parquet: 0.2106526 segundos
== Physical Plan ==
*(1) FileScan parquet [Familia#1578,Tipo#1579,Compuesto#1580,Porcentaje#1581,Valor#1582,Fecha#1583,BajasEmisiones#1584,A침o#1585,Mes#1586] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Proyectos/spark-datos-energia/data/parquet/dsBalanceNacionalParticiona..., PartitionCount: 1, PartitionFilters: [isnotnull(A침o#1585), isnotnull(Mes#1586), (A침o#1585 = 2024), (Mes#1586 = 1)], PushedFilters: [], ReadSchema: struct<Familia:string,Tipo:string,Compuesto:boolean,Porcentaje:double,Valor:double,Fecha:timestam...


+---------+----------+---------+-------------------+---------+-------------------+--------------+----+---+
|  Familia|      Tipo|Compuesto|         Porcentaje|    Valor|              Fecha|BajasEmisiones| A침o|Mes|
+---------+----------+---------+-------------------+---------+-------------------+--------------+----+---+
|Renovable|Hidr치ulica|    false|0.19718853720380722|58381.148|2024-01-01 01:00:00|          true|2024|  1|
+---------+----------+---------+-------------------+---------+-------------------+--------------+----+---+
only showing top 1 row



[36mstartParquet[39m: [32mLong[39m = [32m783399512523100L[39m
[36mdfParquetFiltered[39m: [32mDataset[39m[[32mRow[39m] = [Familia: string, Tipo: string ... 7 more fields]
[36mendParquet[39m: [32mLong[39m = [32m783399723175700L[39m

#### Agrupaciones

In [65]:
val startCsv = System.nanoTime()

val dfCsv = spark.read
    .option("header", "true")
    .option("inferSchema", "true") // Spark debe inferir el esquema en cada lectura
    .csv(pathCsv)
    .filter($"A침o" === 2024 && $"Mes" === 1) // NO optimizado (lee todo el archivo)
    .filter($"Familia" === "Renovable") // NO optimizado (lee todas las filas)
    .groupBy("Tipo")
    .agg(
        sum("Valor").as("TotalGenerado"),
        avg("Porcentaje").as("PorcentajePromedio")
    )

val endCsv = System.nanoTime()
println(s"Tiempo de lectura CSV: ${(endCsv - startCsv) / 1e9} segundos")

dfCsv.explain(true)
dfCsv.show(5)

Tiempo de lectura CSV: 0.4236098 segundos
== Parsed Logical Plan ==
'Aggregate [Tipo#1868], [Tipo#1868, sum('Valor) AS TotalGenerado#1895, avg('Porcentaje) AS PorcentajePromedio#1897]
+- Filter (Familia#1867 = Renovable)
   +- Filter ((A침o#1874 = 2024) && (Mes#1875 = 1))
      +- Relation[Familia#1867,Tipo#1868,Compuesto#1869,Porcentaje#1870,Valor#1871,Fecha#1872,BajasEmisiones#1873,A침o#1874,Mes#1875] csv

== Analyzed Logical Plan ==
Tipo: string, TotalGenerado: double, PorcentajePromedio: double
Aggregate [Tipo#1868], [Tipo#1868, sum(Valor#1871) AS TotalGenerado#1895, avg(Porcentaje#1870) AS PorcentajePromedio#1897]
+- Filter (Familia#1867 = Renovable)
   +- Filter ((A침o#1874 = 2024) && (Mes#1875 = 1))
      +- Relation[Familia#1867,Tipo#1868,Compuesto#1869,Porcentaje#1870,Valor#1871,Fecha#1872,BajasEmisiones#1873,A침o#1874,Mes#1875] csv

== Optimized Logical Plan ==
Aggregate [Tipo#1868], [Tipo#1868, sum(Valor#1871) AS TotalGenerado#1895, avg(Porcentaje#1870) AS PorcentajePromedio#189

+-------------------+------------------+--------------------+
|               Tipo|     TotalGenerado|  PorcentajePromedio|
+-------------------+------------------+--------------------+
|Residuos renovables| 69243.12599999999|0.005950484155910...|
| Solar fotovoltaica|1932458.7750000001| 0.16709750936029064|
|      Solar t칠rmica| 94242.96699999998|0.008281224136326733|
|   Otras renovables| 283523.7090000001| 0.02439302139040502|
|        Hidroe칩lica| 739.2880000000001|6.366359586924555E-5|
+-------------------+------------------+--------------------+
only showing top 5 rows



[36mstartCsv[39m: [32mLong[39m = [32m785003079486700L[39m
[36mdfCsv[39m: [32mDataFrame[39m = [Tipo: string, TotalGenerado: double ... 1 more field]
[36mendCsv[39m: [32mLong[39m = [32m785003503096500L[39m

In [67]:
//Parquet
val startParquet = System.nanoTime()

val dfParquet = spark.read
    .parquet(pathParquet)
    .filter($"A침o" === 2024 && $"Mes" === 1) // Partition Pruning (lee solo las carpetas necesarias)
    .filter($"Familia" === "Renovable") // Predicate Pushdown (lee solo las filas necesarias)
    .groupBy("Tipo")
    .agg(
        sum("Valor").as("TotalGenerado"),
        avg("Porcentaje").as("PorcentajePromedio")
    )
val endParquet = System.nanoTime()

println(s"Tiempo de lectura Parquet: ${(endParquet - startParquet) / 1e9} segundos")


dfParquet.explain(true)
dfParquet.show(5)

Tiempo de lectura Parquet: 0.2106089 segundos
== Parsed Logical Plan ==
'Aggregate [Tipo#1931], [Tipo#1931, sum('Valor) AS TotalGenerado#1958, avg('Porcentaje) AS PorcentajePromedio#1960]
+- Filter (Familia#1930 = Renovable)
   +- Filter ((A침o#1937 = 2024) && (Mes#1938 = 1))
      +- Relation[Familia#1930,Tipo#1931,Compuesto#1932,Porcentaje#1933,Valor#1934,Fecha#1935,BajasEmisiones#1936,A침o#1937,Mes#1938] parquet

== Analyzed Logical Plan ==
Tipo: string, TotalGenerado: double, PorcentajePromedio: double
Aggregate [Tipo#1931], [Tipo#1931, sum(Valor#1934) AS TotalGenerado#1958, avg(Porcentaje#1933) AS PorcentajePromedio#1960]
+- Filter (Familia#1930 = Renovable)
   +- Filter ((A침o#1937 = 2024) && (Mes#1938 = 1))
      +- Relation[Familia#1930,Tipo#1931,Compuesto#1932,Porcentaje#1933,Valor#1934,Fecha#1935,BajasEmisiones#1936,A침o#1937,Mes#1938] parquet

== Optimized Logical Plan ==
Aggregate [Tipo#1931], [Tipo#1931, sum(Valor#1934) AS TotalGenerado#1958, avg(Porcentaje#1933) AS Porcentaje

+-------------------+------------------+--------------------+
|               Tipo|     TotalGenerado|  PorcentajePromedio|
+-------------------+------------------+--------------------+
|Residuos renovables| 69243.12599999999|0.005950484155910...|
| Solar fotovoltaica|1932458.7750000001| 0.16709750936029064|
|      Solar t칠rmica| 94242.96699999998|0.008281224136326733|
|   Otras renovables| 283523.7090000001| 0.02439302139040502|
|        Hidroe칩lica| 739.2880000000001|6.366359586924555E-5|
+-------------------+------------------+--------------------+
only showing top 5 rows



[36mstartParquet[39m: [32mLong[39m = [32m785067117229600L[39m
[36mdfParquet[39m: [32mDataFrame[39m = [Tipo: string, TotalGenerado: double ... 1 more field]
[36mendParquet[39m: [32mLong[39m = [32m785067327838500L[39m

In [18]:
dfFromParquet.groupBy("Tipo").count().show()

+--------------------+-----+
|                Tipo|count|
+--------------------+-----+
| Residuos renovables| 5145|
|     Demanda en b.c.| 5145|
|  Turbinaci칩n bombeo| 5145|
|Saldo almacenamiento| 5145|
|     Consumos bombeo| 5145|
|  Solar fotovoltaica| 5145|
|       Solar t칠rmica| 5141|
|Generaci칩n no ren...| 5145|
|    Otras renovables| 5145|
|             Nuclear| 5145|
|         Hidroe칩lica| 3838|
|      Turbina de gas| 5145|
|Generaci칩n renovable| 5145|
|Residuos no renov...| 5145|
|        Cogeneraci칩n| 5145|
|Saldo I. internac...| 5145|
|       Carga bater칤a|  727|
|     Entrega bater칤a|  665|
|      Motores di칠sel| 5145|
|              E칩lica| 5145|
+--------------------+-----+
only showing top 20 rows



In [15]:
//parquet

val startParquet = System.nanoTime()
val ventanaParquet = Window.partitionBy("A침o")

val datosBalanceAnualesParquet = dfFromParquet
    .withColumn("A침o", date_format($"Fecha", "yyyy"))
    .groupBy($"A침o", $"Familia", $"Tipo", $"Compuesto", $"BajasEmisiones")
    .agg(
        sum("Valor").as("ValorAnual"),
        avg("Porcentaje").as("PorcentajeAnualSobreFamilia")
    )
    .withColumn(
        "TotalGenerado", 
        round(
            sum(
                when(!$"Compuesto" && $"Familia" =!= "Demanda" ,$"ValorAnual")
                    .otherwise(0)).over(ventanaParquet),2)
    )
    .withColumn(
        "PorcentajeSobreTotal", 
        round($"ValorAnual" / $"TotalGenerado" * 100,2))
    .withColumn(
        "PorcentajeBajasEmisiones", 
        round(
            sum(
                when($"BajasEmisiones" && !$"Compuesto" && $"Familia" =!= "Demanda", $"ValorAnual")
                    .otherwise(0)).over(ventanaParquet) / $"TotalGenerado" * 100,2)
    )

// Mostrar los primeros resultados y explicar el plan de ejecuci칩n
datosBalanceAnualesParquet.show(5)
//datosBalanceAnualesParquet.explain(true)
val endParquet = System.nanoTime()
println(s"Tiempo ejecuci칩n Parquet: ${(endParquet - startParquet) / 1e9} segundos")

+----+------------+--------------------+---------+--------------+-------------------+---------------------------+--------------+--------------------+------------------------+
| A침o|     Familia|                Tipo|Compuesto|BajasEmisiones|         ValorAnual|PorcentajeAnualSobreFamilia| TotalGenerado|PorcentajeSobreTotal|PorcentajeBajasEmisiones|
+----+------------+--------------------+---------+--------------+-------------------+---------------------------+--------------+--------------------+------------------------+
|2016|No-Renovable|    Turbina de vapor|    false|         false| 2536143.0030000005|         0.0167077641039327|2.5700810617E8|                0.99|                   61.13|
|2016|No-Renovable|      Turbina de gas|    false|         false|  616037.3019999998|       0.003893066689377...|2.5700810617E8|                0.24|                   61.13|
|2016|   Renovable|       Solar t칠rmica|    false|          true|  5071201.701999999|       0.054412720483706205|2.5700810617

[36mstartParquet[39m: [32mLong[39m = [32m704686483035900L[39m
[36mventanaParquet[39m: [32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@52c3df61
[36mdatosBalanceAnualesParquet[39m: [32mDataFrame[39m = [A침o: string, Familia: string ... 8 more fields]
[36mendParquet[39m: [32mLong[39m = [32m704687162083400L[39m

In [14]:
//csv
val startCsv = System.nanoTime()

val ventanaCsv = Window.partitionBy("A침o")
val datosBalanceAnualesCsv = dfFromCsv
    .withColumn("A침o", date_format($"Fecha", "yyyy"))
    .groupBy($"A침o", $"Familia", $"Tipo", $"Compuesto", $"BajasEmisiones")
    .agg(
        sum("Valor").as("ValorAnual"),
        avg("Porcentaje").as("PorcentajeAnualSobreFamilia")
    )
    .withColumn(
        "TotalGenerado", 
        round(
            sum(
                when(!$"Compuesto" && $"Familia" =!= "Demanda" ,$"ValorAnual")
                    .otherwise(0)).over(ventanaCsv),2)
    )
    .withColumn(
        "PorcentajeSobreTotal", 
        round($"ValorAnual" / $"TotalGenerado" * 100,2))
    .withColumn(
        "PorcentajeBajasEmisiones", 
        round(
            sum(
                when($"BajasEmisiones" && !$"Compuesto" && $"Familia" =!= "Demanda", $"ValorAnual")
                    .otherwise(0)).over(ventanaCsv) / $"TotalGenerado" * 100,2)
    )

// Mostrar los primeros resultados y explicar el plan de ejecuci칩n
datosBalanceAnualesCsv.show(5)
//datosBalanceAnualesCsv.explain(true)

val endCsv = System.nanoTime()
println(s"Tiempo ejecuci칩n CSV: ${(endCsv - startCsv) / 1e9} segundos")

+----+------------+--------------------+---------+--------------+-------------------+---------------------------+--------------+--------------------+------------------------+
| A침o|     Familia|                Tipo|Compuesto|BajasEmisiones|         ValorAnual|PorcentajeAnualSobreFamilia| TotalGenerado|PorcentajeSobreTotal|PorcentajeBajasEmisiones|
+----+------------+--------------------+---------+--------------+-------------------+---------------------------+--------------+--------------------+------------------------+
|2016|No-Renovable|    Turbina de vapor|    false|         false| 2536143.0030000005|         0.0167077641039327|2.5700810617E8|                0.99|                   61.13|
|2016|No-Renovable|      Turbina de gas|    false|         false|  616037.3019999998|       0.003893066689377...|2.5700810617E8|                0.24|                   61.13|
|2016|   Renovable|       Solar t칠rmica|    false|          true|  5071201.701999999|       0.054412720483706205|2.5700810617

[36mstartCsv[39m: [32mLong[39m = [32m704682062526900L[39m
[36mventanaCsv[39m: [32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@503e0a9d
[36mdatosBalanceAnualesCsv[39m: [32mDataFrame[39m = [A침o: string, Familia: string ... 8 more fields]
[36mendCsv[39m: [32mLong[39m = [32m704682875117900L[39m

In [19]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// 游늷 Leer datos desde CSV con inferSchema activado
val pathCsv = "../../data/csv/dsBalanceNacional11-25.csv"

val startCsv = System.nanoTime()

val dfFromCsv = spark.read
    .option("header", "true")
    .option("inferSchema", "true") // Spark inferir치 autom치ticamente los tipos de datos
    .csv(pathCsv)

val ventanaCsv = Window.partitionBy("A침o")

val datosBalanceAnualesCsv = dfFromCsv
    .withColumn("A침o", date_format($"Fecha", "yyyy"))
    .groupBy($"A침o", $"Familia", $"Tipo", $"Compuesto", $"BajasEmisiones")
    .agg(
        sum("Valor").as("ValorAnual"),
        avg("Porcentaje").as("PorcentajeAnualSobreFamilia")
    )
    .withColumn(
        "TotalGenerado", 
        round(
            sum(
                when(!$"Compuesto" && $"Familia" =!= "Demanda", $"ValorAnual")
                    .otherwise(0)).over(ventanaCsv),2)
    )
    .withColumn(
        "PorcentajeSobreTotal", 
        round($"ValorAnual" / $"TotalGenerado" * 100,2))
    .withColumn(
        "PorcentajeBajasEmisiones", 
        round(
            sum(
                when($"BajasEmisiones" && !$"Compuesto" && $"Familia" =!= "Demanda", $"ValorAnual")
                    .otherwise(0)).over(ventanaCsv) / $"TotalGenerado" * 100,2)
    )

// Mostrar resultados y calcular tiempo de ejecuci칩n
datosBalanceAnualesCsv.show(5)

val endCsv = System.nanoTime()
println(s"Tiempo ejecuci칩n CSV: ${(endCsv - startCsv) / 1e9} segundos")


+----+------------+--------------------+---------+--------------+-------------------+---------------------------+--------------+--------------------+------------------------+
| A침o|     Familia|                Tipo|Compuesto|BajasEmisiones|         ValorAnual|PorcentajeAnualSobreFamilia| TotalGenerado|PorcentajeSobreTotal|PorcentajeBajasEmisiones|
+----+------------+--------------------+---------+--------------+-------------------+---------------------------+--------------+--------------------+------------------------+
|2016|No-Renovable|    Turbina de vapor|    false|         false| 2536143.0030000005|         0.0167077641039327|2.5700810617E8|                0.99|                   61.13|
|2016|No-Renovable|      Turbina de gas|    false|         false|  616037.3019999998|       0.003893066689377...|2.5700810617E8|                0.24|                   61.13|
|2016|   Renovable|       Solar t칠rmica|    false|          true|  5071201.701999999|       0.054412720483706205|2.5700810617

[32mimport [39m[36morg.apache.spark.sql.expressions.Window[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[36mpathCsv[39m: [32mString[39m = [32m"../../data/csv/dsBalanceNacional11-25.csv"[39m
[36mstartCsv[39m: [32mLong[39m = [32m705252237406700L[39m
[36mdfFromCsv[39m: [32mDataFrame[39m = [Familia: string, Tipo: string ... 5 more fields]
[36mventanaCsv[39m: [32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@6959bb9e
[36mdatosBalanceAnualesCsv[39m: [32mDataFrame[39m = [A침o: string, Familia: string ... 8 more fields]
[36mendCsv[39m: [32mLong[39m = [32m705253387328300L[39m

In [20]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// 游늷 Leer datos desde Parquet
val pathParquet = "../../data/parquet/dsBalanceNacional11-25.parquet"

val startParquet = System.nanoTime()

val dfFromParquet = spark.read
    .parquet(pathParquet)

val ventanaParquet = Window.partitionBy("A침o")

val datosBalanceAnualesParquet = dfFromParquet
    .withColumn("A침o", date_format($"Fecha", "yyyy"))
    .groupBy($"A침o", $"Familia", $"Tipo", $"Compuesto", $"BajasEmisiones")
    .agg(
        sum("Valor").as("ValorAnual"),
        avg("Porcentaje").as("PorcentajeAnualSobreFamilia")
    )
    .withColumn(
        "TotalGenerado", 
        round(
            sum(
                when(!$"Compuesto" && $"Familia" =!= "Demanda", $"ValorAnual")
                    .otherwise(0)).over(ventanaParquet),2)
    )
    .withColumn(
        "PorcentajeSobreTotal", 
        round($"ValorAnual" / $"TotalGenerado" * 100,2))
    .withColumn(
        "PorcentajeBajasEmisiones", 
        round(
            sum(
                when($"BajasEmisiones" && !$"Compuesto" && $"Familia" =!= "Demanda", $"ValorAnual")
                    .otherwise(0)).over(ventanaParquet) / $"TotalGenerado" * 100,2)
    )

// Mostrar resultados y calcular tiempo de ejecuci칩n
datosBalanceAnualesParquet.show(5)

val endParquet = System.nanoTime()
println(s"Tiempo ejecuci칩n Parquet: ${(endParquet - startParquet) / 1e9} segundos")


+----+------------+--------------------+---------+--------------+-------------------+---------------------------+--------------+--------------------+------------------------+
| A침o|     Familia|                Tipo|Compuesto|BajasEmisiones|         ValorAnual|PorcentajeAnualSobreFamilia| TotalGenerado|PorcentajeSobreTotal|PorcentajeBajasEmisiones|
+----+------------+--------------------+---------+--------------+-------------------+---------------------------+--------------+--------------------+------------------------+
|2016|No-Renovable|    Turbina de vapor|    false|         false| 2536143.0030000005|         0.0167077641039327|2.5700810617E8|                0.99|                   61.13|
|2016|No-Renovable|      Turbina de gas|    false|         false|  616037.3019999998|       0.003893066689377...|2.5700810617E8|                0.24|                   61.13|
|2016|   Renovable|       Solar t칠rmica|    false|          true|  5071201.701999999|       0.054412720483706205|2.5700810617

[32mimport [39m[36morg.apache.spark.sql.expressions.Window[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[36mpathParquet[39m: [32mString[39m = [32m"../../data/parquet/dsBalanceNacional11-25.parquet"[39m
[36mstartParquet[39m: [32mLong[39m = [32m705273071622100L[39m
[36mdfFromParquet[39m: [32mDataFrame[39m = [Familia: string, Tipo: string ... 5 more fields]
[36mventanaParquet[39m: [32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@3c562df2
[36mdatosBalanceAnualesParquet[39m: [32mDataFrame[39m = [A침o: string, Familia: string ... 8 more fields]
[36mendParquet[39m: [32mLong[39m = [32m705273878471200L[39m