# SPARK - DATAFRAMES
Un DataFrame es una estructura equivalente a una tabla de base de datos relacional, con un motor bien optimizado para el trabajo en un clúster. Los datos se almacenan en filas y columnas y ofrece un conjunto de operaciones para manipular los datos.

El trabajo con DataFrames es más sencillo y eficiente que el procesamiento con RDD, por eso su uso es predominante en los nuevos desarrollos con Spark.

## CREAR SparkSession Y SparkContext

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, concat_ws
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = (
    SparkSession.builder.master("local[*]").appName("pyspark_dataframe").getOrCreate()
)
# spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

24/01/21 22:10:49 WARN Utils: Your hostname, AINARA-MAC.local resolves to a loopback address: 127.0.0.1; using 192.168.0.109 instead (on interface en0)
24/01/21 22:10:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/21 22:10:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/21 22:10:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## CREAR DATAFRAMES

### CREAR UN DATAFRAME A PARTIR DE UN RDD

In [2]:
datos = [("Aitor", 182), ("Pedro", 178), ("Marina", 161)]
rdd = sc.parallelize(datos)
dfRDD = rdd.toDF()

                                                                                

Mediante printSchema obtenemos un resumen del esquema del DataFrame, donde para cada columna se indica el nombre, el tipo y si admite valores nulos:

In [3]:
dfRDD.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



Podemos ver como los nombres de las columnas son _1 y _2. Para asignarle un nombre adecuado podemos pasarle una lista con los nombres a la hora de crear el DataFrame:

In [4]:
columnas = ["nombre","altura"]
dfRDD = rdd.toDF(columnas)
dfRDD.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- altura: long (nullable = true)



También podemos crear un DataFrame directamente desde una SparkSession sin crear un RDD previamente mediante el método createDataFrame:

In [5]:
dfDesdeDatos = spark.createDataFrame(datos).toDF(*columnas)
dfDesdeDatos.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- altura: long (nullable = true)



### CREAR UN DATAFRAME A PARTIR DE UN FICHERO
Lo más usual es cargar los datos desde una archivo externo. Para ello, mediante el API de DataFrameReader cargaremos los datos directamente en un Dataframe mediante diferentes métodos dependiendo del formato.

Para cada formato, existe un método corto que se llama como el formato en sí, y un método general donde mediante format indicamos el formato y que finaliza con el método load siempre dentro de spark.read:

CSV

In [6]:
dfCSV = spark.read.csv("Datos/pdi_sales.csv")
dfCSV = spark.read.csv("Datos/*.csv")  # Una carpeta entera
dfCSV = spark.read.options(sep=";", header=True, inferSchema=True).csv("Datos/pdi_sales.csv")
dfCSV = spark.read.format("csv").load("Datos/pdi_sales.csv")

                                                                                

TXT

In [7]:
dfTXT = spark.read.text("Datos/el_quijote.txt")
dfTXT = spark.read.option("wholetext", True).text(
    "Datos/"
)  # cada fichero se lee entero como un registro

JSON

In [8]:
dfJSON = spark.read.json("Datos/datos.json")
dfJSON = spark.read.format("json").load("Datos/datos.json")

Spark espera que cada documento JSON ocupe una única línea. Si cada documento ocupa más de una línea, se lo indicamos mediante la opción multiline:

In [9]:
df = spark.read.options(multiline=True,inferSchema=True).json("Datos/datos.json")

## MOSTRAR LOS DATOS

Para los siguientes apartados, supongamos que queremos almacenar ciertos datos de clientes, como son su nombre y apellidos, ciudad y sueldo:

In [10]:
clientes = [
    ("Aitor", "Medrano", "Elche", 3000),
    ("Pedro", "Casas", "Elche", 4000),
    ("Laura", "García", "Elche", 5000),
    ("Miguel", "Ruiz", "Torrellano", 6000),
    ("Isabel", "Guillén", "Alicante", 7000),
]
columnas = ["nombre", "apellidos", "ciudad", "sueldo"]
df = spark.createDataFrame(clientes, columnas)

### printSchema
Recureda mediante printSchema obtenemos un resumen del esquema del DataFrame , donde para cada columna se indica el nombre, el tipo y si admite valores nulos:

In [11]:
df.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- apellidos: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- sueldo: long (nullable = true)



### show

Para mostrar los datos, podemos utilizar el método show, al cual le podemos indicar o no la cantidad de registros a recuperar, así como si queremos que los datos se trunquen o no, o si los queremos mostrar en vertical:

In [12]:
df.show(2)

+------+---------+------+------+
|nombre|apellidos|ciudad|sueldo|
+------+---------+------+------+
| Aitor|  Medrano| Elche|  3000|
| Pedro|    Casas| Elche|  4000|
+------+---------+------+------+
only showing top 2 rows



