In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import re
import numpy as np
from pyspark.sql.functions import monotonically_increasing_id

## 5. Calcular el peso tf-idf para cada palabra, bigrama y trigrama

### Build corpus

In [2]:
def build_corpus(path, sc, ngram, sample_size=None):
    original_rdd = sc.textFile(path)
    if sample_size is None: 
        return original_rdd.map(build_document)
    sample_list = original_rdd.takeSample(False, sample_size)
    corpus_rdd = sc.parallelize(sample_list)
    if ngram == 'n':
        corpus_rdd = corpus_rdd.map(build_document)
    elif ngram == 'bi':
        corpus_rdd = corpus_rdd.map(build_document).map(filter_bigram)
    elif ngram == 'tri':
        corpus_rdd = corpus_rdd.map(build_document).map(filter_trigram)
    else:
        raise Exception(f"Incorrect option: \'{ngram}\'. Valid options are \'n\' for whole words, \'bi\' for bigrams and \'tri\' for trigram.")
    return (sample_list, corpus_rdd)

def build_document(row):
    document = row.strip().lower().replace("\"", ' ').replace(r"\n", ' ').replace("<br />", ' ').replace("-", '').replace("/", ' ').replace("<br>", ' ').replace("..", ' ').replace("?", " ").split('\" , \"')[0]
    document = re.sub('[^A-Za-z0-9 ]+', '', document).strip().split(' ')
    found_empty_spaces_in = lambda word: False if word == '' or word == ' ' else True
    document = list(word for word in document if found_empty_spaces_in(word))
    #Ignoring Yahoo! Answer's 'category' field
    document = document[1:]
    return document

def filter_bigram(document):
    return list(x[:2] for x in document if len(x) >= 2)

def filter_trigram(document):
    document = list(gram for gram in document if len(gram) >= 3)
    return list(x[:3] for x in document)

### Inverse Document Frequency (IDF)

$$\text{idf}(t,D) = \log\frac{N}{|\{d\in D: t\in d\}|}$$

- término $t$
- corpus $D$
- documento $d$
- $N$: tamaño del corpus
- $|\{d\in D: t\in d\}|$: número de documentos en los cuales aparece $t$

In [3]:
def build_idf_dict(rdd, corpus_size):
    idf_per_word_rdd = rdd.map(lambda x: [(word, 1) for word in x]) \
    .flatMap(lambda x: x) \
        .reduceByKey(lambda a, b: a + b) \
            .mapValues(lambda word_count : np.log(corpus_size / word_count))
    idf_dict = dict()
    for word, idf in idf_per_word_rdd.collect():
        idf_dict[word] = idf
    return (idf_dict, idf_per_word_rdd)

### Term Frequency (TF)

$$\text{tf}(t,d) = \frac{f_{t,d}}{|d|}$$

- término $t$
- documento $d$
- $f_{t,d}$: frecuencia de $t$ en $d$, i.e., el número de veces que aparecer $t$ en $d$
- $|d|$: cantidad de términos contenidos en $d$

In [4]:
def count_words(document):
    counted_words = dict()
    elements = [(word, 1) for word in document]
    for word, count in elements:
        if word in counted_words:
            t = counted_words[word]
            counted_words[word] = (t[0], t[1] + 1)
        else:
            counted_words[word] = (word, count)
    return list(counted_words.values())

def build_tf(rdd, idf_dict):
    word_freq_rdd = rdd.map(count_words)
    tf_rdd = word_freq_rdd.map(lambda document: [(word, np.round(word_freq/len(document),5)) for word, word_freq in document])
    return (tf_rdd, word_freq_rdd)

### Auxiliary functions

In [5]:
def verbose_corpus(rdd, original_rdd_list, sample_size):
    print("-------------------------------------------")
    print("|                                         |")
    print("|                Corpus                   |")
    print("|                                         |")
    print("-------------------------------------------")    
    for idx, document in enumerate(rdd.take(sample_size)):
        print(f"Index: {idx} \t Length: {len(document)}")
        print("-----------------------------------------")
        if original_rdd_list is None:
            print(f"Cleaned doc: \n{document}\n\n")
            continue
        print(f"Original doc: \n{original_rdd_list[idx]}\n\nCleaned doc: \n{document}\n\n")

