In [1]:
#@title Instalar Spark
# Instalar SDK java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar Spark 3.4.3
!wget -q https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz

# Descomprimir el archivo descargado de Spark
!tar xf spark-3.4.3-bin-hadoop3.tgz

# Establecer las variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"

# Instalar la librería findspark
!pip install -q findspark

# Instalar pyspark
!pip install -q pyspark


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession


spark = SparkSession.builder.getOrCreate()

# Punto de entrada para crear los RDD y acumuladores
sc = spark.sparkContext

In [5]:
# Crear RDD
rdd = sc.parallelize([item for item in range(10)]).map(lambda x: (x, x ** 2))
rdd.collect()

[(0, 0),
 (1, 1),
 (2, 4),
 (3, 9),
 (4, 16),
 (5, 25),
 (6, 36),
 (7, 49),
 (8, 64),
 (9, 81)]

In [6]:
# Crear dataframe a partir de un RDD
df = rdd.toDF(['numero', 'cudrado'])

In [11]:
# Ver el esquema
df.printSchema()

root
 |-- numero: long (nullable = true)
 |-- cudrado: long (nullable = true)



In [9]:
# Ver primeros registros del dataframe
df.show()

+------+-------+
|numero|cudrado|
+------+-------+
|     0|      0|
|     1|      1|
|     2|      4|
|     3|      9|
|     4|     16|
|     5|     25|
|     6|     36|
|     7|     49|
|     8|     64|
|     9|     81|
+------+-------+



In [10]:
# Crear un DataFrame a partir de un RDD con schema
# Decirle explicitamente el esquema
rdd1 = sc.parallelize([(1, 'Jose', 35.5), (2, 'Teresa', 54.3), (3, 'Katia', 12.7)])

In [13]:
# Librerías
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [14]:
# Primera vía para especificar el esquema
esquema1 = StructType(
    [
     StructField('id', IntegerType(), True),
     StructField('nombre', StringType(), True),
     StructField('saldo', DoubleType(), True)
    ]
)


In [15]:
# Segunda vía para especificar el esquema
esquema2 = "`id` INT, `nombre` STRING, `saldo` DOUBLE"

In [16]:
df1 = spark.createDataFrame(rdd1, schema=esquema1)

In [17]:
df1.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- saldo: double (nullable = true)



In [18]:
df1.show()

+---+------+-----+
| id|nombre|saldo|
+---+------+-----+
|  1|  Jose| 35.5|
|  2|Teresa| 54.3|
|  3| Katia| 12.7|
+---+------+-----+



In [19]:
df2 = spark.createDataFrame(rdd1, schema=esquema2)

In [20]:
df2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- saldo: double (nullable = true)



In [21]:
df2.show()

+---+------+-----+
| id|nombre|saldo|
+---+------+-----+
|  1|  Jose| 35.5|
|  2|Teresa| 54.3|
|  3| Katia| 12.7|
+---+------+-----+



In [24]:
# Crear un DataFrame a partir de un rango de números
spark.range(5).toDF('id').show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [25]:
# Crear un DataFrame a partir de un rango de números
spark.range(3, 15).toDF('id').show()

+---+
| id|
+---+
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
+---+



In [26]:
# Crear un DataFrame a partir de un rango de números
spark.range(0, 20, 2).toDF('id').show()


+---+
| id|
+---+
|  0|
|  2|
|  4|
|  6|
|  8|
| 10|
| 12|
| 14|
| 16|
| 18|
+---+

