Sistema diseñado para procesar datos de manera distribuida sobre clusters. Spark puede procesar cantidades de datos en el orden de terabytes incluso petabytes.

El concepto de Spark es imaginar un cluster como una memoria gigante, la memoria resultante de combinar las memorias de todos los clusters. Se prioriza el uso de memoria y consigue ser muy rápido el procesamiento de la inforamción, mayor que si utilizara MapReduce (Google).

Spark utiliza MapReduce para algunas ateas de clasificación mediante regresión. Apache Spark esta implementado en Scala que es ejecutado en la máquina virtual de Java. Además de Spark ofrece interfaces de programación para Java, Python y R.

Caracteristicas: velocidad de procesamiento, soporte multilenguaje, análisis avanzado. Se puede desarrollar en tres maneras:
1:)solo : standalone. Utiliza como base HDFS y encima se encuentra spark.
2:)Hadoop (Yarn, adminsitrador de recursos) : Base HDFS -> Yarn/Mesos(kernel administrador del cluster) -> Spark
3:)Spark con MapReduce (Spark In MapReduce, SIMR) : Base HDFS -> MapReduce y dentro se encuentra Spark

Componentes de spark
Cuando se ejecuta Spark en un ambiente distribuida se distinguen dos tipos de procesos: driver y executor. Un proceso driver conectado a 3 procesos excutor localizados en dos nodos del cluster:

driver: proceso principal. Este proceso tiene un objeto SparkContext que te permite conectar con el gestor del cluster y reservar procesos executor en los distintos nodos del cluster. Cada uno de los nodos del cluster (tambien se conocen como worker) pordrá ejecutar uno o varios procesos executor que almaenará fragmentos de los datos del programa y realizará operaciones sobre ellos. Durante la ejecución irá enviando peticiones a los distintos procesos executor que pueden contactar enre ellos para realizar tareas y comunicar con el proceso driver para devolver resultados.

RDD (Resilient Distributed Datasets)
Tipo de datos básicos. Estos datos almacenan información de manera distribuida entre todos los equipos del cluster. Durante la ejecución de un programa Sparck se construyen varios RDDs que se dividen en distintos fragmentos y son almacenados en la memoria de los equipos del cluster.

Caracteristicas
Están formados por un conjunto de registros, también llamados elementos, todos del mismo tipo. Por ejemplo, si cargamos un RDD a partir de un archivo plano se creará un RDD de cadenas de texto, una por cada linea del archivo.

En Scala o Java, al declarar un RDD se debe definir el tipo de los registros:

Lenguajes estáticos : RDD[String] en Scala y JavaRDD en Java.
Lenguajes dinámicos : Python se pueden mezclar los tipos de datos.
RDDs han sido diseñados desde el inicio para ser distribuidos: Los registros que lo componen se repartirán entre los clúster.

Para realizar esta distribución: los RDDs se dividen en particiones. Cada partición se almacena únciamente en un proceso executor dentro de un nodo del clúster, aunque un proceso executor puede albergar distintas particiones de distintos RDDs. El número de particiones en las que dividir un RDD se puede configurar e incluso cambiar a lo largo de la ejecución, por default es el número de núcleos de procesamiento disponibles en el clúster.

Para decidir qué registros forman parte de cada partición, Spark utiliza particionadores, que son funciones que toman un registro y devuelven el número de la partición a la que pertenecen. Estos particionadores se puede configurar si se desea mejorar el rendimiento o dejar el default.

Los RDDs son inmutables: no se puede modificar ni actualizar. Una vez creado, así permanece hasta que se termina la ejecución del programa.

Ejemplo
Cómo se podría particionar un RDD de 13 parejas (int, str) sobre 3 procesos executor utilizando el rango de valores del primer elemento de la pareja.

![particionandoRDD.png](attachment:particionandoRDD.png)

Los RDDs son inmutables: no se puede modificar ni actualizar. Una vez creado, así permanece hasta que termina la ejecución del programa.

RDDs admiten dos tipos de operaciones:
    transformaciones y acciones
pero ninguna de ellas modifica el RDD. 
*Transformaciones:
    son operaciones que toman un RDD de partida y crean un nuevo
    RDD, dejando el original intacto. 
    Ejemplos: son aplicar una
    función a todos los registros del RDD
    (por ejemplo, sumar una cierta cantidad), 
    filtrar únicamente aquellos registros que cumplan una cierta condición u ordenarlos
    mediante algún campo.
*Acciones:
    son operaciones que realizan algún cómputo sobre
    el RDD y devuelven un valor, dejando también el 
    RDD original inalterado. 
    Ejemplos : sumar todos los elementos almacenados 
    en un RDD de números, generando un valor final, o
    "vaciar" un RDD a un archivo de texto. 
    
    
Concepto importante del RDD: Resilencia
    Tienen la capacidad de recuperar su estado inicial
    cuando existe algún problema.
    Esto se debe a que los RDDs son particionados y cada 
    partición ha sido almacenada en un proceso executor. 
    Por lo que apartir de su estado inicial, repite las
    transformaciónes que tiene programadas. De esta manera,
    regenera las particiones perdidas.
    
   https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html

