<font size=10>Práctica 1 de Computación de Altas Prestaciones</font>

# Introducción  
En este laboratorio, cubriremos los elementos básicos de la programación utilizando la metodología de *map-reduce*. Para ello, usaremos Apache Spark como referencia, pero ten en cuenta que existen *frameworks* similares y que los principios se pueden extrapolar.  

## Algunos conceptos y hechos  

- Apache Spark es una plataforma de computación distribuida que opera en un clúster. Al igual que MPI, esperamos que los nodos **NO** compartan un espacio de memoria, pero que estén conectados mediante una red dedicada de alta velocidad. Los sistemas de archivos distribuidos que funcionan sobre la red son extremadamente útiles.  

- Usaremos la siguiente generación del estándar previo de *map-reduce*, Apache Hadoop. La principal diferencia se cree que es el uso de la memoria en lugar del disco para operaciones intermedias, aunque existen muchas más mejoras. 

- Está construido sobre Java. A pesar de ello, puede programarse en Java, Scala, Python o R. La API completa solo se encuentra en lenguajes basados en la JVM, pero el más utilizado es PySpark, ya que la gente suele ser reacia a usar lenguajes basados en la JVM en ciencia de datos. De hecho, dado que Hadoop solo estaba disponible en Java, es probable que algunos códigos en Java de Spark sean adaptaciones de los códigos previos de Hadoop.  

- *Resilient Distributed Dataset* (RDD): la unidad básica que se procesa en Spark. Equivalente a un array de *numpy*, pero distribuido.  
- La API de RDDs generalmente expone las operaciones de bajo nivel de Apache Spark, útiles para el preprocesamiento de datos pero inútiles para el análisis de datos.  

- Para análisis de datos, se utilizan *DataFrame* y Spark SQL. Se basan en una API similar a *pandas* que incluso acepta código SQL (lo cual puede sonar extraño e inútil para desarrolladores, pero muchos científicos de datos y estadísticos *antiguos* son muy competentes en SQL pero no en Python).

- Existes otras APIs para datos en tiempo real (Spark Streaming) o aprendizaje automático (SparkML).


## Como usar en el clúster Apache Spark

El clúster ya cuenta con un pequeño clúster de Apache Spark desplegado, por lo que solo hay que instalar PySpark (cliente de Apache Spark en Python) y registrar nuestra aplicación. Se provee el siguiente *snippet* de código para tal propósito.

In [1]:
%pip install pyspark==4.0.0

Note: you may need to restart the kernel to use updated packages.


In [None]:
import os

In [3]:
IP = !hostname -I | cut -d' ' -f1 
IP = IP[0]
IP

'10.42.10.151'

In [4]:
from pyspark.sql import SparkSession

APP_NAME = f"Tutorial de Spark {os.environ['JUPYTERHUB_USER']}" # Mantener siempre en el nombre de la app el nombre del usuario para fácil identificación
SPARK_URL = "spark://spark-master.spark.svc.cluster.local:7077"
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).config("spark.driver.host", IP) \
    .config("spark.executor.memory", "512M") \
    .config("spark.cores.max", 2) \
    .config("spark.executor.instances", 2) \
    .getOrCreate()
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/12 19:39:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
assert sc.parallelize(range(100)).count() == 100 # Si falla, es que algo ha ido mal

                                                                                

# Parte : RDDs

## Operaciones básicas

### Parallelize & collect

*Parallize*: Crea un RDD a partir de una lista o un array.  
El segundo argumento indica el número de particiones del RDD. En Apache Spark suele ser frecuente que esto sea un parámetro o depende del origen de los datos (e.g. particiones de datos en el sistema de ficheros distribuidos)

In [6]:
array = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
array

ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:297

In [7]:
import numpy as np
randomSamples = sc.parallelize(np.random.randn(10))
randomSamples

ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:297

Pero..., ¿por qué no se pueden imprimir?

Los RDDs no se pueden imprimir, pues *viven* en el clúster y este programa Python se ejecuta en el *driver*.

