## How do I make an RDD?

RDDs can be created from stable storage or by transforming other RDDs. Run the cells below to create RDDs from files on the local drive.  All data files can be downloaded from https://www.cse.ust.hk/msbd5003/data/

In [1]:
# On Azure:
fruits = sc.textFile('../data/fruits.txt')
yellowThings = sc.textFile('../data/yellowthings.txt')
fruits.collect()

['apple',
 'banana',
 'canary melon',
 'grape',
 'lemon',
 'orange',
 'pineapple',
 'strawberry']

In [2]:
# In local mode:
fruits = sc.textFile('../data/fruits.txt')
yellowThings = sc.textFile('../data/yellowthings.txt')
print(fruits.collect())
print(yellowThings.collect())

['apple', 'banana', 'canary melon', 'grape', 'lemon', 'orange', 'pineapple', 'strawberry']
['banana', 'bee', 'butter', 'canary melon', 'gold', 'lemon', 'pineapple', 'sunflower']


In [3]:
# On Azure:
# You can also read from other containers.
# The 'cluster' container under the storage account 'msbd' has been made public.
# Use the following format to read data from a public container
# The file can also be accessed from the web at: 
# https://msbd.blob.core.windows.net/cluster/data/course.txt

# this should be on Azure
txtfile = sc.textFile('wasb://cluster@cse.ust.hk/msbd5003/data/course.txt')
txtfile.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: wasb
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


----------
## PySpark magics 

The PySpark kernel provides some predefined “magics”, which are special commands that you can call with `%%` (e.g. `%%MAGIC` <args>). The magic command must be the first word in a code cell and allow for multiple lines of content. You can’t put comments before a cell magic.

For more information on magics, see [here](http://ipython.readthedocs.org/en/stable/interactive/magics.html).

In [4]:
%%info

UsageError: Cell magic `%%info` not found.


### Session configuration (%%configure)
 
Use the `%%configure` magic to configure new or existing Livy sessions.
* If a session is already running, you can change the configuration by using the `-f` argument with `%%configure` magic. This will delete the current session and recreate it with the applied configurations. If you don't provide the `-f` argument, an error will be displayed and no configuration changes will be applied.
* If you haven't already started the session, then the `-f` argument is not mandatory. Even if you use it with a session that you are just creating, it will not delete any currently running sessions.

These are some session attributes that can be used for configuration 
- **"name"**: Name of the application
- **"driverMemory"**: Memory for driver (e.g. 1000M, 2G) 
- **"executorMemory"**: Memory for executor (e.g. 1000M, 2G) 
- **"executorCores"**: Number of cores used by executor

In [5]:
%%configure -f
{"executorCores":4}

UsageError: Cell magic `%%configure` not found.


----------

##  RDD operations

In [3]:
# map
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])

In [4]:
fruitsReversed.unpersist()
# try changing the file and re-execute with and without cache
fruitsReversed.collect()

['elppa',
 'ananab',
 'nolem yranac',
 'eparg',
 'nomel',
 'egnaro',
 'elppaenip',
 'yrrebwarts']

In [5]:
fruitsReversed.persist()
# don't be changed
fruitsReversed.collect()

['elppa',
 'ananab',
 'nolem yranac',
 'eparg',
 'nomel',
 'egnaro',
 'elppaenip',
 'yrrebwarts']

In [6]:
# filter
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()

['apple', 'grape', 'lemon']

In [7]:
# flatMap
characters = fruits.flatMap(lambda fruit: list(fruit))
characters.collect()

['a',
 'p',
 'p',
 'l',
 'e',
 'b',
 'a',
 'n',
 'a',
 'n',
 'a',
 'c',
 'a',
 'n',
 'a',
 'r',
 'y',
 ' ',
 'm',
 'e',
 'l',
 'o',
 'n',
 'g',
 'r',
 'a',
 'p',
 'e',
 'l',
 'e',
 'm',
 'o',
 'n',
 'o',
 'r',
 'a',
 'n',
 'g',
 'e',
 'p',
 'i',
 'n',
 'e',
 'a',
 'p',
 'p',
 'l',
 'e',
 's',
 't',
 'r',
 'a',
 'w',
 'b',
 'e',
 'r',
 'r',
 'y']

