# Aggregations
Su objetivo es resumir y agrupar información númerica de grandes conjuntos de información, para ello es necesario definir: una llave o grouping y una función de Agregación. Existen muchas funciones de agregación en Apache Spark, que son de vital importancia cuando se requiere hacer analisis de los datos. Entre los grupos tenemos Simples, Grouping, Window, GroupingSet, Rollup y Cube. En este notebook vamos a ir describiendo cada uno de los grupos con ejemplos que podran ejecutar.

## Grouping
En muchas ocaciones necesitamos agrupar la información para efectuar algunas operaciones dentro del grupo

Veamos algunos ejemplos:
Vamos a tomar el mismo conjunto de datos https://www.kaggle.com/sidtwr/videogames-sales-dataset

In [None]:
//Import para Jupyter-notebooks 
import $ivy.`org.apache.spark::spark-sql:2.4.0`
import $ivy.`sh.almond::almond-spark:0.6.0`
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.spark.sql._

val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("resources/vgsales.csv")
df.createOrReplaceTempView("vgsales")

In [None]:
import spark.implicits._
import org.apache.spark.sql.functions._
df.groupBy("Platform", "Year").count().sort(desc("count")).show()

In [None]:
spark.sql("select Platform, Year, count(1) as count from vgsales group by Platform, Year order by count Desc").show()

Tambien podemos utilizar las funciones vistas anteriormente.
`Calcular la sumatoria de ventas totales agrupados por genero`

In [None]:
df.groupBy("Genre").agg(
  sum("Global_sales").alias("globalSales")).show()

Tambien podemos utilizar `expr` para obtener el mismo resultado

In [None]:
df.groupBy("Genre").agg(
  expr("sum(Global_sales)")).show()

Spark tambien ofrece trabajar agregaciones dentro de mapas, para que la especificación de las mimas sea más facil

In [None]:
df.groupBy("Genre").agg("Global_sales"->"avg", "Global_sales"->"sum").show()

## Window functions
Estas funciones son similiares a las funciones anteriores de Group-by sin embargo, en un group-by todas las filas  van en un sólo grupo mientras que en las Window functions se obtiene un valor por cada fila de una tabla basado en un grupo de filas llamdos *frames*.
<img src="resources/windows.png"/>

In [None]:
df.select('Name,'Platform,'Year,'Publisher,'Global_Sales).where($"Platform" === "PC" && $"Year" <= 1994).orderBy($"Year").show()


In [None]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window
  .partitionBy("Platform") // Cómo dividir el grupo
  .orderBy(col("Year")) // Orden dentro de la partición
  .rowsBetween(Window.unboundedPreceding, Window.currentRow) //Especificación del frame

df.where("Year IS NOT NULL")
  .select(
    col("Platform"),
    col("Year"),
    col("Global_Sales"),
    sum("Global_Sales").over(windowSpec).alias("Global_Sales ACC")
  ).distinct().show()

Este mismo resultado se puede obtener si corremos una sentencia SQL

In [None]:
spark.sql("""
SELECT 
Platform,
Year,
Global_Sales, 
sum(Global_Sales) over ( partition by Platform order by Year ROWS BETWEEN
                        UNBOUNDED PRECEDING AND
                        CURRENT ROW ) as GlobalSalesAcc
FROM vgsales
""").show()

## Grouping sets
En las anteriores funciones se realizaban agregaciones sobre un cojunto de columnas con sus respectivos valores, Cuando se quiere algo más complejo cómo una agregación sobre multiples grupos es posible utilizar grouping sets. **Esta función sólo esta disponible en SQL, para utilizarla con Dataframes se podra utilizar `rollup` y `cube`**

En el siguiente caso queremos tener agrupaciones del Año con la plataforma.

In [None]:
spark.sql("""
SELECT Year, Platform, count(Platform) FROM vgsales
GROUP BY Year, Platform GROUPING SETS((Year, Platform),())
ORDER BY Year DESC, Platform 
""").show(1000)

## Rollup 
Es una agregación multidimensional que ejecuta una variedad de group-by dentro del conjunto de datos, entre los resultados se puede ver las columnas que hacen parte del rollup en `null` para especificar el agrupamiento realizado.

In [None]:
 df.where("Year > 2012").groupBy("Year","Platform").count().show()

In [None]:
import org.apache.spark.sql.functions._
val rolledUpDF = df.where("Year > 2012").rollup("Year", "Platform").agg(count("Platform"))
  .selectExpr("Year", "Platform", "`count(Platform)` as total_quantity")
  .sort(desc("Year"), col("Platform"))
rolledUpDF.show(1000)

In [None]:
rolledUpDF.where("Year is NULL").show()

In [None]:
rolledUpDF.where("Platform is NULL").show()

## Cube


In [None]:
df.where("Year > 2012").cube("Year", "Platform").agg(count(col("Platform")))
  .select("Year", "Platform", "count(Platform)").orderBy("Year").show(10000)