# Spark Dataframes
Esta es una breve introducción e inicio rápido de la API PySpark DataFrame. Los DataFrames de PySpark se evalúan perezosamente. Se implementan sobre RDDs. Cuando Spark transforma datos, no computa inmediatamente la transformación, sino que planifica cómo computarla más tarde. Cuando acciones como collect() son llamadas explícitamente, el cálculo comienza. 

Las aplicaciones PySpark comienzan con la inicialización de SparkSession que es el punto de entrada de PySpark como se muestra a continuación.

In [None]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [None]:
spark

## Creación de DataFrames

In [None]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Creación de un DataFrame con un schema explícito.

In [None]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a int, b float, c string, d date, e timestamp')
df

DataFrame[a: int, b: float, c: string, d: date, e: timestamp]

Creación de un DataFrame de PySpark a partir de un DataFrame de pandas.

In [None]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Creación de un DataFrame a partir de un RDD formado por una lista de tuplas.

In [None]:
rdd = spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Todos los DataFrames creados anteriormente tienen el mismo resultado y el mismo schema.

In [None]:
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



## Inspeccionando los datos

Las primeras filas de un DataFrame se pueden mostrar usando `DataFrame.show()`.

In [None]:
df.show(2)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 2 rows



Alternativamente, puedes habilitar la configuración `spark.sql.repl.eagerEval.enabled` para la evaluación eager de PySpark DataFrame en notebooks. El número de filas a mostrar se puede controlar mediante la configuración `spark.sql.repl.eagerEval.maxNumRows`.

In [None]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


Las filas también pueden mostrarse verticalmente. Esto es útil cuando las filas son demasiado largas para mostrarlas horizontalmente.

In [None]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



Puede ver el esquema del DataFrame y los nombres de las columnas de la siguiente manera:

In [None]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [None]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



Mostrar el resumen del DataFrame:

In [None]:
df.select('a', 'b', 'c').describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



`DataFrame.collect()` recoge los datos distribuidos al driver. Esto puede lanzar un error de memoria insuficiente cuando el conjunto de datos es demasiado grande para caber en driver, ya que recoge todos los datos de los executors.

In [None]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

Para evitar lanzar una excepción por falta de memoria, podemos utilizar `DataFrame.take()` o `DataFrame.tail()`.

In [None]:
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

PySpark DataFrame también proporciona la conversión a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) para aprovechar la API de pandas.

In [None]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


## Accediendo a los datos

PySpark DataFrame se evalúa perezosamente y la simple selección de una columna no desencadena el cálculo, sino que devuelve una instancia `Column`.

In [None]:
df.a

Column<'a'>

De hecho, la mayoría de las operaciones con columnas devuelven `Column`s.

In [None]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

Estas `Column`s se pueden utilizar para seleccionar las columnas de un DataFrame. Por ejemplo, `DataFrame.select()` toma las instancias `Column` que devuelve otro DataFrame.

In [None]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



Asignando una nueva instancia de `Column`.

In [None]:
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [None]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



Para seleccionar un subconjunto de filas, utilizamos `DataFrame.filter()`.

In [None]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Aplicando funciones

PySpark soporta varios UDFs y APIs para permitir a los usuarios ejecutar funciones nativas de Python. 

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



In [None]:
#df.createTempView('pandas')
spark.udf.register("pandas_plus_one", pandas_plus_one)

<function __main__.pandas_plus_one(series: pandas.core.series.Series) -> pandas.core.series.Series>

In [None]:
df.createTempView('test')

In [None]:
spark.sql('select * from test').show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [None]:
spark.sql('select pandas_plus_one(a), b from test').show()

+------------------+---+
|pandas_plus_one(a)|  b|
+------------------+---+
|                 2|2.0|
|                 3|3.0|
|                 4|4.0|
+------------------+---+



In [None]:
# TODO: Crea una UDF que devuelva un booleano indicando si el campo es mayor que 2
# La salida debe ser la que se muestra a continuación:

+------------------------+
|pandas_greater_than_2(a)|
+------------------------+
|                   false|
|                   false|
|                    true|
+------------------------+



Otro ejemplo es `DataFrame.mapInPandas` que permite a los usuarios utilizar directamente las APIs en un [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) sin ninguna restricción como la longitud del resultado.

In [None]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Agrupación de datos

In [None]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



Agrupar y luego aplicar la función `avg()` a los grupos resultantes.

In [None]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [None]:
# TODO: Agrupa por fruta y calcula el máximo valor de v1 y v2
# La salida debe ser la que se muestra a continuación

+------+-------+-------+
| fruit|max(v1)|max(v2)|
+------+-------+-------+
| grape|      8|     80|
|banana|      7|     70|
|carrot|      6|     60|
+------+-------+-------+



También puede aplicar una función nativa de Python a cada grupo utilizando la API de pandas.

In [None]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
+-----+------+---+---+



Coagrupación y aplicación de una función.

In [None]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+



## Leyendo y escribiendo datos

CSV es simple y fácil de usar. Parquet y ORC son formatos de archivo eficientes y compactos para leer y escribir más rápido.



### CSV

In [None]:
df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
+-----+------+---+---+



### Parquet

In [None]:
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
+-----+------+---+---+



### ORC

In [None]:
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
+-----+------+---+---+



## SQL

DataFrame y Spark SQL comparten el mismo motor de ejecución, por lo que pueden utilizarse indistintamente sin problemas. Por ejemplo, se puede registrar el DataFrame como una tabla y ejecutar una consulta SQL fácilmente como se indica a continuación:

In [None]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



Además, las UDF pueden registrarse e invocarse en SQL de forma inmediata:

In [None]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



Estas expresiones SQL pueden mezclarse directamente y utilizarse como columnas de PySpark.

In [None]:
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+

