In [10]:
# SOME IMPORTS
import os
import subprocess
import sys
import time
import multiprocessing
import random
import re

In [11]:
# SET SOME ENVIRONMENTAL VARIABLES
os.environ['PYSPARK_PYTHON']="python3.6"
os.environ['SPARK_LOCAL_HOSTNAME']="localhost"
os.environ['SPARK_HOME']="/home/i/Downloads/spark-2.2.1-bin-hadoop2.7"
os.environ['JAVA_HOME']="/usr/lib/jvm/java-1.8.0-openjdk-amd64/"

In [12]:
# CHECK IF FINDSPARK WORKS CORRECTLY
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

In [14]:
# START SPARK CONTEXT ON LOCAL MACHINE
sc = SparkContext("local", appName="Test")
##------------------------------------
# GO TO LOCALHOST:4040 and ....

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=InvertedIndex, master=local[1]) created by __init__ at <ipython-input-5-0f272c506c2e>:5 

In [15]:
# STOP SPARK CONTEXT
sc.stop()

In [16]:
# OBTAIN THE NUMBER OF LOGICAL CPUs
cpus = multiprocessing.cpu_count()
print("The number of logical CPUs is " + str(cpus))

The number of logical CPUs is 4


# Exercise 1: Compute the value of PI using Monte Carlo Simulation

This exercise is solved. Your task is to read and analyse the code.

In [17]:
# this method generates one sample point and verifies whether it is inside a circle or not.
# The input is passed via filter method, however, we do not need it here
def inside(inValue):
    x, y = random.random(), random.random()
    return x*x + y*y < 1.0

In [18]:
# This method estimates the value of PI
def computePI_MonteCarlo_v1(sc, samples, partitions):
    # Create Resilient Distributed Dataset (RDDs) containing SAMPLES elements.
    # This data is distributed (parallelized) among available nodes (here, CPUs - partitions).
    dff = sc.parallelize(range(0, samples), partitions)
    # Filter out these samples that are not inside a circle.
    # For this purpose, Inside method is run and returns
    # true/false (for each data element) with appropriate probability distribution
    # Why do we generate samples "on fly"?
    #
    # bo procesor generuje liczby pseudolosowo bazując na dacie - NIE
    # bo to bardziej pozwala zrównoleglić procesy
    #
    filtered = dff.filter(inside)
    # count the number of hits
    left = filtered.count()
    # Estimate the value of PI and return it
    return 4.0 * float(left) / float(samples)

In [19]:
### ESTIMATE VALUE OF PI 
samples = 10000000

print("Monte Carlo simulation for " + str(samples) + " samples")
print("True value of PI = 3.1415926535...")

## i = number of nodes (CPUs)
for i in range(1, cpus + 1):
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="PI_MonteCarlo")
    start_time = time.time()
    piValue = computePI_MonteCarlo_v1(sc, samples, i)
    elapsed = time.time() - start_time
    print("  Number of CPUs = %i | Time = %.4f s | Result(PI) = %.8f" % (i, elapsed, piValue))  
    sc.stop()

Monte Carlo simulation for 10000000 samples
True value of PI = 3.1415926535...
  Number of CPUs = 1 | Time = 5.2769 s | Result(PI) = 3.14155240
  Number of CPUs = 2 | Time = 2.6014 s | Result(PI) = 3.14170960
  Number of CPUs = 3 | Time = 2.7001 s | Result(PI) = 3.14127160
  Number of CPUs = 4 | Time = 2.7269 s | Result(PI) = 3.14212160


# Exercise 2: Wordcount

In [20]:
# Dummy collection 1: 3 short documents
# create RDD divided into n-paritions
def getSmallCollection_EX1(sc, partitions):
    doc1 = "Roses,are red "
    doc2 = "Roses are roses"
    doc3 = "The Sun is red."
    rdd1 = sc.parallelize([doc1, doc2, doc3], partitions)
    return rdd1

1) Dummy collection 2: ~200 documents about animals (ant.html, dog.html, panda.html, hedgehog.html, etc.). For this purpose, download www.cs.put.poznan.pl/mtomczyk/ir/lab6/pages.zip, unzip, and copy "pages" folder into your working directory.

In [21]:
def getLargeCollection_EX1(sc, partitions):
    DOCS = sc.wholeTextFiles("./pages/", partitions)
    rdd1 = DOCS.map(lambda x: x[1])
    return rdd1