In [8]:
print(array.collect())
print(randomSamples.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[np.float64(0.7152596019156895), np.float64(-0.08538573511461646), np.float64(-0.3536562590985943), np.float64(0.20598946392512513), np.float64(0.34609828581615243), np.float64(0.053970589350636894), np.float64(0.24313359906038223), np.float64(0.27106784317816507), np.float64(0.7272606394891706), np.float64(-0.9715573264098758)]


Spark utiliza operaciones *perezosas* (*lazy*) para todo, lo que significa que nada se evalúa hasta que se ejecuta una acción, normalmente una operación de reducción.  
La operación básica de reducción es `collect`, la cual devuelve el RDD completo (es decir, no se realiza ninguna reducción).  


### Otras maneras de cargar datos

Aquí puedes ver tanto un método para cargar un archivo de texto línea por línea como otra operación de reducción.

Nota: Como esto es un clúster, el fichero ha de estar distribuido...

In [9]:
quijote = sc.textFile("hdfs://listener-simple-hdfs-namenode-default-0.hdfs.svc.cluster.local/elquijote.txt")

In [10]:
quijote.take(2)

                                                                                

['DON QUIJOTE DE LA MANCHA', '']

In [11]:
# ¿Quieres ver la documentación en Python?
quijote.take?

[31mSignature:[39m quijote.take(num: int) -> List[~T]
[31mDocstring:[39m
Take the first num elements of the RDD.

It works by first scanning one partition, and use the results from
that partition to estimate the number of additional partitions needed
to satisfy the limit.

Translated from the Scala implementation in RDD#take().

.. versionadded:: 0.7.0

Parameters
----------
num : int
    first number of elements

Returns
-------
list
    the first `num` elements

See Also
--------
:meth:`RDD.first`
:meth:`pyspark.sql.DataFrame.take`

Notes
-----
This method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.

Examples
--------
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
>>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
[91, 92, 93]
[31mFile:[39m      /opt/conda/lib/python3.12/site-packages/pyspark/core/rdd.py
[31mType

### Transformaciones

Revisemos algunas transformaciones que se pueden hacer a RDDs.

In [12]:
charsPerLine = quijote.map(lambda s: len(s))
allWords = quijote.flatMap(lambda s: s.split())
allWordsNoArticles = allWords.filter(lambda a: a.lower() not in ["el", "la"])
allWordsUnique = allWords.map(lambda s: s.lower()).distinct()
sampleWords = allWords.sample(withReplacement=True, fraction=0.2, seed=666)
weirdSampling = sampleWords.union(allWordsNoArticles.sample(False, fraction=0.3))

In [13]:
allWordsUnique.take(10)

                                                                                

['viudas,',
 'todas',
 'fuesen',
 'ofrece;',
 'dicho',
 'vuestra',
 'ceremonias;',
 'proezas',
 'valor',
 'soldada']

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las operaciones anteriores.

Comenta también sobre el tamaño del RDD resultante en relación con el tamaño del RDD original.  
Por ejemplo, si el RDD original tiene tamaño $N$, entonces `rdd.filter()` tendrá tamaño $K \leq N$.  


--------
Respuesta:
- `map`: aplica una función a cada elemento del RDD y devuelve un nuevo RDD con los resultados. Transforma los datos elemento a elemento. El RDD resultante tiene el mismo tamaño que el de entrada, es decir, K = N.
- `flatMap`: aplica una función que devuelve una lista de elementos por cada elemento de entrada y luego aplana esas listas en un único RDD. Tiene como propósito descomponer elementos complejos. Por lo tanto puede ser mayor o igual al original (K ≥ N), porque de una sola línea pueden surgir varias palabras.
- `filter`: selecciona los elementos que cumplen una condición booleana. Sirve basicamente para eliminar elementos deseados y quedarnos con la información que nos interesa. El nuevo RDD tiene un tamaño menor o igual al original (K ≤ N), ya que se descartan elementos que no cumplen la condición.
- `distinct`: elimina los elementos duplicados del RDD. Sirve para obtener un conjunto de datos únicos por lo tanto el tamaño se reduce respecto al original (K ≤ N), dependiendo del número de repeticiones existentes.
- `sample`: selecciona aleatoriamente una muestra de los elementos del RDD, con o sin reemplazo. Esto sirve para analizar subconjuntos de los datos de forma representativa sin procesar el conjunto completo. El tamaño resultant depende del parámetro fraction. Si fraction = f, el RDD resultante tendrá aproximadamente f × N elementos. Si se usa withReplacement=True, pueden existir duplicados, por lo que K puede ser mayor que la fracción esperada.
- `union`: combina los elementos de dos RDDs en uno solo, uniendo ambos conjuntos. El objetivo es fusionar resultados de distintas muestrar, por lo que el tamaño resultante: el tamaño es la suma de los tamaños de los dos RDDs: K = N1+N2.

----

 ### Acciones



In [14]:
numLines = quijote.count()
numChars = charsPerLine.reduce(lambda a,b: a+b) # also charsPerLine.sum()
sortedWordsByLength = allWordsNoArticles.takeOrdered(10, key=lambda x: -len(x))
numLines, numChars, sortedWordsByLength

                                                                                

(4372,
 1036211,
 ['procuremos.Levántate,',
  'extraordinariamente,',
  'estrechísimamente,',
  'convirtiéndoseles',
  'entretenimientos,',
  'inadvertidamente.',
  'cortesísimamente',
  'Agredeciéronselo',
  'Pintiquiniestra,',
  'entretenimiento,'])

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las transformaciones anteriores.

¿Cómo contarías los elementos de un RDD utilizando únicamente `map` y/o `reduce`?

--------
Respuesta:
`reduce(lambda a,b: a+b)` suma todos los elementos del RDD `charsPerLine`, calculando el número total de caracteres del texto.  
`takeOrdered(10, key=lambda x: -len(x))` obtiene las 10 palabras más largas del RDD `allWordsNoArticles`, ordenando por longitud descendente.  
Para contar los elementos de un RDD usando solo `map` y `reduce`, se puede hacer:  
`rdd.map(lambda x: 1).reduce(lambda a,b: a+b)`


----

## Key-Value RDDs

In [15]:
import re
import requests
allWords = allWords.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.lower()).split(" ")).filter(lambda a: len(a)>0)
allWords2 = sc.parallelize(requests.get("https://gist.githubusercontent.com/jsdario/9d871ed773c81bf217f57d1db2d2503f/raw/585de69b0631c805dabc6280506717943b82ba4a/el_quijote_ii.txt").iter_lines())
allWords2 = allWords2.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.decode("utf8").lower()).split(" ")).filter(lambda a: len(a)>0)

  allWords = allWords.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.lower()).split(" ")).filter(lambda a: len(a)>0)
  allWords2 = allWords2.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.decode("utf8").lower()).split(" ")).filter(lambda a: len(a)>0)


