In this project, the goal is to use Spark to search through a text file and
find all pairs of consecutive words in which the first word is “narodnaya”.
Then, for each pair, count the number of occurrences in the Wikipedia dump.
Finally, print all the pairs with their counts in lexicographical order.

Initialize the Spark context

In [None]:
from pyspark import SparkConf, SparkContext
# "local" runs Spark on this machine with a single worker thread
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local"))

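Define the parsing function, create an RDD from the text file, and map the parsing function over it.
Each line holds an article ID and the article text separated by a tab; parse_article returns the
article's words as a list. The second argument of textFile() is minPartitions, so 16 is the minimum
number of partitions requested.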

In [None]:
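import re

def parse_article(line):
    """Split an 'article_id<TAB>text' line into a list of words."""
    try:
        # Python 2 notebook: unicode() ensures the line is a unicode string
        article_id, text = unicode(line.rstrip()).split('\t', 1)
        # strip non-word characters from the start and end of the text
        text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
        # split on whitespace, dropping punctuation that touches it
        words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
        return words
    except ValueError:
        # lines without a tab cannot be unpacked; treat them as empty articles
        return []

# read the articles into at least 16 partitions and parse each line
wiki = sc.textFile("/data/wiki/en_articles_part/articles-part", 16).map(parse_article)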
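To see what the two regular expressions do, here is a minimal Python 3 sketch of the same parsing logic that runs without Spark. It assumes str in place of Python 2's unicode, and the sample line is made up. Because str patterns in Python 3 are Unicode-aware by default, the flags argument is dropped.

```python
import re


def parse_article(line):
    """Python 3 sketch of parse_article (str instead of Python 2's unicode)."""
    try:
        # split off the article id at the first tab; no tab -> ValueError
        article_id, text = line.rstrip().split('\t', 1)
    except ValueError:
        return []
    # remove non-word characters at the very start and end of the text
    text = re.sub(r"^\W+|\W+$", "", text)
    # split on runs of whitespace together with any punctuation around them
    return re.split(r"\W*\s+\W*", text)


# made-up sample line in the 'id<TAB>text' layout
sample = "12\tThe Narodnaya Volya (People's Will) was founded."
words = parse_article(sample)
print(words)
print(parse_article("no tab here"))
```

Note that punctuation inside a word, such as the apostrophe in "People's", survives, because the split only happens where there is whitespace.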

Filter the RDD so that it keeps only the word lists that contain 'Narodnaya' or 'narodnaya'
as a whole word (the `in` operator on a list compares complete elements, not substrings).

In [None]:
filteredWiki = wiki.filter(lambda x: ('Narodnaya' in x) or ('narodnaya' in x))
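The filter relies on list membership, which tests for an element equal to the value. The sketch below, with a hypothetical helper contains_word that is not part of the original notebook, contrasts that with a case-insensitive whole-word test.

```python
def contains_word(words, target="narodnaya"):
    """Hypothetical helper: True if any whole word equals target, ignoring case."""
    return any(w.lower() == target for w in words)


words = ["The", "NARODNAYA", "Narodnaya-Volya"]

# `in` on a list looks for an equal element, never for a substring
print("Narodnaya" in words)   # False
print("narodnaya" in words)   # False
print(contains_word(words))   # True: "NARODNAYA".lower() == "narodnaya"
```

In the notebook it could be passed directly as `wiki.filter(contains_word)`. That would also keep articles that spell the word in capitals, which the two-spelling check drops, so treat it as an alternative rather than an equivalent.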

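Define a function that walks through each word list and, for every word equal to 'narodnaya'
(ignoring case), joins it to the word that follows with '_' and lowercases the result.
A match on the last word of an article is skipped, since no word follows it.

In [None]:
def indexNarod(item):
    final_list = []
    # stop one short of the end so item[i + 1] always exists
    for i in range(len(item) - 1):
        # match the whole word, case-insensitively, as the task requires
        if item[i].lower() == 'narodnaya':
            result = item[i] + "_" + item[i + 1]
            final_list.append(result.lower())
    return final_list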
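The same pair extraction can be written without index arithmetic by zipping the word list with itself shifted by one. This is a sketch of that alternative using a hypothetical function name, narodnaya_pairs; it matches the first word exactly (after lowercasing), as the task statement asks.

```python
def narodnaya_pairs(words, first="narodnaya"):
    """Hypothetical zip-based variant of the pair extraction."""
    lowered = [w.lower() for w in words]
    # zip(lowered, lowered[1:]) yields every pair of consecutive words and
    # stops before the last word, so no index can run past the end
    return [a + "_" + b for a, b in zip(lowered, lowered[1:]) if a == first]


sample = ['The', 'Narodnaya', 'Volya', 'and', 'narodnaya', 'Gazeta', 'narodnaya']
print(narodnaya_pairs(sample))
```

The trailing 'narodnaya' produces no pair because nothing follows it.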

Apply the function to the filteredWiki RDD with flatMap(). With map(), each element of the
resulting RDD would be a whole list of pairs (one list per article); flatMap() flattens those
lists so that each element of the resulting RDD is a single pair string.

In [None]:
resultWiki = filteredWiki.flatMap(indexNarod)
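The difference between map() and flatMap() can be reproduced with plain Python lists. In this sketch, index_pairs is a hypothetical stand-in for indexNarod, and the articles are made-up, already-lowercased word lists; itertools.chain.from_iterable plays the role of the flattening step.

```python
from itertools import chain


def index_pairs(words):
    """Hypothetical stand-in for indexNarod: all 'narodnaya_<next>' pairs in one article."""
    return [words[i] + "_" + words[i + 1]
            for i in range(len(words) - 1) if words[i] == "narodnaya"]


articles = [["narodnaya", "volya"],
            ["the", "narodnaya", "gazeta", "and", "narodnaya", "volya"]]

# like rdd.map(index_pairs): exactly one output element (a list) per article
mapped = [index_pairs(a) for a in articles]
# like rdd.flatMap(index_pairs): each returned list is spliced into one sequence
flat = list(chain.from_iterable(index_pairs(a) for a in articles))

print(mapped)
print(flat)
```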

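Count the occurrences of each pair: map every pair to (pair, 1), sum the values per key with
reduceByKey(), and collect the results to the driver.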

In [None]:
aggregatedRddWiki = resultWiki.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y).collect()
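As a rough mental model of the map/reduceByKey step, the sketch below groups (key, value) tuples by key and folds each group with the given function. The name reduce_by_key is hypothetical, and this is only an illustration of the semantics, not how Spark implements it.

```python
from functools import reduce


def reduce_by_key(records, func):
    """Pure-Python sketch of reduceByKey semantics on (key, value) tuples."""
    grouped = {}
    for key, value in records:
        # collect every value that shares a key
        grouped.setdefault(key, []).append(value)
    # fold each key's values into one; Spark combines values in no fixed
    # order, so func should be associative and commutative (addition is)
    return {key: reduce(func, values) for key, values in grouped.items()}


pairs = ['narodnaya_volya', 'narodnaya_gazeta', 'narodnaya_volya']
counts = reduce_by_key([(p, 1) for p in pairs], lambda x, y: x + y)
print(counts)
```

Unlike Spark, this returns a dict rather than an RDD of tuples and does no map-side combining; collections.Counter(pairs) would give the same counts for this particular task.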

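Print the results. reduceByKey() does not return the keys in any particular order, so the collected
(pair, count) tuples are sorted before printing; Python compares tuples element by element, which
orders them lexicographically by pair.

In [None]:
for pair, count in sorted(aggregatedRddWiki):
    print(pair + '\t' + str(count))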
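Because reduceByKey() makes no promise about key order, the collected list must be sorted before printing for the output to be lexicographical. This small sketch, with a hypothetical helper format_counts and made-up counts, shows that sorting (pair, count) tuples orders them by the pair string.

```python
def format_counts(collected):
    """Hypothetical helper: sort (pair, count) tuples and render 'pair<TAB>count' lines."""
    # tuples compare element by element, so this orders by the pair string;
    # keys are unique after reduceByKey, so the count never decides the order
    return ["%s\t%d" % (pair, count) for pair, count in sorted(collected)]


collected = [('narodnaya_volya', 2), ('narodnaya_gazeta', 1)]
for line in format_counts(collected):
    print(line)
```

Sorting on the driver is fine here because only a handful of pairs are collected; for a large result, sorting the RDD with sortByKey() before collect() would keep the work distributed.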