In [13]:
df.show(truncate=False)

+------+---------+----------+------+
|nombre|apellidos|ciudad    |sueldo|
+------+---------+----------+------+
|Aitor |Medrano  |Elche     |3000  |
|Pedro |Casas    |Elche     |4000  |
|Laura |García   |Elche     |5000  |
|Miguel|Ruiz     |Torrellano|6000  |
|Isabel|Guillén  |Alicante  |7000  |
+------+---------+----------+------+



In [14]:
df.show(3, vertical=True)

-RECORD 0------------
 nombre    | Aitor   
 apellidos | Medrano 
 ciudad    | Elche   
 sueldo    | 3000    
-RECORD 1------------
 nombre    | Pedro   
 apellidos | Casas   
 ciudad    | Elche   
 sueldo    | 4000    
-RECORD 2------------
 nombre    | Laura   
 apellidos | García  
 ciudad    | Elche   
 sueldo    | 5000    
only showing top 3 rows



In [15]:
df.show(truncate=3)  # Truncar cadenas a 3 caracteres

+------+---------+------+------+
|nombre|apellidos|ciudad|sueldo|
+------+---------+------+------+
|   Ait|      Med|   Elc|   300|
|   Ped|      Cas|   Elc|   400|
|   Lau|      Gar|   Elc|   500|
|   Mig|      Rui|   Tor|   600|
|   Isa|      Gui|   Ali|   700|
+------+---------+------+------+



### first y head

Si sólo queremos recuperar unos pocos datos (los primeros), podemos hacer uso de head o first los cuales devuelven objetos Row:

In [16]:
df.first()

Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000)

In [17]:
df.head()

Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000)

In [18]:
df.head(3)

[Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
 Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000),
 Row(nombre='Laura', apellidos='García', ciudad='Elche', sueldo=5000)]

Si queremos obtener un valor en concreto, una vez recuperada una fila, podemos acceder a sus columnas:

In [19]:
nom1 = df.first()[0]
print(nom1)

Aitor


In [20]:
nom2 = df.first()["nombre"]
print(nom2)

Aitor


### describe
Podemos obtener un sumario de los datos mediante describe:

In [21]:
df.describe().show()

24/01/17 15:49:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 37:>                                                       (0 + 16) / 16]

+-------+------+---------+----------+------------------+
|summary|nombre|apellidos|    ciudad|            sueldo|
+-------+------+---------+----------+------------------+
|  count|     5|        5|         5|                 5|
|   mean|  NULL|     NULL|      NULL|            5000.0|
| stddev|  NULL|     NULL|      NULL|1581.1388300841897|
|    min| Aitor|    Casas|  Alicante|              3000|
|    max| Pedro|     Ruiz|Torrellano|              7000|
+-------+------+---------+----------+------------------+



                                                                                

### count
Si únicamente nos interesa saber cuantas filas tiene nuestro DataFrame, podemos hacer uso de count:

In [22]:
df.count()

5

### collect y take
Por último, como un DataFrame por debajo es un RDD, podemos usar collect y take conforme necesitemos y recuperar objetos de tipo Row:

In [23]:
df.collect()

[Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
 Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000),
 Row(nombre='Laura', apellidos='García', ciudad='Elche', sueldo=5000),
 Row(nombre='Miguel', apellidos='Ruiz', ciudad='Torrellano', sueldo=6000),
 Row(nombre='Isabel', apellidos='Guillén', ciudad='Alicante', sueldo=7000)]

In [24]:
df.take(2)

[Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
 Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000)]

In [25]:
nom = df.collect()[0][0]
print(nom)

Aitor


## PERSISTIR LOS DATOS
Si lo que queremos es persistir los datos, en vez de read, utilizaremos write (de manera que obtenemos un DataFrameWriter) y si usamos la forma general usaremos el método save.

Por cada partición, Spark generará un archivo de salida.

### CSV

In [26]:
dfCSV.write.mode("overwrite").csv("Datos/dat.csv")

### TXT

In [27]:
dfTXT.write.mode("overwrite").text("Datos/dat.txt")

### JSON

In [28]:
dfJSON.write.mode("overwrite").json("Datos/dat.json")

## DATOS Y ESQUEMAS
El esquema completo de un DataFrame se modela mediante un StructType, el cual contiene una colección de objetos StructField. Así pues, cada columna de un DataFrame de Spark se modela mediante un objeto StructField indicando su nombre, tipo y gestión de los nulos.

Hemos visto que al crear un DataFrame desde un archivo externo, podemos inferir el esquema. Si queremos crear un DataFrame desde un esquema propio utilizaremos los tipos StructType, StructField, así como el tipo necesario para cada columna. 

Además de cadenas (StringType), enteros (IntegerType), flotantes (FloatType) Y dobles (DoubleType), tenemos tipos booleanos (BooleanType), de fecha (DateType y TimestampType), así como tipos complejos como ArrayType, MapType y StructType.