In [16]:
allWords.take(10), allWords2.take(10)

(['don',
  'quijote',
  'de',
  'la',
  'mancha',
  'miguel',
  'de',
  'cervantes',
  'saavedra',
  'primera'],
 ['don',
  'quijote',
  'de',
  'la',
  'mancha',
  'miguel',
  'de',
  'cervantes',
  'saavedra',
  'segunda'])

A continuación, pasamos a operaciones más interesantes que involucran RDDs de tipo clave-valor.  
Los RDDs clave-valor son un tipo especial de RDD en los que cada elemento es una tupla `(K, V)`,  
donde `K` es la clave y `V` el valor.  


In [17]:
words = allWords.map(lambda e: (e,1))
words2 = allWords2.map(lambda e: (e,1))

words.take(10)

[('don', 1),
 ('quijote', 1),
 ('de', 1),
 ('la', 1),
 ('mancha', 1),
 ('miguel', 1),
 ('de', 1),
 ('cervantes', 1),
 ('saavedra', 1),
 ('primera', 1)]

### Cómo manipular RDDs clave-valor

In [18]:
frequencies = words.reduceByKey(lambda a,b: a+b)
frequencies2 = words2.reduceByKey(lambda a,b: a+b)
frequencies.takeOrdered(10, key=lambda a: -a[1])

                                                                                

[('que', 10705),
 ('de', 9033),
 ('y', 8668),
 ('la', 5015),
 ('a', 4815),
 ('en', 4046),
 ('el', 3857),
 ('no', 3083),
 ('se', 2382),
 ('los', 2148)]

In [19]:
res = words.groupByKey().takeOrdered(10, key=lambda a: -len(a))
res # To see the content, res[i][1].data

                                                                                

