![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)  ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# Comptant paraules: Construeix una aplicació que compti paraules de forma eficient

En aquesta tasca s'utilitzarà Pyspark per desenvolupar una aplicació de comptatge de paraules.

Amb l'ús massiu d'Internet i les xarxes socials, el volum de text no estructurat esta creixent dramàticament, i Spark és una gran eina per analitzar aquest tipus de dades. En aquesta tasca, anem a escriure codi per trobar les paraules més comuns en un text generat en latin, el ja conegut [Lorem Ipsum](https://www.lipsum.com/).


El més interessant de la forma de treballar en aquesta tasca és que podria escalar-se a casos de big data com, per exemple, trobar les paraules més comuns a la Wikipedia.

## Durant aquesta TASCA cobrirem les següents parts:

* *Part 1:* Creació d'un RDD i un pair RDD
* *Part 2:* Comptar paraules fent servir un pair RDD
* *Part 3:* Trobar les paraules individuals i la seva freqüència d'aparició mitjana
* *Part 4:* Aplicar les funcionalitats desenvolupades a un arxiu de text *
* *Part 5:* Calcular alguns estadístics *


> Com a referència a tots els detalls dels mètodes que es fan servir en aquesta pràctica utilitzar:
> * [API Python de Spark](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)

Per començar, cal carregar l'entorn. Com a nom d'app poseu M3T01_nom_cognom.

Recordeu afegir tots els comentaris o quadres de text necessaris per explicar detalladament tot el que es fa i justificar les decisions així com comentar els resultats.

In [1]:
import findspark
findspark.init()
import pyspark
import random

In [2]:
sc = pyspark.SparkContext(master="local[1]", appName="new", conf=pyspark.SparkConf().set("spark.driver.host", "127.0.0.1"))

## Part 1: Creació d'un RDD i un pair RDDs

En aquesta secció, explorarem com crear RRDs usant `parallelize` i com aplicar pair RDDs al problema del recompte de paraules.

### (0) Configuració de l'entorn python + spark
Poseu nom a la aplicació (appName) en format "**M3T01_**+Nom_Cognom"

### Opcions de configuració PySpark
Les opcions que ens dona Spark en quant a optimitzar el rendiment son gairebé infinites, entre altres tenim la funció **setAll** que ens permet configurar el funcionament del framework al detall.

**Investiga sobre aquesta funció i destaca les opción que consideris més rellevants**

In [3]:
from pyspark import SparkContext, SparkConf

# Si hi ha un SparkContext existent hem de tancar-ho abans
if 'sc' in locals() and sc:
    sc.stop()

# Configuració de Spark
conf = SparkConf().setAppName("M3T01_PatriciaA_Peña").set("spark.executor.heartbeatInterval", "3600s").set("spark.network.timeout", "7200s")
sc = SparkContext(conf=conf)

### (1a) Creació d'un RDD
Comencem generant un RDD a partir d'una llista de Python i el mètode `sc.parallelize`. Després mostrarem per pantalla el tipus de la variable generada.

In [4]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList)
# Print out the type of wordsRDD
type(wordsRDD)

pyspark.rdd.RDD

### (1b) Crear el plural de les paraules i testejar

Utilitzarem una transformació `map()` per incorporar la lletra 's' a cada un dels strings emmagatzemats en el RDD que acabem de crear. Anem a definir una funció de Python que retorni una paraula, que se li ha passat com paràmetre, incorporant una "s" al final de la mateixa. Substitueix el text `<FILL IN>` amb la solucio proposada. Després d'haver definit correctament la funció `makePlural`, executar la segona cel·la que conté un assert de test. Si la solució és correcta, s'imprimirà `1 test passed`.

Aquesta serà la forma habitual de treballar en les algunes de les tasques del curs. Els exercicis contindran una explicació del que s'espera, seguit d'una cel·la de codi amb un o més `<FILL IN>`. Les cel·les que necessitin ser modificades contindran el text `# TOT: Replace <FILL IN> with appropriate code` a la primera línia.

Un cop s'hagin substituït tots els `<FILL IN>` pel codi Python adequat, executar la cel·la, i posteriorment executar la cel·la següent de test per comprovar que que la solució és l'esperada.