Volvamos al ejemplo anterior donde tenemos ciertos datos de clientes, como son su nombre y apellidos, ciudad y sueldo:

In [29]:
clientes = [
    ("Aitor", "Medrano", "Elche", 3000),
    ("Pedro", "Casas", "Elche", 4000),
    ("Laura", "García", "Elche", 5000),
    ("Miguel", "Ruiz", "Torrellano", 6000),
    ("Isabel", "Guillén", "Alicante", 7000),
]

Para esta estructura, definiremos un esquema con los campos, indicando para cada uno de ellos su nombre, tipo y si admite valores nulos:

In [30]:
esquema = StructType(
    [
        StructField("nombre", StringType(), False),
        StructField("apellidos", StringType(), False),
        StructField("ciudad", StringType(), True),
        StructField("sueldo", IntegerType(), False),
    ]
)

A continuación ya podemos crear un DataFrame con datos propios que cumplen un esquema haciendo uso del método createDataFrame:

In [31]:
dfClientes = spark.createDataFrame(data=clientes, schema=esquema)
dfClientes.printSchema()

root
 |-- nombre: string (nullable = false)
 |-- apellidos: string (nullable = false)
 |-- ciudad: string (nullable = true)
 |-- sueldo: integer (nullable = false)



Si lo que queremos es asignarle un esquema a un DataFrame que vamos a leer desde una fuente de datos externa, hemos de emplear el método schema:
``````
dfClientes = spark.read.option("header", True).schema(esquema).csv("clientes.csv")
``````

La inferencia de los tipos de los datos es un proceso computacionalmente costoso. Por ello, si nuestro conjunto de datos es grande, es muy recomendable crear el esquema de forma programativa y configurarlo en la carga de datos.


Respecto al esquema, tenemos diferentes propiedades como columns, dtypes y schema con las que obtener su información:

In [32]:
dfClientes.columns

['nombre', 'apellidos', 'ciudad', 'sueldo']

In [33]:
dfClientes.dtypes

[('nombre', 'string'),
 ('apellidos', 'string'),
 ('ciudad', 'string'),
 ('sueldo', 'int')]

In [34]:
dfClientes.schema

StructType([StructField('nombre', StringType(), False), StructField('apellidos', StringType(), False), StructField('ciudad', StringType(), True), StructField('sueldo', IntegerType(), False)])

Si una vez hemos cargado un DataFrame queremos cambiar el tipo de una de sus columnas, podemos hacer uso del método withColumn:

In [35]:
# Forma larga
dfClientes = dfClientes.withColumn("sueldo", dfClientes.sueldo.cast(DoubleType()))
# Forma corta
dfClientes = dfClientes.withColumn("sueldo", dfClientes.sueldo.cast("double"))
# ddfClientes = dfClientes.withColumn("fnac", to_date(dfClientes.Date, "M/d/yyy"))

Si tenemos un error al leer un dato que contiene un tipo no esperado, por defecto, Spark lanzará una excepción y se detendrá la lectura.

Si queremos que asigne los tipos a los campos pero que no los valide, podemos pasarle el parámetro extra verifySchema a False al crear un DataFrame mediante spark.createDataFrame o enforceSchema también a False al cargar desde una fuente externa mediante spark.read, de manera que los datos que no concuerden con el tipo se quedarán nulos, vacíos o con valor 0, dependiendo del tipo de dato que tiene asignada la columna en el esquema.

``````
dfClientes = spark.read.option("header", True).option("enforceSchema",False).schema(esquema).csv("clientes.csv")
``````

## DATAFRAME API

Una vez tenemos un DataFrame podemos trabajar con los datos mediante un conjunto de operaciones estructuradas, muy similares al lenguaje relacional. Estas operaciones también se clasifican en transformaciones y acciones, recordando que las transformaciones utilizan una evaluación perezosa.

Es muy importante tener en cuenta que todas las operaciones que vamos a realizar a continuación son immutables, es decir, nunca van a modificar el DataFrame sobre el que realizamos la transformación.

Para los siguientes apartados, vamos a trabajar sobre el siguiente DataFrame con el fichero de ventas (pdi_sales_small.csv):

In [36]:
dfSales = (
    spark.read.option("sep", ";")
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("Datos/pdi_sales_small.csv")
)
dfSales.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- Revenue: double (nullable = true)
 |-- Country: string (nullable = true)



### PROYECTAR
#### select
La operación select permite indicar las columnas a recuperar pasándolas como parámetros:

In [37]:
dfSales.select("ProductID", "Revenue").show(3)

+---------+-------+
|ProductID|Revenue|
+---------+-------+
|      725|  115.5|
|      787|  314.9|
|      788|  314.9|
+---------+-------+
only showing top 3 rows