[('dicho', <pyspark.resultiterable.ResultIterable at 0x7fc2a64233e0>),
 ('fuesen', <pyspark.resultiterable.ResultIterable at 0x7fc2a6422e10>),
 ('todas', <pyspark.resultiterable.ResultIterable at 0x7fc2a6423440>),
 ('puntualidad', <pyspark.resultiterable.ResultIterable at 0x7fc2a6423680>),
 ('vuestra', <pyspark.resultiterable.ResultIterable at 0x7fc2a64239e0>),
 ('oración', <pyspark.resultiterable.ResultIterable at 0x7fc2a6423a40>),
 ('proezas', <pyspark.resultiterable.ResultIterable at 0x7fc2a6423aa0>),
 ('valor', <pyspark.resultiterable.ResultIterable at 0x7fc2a6423b00>),
 ('soldada', <pyspark.resultiterable.ResultIterable at 0x7fc2a6423b60>),
 ('ello', <pyspark.resultiterable.ResultIterable at 0x7fc2a6423bf0>)]

In [20]:
joinFreq = frequencies.join(frequencies2)
joinFreq.take(10)

                                                                                

[('pasaban', (14, 3)),
 ('liberalidad', (11, 9)),
 ('suben', (2, 2)),
 ('alcancé', (1, 2)),
 ('dignas', (9, 2)),
 ('paredes', (11, 7)),
 ('herederos', (1, 1)),
 ('bastase', (1, 1)),
 ('tocaban', (1, 1)),
 ('tierno', (5, 2))]

In [21]:
joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1]))).takeOrdered(10, lambda v: -v[1]), joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1]))).takeOrdered(10, lambda v: +v[1])

                                                                                

([('bacía', 0.9393939393939394),
  ('venia', 0.9230769230769231),
  ('hermandad', 0.9),
  ('micomicona', 0.8823529411764706),
  ('peña', 0.8823529411764706),
  ('andrés', 0.8823529411764706),
  ('barca', 0.875),
  ('yerme', 0.875),
  ('novela', 0.875),
  ('digno', 0.8666666666666667)],
 [('teresa', -0.9767441860465116),
  ('roque', -0.96),
  ('refranes', -0.9375),
  ('condesa', -0.9333333333333333),
  ('leones', -0.9333333333333333),
  ('gobernadores', -0.9166666666666666),
  ('lacayo', -0.9166666666666666),
  ('visorrey', -0.9130434782608695),
  ('antonio', -0.9076923076923077),
  ('zaragoza', -0.9047619047619048)])

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las funciones usadas con RDDs clave-valor.

¿Cuáles son acciones y cuáles transformaciones?

¿Qué hace la celda anterior?

--------
Respuesta:
**Funciones (RDDs clave–valor) y propósito**
- `map(lambda e: (e,1))` → **Transformación**. Convierte cada palabra en un par `(palabra, 1)` para poder contar frecuencias.
- `reduceByKey(lambda a,b: a+b)` → **Transformación (wide)**. Suma los valores por clave: obtiene `frequencies` = conteo de cada palabra.
- `groupByKey()` → **Transformación (wide)**. Agrupa todos los valores de una misma clave en un iterable (menos eficiente que `reduceByKey` para contar).
- `join(otroRDD)` → **Transformación (wide)**. Une por clave: para cada palabra común devuelve `(palabra, (freq_en_RDD1, freq_en_RDD2))`.
- `take(n)` / `takeOrdered(n, key=…)` → **Acciones**. Traen a driver los primeros/los mejor ordenados `n` elementos.

**Qué hace la celda anterior**
1) Limpia y tokeniza dos libros (Quijote I y II) y crea `words`, `words2` como pares `(palabra,1)`.
2) `frequencies`, `frequencies2` cuentan ocurrencias por palabra en cada libro con `reduceByKey`.
3) Muestra las palabras más frecuentes con `takeOrdered(…, key=lambda a: -a[1])`.
4) `groupByKey()` ejemplifica cómo agrupar valores por palabra (se ven `ResultIterable`).
5) `joinFreq = frequencies.join(frequencies2)` alinea las frecuencias de cada palabra común en ambos libros.
6) Calcula un **índice de asimetría** por palabra: `(f1−f2)/(f1+f2)` y con `takeOrdered` lista:
   - las más **características del Quijote I** (valores cercanos a +1),
   - y las más **características del Quijote II** (valores cercanos a −1).