In [5]:
# TODO: Replace <FILL IN> with appropriate code
def makePlural(word):
    """Adds an 's' to `word`.

    Note:
        This is a simple function that only adds an 's'.  

    Args:
        word (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return word + 's'


makePlural('cat')

'cats'

In [6]:
# TEST Pluralize and test (1b)
assert makePlural('rat') == 'rats', 'incorrect result: makePlural does not add an s' 

### (1c) Aplicar `makePlural` al nostre RDD

Ara és el moment d'aplicar la nostra funció `makePlural()` a tots els elements del RDD usant una transformació [map()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map). Posteriorment executar l'acció [collect ()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) per obtenir el RDD transformat.

In [7]:
# TODO: Replace <FILL IN> with appropriate code
pluralRDD = wordsRDD.map(makePlural)
pluralRDD.collect()

['cats', 'elephants', 'rats', 'rats', 'cats']

In [8]:
# TEST Apply makePlural to the base RDD(1c)
assert pluralRDD.collect() == ['cats', 'elephants', 'rats', 'rats', 'cats'], 'incorrect values for pluralRDD'

### (1d) Executar una funció `lambda` en un` map`

Crearem el mateix RDD usant una `lambda` function en lloc d'una funció amb nom.

In [9]:
# TODO: Replace <FILL IN> with appropriate code
pluralLambdaRDD = wordsRDD.map(lambda word: word + 's')
print(pluralLambdaRDD.collect())

['cats', 'elephants', 'rats', 'rats', 'cats']


In [10]:
# TEST Pass a lambda function to map (1d)
assert pluralLambdaRDD.collect() == ['cats', 'elephants', 'rats', 'rats', 'cats'], 'incorrect values for pluralLambdaRDD (1d)'

### (1e) Nombre de caràcters de cadascuna de les paraules

Ara farem servir un `map()` i una funció lambda `lambda` per obtenir el nombre de caràcters de cada paraula. Farem servir `collect` per guardar aquest resultat directament en una variable.

In [11]:
nchars = lambda w : len(w)

In [12]:
# TODO: Replace <FILL IN> with appropriate code
pluralLengths = pluralRDD.map(nchars).collect()
print(pluralLengths)

[4, 9, 4, 4, 4]


In [13]:
# TEST Length of each word (1e)
assert pluralLengths == [4, 9, 4, 4, 4], 'incorrect values for pluralLengths'

### (1f) Pair RDDs

El següent pas per a completar el nostre programa de comptatge de paraules és crear un nou tipus de RDD, anomenat pair RDD. Un pair RDD és un RDD on cada element és un tupla de l'estil `(k, v)` on `k` és la clau i `v` és el seu valor corresponent. En aquest exemple, crearem una pair RDD consistent en tuples amb el format `('<word>', 1)` per a cada element del nostre RDD bàsic.

Podem crear el nostre pair RDD usant una transformació `map()` amb una `lambda()` funció que creï un nou RDD.

In [14]:
# TODO: Replace <FILL IN> with appropriate code
wordPairs = wordsRDD.map(lambda word: (word, 1))
print(wordPairs.collect())

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]


In [15]:
# TEST Pair RDDs (1f)
assert wordPairs.collect() == [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)], 'incorrect value for wordPairs'

## Part 2: Comptar paraules usant un pair RDD

Ara, comptarem el nombre de vegades que una paraula en particular apareix al RDD. Aquesta operació es pot realitzar d'una infinitat de maneres, però algunes seran molt menys eficients que d'altres.

Un solucio molt senzilla seriosa utilitzar `collect()` sobre tots els elements retornar-los al driver i alli comptar-los. Mentre aquesta forma de treballar podria funcionar amb textos relativament curts, nosaltres el que volem és poder treballar amb textos de qualsevol longitud. Addicionalment, executar tot el càlcul al controlador és molt més lent que executar en paral·lel en els workers. Per aquests motius, en aquesta practica farem servir operacions paralelizables.

### (2a) Usant `groupByKey()`
Una primera solució al nostre problema, després veurem que hi ha altres molt més eficients, es podria basar en la transformació [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey). Com el seu nom indica, la transformació `groupByKey ()` agrupa tots els elements d'un RDD que comparteixin la mateixa clau en una única llista dins d'una de les particions.

Aquesta operació planteja dos problemes:
  + Aquesta operació necessita moure tots els valors dins de la partició adequada. Això satura la xarxa.
  + Les llistes generades poden arribar a ser molt grans arribant fins i tot a saturar la memòria d'algun dels trabajadadores
  
Utilitza `groupByKey()` per generar un pair RDD del tipus `('word', Iterator)`.

In [16]:
# TODO: Replace <FILL IN> with appropriate code
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
    print('{0}: {1}'.format(key, list(value)))

elephant: [1]
rat: [1, 1]
cat: [1, 1]


In [17]:
# TEST groupByKey() approach (2a)
assert sorted(wordsGrouped.mapValues(lambda x: list(x)).collect()) == [('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])], 'incorrect value for wordsGrouped'

### (2b) Utilitza `groupByKey()` per obtenir els recomptes

Usant la transformació `groupByKey()` crea un RDD que contingui 2 elements, on cada un d'ells sigui un parell paraula (clau) Iterador de Python (valor).

Després suma tots els valors de iterator usant una transformació `map()`. El resultat ha de ser un pair RDD que contingui les parelles (word, count).

In [18]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsGrouped = wordsGrouped.map(lambda x: (x[0], sum(x[1])))
print(wordCountsGrouped.collect())

[('elephant', 1), ('rat', 2), ('cat', 2)]


In [19]:
# TEST Use groupByKey() to obtain the counts (2b)
assert sorted(wordCountsGrouped.collect())==[('cat', 2), ('elephant', 1), ('rat', 2)],'incorrect value for wordCountsGrouped'

### (2c) Conteig usant `reduceByKey`

Una millor solució és començar des d'un pair RDD i aleshores utilitzar la transformació [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) per crear un nou pair RDD. La transformació `reduceByKey()` agrupa totes les parelles que comparteixen la mateixa clau. Posteriorment s'aplica la funció que se li passa per paràmetre agrupant els valors de dos en dos. Aquest procés es repeteix iterativament fins que obtenim un únic valor afegit per a cadascuna de les claus de l'pair RDD. `ReduceByKey()` opera aplicant la funció primer dins de cadascuna de les particions de forma independent, i posteriorment únicament comparteix els valors agregats entre particions diferents, permetent escalar de forma eficient ja que no té necessitat de desplaçar per la xarxa una gran quantitat de dades.

In [20]:
# TODO: Replace <FILL IN> with appropriate code
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda x, y: x + y)
print (wordCounts.collect())

[('elephant', 1), ('rat', 2), ('cat', 2)]


In [21]:
# TEST Counting using reduceByKey (2c)
assert sorted(wordCounts.collect())==[('cat', 2), ('elephant', 1), ('rat', 2)],'incorrect value for wordCounts'

### (2d) Ara tot junt

La versió més complexa del codi executa primer un `map()` sobre el pair RDD, la transformació `reduceByKey()`, i finalment l'acció `collect()` en una única línia de codi.

In [22]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsCollected = wordPairs.reduceByKey(lambda x, y: x + y).collect()
print(wordCountsCollected)

[('elephant', 1), ('rat', 2), ('cat', 2)]


In [23]:
# TEST All together (2d)
assert sorted(wordCountsCollected)==[('cat', 2), ('elephant', 1), ('rat', 2)],'incorrect value for wordCountsCollected'

## Part 3: Trobar les paraules individuals i la seva freqüència d'aparició mitjana

### (3a) Paraules úniques

Calcular el nombre de paraules úniques en `wordsRDD`. Pots utitlziar altres RDDs que hagis creat en aquesta practica si et resulta més senzill.

In [24]:
# TODO: Replace <FILL IN> with appropriate code
uniqueWords = wordsRDD.distinct().count()

In [25]:
# TEST Unique words (3a)
assert uniqueWords== 3, 'incorrect count of uniqueWords'

### (3b) Calular la mitjana usant `reduce()`

Troba la freqüència mitjana de aparició de paraules en `wordCounts`.

Utilitza l'acció `reduce()` per sumar els recomptes en `wordCounts` i llavors divideix pel nombre de paraules úniques. Per realitzar això primer s'aplica un `map()` a pair RDD `wordCounts`, que està format per tuples amb el format (key, value), per convertir-lo en un RDD de valors.

In [26]:
# TODO: Replace <FILL IN> with appropriate code
from operator import add
totalCount = wordCounts.map(lambda x: x[1]).reduce(add)
average = totalCount / uniqueWords
print(totalCount)
print(round(average, 2))

5
1.67


In [27]:
# TEST Mean using reduce (3b)
assert round(average, 2)==1.67, 'incorrect value of average'

## Part 4: Aplicar les funcionalitats desenvolupades a un arxiu de text

Per això hem de construir una funció `wordCount`, capaç de treballar amb dades del món real que solen presenten problemes com l'ús de majúscules o minúscules, puntuació, accents, etc. Posteriorment, carregar les dades de la nostra font de dades i finalment, calular el recompte de paraules sobre les dades processades.

### (4a) funcio `wordCount`

Primer, defineix una funció per al recompte de paraules. Hauries de reutilitzar les tècniques que has vist en els apartats anteriors d'aquesta practica. Aquesta funció, ha de prendre un RDD que contingui una llista de paraules, i tornar un pair RDD que contingui totes les paraules amb els seus corresponents recomptes.

In [28]:
# TODO: Replace <FILL IN> with appropriate code
def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return wordListRDD.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
print(wordCount(wordsRDD).collect())

[('elephant', 1), ('rat', 2), ('cat', 2)]


In [29]:
# TEST wordCount function (4a)
assert sorted(wordCount(wordsRDD).collect())==[('cat', 2), ('elephant', 1), ('rat', 2)],'incorrect definition for wordCount function'

### (4b) majúscules i puntuació

Els fitxers del món real són molt més complexos que els que hem estat usant en aquesta PAC. Alguns dels problemes que són necessaris de solucionar són:
  + Les paraules han de comptar-se independentment tan si estan en mayuscula o minúscula (per exemple, Spark i spark haurien explicar-se com la mateixa paraula).
  + Tots els signes de puntuació han d'eliminar-se.
  + Qualsevol espai al principi o al final de la paraula ha de eliminar-se.
  
Defineix la funció `removePunctuation` que converteixi tot el text a minúscules, elimini els signes de puntuació, i elimini els espais al principi i final de cada paraula. Utilitza el mòdul de Python [re](https://docs.python.org/2/library/re.html) per eliminar qualsevol caràcter que no sigui una lletra, un nombre o un espai.

Sinó aquestes familiaritzat amb les expressions regulars hauries revisar [aquest tutorial](https://developers.google.com/edu/python/regular-expressions). Alternativament, [aquest web](https://regex101.com/#python) és de gran ajuda per a debugar les teves expressions regulars.

** Hints **

1. Fes servir la funcio [re.sub()](https://docs.python.org/2.7/library/re.html#re.sub).
2. Per als nostres propòsits, "puntuació" significa "no alphabetico, numèric, o espai." La expressio regular que defineix aquests caràcters és: `[^A-Za-z\s\d]`
3. No fer servir `\W`, ja que retindrà els guions baixos.

In [30]:
# TODO: Replace <FILL IN> with appropriate code
import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only whitespace, letters, and numbers should be retained.  Other characters should should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    
    return re.sub(r'[^A-Za-z\s\d]', '', text).lower().strip()
    
print(removePunctuation('Hi, you!'))
print(removePunctuation(' No under_score!'))
print(removePunctuation(' *      Remove punctuation then spaces  * '))
print(removePunctuation(" The Elephant's 4 cats. "))

hi you
no underscore
remove punctuation then spaces
the elephants 4 cats


In [31]:
# TEST Capitalization and punctuation (4b)
assert removePunctuation(" The Elephant's 4 cats. ") == 'the elephants 4 cats', 'incorrect definition for removePunctuation function'

### (4c) Carregar un fitxer de text

Per a la següent part, farem servir el text ja esmentat Lorem Ipsum generat per a la pràctica. Para convertir un fitxer de text en un RDD, farem servir el mètode `SparkContext.textFile()`. També farem servir la funció que acabem de crear `removePunctuation()` dins d'una transformació `map()` per eliminar tots els caràcters no alphabeticos, numèrics or espais. Atès que el fitxer és bastant grandre, farem servir `take(15)`, de manera que tan sols imprimirem per pantalla les 15 primeres línies.

In [32]:
# Tan solo ejecuta este codigo
import os.path

fileName = os.path.join(r"C:\Users\patri\Downloads", 'Lorem_Ipsum.txt')
loremRDD = sc.textFile(fileName, 8).map(removePunctuation).filter(lambda x: len(x)>0)
loremRDD.take(10)

['lorem ipsum dolor sit amet consectetuer adipiscing elit sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat ut wisi enim ad minim veniam quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi',
 'expetenda tincidunt in sed ex partem placerat sea porro commodo ex eam his putant aeterno interesset at usu ea mundi tincidunt omnium virtute aliquando ius ex ea aperiri sententiae duo usu nullam dolorum quaestio ei sit vidit facilisis ea per ne impedit iracundia neglegentur consetetur neglegentur eum ut vis animal legimus inimicus id',
 'his audiam deserunt in eum ubique voluptatibus te in reque dicta usu ne rebum dissentiet eam vim omnis deseruisse id

### (4d) Extreure les paraules de les línies

Abans de poder utilitzar la funció `wordcount()`, hem de solucionar dos problemes amb el format del RDD:
  + El primer problema és que necessitem dividir cada línia pels seus espais. **Això ho solucionarem en l'apartat (4d).**
  + El segon problema és que necessitem filtar les línies completament buides. **Això ho solucionarem en l'apartat (4e).**

Per aplicar una transformació que divideixi cada element del RDD pels seus espais, hem d'aplicar la funció incorporada en els strings de Python [split()](https://docs.python.org/2/library/string.html#string .split). Compte que a primera vista pot semblar que la funció necessària és una transformació `map()`, però si penses una mica mes sobre el resultat de la funció `split()` t'adonaràs que aquesta no és l'opció correcta.

> Nota:
> * No facis servir la implementació estàndard de l' `split()`, passa-li un valor de separació. Per exemple, per dividir `line` per comes, usa `line.split(',')`.

In [33]:
# TODO: Replace <FILL IN> with appropriate code
loremWordsRDD = loremRDD.flatMap(lambda line: line.split())
loremWordsCount = loremWordsRDD.count()

In [34]:
# TEST Words from lines (4d)
# This test allows for leading spaces to be removed either before or after
# punctuation is removed.
assert loremWordsCount == 2342, 'incorrect value for loremWordsCount'
assert loremWordsRDD.top(5)==[u'zzril', u'zzril', u'zzril', u'zzril', u'zzril'], 'incorrect value for loremWordsRDD'

### (4e) Calcula paraules diferents

El següent pas és comptar quantes paraules diferents conté el nostre text. Podeu fer servir transformacions `map()` i `reduceByKey()` ja utilitzades anteriorment.

In [35]:
# TODO: Replace <FILL IN> with appropriate code
distintWordsMapRDD = loremWordsRDD.map(lambda word: (word, 1))

distintWordsRDD=distintWordsMapRDD.reduceByKey(lambda x, y: x + y)

In [36]:
# TEST Remove empty elements (4e)
assert distintWordsRDD.count()== 364, 'incorrect value for shakeWordCount'

### (4f) Compte les paraules

Ara que tenim un RDD que conté només paraules. El següent pas és aplicar la funció `wordCount()` per a produir una llista amb els recomptes de paraules. Podem veure les 15 més comuns usant l'acció `takeOrdered()`; però, com els elements del RRD són parells, necessitem una funció especial que ordeni els parells de la forma correcta.

Utilitza les funcions `wordCount()` i `takeOrdered()` per obtenir les 15 paraules més comuns juntament amb els seus recomptes.

In [37]:
# TODO: Replace <FILL IN> with appropriate code
top15WordsAndCounts = sorted(
    [(elemento[0], elemento[1]) for elemento in wordCount(loremWordsRDD)
     .map(lambda x: (x[0], x[1]))
     .takeOrdered(364, key=lambda x: (-x[1], x[0]))],
    key=lambda x: x[1], reverse=True)[:15]
top15WordsAndCounts

[('et', 49),
 ('id', 47),
 ('ea', 40),
 ('ei', 40),
 ('in', 40),
 ('ad', 39),
 ('ut', 37),
 ('his', 35),
 ('te', 35),
 ('at', 34),
 ('ex', 30),
 ('ne', 30),
 ('per', 30),
 ('quo', 26),
 ('sea', 26)]

    No vaig aconseguir que passés perquè no s'ordenaven correctament i salta l'error, així que per això està en raw

## Part 5: Calcular alguns estadístics

Usant les mateixes tècniques que has aplicat en els exercicis anteriors respon a les següents preguntes:

### (5a) Quantes paraules diferents tenen exactament dos 'o'?

In [38]:
countWords_oo = distintWordsRDD.filter(lambda x: x.count('o') == 2).count()
print(countWords_oo)

0


### (5b) Quina és la paraula de nou lletres que més es repeteix? Quantes vegades apareix?

In [39]:
# words9Chars = distintWordsRDD.map(lambda x: (x, 1) if(len(x)==9) else None )
words9Chars = (
    loremWordsRDD
    .filter(lambda x: len(x) >= 9)
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .max(lambda x: x[1])
)

print(words9Chars)

('complectitur', 16)


### (5c) Quantes paraules diferents tenen més vocals que consonants?

In [40]:
def moreVowels(word):
    vowels = set("aeiou")
    vowel_count = sum(1 for char in word if char in vowels)
    consonant_count = len(word) - vowel_count
    return vowel_count > consonant_count
    
print(moreVowels('murcielago'))

# Amb el text Lorem Ipsum
wordsMV = loremWordsRDD.filter(moreVowels).distinct()
print(wordsMV.count())

False
61
