In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a8a6aeefac62ef124892d16ff05403b4a72044198d93adda74e0de114e8f09c8
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# Inicializacion del entorno de PySpark

In [47]:
from pyspark import SparkContext
# crea el contexto de spark, maneja el cluster y las tareas
sc = SparkContext()

# Inicializacion de SQL Spark

In [3]:
from pyspark.sql import SparkSession

# para trabajar con dataframes y datasets
spark = SparkSession.builder.getOrCreate()

# Apertura de Archivos, Creaccion de RDDs y DataFrames

In [4]:
#pasar un txt a rdd
textfile = sc.textFile("/content/datasets/ejemplo.txt")

#no se muestra hasta que no ocurra una accion
textfile

/content/datasets/ejemplo.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
#pase csv a rdd
#para este caso es mejor abrirlo con un metodo distinto

cvsfile = spark.read.option("header", True).csv("/content/datasets/ratings.csv")
cvsfile.take(5)

[Row(userId='1', movieId='1', rating='4.0', timestamp='964982703'),
 Row(userId='1', movieId='3', rating='4.0', timestamp='964981247'),
 Row(userId='1', movieId='6', rating='4.0', timestamp='964982224'),
 Row(userId='1', movieId='47', rating='5.0', timestamp='964983815'),
 Row(userId='1', movieId='50', rating='5.0', timestamp='964982931')]

In [6]:
#toma solo los elementos de la columna movieId

# Leer el archivo CSV, CSV a DataFrame
df = spark.read.csv("/content/datasets/ratings.csv", header=True, inferSchema=True)

# Nombre de la columna que deseas seleccionar
column_name = "movieId"

# Seleccionar la columna y convertirla a un RDD
column_movieId = df.select(column_name).rdd.map(lambda row: row[0])

# Mostrar los primeros elementos del RDD
print(column_movieId.take(10))

[1, 3, 6, 47, 50, 70, 101, 110, 151, 157]


# Acciones

In [7]:
#para mostrar
textfile.collect()

['primera linea ', 'segunda linea', 'tercera linea', 'cuarta linea']

In [8]:
#muetra el primer elemento
textfile.first()

'primera linea '

In [9]:
#imprime un numero determinado de lineas
textfile.take(3)

['primera linea ', 'segunda linea', 'tercera linea']

In [10]:
#cuenta los elementos
textfile.count()

4

In [11]:
#muestra dataframes
df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



# Transformaciones


In [12]:
#Filtrado por la palabra segunda
segunda = textfile.filter( lambda  linea : "segunda" in linea )
segunda.collect()

['segunda linea']

In [13]:
# map()

#suma 1
def suma(x):
  return(x+1)

In [14]:
#aplica la funcion suma a cada uno de los elemento de column_movieID
column_movieId_suma = column_movieId.map(suma)
column_movieId_suma.take(5)

[2, 4, 7, 48, 51]

In [15]:
#Se pueden aplicar varias funciones a la vez y guardar en tuplas

column_movieId_variado = (column_movieId
                          .map(suma)
                          .map(lambda x: (x,x+2))
                          )

column_movieId_variado.take(5)

[(2, 4), (4, 6), (7, 9), (48, 50), (51, 53)]

In [16]:
#flatMap()
#transforma a una lista simple, lo aplana

column_movieId_flat = (column_movieId
                          .map(suma)
                          .flatMap(lambda x: (x,x+2))
                          )

column_movieId_flat.take(5)

[2, 4, 4, 6, 7]

# RDD clave - valor

## Desde archivo csv

In [17]:
# crear una lista de tuplas de un archivo

#abrir el archivo
df_2 = spark.read.csv("/content/datasets/movies.csv", header=True, inferSchema=True)

# Seleccionar la columna y convertirla a un RDD
lista_tupla =  df_2.rdd.map(lambda row: (row[1],row[0]))

lista_tupla.take(5)


[('Toy Story (1995)', 1),
 ('Jumanji (1995)', 2),
 ('Grumpier Old Men (1995)', 3),
 ('Waiting to Exhale (1995)', 4),
 ('Father of the Bride Part II (1995)', 5)]

## Desde archivo txt

In [18]:

lista_tupla1 = textfile.map(lambda x: (x.split(" ")[0],x))

lista_tupla1.collect()

[('primera', 'primera linea '),
 ('segunda', 'segunda linea'),
 ('tercera', 'tercera linea'),
 ('cuarta', 'cuarta linea')]