In [8]:
characters = fruits.map(lambda fruit: list(fruit))
characters.collect()
# because it runs after flatmap, the result represents flatted characters

[['a', 'p', 'p', 'l', 'e'],
 ['b', 'a', 'n', 'a', 'n', 'a'],
 ['c', 'a', 'n', 'a', 'r', 'y', ' ', 'm', 'e', 'l', 'o', 'n'],
 ['g', 'r', 'a', 'p', 'e'],
 ['l', 'e', 'm', 'o', 'n'],
 ['o', 'r', 'a', 'n', 'g', 'e'],
 ['p', 'i', 'n', 'e', 'a', 'p', 'p', 'l', 'e'],
 ['s', 't', 'r', 'a', 'w', 'b', 'e', 'r', 'r', 'y']]

In [9]:
# union
fruitsAndYellowThings = fruits.union(yellowThings)
fruitsAndYellowThings.collect()

['apple',
 'banana',
 'canary melon',
 'grape',
 'lemon',
 'orange',
 'pineapple',
 'strawberry',
 'banana',
 'bee',
 'butter',
 'canary melon',
 'gold',
 'lemon',
 'pineapple',
 'sunflower']

In [10]:
# intersection
# this one also involves shuffle
yellowFruits = fruits.intersection(yellowThings)
yellowFruits.collect()

['pineapple', 'canary melon', 'lemon', 'banana']

In [11]:
# distinct
# remove duplicated ones
# this involves shuffle so that it is expensive
distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
distinctFruitsAndYellowThings.collect()

['orange',
 'pineapple',
 'canary melon',
 'grape',
 'lemon',
 'bee',
 'banana',
 'butter',
 'gold',
 'sunflower',
 'apple',
 'strawberry']

### RDD actions
Following are examples of some of the common actions available. For a detailed list, see [RDD Actions](https://spark.apache.org/docs/2.0.0/programming-guide.html#actions).

Run some transformations below to understand this better. Place the cursor in the cell and press **SHIFT + ENTER**.

In [12]:
# collect
fruitsArray = fruits.collect()
yellowThingsArray = yellowThings.collect()
fruitsArray

['apple',
 'banana',
 'canary melon',
 'grape',
 'lemon',
 'orange',
 'pineapple',
 'strawberry']

In [13]:
# count
numFruits = fruits.count()
numFruits

8

In [14]:
# take
first3Fruits = fruits.take(3)
first3Fruits

['apple', 'banana', 'canary melon']

In [15]:
# reduce
# still RDD of sets
# list of long list sets
# cracked all distinct list
letterSet = fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
letterSet

{' ',
 'a',
 'b',
 'c',
 'e',
 'g',
 'i',
 'l',
 'm',
 'n',
 'o',
 'p',
 'r',
 's',
 't',
 'w',
 'y'}

In [16]:
# remove duplicated characters
letterSet = fruits.flatMap(lambda fruit: list(fruit)).distinct().collect()
letterSet

['p',
 'l',
 'b',
 'c',
 'r',
 'y',
 'g',
 'i',
 's',
 'a',
 'e',
 'n',
 ' ',
 'm',
 'o',
 't',
 'w']

### Closure

In [17]:
counter = 0
# xrange should be range in Python 3
rdd = sc.parallelize(range(10))

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)

print(counter)

0


In [18]:
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x

# invoke each RDD
a = rdd.foreach(g)

print(accum.value)

45


In [19]:
# accumulator causes to a problem when you use a.cache() or not.
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g) # invoke accum via func g
print(accum.value) # nothing has taken place yet.
print(rdd.reduce(lambda x, y: x+y)) # action of RDD
#a.cache()
print(accum.value) # STILL 0 because did not map?
tmp = a.count()
print(accum.value)
print(rdd.reduce(lambda x, y: x+y))

tmp = a.count()
print(accum.value)
print(rdd.reduce(lambda x, y: x+y))


0
45
0
45
45
90
45


In [20]:
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g) # invoke accum via func g
print(accum.value) # nothing has taken place yet.
print(rdd.reduce(lambda x, y: x+y)) # action of RDD
a.cache()
print(accum.value) # STILL 0 because did not map?
tmp = a.count()
print(accum.value)
print(rdd.reduce(lambda x, y: x+y)) # rdd is cached, and it doesn't run again.

