# Introducción a los DataFrames

En este tema veremos:

  - Cómo crear un DataFrame
  - Algunas operaciones básicas sobre DataFrames
      - Mostrar filas
      - Seleccionar columnas
      - Renombrar, añadir y eliminar columnas
      - Eliminar valores nulos y filas duplicadas
      - Reemplazar valores
  - Guardar los DataFrames en diferentes formatos 

## Creación de DataFrames
Un DataFrame puede crearse de distintas formas:

  - A partir de una secuencia de datos
  - A partir de objetos de tipo Row
  - A partir de un RDD o DataSet
  - Leyendo los datos de un fichero
      - Igual que Hadoop, Spark soporta diferentes filesystems: local, HDFS, Amazon S3
          - En general, soporta cualquier fuente de datos que se pueda leer con Hadoop
      - Spark puede acceder a diferentes tipos de ficheros: texto plano, CSV, JSON, [Parquet](https://parquet.apache.org/), [ORC](https://orc.apache.org/), Sequence, etc
        -   Soporta ficheros comprimidos
  - Accediendo a bases de datos relacionales o NoSQL
    -   MySQL, Postgres, etc. mediante JDBC/ODBC
    -   Hive, HBase, Cassandra, MongoDB, AWS Redshift, etc.

## Creando DataFrames a partir de una secuencia o lista de datos

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os

# Elegir el máster de Spark dependiendo de si se ha definido la variable de entorno HADOOP_CONF_DIR o YARN_CONF_DIR
SPARK_MASTER: str = 'local[*]'
if 'HADOOP_CONF_DIR' in os.environ or 'YARN_CONF_DIR' in os.environ:
  SPARK_MASTER = 'yarn'

# Creamos un objeto SparkSession (o lo obtenemos si ya está creado)
spark: SparkSession = SparkSession \
  .builder \
  .appName("Mi aplicacion") \
  .config("spark.rdd.compress", "true") \
  .config("spark.executor.memory", "3g") \
  .config("spark.driver.memory", "3g") \
  .master(SPARK_MASTER) \
  .getOrCreate()

sc: SparkContext = spark.sparkContext

24/11/12 16:07:11 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.18.17.35 instead (on interface en0)
24/11/12 16:07:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 16:07:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
sc._conf.getAll()

[('spark.driver.port', '52388'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.executor.memory', '3g'),
 ('spark.app.submitTime', '173142403

In [3]:
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col

# Creando un DataFrame desde un rango y añadiéndole dos columnas
df: DataFrame = spark.range(1,7,2).toDF("n")
df.show()

# Añadiendo dos columnas al DataFrame
# La expresión para la columna puede incluir operadores.
df.withColumn("n1", col("n")+1).withColumn("n2", 2*col("n1")).show()

                                                                                

+---+
|  n|
+---+
|  1|
|  3|
|  5|
+---+

+---+---+---+
|  n| n1| n2|
+---+---+---+
|  1|  2|  4|
|  3|  4|  8|
|  5|  6| 12|
+---+---+---+



In [4]:
# DataFrame a partir de una lista de tuplas
l = [("Pepe", 5.1, "Aprobado"),
     ("Juan", 4.0, "Suspenso"),
     ("Manuel", None, None)]
dfNotas: DataFrame = spark.createDataFrame(l, schema=["Nombre", "nota", "cal"])
dfNotas.show()
dfNotas.printSchema()

                                                                                

+------+----+--------+
|Nombre|nota|     cal|
+------+----+--------+
|  Pepe| 5.1|Aprobado|
|  Juan| 4.0|Suspenso|
|Manuel|NULL|    NULL|
+------+----+--------+

root
 |-- Nombre: string (nullable = true)
 |-- nota: double (nullable = true)
 |-- cal: string (nullable = true)



## Creando DataFrames con esquema
A la hora de crear un DataFrame, es conveniente especificar el esquema del mismo:

  - El esquema define los nombres y tipos de datos de las columnas
  - Se usa un objeto de tipo `StructType` para definir el nombre y tipo de las columnas, y un objeto de tipo `StructField` para definir el nombre y tipo de una columna
  - Los tipos de datos que utiliza Spark están definidos en:
      - Para PySpark: https://spark.apache.org/docs/latest/sql-ref-datatypes.html

In [5]:
from pyspark.sql.types import StructField, StructType, FloatType, StringType
from pyspark.sql import Row

# Definimos el esquema del DataFrame
esquemaNotas = StructType(fields=[
    StructField(name="Nombre", dataType=StringType(), nullable=False),
    StructField(name="nota", dataType=FloatType(), nullable=True),
    StructField(name="cal", dataType=StringType(), nullable=True)
    ])

# Creamos el DataFrame a partir de una lista de objetos Row
filas: list[Row] = [
    Row("Pepe", 5.1, "Aprobado"),
    Row("Juan", 4.0, "Suspenso"),
    Row("Manuel", None, None)
    ]

dfNotas = spark.createDataFrame(filas, schema=esquemaNotas)
dfNotas.show()
dfNotas.printSchema()

+------+----+--------+
|Nombre|nota|     cal|
+------+----+--------+
|  Pepe| 5.1|Aprobado|
|  Juan| 4.0|Suspenso|
|Manuel|NULL|    NULL|
+------+----+--------+

root
 |-- Nombre: string (nullable = false)
 |-- nota: float (nullable = true)
 |-- cal: string (nullable = true)



## Creando DataFrames a partir de un fichero de texto

Cada línea del fichero se guarda como una fila

In [6]:
from pyspark import SparkFiles

# Añadimos un archivo al contexto de Spark (los descarga en cada nodo)
sc.addFile("https://raw.githubusercontent.com/dsevilla/tcdm-public/refs/heads/24-25/datos/quijote.txt.gz")

dfQuijote: DataFrame = spark.read.text("file://" + SparkFiles.get("quijote.txt.gz"))
dfQuijote.show(50, truncate=False)

+---------------------------------------------------------------------------+
|value                                                                      |
+---------------------------------------------------------------------------+
|The Project Gutenberg EBook of Don Quijote, by Miguel de Cervantes Saavedra|
|                                                                           |
|This eBook is for the use of anyone anywhere at no cost and with           |
|almost no restrictions whatsoever.  You may copy it, give it away or       |
|re-use it under the terms of the Project Gutenberg License included        |
|with this eBook or online at www.gutenberg.net                             |
|                                                                           |
|                                                                           |
|Title: Don Quijote                                                         |
|                                                               

## Creando DataFrames a partir de un fichero CSV

Como ejemplo vamos a utilizar el fichero de preguntas y respuestas de Stack Overflow en Español, que hemos utilizado en otras asignaturas. Es un fichero CSV, con unos campos que son:

- `Id`: integer: La identificación de la pregunta o respuesta
- `AcceptedAnswerId`: integer: La identificación de la respuesta aceptada (si existe)
- `AnswerCount`: integer: El número de respuestas
- `Body`: string: El cuerpo de la pregunta o respuesta
- `ClosedDate`: timestamp: Fecha de cierre de la pregunta (si está cerrada)
- `CommentCount`: integer: Número de comentarios
- `CommunityOwnedDate`: timestamp: (no se usará)  
- `ContentLicense`: string: Licencia de contenido
- `CreationDate`: timestamp: La fecha de creación
- `FavoriteCount`: integer: Número de favoritos
- `LastActivityDate`: timestamp: (no se usará)
- `LastEditDate`: timestamp: (no se usará)
- `LastEditorDisplayName`: string: (no se usará)
- `LastEditorUserId`: integer: (no se usará)
- `OwnerDisplayName`: string: El nombre del propietario (si se borró el usuario) 
- `OwnerUserId`: integer: El identificador del propietario
- `ParentId`: integer: El identificador de la pregunta padre (si es una respuesta)
- `PostTypeId`: integer: El tipo de post (1 = pregunta, 2 = respuesta, etc.)
- `Score`: integer: La puntuación de la pregunta o respuesta
- `Tags`: string: El conjunto de etiquetas
- `Title`: string: El título de la pregunta
- `ViewCount`: integer: El número de visitas

Los campos se encuentran separados por el símbolo ";", y el carácter de escape de comillas es el propio carácter de comillas.

### Leemos el fichero infiriendo el esquema

In [13]:
%%sh
wget -q https://github.com/dsevilla/bd2-data/raw/main/es.stackoverflow/es.stackoverflow.csv.7z.001 -O - > es.stackoverflow.csv.7z
wget -q https://github.com/dsevilla/bd2-data/raw/main/es.stackoverflow/es.stackoverflow.csv.7z.002 -O - >> es.stackoverflow.csv.7z

In [14]:
%%sh
7zr x -aoa es.stackoverflow.csv.7z Posts.csv
rm es.stackoverflow.csv.7z


7-Zip (a) [64] 17.05 : Copyright (c) 1999-2021 Igor Pavlov : 2017-08-28
p7zip Version 17.05 (locale=utf8,Utf16=on,HugeFiles=on,64 bits,8 CPUs LE)

Scanning the drive for archives:
1 file, 200457538 bytes (192 MiB)

Extracting archive: es.stackoverflow.csv.7z
--
Path = es.stackoverflow.csv.7z
Type = 7z
Physical Size = 200457538
Headers Size = 248
Method = LZMA2:24
Solid = +
Blocks = 1

Everything is Ok

Files: 2
Size:       973100259
Compressed: 200457538


In [15]:
if SPARK_MASTER == 'yarn':
  !hdfs dfs -put Posts.csv /user/hdadmin/

In [16]:
dfSEInfered = spark.read.format("csv")\
                    .option("mode", "FAILFAST")\
                    .option("sep", ",")\
                    .option("escape", "\"")\
                    .option("inferSchema", "true")\
                    .option("lineSep", "\r\n")\
                    .option("header", "true")\
                    .option("nullValue", "")\
                    .load("Posts.csv")

24/11/12 16:12:34 WARN CSVOptions: It is not recommended to set 'lineSep' with 2 characters due to the limitation of supporting multi-char 'lineSep' within quotes.
                                                                                

Algunas opciones:

1. ``mode``: especifica qué hacer cuando se encuentra registros corruptos
    - ``PERMISSIVE``: pone todos los campos a null cuando se encuentra un registro corrupto (valor por defecto)
    - ``DROPMALFORMED``: elimina las filas con registros corruptos
    - ``FAILFAST``: da un error cuando se encuentra un registro corrupto
2. ``sep``: separador entre campos (por defecto ",")
3. ``inferSchema``: especifica si se deben inferir el tipo de las columnas (por defecto "false")
4. ``header``: si "true" se toma la primera fila como cabecera (por defecto "false")
5. ``nullValue``: carrácter o cadena que representa un NULL en el fichero (por defecto "")
6. ``compression``: topo de compresión utilizada (por defecto "none")
  
Las opciones son similares para otros tipos de ficheros.

In [17]:
# Vemos 5 filas
dfSEInfered.show(5)

24/11/12 16:12:52 WARN CSVOptions: It is not recommended to set 'lineSep' with 2 characters due to the limitation of supporting multi-char 'lineSep' within quotes.
24/11/12 16:12:52 WARN CSVOptions: It is not recommended to set 'lineSep' with 2 characters due to the limitation of supporting multi-char 'lineSep' within quotes.


+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+---------+
| Id|AcceptedAnswerId|AnswerCount|                Body|ClosedDate|CommentCount|CommunityOwnedDate|ContentLicense|        CreationDate|FavoriteCount|    LastActivityDate|        LastEditDate|LastEditorDisplayName|LastEditorUserId|OwnerDisplayName|OwnerUserId|ParentId|PostTypeId|Score|                Tags|               Title|ViewCount|
+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+

In [19]:
# Vemos como se ha inferido el esquema
dfSEInfered.schema

StructType([StructField('Id', IntegerType(), True), StructField('AcceptedAnswerId', IntegerType(), True), StructField('AnswerCount', IntegerType(), True), StructField('Body', StringType(), True), StructField('ClosedDate', TimestampType(), True), StructField('CommentCount', IntegerType(), True), StructField('CommunityOwnedDate', TimestampType(), True), StructField('ContentLicense', StringType(), True), StructField('CreationDate', TimestampType(), True), StructField('FavoriteCount', IntegerType(), True), StructField('LastActivityDate', TimestampType(), True), StructField('LastEditDate', TimestampType(), True), StructField('LastEditorDisplayName', StringType(), True), StructField('LastEditorUserId', IntegerType(), True), StructField('OwnerDisplayName', StringType(), True), StructField('OwnerUserId', IntegerType(), True), StructField('ParentId', IntegerType(), True), StructField('PostTypeId', IntegerType(), True), StructField('Score', IntegerType(), True), StructField('Tags', StringType(),

In [20]:
# Otra forma de verlo
dfSEInfered.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- AnswerCount: integer (nullable = true)
 |-- Body: string (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- CommentCount: integer (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- FavoriteCount: integer (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastEditorDisplayName: string (nullable = true)
 |-- LastEditorUserId: integer (nullable = true)
 |-- OwnerDisplayName: string (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- PostTypeId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: integer (nullable = true)



### Leemos especificando el esquema

El esquema inferido tiene ciertos fallos, como considerar algunos campos como strings cuando deberían ser enteros, o tipos Timestamp en vez de Date. Por ello, vamos a especificar el esquema.

In [21]:
from pyspark.sql.types import StructField, StructType, IntegerType, TimestampType, StringType

# Defino el esquema para los elementos de la tabla
# StructType -> Permite definir un esquema para el DF a partir de una lista de StructFields
# StructField -> Definen el nombre y tipo de cada columna, así como si es nullable o no (campo True)
dfSE_Schema = StructType([StructField('Id', IntegerType(), False),
                          StructField('AcceptedAnswerId', IntegerType(), True),
                          StructField('AnswerCount', IntegerType(), True),
                          StructField('Body', StringType(), True),
                          StructField('ClosedDate', TimestampType(), True),
                          StructField('CommentCount', IntegerType(), True),
                          StructField('CommunityOwnedDate', TimestampType(), True),
                          StructField('ContentLicense', StringType(), True),
                          StructField('CreationDate', TimestampType(), True),
                          StructField('FavoriteCount', IntegerType(), True),
                          StructField('LastActivityDate', TimestampType(), True),
                          StructField('LastEditDate', TimestampType(), True),
                          StructField('LastEditorDisplayName', StringType(), True),
                          StructField('LastEditorUserId', IntegerType(), True),
                          StructField('OwnerDisplayName', StringType(), True),
                          StructField('OwnerUserId', IntegerType(), True),
                          StructField('ParentId', IntegerType(), True),
                          StructField('PostTypeId', IntegerType(), True),
                          StructField('Score', IntegerType(), True),
                          StructField('Tags', StringType(), True),
                          StructField('Title', StringType(), True),
                          StructField('ViewCount', IntegerType(), True)])

# Creo el DataFrame con el esquema definido
dfSE = spark.read.format("csv")\
                    .option("mode", "FAILFAST")\
                    .option("inferSchema", "false")\
                    .option("sep", ",")\
                    .option("header", "true")\
                    .option("nullValue", "")\
                    .option("lineSep", "\r\n")\
                    .option("escape", "\"")\
                    .schema(dfSE_Schema)\
                    .load("Posts.csv")
dfSE.cache()

DataFrame[Id: int, AcceptedAnswerId: int, AnswerCount: int, Body: string, ClosedDate: timestamp, CommentCount: int, CommunityOwnedDate: timestamp, ContentLicense: string, CreationDate: timestamp, FavoriteCount: int, LastActivityDate: timestamp, LastEditDate: timestamp, LastEditorDisplayName: string, LastEditorUserId: int, OwnerDisplayName: string, OwnerUserId: int, ParentId: int, PostTypeId: int, Score: int, Tags: string, Title: string, ViewCount: int]

In [22]:
dfSE.sort("Id").show()

24/11/12 16:15:32 WARN CSVOptions: It is not recommended to set 'lineSep' with 2 characters due to the limitation of supporting multi-char 'lineSep' within quotes.
24/11/12 16:15:32 WARN CSVOptions: It is not recommended to set 'lineSep' with 2 characters due to the limitation of supporting multi-char 'lineSep' within quotes.
                                                                                

+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+---------+
| Id|AcceptedAnswerId|AnswerCount|                Body|ClosedDate|CommentCount|CommunityOwnedDate|ContentLicense|        CreationDate|FavoriteCount|    LastActivityDate|        LastEditDate|LastEditorDisplayName|LastEditorUserId|OwnerDisplayName|OwnerUserId|ParentId|PostTypeId|Score|                Tags|               Title|ViewCount|
+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+

In [23]:
dfSE.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- AnswerCount: integer (nullable = true)
 |-- Body: string (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- CommentCount: integer (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- FavoriteCount: integer (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastEditorDisplayName: string (nullable = true)
 |-- LastEditorUserId: integer (nullable = true)
 |-- OwnerDisplayName: string (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- PostTypeId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: integer (nullable = true)



# Operaciones básicas con DataFrames


### Mostrar filas

In [24]:
# show(n) permite mostrar las primeras n filas (por defecto, n=20)
dfSE.show(5)

+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+---------+
| Id|AcceptedAnswerId|AnswerCount|                Body|ClosedDate|CommentCount|CommunityOwnedDate|ContentLicense|        CreationDate|FavoriteCount|    LastActivityDate|        LastEditDate|LastEditorDisplayName|LastEditorUserId|OwnerDisplayName|OwnerUserId|ParentId|PostTypeId|Score|                Tags|               Title|ViewCount|
+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+

In [25]:
# Podemos indicar que no trunque los campos largos
dfSE.show(5, truncate=False)

+---+----------------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [26]:
# take(n) devuelve las n primeras filas como una lista Python de objetos Row
lista = dfSE.take(5)
print(lista[1])
print("\n")
# collect() devuelve todo el DataFrame como una lista Python de objetos Row
# Si el DataFrame es muy grande podría colapsar al Driver
#lista2 = dfSE.collect()
#print(lista2[10])

Row(Id=2, AcceptedAnswerId=None, AnswerCount=None, Body='<p>He encontrado la solución.</p><br/><br/><p>Resulta que las rutas tienen asignada una <strong>precedencia</strong> numérica asignada, que el motor de enrutamiento de WebApi usa para decidir qué ruta usar en caso de conflicto. Las rutas creadas automáticamente para una misma acción siempre tienen una precedencia distinta, ¡pero la ruta que yo estaba creando manualmente tenía la misma precedencia que la ya existente!</p><br/><br/><p>Así pues la solución es añadir lo siguiente a <code>GetActionDirectRoutes</code>, inmediatamente después de <code>new RouteEntry</code>:</p><br/><br/><pre><code>entry.Route.DataTokens["precedence"] = <br/>    ((decimal)route.Route.DataTokens["precedence"]) - 0.1M;<br/></code></pre><br/>', ClosedDate=None, CommentCount=2, CommunityOwnedDate=None, ContentLicense='CC BY-SA 3.0', CreationDate=datetime.datetime(2015, 10, 29, 19, 14, 23, 673000), FavoriteCount=None, LastActivityDate=datetime.datetime(2015, 

In [50]:
# sample(withReplacement, fraction, seed=None) devuelve un nuevo Dataframe con una fracción de las filas
dfSESampled = dfSE.sample(False, 0.1, seed=None)
print("N de filas original = {0}; n de filas muestreadas = {1}".format(dfSE.count(), dfSESampled.count()))

N de filas original = 410346; n de filas muestreadas = 40717


In [28]:
# limit(n) limita a n el número de filas obtenidas
dfSE_10filas = dfSE.sample(False, 0.1, seed=None).limit(10)
print("N de filas muestreadas = {0}".format(dfSE_10filas.count()))
dfSE_10filas.show()

N de filas muestreadas = 10
+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+---------+
| Id|AcceptedAnswerId|AnswerCount|                Body|ClosedDate|CommentCount|CommunityOwnedDate|ContentLicense|        CreationDate|FavoriteCount|    LastActivityDate|        LastEditDate|LastEditorDisplayName|LastEditorUserId|OwnerDisplayName|OwnerUserId|ParentId|PostTypeId|Score|                Tags|               Title|ViewCount|
+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------

### Ejecutar una operación sobre cada una de las filas
El método `foreach` aplica una función a cada una de las filas

- El DataFrame no se modifica y no se crea ningún otro DataFrame
- El `foreach`se ejecuta en los workers

In [29]:
def printid(f) -> None:
    print(f["Id"])

# En teoría, deberían de imprimirse todos los valores de la columna id
# Debido a la forma de gestionar las tareas del Zeppelin, no se ve
# En un pyspark-shell si funciona
dfSE_10filas.foreach(printid)

2
18
24
27
34
35
46
61
76
77


### Seleccionar columnas

In [30]:
# Crea un nuevo DataFrame seleccionando columnas por nombre
dfIdBody: DataFrame = dfSE.select("Id", "Body")
dfIdBody.show(5)

print("El objeto dfIdCuerpo es de tipo {0}".format(type(dfIdBody)))

+---+--------------------+
| Id|                Body|
+---+--------------------+
|  1|<p>Estoy creando ...|
|  2|<p>He encontrado ...|
|  3|<p>Luego de ver c...|
|  4|<p><code>.AsStrin...|
|  5|<p>¿Cuál es la fo...|
+---+--------------------+
only showing top 5 rows

El objeto dfIdCuerpo es de tipo <class 'pyspark.sql.dataframe.DataFrame'>


In [31]:
# Otra forma de indicar a las columnas
dfIdBody2: DataFrame = dfSE.select(dfSE.Id, dfSE.Body)
dfIdBody2.show(5)

+---+--------------------+
| Id|                Body|
+---+--------------------+
|  1|<p>Estoy creando ...|
|  2|<p>He encontrado ...|
|  3|<p>Luego de ver c...|
|  4|<p><code>.AsStrin...|
|  5|<p>¿Cuál es la fo...|
+---+--------------------+
only showing top 5 rows



In [32]:
# También es posible indicar objetos de tipo Column
from pyspark.sql.functions import col

colId = col("Id")
colCreaDate = col("CreationDate")
print("El objeto colId es de tipo {0}".format(type(colId)))
print("El objeto colCreaDate es de tipo {0}".format(type(colCreaDate)))

El objeto colId es de tipo <class 'pyspark.sql.column.Column'>
El objeto colCreaDate es de tipo <class 'pyspark.sql.column.Column'>


In [33]:
# Y crear un DataFrame a partir de objetos Column, renombrando columnas
dfIdFechaCuerpo: DataFrame = dfSE.select(colId,
                              colCreaDate.alias("Fecha_Creación"),
                              dfSE.Body.alias("Cuerpo"))
dfIdFechaCuerpo.show(5)

+---+--------------------+--------------------+
| Id|      Fecha_Creación|              Cuerpo|
+---+--------------------+--------------------+
|  1|2015-10-29 15:56:...|<p>Estoy creando ...|
|  2|2015-10-29 19:14:...|<p>He encontrado ...|
|  3|2015-10-29 23:54:...|<p>Luego de ver c...|
|  4|2015-10-30 00:45:...|<p><code>.AsStrin...|
|  5|2015-10-30 01:15:...|<p>¿Cuál es la fo...|
+---+--------------------+--------------------+
only showing top 5 rows



In [35]:
from pyspark.sql.functions import expr
# El DataFrame anterior usando expresiones
dfIdFechaCuerpoExpr: DataFrame = dfSE.select(
                           expr("Id AS ID"),
                           expr('CreationDate AS `Fecha_Creación`'),
                           expr("Body AS Cuerpo"))
dfIdFechaCuerpoExpr.show(5)


+---+--------------------+--------------------+
| ID|      Fecha_Creación|              Cuerpo|
+---+--------------------+--------------------+
|  1|2015-10-29 15:56:...|<p>Estoy creando ...|
|  2|2015-10-29 19:14:...|<p>He encontrado ...|
|  3|2015-10-29 23:54:...|<p>Luego de ver c...|
|  4|2015-10-30 00:45:...|<p><code>.AsStrin...|
|  5|2015-10-30 01:15:...|<p>¿Cuál es la fo...|
+---+--------------------+--------------------+
only showing top 5 rows



In [36]:
# Se pueden usar expresiones más complejas
dfSE.selectExpr("*", # Selecciona todas las columnas
                "(AnswerCount IS NOT NULL) as respuestaValida").show()

+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+---------+---------------+
| Id|AcceptedAnswerId|AnswerCount|                Body|ClosedDate|CommentCount|CommunityOwnedDate|ContentLicense|        CreationDate|FavoriteCount|    LastActivityDate|        LastEditDate|LastEditorDisplayName|LastEditorUserId|OwnerDisplayName|OwnerUserId|ParentId|PostTypeId|Score|                Tags|               Title|ViewCount|respuestaValida|
+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+----------

### Renombrar, añadir y eliminar columnas


In [37]:
# Renombramos la columna creationDate
dfSE: DataFrame = dfSE.withColumnRenamed("CreationDate", "Fecha_de_creación")
dfSE.select("Fecha_de_creación",
            dfSE.ViewCount.alias("Número_de_vistas"),
            "Score",
            "PostTypeId")\
            .show(truncate=False)

+-----------------------+----------------+-----+----------+
|Fecha_de_creación      |Número_de_vistas|Score|PostTypeId|
+-----------------------+----------------+-----+----------+
|2015-10-29 15:56:52.933|780             |40   |1         |
|2015-10-29 19:14:23.673|NULL            |31   |2         |
|2015-10-29 23:54:31.947|1035            |20   |1         |
|2015-10-30 00:45:47.64 |NULL            |6    |2         |
|2015-10-30 01:15:27.267|37867           |37   |1         |
|2015-10-30 01:36:21.21 |3291            |27   |1         |
|2015-10-30 05:30:50.993|NULL            |29   |2         |
|2015-10-30 10:26:44.223|320             |14   |1         |
|2015-10-30 13:59:47.113|NULL            |12   |2         |
|2015-10-30 14:33:50.523|3407            |28   |1         |
|2015-10-30 14:43:43.737|NULL            |25   |2         |
|2015-10-30 15:37:40.45 |422             |28   |1         |
|2015-10-30 16:53:47.187|NULL            |0    |5         |
|2015-10-30 16:53:47.187|NULL           

In [38]:
# Añadimos una nueva columna con todos sus valores iguales a 1
from pyspark.sql.functions import lit
# lit convierte un literal en Python al formato interno de Spark
# (en este ejemplo IntegerType)
dfSE: DataFrame = dfSE.withColumn("unos", lit(1))
dfSE.show(5)

+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+--------------------+---------+----+
| Id|AcceptedAnswerId|AnswerCount|                Body|ClosedDate|CommentCount|CommunityOwnedDate|ContentLicense|   Fecha_de_creación|FavoriteCount|    LastActivityDate|        LastEditDate|LastEditorDisplayName|LastEditorUserId|OwnerDisplayName|OwnerUserId|ParentId|PostTypeId|Score|                Tags|               Title|ViewCount|unos|
+---+----------------+-----------+--------------------+----------+------------+------------------+--------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+-----------+--------+----------+-----+--------------------+-----------

In [51]:
# Elimina una columna con drop
dfSE: DataFrame = dfSE.drop(col("unos"))
dfSE.columns

['Id',
 'AcceptedAnswerId',
 'AnswerCount',
 'Body',
 'ClosedDate',
 'CommentCount',
 'CommunityOwnedDate',
 'ContentLicense',
 'Fecha_de_creación',
 'FavoriteCount',
 'LastActivityDate',
 'LastEditDate',
 'LastEditorDisplayName',
 'LastEditorUserId',
 'OwnerDisplayName',
 'OwnerUserId',
 'ParentId',
 'PostTypeId',
 'Score',
 'Tags',
 'Title',
 'ViewCount']

### Eliminar valores nulos y duplicados

In [52]:
# Eliminamos todas las filas que tengan null en alguna de sus columnas
dfNoNulls: DataFrame = dfSE.dropna("any")
print("Numero de filas inicial: {0}; número de filas sin null: {1}"
       .format(dfSE.count(), dfNoNulls.count()))

Numero de filas inicial: 410346; número de filas sin null: 0


In [41]:
# Elimina las filas que tengan null en todas sus columnas
dfNingunNull = dfSE.dropna("all")
print("Número de filas con todo a null: {0}"
       .format(dfSE.count() - dfNingunNull.count()))

Número de filas con todo a null: 0


In [42]:
# Elimina las filas duplicadas
dfSinDuplicadas: DataFrame = dfSE.dropDuplicates()
print("Número de filas duplicadas: {0}"
       .format(dfSE.count() - dfSinDuplicadas.count()))

24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/12 16:19:33 WARN RowBasedKeyValueBatch: Calling spill() on

Número de filas duplicadas: 0


                                                                                

In [43]:
# Elimina las filas duplicadas en alguna columna
dfSinUserDuplicado: DataFrame = dfSE.dropDuplicates(["OwnerUserId"])
print("Número de usuarios únicos: {0}"
       .format(dfSinUserDuplicado.count()))

[Stage 58:>                                                         (0 + 8) / 8]

Número de usuarios únicos: 78685


                                                                                

In [44]:
# Otros ejemplos
dfNoNullViewCountAcceptedAnswerId: DataFrame = dfSE\
        .dropna("any", subset=["ViewCount", "AcceptedAnswerId"])
print("Número de filas con ViewCount y AcceptedAnswerId no nulo: {0}"
       .format(dfNoNullViewCountAcceptedAnswerId.count()))
dfNoNullViewCountAcceptedAnswerId = dfSE\
        .dropna("all", subset=["ViewCount", "AcceptedAnswerId"])
print("Número de filas con ViewCount o AcceptedAnswerId no nulo: {0}"
       .format(dfNoNullViewCountAcceptedAnswerId.count()))

Número de filas con ViewCount y AcceptedAnswerId no nulo: 82491
Número de filas con ViewCount o AcceptedAnswerId no nulo: 194788


### Reemplazar valores

In [53]:
# Reemplazamos los null en los campos ViewCount y AnswerCount
dfSE.select("ViewCount", "AnswerCount").show(10)
dfSE: DataFrame = dfSE.fillna(0, subset=["ViewCount", "AnswerCount"])
dfSE.select("ViewCount", "AnswerCount").show(10)

+---------+-----------+
|ViewCount|AnswerCount|
+---------+-----------+
|      780|          1|
|     NULL|       NULL|
|     1035|          3|
|     NULL|       NULL|
|    37867|          7|
|     3291|          2|
|     NULL|       NULL|
|      320|          1|
|     NULL|       NULL|
|     3407|          1|
+---------+-----------+
only showing top 10 rows

+---------+-----------+
|ViewCount|AnswerCount|
+---------+-----------+
|      780|          1|
|        0|          0|
|     1035|          3|
|        0|          0|
|    37867|          7|
|     3291|          2|
|        0|          0|
|      320|          1|
|        0|          0|
|     3407|          1|
+---------+-----------+
only showing top 10 rows



In [54]:
# Reemplaza el valor 9 por 3000 en las columnas "Id" y "AcceptedAnswerId"
dfSE.select("Id", "AcceptedAnswerId").show(10)
dfSE.replace(9, 3000, subset=["Id", "AcceptedAnswerId"])\
    .select("Id", "AcceptedAnswerId")\
    .show(10)

+---+----------------+
| Id|AcceptedAnswerId|
+---+----------------+
|  1|               2|
|  2|            NULL|
|  3|               9|
|  4|            NULL|
|  5|             208|
|  6|             654|
|  7|            NULL|
|  8|             442|
|  9|            NULL|
| 10|              11|
+---+----------------+
only showing top 10 rows

+----+----------------+
|  Id|AcceptedAnswerId|
+----+----------------+
|   1|               2|
|   2|            NULL|
|   3|            3000|
|   4|            NULL|
|   5|             208|
|   6|             654|
|   7|            NULL|
|   8|             442|
|3000|            NULL|
|  10|              11|
+----+----------------+
only showing top 10 rows



# Guardando DataFrames

Al igual que con la lectura, Spark puede guardar los DataFrames en múltiples formatos

- CSV, JSON, Parquet, Hadoop...

También puede escribir en bases de datos

In [55]:
# Guardo el DataFrame dfSE en formato JSON
dfSE.write.format("json")\
    .mode("overwrite")\
    .save("dfSE.json")

                                                                                

In [56]:
%%sh
ls -lh dfSE.json
head dfSE.json/part-*.json

total 1842176
-rw-r--r--  1 luisi  staff     0B Nov 12 16:37 _SUCCESS
-rw-r--r--  1 luisi  staff   108M Nov 12 16:37 part-00000-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json
-rw-r--r--  1 luisi  staff   107M Nov 12 16:37 part-00001-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json
-rw-r--r--  1 luisi  staff   107M Nov 12 16:37 part-00002-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json
-rw-r--r--  1 luisi  staff   107M Nov 12 16:37 part-00003-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json
-rw-r--r--  1 luisi  staff   107M Nov 12 16:37 part-00004-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json
-rw-r--r--  1 luisi  staff   107M Nov 12 16:37 part-00005-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json
-rw-r--r--  1 luisi  staff   107M Nov 12 16:37 part-00006-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json
-rw-r--r--  1 luisi  staff   102M Nov 12 16:37 part-00007-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json
==> dfSE.json/part-00000-4651a142-8d75-4646-82a7-7ff77757c3bf-c000.json <==
{"Id":1,"Accep

In [57]:
# Guardo el DataFrame usando Parquet
dfSE.write.format("parquet")\
    .mode("overwrite")\
    .option("compression", "gzip")\
    .save("dfSE.parquet")

                                                                                

In [58]:
print(dfSE.rdd.getNumPartitions())

8


In [59]:
%%sh
# Parquet usa por defecto formato comprimido snappy
ls -lh dfSE.parquet

total 445440
-rw-r--r--  1 luisi  staff     0B Nov 12 16:37 _SUCCESS
-rw-r--r--  1 luisi  staff    27M Nov 12 16:37 part-00000-8cca6956-dabb-4049-9a57-28d2aa70001b-c000.gz.parquet
-rw-r--r--  1 luisi  staff    26M Nov 12 16:37 part-00001-8cca6956-dabb-4049-9a57-28d2aa70001b-c000.gz.parquet
-rw-r--r--  1 luisi  staff    26M Nov 12 16:37 part-00002-8cca6956-dabb-4049-9a57-28d2aa70001b-c000.gz.parquet
-rw-r--r--  1 luisi  staff    27M Nov 12 16:37 part-00003-8cca6956-dabb-4049-9a57-28d2aa70001b-c000.gz.parquet
-rw-r--r--  1 luisi  staff    27M Nov 12 16:37 part-00004-8cca6956-dabb-4049-9a57-28d2aa70001b-c000.gz.parquet
-rw-r--r--  1 luisi  staff    27M Nov 12 16:37 part-00005-8cca6956-dabb-4049-9a57-28d2aa70001b-c000.gz.parquet
-rw-r--r--  1 luisi  staff    27M Nov 12 16:37 part-00006-8cca6956-dabb-4049-9a57-28d2aa70001b-c000.gz.parquet
-rw-r--r--  1 luisi  staff    26M Nov 12 16:37 part-00007-8cca6956-dabb-4049-9a57-28d2aa70001b-c000.gz.parquet


Se crean tantos ficheros como particiones tenga el DataFrame

In [60]:
dfSE2 = dfSE.repartition(2)
# Guardo el DataFrame  usando Parquet, con compresión gzip
dfSE2.write.format("parquet")\
     .mode("overwrite")\
     .option("compression", "gzip")\
     .save("dfSE2.parquet")

                                                                                

In [61]:
%%sh
ls -lh dfSE2.parquet

total 459008
-rw-r--r--  1 luisi  staff     0B Nov 12 16:39 _SUCCESS
-rw-r--r--  1 luisi  staff   112M Nov 12 16:39 part-00000-11f17a8b-d8ea-453b-a904-d60b39cf3137-c000.gz.parquet
-rw-r--r--  1 luisi  staff   111M Nov 12 16:39 part-00001-11f17a8b-d8ea-453b-a904-d60b39cf3137-c000.gz.parquet


#### Particionado
Permite particionar los ficheros guardados por el valor de una columna

- Se crea un directorio por cada valor diferente en la columna de particionado
    - Todos los datos asociados a ese valor se guardan en ese directorio
- Permite simplificar el acceso a los valores asociados a una clave


In [62]:
# Guardo el DataFrame particionado por el PostTypeId (usando Parquet)
dfSE.write.format("parquet")\
    .mode("overwrite")\
    .partitionBy("PostTypeId")\
    .save("dfSE-particionado.parquet")

                                                                                

In [63]:
%%sh
ls dfSE-particionado.parquet
ls -lh dfSE-particionado.parquet/PostTypeId=2
rm -rf dfSE-particionado.parquet

[34mPostTypeId=1[m[m
[34mPostTypeId=2[m[m
[34mPostTypeId=4[m[m
[34mPostTypeId=5[m[m
[34mPostTypeId=6[m[m
[34mPostTypeId=7[m[m
_SUCCESS
total 302080
-rw-r--r--  1 luisi  staff    21M Nov 12 16:39 part-00000-d2a30cf6-dd00-4fce-8015-4978d288a55d.c000.snappy.parquet
-rw-r--r--  1 luisi  staff    19M Nov 12 16:39 part-00001-d2a30cf6-dd00-4fce-8015-4978d288a55d.c000.snappy.parquet
-rw-r--r--  1 luisi  staff    18M Nov 12 16:39 part-00002-d2a30cf6-dd00-4fce-8015-4978d288a55d.c000.snappy.parquet
-rw-r--r--  1 luisi  staff    18M Nov 12 16:39 part-00003-d2a30cf6-dd00-4fce-8015-4978d288a55d.c000.snappy.parquet
-rw-r--r--  1 luisi  staff    18M Nov 12 16:39 part-00004-d2a30cf6-dd00-4fce-8015-4978d288a55d.c000.snappy.parquet
-rw-r--r--  1 luisi  staff    18M Nov 12 16:39 part-00005-d2a30cf6-dd00-4fce-8015-4978d288a55d.c000.snappy.parquet
-rw-r--r--  1 luisi  staff    17M Nov 12 16:39 part-00006-d2a30cf6-dd00-4fce-8015-4978d288a55d.c000.snappy.parquet
-rw-r--r--  1 luisi  staff   