# Parte 3: Operaciones Básicas con DataFrames

In [104]:
!pip install -q pyspark

In [105]:
# Crear una sesión de Spark (si se corre usando spark-submit o con Google Colab)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Los datos que se va a utilizar provienen de la plataforma de [datos abiertos](https://www.datosabiertos.gob.pe/dataset/promedio-mensual-de-remuneraciones-brutas-soles-s-de-trabajadores-en-el-sector-privado-por-0) de Perú y representan el promedio mensual de remuneraciones brutas (en soles) de trabajadores en el sector privado por situación educativa, según distritos, para el año 2021.

In [23]:
# Lectura de los datos:
df_full = spark.read.csv('remuneracion_bruta_2021.csv', inferSchema=True, header=True)

# Mostrar el esquema inferido
# df_full.printSchema()

In [24]:
# Mostrar las 5 primeras filas
df_full.show(5)

+----------------+-----------+--------------------+-----------------------------+---------------------------+-----------------------------+---------------------------+-------------------------------+-----------------------------+----------------------------+--------------------------+-------------------------------------------------------+-----------------------------------------------------+----------------------------------+--------------------------------+------------------+--------+-------------------------------+-----------------------------+-----------------+--------------------------------+------------------------------+---------------+--------------+
|Código de Ubigeo|  DISTRITOS|SIN EDUCACIÓN FORMAL|EDUCACIÓN ESPECIAL INCOMPLETA|EDUCACIÓN ESPECIAL COMPLETA|EDUCACIÓN PRIMARIA INCOMPLETA|EDUCACIÓN PRIMARIA COMPLETA|EDUCACIÓN SECUNDARIA INCOMPLETA|EDUCACIÓN SECUNDARIA COMPLETA|EDUCACIÓN TÉCNICA INCOMPLETA|EDUCACIÓN TÉCNICA COMPLETA|EDUCACIÓN SUPERIOR (INSTITUTO SUPERIOR, ETC) INC

## 1. Preprocesamiento

### 1.1. Selección de Columnas

Dado que el conjunto de datos posee varias columnas, por facilidad se trabajará solo con algunas de ellas. Se seleccionará solo algunas columnas representativas.

In [55]:
df = df_full.select(['DISTRITOS', "EDUCACIÓN SECUNDARIA COMPLETA", 'GRADO DE BACHILLER', "TITULADO", "GRADO DE MAESTRÍA", ])
df.show(5)

+-----------+-----------------------------+------------------+--------+-----------------+
|  DISTRITOS|EDUCACIÓN SECUNDARIA COMPLETA|GRADO DE BACHILLER|TITULADO|GRADO DE MAESTRÍA|
+-----------+-----------------------------+------------------+--------+-----------------+
|CHACHAPOYAS|                       4 044 |            6 142 |  8 259 |           3 766 |
|     BALSAS|                           - |            6 538 |  5 643 |               - |
|    GRANADA|                           - |                - |  4 294 |               - |
|    HUANCAS|                         899 |                - |      - |               - |
|   LA JALCA|                       1 023 |                - |  2 018 |               - |
+-----------+-----------------------------+------------------+--------+-----------------+
only showing top 5 rows



### 1.2. Conversión de tipo de datos

Si uno revisa el esquema de los datos, las columnas numéricas han sido inferidas como `strings`. Para convertir estas columnas a entero se utilizará `withColumn`, para crear una nueva columna, y `cast("int")` para que dicha nueva columna tenga el tipo de datos deseado (entero).

In [46]:
# Nota: esto no dará error pero ignorará los miles (donde hay espacio)

# from pyspark.sql.functions import col
#
# df = df.withColumn("secundaria", col("EDUCACIÓN SECUNDARIA COMPLETA").cast("int"))
# df.show(5)

Debido a que en este conjunto de datos se tiene los miles separados por espacio, para poder convertir adecuadamente primero se debe eliminar dicho espacio. Para esto se puede utilizar la función `regex_replace` y luego aplicar el `cast`.

