In [None]:
import pyspark
from pyspark.sql import SparkSession
app_name = "accidentes"
master = "local[*]"
spark = (SparkSession.builder
    .master(master)
    .config("spark.driver.cores", 1)
    .appName(app_name)
    .getOrCreate() )
sc = spark.sparkContext
print ('SparkContext created')

In [None]:
import urllib.request
def getLinesFromUrl(url:str):
    """
    Downloads content from a url and
    creates a text based RDD
    """
    response = urllib.request.urlopen(url)
    data = response.read().decode('utf-8')
    lines = data.split('\n')
    # creates a RDD for the book
    return sc.parallelize(lines)


In [None]:
huckleberry_lines = getLinesFromUrl ('https://www.gutenberg.org/files/76/76-0.txt')
hamlet_lines = getLinesFromUrl ('https://www.gutenberg.org/files/2265/2265.txt')

In [None]:
import re

def getRDDWords(book):
    def clean_line(line):
        """
        Remove \ufeff\r characters
        Remove \t \n \r
        Remove additional characters
        """
        return line.replace('\ufeff\r', '').\
            replace('\t', ' ').replace('\n', '').replace('\r', '').\
            replace('(', '').replace(')', '').replace("'", '').\
            replace('"', '').replace(',', ''). replace('.', '').\
            replace('*', '')

    def normalize_tokenize(line):
        """
        Normalize: lowercase
        tokenize: split in tokens
        """
        return re.sub('\s+', ' ', line).strip().lower().split(' ')

    return book.map(lambda x: clean_line(x)).\
        filter(lambda x: x != '').flatMap(normalize_tokenize) #.\
        #filter(lambda x: len(x)>3).\
        #map (lambda x: (x, 1)).\
        #reduceByKey (lambda x, y: x + y)


In [None]:
huckleberry_words = getRDDWords (huckleberry_lines)
huckleberry_counts = huckleberry_words.map (lambda x: (x, 1)).\
                                       reduceByKey (lambda x, y: x + y)
print ('Tokens: {} / First {}'.format (huckleberry_counts.count(), huckleberry_counts.take(5)))

In [None]:
hamlet_words = getRDDWords (hamlet_lines)
hamlet_counts = hamlet_words.map (lambda x: (x, 1)).\
                                       reduceByKey (lambda x, y: x + y)
print ('Tokens: {} / First {}'.format (hamlet_counts.count(), hamlet_counts.take(5)))

# Calcular la media de la longitud de palabra
---
<br>
puede hacer uso de la funcion [RDD].reduce(f)
<br>
<br>
<br>
<br>
<br>
<br>

In [None]:
#Calculate average length word. Reduce function
import operator

huckleberry_words.map (lambda x: len(x)).reduce(operator.add)/ huckleberry_words.count()

In [None]:
# ¿Como se haría con el RDD hucdkleberry_counts()?
huckleberry_counts.take(4)

In [None]:
huckleberry_counts.map (lambda x: len(x[0])*x[1]).sum()/huckleberry_counts.values().sum()

In [None]:


huckleberry_words.map (lambda x: len(x)).sum()/ huckleberry_words.count()


In [None]:
huck_word_len_mean = huckleberry_words.map (lambda x: len(x)).mean()

In [None]:
'''
El método aggregate tiene un valor base sobre el que se hacen las agregaciones

Después se le define una función de transformación de los elementos del RDD en tuplas
en las que el primer elemento es la long. acumulada y el segundo el contador de palabras.
A la función lambda le llega como primer parámetro el acumulador, y como segundo los 
elementos del RDD 'huckleberry_words'

Por último definimos otra función que hace un merge de todas las tuplas
'''#El método aggregate tiene un valor base: (0,0)
#
sumChars,numWords = huckleberry_words.aggregate ( (0,0),
                           lambda ac, v: (ac[0]+len(v), ac[1]+1),
                           lambda item1, item2: (item1[0]+item2[0], item1[1]+item2[1]))
print ('La media por palabra es {}'.format (sumChars/numWords))

# Desviación estándar
---
$\Large\text{DE} = \sqrt{\dfrac{\sum\limits_{}^{}{{\lvert x-\mu\rvert^2}}}{N}}$

### donde: 
* $\text{x}$ son los elementos de la población, cada una de las longitudes
* $\mu$ es la media de la población, en este caso la longitud media de las palabras
* $\text{N}$ es el número de elementos de la población, la cantidad de palabras
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>

In [None]:
import math
math.sqrt(huckleberry_words.map (lambda x: (len(x)-huck_word_len_mean)**2).sum()/huckleberry_words.count())

In [None]:
huckleberry_words.map(lambda x:len(x)).stdev()

# 5 palabras más frecuentes
---
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>

In [None]:
huckleberry_counts.top(5, key=lambda k: k[1])

In [None]:
huckleberry_counts.takeOrdered (5,key=lambda k:-k[1])

In [None]:
#Elimina el contexto spark
#Debe ser la última sentencia
sc.stop()