# Lecture 7: Spark Programming

In what follows, you can find pyspark code for the examples we saw in class.
Many of the examples follow examples found in [Learning Spark: Lightning-Fast Big Data Analysis, by Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia](http://www.amazon.com/Learning-Spark-Lightning-Fast-Data-Analysis-ebook/dp/B00SW0TY8O), which you can also find at Aalto's library.

Further information is also available in the Spark 1.6.0 programming guide: https://spark.apache.org/docs/1.6.0/programming-guide.html

## Setup

These instructions should work for Mac and Linux. We'll assume you'll be using python3.

To run the following on your computer, make sure that pyspark is in your PYTHONPATH variable.
You can do that by [downloading](https://spark.apache.org/downloads.html) a zipped file with Spark, extracting it into its own folder (e.g., `spark-1.6.0-bin-hadoop2.6/`) and then executing the following commands in bash.
```
export PYSPARK_PYTHON=python3
export SPARK_HOME=/path/to/spark-1.6.0-bin-hadoop2.6/
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
```

On Windows, the easiest way to use pyspark is to install Anaconda, and then install Jupyter and findspark.

In [1]:
#On windows
#import findspark
#findspark.init(spark_home="C:/Users/me/software/spark-1.6.3-bin-hadoop2.6/")

In [5]:
import pyspark
import numpy as np # we'll be using numpy for some numeric operations
sc = pyspark.SparkContext(master="local[*]", appName="tour")

In [4]:
sc.stop()

- **local**: Run Spark locally with one worker thread (i.e. no parallelism at all).
- **local[K]**: Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
- **local[*]**: Run Spark locally with as many worker threads as logical cores on your machine.
- **spark://HOST:PORT**: Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
- **mesos://HOST:PORT**: Connect to the given Mesos cluster. The port must be whichever one your Mesos master is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use `mesos://zk://....`. To submit with `--deploy-mode cluster`, the HOST:PORT should be configured to connect to the MesosClusterDispatcher.
- **yarn**: Connect to a YARN cluster in client or cluster mode depending on the value of `--deploy-mode`. The cluster location will be found based on the `HADOOP_CONF_DIR` or `YARN_CONF_DIR` variable.
- **yarn-client**: Equivalent to `yarn` with `--deploy-mode client`; the latter form is preferred over `yarn-client`.
- **yarn-cluster**: Equivalent to `yarn` with `--deploy-mode cluster`; the latter form is preferred over `yarn-cluster`.

In [90]:
# To try the SparkContext with other masters first stop the one that is already running
# sc.stop()

## Lambda functions

[Lambda expressions](https://docs.python.org/3.5/howto/functional.html#small-functions-and-the-lambda-expression) are an easy way to write short functions in Python.

In [7]:
f = lambda line: 'Spark' in line

In [10]:
f("we are learning park")

False

In [102]:
def f(line):
    return 'Spark' in line
f("we are learning Spark")

True
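Because a lambda is just an anonymous function object, it can be used anywhere a named function can. The plain-Python sketch below (no Spark involved) checks that a lambda and a `def` version of the same predicate behave identically, and shows a two-argument lambda of the kind that operations such as `reduce` expect, here passed to Python's own `functools.reduce`. The names `contains_spark_lambda`, `contains_spark_def` and `add` are illustrative.

```python
from functools import reduce

# Equivalent predicates: one written as a lambda, one with def
contains_spark_lambda = lambda line: 'Spark' in line

def contains_spark_def(line):
    return 'Spark' in line

phrases = ["we are learning Spark", "we are learning park"]
print([contains_spark_lambda(p) for p in phrases])  # [True, False]
print([contains_spark_def(p) for p in phrases])     # [True, False]

# A two-argument lambda, passed to Python's built-in functools.reduce
add = lambda x, y: x + y
print(reduce(add, [1, 2, 3, 4]))  # 10
```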

## Creating RDDS

We saw that we can create RDDs by loading files from disk. We can also create RDDs from Python collections or transforming other RDDs.

In [12]:
my_list = [0,1,2,3,4,5,6,7,8,9]

In [13]:
data = sc.parallelize(my_list) # create RDD from Python collection

In [15]:
data_squared = data.map(lambda num: num ** 2) # transformation

In [17]:
data_squared.collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

## RDD operations

There are two types of RDD operations in Spark: *transformations* and *actions*. Transformations create new RDDs from other RDDs. Actions extract information from RDDs and return it to the driver program.

In [105]:
data = sc.parallelize([0,1,2,3,4,5,6,7,8,9]) # creation of RDD
data_squared = data.map(lambda num: num ** 2) # transformation
data_squared.collect() # action

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
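To make the distinction concrete, here is a toy, plain-Python model. It is not PySpark's implementation, and `MiniRDD` and `parallelize` are hypothetical names used only here. Transformations return a new object that merely records how to compute its elements, while actions run that recipe and hand an ordinary Python value back to the caller, which plays the role of the driver.

```python
class MiniRDD:
    """Toy stand-in for an RDD: stores a recipe for its elements, not the data."""

    def __init__(self, compute):
        self._compute = compute  # zero-argument function that produces the elements

    # Transformations: return a NEW MiniRDD and compute nothing yet
    def map(self, f):
        return MiniRDD(lambda: [f(x) for x in self._compute()])

    def filter(self, pred):
        return MiniRDD(lambda: [x for x in self._compute() if pred(x)])

    # Actions: run the recipe and return a plain Python value
    def collect(self):
        return list(self._compute())

    def count(self):
        return len(self._compute())


def parallelize(items):
    items = list(items)
    return MiniRDD(lambda: items)


data = parallelize(range(10))
data_squared = data.map(lambda num: num ** 2)         # transformation -> MiniRDD
print(type(data_squared).__name__)                     # MiniRDD
print(data_squared.collect())                          # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(data_squared.filter(lambda n: n % 2 == 0).count())  # 5
```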

***

In [19]:
text = sc.textFile("myfile.txt", 1) # load data (the second argument is minPartitions)
text.count() # number of lines in the file

95

In [21]:
text = sc.textFile("myfile.txt") # load data

In [22]:
# keep only lines that mention "Spark"
spark_lines = text.filter(lambda line: 'Spark' in line)

In [23]:
spark_lines.collect()

['# Apache Spark',
 'Spark is a fast and general cluster computing system for Big Data. It provides',
 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 'and Spark Streaming for stream processing.',
 'You can find the latest Spark documentation, including a programming',
 '## Building Spark',
 'Spark is built using [Apache Maven](http://maven.apache.org/).',
 'To build Spark and its example programs, run:',
 '["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).',
 'The easiest way to start using Spark is through the Scala shell:',
 'Spark also comes with several sample programs in the `examples` directory.',
 '    ./bin/run-example SparkPi',
 '    MASTER=spark://host:7077 ./bin/run-example SparkPi',
 'Testing first requires [building Spark](#building-spark). Once Spark is built, tests',
 'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported',
 'Hadoop, you must build Spark against the same version that your c

## Lazy evaluation

RDDs are **evaluated lazily**. This means that Spark will not materialize an RDD until it has to perform an action. In the example below, `primesRDD` is not evaluated until the action `collect()` is performed on it.

In [24]:
def is_prime(num):
    """ return True if num is prime, False otherwise"""
    if num < 1 or num % 1 != 0:
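        raise Exception("invalid argument")
    # note: 1 is not prime, but this function returns True for it,
    # which is why 1 appears in the results below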
    for d in range(2, int(np.sqrt(num) + 1)):
        if num % d == 0:
            return False
    return True

In [28]:
numbersRDD = sc.parallelize(range(1, 1000000)) # creation of RDD
primesRDD = numbersRDD.filter(is_prime) # transformation

In [29]:
# primesRDD has not been materialized until this point
primes = primesRDD.collect() # action

In [36]:
type(primesRDD)

pyspark.rdd.PipelinedRDD

In [33]:
print(primes[0:15])
print(primesRDD.take(15))

[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43]
[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43]
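Python 3's built-in `filter` is lazy in a similar spirit, which makes for a small plain-Python analogy (an analogy only; Spark additionally splits the work into partitions and schedules it on the cluster). A counter records how often the predicate runs: nothing happens when the pipeline is defined, the whole pipeline runs when it is consumed (as with `collect()`), and asking for only the first few elements stops early, much as `take()` first scans one partition and only evaluates more partitions if it needs them. The names `is_even` and `calls` are illustrative.

```python
from itertools import islice

calls = [0]  # mutable counter the predicate can update

def is_even(n):
    calls[0] += 1  # record each time the predicate actually runs
    return n % 2 == 0

evens = filter(is_even, range(1, 11))  # like a transformation: builds an iterator only
calls_before = calls[0]
print(calls_before)  # 0

all_evens = list(evens)  # like collect(): consumes the whole pipeline
calls_after_collect = calls[0]
print(all_evens, calls_after_collect)  # [2, 4, 6, 8, 10] 10

calls[0] = 0
first_three = list(islice(filter(is_even, range(1, 11)), 3))  # like take(3)
calls_after_take = calls[0]
print(first_three, calls_after_take)  # [2, 4, 6] 6
```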


## Persistence

RDDs are **ephemeral** by default: Spark does not keep a materialized RDD around, and recomputes it from its lineage each time an action needs it. If we want an RDD to `persist` in memory, for example to query it repeatedly or use it in multiple operations, we can ask Spark to do this by calling `persist()` on it.

In [38]:
primesRDD.persist()

PythonRDD[13] at collect at <ipython-input-29-e29460052702>:2

In [39]:
primesRDD.persist() # ask Spark to keep this RDD in memory. cache() is shorthand for persist() with the default storage level, while persist() also lets you choose other storage levels

print("Found", primesRDD.count(), "prime numbers") # first action -- causes primesRDD to be materialized
print("Here are some of them:")
print(primesRDD.take(20))

Found 78499 prime numbers
Here are some of them:
[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67]


In [40]:
print(primesRDD.take(20)) # second action - RDD is already in memory

[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67]


If we do not need `primesRDD` in memory anymore, we can tell Spark to discard it.

In [41]:
primesRDD.unpersist()

PythonRDD[13] at collect at <ipython-input-29-e29460052702>:2

How long does it take to collect `primesRDD`? Let's time the operation.

In [42]:
%%timeit
primes = primesRDD.collect()

1 loop, best of 3: 19.2 s per loop


When I ran the above on my laptop, it took more than 10 s (19.2 s in the run shown). That's because `primesRDD` was no longer persisted, so Spark had to recompute it from scratch before performing `collect` on it.

How long would it take if `primesRDD` was already in memory?

In [43]:
primesRDD.persist()

PythonRDD[13] at collect at <ipython-input-29-e29460052702>:2

In [44]:
%%timeit
primes = primesRDD.collect()

The slowest run took 17.60 times longer than the fastest. This could mean that an intermediate result is being cached.
1 loop, best of 3: 1.07 s per loop


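When I ran the above on my laptop, it took about 1 s to collect `primesRDD`, more than $10$ times faster than when the RDD had to be recomputed from scratch. The timeit warning about the slowest run most likely reflects the first run, which still had to compute `primesRDD` before it could be cached.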
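The effect of `persist()` can be mimicked with a toy plain-Python class that counts how many times its lineage is recomputed (`ToyRDD` and `primes_below_30` are hypothetical, not PySpark's implementation). Without persisting, every action recomputes from scratch; once persisted, the first action fills the cache and later actions reuse it. Unlike Spark, this toy never evicts cached data and ignores storage levels.

```python
class ToyRDD:
    """Toy model of an RDD with optional in-memory persistence."""

    def __init__(self, compute):
        self._compute = compute  # zero-argument function producing the elements
        self._persisted = False
        self._cache = None
        self.evaluations = 0     # how many times the lineage was actually computed

    def persist(self):
        self._persisted = True
        return self

    def unpersist(self):
        self._persisted = False
        self._cache = None
        return self

    def _materialize(self):
        if self._cache is not None:
            return self._cache   # served from memory
        self.evaluations += 1
        data = list(self._compute())
        if self._persisted:
            self._cache = data   # keep it for later actions
        return data

    # two "actions"
    def collect(self):
        return list(self._materialize())

    def count(self):
        return len(self._materialize())


def primes_below_30():
    return [n for n in range(2, 30) if all(n % d for d in range(2, int(n ** 0.5) + 1))]


plain = ToyRDD(primes_below_30)
plain.count()
plain.collect()
print(plain.evaluations)   # 2: each action recomputed the data

cached = ToyRDD(primes_below_30).persist()
cached.count()
cached.collect()
print(cached.evaluations)  # 1: the second action reused the cache
```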

***

## Passing functions

When we pass a function as a parameter to an RDD operation, the function can be specified either as a lambda function or as a reference to a function defined elsewhere.

In [45]:
data = sc.parallelize(range(10))
squares = data.map(lambda x: x**2)
squares.collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [46]:
def f(x):
    """ return the square of a number"""
    return x**2

data = sc.parallelize(range(10))
squares = data.map(f)
squares.collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Be careful, though: if the function that you pass as argument to an RDD operation 
* is an object method, or
* references an object field,

then Spark will ship the entire object to the cluster nodes along with the function.

This is demonstrated in the piece of code below.

In [121]:
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
        
    def is_match(self, s):
        return self.query in s
    
    def get_matches_in_rdd_v1(self, rdd):
        return rdd.filter(self.is_match) # the function is an object method
    
    def get_matches_in_rdd_v2(self, rdd):
        return rdd.filter(lambda x: self.query in x) # the function references an object field

The following is a better way to implement the two methods above (`get_matches_in_rdd_v1` and `get_matches_in_rdd_v2`) if we want to avoid shipping the whole `SearchFunctions` object to the cluster: copy the field into a local variable and reference only that variable inside the lambda.

In [122]:
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
        
    def is_match(self, s):
        return self.query in s
    
    def get_matches_in_rdd(self, rdd):
        query = self.query
        return rdd.filter(lambda x: query in x)
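Why does the first version ship the whole object? PySpark serializes the function you pass, closure included, and sends it to the workers, so whatever the function references travels with it. The plain-Python sketch below makes those references visible without Spark: a bound method keeps a pointer to its instance in `__self__`, and a lambda that reads `self.query` captures `self` in its closure, whereas the improved version captures only the query string. The method names `filter_fn_v2` and `filter_fn_better` are hypothetical, chosen for this illustration.

```python
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def is_match(self, s):
        return self.query in s

    def filter_fn_v2(self):
        # the lambda reads self.query, so its closure captures `self`
        return lambda x: self.query in x

    def filter_fn_better(self):
        query = self.query  # copy the field into a local variable first
        return lambda x: query in x


searcher = SearchFunctions("Spark")

# v1: a bound method carries its whole object along
print(searcher.is_match.__self__ is searcher)        # True

# v2: the closure cell holds the whole object
v2 = searcher.filter_fn_v2()
print(v2.__closure__[0].cell_contents is searcher)   # True

# better: the closure cell holds only the query string
better = searcher.filter_fn_better()
print(better.__closure__[0].cell_contents)           # Spark
```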

***

## map and flatmap

In [47]:
phrases = sc.parallelize(["hello world", "terve terve", "how are you"])

words_map = phrases.map(lambda phrase: phrase.split(" "))

words_map.collect() # This returns a list of lists

[['hello', 'world'], ['terve', 'terve'], ['how', 'are', 'you']]

In [48]:
phrases = sc.parallelize(["hello world", "terve terve", "how are you"])

words_flatmap = phrases.flatMap(lambda phrase: phrase.split(" "))

words_flatmap.collect() # This returns a single list with the elements of all the lists combined

['hello', 'world', 'terve', 'terve', 'how', 'are', 'you']
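In plain Python terms (an analogy, not Spark code), `map` produces exactly one output element per input element, while `flatMap` applies the function and then flattens the iterables it returns into a single sequence, which `itertools.chain.from_iterable` does here.

```python
from itertools import chain

phrases = ["hello world", "terve terve", "how are you"]
split = lambda phrase: phrase.split(" ")

# map: one output element per input element (here, one list per phrase)
mapped = [split(p) for p in phrases]

# flatMap: apply the function, then flatten the returned lists into one sequence
flat_mapped = list(chain.from_iterable(split(p) for p in phrases))

print(mapped)       # [['hello', 'world'], ['terve', 'terve'], ['how', 'are', 'you']]
print(flat_mapped)  # ['hello', 'world', 'terve', 'terve', 'how', 'are', 'you']
```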

In [226]:
# We can use the flatmap to make a word count
words_flatmap.map(
    lambda x: (x,1)
).reduceByKey(
    lambda x,y: x+y
).collect()

[('are', 1), ('hello', 1), ('you', 1), ('how', 1), ('terve', 2), ('world', 1)]

***

## (Pseudo) set operations

In [50]:
oneRDD = sc.parallelize([1, 1, 1, 2, 3, 3, 4, 4])
oneRDD.persist()

ParallelCollectionRDD[27] at parallelize at PythonRDD.scala:423

In [51]:
otherRDD = sc.parallelize([1, 4, 4, 7])
otherRDD.persist()

ParallelCollectionRDD[28] at parallelize at PythonRDD.scala:423

In [56]:
unionRDD = oneRDD.union(otherRDD)
unionRDD.persist()

UnionRDD[33] at union at null:-2

In [24]:
oneRDD.subtract(otherRDD).collect()

[2, 3, 3]

In [57]:
oneRDD.distinct().collect()

[1, 2, 3, 4]

In [26]:
oneRDD.intersection(otherRDD).collect() # removes duplicates

[4, 1]

In [27]:
oneRDD.cartesian(otherRDD).collect()[:5]

[(1, 1), (1, 4), (1, 4), (1, 7), (1, 1)]
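These are only "pseudo" set operations because RDDs can hold duplicates. A plain-Python model over lists (an analogy, not Spark's implementation) reproduces the results above: `union` keeps every duplicate, `subtract` drops every occurrence of a value that appears in the other RDD but keeps duplicates of the remaining values, while `distinct` and `intersection` return each value once. Spark does not guarantee the order of these results, so the model sorts where the order is arbitrary.

```python
from itertools import product

one = [1, 1, 1, 2, 3, 3, 4, 4]
other = [1, 4, 4, 7]

union = one + other                                   # like union(): duplicates kept
other_values = set(other)
subtract = [x for x in one if x not in other_values]  # like subtract()
distinct = sorted(set(one))                           # like distinct()
intersection = sorted(set(one) & other_values)        # like intersection(): no duplicates
cartesian = list(product(one, other))                 # like cartesian(): all pairs

print(len(union))      # 12
print(subtract)        # [2, 3, 3]
print(distinct)        # [1, 2, 3, 4]
print(intersection)    # [1, 4]
print(cartesian[:5])   # [(1, 1), (1, 4), (1, 4), (1, 7), (1, 1)]
print(len(cartesian))  # 32
```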

***

## reduce

In [61]:
np.sum([1,43,62,23,52])

181

In [62]:
data = sc.parallelize([1,43,62,23,52])
data.reduce(lambda x, y: x + y)

181

In [63]:
data.reduce(lambda x, y: x * y)

3188536

In [64]:
data.reduce(lambda x, y: x**2 + y**2) # this does NOT compute the sum of squares of RDD elements

137823683725010149883130929

In [137]:
((((1 ** 2 + 43 ** 2) ** 2 + 62 ** 2) **2 + 23 ** 2) **2 + 52 **2)

137823683725010149883130929

In [200]:
data.reduce(lambda x, y: np.sqrt(x**2 + y**2)) ** 2

8927.0

In [198]:
np.sum(np.array([1,43,62,23,52]) ** 2)

8927
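Why does the sum-of-squares attempt go wrong, and why does the `np.sqrt` version work? Spark's `reduce` applies the function within each partition and then combines the partial results, so the function should be commutative and associative. Addition and multiplication are; $f(x, y) = x^2 + y^2$ is not, so its result depends on how the elements are grouped (the nested expression above shows just one grouping). $g(x, y) = \sqrt{x^2 + y^2}$ is commutative and associative (up to floating-point rounding), and reducing with it yields $\sqrt{\sum_i x_i^2}$, so squaring the result recovers the sum of squares. The hypothetical helper `simulate_reduce` below is a plain-Python model of that two-step process that splits the list into contiguous partitions; it is not PySpark's code.

```python
import math
from functools import reduce

def simulate_reduce(f, data, num_partitions):
    """Toy model: reduce each contiguous partition, then reduce the partial results."""
    n = len(data)
    partitions = [data[i * n // num_partitions:(i + 1) * n // num_partitions]
                  for i in range(num_partitions)]
    partials = [reduce(f, part) for part in partitions if part]  # skip empty partitions
    return reduce(f, partials)

data = [1, 43, 62, 23, 52]
add = lambda x, y: x + y
sum_sq_wrong = lambda x, y: x ** 2 + y ** 2           # not associative
hypot = lambda x, y: math.sqrt(x ** 2 + y ** 2)       # associative and commutative

for k in (1, 2, 3):
    print(k,
          simulate_reduce(add, data, k),               # 181 for every k
          simulate_reduce(sum_sq_wrong, data, k),      # changes with k
          round(simulate_reduce(hypot, data, k) ** 2, 6))
```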

***

## aggregate

In [65]:
help(data.aggregate)

Help on method aggregate in module pyspark.rdd:

aggregate(zeroValue, seqOp, combOp) method of pyspark.rdd.RDD instance
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given combine functions and a neutral "zero
    value."
    
    The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should not
    modify C{t2}.
    
    The first function (seqOp) can return a different result type, U, than
    the type of this RDD. Thus, we need one operation for merging a T into
    an U and one operation for merging two U
    
    >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
    >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
    >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
    (10, 4)
    >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
    (0, 0)



In [68]:
def seq(x,y):
    return x[0] + y, x[1] + 1

In [69]:
def comb(x,y):
    print(x,y,"comb")
    return x[0] + y[0], x[1] + y[1]

In [73]:
data = sc.parallelize([1,43,62,23,52], 1) # Try different levels of parallelism
aggr = data.aggregate(zeroValue = (0,0),
                      seqOp = seq,   # merges one element into a (sum, count) pair
                      combOp = comb) # merges two (sum, count) pairs

(0, 0) (181, 5) comb


In [74]:
aggr

(181, 5)

In [75]:
aggr[0] / aggr[1] # average value of RDD elements

36.2
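A toy plain-Python model of `aggregate` makes the roles of the three arguments explicit (an approximation of the semantics, not PySpark's source; `simulate_aggregate` is a hypothetical helper). Each partition is folded with `seqOp` starting from `zeroValue`, and the per-partition results are then folded with `combOp`, again starting from `zeroValue`. Because `zeroValue` is used once per partition and once more in the final step, it must be neutral for both operations, which `(0, 0)` is here. With a single partition, the model calls `comb` once with `(0, 0)` as its first argument, consistent with the line printed above.

```python
from functools import reduce

def simulate_aggregate(data, num_partitions, zero_value, seq_op, comb_op):
    """Toy model: fold each partition with seq_op, then fold the partials with comb_op."""
    n = len(data)
    partitions = [data[i * n // num_partitions:(i + 1) * n // num_partitions]
                  for i in range(num_partitions)]
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)

def seq(x, y):
    return x[0] + y, x[1] + 1          # add one element to a (sum, count) pair

def comb(x, y):
    return x[0] + y[0], x[1] + y[1]    # merge two (sum, count) pairs

data = [1, 43, 62, 23, 52]
for k in (1, 2, 5):
    total, count = simulate_aggregate(data, k, (0, 0), seq, comb)
    print(k, total, count, total / count)  # 181 5 36.2 for every k
```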

***

## reduceByKey

In [189]:
help(pairRDD.reduceByKey)

Help on method reduceByKey in module pyspark.rdd:

reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash at 0x000001E7C579D378>) method of pyspark.rdd.RDD instance
    Merge the values for each key using an associative reduce function.
    
    This will also perform the merging locally on each mapper before
    sending results to a reducer, similarly to a "combiner" in MapReduce.
    
    Output will be partitioned with C{numPartitions} partitions, or
    the default parallelism level if C{numPartitions} is not specified.
    Default partitioner is hash-partition.
    
    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [('a', 2), ('b', 1)]



In [76]:
pairRDD = sc.parallelize([('$APPL', 100.64), 
                          ('$APPL', 100.52), 
                          ('$GOOG', 706.2), 
                          ('$AMZN', 552.32), 
                          ('$AMZN', 552.32) ])

pairRDD.reduceByKey(lambda x,y: x + y).collect() # sum of values per key

[('$APPL', 201.16), ('$AMZN', 1104.64), ('$GOOG', 706.2)]

From https://github.com/vaquarkhan/vk-wiki-notes/wiki/reduceByKey--vs-groupBykey-vs-aggregateByKey-vs-combineByKey

`reduceByKey` combines the values of each key within every partition before shuffling:
![reduceByKey combines values per key before the shuffle](https://camo.githubusercontent.com/516114b94193cddf7e59bdd5368d6756d30dc8b4/687474703a2f2f7777772e727578697a68616e672e636f6d2f75706c6f6164732f342f342f302f322f34343032333436352f313836363838325f6f7269672e706e67)

`groupByKey`, in contrast, shuffles all key-value pairs, as the diagram shows:
![groupByKey shuffles all key-value pairs](https://camo.githubusercontent.com/ed75baabdaee2198d3fc1390e04a5d20bcd2e484/687474703a2f2f7777772e727578697a68616e672e636f6d2f75706c6f6164732f342f342f302f322f34343032333436352f333030393135315f6f7269672e706e67)
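The difference between the two diagrams can be counted in a toy plain-Python model (the two helper functions and the two-partition split are hypothetical, not Spark's implementation). With `groupByKey`, every key-value pair crosses the shuffle, whereas `reduceByKey` first combines the values of each key inside each partition, so at most one record per key per partition is shuffled. Both produce the same totals.

```python
from collections import defaultdict

# hypothetical data, already split into two partitions
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("a", 1), ("a", 1), ("b", 1)],
]

def group_by_key_then_sum(parts):
    """Every (key, value) pair is sent across the shuffle, then summed."""
    shuffled = [pair for part in parts for pair in part]
    grouped = defaultdict(list)
    for k, v in shuffled:
        grouped[k].append(v)
    return len(shuffled), {k: sum(vs) for k, vs in grouped.items()}

def reduce_by_key(parts, func):
    """Values are combined per key inside each partition before the shuffle."""
    shuffled = []
    for part in parts:
        local = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        shuffled.extend(local.items())
    result = {}
    for k, v in shuffled:
        result[k] = func(result[k], v) if k in result else v
    return len(shuffled), result

add = lambda x, y: x + y
print(group_by_key_then_sum(partitions))  # (6, {'a': 4, 'b': 2})
print(reduce_by_key(partitions, add))     # (4, {'a': 4, 'b': 2})
```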

***

## combineByKey

In [191]:
help(pairRDD.combineByKey)

Help on method combineByKey in module pyspark.rdd:

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x000001E7C579D378>) method of pyspark.rdd.RDD instance
    Generic function to combine the elements for each key using a custom
    set of aggregation functions.
    
    Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
    type" C.  Note that V and C can be different -- for example, one might
    group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
    
    Users provide three functions:
    
        - C{createCombiner}, which turns a V into a C (e.g., creates
          a one-element list)
        - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
          a list)
        - C{mergeCombiners}, to combine two C's into a single one.
    
    In addition, users can control the partitioning of the output RDD.
    
    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a

In [78]:
pairRDD = sc.parallelize([ ('$APPL', 100.64), ('$GOOG', 706.2), ('$AMZN', 552.32), ('$APPL', 100.52), ('$AMZN', 552.32) ])

aggr = pairRDD.combineByKey(createCombiner = lambda x: (x, 1),
                           mergeValue = lambda x,y: (x[0] + y, x[1] + 1),
                           mergeCombiners = lambda x,y: (x[0] + y[0], x[1] + y[1]))

In [79]:
aggr.collect()

[('$APPL', (201.16, 2)), ('$AMZN', (1104.64, 2)), ('$GOOG', (706.2, 1))]

In [80]:
aggr.map(lambda x: (x[0], x[1][0]/x[1][1])).collect() # average value per key

[('$APPL', 100.58), ('$AMZN', 552.32), ('$GOOG', 706.2)]
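The three functions map onto where the data lives. In the toy plain-Python model below (`simulate_combine_by_key` and the two-partition split are hypothetical, not Spark's code), `createCombiner` runs the first time a key is seen *within a partition*, `mergeValue` folds further values of that key in the same partition, and `mergeCombiners` merges the per-partition results. `$APPL` appears in both partitions, so its two combiners end up merged by `mergeCombiners`.

```python
def simulate_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Toy model: combine per partition, then merge combiners across partitions."""
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)  # first time key is seen in THIS partition
        per_partition.append(combiners)
    result = {}
    for combiners in per_partition:
        for key, comb in combiners.items():
            result[key] = merge_combiners(result[key], comb) if key in result else comb
    return result

partitions = [
    [('$APPL', 100.64), ('$GOOG', 706.2)],
    [('$AMZN', 552.32), ('$APPL', 100.52), ('$AMZN', 552.32)],
]
sums_counts = simulate_combine_by_key(
    partitions,
    create_combiner=lambda v: (v, 1),
    merge_value=lambda acc, v: (acc[0] + v, acc[1] + 1),
    merge_combiners=lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
averages = {k: s / c for k, (s, c) in sums_counts.items()}  # average value per key
print({k: round(v, 2) for k, v in averages.items()})
```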

***

## (inner) join

In [84]:
help(course_a.join)

Help on method join in module pyspark.rdd:

join(other, numPartitions=None) method of pyspark.rdd.RDD instance
    Return an RDD containing all pairs of elements with matching keys in
    C{self} and C{other}.
    
    Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
    (k, v1) is in C{self} and (k, v2) is in C{other}.
    
    Performs a hash join across the cluster.
    
    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]



In [83]:
course_a = sc.parallelize([ ("Antti", 8), ("Tuukka", 10), ("Leena", 9)])
course_b = sc.parallelize([ ("Leena", 10), ("Tuukka", 10)])

result = course_a.join(course_b)
result.collect()

[('Tuukka', (10, 10)), ('Leena', (9, 10))]
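An inner join keeps only keys present on both sides and emits one `(k, (v1, v2))` pair for every combination of matching values. That is why `Antti`, who appears only in `course_a`, is missing from the result, and why the docstring example yields two results for key `"a"`. A toy plain-Python model (`inner_join` is a hypothetical helper and says nothing about how Spark performs the hash join across a cluster):

```python
from collections import defaultdict

def inner_join(left, right):
    """Toy model of RDD.join: one (k, (v1, v2)) per matching pair of values."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (v1, v2)) for k, v1 in left for v2 in right_by_key.get(k, [])]

course_a = [("Antti", 8), ("Tuukka", 10), ("Leena", 9)]
course_b = [("Leena", 10), ("Tuukka", 10)]
print(inner_join(course_a, course_b))  # [('Tuukka', (10, 10)), ('Leena', (9, 10))]

# the docstring example: a key that repeats on one side produces several pairs
print(inner_join([("a", 1), ("b", 4)], [("a", 2), ("a", 3)]))  # [('a', (1, 2)), ('a', (1, 3))]
```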

***

## Accumulators

This example demonstrates how to use accumulators.
The `map` operation creates an RDD that contains the length of each line in the text file; while that RDD is materialized, an accumulator keeps track of how many lines are long (longer than $30$ characters).

In [85]:
text = sc.textFile("myfile.txt")
long_lines = sc.accumulator(0) # create accumulator

def line_len(line):
    global long_lines # to reference an accumulator, declare it as a global variable
    length = len(line)
    if length > 30:
        long_lines += 1 # update the accumulator
    return length

llengthRDD = text.map(line_len)
llengthRDD.count()

95

In [87]:
long_lines.value # this is how we obtain the value of the accumulator in the driver program

45

In [261]:
help(long_lines)

Help on Accumulator in module pyspark.accumulators object:

class Accumulator(builtins.object)
 |  A shared variable that can be accumulated, i.e., has a commutative and associative "add"
 |  operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
 |  operator, but only the driver program is allowed to access its value, using C{value}.
 |  Updates from the workers get propagated automatically to the driver program.
 |  
 |  While C{SparkContext} supports accumulators for primitive data types like C{int} and
 |  C{float}, users can also define accumulators for custom types by providing a custom
 |  L{AccumulatorParam} object. Refer to the doctest of this module for an example.
 |  
 |  Methods defined here:
 |  
 |  __iadd__(self, term)
 |      The += operator; adds a term to this accumulator's value
 |  
 |  __init__(self, aid, value, accum_param)
 |      Create a new Accumulator with a given initial value and AccumulatorParam object
 |  
 |  __reduce

### Warning

In the example above, we update the value of an accumulator within a transformation (`map`). This is **not recommended**, except for debugging purposes! The reason is that if there are failures during the materialization of `llengthRDD`, some of its partitions will be re-computed, possibly causing the accumulator to double-count some of the long lines.

It is advisable to use accumulators within actions - and particularly with the `foreach` action, as demonstrated below.

In [265]:
text = sc.textFile("myfile.txt")
long_lines_2 = sc.accumulator(0)

def line_len(line):
    global long_lines_2
    length = len(line)
    if length > 30:
        long_lines_2 += 1

text.foreach(line_len)

long_lines_2.value

45
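The double-counting risk can be reproduced in plain Python with a toy counter standing in for the accumulator (`ToyAccumulator`, `run_partition` and the sample lines are hypothetical). Whenever a transformation runs again, because a failed task is retried or because an action is called on an RDD that was not persisted, its side effects run again too. The Spark programming guide states that accumulator updates made inside actions are applied only once per task even if the task is restarted, whereas updates made inside transformations may be applied more than once. In the first example above, calling `llengthRDD.count()` a second time would recompute the RDD and increase `long_lines` again.

```python
class ToyAccumulator:
    """Hypothetical stand-in for a Spark accumulator: just a counter."""

    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


lines = ["short", "x" * 40, "y" * 35, "tiny"]
long_lines = ToyAccumulator()

def line_len(line):
    if len(line) > 30:
        long_lines.add(1)  # side effect inside a "transformation"
    return len(line)

def run_partition():
    # re-running the transformation, as a retried task or a second un-persisted action would
    return [line_len(s) for s in lines]

run_partition()
first_value = long_lines.value
print(first_value)   # 2

run_partition()      # e.g. the partition is recomputed after a failure
second_value = long_lines.value
print(second_value)  # 4: the long lines are now double-counted
```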

***

## Broadcast variable

We use *broadcast variables* when many operations depend on the same large static object - e.g., a large lookup table that does not change but provides information for other operations. In such cases, we can make a broadcast variable out of the object and thus make sure that the object will be shipped to the cluster only once - and not for each of the operations we'll be using it for.

The example below demonstrates the usage of broadcast variables. In this case, we make a broadcast variable out of a dictionary that represents an address table. The table is shipped to the cluster nodes only once, even though it is used by multiple operations.

In [88]:
def load_address_table():
    return {"Anu": "Chem. A143", "Karmen": "VTT, 74", "Michael": "OIH, B253.2",
            "Anwar": "T, B103", "Orestis": "T, A341", "Darshan": "T, A325"}

address_table = sc.broadcast(load_address_table())

In [267]:
def find_address(name):
    res = None
    if name in address_table.value:
        res = address_table.value[name]
    return res

people = sc.parallelize(["Anwar", "Michael", "Orestis", "Darshan"])
pairRDD = people.map(lambda name: (name, find_address(name))) # first operation that uses the address table
print(pairRDD.collectAsMap())

other_people = sc.parallelize(["Karmen", "Michael", "Anu"])
pairRDD = other_people.map(lambda name: (name, find_address(name))) # second operation that uses the address table
print(pairRDD.collectAsMap())

{'Anwar': 'T, B103', 'Michael': 'OIH, B253.2', 'Darshan': 'T, A325', 'Orestis': 'T, A341'}
{'Anu': 'Chem. A143', 'Karmen': 'VTT, 74', 'Michael': 'OIH, B253.2'}


***

## Stopping

Call `stop()` on the SparkContext object to shut it down.

In [40]:
sc.stop()

# Example: Estimating Pi

In [41]:
# assumes a running SparkContext `sc`; re-create it if you stopped it above
import random

NUM_SAMPLES = 10000000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Pi is roughly 3.141248
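Why does this work? A point $(x, y)$ drawn uniformly from the unit square falls inside the quarter disc $x^2 + y^2 < 1$ with probability equal to the disc's area, $\pi/4$, so $4 \cdot \text{count} / N$ estimates $\pi$. The standard error shrinks like $1/\sqrt{N}$: it is roughly $4\sqrt{(\pi/4)(1 - \pi/4)/N} \approx 5 \times 10^{-4}$ for $N = 10^7$, in line with the estimate above. A seeded, single-machine plain-Python version of the same estimator (`estimate_pi` is a hypothetical helper):

```python
import math
import random

def estimate_pi(num_samples, seed=42):
    rng = random.Random(seed)  # seeded so that runs are reproducible
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # the point falls inside the quarter disc
            inside += 1
    return 4.0 * inside / num_samples

estimate = estimate_pi(200_000)
print("Pi is roughly %f (error %.4f)" % (estimate, abs(estimate - math.pi)))
```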


# Example: Computing PageRank

In [5]:
# Details of the algorithm can be found here: http://www.cs.princeton.edu/~chazelle/courses/BIB/pagerank.htm
iterations = 5

In [23]:
def computeContribs(urls, rank):
    """Calculates URL contributions to the rank of other URLs."""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    """Parses a urls pair string into urls pair."""
    parts = urls.split(',')
    return parts[0], parts[1]
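Before running on the full network, the update rule can be traced on a toy graph in plain Python (the three-node graph `toy_links` is hypothetical, and dictionaries stand in for the RDDs). Each iteration mirrors the Spark loop in this example: every page splits its current rank evenly among its neighbors (`computeContribs`), contributions are summed per page (`reduceByKey`), and the sum is damped as $0.85 \cdot \text{sum} + 0.15$ (`mapValues`). As in the Spark code, a page that receives no contributions drops out of `ranks`, and the `if` check mirrors the join, which skips pages that have no rank.

```python
def computeContribs(urls, rank):
    """Calculates URL contributions to the rank of other URLs."""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

# hypothetical toy graph: page -> pages it links to
toy_links = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
ranks = {url: 1.0 for url in toy_links}  # every page with outgoing links starts at 1.0

for iteration in range(5):
    contribs = {}
    for url, neighbors in toy_links.items():
        if url not in ranks:  # the join only keeps pages that still have a rank
            continue
        for dest, contrib in computeContribs(neighbors, ranks[url]):
            contribs[dest] = contribs.get(dest, 0.0) + contrib  # like reduceByKey(add)
    ranks = {url: total * 0.85 + 0.15 for url, total in contribs.items()}  # like mapValues

for url, rank in sorted(ranks.items()):
    print("%s has rank: %s." % (url, rank))
```

In this toy graph every page receives links, so the total rank stays at the number of pages (3) from one iteration to the next.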

In [24]:
# Read the lines
lines = sc.textFile("higgs-mention_network.txt").persist()

In [25]:
lines.collect()

['316609,5011',
 '439696,12389',
 '60059,6929',
 '161345,8614',
 '137487,759',
 '57587,107757',
 '397696,6940',
 '436988,71',
 '43994,90976',
 '124554,286277',
 '189395,189395',
 '97700,162293',
 '427934,46803',
 '237395,284372',
 '256336,31549',
 '113665,503',
 '91356,32652',
 '173861,88',
 '274148,274149',
 '171699,1274',
 '203156,2994',
 '62045,15',
 '154893,32098',
 '12866,22252',
 '425029,35248',
 '379679,2417',
 '265505,112050',
 '7363,396581',
 '86142,16452',
 '253856,5226',
 '85796,5226',
 '357993,111667',
 '233022,7932',
 '821,94891',
 '133528,250519',
 '236304,56771',
 '71457,71',
 '69597,11921',
 '6999,88',
 '107893,16358',
 '251752,3998',
 '158380,52504',
 '11680,1988',
 '185346,392959',
 '6800,2055',
 '39671,154603',
 '73254,29378',
 '3228,13537',
 '177507,216',
 '61358,21273',
 '370779,26677',
 '272461,146122',
 '225193,220',
 '126681,88',
 '27095,8438',
 '76782,3998',
 '305239,16613',
 '424421,88',
 '40939,88',
 '177767,5005',
 '110041,76634',
 '86128,216',
 '429956,1179

In [26]:
# Loads all URLs from input file and initialize their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()

In [27]:
links.collect()

[('231424', <pyspark.resultiterable.ResultIterable at 0x2085e494470>),
 ('223127', <pyspark.resultiterable.ResultIterable at 0x2085e494828>),
 ('43948', <pyspark.resultiterable.ResultIterable at 0x2085e494668>),
 ('437899', <pyspark.resultiterable.ResultIterable at 0x2085e494898>),
 ('13674', <pyspark.resultiterable.ResultIterable at 0x2085e4940f0>),
 ('60737', <pyspark.resultiterable.ResultIterable at 0x2085d9d9390>),
 ('380346', <pyspark.resultiterable.ResultIterable at 0x2085d9d9128>),
 ('3665', <pyspark.resultiterable.ResultIterable at 0x2085e4947f0>),
 ('51328', <pyspark.resultiterable.ResultIterable at 0x2085d9d9278>),
 ('243762', <pyspark.resultiterable.ResultIterable at 0x2085d9d9710>),
 ('223789', <pyspark.resultiterable.ResultIterable at 0x2085d9d96a0>),
 ('254009', <pyspark.resultiterable.ResultIterable at 0x2085d98f828>),
 ('124614', <pyspark.resultiterable.ResultIterable at 0x2085d98fa20>),
 ('153015', <pyspark.resultiterable.ResultIterable at 0x2085d98fb00>),
 ('173474', 

In [28]:
# Initialize the rank of every URL that has outgoing links to 1.0.
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

In [29]:
ranks.collect()

[('231424', 1.0),
 ('223127', 1.0),
 ('43948', 1.0),
 ('437899', 1.0),
 ('13674', 1.0),
 ('60737', 1.0),
 ('380346', 1.0),
 ('3665', 1.0),
 ('51328', 1.0),
 ('243762', 1.0),
 ('223789', 1.0),
 ('254009', 1.0),
 ('124614', 1.0),
 ('153015', 1.0),
 ('173474', 1.0),
 ('18780', 1.0),
 ('383191', 1.0),
 ('384822', 1.0),
 ('437813', 1.0),
 ('148657', 1.0),
 ('156477', 1.0),
 ('376767', 1.0),
 ('143843', 1.0),
 ('267319', 1.0),
 ('314417', 1.0),
 ('94481', 1.0),
 ('310946', 1.0),
 ('336269', 1.0),
 ('386922', 1.0),
 ('50046', 1.0),
 ('80779', 1.0),
 ('408289', 1.0),
 ('20922', 1.0),
 ('337714', 1.0),
 ('21432', 1.0),
 ('316841', 1.0),
 ('74590', 1.0),
 ('182825', 1.0),
 ('233251', 1.0),
 ('7588', 1.0),
 ('128016', 1.0),
 ('152610', 1.0),
 ('52064', 1.0),
 ('105740', 1.0),
 ('32162', 1.0),
 ('131946', 1.0),
 ('160938', 1.0),
 ('238852', 1.0),
 ('143500', 1.0),
 ('160528', 1.0),
 ('65644', 1.0),
 ('239322', 1.0),
 ('82191', 1.0),
 ('176671', 1.0),
 ('233718', 1.0),
 ('55196', 1.0),
 ('209601', 

In [33]:
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(iterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks).flatMap(
        lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(lambda x,y: x+y).mapValues(lambda rank: rank * 0.85 + 0.15)
# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
    print("%s has rank: %s." % (link, rank))

316785 has rank: 0.32887921963500977.
27868 has rank: 0.2817196222974318.
132749 has rank: 0.7035119357910156.
261872 has rank: 1.1748629910836759.
85113 has rank: 2.5085980625.
59817 has rank: 0.4159359744037455.
373297 has rank: 0.1835724325019531.
175682 has rank: 1.0.
5772 has rank: 1.3771495156249993.
22991 has rank: 0.6018105371623558.
17866 has rank: 0.7410292739900823.
372414 has rank: 0.3745169113159179.
169034 has rank: 1.3771495156249993.
89946 has rank: 0.25175104831252915.
69902 has rank: 0.21126345012431413.
22371 has rank: 0.8213711594238279.
321667 has rank: 0.7035119357910156.
35941 has rank: 0.7559792050808376.
306525 has rank: 1.3771495156249993.
241712 has rank: 1.0.
317 has rank: 52.27317648159579.
4380 has rank: 14.055130840607882.
39862 has rank: 1.918843536458333.
24341 has rank: 0.24433453718171294.
321295 has rank: 0.7035119357910156.
55950 has rank: 1.2828621367187494.
54798 has rank: 0.3802194771947873.
357068 has rank: 0.8272641206054686.
217327 has rank: 1