In [56]:
from pyspark.sql.functions import regexp_replace, initcap

df.withColumn("secundaria", regexp_replace("EDUCACIÓN SECUNDARIA COMPLETA", " ", "").cast("int")).show(5)

+-----------+-----------------------------+------------------+--------+-----------------+----------+
|  DISTRITOS|EDUCACIÓN SECUNDARIA COMPLETA|GRADO DE BACHILLER|TITULADO|GRADO DE MAESTRÍA|secundaria|
+-----------+-----------------------------+------------------+--------+-----------------+----------+
|CHACHAPOYAS|                       4 044 |            6 142 |  8 259 |           3 766 |      4044|
|     BALSAS|                           - |            6 538 |  5 643 |               - |      NULL|
|    GRANADA|                           - |                - |  4 294 |               - |      NULL|
|    HUANCAS|                         899 |                - |      - |               - |       899|
|   LA JALCA|                       1 023 |                - |  2 018 |               - |      1023|
+-----------+-----------------------------+------------------+--------+-----------------+----------+
only showing top 5 rows



Se continuará la aplicación para todas las columnas.

In [57]:
# Crear columnas numéricas (notar que "titulado" se sobrescribe)
df = df.withColumn("secundaria", regexp_replace("EDUCACIÓN SECUNDARIA COMPLETA", " ", "").cast("int"))
df = df.withColumn("bachiller", regexp_replace("GRADO DE BACHILLER", " ", "").cast("int"))
df = df.withColumn("titulado", regexp_replace("TITULADO", " ", "").cast("int"))
df = df.withColumn("maestria", regexp_replace("GRADO DE MAESTRÍA", " ", "").cast("int"))

# Cambiar las mayúsculas de los distritos a solo mayúscula la primera letra
df = df.withColumn("distritos", initcap(col("DISTRITOS")))

df.show(5)

+-----------+-----------------------------+------------------+--------+-----------------+----------+---------+--------+
|  distritos|EDUCACIÓN SECUNDARIA COMPLETA|GRADO DE BACHILLER|titulado|GRADO DE MAESTRÍA|secundaria|bachiller|maestria|
+-----------+-----------------------------+------------------+--------+-----------------+----------+---------+--------+
|Chachapoyas|                       4 044 |            6 142 |    8259|           3 766 |      4044|     6142|    3766|
|     Balsas|                           - |            6 538 |    5643|               - |      NULL|     6538|    NULL|
|    Granada|                           - |                - |    4294|               - |      NULL|     NULL|    NULL|
|    Huancas|                         899 |                - |    NULL|               - |       899|     NULL|    NULL|
|   La Jalca|                       1 023 |                - |    2018|               - |      1023|     NULL|    NULL|
+-----------+---------------------------

Se seleccionará solo las columnas convertidas a entero.

In [58]:
df = df.select(["distritos", "secundaria", "bachiller", "titulado", "maestria"])
df.show(5)

+-----------+----------+---------+--------+--------+
|  distritos|secundaria|bachiller|titulado|maestria|
+-----------+----------+---------+--------+--------+
|Chachapoyas|      4044|     6142|    8259|    3766|
|     Balsas|      NULL|     6538|    5643|    NULL|
|    Granada|      NULL|     NULL|    4294|    NULL|
|    Huancas|       899|     NULL|    NULL|    NULL|
|   La Jalca|      1023|     NULL|    2018|    NULL|
+-----------+----------+---------+--------+--------+
only showing top 5 rows



## 2. Ordenamiento de resultados: orderBy

Se utiliza `orderBy` con el nombre de una columna. Por defecto realiza un ordenamiento ascendente.

In [72]:
df.orderBy("titulado").show(5)

+-----------+----------+---------+--------+--------+
|  distritos|secundaria|bachiller|titulado|maestria|
+-----------+----------+---------+--------+--------+
|    Levanto|       930|     NULL|    NULL|    NULL|
|   Aramango|       930|     NULL|    NULL|    NULL|
|  Magdalena|      1773|     NULL|    NULL|    NULL|
| Leimebamba|      3128|     2500|    NULL|    NULL|
|Molinopampa|       930|     NULL|    NULL|    NULL|
+-----------+----------+---------+--------+--------+
only showing top 5 rows