# 
tmp = a.count()
print(accum.value)
print(rdd.reduce(lambda x, y: x+y))

0
45
0
45
45
45
45


In [21]:
from operator import add

rdd = sc.parallelize(range(10))

print(rdd.sum())

45


In [22]:
A = sc.parallelize(range(10))

#print A.collect()

x = 5
B = A.filter(lambda z: z < x)
#B.cache()
# print B.take(10)
#print B.collect()
x = 3
#print B.count()
print(B.take(10)) 
#print B.collect()
#collect() doesn't always re-collect data - bad design!

[0, 1, 2]


In [23]:
# RDD variables are references
A = sc.parallelize(range(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

In [24]:
# Linear-time selection

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data,2)
k = 4

while True:
    x = A.first()
    A1 = A.filter(lambda z: z < x)
    A2 = A.filter(lambda z: z > x)
    mid = A1.count()
    if mid == k:
        print(x)
        break
    if k < mid:
        A = A1
    else:
        A = A2
        k = k - mid - 1
    A.cache()
    

43


In [25]:
sorted(data)

[12, 21, 26, 34, 43, 44, 47, 56, 67, 74, 89]

### Computing Pi using Monte Carlo simulation

In [32]:
# From the official spark examples.
# very easy to parallelize
# closure

import sys
import random

partitions = 1

# throwing more points will reduce error? Nope..
# partitions = 1000
# not real random

n = 100000 * partitions

# doesn't care about input parameter
def f(_):
    x = random.random()
    y = random.random()
    return 1 if x ** 2 + y ** 2 < 1 else 0

# generages rdd
# xrange in Python 2, ragne in Python 3
count = sc.parallelize(range(1, n + 1), partitions) \
          .map(f).sum()

print("Pi is roughly", 4.0 * count / n)

Pi is roughly 3.1358


In [52]:
import sys
import random
a = sc.parallelize(range(0,20),4)
print(a.collect())

# glom() is useful when you debug
# rdd into list of list
print(a.glom().collect())
a.map(lambda x: random.random()).glom().collect()

# represents each partition
def f1(it):
    s = 0
    
    for i in it:
        s += i
        yield s
    # mapPartitions always use yield, not return
    #yield s
    
def f2(index, it):
    s =  index
    
    for i in it:
        s += i
        yield s

#a.mapPartitions(f1).collect()
a.mapPartitionsWithIndex(f2).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]


[0, 1, 3, 6, 10, 6, 12, 19, 27, 36, 12, 23, 35, 48, 62, 18, 34, 51, 69, 88]

In [38]:
# Correct version

partitions = 1000
n = 100000 * partitions

def f(index, it):
    random.seed(index + 987236)
    for i in it:
        x = random.random()
        y = random.random()
        yield 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(range(1, n + 1), partitions) \
          .mapPartitionsWithIndex(f).reduce(lambda a,b: a+b)

print("Pi is roughly", 4.0 * count / n)

Pi is roughly 3.14152072


### Closure and Persistence

In [58]:
A = sc.parallelize(range(10))

#print(A.collect())

x = 5
B = A.filter(lambda z: z < x)
# B.cache()
print(B.take(10))
#print(B.collect())
x = 3
print(B.take(10))
#print(B.collect())
#collect() doesn't always re-collect data - bad design!

[0, 1, 2, 3, 4]
[0, 1, 2]


In [59]:
# comment out cache()
A = sc.parallelize(range(10))

#print(A.collect())

x = 5
B = A.filter(lambda z: z < x)
B.cache()
print(B.take(10))
#print(B.collect())
x = 3
print(B.take(10))
#print(B.collect())
#collect() doesn't always re-collect data - bad design!

[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]


In [62]:
# collect commented out, not take
# try not to use collect
# use take!
A = sc.parallelize(range(10))

#print(A.collect())

x = 5
B = A.filter(lambda z: z < x)
#B.cache()
#print(B.take(10))
print(B.collect())
x = 3
#print(B.take(10))
print(B.collect())
#collect() doesn't always re-collect data - bad design!

[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]