In [23]:
# For a given text "x", this method performs simple tokenization and normalization (returns a list of terms)
def tokenizeAndNormalize(x):
    return [s.lower() for s in re.split(' |;|,|\t|\n|\.', x) if len(s) > 0]

2) Init spark context (1 core):

In [24]:
sc = SparkContext("local[1]", appName="Word_count")

3) TODO: Collect the data (getSmallCollection_EX1):

In [25]:
rdd1 = getSmallCollection_EX1(sc, 1)
# if you whish to print data stored in rdd, use print(rdd.collect())
print(rdd1.collect())

['Roses,are red ', 'Roses are roses', 'The Sun is red.']


4) TODO: Firslty, you should tokenize all documents. For this purpose use flatMap function (rdd2 = rdd1.flatMap) where you pass tokenizeAndNormalize method. There are two methods: map and flatMap. Both produce an output for each element of RDD object. The difference is that map keeps produced elements organised and flatMap puts them into a single list, e.g.: 

In [26]:
tempRDD = sc.parallelize([("a", 1), ("b", 2)])
print(tempRDD.map(lambda x: (x[0], x[1]+1)).collect())
print(tempRDD.flatMap(lambda x: (x[0], x[1]+1)).collect())

[('a', 2), ('b', 3)]
['a', 2, 'b', 3]


In [27]:
# Complete the task here (flatMap with tokenizeAndNormalize):
rdd2 = rdd1.flatMap(lambda x: tokenizeAndNormalize(x))
print(rdd2.collect())

['roses', 'are', 'red', 'roses', 'are', 'roses', 'the', 'sun', 'is', 'red']


5) TODO: Now for each term produce (term, 1). Use map (why not flatMap?) with lambda function:

In [28]:
rdd3 = rdd2.map(lambda x: ( x ,1))
print(rdd3.collect())

[('roses', 1), ('are', 1), ('red', 1), ('roses', 1), ('are', 1), ('roses', 1), ('the', 1), ('sun', 1), ('is', 1), ('red', 1)]


6) TODO: Now it is time to group the results. Use groupByKey method. When any "...byKey" method is invoked, the first element of a stored object is treated as a key. When invoking this method, you should also invoke .mapValues(list) so that all corresponding values will be stored in a single list. E.g.:

In [29]:
tempRDD = sc.parallelize([("a", 1), ("a", 1)])
print(tempRDD.groupByKey().mapValues(list).collect())

[('a', [1, 1])]


In [30]:
# Complete the task here:
rdd4 = rdd3.groupByKey().mapValues(list)
print(rdd4.collect())

[('roses', [1, 1, 1]), ('are', [1, 1]), ('red', [1, 1]), ('the', [1]), ('sun', [1]), ('is', [1])]


7) TODO: Now you could use countByKey method but it returns a dictionarty. Use map function again to sum the elements of a list:

In [31]:
rdd5 = rdd4.map(lambda x: (x[0], sum(x[1])))
print(rdd5.collect())

[('roses', 3), ('are', 2), ('red', 2), ('the', 1), ('sun', 1), ('is', 1)]


8) TODO: It is almost done but we wish the objects to be sorted (alphabetically). You can use sortByKey method:

In [32]:
rdd6 = rdd5.sortByKey()
print(rdd6.collect())

[('are', 2), ('is', 1), ('red', 2), ('roses', 3), ('sun', 1), ('the', 1)]


9) TODO: Done. Bout it could be done in another way. Instead of grouping by key (rdd4) and counting the number of "1"s (rdd5), you could use reduceByKey method. reduceByKey "merges" all object with the same key. Similar to groupByKey, however, instead of grouping, a new value is computed by provided function, e.g.:

In [33]:
tempRDD = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(tempRDD.reduceByKey(lambda x, y: x + y).collect())

[('a', 4), ('b', 2)]


In [34]:
# Complete the task here. Use rdd3 object to compute rdd7.
rdd7 = rdd3.reduceByKey(lambda x, y: x + y)
print(rdd7.collect())

[('roses', 3), ('are', 2), ('red', 2), ('the', 1), ('sun', 1), ('is', 1)]


10) TODO: Sort the results:

In [35]:
rdd8 = rdd7.sortByKey()
print(rdd8.collect())