En el caso anterior no se muestra adecuadamente debido a los `NULL` existentes en la columna deseada. Para evitar mostrar estos valores nulos, se puede agregar las siguientes opciones:
* `asc_nulls_last`: para order ascendente (mostrando nulos al final)
* `desc_nulls_last`: para orden descendente (mostrando nulos al final)

Si no hubiesen nulos se podría usar directamente las opciones `asc` y `desc`.

In [74]:
df.orderBy(df["titulado"].asc_nulls_last()).show(5)

+----------+----------+---------+--------+--------+
| distritos|secundaria|bachiller|titulado|maestria|
+----------+----------+---------+--------+--------+
|  Asuncion|      NULL|     NULL|     465|    NULL|
|Rahuapampa|       930|     NULL|     500|    NULL|
|   Amotape|      1874|     NULL|     500|    NULL|
|     Omate|      2305|      930|     500|    NULL|
|Curpahuasi|      1214|     NULL|     558|    NULL|
+----------+----------+---------+--------+--------+
only showing top 5 rows



In [75]:
df.orderBy(df["titulado"].desc_nulls_last()).show(5)

+--------------+----------+---------+--------+--------+
|     distritos|secundaria|bachiller|titulado|maestria|
+--------------+----------+---------+--------+--------+
|     Chinchero|      6166|     7415|  188098|     930|
|        Jangas|      6925|    28962|   87947|    NULL|
|        Chavin|      7068|    32341|   67475|   21044|
|Challhuahuacho|     11950|    26262|   52099|   46629|
|      Uchumayo|     18103|    24364|   46115|   84057|
+--------------+----------+---------+--------+--------+
only showing top 5 rows



## 3. Filtraje de datos

### 3.1. Filtraje con una sola condición (una sola columna)

In [76]:
df.filter("bachiller > 10000").show(5)

+----------+----------+---------+--------+--------+
| distritos|secundaria|bachiller|titulado|maestria|
+----------+----------+---------+--------+--------+
|    Jangas|      6925|    28962|   87947|    NULL|
| Huallanca|      5996|    12147|   14388|   14173|
|San Marcos|     13208|    33984|   33023|  241162|
|   Huarmey|      4806|    14672|   25236|   24242|
|  Culebras|      5818|    12943|    3849|    NULL|
+----------+----------+---------+--------+--------+
only showing top 5 rows



In [77]:
df.filter(df["bachiller"] > 10000).show(5)

+----------+----------+---------+--------+--------+
| distritos|secundaria|bachiller|titulado|maestria|
+----------+----------+---------+--------+--------+
|    Jangas|      6925|    28962|   87947|    NULL|
| Huallanca|      5996|    12147|   14388|   14173|
|San Marcos|     13208|    33984|   33023|  241162|
|   Huarmey|      4806|    14672|   25236|   24242|
|  Culebras|      5818|    12943|    3849|    NULL|
+----------+----------+---------+--------+--------+
only showing top 5 rows



In [93]:
df.filter(df["distritos"] == "San Isidro").show()

+----------+----------+---------+--------+--------+
| distritos|secundaria|bachiller|titulado|maestria|
+----------+----------+---------+--------+--------+
|San Isidro|      7120|    19915|   29475|   62556|
+----------+----------+---------+--------+--------+



In [96]:
df.filter(df["distritos"].isin(["San Isidro", "Miraflores", "Barranco"]) ).show()

+----------+----------+---------+--------+--------+
| distritos|secundaria|bachiller|titulado|maestria|
+----------+----------+---------+--------+--------+
|Miraflores|      4293|     6544|    9107|   10818|
|  Barranco|      4522|    11790|   16065|   20006|
|Miraflores|      5667|    16788|   23212|   47092|
|San Isidro|      7120|    19915|   29475|   62556|
+----------+----------+---------+--------+--------+



