## CS431/631 Data Intensive Distributed Analytics
---

In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

To use Spark from within a Python program, you first need to tell the Python interpreter where the Spark installation is located. The code in the following cell does that, and then creates a `SparkContext` (`sc`) for you.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

spark_conf = SparkConf()\
  .setAppName("YourTest")\
  .setMaster("local[*]")

sc = SparkContext.getOrCreate(spark_conf)

Once Python knows where Spark is located, you can create a `SparkContext`. All Spark commands must run within an active `SparkContext`. The cell above creates a `SparkContext` and stores a reference to it in the variable `sc`.
The `appName` parameter assigns a name of your choosing to the Spark jobs that are created in this context, which is useful mostly for debugging. The `master` parameter, set here to `local[*]`, indicates that Spark jobs will run in local mode, using as many worker threads as there are logical cores on the machine. This means that your Spark jobs are not really running on a cluster, and are instead running in a single process on the local machine. You program Spark jobs the same way whether they run in local mode or on a cluster; the main difference between the two modes is, of course, performance.

Next, let's test that your `SparkContext` has been set up properly by running some simple test code. This code uses a single Spark job to estimate the value of Euler's number $e$. One way to calculate $e$ is to use the following series by Jacob Bernoulli:

$p_n = 1 - \frac{1}{1!} + \frac{1}{2!} - \frac{1}{3!} + \cdots + \frac{(-1)^n}{n!} = \sum_{k = 0}^n \frac{(-1)^k}{k!}$

As $n$ tends to infinity, $p_n$ approaches $1/e$.

In the following code, `parallelize()` creates an RDD from a local Python collection, `map()` is a Spark *transformation*, and `reduce()` is a Spark *action*. Study the code in the cell below, then go ahead and run it. It should take several seconds, since a Spark job is being created and executed, and should print an estimate of $e$ when it finishes.


In [3]:
import math

n= 10000
inverse_e = sc.parallelize(range(0, n)).map(lambda x: ((-1)**x) * (1 / math.factorial(x))).reduce(lambda x,y:x+y)
e = 1 / inverse_e
print("e = ", e)

e =  2.718281828459044


The program starts from the `SparkContext` (`sc`), which is the entry point to Spark functionality. The Python `range(0, n)` is then parallelized, meaning it is turned into an RDD: a distributed collection of objects whose data is partitioned across the nodes of a cluster (a single node in this local-mode case). Next, the RDD is transformed into another RDD by the `map` operation, which in this example converts each integer into the corresponding term of the series. Note that no transformation has actually been executed at this point. A Spark program records the order of execution as a DAG (Directed Acyclic Graph) in which each vertex represents an RDD and each incoming edge represents the operation that produced that RDD. In other words, each RDD keeps a pointer to its parent RDD together with the transformation that produced it. This is known as lineage.
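The laziness and lineage described above can be illustrated without Spark. The following is a toy model only: `ToyRDD` is a hypothetical class invented for this sketch, not part of the PySpark API, and it ignores partitioning and scheduling entirely. It shows that `map()` merely records a step, while the action `reduce()` walks the parent pointers and triggers the computation.

```python
from functools import reduce as fold


class ToyRDD:
    """Toy stand-in for an RDD (hypothetical, not PySpark)."""

    def __init__(self, source=None, parent=None, func=None):
        self.source = source  # only set on the root dataset
        self.parent = parent  # lineage pointer to the parent ToyRDD
        self.func = func      # transformation that derives this dataset from its parent

    def map(self, func):
        # Transformation: record the step in the lineage, compute nothing.
        return ToyRDD(parent=self, func=func)

    def _compute(self):
        # Follow the lineage back to the root, applying each recorded step lazily.
        if self.parent is None:
            return iter(self.source)
        return (self.func(x) for x in self.parent._compute())

    def reduce(self, func):
        # Action: only now is the recorded chain evaluated.
        return fold(func, self._compute())


calls = []


def square(x):
    calls.append(x)  # side effect that reveals when the function really runs
    return x * x


rdd = ToyRDD(source=range(4)).map(square)
calls_before_action = len(calls)        # map() has not run square() yet
total = rdd.reduce(lambda a, b: a + b)  # 0 + 1 + 4 + 9
print(calls_before_action, total, len(calls))
```

In this sketch `square` is not called until `reduce()` runs, which mirrors how the real `map()` in the cell above does no work until the `reduce()` action is invoked.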

Coming back to this program, the mapped RDD is then reduced to a single value by adding the terms together pairwise (within each partition first, then across partitions), which sums all the terms of the series. The `reduce()` action is what submits the DAG to Spark's DAG scheduler and triggers the computation of the RDDs it depends on. Finally, we take the reciprocal of the sum in plain Python to estimate the value of $e$. The accuracy of the estimate depends on the chosen value of $n$: more terms give a more accurate estimate, up to the limits of floating-point precision. Because the terms shrink factorially, a few dozen terms already reach that limit.
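To see how the estimate depends on $n$, the same series can be summed in plain Python for a few values of $n$. This is a minimal sketch that does not use Spark; `estimate_e` is a helper name introduced here for illustration.

```python
import math


def estimate_e(n):
    """Estimate e as the reciprocal of the first n terms of the series for 1/e."""
    partial_sum = sum((-1) ** k / math.factorial(k) for k in range(n))
    return 1 / partial_sum


# Absolute error of the estimate for a few term counts.
errors = {n: abs(estimate_e(n) - math.e) for n in (5, 10, 20, 100)}
for n, err in errors.items():
    print(f"n = {n:3d}   |estimate - e| = {err:.3e}")
```

The error drops quickly at first and then stops improving once the remaining terms fall below double precision, so very large values of $n$ mainly serve to give the Spark job some work to do.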

In [4]:
!wget -q https://student.cs.uwaterloo.ca/~cs451/content/cs431/Shakespeare.txt
!wget -q https://student.cs.uwaterloo.ca/~cs451/content/cs431/simple_tokenize.py

In [5]:
file = "Shakespeare.txt"

In [12]:
from simple_tokenize import simple_tokenize

# Returns the count of distinct tokens in the `Shakespeare.txt` dataset
def count_distinct_tokens(sc, file):
    # your solution to Question 2 here

    text_file = sc.textFile(file)

    # Works, but is inefficient: groupByKey shuffles every (token, 1) record across the network.
    # text_take = text_file.flatMap(lambda x: simple_tokenize(x)).map(lambda x: (x,1)).groupByKey().mapValues(sum).count()

    # Works and is more efficient: reduceByKey combines records within each partition before the shuffle.
    # The per-token sums are not needed here; only the number of distinct keys is counted.
    text_take = text_file.flatMap(lambda x: simple_tokenize(x)).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).count()
    return text_take

In [13]:
count_distinct_tokens(sc, file)

25975
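The comments in the Question 2 solution contrast `groupByKey` with `reduceByKey`. The difference can be sketched in plain Python by counting how many records would leave each partition. This is a simplified model under stated assumptions: the two partitions are made-up data, and `shuffle_without_combine`, `shuffle_with_combine` and `merge` are hypothetical helpers, not Spark functions.

```python
from collections import Counter


def shuffle_without_combine(partitions):
    # groupByKey-style: every (token, 1) record is sent as-is.
    return [(tok, 1) for part in partitions for tok in part]


def shuffle_with_combine(partitions):
    # reduceByKey-style: each partition pre-aggregates its own tokens,
    # so it sends one record per distinct token it holds.
    return [(tok, cnt) for part in partitions for tok, cnt in Counter(part).items()]


def merge(records):
    # Final aggregation after the (simulated) shuffle.
    totals = Counter()
    for tok, cnt in records:
        totals[tok] += cnt
    return totals


partitions = [
    ["to", "be", "or", "not", "to", "be"],
    ["to", "thine", "own", "self", "be", "true"],
]
raw = shuffle_without_combine(partitions)
combined = shuffle_with_combine(partitions)

# Both strategies give the same totals; only the shuffled volume differs.
print(len(raw), len(combined), len(merge(combined)))  # prints: 12 10 8
```

On real text the saving is usually much larger than in this tiny example, because frequent tokens repeat many times within a partition.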

In [15]:
from simple_tokenize import simple_tokenize

def make_pairs(x_list):
  "Returns all ordered pairs of distinct tokens from one line, excluding reflexive pairs."
  arr = set(x_list)
  pairs = []
  for first_token in arr:
    for second_token in arr:
      if first_token != second_token:
        pairs.append((first_token, second_token))
  return pairs

# Returns the count of distinct pairs in the `Shakespeare.txt` dataset
def count_distinct_pairs(sc, file):
    # your solution to Question 3 here
    # textFile lets Spark read and partition the file itself, instead of loading it on the driver first
    text_file = sc.textFile(file)
    # reduceByKey combines within each partition before the shuffle; only the number of distinct keys is needed
    take = text_file.map(lambda x: simple_tokenize(x)).flatMap(lambda x: make_pairs(x)).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).count()
    return take

In [16]:
count_distinct_pairs(sc, file)

1969760
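As a quick sanity check on the pair construction, a line with $k$ distinct tokens should yield $k(k-1)$ ordered pairs, and the result should match `itertools.permutations` applied to the set of tokens. The sketch below restates `make_pairs` so it is self-contained; the example line is made up.

```python
from itertools import permutations


def make_pairs(tokens):
    """Ordered pairs of distinct tokens from one line (same logic as the notebook's helper)."""
    distinct = set(tokens)
    return [(a, b) for a in distinct for b in distinct if a != b]


line_tokens = ["to", "be", "or", "not", "to", "be"]
pairs = make_pairs(line_tokens)

# 4 distinct tokens give 4 * 3 = 12 ordered pairs; duplicates within the line do not matter.
print(len(pairs))
assert set(pairs) == set(permutations(set(line_tokens), 2))
```

Both `(a, b)` and `(b, a)` are produced, which is why every co-occurrence shows up twice in the later PMI results.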

In [17]:
from simple_tokenize import simple_tokenize

def no_rep(x):
  "Tokens are counted once per line (lines containing the token, not occurrences), so duplicates within a line are removed."
  return list(set(x))

# Returns a list of the top 50 (token, count, probability) tuples, ordered by probability
def top_50_tokens_probabilities(sc, file):
    # your solution to Question 4 here
    text_file = sc.textFile(file)
    # cache() is lazy: the file is read and cached by the first action (count) and reused afterwards
    text_file.cache()
    lines_count = text_file.count()

    # (token, number of lines containing the token), sorted by descending count
    tokens_count = text_file.map(lambda x: simple_tokenize(x)).flatMap(lambda x: no_rep(x)).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0]))
    # probability = fraction of lines that contain the token
    tokens_count = tokens_count.map(lambda x: (x[0], x[1], x[1] / lines_count)).take(50)

    return tokens_count
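The probability used here is the fraction of lines that contain a token, not the token's share of all words. A small plain-Python sketch makes the distinction concrete. The three lines of text are made up, and `toy_tokenize` is a hypothetical stand-in for `simple_tokenize`, whose exact behavior is not shown in this notebook.

```python
from collections import Counter

lines = ["the cat saw the dog", "the dog ran", "a cat"]


def toy_tokenize(line):
    # Hypothetical stand-in for simple_tokenize: lowercase and split on whitespace.
    return line.lower().split()


# Count each token at most once per line, as no_rep() does.
line_counts = Counter()
for line in lines:
    line_counts.update(set(toy_tokenize(line)))

probabilities = {tok: cnt / len(lines) for tok, cnt in line_counts.items()}

# 'the' occurs three times in total but only in two of the three lines.
print(line_counts["the"], probabilities["the"])
```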

In [18]:
top_50_tokens_probabilities(sc, file)

[('and', 24604, 0.2009178657172255),
 ('the', 24300, 0.19843538192686472),
 ('i', 18657, 0.1523542765682928),
 ('to', 18237, 0.148924529226347),
 ('of', 16624, 0.1357526662202551),
 ('a', 13280, 0.10844534452628657),
 ('you', 12196, 0.09959332995802643),
 ('my', 11549, 0.09430988583840991),
 ('in', 10614, 0.08667461497003054),
 ('that', 10569, 0.08630714204053634),
 ('is', 8756, 0.07150206601447026),
 ('not', 8230, 0.06720671577193814),
 ('with', 7552, 0.06167012363422561),
 ('me', 7396, 0.06039621747864574),
 ('for', 7326, 0.05982459292165477),
 ('it', 7147, 0.05836286726877787),
 ('be', 6662, 0.054402325695340446),
 ('this', 6425, 0.05246696826667102),
 ('his', 6403, 0.05228731483447386),
 ('your', 6233, 0.050899083767495794),
 ('but', 6205, 0.05067043394469941),
 ('he', 5816, 0.0474938346208496),
 ('have', 5742, 0.046889545803459144),
 ('thou', 5060, 0.04132028940534714),
 ('as', 4917, 0.04015254209606559),
 ('him', 4840, 0.03952375508337552),
 ('so', 4836, 0.03949109082297604),
 ('

In [19]:
from simple_tokenize import simple_tokenize
from math import log10

# Returns a list of tuples with the following format:
# ((token1, token2), pmi, co-occurrence_count, token1_count, token2_count)
# If reduce is False, the uncollected RDD is returned instead of a list.
def PMI(sc, file, threshold, reduce=True):
    # your solution to Question 5 here
    text_file = sc.textFile(file)

    # cached so the file is not re-read when computing the token counts and the pair counts
    text_file.cache()
    lines_count = text_file.count()

    # single-token counts and probabilities: (token, (count, probability))
    tokens_count = text_file.map(lambda x: simple_tokenize(x)).flatMap(lambda x: no_rep(x)).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
    tokens_count = tokens_count.map(lambda x: (x[0], (x[1], x[1] / lines_count)))

    # co-occurring pairs at or above the threshold: ((token1, token2), cooc_count)
    pairs = text_file.map(lambda x: simple_tokenize(x)).flatMap(lambda x: make_pairs(x)).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).filter(lambda x: x[1] >= threshold)

    # key by token1 and join its counts:
    # (token1, ([token2, cooc_count, cooc_prob], (count1, prob1)))
    first_join_prep = pairs.map(lambda x: (x[0][0], [x[0][1], x[1], x[1] / lines_count]))
    first_joined = first_join_prep.join(tokens_count)

    # re-key by token2 and join its counts:
    # (token2, ([cooc_count, cooc_prob, token1, count1, prob1], (count2, prob2)))
    second_join_prep = first_joined.map(lambda x: (x[1][0][0], [x[1][0][1], x[1][0][2], x[0], x[1][1][0], x[1][1][1]]))
    second_joined = second_join_prep.join(tokens_count)

    # flatten to ((token1, token2), [cooc_count, cooc_prob, count1, prob1, count2, prob2])
    final = second_joined.map(lambda x: ((x[1][0][2], x[0]), [x[1][0][0], x[1][0][1], x[1][0][3], x[1][0][4], x[1][1][0], x[1][1][1]]))
    # PMI = log10(cooc_prob / (prob1 * prob2))
    final_pmi = final.map(lambda x: (x[0], log10(x[1][1] / (x[1][3] * x[1][5])), x[1][0], x[1][2], x[1][4]))

    if reduce:
        return final_pmi.collect()
    else:
        return final_pmi

In [20]:
PMI(sc, file, 6000)

[(('the', 'of'), 0.342940751918893, 7266, 24300, 16624),
 (('of', 'the'), 0.342940751918893, 7266, 16624, 24300)]
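The PMI value above can be checked by hand with the formula $\mathrm{PMI}(x, y) = \log_{10} \frac{p(x, y)}{p(x)\,p(y)}$, where each probability is a count divided by the number of lines. The sketch below uses only numbers printed in this notebook. The number of lines is not printed anywhere, so it is recovered here from the count and probability reported for `'and'`; that recovery step is an assumption of this sketch.

```python
from math import log10

# Counts copied from the outputs shown in this notebook.
count_the = 24300
count_of = 16624
cooc_the_of = 7266

# Assumption: recover the number of lines from the 'and' row of the top-50 output,
# where count = 24604 and probability = 0.2009178657172255.
num_lines = round(24604 / 0.2009178657172255)


def pmi(cooc, count_x, count_y, n):
    """log10( p(x, y) / (p(x) * p(y)) ), with probabilities measured per line."""
    return log10((cooc / n) / ((count_x / n) * (count_y / n)))


value = pmi(cooc_the_of, count_the, count_of, num_lines)
print(num_lines, value)
```

Because the formula is symmetric in its two tokens, `('the', 'of')` and `('of', 'the')` get the same PMI, as the output shows.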

In [21]:
from simple_tokenize import simple_tokenize

# Returns a list of samp_size tuples with the following format:
# (token, [ list_of_cooccurring_tokens ])
# where list_of_cooccurring_tokens is of the form
# [((token1, token2), pmi, cooc_count, token1_count, token2_count), ...]
def PMI_one_token(sc, file, threshold, samp_size):
    # your solution to Question 6 here

    # calculating PMIs for distinct tokens based on threshold
    pmi = PMI(sc, file, threshold, reduce=False)

    # grouping the PMIs by a single token (the first token)
    pmi_one_token = pmi.map(lambda x: (x[0][0], x)).groupByKey().map(lambda x: (x[0], list(x[1])))
    
    # sampling
    return pmi_one_token.takeSample(withReplacement=False, num=samp_size, seed=1)
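The group-then-sample step can be pictured in plain Python. This sketch groups a handful of PMI records (copied from outputs in this notebook) by their first token and then draws a sample without replacement. It uses Python's `random` module, so the sampled tokens are not expected to match what Spark's `takeSample` returns for the same seed.

```python
import random
from collections import defaultdict

# A few PMI records copied from the outputs shown in this notebook.
pmi_records = [
    (("the", "of"), 0.342940751918893, 7266, 24300, 16624),
    (("of", "the"), 0.342940751918893, 7266, 16624, 24300),
    (("of", "a"), 0.13551796879761382, 2463, 16624, 13280),
    (("a", "of"), 0.13551796879761382, 2463, 13280, 16624),
]

# Group records by the first token of the pair, like map(...).groupByKey().
grouped = defaultdict(list)
for record in pmi_records:
    first_token = record[0][0]
    grouped[first_token].append(record)

# Sample two (token, records) entries without replacement, using a fixed seed.
rng = random.Random(1)
sample = rng.sample(sorted(grouped.items()), k=2)
print([token for token, _ in sample])
```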

In [22]:
PMI_one_token(sc, file, threshold=2000, samp_size=10)

[('my',
  [(('my', 'and'), 0.0056896916303759305, 2351, 11549, 24604),
   (('my', 'i'), 0.16722948986833625, 2586, 11549, 18657)]),
 ('will', [(('will', 'i'), 0.45697891972441507, 2108, 4831, 18657)]),
 ('of',
  [(('of', 'and'), 0.028305447826683598, 3565, 16624, 24604),
   (('of', 'the'), 0.342940751918893, 7266, 16624, 24300),
   (('of', 'i'), -0.08531809933104956, 2081, 16624, 18657),
   (('of', 'a'), 0.13551796879761382, 2463, 16624, 13280)]),
 ('a',
  [(('a', 'of'), 0.13551796879761382, 2463, 13280, 16624),
   (('a', 'and'), 0.0006198226234107662, 2672, 13280, 24604),
   (('a', 'to'), 0.048227962752620035, 2210, 13280, 18237),
   (('a', 'i'), 0.02456394293286546, 2141, 13280, 18657)]),
 ('in',
  [(('in', 'and'), 0.04031821796867754, 2340, 10614, 24604),
   (('in', 'the'), 0.13332315333353123, 2863, 10614, 24300)]),
 ('that',
  [(('that', 'the'), 0.069458604407197, 2461, 10569, 24300),
   (('that', 'i'), 0.11221751207929728, 2085, 10569, 18657)]),
 ('to',
  [(('to', 'and'), 0.01752