In [66]:
A = sc.parallelize(range(10))

x = 5
B = A.filter(lambda z: z < x)
B.unpersist()

print(B.collect())
x = 3

print(B.collect())
#collect() doesn't always re-collect data - bad design!

[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]


In [64]:
# RDD variables are references
A = sc.parallelize(range(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

In [80]:
# Linear time selection
# NOT cached, so filter condition is changed
data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data, 2)
k = 4

while True:
    x = A.first()
    A1 = A.filter(lambda z: z < x)
    A2 = A.filter(lambda z: z > x)
    
    # now run, not A2
    # [21, 12, 26]
    mid = A1.count()
    
    if mid == k:
        print(x)
        break
        
    if k < mid:
        A = A1
    # A = A2 = [34, 43, 44, 47, 56, 67, 74, 89]
    else:
        A = A2
        k = k - mid - 1

67


In [72]:
sorted(data)

[12, 21, 26, 34, 43, 44, 47, 56, 67, 74, 89]

In [81]:
# Linear time selection
# fixed version
# now A is always cached!
data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data, 2)
k = 4

while True:
    x = A.first()
    A1 = A.filter(lambda z: z < x)
    A2 = A.filter(lambda z: z > x)
    
    # now run, not A2
    # [21, 12, 26]
    mid = A1.count()
    
    if mid == k:
        print(x)
        break
        
    if k < mid:
        A = A1
    # A = A2 = [34, 43, 44, 47, 56, 67, 74, 89]
    else:
        A = A2
        k = k - mid - 1
    A.cache()

43


### Key-Value Pairs

In [90]:
# reduceByKey
# fruits is a rdd
# every string is one line, so that fruit: (len(fruit), 1)
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y)
numFruitsByLength.collect()

# (length, # of fruits with that length)

[(6, 2), (12, 1), (10, 1), (5, 3), (9, 1)]

In [87]:
from operator import add

lines = sc.textFile('../data/course.txt')
# seperate by white space
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
counts.collect()

[('Course', 2),
 ('Information', 1),
 ('systems,', 1),
 ('cloud', 1),
 ('parallel', 1),
 ('as', 1),
 ('in', 2),
 ('mining', 1),
 ('massive', 1),
 ('amount', 1),
 ('of', 3),
 ('even', 1),
 ('servers', 1),
 ('centers.', 1),
 ('both', 1),
 ('hands-on', 1),
 ('this', 1),
 ('new', 1),
 ('Lecture', 1),
 ('videos', 1),
 ('Description', 1),
 ('Big', 1),
 ('data', 4),
 ('including', 1),
 ('computing', 1),
 ('and', 3),
 ('processing', 1),
 ('frameworks,', 1),
 ('emerge', 1),
 ('enabling', 1),
 ('technologies', 1),
 ('managing', 1),
 ('the', 2),
 ('across', 1),
 ('hundreds', 1),
 ('or', 1),
 ('thousands', 1),
 ('commodity', 1),
 ('This', 1),
 ('course', 1),
 ('exposes', 1),
 ('students', 1),
 ('to', 1),
 ('theory', 1),
 ('experience', 1),
 ('technology.', 1)]

In [91]:
# sort by frequency
# every element rdd, extract 2nd component
# descending order is false
counts.sortBy(lambda x: x[1], False).collect()

[('data', 4),
 ('of', 3),
 ('and', 3),
 ('Course', 2),
 ('in', 2),
 ('the', 2),
 ('Information', 1),
 ('systems,', 1),
 ('cloud', 1),
 ('parallel', 1),
 ('as', 1),
 ('mining', 1),
 ('massive', 1),
 ('amount', 1),
 ('even', 1),
 ('servers', 1),
 ('centers.', 1),
 ('both', 1),
 ('hands-on', 1),
 ('this', 1),
 ('new', 1),
 ('Lecture', 1),
 ('videos', 1),
 ('Description', 1),
 ('Big', 1),
 ('including', 1),
 ('computing', 1),
 ('processing', 1),
 ('frameworks,', 1),
 ('emerge', 1),
 ('enabling', 1),
 ('technologies', 1),
 ('managing', 1),
 ('across', 1),
 ('hundreds', 1),
 ('or', 1),
 ('thousands', 1),
 ('commodity', 1),
 ('This', 1),
 ('course', 1),
 ('exposes', 1),
 ('students', 1),
 ('to', 1),
 ('theory', 1),
 ('experience', 1),
 ('technology.', 1)]