In [99]:
df.filter(df["distritos"].startswith("S")).show(4, truncate=False)

+----------------------+----------+---------+--------+--------+
|distritos             |secundaria|bachiller|titulado|maestria|
+----------------------+----------+---------+--------+--------+
|Soloco                |NULL      |NULL     |NULL    |NULL    |
|Shipasbamba           |NULL      |NULL     |NULL    |NULL    |
|San Francisco Del Yeso|NULL      |NULL     |NULL    |NULL    |
|Santo Tomas           |1119      |NULL     |NULL    |NULL    |
+----------------------+----------+---------+--------+--------+
only showing top 4 rows



In [103]:
df.filter(df["distritos"].like("San %")).show(5, truncate=False)

+-----------------------+----------+---------+--------+--------+
|distritos              |secundaria|bachiller|titulado|maestria|
+-----------------------+----------+---------+--------+--------+
|San Francisco Del Yeso |NULL      |NULL     |NULL    |NULL    |
|San Nicolas            |4576      |5423     |8846    |NULL    |
|San Miguel De Corpanqui|1000      |NULL     |NULL    |NULL    |
|San Luis               |5151      |1100     |NULL    |NULL    |
|San Nicolas            |4002      |NULL     |8862    |NULL    |
+-----------------------+----------+---------+--------+--------+
only showing top 5 rows



In [78]:
# Ordenar el resultado
df.filter(df["bachiller"] > 10000)\
  .orderBy("bachiller").show(5)

+-----------------+----------+---------+--------+--------+
|        distritos|secundaria|bachiller|titulado|maestria|
+-----------------+----------+---------+--------+--------+
|Coronel Castañeda|      8641|    10007|   19987|    8239|
|Villa El Salvador|      4708|    10034|   14761|   11713|
| Ignacio Escudero|      3675|    10061|   16637|    NULL|
|     San Hilarion|      3531|    10125|    4085|    NULL|
|      San Antonio|      4731|    10163|   13758|   24582|
+-----------------+----------+---------+--------+--------+
only showing top 5 rows



In [64]:
# Mostrar solo algunas columnas
df.filter(df["bachiller"] > 10000) \
  .select(["distritos", "bachiller"]).show(5)

+----------+---------+
| distritos|bachiller|
+----------+---------+
|    Jangas|    28962|
| Huallanca|    12147|
|San Marcos|    33984|
|   Huarmey|    14672|
|  Culebras|    12943|
+----------+---------+
only showing top 5 rows



### 3.2. Filtraje con varias condiciones

Para  varias condiciones se puede utilizar los operadores lógicos `&`, `|`.

In [84]:
# Dos condiciones usando Y lógico

df.filter((df["secundaria"]>10000) &
          (df["bachiller"]>10000)).show(5)

+--------------+----------+---------+--------+--------+
|     distritos|secundaria|bachiller|titulado|maestria|
+--------------+----------+---------+--------+--------+
|    San Marcos|     13208|    33984|   33023|  241162|
|Challhuahuacho|     11950|    26262|   52099|   46629|
|      Uchumayo|     18103|    24364|   46115|   84057|
|         Tapay|     10241|    26716|   36281|    NULL|
|      Encañada|     10313|    31141|   25351|   51210|
+--------------+----------+---------+--------+--------+
only showing top 5 rows



In [87]:
# 2 condiciones usando OR lógico y NOT lógico

df.filter((df["secundaria"]>10000) |
          ~(df["maestria"]<10000)).show(5)

+----------+----------+---------+--------+--------+
| distritos|secundaria|bachiller|titulado|maestria|
+----------+----------+---------+--------+--------+
|    Huaraz|      4251|     7137|    7101|   10019|
| Huallanca|      5996|    12147|   14388|   14173|
|San Marcos|     13208|    33984|   33023|  241162|
|   Huarmey|      4806|    14672|   25236|   24242|
|  Pallasca|      4326|     NULL|   12283|   18093|
+----------+----------+---------+--------+--------+
only showing top 5 rows