------

In [22]:
joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1]))).takeOrdered(10, lambda v: -v[1]), joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1]))).takeOrdered(10, lambda v: +v[1])

                                                                                

([('bacía', 0.9393939393939394),
  ('venia', 0.9230769230769231),
  ('hermandad', 0.9),
  ('micomicona', 0.8823529411764706),
  ('peña', 0.8823529411764706),
  ('andrés', 0.8823529411764706),
  ('barca', 0.875),
  ('yerme', 0.875),
  ('novela', 0.875),
  ('digno', 0.8666666666666667)],
 [('teresa', -0.9767441860465116),
  ('roque', -0.96),
  ('refranes', -0.9375),
  ('condesa', -0.9333333333333333),
  ('leones', -0.9333333333333333),
  ('gobernadores', -0.9166666666666666),
  ('lacayo', -0.9166666666666666),
  ('visorrey', -0.9130434782608695),
  ('antonio', -0.9076923076923077),
  ('zaragoza', -0.9047619047619048)])

In [23]:
result = joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1])))
result.takeOrdered(10, lambda v: -v[1]), result.takeOrdered(10, lambda v: +v[1])

                                                                                

([('bacía', 0.9393939393939394),
  ('venia', 0.9230769230769231),
  ('hermandad', 0.9),
  ('micomicona', 0.8823529411764706),
  ('peña', 0.8823529411764706),
  ('andrés', 0.8823529411764706),
  ('barca', 0.875),
  ('yerme', 0.875),
  ('novela', 0.875),
  ('digno', 0.8666666666666667)],
 [('teresa', -0.9767441860465116),
  ('roque', -0.96),
  ('refranes', -0.9375),
  ('condesa', -0.9333333333333333),
  ('leones', -0.9333333333333333),
  ('gobernadores', -0.9166666666666666),
  ('lacayo', -0.9166666666666666),
  ('visorrey', -0.9130434782608695),
  ('antonio', -0.9076923076923077),
  ('zaragoza', -0.9047619047619048)])

## Optimizaciones y notas finales de RDDs

### Movimientos de datos

Uno de los principales problemas podría ser que, si los datos después de una operación no están balanceados, es posible que no estemos utilizando el clúster de forma adecuada.  
Para ese propósito, disponemos de dos operaciones.



In [24]:
coalesced_result = result.coalesce(numPartitions=2) # Avoids the data movement, so it tries to balance inside each machine
repartitioned_result = result.repartition(numPartitions=2) # We don't care about data movement, this balance the whole thing to ensure all machines are used

### Persistencia
A diferencia de Hadoop, los RDDs intermedios no se preservan;  
cada vez que usamos una acción/reducción, toda el DAG se ejecuta desde las fuentes de datos.  
Para evitar esto, podemos usar `cache`o `persist` en puntos intermedios del grafo.



In [25]:
words = allWords.cache() # Best effort, le decimos a Spark que intente guardar este resultado intermedio con mayor prioridad.
words.map(lambda e: (e,1))
words.map(lambda e: (e,-1))

PythonRDD[58] at RDD at PythonRDD.scala:56

In [26]:
from pyspark import StorageLevel
# https://spark.apache.org/docs/4.0.0/rdd-programming-guide.html#rdd-persistence
allWords2.persist(StorageLevel.MEMORY_AND_DISK) # Añadimos la persistencia en disco

PythonRDD[59] at RDD at PythonRDD.scala:56

----

### Variables globales

There are two kind of global variables, read-only and write-only.There are two kind of global variables, read-only and write-only.

In [27]:
articles = sc.broadcast(["el", "la"])
articles.value

['el', 'la']

Las variables *broadcast* son de solo lectura.  
Nos ayudan a evitar que las variables locales de los *closures*
(las funciones que usamos dentro de `map`, `reduce`, ...)  
se transfieran en cada operación de Spark.  
De esta manera, se transfieren **solo una vez**.


In [28]:
acc = sc.accumulator(0)
def incrementar(x):
  global acc
  acc += x

allWords.map(lambda l:1).foreach(incrementar)
acc

                                                                                

Accumulator<id=0, value=187045>

Las variables de solo escritura también pueden declararse e inicializarse,  
pero no pueden ser leídas, ya que leerlas forzaría una **sincronización completa** del clúster.  