In [26]:
#conda install -c conda-forge findspark
import findspark
findspark.init()
findspark.find()

'C:\\spark'

In [27]:
import pyspark

In [28]:
sc = pyspark.SparkContext(appName = "prueba")#sc objeto que apunta al sc al cluster local

In [29]:
type(sc)

pyspark.context.SparkContext

In [5]:
sc.master

'local[*]'

In [6]:
sc.appName

'prueba'

In [7]:
import random

In [8]:
#Creando un RDD de 5 enteros: se crea en la memoria del proceso driver
r = sc.parallelize([1, 2, 3, 4, 5])
type(r)

pyspark.rdd.RDD

In [9]:
print(r)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274


In [10]:
#Creando un RDD de 3 cadenas de texto
r = sc.parallelize(["hola", "hi", "ciao"])
type(r)

pyspark.rdd.RDD

In [11]:
r.collect()

['hola', 'hi', 'ciao']

In [12]:
#Al crear RDDs a partir de colecciones en memoria del proceso driver estamos
#limitados a la memoria que este proceso tenga disponible. 
#Por eso, es mejor generar RDDs a partir de archivos.
#Spark proporciona el método textFile, que carga el arch de texto y genera un RDD
#de cadenas de texto
#en un sistema distribuido : "hdfs://node:port/data/file.txt, Amazon S3 "s3n://bucket/file.txt"
#Spark cuenta con métodos para abrir archivos en formato hadoop sc.hadoopFile
r = sc.textFile("D:\\7 Semestre\\Big Data\\Jupiter\\titanic.csv")
print(r)

D:\7 Semestre\Big Data\Jupiter\titanic.csv MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0


In [13]:
print(r.collect(),"\n")