## 4. Mezcla de datos

### 4.1. Apilamiento de filas

Si se tiene dos o más DataFrames que poseen igual estructura, estos pueden ser apilados para formar un solo DataFrame usando `unionAll`.

In [106]:
from pyspark.sql import Row
row = Row("nombre", "mascota", "cantidad")

In [110]:
# Ejemplos de dataframes con similar estructura
df1 = spark.createDataFrame([row("Susana", "gato", 6),
                             row("Carlos", "perro", 1),
                             row("Alberto", "pez", 5)
                             ])

df2 = spark.createDataFrame([row("Pedro", "gato", 2),
                             row("Carla", "tortuga", 1),
                             row("Marcos", "hamster", 3)
                             ])

In [111]:
df = df1.unionAll(df2)
df.show()

+-------+-------+--------+
| nombre|mascota|cantidad|
+-------+-------+--------+
| Susana|   gato|       6|
| Carlos|  perro|       1|
|Alberto|    pez|       5|
|  Pedro|   gato|       2|
|  Carla|tortuga|       1|
| Marcos|hamster|       3|
+-------+-------+--------+



### 4.2. Joins

In [113]:
# Ejemplo de dataframes
row1 = Row("nombre", "mascota1", "cuenta1")
df1 = spark.createDataFrame([row1("Susana", "gato", 6),
                             row1("Carlos", "perro", 1),
                             row1("Roberto", "pez", 5),
                             row1("Liliana", "caballo", 1)
                             ])

row2 = Row("nombre", "mascota2", "cuenta2")
df2 = spark.createDataFrame([row2("Susana", "loro", 2),
                             row2("Carlos", "tortuga", 1),
                             row2("Roberto", "hamster", 3),
                             row2("Fernando", "pez", 12)
                             ])

df1.show()
df2.show()

+-------+--------+-------+
| nombre|mascota1|cuenta1|
+-------+--------+-------+
| Susana|    gato|      6|
| Carlos|   perro|      1|
|Roberto|     pez|      5|
|Liliana| caballo|      1|
+-------+--------+-------+

+--------+--------+-------+
|  nombre|mascota2|cuenta2|
+--------+--------+-------+
|  Susana|    loro|      2|
|  Carlos| tortuga|      1|
| Roberto| hamster|      3|
|Fernando|     pez|     12|
+--------+--------+-------+



**Inner Join**

Un "inner join" realiza la mezcla de filas que tienen correspondencia en ambos DataFrames y elimina todas las otras filas. Esta es la forma por defecto de realizar el join en Spark.

En este ejemplo, se realizará el "inner join" usando la columna `nombre`.

In [114]:
df1.join(df2, 'nombre', how='inner').show()

+-------+--------+-------+--------+-------+
| nombre|mascota1|cuenta1|mascota2|cuenta2|
+-------+--------+-------+--------+-------+
| Carlos|   perro|      1| tortuga|      1|
|Roberto|     pez|      5| hamster|      3|
| Susana|    gato|      6|    loro|      2|
+-------+--------+-------+--------+-------+



Si los datasets no tuviesen el mismo nombre de columna, se puede especificar explícitamente las columnas para las cuales se usará un join. En este caso se mantendrá cada columna por separado (en este ejemplo habrá 2 columnas `nombre`, cada una correspondiendo a un dataframe distinto)

In [118]:
df1.join(df2, df1["nombre"]==df2["nombre"], how='inner').show()

+-------+--------+-------+-------+--------+-------+
| nombre|mascota1|cuenta1| nombre|mascota2|cuenta2|
+-------+--------+-------+-------+--------+-------+
| Carlos|   perro|      1| Carlos| tortuga|      1|
|Roberto|     pez|      5|Roberto| hamster|      3|
| Susana|    gato|      6| Susana|    loro|      2|
+-------+--------+-------+-------+--------+-------+