# API de alto nivel: Spark SQL

Ahora veremos el API de alto nivel, de DataFrames, de Apache Spark.

In [29]:
df = spark.read.parquet("hdfs://listener-simple-hdfs-namenode-default-0.hdfs.svc.cluster.local/reddit_02_2016.parquet")

In [30]:
df.show()

[Stage 46:>                                                         (0 + 1) / 1]

+------------------+-----------+--------------------+--------------------+--------------------+-------+
|            author|created_utc|               title|           subreddit|            selftext|over_18|
+------------------+-----------+--------------------+--------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|         deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|     ElAndroideLibre|                    |  false|
|      chasdave1981| 1454326165|  Ms Juicy (retired)|            bigasses|                    |   true|
|         [deleted]| 1454326165|[H] ST MW Aquamar...|GlobalOffensiveTrade|           [deleted]|   true|
|       SableArgyle| 1454326166|Something might h...|            OnePiece|So waaaaaaaay bac...|   true|
|      nilimajaveri| 1454326166|Buy queen bed mat...| InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|           

                                                                                

## Operaciones básicas

In [31]:
df = df.withColumn("over_18tmp", df.over_18.try_cast("boolean")) \
    .drop("over_18") \
    .withColumnRenamed("over_18tmp", "over_18")
df = df.cache()
df.show()

[Stage 47:>                                                         (0 + 1) / 1]

+------------------+-----------+--------------------+--------------------+--------------------+-------+
|            author|created_utc|               title|           subreddit|            selftext|over_18|
+------------------+-----------+--------------------+--------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|         deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|     ElAndroideLibre|                    |  false|
|      chasdave1981| 1454326165|  Ms Juicy (retired)|            bigasses|                    |   true|
|         [deleted]| 1454326165|[H] ST MW Aquamar...|GlobalOffensiveTrade|           [deleted]|   true|
|       SableArgyle| 1454326166|Something might h...|            OnePiece|So waaaaaaaay bac...|   true|
|      nilimajaveri| 1454326166|Buy queen bed mat...| InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|           

                                                                                

### Filtrado

In [32]:
df.filter(~df.over_18).show()

+------------------+-----------+--------------------+-------------------+--------------------+-------+
|            author|created_utc|               title|          subreddit|            selftext|over_18|
+------------------+-----------+--------------------+-------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|        deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|    ElAndroideLibre|                    |  false|
|      nilimajaveri| 1454326166|Buy queen bed mat...|InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|               gifs|           [deleted]|  false|
|     a2zlifestyles| 1454326167|9 Cool Indian Hai...|          HairStyle|                    |  false|
|        dingdong89| 1454326169|Robert Leslie - I...|      BinauralMusic|                    |  false|
|      sleepyechoes| 1454326170|Presidential Foot...|                CFB|

In [33]:
df.where(~df.over_18).show()

+------------------+-----------+--------------------+-------------------+--------------------+-------+
|            author|created_utc|               title|          subreddit|            selftext|over_18|
+------------------+-----------+--------------------+-------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|        deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|    ElAndroideLibre|                    |  false|
|      nilimajaveri| 1454326166|Buy queen bed mat...|InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|               gifs|           [deleted]|  false|
|     a2zlifestyles| 1454326167|9 Cool Indian Hai...|          HairStyle|                    |  false|
|        dingdong89| 1454326169|Robert Leslie - I...|      BinauralMusic|                    |  false|
|      sleepyechoes| 1454326170|Presidential Foot...|                CFB|

In [34]:
df.where("not over_18").show() # SQL syntax

+------------------+-----------+--------------------+-------------------+--------------------+-------+
|            author|created_utc|               title|          subreddit|            selftext|over_18|
+------------------+-----------+--------------------+-------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|        deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|    ElAndroideLibre|                    |  false|
|      nilimajaveri| 1454326166|Buy queen bed mat...|InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|               gifs|           [deleted]|  false|
|     a2zlifestyles| 1454326167|9 Cool Indian Hai...|          HairStyle|                    |  false|
|        dingdong89| 1454326169|Robert Leslie - I...|      BinauralMusic|                    |  false|
|      sleepyechoes| 1454326170|Presidential Foot...|                CFB|