['survived,pclass,sex,age,sibsp,parch,fare,embarked,class,who,adult_male,deck,embark_town,alive,alone', '0,4,male,22.0,1,0,7.25,S,Third,man,True,,Southampton,no,False', '1,1,female,38.0,1,0,71.2833,C,First,woman,False,C,Cherbourg,yes,False', '1,3,female,26.0,0,0,7.925,S,Third,woman,False,,Southampton,yes,True', '1,1,female,35.0,1,0,53.1,S,First,woman,False,C,Southampton,yes,False', '0,3,male,35.0,0,0,8.05,S,Third,man,True,,Southampton,no,True', '0,3,male,,0,0,8.4583,Q,Third,man,True,,Queenstown,no,True', '0,1,male,54.0,0,0,51.8625,S,First,man,True,E,Southampton,no,True', '0,3,male,2.0,3,1,21.075,S,Third,child,False,,Southampton,no,False', '1,3,female,27.0,0,2,11.1333,S,Third,woman,False,,Southampton,yes,False', '1,2,female,14.0,1,0,30.0708,C,Second,child,False,,Cherbourg,yes,False', '1,3,female,4.0,1,1,16.7,S,Third,child,False,G,Southampton,yes,False', '1,1,female,58.0,0,0,26.55,S,First,woman,False,C,Southampton,yes,True', '0,3,male,20.0,0,0,8.05,S,Third,man,True,,Southampton,no,True',

In [14]:
#Acciones (se ejecutan inmediatamente en todo el cluster) 
#son operaciones que realizan un procesamiento
#sobre todo un RDD y devuelven un valor, dejando el RDD original
#en el mismo estado (es inmutable).
#El valor generado se envía al proceso driver
#Un RDD puede almacenar gigabytes, terabytes o incluso 
#petabytes de datos de manera distribuida pero el valor generado
#por las acciones se regresará al proceso driver. En este caso,
#tomar en cuenta la memoria del proceso driver
r = sc.parallelize([1,2,3,4,5,6])
print( type(r.collect()) )
r.collect()
#se generaron 6 elementos en el cluster, collect busca los elementos en el cluster
#y devuelve la información al proceso driver

<class 'list'>


[1, 2, 3, 4, 5, 6]

In [15]:
#Para muchos elementos en el cluster se utiliza take
#que devuelve los primeros elementos encontrados al driver, 
#no muestra todos
r = sc.parallelize(range(1000))
r.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [16]:
#Cuenta los elementos del RDD
r.count()

1000

Una de las acciones más útiles sobre RDDs es:reduce permite recorrer todos los valores de un RDD y calcular un valor en relación con ellos. Reduce un RDD a un único valor. Dado un RDD con elementos de tipo T, la función de reducción es una función binaria que acepta dos elementos de tipo T y devuelve un valor del mismo tipo T, es decir, tiene tipo T x T -> T. reduce(f) es un método que aplica la función de reducción f a los 2 primeros elementos, luego aplica de nuevo la función f al valor result
ante y al tercer elemento, y así sucesivamente hasta que procesa el último elemento y produce el valor final. Ejemplo: Se muestra el proceso para calcular la suma de un RDD de 5 elementos enteros. 



In [17]:
#Para crear un RDD con los números del 1 al 5 y calcular su suma 
#definiriamos una función de reducción
#add e invocaríamos a reduce como sigue:
def add(x, y):
    return x + y

#llamado a reduce
r = sc.parallelize(range(1, 6))
r.reduce(add)

15

In [18]:
#Utilizando una función anónima o lambda
r = sc.parallelize(range(1,6))
r.reduce(lambda x, y: x + y)

15

In [19]:
#Método takeOrdered, take()
r = sc.parallelize(range(1000))
r.takeOrdered(10, lambda x : -x)

[999, 998, 997, 996, 995, 994, 993, 992, 991, 990]

In [20]:
r = sc.parallelize(range(1000))
r.takeSample(False, 10)

[821, 335, 33, 942, 16, 931, 112, 34, 431, 416]

In [21]:
def multiply_positive(x, y):
    if x > 0 and y > 0:
        return x*y
    elif x > 0:
        return x
    elif y > 0:
        return y
    else:
        return 1
    
r = sc.parallelize([-1, 2, 1, -5, 8])
r.reduce(multiply_positive)

16

reduce  se requiere que la función sea conmutativa y asociativa para los dos valores
cualquiera x,y, f(x,y) = f(y,x) y para una triada : x,y,z, f(x,f(y,z) ) = f(f(x,y),z).
Ejemplos de funciones conmutativas y asociativas son la suma, la multiplicación, el mínimo o el máximo.
La resta o la división no son conmutativas ni asociativas.(no se pueden reducir)
***
* MapReduce requiere de acciones como la replicación, serialización y E/S de almacenamiento para poder ejecutar 
* las operaciones solicitadas.
* Éstas últmas son las que más requiere el sistemade archivos de Hadoop. 
* Cualquier operación de MapReduce requiere del almacenamiento.
reduce es una Acción en el RDD.
Acciones: collect(), count(), take(x), takeOrdered(x,y), takeSample(x, y), countByKey(), foreach(funcion())

In [22]:
#Ejemplo
r = sc.parallelize(range(3), 1) #una partición (0, 1, 2)
r.reduce(lambda x,y: x-y)
#(0-1) - 2 

-3

In [23]:
r = sc.parallelize(range(3), 2) #Dos particiones {0} y {1, 2}
r.reduce(lambda x,y: x-y)
# (0) y (1 -2)
#  0      -1
#  0  - ( -1 )

1

In [24]:
#Manejo de cadenas. Utilización del método aggregate.
#Tres parametros: 1) Un valor inicial para el acumulador, zeroValue que tendrá tipo C
#   2) una función seqop para combinar elementos de nuestro RDD (de tipo T)
#  con el acumulador de tipo C, devolviendo un valor de tipo C (C x T -> C).
#   3) Una función combOp para combinar dos acumuladores de tipo C y devolver un valor de tipo C.
#Cuenta las h en el RDD
r = sc.parallelize(["hola", "hi", "ciao"])
#          (int, cuenta las "h", suma los acumuladores)  => (valor_inicial, map, reduce)
#                      MAP                         Reduce
r.aggregate(0, lambda c, s : c + s.count("h"), lambda c1, c2: c1 + c2)
#(h, 1)
#(h, 1)
#(h, 0)

2

In [30]:
#Para trabajo de archivos en HDP. Consultar clase pyspark.RDD
#saveAsSequienceFile, saveAsNewAPIHadoopDataset o saveAsNewAPIHadoopFile
r = sc.parallelize(range(1000), 2)
r.saveAsTextFile("D:\\7 Semestre\\Big Data\\Jupiter\\num")
#Crea una carpeta con los archivos resultantes del RDD. 
#El RDD se particionará en varios prcesos executor, al convertir
#Un RDD a texto cada partición generará un archivo diferente.
#Cada archivo_ part-XXXXX (XXXXX numeración correlativa)
#Además se genera un archivo _SUCCESSS que indica que el 
#proceso de grabación fue exitosa.

# Transformaciones


In [31]:
#Transformacion map
#Permite aplicar una funcion elemento a elemento en un RDD
#RDD con elementos tipo T y una función f que acepta elementos T 
#y produce valores tipo V, la transformación
#map(f) generará un RDD de elementos tipo V
r = sc.parallelize([1,2,3,4])
r2 = r.map(lambda x: x + 1)
r2.collect()

[2, 3, 4, 5]

In [32]:
#Sin lambda
def increment(x):
    return x + 1

r = sc.parallelize([1,2,3,4])
r2 = r.map(increment)
r2.collect()

[2, 3, 4, 5]

In [33]:
r = sc.parallelize(["hola", "hi", "ciao"])
r2 = r.map(lambda x: len(x))
r2.collect()

[4, 2, 4]

In [34]:
import csv
r = sc.parallelize(["1,5,7","8,2,4"]) #cadenas en formato csv
r2 = r.map(lambda x: list(csv.reader([x]))[0]) #como resultado una lista/tupla. Una listra dentro de otra lista
#por eso se utiliza [0]
r2.collect()
#Nota: el resultado como se puede ver son cadenas de texto y NO NUMEROS

[['1', '5', '7'], ['8', '2', '4']]