**Outer Join**

 USa todas las filas (registros) de ambos DataFrames, independientemente de si hay correspondencias o no, y completa los valores faltantes con nulos.

In [116]:
df1.join(df2, 'nombre', how='outer').show()

+--------+--------+-------+--------+-------+
|  nombre|mascota1|cuenta1|mascota2|cuenta2|
+--------+--------+-------+--------+-------+
|  Carlos|   perro|      1| tortuga|      1|
|Fernando|    NULL|   NULL|     pez|     12|
| Liliana| caballo|      1|    NULL|   NULL|
| Roberto|     pez|      5| hamster|      3|
|  Susana|    gato|      6|    loro|      2|
+--------+--------+-------+--------+-------+



**Left Join**

Usa todas las claves del DataFrame de la izquierda. Los datos del DataFrame de la derecha solo aparecen si existe alguna coincidencia con los de la izquierda.

En este ejemplo, el DataFrame de la izquierda es el `df1`.

In [117]:
df1.join(df2, 'nombre', how='left').show()

+-------+--------+-------+--------+-------+
| nombre|mascota1|cuenta1|mascota2|cuenta2|
+-------+--------+-------+--------+-------+
| Susana|    gato|      6|    loro|      2|
| Carlos|   perro|      1| tortuga|      1|
|Liliana| caballo|      1|    NULL|   NULL|
|Roberto|     pez|      5| hamster|      3|
+-------+--------+-------+--------+-------+



## 5. Agregación de datos

En esta parte se trabajará con los siguientes datos.

In [151]:
df = spark.read.csv('ventas.csv', inferSchema=True, header=True)
df.show()

+---------+---------+-----+------+
| Compania|  Persona|Meses|Ventas|
+---------+---------+-----+------+
|   Guguel|      Sam|    4|   200|
|   Guguel|   Carlos|   12|   120|
|   Guguel|    Frank|    6|   340|
|Maicrosof|   Teresa|   19|   600|
|Maicrosof|      Amy|   14|   124|
|Maicrosof|  Vanessa|   25|   243|
|  Feisbuk|    Carla|   37|   870|
|  Feisbuk|     Sara|    8|   350|
|     Apol|     Juan|    5|   250|
|     Apol|    Linda|    1|   130|
|     Apol|   Miguel|    9|   750|
|     Apol|Christian|   10|   350|
+---------+---------+-----+------+



### 5.1. Agregación por columnas: agg

Una forma de realizar agregación de datos por columnas es utilizando `agg` con alguna de las dos sintaxis mostradas a continuación:
* Usando un diccionario (no requiere importar funciones adicionales)
* Usando funciones disponibles en `pyspark.sql.functions` (requiere importar las funciones)

In [152]:
# Forma 1: usando un diccionario

df.agg({'Ventas':'sum'}).show()
# df.agg({'Ventas':'max'}).show()

+-----------+
|sum(Ventas)|
+-----------+
|       4327|
+-----------+



In [153]:
# Forma 2: usando funciones específicas
from pyspark.sql.functions import sum

df.agg(sum("Ventas")).show()

+-----------+
|sum(Ventas)|
+-----------+
|       4327|
+-----------+



Suele ser conveniente utilizar `alias` para modificar el nombre de la columna resultante

In [154]:
df.agg(sum("Ventas").alias("Ventas totales")).show()

+--------------+
|Ventas totales|
+--------------+
|          4327|
+--------------+