### zip

In [19]:
lista_tupla2 = sc.parallelize(zip(("a","b","c"),range(1,4,1)))

lista_tupla2.collect()

[('a', 1), ('b', 2), ('c', 3)]

## keyby

In [20]:
rdd = sc.parallelize(range(1,4,1))

lista_tupla3 = rdd.keyBy(lambda x: x+2 )
lista_tupla3.collect()

[(3, 1), (4, 2), (5, 3)]

## ZipWithIndex

In [21]:
#coloca id consecutivos comenzando de 0, pero no necesriamente unicos
rdd = sc.parallelize(["a","b","c"])

lista_tupla4 = rdd.zipWithIndex()
lista_tupla4.collect()

[('a', 0), ('b', 1), ('c', 2)]

## ZipWithUniqueID

In [22]:
# coloca id unicos y no necesariamente consecutivos

rdd = sc.parallelize(["a","b","c"])

lista_tupla5 = rdd.zipWithUniqueId()
lista_tupla5.collect()

[('a', 0), ('b', 1), ('c', 3)]

# Persistencia

La persistencia no se hereda

In [23]:
from pyspark import StorageLevel

# permite verificar si el rdd esta guardado en cache
rdd.is_cached

False

In [24]:
# permite guardar en cache
rdd.cache()

ParallelCollectionRDD[66] at readRDDFromFile at PythonRDD.scala:289

In [25]:
# ahora se guarda en cache
rdd.is_cached

True

In [26]:
#nivel de almacenamiento del rdd
rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [27]:
#imprime cuantas veces esta replicado el rdd
print(rdd.getStorageLevel())

Memory Serialized 1x Replicated


In [28]:
#creo un nuevo rdd
rdd2 = rdd.map(lambda x: x+1)
#lo guardo en otra memoria
rdd2.persist(StorageLevel.MEMORY_AND_DISK)

#memorias usadas en pyspark
# StorageLevel.MEMORY_ONLY
# StorageLevel.MEMORY_AND_DISK
# StorageLevel.MEMORY_ONLY_SER
# StorageLevel.MEMORY_AND_DISK_SER
# StorageLevel.DISK_ONLY
# StorageLevel.OFF_HEAP

PythonRDD[68] at RDD at PythonRDD.scala:53

In [29]:
rdd2.getStorageLevel()

StorageLevel(True, True, False, False, 1)

In [30]:
print(rdd2.getStorageLevel())

Disk Memory Serialized 1x Replicated


# Particionado

El particionado se hereda

In [61]:
#rdd con 4 perticiones
rdd = sc.parallelize([1,1,2,2,3,3,4,4],4)
rdd.glom().collect()

[[1, 1], [2, 2], [3, 3], [4, 4]]

## getNumberPartitions

In [50]:
#verifica el numero de particiones
rdd.getNumPartitions()

4

## glom()

In [51]:
# agrupa los elementos de cada particion en listas
rdd.glom().collect()

[[1, 1], [2, 2], [3, 3], [4, 4]]

## reduceByKey

In [52]:
rdd_4 = rdd.map(lambda x: (x,x))
rdd_4.glom().collect()

[[(1, 1), (1, 1)], [(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]

In [53]:

rdd_4.reduceByKey(lambda x,y: x+y).glom().collect()

[[(4, 8)], [(1, 2)], [(2, 4)], [(3, 6)]]

In [54]:
#particiona a 2
rdd_4.reduceByKey(lambda x,y: x+y,2).glom().collect()

[[(2, 4), (4, 8)], [(1, 2), (3, 6)]]

## reparticion()

In [55]:
#genera un numero dado de particiones, incluso vacias
rdd.repartition(4).glom().collect()

[[3, 3], [1, 1, 2, 2], [], [4, 4]]

coalesce()

In [56]:
#reduce el numero de particiones
rdd.coalesce(2).glom().collect()

[[1, 1, 2, 2], [3, 3, 4, 4]]

particionBy()

In [62]:
#para reducir particiones en  rdd de clave-valor
rdd_4.partitionBy(2).glom().collect()

[[(2, 2), (2, 2), (4, 4), (4, 4)], [(1, 1), (1, 1), (3, 3), (3, 3)]]

In [40]:
#detiene el entorno
#sc.stop()

# SQL Spark DataFrame