# **Práctica de laboratorio 5b: k-Means para cuantificar atributos**

#### Los algoritmos de agrupación de datos, además de utilizarse en el análisis exploratorio para extraer patrones de similitud entre objetos, pueden utilizarse para comprimir el espacio de datos.

#### En este notebook usaremos nuestra base de datos Sentiment Movie Reviews para los experimentos. Primero usaremos la técnica word2vec que aprende una transformación de tokens desde una base a un vector de atributos. A continuación, utilizaremos el algoritmo k-Means para comprimir la información sobre estos atributos y proyectar cada objeto en un espacio de atributos de tamaño fijo.

#### Las celdas de ejercicio comienzan con el comentario `# EJERCICIO` y los códigos a completar están marcados con los comentarios `<COMPLETO>`.

#### ** En este notebook: **
#### *Parte 1:* Word2Vec
#### *Parte 2:* k-Means para cuantificar atributos
#### *Parte 3:* Aplicar un k-NN

### **Parte 0: Preliminares**

#### Para este notebook usaremos la base de datos de reseñas de películas que se usará para el segundo proyecto.

#### La base de datos tiene los campos separados por '\t' y el siguiente formato:
    `"id de frase","id de oración","Frase","Sentimiento"`

#### Para esta práctica de laboratorio solo usaremos el campo "Frase".

In [1]:
import os
import numpy as np

def parseRDD(point):
    """ Parser for the current dataset. It receives a data point and return
        a sentence (third field).
    Args:
        point (str): input data point
    Returns:
        str: a string
    """    
    data = point.split('\t')
    return (int(data[0]),data[2])

def notempty(point):
    """ Returns whether the point string is not empty
    Args:
        point (str): input string
    Returns:
        bool: True if it is not empty
    """   
    return len(point[1])>0

filename = os.path.join("Data","MovieReviews2.tsv")
rawRDD = sc.textFile(filename,100)
header = rawRDD.take(1)[0]

dataRDD = (rawRDD
           #.sample(False, 0.1, seed=42)
           .filter(lambda x: x!=header)
           .map(parseRDD)
           .filter(notempty)
           #.sample( False, 0.1, 42 )
           )

print ('Read {} lines'.format(dataRDD.count()))
print ('Sample line: {}'.format(dataRDD.takeSample(False, 1)[0]))

                                                                                

Read 8528 lines




Sample line: (155024, 'A ragbag of cliches .')




### **Parte 1: Word2Vec**

#### La técnica [word2vec][word2vec] aprende a través de una red neuronal semántica una representación vectorial de cada token en un corpus de tal manera que las palabras semánticamente similares son similares en la representación vectorial.

#### PySpark contiene una implementación de esta técnica, para aplicarla basta con pasar un RDD en el que cada objeto representa un documento y cada documento está representado por una lista de tokens en el orden en que aparecen originalmente en el corpus. Después del proceso de entrenamiento, podemos transformar un token usando el método [`transform`](https://spark.apache.org/docs/latest/ml-features) para convertir cada token en una representación vectorial.

#### En este punto, cada objeto en nuestra base estará representado por una matriz de tamaño variable.

[word2vec]: https://code.google.com/p/word2vec/

### **(1a) Generación de RDD a partir de tokens**

#### Use la función de tokenización `tokenize` para generar un RDD `wordsRDD` que contenga listas de tokens de nuestra base de datos original.

In [2]:
# EXERCICIO
import re

split_regex = r'\W+'

stopfile = os.path.join("Data","stopwords.txt")
stopwords = set(sc.textFile(stopfile).collect())

def tokenize(string):
    """ An implementation of input string tokenization that excludes stopwords
    Args:
        string (str): input string
    Returns:
        list: a list of tokens without stopwords
    """
    string=re.sub(r'[^A-Za-z0-9 ]','', string).strip().lower()
    listStr=string.split(' ')
    listStr=list(filter(lambda x:x not in stopwords,listStr))
    listStr=list(filter(lambda x:x!='',listStr))
    return listStr

wordsRDD = dataRDD.map(lambda x: tokenize(x[1]))