También podemos realizar cálculos (referenciando a los campos con nombreDataframe.nombreColumna) sobre las columnas y crear un alias (operación asociada a un campo):

In [38]:
dfSales.select(dfSales.ProductID, (dfSales.Revenue + 10).alias("VentasMas10")).show(3)

+---------+-----------+
|ProductID|VentasMas10|
+---------+-----------+
|      725|      125.5|
|      787|      324.9|
|      788|      324.9|
+---------+-----------+
only showing top 3 rows



#### drop
Si tenemos un DataFrame con un gran número de columnas y queremos recuperarlas todas a excepción de unas pocas, es más cómodo utilizar la transformación drop, la cual funciona de manera opuesta a select, es decir, indicando las columnas que queremos quitar del resultado:

In [39]:
dfSales.drop("Units", "Revenue", "Country").show(3)

+---------+---------+---------------+
|ProductID|     Date|            Zip|
+---------+---------+---------------+
|      725|1/15/1999|41540          |
|      787| 6/6/2002|41540          |
|      788| 6/6/2002|41540          |
+---------+---------+---------------+
only showing top 3 rows



### TRABAJAR CON COLUMNAS

Las columnas de un Dataframe son objetos de tipo Column. 

Podemos recuperar ciertas columnas de un DataFrame con cualquiera de las siguientes expresiones:

In [40]:
dfSales.select("ProductID", "Revenue").show()
dfSales.select(dfSales.ProductID, dfSales.Revenue).show()
dfSales.select(dfSales["ProductID"], dfSales["Revenue"]).show()
dfSales.select(col("ProductID"), col("Revenue")).show()

+---------+-------+
|ProductID|Revenue|
+---------+-------+
|      725|  115.5|
|      787|  314.9|
|      788|  314.9|
|      940|  687.7|
|      396|  857.1|
|      734|  330.7|
|      769|  257.2|
|      499|  846.3|
|     2254|   57.7|
|       31|  761.2|
|      475|  970.2|
|      510|  837.1|
|      499|  883.0|
|      289|  866.0|
|      702|  286.1|
|      910|  414.7|
|      901|  818.9|
|      550|  404.0|
|      559|  585.6|
|      767|  105.0|
+---------+-------+
only showing top 20 rows

+---------+-------+
|ProductID|Revenue|
+---------+-------+
|      725|  115.5|
|      787|  314.9|
|      788|  314.9|
|      940|  687.7|
|      396|  857.1|
|      734|  330.7|
|      769|  257.2|
|      499|  846.3|
|     2254|   57.7|
|       31|  761.2|
|      475|  970.2|
|      510|  837.1|
|      499|  883.0|
|      289|  866.0|
|      702|  286.1|
|      910|  414.7|
|      901|  818.9|
|      550|  404.0|
|      559|  585.6|
|      767|  105.0|
+---------+-------+
only showing t

Veamos un ejemplo para seleccionar columnas que creamos a partir de las conlumnas existentes, en este caso vamos a usar concat_ws para concatenar textos utilizado un separador.

In [41]:
dfClientes.select(
    concat_ws(" ", col("nombre"), col("apellidos")).alias("nombreCompleto"),
    "sueldo",
    (col("sueldo") * 1.1).alias("nuevoSueldo"),
).show()

+--------------+------+------------------+
|nombreCompleto|sueldo|       nuevoSueldo|
+--------------+------+------------------+
| Aitor Medrano|3000.0|3300.0000000000005|
|   Pedro Casas|4000.0|            4400.0|
|  Laura García|5000.0|            5500.0|
|   Miguel Ruiz|6000.0| 6600.000000000001|
|Isabel Guillén|7000.0| 7700.000000000001|
+--------------+------+------------------+



Una vez tenemos un DataFrame, podemos añadir columnas mediante el método withColumn:

In [42]:
dfNuevo = dfSales.withColumn("total", dfSales.Units * dfSales.Revenue)
dfNuevo.show()

+---------+---------+---------------+-----+-------+-------+------+
|ProductID|     Date|            Zip|Units|Revenue|Country| total|
+---------+---------+---------------+-----+-------+-------+------+
|      725|1/15/1999|41540          |    1|  115.5|Germany| 115.5|
|      787| 6/6/2002|41540          |    1|  314.9|Germany| 314.9|
|      788| 6/6/2002|41540          |    1|  314.9|Germany| 314.9|
|      940|1/15/1999|22587          |    1|  687.7|Germany| 687.7|
|      396|1/15/1999|22587          |    1|  857.1|Germany| 857.1|
|      734|4/10/2003|22587          |    1|  330.7|Germany| 330.7|
|      769|2/15/1999|22587          |    1|  257.2|Germany| 257.2|
|      499|1/15/1999|12555          |    1|  846.3|Germany| 846.3|
|     2254|1/15/1999|40217          |    1|   57.7|Germany|  57.7|
|       31|5/31/2002|40217          |    1|  761.2|Germany| 761.2|
|      475|2/15/1999|13583          |    1|  970.2|Germany| 970.2|
|      510|1/15/1999|22337          |    1|  837.1|Germany| 83