### Operaciones



In [35]:
df.select(df.created_utc * 2).show()

+-----------------+
|(created_utc * 2)|
+-----------------+
|       2908652328|
|       2908652330|
|       2908652330|
|       2908652330|
|       2908652332|
|       2908652332|
|       2908652332|
|       2908652334|
|       2908652338|
|       2908652340|
|       2908652342|
|       2908652342|
|       2908652342|
|       2908652344|
|       2908652344|
|       2908652346|
|       2908652346|
|       2908652346|
|       2908652348|
|       2908652348|
+-----------------+
only showing top 20 rows


In [36]:
from pyspark.sql.functions import log
df.select(log(df.created_utc * 2)).show()

+---------------------+
|ln((created_utc * 2))|
+---------------------+
|      21.790955693332|
|     21.7909556940196|
|     21.7909556940196|
|     21.7909556940196|
|   21.790955694707204|
|   21.790955694707204|
|   21.790955694707204|
|   21.790955695394807|
|   21.790955696770016|
|    21.79095569745762|
|   21.790955698145222|
|   21.790955698145222|
|   21.790955698145222|
|   21.790955698832825|
|   21.790955698832825|
|    21.79095569952043|
|    21.79095569952043|
|    21.79095569952043|
|   21.790955700208034|
|   21.790955700208034|
+---------------------+
only showing top 20 rows


### Agregaciones



In [37]:
df.where("not over_18").groupby(["author", df.subreddit]).count().show()



+-----------------+--------------------+-----+
|           author|           subreddit|count|
+-----------------+--------------------+-----+
|   truthfulcoffee|    AndroidQuestions|    1|
|     SuperSaguaro|   10cloverfieldlane|    1|
|        [deleted]|                 CFB|   14|
|       rafamorgan|          airconsole|    1|
|         jsntz007|fastloanunemployedau|    1|
|    AutoModerator|           madisonwi|    1|
|  KarthikVishwash|          technology|    1|
|  thecaptaincltch|     Thecaptaincltch|    1|
|       Born-Hater|             Sverige|    1|
|        [deleted]|              QuestU|    1|
|  jitendragroup01|            business|    2|
|AutonomyForbidden|              MURICA|    1|
|totalitarianghost|      AskProgramming|    1|
|  Gamesfreak13563|    totallynotrobots|    1|
|       fusyanasan|               otoge|    1|
|        [deleted]|                 WC3|    2|
|     Moonkaman227|     leagueoflegends|    1|
|        [deleted]|                GMAT|    1|
|          bu

                                                                                

### Funciones definidas por el usuario (UDFs)

In [38]:
from pyspark.sql.functions import length

df = df.withColumn("length", length(df.selftext)) # This adds a column

df.where("length > 1000").toPandas()

Unnamed: 0,author,created_utc,title,subreddit,selftext,over_18,length
0,BlairTheWiseViking,1454326164,(24M) with a DB problem that's going on three ...,deadbedroom,This post will be long as to give detail about...,False,3872
1,sleepyechoes,1454326170,Presidential Football Poll Results,CFB,I put together the results in probably the lea...,False,7831
2,Daedin_,1454326175,Looking for 3-4 rank 14-18 players to progress...,hearthstone,Hey reddit! Glad to meet you all :)\n\nI'm osc...,False,1669
3,Holo_of_Yoitsu,1454326188,[Spoilers] Diamond no Ace: Second Season - Epi...,anime,**Episode title:** To This Side \n**Episode d...,False,3278
4,SmellyFbuttface,1454326190,Why doesn't challenge mode change?,DestinyTheGame,I like that they integrated challenge mode. G...,False,1043
...,...,...,...,...,...,...,...
9809,-Boman-,1454326072,Warwick The Fast Ticket Out Of Bronze and Silv...,summonerschool,Hello my name is Boman and today i would like ...,False,3409
9810,GoldenJermbag,1454326080,I've finally had my first true high sodium mom...,DestinyTheGame,Before I go tell my woeful tale I will say I l...,False,1767
9811,BhiQ,1454326085,FtMs - your thoughts on the male gender role/s...,asktransgender,"Hey guys, I'll try to be as respectful as I ca...",False,1984
9812,SpaceyMoogle,1454326087,Freaking out a little about possible risk of p...,TwoXChromosomes,Not really sure if this is the right place. \n...,False,1196