[('are', 2), ('is', 1), ('red', 2), ('roses', 3), ('sun', 1), ('the', 1)]


In [36]:
sc.stop()

11) TODO: Complete the method doWordCount (just copy your code, use groupByKey + map(sum) version; should return last rdd object):

In [37]:
def doWordCount(sc, collection, partitions):
    rdd1 = collection
    rdd2 = rdd1.flatMap(lambda x: tokenizeAndNormalize(x))
    rdd3 = rdd2.map(lambda x: ( x ,1))
    rdd4 = rdd3.reduceByKey(lambda x, y: x + y)
    rdd5 = rdd4.sortByKey()
    return rdd5

12) TODO: Run the script and observe the results (why is the best time for 1CPU?):

In [38]:
## i = number of nodes (CPUs). 
for i in range(1, cpus + 1):
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="WordCount")
    start_time = time.time()
    rdd1 = getSmallCollection_EX1(sc, i)
    elapsed = time.time() - start_time
    print(elapsed)
    
    computedData = doWordCount(sc, rdd1, i)
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    sc.stop()

0.00348663330078125
Number of CPUs = 1 | Time = 0.0227 s 
0.0026862621307373047
Number of CPUs = 2 | Time = 0.7214 s 
0.0026967525482177734
Number of CPUs = 3 | Time = 0.6691 s 
0.0019447803497314453
Number of CPUs = 4 | Time = 0.7175 s 


13) TODO: Modyfy the above script (work on a copy, use the cell below) so that the top 3 most common words are printed. Use 1-2CPUs. computedData is an RDD object so you can use sortBy function to resort the elements. 

In [39]:
# do the task here
for i in [1,2]:
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="WordCount")
    start_time = time.time()
    rdd1 = getSmallCollection_EX1(sc, i)
    computedData = doWordCount(sc, rdd1, i)
    rddSort = computedData.sortBy(lambda x: -x[1])
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    ### PRINT HERE 
    sortedData = rddSort.collect()
    for i in range(0, 3): #print top 3
        print("   %i : '%s' occured %d times" % (i, sortedData[i][0], sortedData[i][1]))
    ###
    sc.stop()

Number of CPUs = 1 | Time = 0.0169 s 
   0 : 'roses' occured 3 times
   1 : 'are' occured 2 times
   2 : 'red' occured 2 times
Number of CPUs = 2 | Time = 0.8432 s 
   0 : 'roses' occured 3 times
   1 : 'are' occured 2 times
   2 : 'red' occured 2 times


14) TODO: Repeat the experiment for 1-2CPUs and for 2nd collection (much larger). Compare computation times and print the top 20 most common words. Are the results (the most frequent words) similar to the list of english stop words? Why is the difference in time not as big as in "PI" example?

In [41]:
# do the task here
for i in [1,2]:
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="WordCount")
    start_time = time.time()
    rdd1 = getLargeCollection_EX1(sc, i)
    computedData = doWordCount(sc, rdd1, i)
    rddSort = computedData.sortBy(lambda x: -x[1])
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    ### PRINT HERE
    sortedData = rddSort.collect()
    for i in range(0, 20): #print top 20
        print("   %i : '%s' occured %d times" % (i, sortedData[i][0], sortedData[i][1]))
    ###
    sc.stop()

Number of CPUs = 1 | Time = 1.2391 s 
   0 : 'the' occured 3027 times
   1 : 'and' occured 1910 times
   2 : 'of' occured 1553 times
   3 : 'in' occured 1165 times
   4 : 'are' occured 1031 times
   5 : 'to' occured 962 times
   6 : 'a' occured 769 times
   7 : 'is' occured 622 times
   8 : 'as' occured 560 times
   9 : 'species' occured 558 times
   10 : 'they' occured 370 times
   11 : 'for' occured 362 times
   12 : 'with' occured 352 times
   13 : 'have' occured 344 times
   14 : 'their' occured 326 times
   15 : 'or' occured 306 times
   16 : 'from' occured 269 times
   17 : 'by' occured 244 times
   18 : 'on' occured 230 times
   19 : 'which' occured 214 times