Otra forma de añadir una columna con una expresión es mediante la transformación selectExpr. Por ejemplo, podemos conseguir el mismo resultado que en el ejemplo anterior de la siguiente manera:

In [43]:
dfSales.selectExpr("*", "Units * Revenue as total").show()

+---------+---------+---------------+-----+-------+-------+------+
|ProductID|     Date|            Zip|Units|Revenue|Country| total|
+---------+---------+---------------+-----+-------+-------+------+
|      725|1/15/1999|41540          |    1|  115.5|Germany| 115.5|
|      787| 6/6/2002|41540          |    1|  314.9|Germany| 314.9|
|      788| 6/6/2002|41540          |    1|  314.9|Germany| 314.9|
|      940|1/15/1999|22587          |    1|  687.7|Germany| 687.7|
|      396|1/15/1999|22587          |    1|  857.1|Germany| 857.1|
|      734|4/10/2003|22587          |    1|  330.7|Germany| 330.7|
|      769|2/15/1999|22587          |    1|  257.2|Germany| 257.2|
|      499|1/15/1999|12555          |    1|  846.3|Germany| 846.3|
|     2254|1/15/1999|40217          |    1|   57.7|Germany|  57.7|
|       31|5/31/2002|40217          |    1|  761.2|Germany| 761.2|
|      475|2/15/1999|13583          |    1|  970.2|Germany| 970.2|
|      510|1/15/1999|22337          |    1|  837.1|Germany| 83

Si por algún extraño motivo necesitamos cambiarle el nombre a una columna podemos utilizar la transformación withColumnRenamed:

In [44]:
dfSales.withColumnRenamed("Zip", "PostalCode").show(5)

+---------+---------+---------------+-----+-------+-------+
|ProductID|     Date|     PostalCode|Units|Revenue|Country|
+---------+---------+---------------+-----+-------+-------+
|      725|1/15/1999|41540          |    1|  115.5|Germany|
|      787| 6/6/2002|41540          |    1|  314.9|Germany|
|      788| 6/6/2002|41540          |    1|  314.9|Germany|
|      940|1/15/1999|22587          |    1|  687.7|Germany|
|      396|1/15/1999|22587          |    1|  857.1|Germany|
+---------+---------+---------------+-----+-------+-------+
only showing top 5 rows



### FILTRAR

Si queremos eliminar filas, usaremos el método filter:

In [45]:
dfSales.filter(dfSales.Country == "Germany").show()

+---------+---------+---------------+-----+-------+-------+
|ProductID|     Date|            Zip|Units|Revenue|Country|
+---------+---------+---------------+-----+-------+-------+
|      725|1/15/1999|41540          |    1|  115.5|Germany|
|      787| 6/6/2002|41540          |    1|  314.9|Germany|
|      788| 6/6/2002|41540          |    1|  314.9|Germany|
|      940|1/15/1999|22587          |    1|  687.7|Germany|
|      396|1/15/1999|22587          |    1|  857.1|Germany|
|      734|4/10/2003|22587          |    1|  330.7|Germany|
|      769|2/15/1999|22587          |    1|  257.2|Germany|
|      499|1/15/1999|12555          |    1|  846.3|Germany|
|     2254|1/15/1999|40217          |    1|   57.7|Germany|
|       31|5/31/2002|40217          |    1|  761.2|Germany|
|      475|2/15/1999|13583          |    1|  970.2|Germany|
|      510|1/15/1999|22337          |    1|  837.1|Germany|
|      499| 6/5/2002|22337          |    1|  883.0|Germany|
|      289|2/15/1999|13587          |   

Por similitud con SQL, podemos utilizar también where como un alias de filter:

In [46]:
dfSales.where(dfSales.Units > 20).show()

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495| 3/15/1999|75213 CEDEX 16 |   77|43194.1|France |
|     2091| 5/15/1999|9739           |   24| 3652.7|Mexico |
|     2091| 6/15/1999|40213          |   41| 6240.1|Germany|
|     2091|10/15/1999|40213          |   41| 6347.7|Germany|
|     2091|12/15/1999|40213          |   23| 3560.9|Germany|
+---------+----------+---------------+-----+-------+-------+



Podemos utilizar los operadores lógicos (& para conjunción y | para la disyunción) para crear condiciones compuestas (recordad rodear cada condición entre paréntesis):