In [39]:
from pyspark.sql.functions import udf

def splitWords(e):
  return e.split(" ") if e else e

splitWords = udf(splitWords)
df.select(splitWords(df.selftext)).show()

[Stage 58:>                                                         (0 + 1) / 1]

+--------------------+
|splitWords(selftext)|
+--------------------+
|[This, post, will...|
|                    |
|                    |
|         [[deleted]]|
|[So, waaaaaaaay, ...|
|                    |
|         [[deleted]]|
|                    |
|                    |
|[I, put, together...|
|                    |
|                    |
|                    |
|                    |
|         [[removed]]|
|                    |
|                    |
|                    |
|                    |
|                    |
+--------------------+
only showing top 20 rows


                                                                                

-----

## SQL

### Como declarar vistas de Dataframes

Todo dataframe puede ser usado como una tabla SQL y una sintaxis similar a la usada en otras asignaturas de bases de datos. De hecho, esta es la base del proyecto Apache Hive.

In [40]:
df.createOrReplaceTempView("reddit")

In [41]:
spark.sql("select * from reddit limit 10").show()

+------------------+-----------+--------------------+--------------------+--------------------+-------+------+
|            author|created_utc|               title|           subreddit|            selftext|over_18|length|
+------------------+-----------+--------------------+--------------------+--------------------+-------+------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|         deadbedroom|This post will be...|  false|  3872|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|     ElAndroideLibre|                    |  false|     0|
|      chasdave1981| 1454326165|  Ms Juicy (retired)|            bigasses|                    |   true|     0|
|         [deleted]| 1454326165|[H] ST MW Aquamar...|GlobalOffensiveTrade|           [deleted]|   true|     9|
|       SableArgyle| 1454326166|Something might h...|            OnePiece|So waaaaaaaay bac...|   true|   650|
|      nilimajaveri| 1454326166|Buy queen bed mat...| InternetIsBeautiful|                    |  false|     0|
|

----

In [42]:
spark.sql("select author, count(*) as c from reddit group by author order by c desc").toPandas()

Unnamed: 0,author,c
0,[deleted],37035
1,rotoreuters,1017
2,ImaBlue,989
3,AutoModerator,773
4,RPBot,663
...,...,...
105391,aero2146,1
105392,Rezurektme,1
105393,bcseismail,1
105394,PM_ME_FOR_POTATO_PIC,1


--------


<font size=10 color=red>Ejercicio</font>

Obten los usuarios que han subido a reddit más de 10 posts de más de 100 carácteres.

--------
Respuesta:

In [43]:
spark.sql("""
    SELECT author, COUNT(*) AS num_posts
    FROM reddit
    WHERE length > 100
    GROUP BY author
    HAVING COUNT(*) > 10
""").toPandas()


Unnamed: 0,author,num_posts
0,boardgamerecommender,12
1,dota2matchbot,230
2,[deleted],529
3,JmodTracker,11
4,lolretkj,12
5,gyfaglover4,242
6,AutoModerator,616
7,Jesupekka,21
8,autotldr,221
9,Chabombs,13


Ejemplo de comprobación de homogeneidad:


In [48]:
import math
def rdd_is_homogeneous(rdd):
    tipos = rdd.map(lambda x: type(x)).distinct().collect()
    return len(tipos) == 1, tipos

r1 = sc.parallelize([1, 2, 3])
r2 = sc.parallelize([1, "dos", 3.0])

print(rdd_is_homogeneous(r1))
print(rdd_is_homogeneous(r2))


(True, [<class 'int'>])
(False, [<class 'int'>, <class 'str'>, <class 'float'>])


In [55]:
r1.getNumPartitions()


2

In [56]:
r1.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()


[1, 2]

In [65]:
base = sc.parallelize(range(100_000), 8)
print("particiones:", base.getNumPartitions())

desbalanceado = base.filter(lambda x: x < 2_000)

base_dist  = base.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
desbalanceado_dist  = desbalanceado.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
base_dist, desbalanceado_dist



particiones: 8


([12500, 12500, 12500, 12500, 12500, 12500, 12500, 12500],
 [2000, 0, 0, 0, 0, 0, 0, 0])