print (wordsRDD.take(1)[0])

['quiet', 'introspective', 'entertaining', 'independent', 'worth', 'seeking']


In [3]:
# TEST Tokenize a String (1a)
assert wordsRDD.take(1)[0]==[u'quiet', u'introspective', u'entertaining', u'independent', u'worth', u'seeking'], 'lista incorreta!'
print('ok')

ok


### **(1b) Aplicando la transformación word2vec**

#### Cree una plantilla word2vec aplicando el método `fit` al RDD creado en el ejercicio anterior.

#### Para aplicar este método debes hacer un pipeline de métodos, primero ejecutando `Word2Vec()`, luego aplicando el método `setVectorSize()` con el tamaño que queremos para nuestro vector (usa el tamaño 5), seguido de ` setSeed()` para la semilla aleatoria, en caso de experimentos controlados (usaremos 42) y finalmente `fit()` con nuestro `wordsRDD` como parámetro.

In [4]:
# EXERCICIO
from pyspark.mllib.feature import Word2Vec

model = Word2Vec().setVectorSize(5).setSeed(42).fit(wordsRDD)
print (model.transform(u'entertaining'))
print (list(model.findSynonyms(u'entertaining', 2)))

[Stage 10:>                                                         (0 + 1) / 1]

[0.00018144746718462557,-0.11685658246278763,0.07119239866733551,0.06759867817163467,-0.1411903202533722]
[('middleaged', 0.9950169920921326), ('never', 0.9907814860343933)]


                                                                                

In [5]:
dist = np.abs(model.transform(u'entertaining')-np.array([0.0136831374839,0.00371457682922,-0.135785803199,0.047585401684,0.0414853096008])).mean()
assert dist<1e-6, 'valores incorretos'
assert list(model.findSynonyms(u'entertaining', 1))[0][0] == 'god', 'valores incorretos'

AssertionError: valores incorretos

### **(1c) Generando un RDD de arreglos**

#### Como primer paso, necesitamos generar un diccionario donde la clave son las palabras y el valor es el vector que representa esa palabra.

#### Para esto primero generaremos una lista `uniqueWords` que contiene las palabras únicas de las palabras RDD, eliminando aquellas que aparecen menos de 5 veces [$^1$](#1). A continuación, crearemos un diccionario `w2v` donde la clave es un token y el valor es un `np.array` del arreglo transformado de ese token[$^2$](#2).

#### Finalmente, creemos un RDD llamado `vectorsRDD` donde cada registro está representado por una matriz donde cada fila representa una palabra transformada.

In [61]:
# EXERCICIO
uniqueWords = (wordsRDD
               .flatMap(lambda x:x)
               .map(lambda x:(x,1))
               .reduceByKey(lambda x,y:x+y)
               .filter(lambda x:x[1]>=5).map(lambda x:x[0])
               .collect()
               )
print ('{} tokens únicos'.format(len(uniqueWords)))
w2v = {}
for w in uniqueWords:
    w2v[w] = model.transform(w)
w2vb = sc.broadcast(w2v)  # acesse como w2vb.value[w]     
print ('Vetor entertaining: {}'.format( w2v[u'entertaining']))
vectorsRDD = (wordsRDD
              .map(lambda x:list(map(lambda y:w2vb.value[str(y)],x)))
             )
print(vectorsRDD.take(2))
recs = vectorsRDD.take(2)
firstRec, secondRec = recs[0], recs[1]
#print (firstRec.shape, secondRec.shape)

                                                                                

3266 tokens únicos
Vetor entertaining: [0.00018144746718462557,-0.11685658246278763,0.07119239866733551,0.06759867817163467,-0.1411903202533722]