In [47]:
dfSales.filter((dfSales.Country == "Germany") & (dfSales.Units > 20)).show()

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|     2091| 6/15/1999|40213          |   41| 6240.1|Germany|
|     2091|10/15/1999|40213          |   41| 6347.7|Germany|
|     2091|12/15/1999|40213          |   23| 3560.9|Germany|
+---------+----------+---------------+-----+-------+-------+



In [48]:
dfSales.filter((dfSales.ProductID==2314) | (dfSales.ProductID==1322)).show()

+---------+---------+---------------+-----+-------+-------+
|ProductID|     Date|            Zip|Units|Revenue|Country|
+---------+---------+---------------+-----+-------+-------+
|     2314|5/15/1999|46045          |    1|   13.9|Germany|
|     1322| 1/6/2000|75593 CEDEX 12 |    1|  254.5|France |
+---------+---------+---------------+-----+-------+-------+



Un caso particular de filtrado es la eliminación de los registros repetidos, lo cual lo podemos hacer de dos maneras:

* Haciendo uso del método distinct tras haber realizado alguna transformación
* Utilizando dropDuplicates sobre un DataFrame

In [49]:
dfSales.select("Country").distinct().show()

+-------+
|Country|
+-------+
|Germany|
|France |
|Canada |
|Mexico |
| France|
+-------+



In [50]:
dfSales.dropDuplicates(["Country"]).select("Country").show()

+-------+
|Country|
+-------+
|Germany|
|France |
|Canada |
|Mexico |
| France|
+-------+



### ORDENAR
Una vez recuperados los datos deseados, podemos ordenarlos mediante sort u orderBy (son operaciones totalmente equivalentes):

In [51]:
dfSales.select("ProductID", "Revenue").sort("Revenue").show(5)
dfSales.sort("Revenue", ascending=True).show(5)

# Ordenación descendiente
dfSales.sort(dfSales.Revenue.desc()).show(5)
dfSales.sort("Revenue", ascending=False).show(5)

# Ordenación diferente en cada columna
dfSales.sort(dfSales.Revenue.desc(), dfSales.Units.asc()).show(5)
dfSales.sort(["Revenue", "Units"], ascending=[0, 1]).show(5)

+---------+-------+
|ProductID|Revenue|
+---------+-------+
|     2314|   13.9|
|     1974|   52.4|
|     1974|   52.4|
|     1974|   52.4|
|     1974|   52.4|
+---------+-------+
only showing top 5 rows

+---------+---------+---------------+-----+-------+-------+
|ProductID|     Date|            Zip|Units|Revenue|Country|
+---------+---------+---------------+-----+-------+-------+
|     2314|5/15/1999|46045          |    1|   13.9|Germany|
|     1974|3/15/1999|R3B            |    1|   52.4|Canada |
|     1974|4/15/1999|R3H            |    1|   52.4|Canada |
|     1974|3/15/1999|R3H            |    1|   52.4|Canada |
|     1974|1/15/1999|R3S            |    1|   52.4|Canada |
+---------+---------+---------------+-----+-------+-------+
only showing top 5 rows

+---------+---------+---------------+-----+-------+-------+
|ProductID|     Date|            Zip|Units|Revenue|Country|
+---------+---------+---------------+-----+-------+-------+
|      495|3/15/1999|75213 CEDEX 16 |   77|43194.1

Normalmente, tras realizar una ordenación, es habitual quedarse con un subconjunto de los datos. Para ello, podemos utilizar la transformación limit.

Por ejemplo, la siguiente transformación es similar al ejemplo anterior, sólo que ahora al driver únicamente le llegan 5 registros, en vez de traerlos todos y sólo mostrar 5:

In [52]:
dfSales.sort(dfSales.Revenue.desc(), dfSales.Units.asc()).limit(5).show()

+---------+---------+---------------+-----+-------+-------+
|ProductID|     Date|            Zip|Units|Revenue|Country|
+---------+---------+---------------+-----+-------+-------+
|      495|3/15/1999|75213 CEDEX 16 |   77|43194.1|France |
|      495| 3/1/2000|75391 CEDEX 08 |   18|10395.0|France |
|      464|6/11/2003|75213 CEDEX 16 |   16|10075.8|France |
|      464| 8/1/2000|22397          |   17| 9817.5|Germany|
|      495| 3/1/2000|06175 CEDEX 2  |   16| 9240.0|France |
+---------+---------+---------------+-----+-------+-------+



#### OPERAR COMO CONJUNTOS
La única manera de añadir filas a un DataFrame es creando uno nuevo que sea el resultado de unir dos DataFrames que compartan el mismo esquema (mismo nombres de columnas y en el mismo orden)

In [53]:
nuevasVenta = [
    (6666, "2022-03-24", "03206", 33, 3333.33, "Spain"),
    (6666, "2022-03-25", "03206", 22, 2222.22, "Spain"),
]
# Creamos un nuevo DataFrame con las nuevas Ventas
nvDF = spark.createDataFrame(nuevasVenta)
# Unimos los dos DataFrames
dfUpdated = dfSales.union(nvDF)