Number of CPUs = 2 | Time = 3.6531 s 
   0 : 'the' occured 3027 times
   1 : 'and' occured 1910 times
   2 : 'of' occured 1553 times
   3 : 'in' occured 1165 times
   4 : 'are' occured 1031 times
   5 : 'to' occured 962 times
   6 : 'a' occured 769 times
   7 : 'is' occured 622 times
   8 : 'as' occured 560 times
   9 : 'sp

# Exercise 3: Inverted Index + Word Count

In this exercise you are asked to construct inverted index in the following form: (term, the number of doccuments in which the term occurs , sorted list of docIDs]. For instance: [...,("roses", 2, [0, 1]),...] -> term "roses" occurs in two documents: termIDs = 0 and 1. The "get...Collection" methods are slightly modified. Both return: rdd object, list of the names of the documents, and a dictionary (docID -> document name):

In [55]:
def getSmallCollection_EX2(sc, partitions):
    doc1 = "Roses,are red "
    doc2 = "Roses are roses"
    doc3 = "The Sun in red."
    rdd1 = sc.parallelize([doc1, doc2, doc3], partitions)
    docNames = ["doc1", "doc2", "doc3"]
    docIDs = {0: docNames[0], 1: docNames[1], 2: docNames[2]}
    return rdd1, docNames, docIDs

In [56]:
def getLargeCollection_EX2(sc, partitions):
    DOCS = sc.wholeTextFiles("./pages/", partitions)
    rdd1 = DOCS.map(lambda x: x[1])
    rdd2 = DOCS.map(lambda x: x[0])
    docNames = rdd2.collect()
    docIDs = [i for i in range(0, len(docNames))]
    return rdd1, docNames, docIDs

In [57]:
def whichOccurs(rdd):
    x = rdd[0]
    index = rdd[1]
    return [(s.lower(), index) for s in re.split(' |;|,|\t|\n|\.', x) if len(s) > 0]

In [58]:
def tokenizeAndNormalize(x):
    return [s.lower() for s in re.split(' |;|,|\t|\n|\.', x) if len(s) > 0]

In [59]:
def tokenizeAndNormalize(x):
    return [s.lower() for s in re.split(' |;|,|\t|\n|\.', x) if len(s) > 0]

TODO: do the task and verify the results using the small collection.

In [60]:
def doInvertedIndex(sc, collection, partitions):
    rddy___ = rddx.map(lambda x: (x[0], x[1][0], x[1][1]))
    rddy = rddx.map(lambda x: (x[0], len(x[1][1]), x[1][1]))
    rdd6 = rddy.sortBy(lambda x: -x[1] )
    
    return rdd6

12) Run the following script and verify the results.

In [61]:
## i = number of nodes (CPUs). 
#Why the best time is for 1CPU???
for i in [1,2]:
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="InvertedIndex")
    start_time = time.time()
    rdd1, docNames, docIDs = getSmallCollection_EX2(sc, i)
    computedData = doInvertedIndex(sc, rdd1, i)
    rddSort = computedData.sortBy(lambda x: -x[1])
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    ### PRINT HERE 
    sortedData = rddSort.collect()
    for i in range(0, 3): #print top 3
        print(f"   {i} : '{sortedData[i][0]}' occured in {sortedData[i][1]} documents: {sortedData[i][2]}")
    ###
    sc.stop()

NameError: name 'rddx' is not defined

In [52]:
sc.stop()

12) Run the following script and verify if it is faster for 2 cores. Lastly, compare the obtained results with the results of exercise 2 (word count). Are the rankings corellated?

In [62]:
## i = number of nodes (CPUs). 
#Why the best time is for 1CPU???
for i in [1,2]:
    master = "local["+str(i)+"]" 
    sc = SparkContext(master, appName="InvertedIndex")
    start_time = time.time()
    rdd1, docNames, docIDs = getLargeCollection_EX2(sc, i)
    computedData = doInvertedIndex(sc, rdd1, i)
    rddSort = computedData.sortBy(lambda x: -x[1])
    elapsed = time.time() - start_time
    print("Number of CPUs = %i | Time = %.4f s " % (i, elapsed))  
    ### PRINT HERE 
    sortedData = rddSort.collect()
    for i in range(0, 20): #print top 3
        print(f"   {i} : '{sortedData[i][0]}' occured in {sortedData[i][1]} documents: {sortedData[i][2][:3]}...")
    ###
    sc.stop()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=InvertedIndex, master=local[1]) created by __init__ at <ipython-input-61-0f272c506c2e>:5 