<h1><center>Big Data Algorithms Techniques & Platforms</center></h1>

<h2>
<hr style=" border:none; height:3px;">
<center>Assignment 2: Introduction to Spark</center>
<hr style=" border:none; height:3px;">
</h2>

# 1. Introduction


<p align="justify">
<font size="3">
In this set of exercises you'll learn basic Spark programming skills that are necessary to develop simple, yet powerful, applications to be executed in a distributed environment.
</font>
</p>

<p align="justify">
<font size="3">
The assignment is presented in this __Jupyter Notebook__, an interface that offers support for text, code, images and other media. Essentially, a Jupyter Notebook consists of multiple _cells_, either containing some text, like the one that you are reading, or code that you can execute. 
</font>
</p>



In [116]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local")
sc = SparkContext(conf = conf)
print("initialization successful!")
import random as rn
import numpy as np
import os
seed_value = 0
os.environ['PYTHONHASHSEED']=str(seed_value)


initialization successful!


# B. Data import


<p align="justify">
<font size="3">
Upload the folder data.zip inside the colab data folder and then execute the following code.
</font>
</p>



# C. Support functions 


<p align="justify">
<font size="3">
Some support functions are provided. Read carefully the signatures of the fuctions.
<ul>
<li> $remove\_non\_letters(word)$
<li> $load\_stopwords(stopwords\_file)$
<li> $preprocess(text, stopwords)$
<li> $word\_count(words)$
</ul>
</font>
</p>



In [117]:
import re
# Regular expression for removing all non-letter characters in the file.
regex = re.compile('[^a-zA-Z ]')


'''
Removes any non-letter character from the given word.

INPUT:
        word: A word

OUTPUT:
        the input word without the non-letter characters.

'''
def remove_non_letters(word):
    return regex.sub('', word)


'''
INPUT: 
        stopwords_file: name of the file containing the stopwords.
OUTPUT:
        a Python list with the stopwords read from the file.
'''
def load_stopwords(stopwords_file):
    stopwords = []
    with open(stopwords_file) as file:
        for sw in file:
            stopwords.append(sw.strip())
    return stopwords


'''
INPUT: 
        text: RDD where each element is a line of the input text file.
        stopwords: Python list containing the stopwords.
OUTPUT: 
        RDD where each element is a word from the input text file.
'''
def preprocess(text, stopwords) :
  words = text.flatMap(lambda line: line.split(" ")).map(lambda word: remove_non_letters(word)).filter(lambda word: len(word) > 0).map(lambda word: word.lower()).filter(lambda word: word not in stopwords)
  return words

'''
Returns how many times a word appears in a RDD 
INPUT:
        words: RDD, where each element is word from the input text file (preprocessing already done!).
OUTPUT:
        RDD, where each element is (w, occ), w is a word and occ the number of occurrences of w.
        The RDD is sorted by value in decreasing order.
'''

def word_count(words):    
    occs = words.map(lambda word: (word, 1))\
                .reduceByKey(lambda x, y: x+y)\
                .sortBy(lambda f: f[1], ascending=False)
    return occs

# Storing in stopwords the list of the stopwords that is provided
stopwords = load_stopwords("./data/stopwords.txt")





<hr style="border:solid 2px;">

##  Exercise 1

<p align="justify">
<font size="3">
In the folder _./data/bbc_ you'll find a collection of 50 documents from the BBC news website corresponding to stories in five topics. The five topics are:
<ul>
<li> _business_ 
<li>_entertainment_
<li> _politics_
<li> _sport_ 
<li> _tech_
</ul>

In the directory, the stories are text files (named: $\_001.txt\_$, $\_002.txt\_$, ...) organized into five directories, one for topic.
</font>
</p>

<p align="justify">
<font size="3">
In this exercise, we want to create an **inverted index**. An inverted index is an essential component of a search engine. In fact, given any word, the inverted index allows the search engine to quickly retrieve all documents containing that word.

An inverted index associates each word (you can find in the files) to the list of the names files the word occurs in.

More precisely, for each word, the inverted index will have a list of the names  of the files (path relative to the folder _./data_) that contain the word. 