def verbose_idf(rdd, idf_dict, sample_size):
    print("\n\n-------------------------------------------")
    print("|                                         |")
    print("|       Inverse Document Frequency        |")
    print("|                                         |")
    print("-------------------------------------------")
    rdd = rdd.sortBy(lambda x: x[1], ascending=False)
    print("Término ------------ IDF\n")
    for el in rdd.take(sample_size//3):
        print(el, end="\n\n")

def verbose_tf(rdd, word_freq_rdd, original_rdd, sample_size):
    print("\n\n-------------------------------------------")
    print("|                                         |")
    print("|              Term Frequency             |")
    print("|                                         |")
    print("-------------------------------------------")
    wf_rdd = word_freq_rdd.take(num_documents)
    for idx, element in enumerate(rdd.take(sample_size)):
        print(f"Index: {idx} \tWords: {len(element)}\n--------------------------------")
        print(f"Original:\n\n{original_rdd[idx]}\n")
        print(f"Word frequency: \n\n{wf_rdd[idx]}\n")
        print(f"Term Frequency:\n")
        print(element)
        print("\n\n")

def verbose_tfidf(rdd, sample_size):
    print("\n\n-------------------------------------------")
    print("|                                         |")
    print("|                   TFIDF                 |")
    print("|        (ngram, (doc_id, tfidf, ...))    |")
    print("|                                         |")
    print("-------------------------------------------")
    for document in rdd:
        print(document, end='\n')

### Term Frequency Inverse Document Frequency

$$\text{tfidf}(t, d, D) = \text{tf}(t, d) \cdot \text{idf}(t, D)$$

In [6]:
idf_dict = dict()
def tfidf_weight(path, sc, ngram, sample_size=None, verbose=False):
    """
    Obtains the term frequency inverse document frequency (tfidf) for each term inside a dataset

    Parameters:
    -----------
    path: Spark's Resilient Distributed Dataset (RDD) path

    sc: Spark's SparkContext object

    ngram: str
        String describing if the tfidf value should be obtained in a complete word, bigram or trigram. 
        Accepted strings: 'n'    <- whole word
                          'bi'   <- bigram
                          'tri'  <- trigram

    sample_size: double
        Represents either the sample percentaje (number between 0 and 1 inclusive) or a fixed number
        of elements for the analysis to take. Default is 'None' and represents the whole RDD.

    Returns:
    --------
    tfidf_rdd: Spark's Resilient Distributed Dataset (RDD)
        RDD with the following format: 
            [   
                ...,
                (ti, [..., (dj, tfidf_ti_dj),...]),
                ...
            ]
        where ti is an ith term, dj is a jth document and tfidf_ti is the tfidf value for term i and document j
    """
    #0: Build Corpus
    sample_size = sample_size if type(sample_size) is int else int(sample_size*rdd.count())
    (original_rdd, rdd) = build_corpus(path, sc, ngram, sample_size)
    if verbose: verbose_corpus(rdd, original_rdd, 3)
    #1: Inverse Document Frequency IDF
    global idf_dict
    (idf_dict, idf_per_word_rdd) = build_idf_dict(rdd, sample_size)
    if verbose: verbose_idf(idf_per_word_rdd, idf_dict, sample_size)
    #2: Term Frequency TF
    (rdd, word_freq_rdd) = build_tf(rdd, idf_dict)
    if verbose: verbose_tf(rdd, word_freq_rdd, original_rdd, 3)
    #3: TFIDF
    tfidf_rdd = rdd.map(build_tfidf) \
    .map(add_document_id) \
        .map(lambda x: [(word, (x[0], tfidf)) for word, tfidf in x[1]]) \
            .flatMap(lambda x: x) \
                .reduceByKey(lambda a, b: a + b) \
                    .sortBy(lambda x: len(x[1])) \
                        .collect()
    if verbose: verbose_tfidf(tfidf_rdd, sample_size)
    return tfidf_rdd

def build_tfidf(document):
    # x[0] <- term
    # x[1] <- tf for term t
    # idf_dict[x[0]] <- idf for term t
    global idf_dict
    tfidf = lambda x: (x[0], np.round(x[1]*idf_dict[x[0]], 5))
    return list(map(tfidf, document))

doc_id = 0
def add_document_id(document):
    global doc_id
    doc_id += 1
    return (doc_id, document)

## Palabras

In [7]:
conf = SparkConf().setAppName('yahoo_answers_project').setMaster('local')
sc = SparkContext(conf=conf)
dataset_path = './yahoo_answers_csv/train.csv'
num_documents = 100

In [8]:
words_rdd = tfidf_weight(dataset_path, sc, ngram='n', sample_size=100, verbose=True)

-------------------------------------------
|                                         |
|                Corpus                   |
|                                         |
-------------------------------------------
Index: 0 	 Length: 103
-----------------------------------------
Original doc: 
"1","Gay Men and Butts?","So, I am just wondering, why do gay men like dudes butts do much? Do they actually look at butts? or do they just need somewhere to fuck??  Do gay men find a mans butt more attractive then straight men find a womens butt attractive?","Saying gay men like butts is like saying all men like womens butts...\n\nEveryone of them has diffrent things they find most atractive in another man.\n\nTrust me.. I know enugh gay people to know they don't have a big thing over butts... most don't... most are more obessed with whats around front... lol"

Cleaned doc: 
['gay', 'men', 'and', 'butts', 'so', 'i', 'am', 'just', 'wondering', 'why', 'do', 'gay', 'men', 'like', 'dudes', 'but

## Bigrama

In [9]:
bigram_rdd = tfidf_weight(dataset_path, sc, ngram='bi', sample_size=100, verbose=True)

-------------------------------------------
|                                         |
|                Corpus                   |
|                                         |
-------------------------------------------
Index: 0 	 Length: 16
-----------------------------------------
Original doc: 
"4","What is a Skill people don't have but could benefit from?","","Understanding women / reading their minds lol!!!"

Cleaned doc: 
['wh', 'is', 'sk', 'pe', 'do', 'ha', 'bu', 'co', 'be', 'fr', 'un', 'wo', 're', 'th', 'mi', 'lo']


Index: 1 	 Length: 38
-----------------------------------------
Original doc: 
"4","My son told me this--"" Your cerebral cortex has disapated to a minute capasity.""  What the heck is he saying?","","I think he's questioning your intelligence... Or at least how you use your brain.\nHe does it with style!\n\nGoodluck!"

Cleaned doc: 
['my', 'so', 'to', 'me', 'th', 'yo', 'ce', 'co', 'ha', 'di', 'to', 'mi', 'ca', 'wh', 'th', 'he', 'is', 'he', 'sa', 'th', 'he', 'qu', 

## Trigrama

In [10]:
trigram_rdd = tfidf_weight(dataset_path, sc, ngram='tri', sample_size=100, verbose=True)

-------------------------------------------
|                                         |
|                Corpus                   |
|                                         |
-------------------------------------------
Index: 0 	 Length: 34
-----------------------------------------
Original doc: 
"2","how far is the earth from the sun?","","Earth's average distance from the sun is a little bit under 93 million miles. The closets the Earth gets to the sun is about 90 million miles and the farthest is about 96 million miles."

Cleaned doc: 
['how', 'far', 'the', 'ear', 'fro', 'the', 'sun', 'ear', 'ave', 'dis', 'fro', 'the', 'sun', 'lit', 'bit', 'und', 'mil', 'mil', 'the', 'clo', 'the', 'ear', 'get', 'the', 'sun', 'abo', 'mil', 'mil', 'and', 'the', 'far', 'abo', 'mil', 'mil']


Index: 1 	 Length: 114
-----------------------------------------
Original doc: 
"4","Live in Louisville, ky with girlfriend and 5mon baby. How should we file, girlfriend going back to school?","My girlfriend is tr

## Análisis

Tanto para palabras, bigramas y trigramas, el patrón es el mismo. Los primeros elementos tienen un peso _tfidf_ que varía entre $0.1$ y $0.45$. Lo que nos indica esto, es que estos gramas, dentro de los documentos, son aquellos que aportan mayor valor dentro de los documentos. Recordemos que el _term frequency inverse document frequency_ es una medida estadística que relaciona la frecuencia de cada palabra en cada documento y en todo el corpus, por lo que, para un determinado corpus, nos indica la relevancia de cada palabra. 

Dicho lo anterior, podemos esperar que los gramas más comunes dentro del inglés, como artículos, sean aquellos que tengan un valor muy bajo de _tfidf_. Y podemos verificar que, en efecto, esto se cumple. Para el artículo 'the', el grama más común en el inglés, aparece en la mayoría de los documentos y tiene valores de _tfidf_ que varían entre $-0.12$ y $-0.03$. Valores muy pequeños en comparación con los gramas más comunes. 

Interesante destacar que para los trigramas, 'the' sigue siendo el más común pero los valores cambian radicalmente. Ahora se encuentra en un rango entre $-0.76$ y $-0.18$ lo que nos indica que su valor y relevancia dentro del corpus de trigramas es mucho menor. De igual manera, para los bigramas, 'th' es el más común con valores en un rango entre $-0.8$ y $-0.3$

## 6. Calcular el histograma del número de ocurrencias por documento de cada una de las 10 palabras con mayor ocurrencia total

### Auxiliary Functions

In [11]:
def verbose_common(rdd, mcw):
    print("\n\n-------------------------------------------")
    print("|                                         |")
    print(f"|         Top {mcw} most common words        |")
    print("|                                         |")
    print("-------------------------------------------")
    for el in rdd: print(el)

def verbose_doc_id(rdd, sample_size):
    print("-------------------------------------------")
    print("|                                         |")
    print("|              Addind doc_id              |")
    print("|                                         |")
    print("-------------------------------------------")
    for el in rdd.take(sample_size): print(el)

def verbose_broadcast(rdd, sample_size):
    print("\n\n-------------------------------------------")
    print("|                                         |")
    print("|           Broadcasting doc_id           |")
    print("|                                         |")
    print("-------------------------------------------")
    for el in rdd.take(sample_size): print(el)

def verbose_filter(rdd, sample_size):
    print("\n\n-------------------------------------------")
    print("|                                         |")
    print("|         Filtering by mcw words          |")
    print("|             (doc_id, word)              |")
    print("|                                         |")
    print("-------------------------------------------")
    for el in rdd.take(sample_size): print(el)


def verbose_grouping(rdd, sample_size):
    print("\n\n-------------------------------------------")
    print("|                                         |")
    print("|          Grouping by mcw words          |")
    print("|        (word, list of documents)        |")
    print("|                                         |")
    print("-------------------------------------------")
    for el in rdd.take(sample_size):
        word = el[0]
        documents = el[1]
        print(f"Key word: \'{word}\'\nDocuments where \'{word}\' appears:")
        print(f"{documents}\n\n")

def verbose_counting(rdd, sample_size):
    print("\n\n-------------------------------------------")
    print("|                                         |")
    print("|      Counting document appearances      |")
    print("|    (word, [(doc_id, frequency), ...])   |")
    print("|                                         |")
    print("-------------------------------------------")
    for el in rdd.take(sample_size):
        word = el[0]
        documents = el[1]
        print(f"Key word: \'{word}\'\nDocuments where \'{word}\' appears with its frequency:")
        print(f"{documents}\n\n")

### Most common words MCW

In [12]:
def get_most_common_words(mcw, path, sc, sample_size, verbose=False):
    """
    Given a Spark's Resilient Distributed Dataset, this function finds the top 10 most common words. 

    Parameters:
    -----------
    mcw: int
        Represents the number of most common words wanted

    path: Spark's Resilient Distributed Dataset (RDD) path

    sc: Spark's SparkContext object

    Returns:
    --------
    top10_common_words: list
    """
    #0: Build Corpus
    (original_rdd_list, rdd) = build_corpus(path, sc, 'n', sample_size)
    if verbose: verbose_corpus(rdd, original_rdd_list, sample_size=3)
    #1: Top 10 words
    mcw_words = rdd.map(lambda x: [(word, 1) for word in x]) \
        .flatMap(lambda x: x) \
            .reduceByKey(lambda a, b: a + b) \
                .sortBy(lambda x: x[1], ascending=False) \
                    .take(mcw)
    if verbose: verbose_common(mcw_words, mcw)
    return (mcw_words, rdd)

In [13]:
(mcw_words, rdd) = get_most_common_words(mcw=10, path=dataset_path, sc=sc, sample_size=100, verbose=True)

-------------------------------------------
|                                         |
|                Corpus                   |
|                                         |
-------------------------------------------
Index: 0 	 Length: 228
-----------------------------------------
Original doc: 
"1","Holy Prepuce?  Where is it?   Or, does Jesus have it back?","Holy Prepuce\nFrom Wikipedia, the free encyclopedia\n\n11th-century Icon of Christ Pantocrator.The Holy Prepuce, or Holy Foreskin (Latin præputium) is one of several relics purported to be associated with Jesus. At various points in history, a number of churches in Europe have claimed to possess it, sometimes at the same time. Various miraculous powers have been ascribed to it.\n\nWhen Jesus was retored on Sunday and His body completely Resurrrected, was his foreskin restored along with the dead brain cells and the open wounds?\n\nIf you went to Heaven and met Jesus tomorrow, would he have his foreskin?\n\n\nWhat should I teac

### Word Frequency per Document

In [28]:
def word_frequency_in_document(rdd, words, verbose=False):
    """
    For each word in words list, this function will return a list of documents
    containing that word and the word frequency for each document. 

    Parameters:
    -----------
    rdd: Spark's Resilient Distributed Dataset RDD

    words: list of str

    Return: 
    -------
    rdd: Spark's Resilient Distributed Dataset RDD
        Format:
            key <- word
            value <- list of tuples. Each tuple contains the document id and the
                word frequency
    """
    #0: Add document id
    rdd = rdd.map(add_document_id)
    if verbose: verbose_doc_id(rdd, sample_size=2)
    #1: Broadcasting document id to words in document
    rdd = rdd.map(lambda x: [(x[0], word) for word in x[1]])
    if verbose: verbose_broadcast(rdd, sample_size=1)
    #2: Filtering per most common words list
    rdd = rdd.flatMap(lambda x: x) \
                .filter(lambda x: x[1] in words)
    if verbose: verbose_filter(rdd, sample_size=20)
    #3: Grouping by key
    rdd = rdd.map(lambda x: (x[1], x[0])) \
                        .groupByKey() \
                            .mapValues(list)
    if verbose: verbose_grouping(rdd, sample_size=len(words))
    #4: Counting repeated documents per word
    rdd = rdd.map(lambda x: [((x[0], doc_id), 1) for doc_id in x[1]]) \
        .flatMap(lambda x: x) \
            .reduceByKey(lambda a, b: a + b) \
                .map(lambda x: (x[0][0], (x[0][1], x[1]))) \
                    .groupByKey() \
                        .mapValues(list) \
                            .sortBy(lambda x: x[0])
    if verbose: verbose_counting(rdd, sample_size=len(words))
    return rdd

doc_id = 0
def add_document_id(document):
    global doc_id
    doc_id += 1
    return (doc_id, document)

In [31]:
words = [word for word, _ in mcw_words]
wf_rdd = word_frequency_in_document(rdd, words, verbose=True)

-------------------------------------------
|                                         |
|              Addind doc_id              |
|                                         |
-------------------------------------------
(1, ['holy', 'prepuce', 'where', 'is', 'it', 'or', 'does', 'jesus', 'have', 'it', 'back', 'holy', 'prepuce', 'from', 'wikipedia', 'the', 'free', 'encyclopedia', '11thcentury', 'icon', 'of', 'christ', 'pantocratorthe', 'holy', 'prepuce', 'or', 'holy', 'foreskin', 'latin', 'prputium', 'is', 'one', 'of', 'several', 'relics', 'purported', 'to', 'be', 'associated', 'with', 'jesus', 'at', 'various', 'points', 'in', 'history', 'a', 'number', 'of', 'churches', 'in', 'europe', 'have', 'claimed', 'to', 'possess', 'it', 'sometimes', 'at', 'the', 'same', 'time', 'various', 'miraculous', 'powers', 'have', 'been', 'ascribed', 'to', 'it', 'when', 'jesus', 'was', 'retored', 'on', 'sunday', 'and', 'his', 'body', 'completely', 'resurrrected', 'was', 'his', 'foreskin', 'restored', 'along'

### Histograms

In [32]:
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd

In [37]:
def plot_histograms(rdd, words):
    """
    Plots histograms per key word inside the rdd

    Parameters: 
    -------
    rdd: Spark's Resilient Distributed Dataset RDD
        Format:
            key <- word
            value <- list of tuples. Each tuple contains the document id and the
                word frequency
    
    words: list of str
    """
    word_dict = dict()
    for word, freq in words:
        word_dict[word] = freq
    for el in rdd.collect():
        key_word = el[0]
        doc_ids = []
        freqs_per_doc = []
        for i in range(len(el[1])):
            t = el[1][i]
            doc_ids.append(t[0])
            freqs_per_doc.append(t[1])
        df = pd.DataFrame()
        df['document'] = doc_ids
        df['count'] = freqs_per_doc
        fig = px.bar(df, x='document', y='count', title=f'Word: \'{key_word}\' - {word_dict[key_word]} appearances')
        fig.show()


In [38]:
plot_histograms(wf_rdd, mcw_words)