In [92]:
# Join simple example

products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
#trans = sc.parallelize([(1, 134, "OK"), (3, 34, "OK"), (5, 162, "Error"), (1, 135, "OK"), (2, 53, "OK"), (1, 45, "OK")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print(products.join(trans).collect())

[(1, ('Apple', (134, 'OK'))), (1, ('Apple', (135, 'OK'))), (1, ('Apple', (45, 'OK'))), (2, ('Orange', (53, 'OK'))), (3, ('TV', (34, 'OK'))), (5, ('Computer', (162, 'Error')))]


### K-means clustering

In [93]:
import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split()])

def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('../data/kmeans_data.txt', 5)  

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)  
# lines is an RDD of strings
K = 3
convergeDist = 0.01  
# terminate algorithm when the total distance from old center to new centers is less than this value

data = lines.map(parseVector).cache() # data is an RDD of arrays

# takeSample is action of RDD
kCenters = data.takeSample(False, K, 1)  # intial centers as a list of arrays
tempDist = 1.0  # total distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))
        
    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,
    # (array of sums of coordinates, total number of points assigned))
    
                                                # (0, ((1, 2, 1), 1))
    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers
    
    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total disctance from old centers to new centers
    
    for (i, p) in newCenters:
        kCenters[i] = p
        
print("Final centers: ", kCenters)


Final centers:  [array([0.1       , 0.33333333, 0.23333333]), array([9.05, 3.05, 4.65]), array([9.2, 2.2, 9.2])]


### PageRank

In [2]:
import re
from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a urls pair string into urls pair."""
    parts = urls.split(' ')
    return parts[0], parts[1]

# Loads in input file. It should be in format of:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("../data/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Loads all URLs from input file and initialize their neighbors. 
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()
            # ((1, 1), (1, 2), (2, 1), (2, 3)) -> ((1, (1, 2)), (2, (1, 3)))
            #                                      ((1, 1.0), (2, 1.0)) 1.0 is rank?

# Loads all URLs with other URL(s) link to from input file 
# and initialize ranks of them to one.
ranks = links.mapValues(lambda neighbors: 1.0)

# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0], #(1, (*(3, 2)*, 1.0))
                                             url_urls_rank[1][1]))#(1, ((3, 2), *1.0)*) -> (3, 0.5), (2, 0.5)
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank))
    
    # Re-calculates URL ranks based on neighbor contributions.
    # ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    
    # ranks = contribs.reduceByKey(add).map(lambda (url, rank): (url, rank * 0.85 + 0.15))
    # https://stackoverflow.com/questions/15712210/python-3-2-lambda-syntax-error
    # if the argument is a tuple, lambda (url, rank) unpacks automatically in Python 2 like the above code
    # no more in Python 3, though
    ranks = contribs.reduceByKey(add).map(lambda urlrank: (urlrank[0], urlrank[1] * 0.85 + 0.15))

print(ranks.top(5, lambda x: x[1]))

[('1', 1.2981882732854677), ('4', 0.9999999999999998), ('3', 0.9999999999999998), ('2', 0.7018117267145316)]


### Join vs. Broadcast Variables

In [35]:
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print(trans.join(products).collect())


[(1, ((134, 'OK'), 'Apple')), (1, ((135, 'OK'), 'Apple')), (1, ((45, 'OK'), 'Apple')), (2, ((53, 'OK'), 'Orange')), (3, ((34, 'OK'), 'TV')), (5, ((162, 'Error'), 'Computer'))]


In [36]:
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

# broadcasted_products = sc.broadcast(products)

def f(x):
    return (x[0], broadcasted_products.value[x[0]], x[1])

# results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print(results.collect())


[(1, 'Apple', (134, 'OK')), (3, 'TV', (34, 'OK')), (5, 'Computer', (162, 'Error')), (1, 'Apple', (135, 'OK')), (2, 'Orange', (53, 'OK')), (1, 'Apple', (45, 'OK'))]
