<a href="https://colab.research.google.com/github/plerzundidev/pyspark-ejemplos/blob/main/Pyspark_Convert_PySpark_RDD_to_DataFrame.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Iniciamos librerias esenciales para su funcionamiento

In [4]:
# Levantamos google drive para ejecutar el instalador
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# Ahora iniciamos pyspark
exec(open('/content/drive/MyDrive/BigDataSw/spark_colab_installer_new.py').read())

Servicios Activos:
5697 Jps
3713 SparkSubmit
1779 NameNode
2073 JobHistoryServer
1930 ResourceManager
5514 DataNode
1997 NodeManager

Apache Spark installed


En PySpark, la función toDF() del RDD se utiliza para convertir RDD a DataFrame. Necesitaríamos convertir RDD a DataFrame ya que DataFrame proporciona más ventajas sobre RDD. Por ejemplo, DataFrame es una colección distribuida de datos organizados en columnas con nombre similar a las tablas de bases de datos y proporciona optimización y mejoras de rendimiento.

# 1. Crear PySpark RDD

En primer lugar, vamos a crear un RDD pasando el objeto de lista Python a la función sparkContext.parallelize(). Necesitaremos este objeto rdd para todos nuestros ejemplos a continuación.

En PySpark, cuando tienes datos en una lista significa que tienes una colección de datos en la memoria de un controlador PySpark cuando creas un RDD, esta colección va a ser paralelizada.

In [6]:
# esencial para que el contenedor reconozca la instalacion de pyspark
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

In [7]:
# Generamos un rdd
dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
rdd = spark.sparkContext.parallelize(dept)

In [8]:
rdd.take(3)

[('Finance', 10), ('Marketing', 20), ('Sales', 30)]

# 2. Convertir PySpark RDD a DataFrame

La conversión de PySpark RDD a DataFrame se puede hacer usando toDF(), createDataFrame(). En esta sección, voy a explicar estos dos métodos.

## 2.1 Utilización de la función rdd.toDF()

PySpark proporciona la función toDF() en RDD que se puede utilizar para convertir RDD en Dataframe

Por defecto, la función toDF() crea nombres de columna como "_1" y "_2". Este fragmento produce el siguiente esquema.

In [9]:
df = rdd.toDF()
df.printSchema()
df.show(truncate=False)

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+---------+---+
|_1       |_2 |
+---------+---+
|Finance  |10 |
|Marketing|20 |
|Sales    |30 |
|IT       |40 |
+---------+---+



toDF() tiene otra firma que toma argumentos para definir los nombres de las columnas como se muestra a continuación.

Salida del siguiente esquema.

In [10]:
deptColumns = ["dept_name","dept_id"]
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



## 2.2 Uso de la función createDataFrame() de PySpark
La clase SparkSession proporciona el método createDataFrame() para crear el DataFrame y toma el objeto rdd como argumento.

In [11]:
deptDF = spark.createDataFrame(rdd, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



## 2.3 Utilización de createDataFrame() con el esquema StructType

Cuando se infiere el esquema, por defecto el tipo de datos de las columnas se deriva de los datos y se establece nullable a true para todas las columnas. Podemos cambiar este comportamiento proporcionando el esquema utilizando StructType - donde podemos especificar un nombre de columna, tipo de datos y nullable para cada campo/columna.

Si desea saber más acerca de StructType, por favor vaya a través de cómo utilizar StructType y StructField para definir el esquema personalizado.

In [12]:
from pyspark.sql.types import StructType,StructField, StringType
deptSchema = StructType([
    StructField('dept_name', StringType(), True),
    StructField('dept_id', StringType(), True)
])

deptDF1 = spark.createDataFrame(rdd, schema = deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: string (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