22/01/22 16:12:06 ERROR Executor: Exception in task 0.0 in stage 146.0 (TID 10066)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/pyspark/rdd.py", line 1562, in takeUpToNumLeft
    yield next(iterator)
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_7552/3181731386.py", line 16, in <lambda>
  File "/tmp/ipykernel_7552/3181731386.py", line 16, in <lam

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 146.0 failed 1 times, most recent failure: Lost task 0.0 in stage 146.0 (TID 10066) (192.168.1.2 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/pyspark/rdd.py", line 1562, in takeUpToNumLeft
    yield next(iterator)
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_7552/3181731386.py", line 16, in <lambda>
  File "/tmp/ipykernel_7552/3181731386.py", line 16, in <lambda>
KeyError: 'introspective'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at jdk.internal.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/pyspark/rdd.py", line 1562, in takeUpToNumLeft
    yield next(iterator)
  File "/home/ubuntu/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_7552/3181731386.py", line 16, in <lambda>
  File "/tmp/ipykernel_7552/3181731386.py", line 16, in <lambda>
KeyError: 'introspective'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [28]:
# TEST Tokenizing the small datasets (1c)
assert len(uniqueWords) == 3388,  'valor incorreto'
assert np.mean(np.abs(w2v[u'entertaining']-[0.0136831374839,0.00371457682922,-0.135785803199,0.047585401684,0.0414853096008]))<1e-6,'valor incorreto'
assert secondRec.shape == (10,5)

AssertionError: valor incorreto

### **Parte 2: k-Means para cuantificar atributos**

#### Llegados a este punto, es fácil ver que no podemos aplicar nuestras técnicas de aprendizaje supervisado a esta base de datos:

   * #### La regresión logística requiere un vector de tamaño fijo que represente cada objeto
   * #### k-NN necesita una forma clara de comparar dos objetos, ¿qué métrica de similitud debemos aplicar?
  
#### Para resolver esta situación, realicemos una nueva transformación en nuestro RDD. Primero, aprovechemos el hecho de que dos tokens con un significado similar se asignan a vectores similares para agruparlos en un solo atributo.

#### Al aplicar k-Means a este conjunto de vectores, podemos crear $k$ puntos representativos y, para cada documento, generar un histograma de recuento de tokens en los clústeres generados.

#### **(2a) Agrupando los vectores y creando centros representativos**

#### Como primer paso generaremos un RDD con los valores del diccionario `w2v`. A continuación, aplicaremos el algoritmo `k-Means` con $k = 200$ y $seed = 42$.

In [None]:
# EXERCICIO
from  pyspark.mllib.clustering import KMeans

vectors2RDD = sc.parallelize(np.array(list(w2v.values())),1)
print ('Sample vector: {}'.format(vectors2RDD.take(1)))

modelK = KMeans.<COMPLETAR>

clustersRDD = vectors2RDD.<COMPLETAR>
print ('10 first clusters allocation: {}'.format(clustersRDD.take(10)))

In [None]:
# TEST Amazon record with the most tokens (1d)
assert clustersRDD.take(10)==[142, 83, 42, 0, 87, 52, 190, 17, 56, 0], 'valor incorreto'

#### **(2b) Transformación de matriz de datos en vectores cuantificados**

#### El siguiente paso es transformar nuestro RDD de frases en un RDD de pares (id, vector cuantificado). Para ello crearemos una función cuantificadora que recibirá como parámetros el objeto, el modelo k-means, el valor de k y el diccionario word2vec.

#### Para cada punto, separemos el id y apliquemos la función `tokenize` a la cadena. Luego transformamos la lista de tokens en una matriz word2vec. Finalmente, aplicamos cada vector de esta matriz al modelo k-Means, generando un vector de tamaño $k$ donde cada posición $i$ indica cuántos tokens pertenecen al clúster $i$.

In [None]:
# EXERCICIO
def quantizador(point, model, k, w2v):
    key = <COMPLETAR>
    words = <COMPLETAR>
    matrix = np.array( <COMPLETAR> )
    features = np.zeros(k)
    for v in matrix:
        c = <COMPLETAR>
        features[c] += 1
    return (key, features)
    
quantRDD = dataRDD.map(lambda x: quantizador(x, modelK, 500, w2v))

print quantRDD.take(1)

In [None]:
# TEST Implement a TF function (2a)
assert quantRDD.take(1)[0][1].sum() == 5, 'valores incorretos'