**Spark SQL works with DataFrames**. As discussed earlier, a DataFrame is a **relational representation of the data**: rows organized into named columns, each with a type. It provides functions with SQL-like capabilities and also lets us write **SQL-like queries** for our data analysis.

### Creating a DataFrame from scratch

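```python
from pyspark.sql import SparkSession

# In the notebook `spark` already exists; getOrCreate() simply returns it.
spark = SparkSession.builder.getOrCreate()

# Employee rows: (id, name, dept, salary)
emp = [(1, "AAAAA", "dept1", 1000),
       (2, "BBBBB", "dept1", 1100),
       (3, "CCCCC", "dept1", 2000),
       (4, "DDDDD", "dept1", 3500),
       (5, "EEEEE", "dept2", 8000),
       (6, "FFFFF", "dept2", 5200),
       (7, "GGGGG", "dept3", 3100),
       (8, "HHHHH", "dept3", 6700),
       (9, "IIIII", "dept3", 6500),
       (10, "JJJJJ", "dept4", 5400)]

# Department rows: (id, name)
dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")]

# Column names are passed as a list; column types are inferred from the Python values.
dfemp = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])

deptdf = spark.createDataFrame(dept, ["id", "name"])
```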

```python
dfemp.show()
```

```
+---+-----+-----+------+
| id| name| dept|salary|
+---+-----+-----+------+
|  1|AAAAA|dept1|  1000|
|  2|BBBBB|dept1|  1100|
|  3|CCCCC|dept1|  2000|
|  4|DDDDD|dept1|  3500|
|  5|EEEEE|dept2|  8000|
|  6|FFFFF|dept2|  5200|
|  7|GGGGG|dept3|  3100|
|  8|HHHHH|dept3|  6700|
|  9|IIIII|dept3|  6500|
| 10|JJJJJ|dept4|  5400|
+---+-----+-----+------+
```



```python
deptdf.show()
```

```
+-----+--------------+
|   id|          name|
+-----+--------------+
|dept1|Department - 1|
|dept2|Department - 2|
|dept3|Department - 3|
|dept4|Department - 4|
+-----+--------------+
```



# Basic DataFrame operations

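We can apply many of the operations we already saw in the RDD section. These include transformations such as `select`, which return a new DataFrame, and actions such as `count`, which return a result to the driver. For example: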

```python
dfemp.count()
```

```
10
```

```python
dfemp.printSchema()
```

```
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: long (nullable = true)
```



```python
dfemp.select("id", "name").show()
```

```
+---+-----+
| id| name|
+---+-----+
|  1|AAAAA|
|  2|BBBBB|
|  3|CCCCC|
|  4|DDDDD|
|  5|EEEEE|
|  6|FFFFF|
|  7|GGGGG|
|  8|HHHHH|
|  9|IIIII|
| 10|JJJJJ|
+---+-----+
```



### Advanced example: filter

* Filters rows according to a condition.
* Let's find the rows with id = 1.
* There are several ways to specify the condition.

```python
# Reference the column by indexing the DataFrame...
dfemp.filter(dfemp["id"] == 1).show()
# ...or as an attribute of the DataFrame
dfemp.filter(dfemp.id == 1).show()
```

```
+---+-----+-----+------+
| id| name| dept|salary|
+---+-----+-----+------+
|  1|AAAAA|dept1|  1000|
+---+-----+-----+------+

+---+-----+-----+------+
| id| name| dept|salary|
+---+-----+-----+------+
|  1|AAAAA|dept1|  1000|
+---+-----+-----+------+
```



```python
from pyspark.sql.functions import col

# col() builds a column reference from its name
dfemp.filter(col("id") == 1).show()
# A SQL expression string is also accepted
dfemp.filter("id = 1").show()
```

```
+---+-----+-----+------+
| id| name| dept|salary|
+---+-----+-----+------+
|  1|AAAAA|dept1|  1000|
+---+-----+-----+------+

+---+-----+-----+------+
| id| name| dept|salary|
+---+-----+-----+------+
|  1|AAAAA|dept1|  1000|
+---+-----+-----+------+
```



### Function: drop
* Returns a new DataFrame without the given column; the original DataFrame is not modified.

```python
newdf = dfemp.drop("id")
newdf.show(2)
```

```
+-----+-----+------+
| name| dept|salary|
+-----+-----+------+
|AAAAA|dept1|  1000|
|BBBBB|dept1|  1100|
+-----+-----+------+
only showing top 2 rows
```



### Function: withColumn
* We can use the `withColumn` function to derive a new column from existing columns.

```python
from pyspark.sql.functions import col

# bonus = 10% of salary (a double column)
dfemp.withColumn("bonus", col("salary") * 0.1).show()
```

```
+---+-----+-----+------+-----+
| id| name| dept|salary|bonus|
+---+-----+-----+------+-----+
|  1|AAAAA|dept1|  1000|100.0|
|  2|BBBBB|dept1|  1100|110.0|
|  3|CCCCC|dept1|  2000|200.0|
|  4|DDDDD|dept1|  3500|350.0|
|  5|EEEEE|dept2|  8000|800.0|
|  6|FFFFF|dept2|  5200|520.0|
|  7|GGGGG|dept3|  3100|310.0|
|  8|HHHHH|dept3|  6700|670.0|
|  9|IIIII|dept3|  6500|650.0|
| 10|JJJJJ|dept4|  5400|540.0|
+---+-----+-----+------+-----+
```



### Aggregation example
* We can use the `groupBy` function to group the data and then the `agg` function to aggregate each group.

```python
from pyspark.sql import functions as f

# Aliases: conteo = count, suma = sum, maximo = max, minimo = min, promedio = average
(dfemp.groupBy("dept")
    .agg(
        f.count("salary").alias("conteo"),
        f.sum("salary").alias("suma"),
        f.max("salary").alias("maximo"),
        f.min("salary").alias("minimo"),
        f.avg("salary").alias("promedio"))
    .show()
)
```

```
+-----+------+-----+------+------+-----------------+
| dept|conteo| suma|maximo|minimo|         promedio|
+-----+------+-----+------+------+-----------------+
|dept1|     4| 7600|  3500|  1000|           1900.0|
|dept2|     2|13200|  8000|  5200|           6600.0|
|dept3|     3|16300|  6700|  3100|5433.333333333333|
|dept4|     1| 5400|  5400|  5400|           5400.0|
+-----+------+-----+------+------+-----------------+
```



### Finally, we can also perform joins, as in SQL

```python
# Inner join (the default join type when none is given)
dfemp.join(deptdf, dfemp["dept"] == deptdf["id"]).show()
```

```
+---+-----+-----+------+-----+--------------+
| id| name| dept|salary|   id|          name|
+---+-----+-----+------+-----+--------------+
|  1|AAAAA|dept1|  1000|dept1|Department - 1|
|  2|BBBBB|dept1|  1100|dept1|Department - 1|
|  3|CCCCC|dept1|  2000|dept1|Department - 1|
|  4|DDDDD|dept1|  3500|dept1|Department - 1|
|  5|EEEEE|dept2|  8000|dept2|Department - 2|
|  6|FFFFF|dept2|  5200|dept2|Department - 2|
|  7|GGGGG|dept3|  3100|dept3|Department - 3|
|  8|HHHHH|dept3|  6700|dept3|Department - 3|
|  9|IIIII|dept3|  6500|dept3|Department - 3|
| 10|JJJJJ|dept4|  5400|dept4|Department - 4|
+---+-----+-----+------+-----+--------------+
```



### Left Outer Join

```python
dfemp.join(deptdf, dfemp["dept"] == deptdf["id"], "left_outer").show()
```

```
+---+-----+-----+------+-----+--------------+
| id| name| dept|salary|   id|          name|
+---+-----+-----+------+-----+--------------+
|  1|AAAAA|dept1|  1000|dept1|Department - 1|
|  2|BBBBB|dept1|  1100|dept1|Department - 1|
|  3|CCCCC|dept1|  2000|dept1|Department - 1|
|  4|DDDDD|dept1|  3500|dept1|Department - 1|
|  5|EEEEE|dept2|  8000|dept2|Department - 2|
|  6|FFFFF|dept2|  5200|dept2|Department - 2|
|  7|GGGGG|dept3|  3100|dept3|Department - 3|
|  8|HHHHH|dept3|  6700|dept3|Department - 3|
|  9|IIIII|dept3|  6500|dept3|Department - 3|
| 10|JJJJJ|dept4|  5400|dept4|Department - 4|
+---+-----+-----+------+-----+--------------+
```



### Right Outer Join

```python
dfemp.join(deptdf, dfemp["dept"] == deptdf["id"], "right_outer").show()
```

```
+---+-----+-----+------+-----+--------------+
| id| name| dept|salary|   id|          name|
+---+-----+-----+------+-----+--------------+
|  4|DDDDD|dept1|  3500|dept1|Department - 1|
|  3|CCCCC|dept1|  2000|dept1|Department - 1|
|  2|BBBBB|dept1|  1100|dept1|Department - 1|
|  1|AAAAA|dept1|  1000|dept1|Department - 1|
|  6|FFFFF|dept2|  5200|dept2|Department - 2|
|  5|EEEEE|dept2|  8000|dept2|Department - 2|
|  9|IIIII|dept3|  6500|dept3|Department - 3|
|  8|HHHHH|dept3|  6700|dept3|Department - 3|
|  7|GGGGG|dept3|  3100|dept3|Department - 3|
| 10|JJJJJ|dept4|  5400|dept4|Department - 4|
+---+-----+-----+------+-----+--------------+
```



### Full Outer Join

```python
dfemp.join(deptdf, dfemp["dept"] == deptdf["id"], "outer").show()
```

```
+---+-----+-----+------+-----+--------------+
| id| name| dept|salary|   id|          name|
+---+-----+-----+------+-----+--------------+
|  1|AAAAA|dept1|  1000|dept1|Department - 1|
|  2|BBBBB|dept1|  1100|dept1|Department - 1|
|  3|CCCCC|dept1|  2000|dept1|Department - 1|
|  4|DDDDD|dept1|  3500|dept1|Department - 1|
|  5|EEEEE|dept2|  8000|dept2|Department - 2|
|  6|FFFFF|dept2|  5200|dept2|Department - 2|
|  7|GGGGG|dept3|  3100|dept3|Department - 3|
|  8|HHHHH|dept3|  6700|dept3|Department - 3|
|  9|IIIII|dept3|  6500|dept3|Department - 3|
| 10|JJJJJ|dept4|  5400|dept4|Department - 4|
+---+-----+-----+------+-----+--------------+
```