Otras agregaciones usuales son las siguientes (se puede revisar las funciones disponibles en: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html


In [155]:
from pyspark.sql.functions import count, avg, min, max, stddev, variance

df.agg(count("Ventas").alias("cuenta"),
       avg("Ventas").alias("promedio"),
       min("Ventas").alias("mínimo"),
       max("Ventas").alias("máximo"),
       stddev("Ventas").alias("desviación"),
       variance("Ventas").alias("varianza")
).show()

+------+-----------------+------+------+------------------+------------------+
|cuenta|         promedio|mínimo|máximo|        desviación|          varianza|
+------+-----------------+------+------+------------------+------------------+
|    12|360.5833333333333|   120|   870|250.08742410799007|62543.719696969696|
+------+-----------------+------+------+------------------+------------------+



Se puede redondear el resultado usando la función `round`.

In [146]:
from pyspark.sql.functions import round

df.agg(count("Ventas").alias("cuenta"),
       round(avg("Ventas"),2).alias("promedio"),
       min("Ventas").alias("mínimo"),
       max("Ventas").alias("máximo"),
       round(stddev("Ventas"),2).alias("desviación"),
       round(variance("Ventas"),2).alias("varianza")
).show()

+------+--------+------+------+----------+--------+
|cuenta|promedio|mínimo|máximo|desviación|varianza|
+------+--------+------+------+----------+--------+
|    12|  360.58|   120|   870|    250.09|62543.72|
+------+--------+------+------+----------+--------+



Si se desea recuperar el valor del resultado se puede utilizar `collect`

In [129]:
v = df.agg({'Ventas':'max'}).collect()
print("Resultado: ", v[0][0])

Resultado:  870


### 5.2. Agrupamiento usando GroupBy

Una alternativa para trabajar con datos agrupados es utilizar `groupBy` y algunas funciones usuales que son provistas, como `count`, `mean`, `max`, `min`, `sum`, etc.

In [165]:
# Aplicación a todas las columnas
df.groupBy("Compania") \
  .sum().show()

+---------+----------+-----------+
| Compania|sum(Meses)|sum(Ventas)|
+---------+----------+-----------+
|   Guguel|        22|        660|
|  Feisbuk|        45|       1220|
|Maicrosof|        58|        967|
|     Apol|        25|       1480|
+---------+----------+-----------+



In [150]:
# df.groupBy("Compania").max().show()
# df.groupBy("Compania").min().show()
# df.groupBy("Compania").count().show()
# df.groupBy("Compania").mean().show()

In [166]:
# Aplicación a columnas específicas
df.groupBy("Compania") \
  .sum("Meses").show()

+---------+----------+
| Compania|sum(Meses)|
+---------+----------+
|   Guguel|        22|
|  Feisbuk|        45|
|Maicrosof|        58|
|     Apol|        25|
+---------+----------+



Alternativamente, se puede utilizar `agg` sobre un DataFrame con datos agrupados.

In [162]:
df.groupBy("Compania") \
  .agg({"Ventas":'max'}).show()

+---------+-----------+
| Compania|max(Ventas)|
+---------+-----------+
|   Guguel|        340|
|  Feisbuk|        870|
|Maicrosof|        600|
|     Apol|        750|
+---------+-----------+



In [160]:
df.groupBy("Compania") \
  .agg(max("Ventas")).show()

+---------+-----------+
| Compania|max(Ventas)|
+---------+-----------+
|   Guguel|        340|
|  Feisbuk|        870|
|Maicrosof|        600|
|     Apol|        750|
+---------+-----------+



In [170]:
df.groupBy("Compania") \
  .agg(sum("Ventas").alias("ventas_totales"), \
       round(avg("Meses"),2).alias("meses_promedio")
       ) \
  .show()

+---------+--------------+--------------+
| Compania|ventas_totales|meses_promedio|
+---------+--------------+--------------+
|   Guguel|           660|          7.33|
|  Feisbuk|          1220|          22.5|
|Maicrosof|           967|         19.33|
|     Apol|          1480|          6.25|
+---------+--------------+--------------+



Se puede añadir condiciones usando `where`

In [172]:
df.groupBy("Compania") \
  .agg(sum("Ventas").alias("ventas_totales"), \
       round(avg("Meses"),2).alias("meses_promedio")
       ) \
  .where(col("ventas_totales") >= 1000) \
  .show()

+--------+--------------+--------------+
|Compania|ventas_totales|meses_promedio|
+--------+--------------+--------------+
| Feisbuk|          1220|          22.5|
|    Apol|          1480|          6.25|
+--------+--------------+--------------+

