# Fundamentos de Apache Spark: RDDs


En este notebook trabajaremos con los RDDs que forma parte del Spark Core.La implementación de Spark Core es un **RDD (Resilient Distributed Dataset)** que es una colección de datos distribuidos en diferentes nodos del clúster que se procesan en paralelo.

Utilizaremos la API de PySpark, pero los conceptos aplican por igual a todas las APIs (Scala, R, etc)

### Inicialización de Spark en Notebooks

In [3]:
conda install -c conda-forge findspark

Collecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... done

## Package Plan ##

  environment location: C:\Users\HP\anaconda3

  added / updated specs:
    - findspark


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2021.10.8  |       h5b45459_0         176 KB  conda-forge
    certifi-2021.10.8          |   py39hcbf5309_1         145 KB  conda-forge
    conda-4.11.0               |   py39hcbf5309_0        16.8 MB  conda-forge
    findspark-2.0.1            |     pyhd8ed1ab_0           8 KB  conda-forge
    openssl-1.1.1l             |       h8ffe710_0         5.7 MB  conda-forge
    python_abi-3.9             |           2_cp39           4 KB  conda-forge
    ------------------------------------------------------------
                                           Total:        22.9 MB

The following NEW packages will be

In [1]:
import findspark
findspark.init()

import pandas as pd
import pyspark

In [2]:
from pyspark.sql import SparkSession

### Crear el SparkSession y el SparkContext

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_training')\
        .getOrCreate()

In [5]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Crear un RDD de una colección

In [6]:
num = [1,2,3,4,5]

num_rdd = sc.parallelize(num)
num_rdd.collect()

[1, 2, 3, 4, 5]

# Transformaciones
* Como sabemos, las Transformaciones son de naturaleza perezosa y no se ejecutarán hasta que se ejecute una Acción sobre ellas.
* Intentemos comprender las distintas transformaciones disponibles.

### map
* Esto mapeará su entrada a alguna salida basada en la función especificada en la función 

In [7]:
double_rdd = num_rdd.map(lambda x : x * 2)
double_rdd.collect()

[2, 4, 6, 8, 10]

### filtro
* Para filtrar los datos en función de una determinada condición. Intentemos encontrar los números pares de num_rdd.

In [8]:
even_rdd = num_rdd.filter(lambda x : x % 2 == 0)
even_rdd.collect()

[2, 4]

### flatMap
* Esta función es muy similar a map, pero puede devolver múltiples elementos para cada entrada en el RDD dado.

In [9]:
flat_rdd = num_rdd.flatMap(lambda x : range(1,x))
flat_rdd.collect()

[1, 1, 2, 1, 2, 3, 1, 2, 3, 4]

### distinct
* Esto devolverá elementos distintos de un RDD.

In [10]:
rdd1 = sc.parallelize([10, 11, 10, 11, 12, 11])
dist_rdd = rdd1.distinct()
dist_rdd.collect()

[10, 11, 12]

### reduceByKey
* Esta función reduce los pares de valores clave en función de las claves y una función determinada dentro de reduceByKey

In [10]:
pairs = [ ("a", 5), ("b", 7), ("c", 2), ("a", 3), ("b", 1), ("c", 4)]
pair_rdd = sc.parallelize(pairs)

output = pair_rdd.reduceByKey(lambda x, y : x + y)

result = output.collect()
print(*result, sep='\n')

('a', 8)
('b', 8)
('c', 6)


### groupByKey
* Esta función es otra función ByKey que puede operar en un par (clave, valor) RDD pero esto solo agrupará los valores basados en las claves. En otras palabras, esto solo realizará el primer paso de reduceByKey.

In [11]:
grp_out = pair_rdd.groupByKey()
grp_out.collect()

[('a', <pyspark.resultiterable.ResultIterable at 0x1cc3ebdd2e0>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x1cc2e1d5100>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x1cc2f007e50>)]

### sortByKey
* Esta función realizará la clasificación en un par (clave, valor) RDD basado en las claves. De forma predeterminada, la clasificación se realizará en orden ascendente.

In [8]:
pairs = [ ("a", 5), ("d", 7), ("c", 2), ("b", 3)]
raw_rdd = sc.parallelize(pairs)

sortkey_rdd = raw_rdd.sortByKey()
result = sortkey_rdd.collect()
print(*result,sep='\n')

# Para clasificar en orden descendente, pase  “ascending=False”.

('a', 5)
('b', 3)
('c', 2)
('d', 7)


### Ordenar por
* sortBy es una función más generalizada para ordenar.

In [15]:
# Create RDD.
pairs = [ ("a", 5, 10), ("d", 7, 12), ("c", 2, 11), ("b", 3, 9)]
raw_rdd = sc.parallelize(pairs)

# Let’s try to do the sorting based on the 3rd element of the tuple.
sort_out = raw_rdd.sortBy(lambda x : x[2])
result = sort_out.collect()
print(*result, sep='\n')

('b', 3, 9)
('a', 5, 10)
('c', 2, 11)
('d', 7, 12)


# Acciones

* Las acciones son operaciones en RDD que se ejecutan inmediatamente. Mientras que las transformaciones devuelven otro RDD, las acciones devuelven estructuras de datos nativas 

### count
* Esto contará el número de elementos en el RDD dado.

In [12]:
num = sc.parallelize([1,2,3,4,2])
num.count()

5

### first
* Esto devolverá el primer elemento del RDD dado.

In [13]:
num.first()

1

### Collect
* Esto devolverá todos los elementos para el RDD dado.


In [14]:
num.collect()

[1, 2, 3, 4, 2]

**No debemos utilizar la operación de collect mientras trabajamos con grandes conjuntos de datos**. Porque devolverá todos los datos que se distribuyen entre los diferentes trabajadores dl clúster a un controlador. Todos los datos viajarán a través de la red del trabajador al conductor y también el conductor necesitaría almacenar todos los datos. Esto obstaculizará el rendimiento de su aplicación.

### Take
* Esto devolverá el número de elementos especificados.

In [11]:
num.take(3)

AttributeError: 'list' object has no attribute 'take'