Considerando dos DataFrames como dos conjuntos, podemos emplear las operaciones union, intersect, intersectAll (mantiene los duplicados), exceptAll (mantiene los duplicados) y subtract .

### COGER MUESTRAS
Si necesitamos recoger un subconjunto de los datos, ya sea para preparar los datos para algún modelo de machine learning como para una muestra aleatoria de los mismos, podemos utilizar las siguientes transformaciones: sample y randomSplit

#### sample
Permite obtener una muestra a partir de un porcentaje (no tiene porqué obtener una cantidad exacta). También admite un semilla e indicar si queremos que pueda repetir los datos.


In [54]:
dfSales.count() 

120239

In [55]:
muestra = dfSales.sample(0.10)
muestra.count()

12133

In [56]:
muestraConRepetidos = dfSales.sample(True, 0.10)
muestraConRepetidos.count()

11919

#### randomSplit
Recupera diferentes DataFrames cuyos tamaños en porcentaje se indican como parámetros (si no suman uno, los parámetros se normalizan)

In [57]:
dfs = dfSales.randomSplit([0.8, 0.2])
dfEntrenamiento = dfs[0]
dfPrueba = dfs[1]
dfEntrenamiento.count() 

95909

In [58]:
dfPrueba.count() 

24330

## LIMPIAR DATOS
Hay tres formas de gestionar la ausencia de datos o los datos erróneos:
1. Eliminar las filas que tienen valores vacíos en una o más columnas.
2. Rellenar los valores nulos con valores que definimos nosotros.
3. Sustituir los datos erróneos por algún valor que sepamos como gestionarlo.

Vamos a ver cada uno de estos casos a partir del siguiente dataset:

In [59]:
malasVentas = [
    (6666, "2022-03-22", "03206", 33, 3333.33, "Spain"),
    (6666, "2022-03-22", None, 33, 3333.33, "Spain"),
    (6666, "2022-03-23", "03206", None, 2222.22, "Spain"),
    (6666, "2022-03-24", "03206", None, None, "Espain"),
    (None, None, None, None, None, None),
]
malDF = spark.createDataFrame(
    malasVentas, ["ProductID", "Date", "Zip", "Units", "Revenue", "Country"]
)
malDF.show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22| NULL|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| NULL|2222.22|  Spain|
|     6666|2022-03-24|03206| NULL|   NULL| Espain|
|     NULL|      NULL| NULL| NULL|   NULL|   NULL|
+---------+----------+-----+-----+-------+-------+



Si queremos saber si una columna contiene nulos, podemos hacer un filtrado utilizando el método isNull sobre los campos deseados (también podemos utilizar isNotNull si queremos el caso contrario):

In [60]:
malDF.filter(malDF.Zip.isNull()).show()

+---------+----------+----+-----+-------+-------+
|ProductID|      Date| Zip|Units|Revenue|Country|
+---------+----------+----+-----+-------+-------+
|     6666|2022-03-22|NULL|   33|3333.33|  Spain|
|     NULL|      NULL|NULL| NULL|   NULL|   NULL|
+---------+----------+----+-----+-------+-------+



Para trabajar con las filas que contengan algún dato nulo, podemos acceder a la propiedad na, y:

* eliminar las filas mediante el método drop / dropna. Puede recibir "any" (borrará las filas que contengan algún nulo) o "all" (borrará las filas que todas sus columnas contengan nulos) y una lista con las columnas a considerar. También podemos indicar la cantidad de valores no nulos que ha de contener cada fila para eliminarla mediante el parámetro thresh:

In [61]:
# Elimina todos los nulos
malDF.na.drop().show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
+---------+----------+-----+-----+-------+-------+



In [62]:
# Elimina las filas que todas sus columnas son nulas
malDF.na.drop("all").show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22| NULL|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| NULL|2222.22|  Spain|
|     6666|2022-03-24|03206| NULL|   NULL| Espain|
+---------+----------+-----+-----+-------+-------+



In [63]:
# Elimina las filas que tienen el Zip nulo
malDF.na.drop(subset=["Zip"]).show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| NULL|2222.22|  Spain|
|     6666|2022-03-24|03206| NULL|   NULL| Espain|
+---------+----------+-----+-----+-------+-------+



In [64]:
malDF.na.drop(thresh=3).show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22| NULL|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| NULL|2222.22|  Spain|
|     6666|2022-03-24|03206| NULL|   NULL| Espain|
+---------+----------+-----+-----+-------+-------+



* rellenar los datos nulos mediante el método fill / fillna, indicando el valor y si queremos, sobre qué columnas aplicar la modificación:

In [65]:
# Rellenamos los zips vacíos por 99999
malDF.na.fill("99999", subset=["Zip"]).show()
# malDF.na.fill("99999", ["Zip"]).show()
# malDF.fillna({"Zip": "99999"})

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22|99999|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| NULL|2222.22|  Spain|
|     6666|2022-03-24|03206| NULL|   NULL| Espain|
|     NULL|      NULL|99999| NULL|   NULL|   NULL|
+---------+----------+-----+-----+-------+-------+



* sustituir los datos erróneos mediante el método replace:

In [66]:
# Cambiamos Espain por Spain
malDF.na.replace("Espain", "Spain").show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22| NULL|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| NULL|2222.22|  Spain|
|     6666|2022-03-24|03206| NULL|   NULL|  Spain|
|     NULL|      NULL| NULL| NULL|   NULL|   NULL|
+---------+----------+-----+-----+-------+-------+



Otro caso muy común es realizar una operación sobre una columna para transformar su valor, por ejemplo, pasar todo el texto a minúsculas o dividir una columna entre 100 para cambiar la escala.

En nuestro caso, vamos a modificar las columnas Zip y Country para realizar un trim y borrar los espacios en blanco:

In [67]:
dfSales = dfSales.withColumn("Country", trim(col("Country"))).withColumn(
    "Zip", trim(col("Zip"))
)
dfSales.show(5)

+---------+---------+-----+-----+-------+-------+
|ProductID|     Date|  Zip|Units|Revenue|Country|
+---------+---------+-----+-----+-------+-------+
|      725|1/15/1999|41540|    1|  115.5|Germany|
|      787| 6/6/2002|41540|    1|  314.9|Germany|
|      788| 6/6/2002|41540|    1|  314.9|Germany|
|      940|1/15/1999|22587|    1|  687.7|Germany|
|      396|1/15/1999|22587|    1|  857.1|Germany|
+---------+---------+-----+-----+-------+-------+
only showing top 5 rows



## USAR SQL
Spark soporta el SQL, ampliamente establecido en el mundo de las bases de datos.

### VISTAS TEMPORALES

Ya hemos visto que los DataFrames tienen una estructura similar a una tabla de una base de datos relacional. Para poder realizar consultas, necesitaremos crear vistas temporales mediante el método createTempView o createOrReplaceTempView para posteriormente realizar una consulta sobre la vista creada a través de spark.sql:

In [68]:
# 1. definimos la vista
dfSales.createOrReplaceTempView("ventas")
# 2. realizamos la consulta
ventasCanada = spark.sql("select * from ventas where trim(Country)='Canada'")
ventasCanada.show(3)

+---------+---------+---+-----+-------+-------+
|ProductID|     Date|Zip|Units|Revenue|Country|
+---------+---------+---+-----+-------+-------+
|      725|1/15/1999|H1B|    1|  115.4| Canada|
|     2235|1/15/1999|H1B|    2|  131.1| Canada|
|      713|1/15/1999|H1B|    1|  160.1| Canada|
+---------+---------+---+-----+-------+-------+
only showing top 3 rows



### VISTAS GLOBALES 

Las vistas temporales tienen un alcance de SparkSession, de manera que desaparecen una vez finalice la sesión que ha creado la vista. Si necesitamos tener una vista que se comparta entre todas las sesiones y que permanezca viva hasta que la aplicación Spark finalice, podemos crear una vista temporal global mediante createOrReplaceGlobalTempView

Estas vistas se almacenan en la base de datos global_temp y en las consultas es necesario poner el prefijo global_temp para acceder a sus vistas.

In [69]:
# 1. definimos la vista global
dfSales.createOrReplaceGlobalTempView("ventasg")
# 2. realizamos la consulta
ventasCanadaG = spark.sql(
    "select * from global_temp.ventasg where trim(Country)='Canada'"
)
ventasCanadaG.show(3)

+---------+---------+---+-----+-------+-------+
|ProductID|     Date|Zip|Units|Revenue|Country|
+---------+---------+---+-----+-------+-------+
|      725|1/15/1999|H1B|    1|  115.4| Canada|
|     2235|1/15/1999|H1B|    2|  131.1| Canada|
|      713|1/15/1999|H1B|    1|  160.1| Canada|
+---------+---------+---+-----+-------+-------+
only showing top 3 rows



In [70]:
# Creamos otra sesión y vemos como funciona
spark.newSession().sql(
    "select count(*) from global_temp.ventasg where trim(Country)='Canada'"
).show()

+--------+
|count(1)|
+--------+
|   30060|
+--------+



### ELIMINAR VISTAS
Para borrar una vista que hayamos creado, necesitamos acceder al Spark Catalog y utilizar el método dropTempView o dropGlobalTempView dependiendo del tipo de vista:

In [71]:
spark.catalog.dropTempView("ventas")
spark.catalog.dropGlobalTempView("ventasg")

True