(family, \[./data/bbc/tech/006.txt, ./data/bbc/entertainment/003.txt, ./data/bbc/entertainment/005.txt, ...\]
</font>
</p>

<p align="justify">
<font size="3">
The function $inverted\_index$ has the following input and output:
<ul>
    <li> **Input.** A RDD $files$, where each element is $(f, content)$, $f$ being the name of a text file in the collection and $content$ being the content of that file; 
a Python list $stopwords$, containing the most common English stopwords.
    <li> **Output.** A RDD, where each element is $(w, L)$, $w$ is a word and $L$ is the list of the names of the files containing $w$. The list must not contain duplicate file names.
</ul>
</font>
</p>

<p align="justify">
<font size="3" color='#91053d'>**Write the code of the function $inverted\_index()$. The function must apply a sequence of RDD transformations to:**
<ol>
  <li> split the content of each file into its constituent words.
  <li> lowercase each word.
  <li> remove the non-letter characters from each word (you can use the function $remove\_non\_letters$ defined in Exercise 1).
  <li> remove empty words.
  <li> remove the stopwords.
  <li> remove duplicate words.
</ol>
</font>
</p>
<hr style="border:solid 2px;">

In [118]:
'''
INPUT:
        files: RDD, each element is (f, content), where f is the name of a file in the collection and content is 
                its content.
        stopwords: a Python list containing the stopwords.

OUTPUT:y

        a RDD, each element is (w, L), where w is a word and L is the list of the names of the files containing
        w (without repetition).

'''

def inverted_index(files, stopwords):
    '''############## WRITE YOUR CODE HERE ##############'''

    #Split the content of each file into its constituent words and lowercase each word.
    line_split = files.map(lambda x: (x[0],x[1].lower().split()))      
    #remove the non-letter characters from each word (you can use the function  𝑟𝑒𝑚𝑜𝑣𝑒_𝑛𝑜𝑛_𝑙𝑒𝑡𝑡𝑒𝑟𝑠  defined in Exercise 1).
    no_non_letters = line_split.map(lambda x: (x[0],[remove_non_letters(i) for i in x[1]])) 
    #remove empty words
    no_empty = no_non_letters.map(lambda x: (x[0],[i for i in x[1] if i != ""])) 
    #remove the stopwords
    no_stop_words = no_empty.map(lambda x: (x[0],[i for i in x[1] if i not in stopwords])) 
    #remove duplicate words.
    duplicates_removed = no_stop_words.map(lambda x: (x[0],list(set(x[1])))) 
    #produce the inverted index dictionary
    output = duplicates_removed.flatMap(lambda x: [(i,x[0]) for i in x[1]]).reduceByKey(lambda x,y:x+y)
    return output


    '''############## END OF THE EXERCISE ##############'''

'''
INPUT:
        iindex: RDD containing the inverted index, as returned by the function inverted_index.
        word: a word.

OUTPUT:
        prints the list of the files contain the given word.
'''
        
def lookup(iindex, word):
    ld = iindex.sortByKey().lookup(word)
    if len(ld) > 0:
        for i in str(ld).split("file"):
            print("".join(re.findall(r"assignment_2/(.*)",i)))
    else:
        print("No documents contain the word '",word,"'")

####################   GOOD TO KNOW  ####################
# The Spark function wholeTextFiles loads into a RDD the content of the text files contained
# in the given directory.
# Each item of the RDD is a pair (f, content), where f is the name of a file and content is the content
# of the file.
#######################################################
file_collection = sc.wholeTextFiles("./data/bbc/*")        
iindex = inverted_index(file_collection, stopwords)
lookup(iindex, "family")

################# EXPECTED OUTPUT #################
#
# data/bbc/entertainment/002.txt
# data/bbc/entertainment/003.txt
# data/bbc/entertainment/005.txt
# data/bbc/politics/001.txt
# data/bbc/sport/004.txt
# data/bbc/tech/004.txt
# data/bbc/tech/006.txt
#
###################################################


data/bbc/entertainment/003.txt
data/bbc/entertainment/002.txt
data/bbc/entertainment/005.txt
data/bbc/sport/004.txt
data/bbc/politics/001.txt
data/bbc/tech/004.txt
data/bbc/tech/006.txt']


<hr style="border:solid 2px;">

##  Exercise 2

<p align="justify">
<font size="3">
Given the BBC collection, we want to calculate the **co-occurrence matrix** $M$, such that $M[w_1][w_2]$ is the number of documents in which two words $w_1$ and $w_2$ appear in the same document (it does not matter if they are consecutive or not).
</font>
</p>

<p align="justify">
<font size="3">
The function $co\_occurrence\_matrix()$ has the following input and output:
<ul>
 <li> **Input.** A RDD $files$ and a Python list $stopwords$, as in the previous exercise.
 <li> **Output.** A RDD, where each element is $((w_1, w_2), occ)$, where $w_1$ and $w_2$ are words and $occ$ is the number of files in which the two words appear together.
</ul>
As in the case of the function $inverted\_index()$, words must be lowercases, non-letter characters, empty words and stopwords should be removed.
</font>
</p>

<p align="justify">
<font size="3" color='#91053d'>**Write the code of the function $co\_occurrence\_matrix()$. You can draw inspiration from the MapReduce algorithms that we discussed in class. Also, you can use the already implemented function $create\_pairs()$ to generate all the possible pairs from a list of words. The function assumes that the words in the input list are sorted lexicographically.**
<br>
</font>
</p>

<hr style="border:solid 2px;">

In [119]:
'''
INPUT:
        words: Python list containing words. IMPORTANT: the function assumes that the 
        list is sorted in lexicographic order.
OUTPUT:
        Python list containing all possible pairs from the given list.
'''
def create_pairs(words):
    n = len(words)
    output = []
    for i in range(0, n):
        for j in range(i+1, n):
            if ord(words[i][0]) < ord(words[j][0]):
                output.append((words[i], words[j]))
            else:
                output.append((words[j], words[i]))
    return output

'''
INPUT:
        files: RDD, each item is (f, content), where f is the name of a file and line is the content of the file.
        stopwords: A RDD, each item is ((w1, w2), occ), where w1 and w2 are words and occ is the number of
                    files in which w1 and w2 appear together.
'''
def co_occurrence_matrix(files, stopwords):
    '''############## WRITE YOUR CODE HERE ##############'''
    
    cleaned = files.map(lambda x: (x[0],x[1].lower().split(" ")))\
                .map(lambda x: (x[0],[remove_non_letters(i) for i in x[1]]))\
                .map(lambda x: (x[0],[i for i in x[1] if i != ""]))\
                .map(lambda x: (x[0],[i for i in x[1] if i not in stopwords]))\
                .map(lambda x: (x[0],list(set(x[1]))))
    output = cleaned.map(lambda x: (create_pairs(x[1])))\
                    .flatMap(lambda x: x)\
                    .map(lambda x: (x,1))\
                    .reduceByKey(lambda x,y:x+y)
    return output

    '''############## END OF THE EXERCISE ##############'''


file_collection = sc.wholeTextFiles("./data/bbc/*")
output = co_occurrence_matrix(file_collection, stopwords)    
output.takeOrdered(10, key = lambda x: -x[1])

################# EXAMPLE OF FORMAT FOR THE EXPECTED OUTPUT #################
################# THIS IS NOT THE SOLUTION #################
#
#[(('a', 'b'), 3),
# (('c', 'f'), 12),
# ... ]
#
###################################################


[(('also', 'said'), 24),
 (('new', 'said'), 19),
 (('said', 'world'), 18),
 (('said', 'year'), 17),
 (('also', 'world'), 16),
 (('last', 'said'), 15),
 (('one', 'said'), 15),
 (('said', 'set'), 15),
 (('said', 'years'), 14),
 (('said', 'time'), 14)]

<hr style=" border:solid 2px;">

##  Exercise 3 - OPTIONAL - enjoy with what you just wrote

<p align="justify">
<font size="3">
We want to code a function $term\_freq$ that computes the frequency of each word in a 
text document. 
More precisely, given a document $d$ and a word $w$ in that document, we want to 
compute its frequency $tf(w, d)$, as follows:
    
<p>    
$$ tf(w, d) = \frac{f_{w, d}}{\sum\limits_{w^\prime \in d} f_{w^\prime, d}}$$
</p>

where $f_{w, d}$ is the number of occurrences of word $w$ in $d$.
</font>
</p>

<p>
<font size="3">
The function $term\_freq$ has the following input and output:
<ul>
<li> **Input.** A RDD $words$, where each element is a word in a text document $d$ (pre-processing already done).
<li> **Output.** A RDD, where each element is a key-value pair $(w, tf(w, d))$.
</ul>
</font>
</p>
<p align="justify">
<font size="3" color='#91053d'>**Write the code of the function $term\_freq$. You can take advantage of the 
    function $word\_count$.**
</font>
</p>

<hr style=" border:solid 2px;">

In [120]:
def term_freq(words):
    '''############## WRITE YOUR CODE HERE ##############'''
    
    cleaned = text.map(lambda x: x.lower().split(" "))\
              .flatMap(lambda x: [remove_non_letters(i) for i in x])\
              .filter(lambda x: len(x) > 0)\
              .filter(lambda x: x not in stopwords)
    
    n = cleaned.count()
    
    raw_count = cleaned.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
    
    precentages = raw_count.map(lambda x: (x[0],round(x[1]/n,3)))

    return precentages

    '''############## END OF THE EXERCISE ##############'''
 
text = sc.textFile('./data/bbc/politics/001.txt')
words = preprocess(text, stopwords)
tf = term_freq(words)
tf.take(5)

################# EXPECTED OUTPUT #################
################# THIS IS NOT THE SOLUTION #################
#
#[('a', 0,333),
# ('b', 0.032),
# ... ]
#
###################################################

[('labour', 0.004),
 ('plans', 0.02),
 ('maternity', 0.037),
 ('pay', 0.045),
 ('rise', 0.008)]