## How do I make an RDD?

RDDs can be created from stable storage (for example a local file or HDFS) or by transforming other RDDs. Run the cells below to create RDDs from files on the local drive. All data files can be downloaded from https://www.cse.ust.hk/msbd5003/data/

In [6]:
# Read data from local file system:
fruits = sc.textFile('/Users/gardasnagarjun/Documents/fruits.txt')
yellowThings = sc.textFile('/Users/gardasnagarjun/Documents/yellowthings.txt')
print fruits.collect()
print yellowThings.collect()

[u'isfsfapple', u'banana', u'canary melon', u'grape', u'lemon', u'orange', u'pineapple', u'strawberry']
[u'banana', u'bee', u'butter', u'canary melon', u'gold', u'lemon', u'pineapple', u'sunflower']


In [2]:
# Read data from HDFS (replace 'url' and 'pathname' with a real NameNode host and file path):
fruits = sc.textFile('hdfs://url:9000/pathname/fruits.txt')
fruits.collect()

IllegalArgumentException: u'java.net.UnknownHostException: url'

----------

## RDD operations

In [7]:
# map
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])

In [9]:
fruitsReversed.unpersist()
# Try changing the file and re-executing this cell, with and without cache()
fruitsReversed.collect()

[u'elppafsfsi',
 u'ananab',
 u'nolem yranac',
 u'eparg',
 u'nomel',
 u'egnaro',
 u'elppaenip',
 u'yrrebwarts']

In [9]:
# filter
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()

[u'apple', u'grape', u'lemon']

In [10]:
# flatMap
characters = fruits.flatMap(lambda fruit: list(fruit))
characters.collect()

[u'i',
 u's',
 u'f',
 u's',
 u'f',
 u'a',
 u'p',
 u'p',
 u'l',
 u'e',
 u'b',
 u'a',
 u'n',
 u'a',
 u'n',
 u'a',
 u'c',
 u'a',
 u'n',
 u'a',
 u'r',
 u'y',
 u' ',
 u'm',
 u'e',
 u'l',
 u'o',
 u'n',
 u'g',
 u'r',
 u'a',
 u'p',
 u'e',
 u'l',
 u'e',
 u'm',
 u'o',
 u'n',
 u'o',
 u'r',
 u'a',
 u'n',
 u'g',
 u'e',
 u'p',
 u'i',
 u'n',
 u'e',
 u'a',
 u'p',
 u'p',
 u'l',
 u'e',
 u's',
 u't',
 u'r',
 u'a',
 u'w',
 u'b',
 u'e',
 u'r',
 u'r',
 u'y']
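
The difference between `map` and `flatMap` is easy to miss in the output above. As a minimal plain-Python sketch (no Spark needed; the two-item `fruits` list is a hypothetical sample, not the notebook's file), `map` produces exactly one output per input, while `flatMap` lets each input produce several outputs and concatenates them:

```python
from itertools import chain

fruits = ['apple', 'grape']  # hypothetical sample data

# map-like: one output element (here a list of characters) per input element
mapped = [list(fruit) for fruit in fruits]

# flatMap-like: each input yields several outputs, flattened into one sequence
flat_mapped = list(chain.from_iterable(list(fruit) for fruit in fruits))

print(mapped)       # [['a', 'p', 'p', 'l', 'e'], ['g', 'r', 'a', 'p', 'e']]
print(flat_mapped)  # ['a', 'p', 'p', 'l', 'e', 'g', 'r', 'a', 'p', 'e']
```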

In [11]:
# union
fruitsAndYellowThings = fruits.union(yellowThings)
fruitsAndYellowThings.collect()

[u'isfsfapple',
 u'banana',
 u'canary melon',
 u'grape',
 u'lemon',
 u'orange',
 u'pineapple',
 u'strawberry',
 u'banana',
 u'bee',
 u'butter',
 u'canary melon',
 u'gold',
 u'lemon',
 u'pineapple',
 u'sunflower']

In [12]:
# intersection
yellowFruits = fruits.intersection(yellowThings)
yellowFruits.collect()

[u'lemon', u'canary melon', u'banana', u'pineapple']

In [13]:
# distinct
distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
distinctFruitsAndYellowThings.collect()

[u'orange',
 u'lemon',
 u'grape',
 u'butter',
 u'canary melon',
 u'strawberry',
 u'isfsfapple',
 u'banana',
 u'sunflower',
 u'gold',
 u'bee',
 u'pineapple']
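
The three outputs above differ in how they treat duplicates: `union` keeps them, while `intersection` and `distinct` do not. The following plain-Python sketch (no Spark needed) mimics that behaviour with lists and sets, using the two lists printed at the top of the notebook. Ordering is not modelled, since Spark's shuffle order is not something to rely on.

```python
fruits = ['isfsfapple', 'banana', 'canary melon', 'grape',
          'lemon', 'orange', 'pineapple', 'strawberry']
yellow_things = ['banana', 'bee', 'butter', 'canary melon',
                 'gold', 'lemon', 'pineapple', 'sunflower']

union = fruits + yellow_things                    # duplicates are kept
intersection = set(fruits) & set(yellow_things)   # duplicates are removed
distinct = set(union)                             # duplicates are removed

print("%d %d %d" % (len(union), len(intersection), len(distinct)))  # 16 4 12
```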

### RDD actions
The following are examples of some common actions. For a detailed list, see [RDD Actions](https://spark.apache.org/docs/2.0.0/programming-guide.html#actions).

Run the actions below to see how they behave. Place the cursor in a cell and press **SHIFT + ENTER**.

In [14]:
# collect
fruitsArray = fruits.collect()
yellowThingsArray = yellowThings.collect()
fruitsArray

[u'isfsfapple',
 u'banana',
 u'canary melon',
 u'grape',
 u'lemon',
 u'orange',
 u'pineapple',
 u'strawberry']

In [15]:
# count
numFruits = fruits.count()
numFruits

8

In [16]:
# take
first3Fruits = fruits.take(3)
first3Fruits

[u'isfsfapple', u'banana', u'canary melon']

In [17]:
# reduce
letterSet = fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
letterSet

{u' ',
 u'a',
 u'b',
 u'c',
 u'e',
 u'f',
 u'g',
 u'i',
 u'l',
 u'm',
 u'n',
 u'o',
 u'p',
 u'r',
 u's',
 u't',
 u'w',
 u'y'}

In [18]:
letterSet = fruits.flatMap(lambda fruit: list(fruit)).distinct().collect()
letterSet

[u'a',
 u'c',
 u'e',
 u'g',
 u'i',
 u'm',
 u'o',
 u's',
 u'w',
 u'y',
 u' ',
 u'b',
 u'f',
 u'l',
 u'n',
 u'p',
 u'r',
 u't']
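
The last two cells compute the same set of letters in two ways. `reduce` combines partial results from different partitions in no guaranteed order, so the function passed to it should be associative and commutative; set union qualifies. A plain-Python sketch of the equivalence (no Spark needed, using the file contents printed earlier):

```python
from functools import reduce

fruits = ['isfsfapple', 'banana', 'canary melon', 'grape',
          'lemon', 'orange', 'pineapple', 'strawberry']

# map each fruit to its set of letters, then fold the sets together
via_reduce = reduce(lambda x, y: x.union(y), (set(f) for f in fruits))

# flatMap each fruit into letters, then keep the distinct ones
via_flatmap = set(ch for f in fruits for ch in f)

print(via_reduce == via_flatmap)  # True
print(len(via_reduce))            # 18
```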

### Closure

In [19]:
counter = 0
increment = 10
rdd = sc.parallelize(xrange(10))

# Wrong: don't do this! Each task updates its own copy of counter, not the driver's.
def increment_counter(x):
    global counter
    counter += x

print rdd.collect()
rdd.foreach(increment_counter)

print counter

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
0
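
The counter prints 0 because the function and the variables it refers to are serialized and shipped to the executors, so each task increments its own copy. The following is only a simulation of that idea in plain Python (no Spark needed): `run_on_executor` and the two-way partition split are hypothetical names and choices made for illustration.

```python
import copy

def run_on_executor(task, state, partition):
    """Simulate shipping a task: the executor works on a copy of the driver's state."""
    executor_state = copy.deepcopy(state)
    for x in partition:
        task(executor_state, x)
    return executor_state

def increment_counter(state, x):
    state['counter'] += x

driver_state = {'counter': 0}
partitions = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]  # hypothetical split of range(10)

executor_states = [run_on_executor(increment_counter, driver_state, p)
                   for p in partitions]

print(driver_state['counter'])                  # 0: the driver's copy is untouched
print([s['counter'] for s in executor_states])  # [10, 35]: updates stay on the copies
```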


In [20]:
rdd = sc.parallelize(xrange(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x

a = rdd.foreach(g)

print accum.value

45
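
An accumulator works where the global counter does not because tasks only add to it and the driver merges the contributions. A rough plain-Python sketch of that pattern follows (no Spark needed). `SketchAccumulator` and `run_task` are hypothetical names, and the real implementation differs in many details.

```python
class SketchAccumulator(object):
    """Hypothetical stand-in for an accumulator: tasks add, the driver reads."""

    def __init__(self, zero):
        self.value = zero

    def merge(self, partial):
        # Called on the driver with the partial result reported by one task.
        self.value += partial

def run_task(partition):
    """Each task accumulates locally and reports a single partial sum."""
    local = 0
    for x in partition:
        local += x
    return local

accum = SketchAccumulator(0)
for partition in [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]:  # hypothetical split of range(10)
    accum.merge(run_task(partition))

print(accum.value)  # 45
```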


In [29]:
rdd = sc.parallelize(xrange(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g)
#print a.collect()
print accum.value
print rdd.reduce(lambda x, y: x+y)
a.cache()
tmp = a.count()
print accum.value
print rdd.reduce(lambda x, y: x+y)

tmp = a.count()
print accum.value
print rdd.reduce(lambda x, y: x+y)


0
45
45
45
45
45
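
Two things explain this output. The accumulator is still 0 before any action because `map` is lazy, and it stays at 45 after the second `count()` because the cached result is reused instead of calling `g` again. The plain-Python sketch below (no Spark needed) imitates both effects; `LazyMapped` and `run` are hypothetical helpers. Without caching, the side effect runs once per action, which is why accumulator updates inside transformations can be applied more than once.

```python
class LazyMapped(object):
    """Hypothetical miniature of a mapped RDD: nothing runs until count() is called."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.use_cache = False
        self.cached = None

    def cache(self):
        self.use_cache = True
        return self

    def count(self):
        if self.cached is not None:
            return len(self.cached)          # reuse: func is not called again
        result = [self.func(x) for x in self.source]
        if self.use_cache:
            self.cached = result
        return len(result)

def run(cache):
    total = {'value': 0}                     # plays the role of the accumulator

    def g(x):
        total['value'] += x
        return x * x

    mapped = LazyMapped(range(10), g)
    if cache:
        mapped.cache()
    seen = [total['value']]                  # before any action
    mapped.count()
    seen.append(total['value'])              # after the first action
    mapped.count()
    seen.append(total['value'])              # after the second action
    return seen

print(run(cache=True))   # [0, 45, 45]
print(run(cache=False))  # [0, 45, 90]
```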


In [1]:
from operator import add

rdd = sc.parallelize(xrange(10))

print rdd.sum()

45


### Computing Pi using Monte Carlo simulation

In [30]:
# From the official Spark examples.

import sys
import random

partitions = 100
n = 100000 * partitions

def f(_):
    x = random.random()
    y = random.random()
    return 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(xrange(1, n + 1), partitions) \
          .map(f).sum()

print "Pi is roughly", 4.0 * count / n

Pi is roughly 3.14392


In [1]:
# Example of mapPartitions and mapPartitionsWithIndex
import sys
import random
a = sc.parallelize(xrange(0,20),4)
print a.collect()
print a.glom().collect()
def f(it):
    s = 0
    for i in it:
        s += i
        yield s
        
print a.mapPartitions(f).glom().collect()

def f(index, it):
    s = index
    for i in it:
        s += i
        yield s

print a.mapPartitionsWithIndex(f).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]
[[0, 1, 3, 6, 10], [5, 11, 18, 26, 35], [10, 21, 33, 46, 60], [15, 31, 48, 66, 85]]
[0, 1, 3, 6, 10, 6, 12, 19, 27, 36, 12, 23, 35, 48, 62, 18, 34, 51, 69, 88]
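
The outputs above can be reproduced without Spark, which makes it clearer that the function passed to `mapPartitions` sees one whole partition at a time and that `mapPartitionsWithIndex` also receives the partition number. In this plain-Python sketch, `glom` and `running_sum` are hypothetical helpers, and the even four-way split is assumed to match the one Spark printed.

```python
def glom(data, num_partitions):
    """Split data into contiguous equal chunks (assumes the length divides evenly)."""
    size = len(data) // num_partitions
    return [data[i * size:(i + 1) * size] for i in range(num_partitions)]

def running_sum(it, start=0):
    """Yield the running total of one partition, starting from `start`."""
    s = start
    for i in it:
        s += i
        yield s

partitions = glom(list(range(20)), 4)

# mapPartitions-like: the running sum restarts in every partition
per_partition = [list(running_sum(p)) for p in partitions]

# mapPartitionsWithIndex-like: each running sum starts at the partition index
with_index = [v for index, p in enumerate(partitions)
              for v in running_sum(p, start=index)]

print(per_partition)
print(with_index)
```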


In [2]:
# Correct version: seed the random generator separately in each partition
import random

partitions = 100
n = 1000 * partitions

def f(index, it):
    random.seed(index + 987236)
    for i in it:
        x = random.random()
        y = random.random()
        yield 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(xrange(1, n + 1), partitions) \
          .mapPartitionsWithIndex(f).sum()

print "Pi is roughly", 4.0 * count / n

Pi is roughly 3.13028
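
A likely reason for seeding per partition is that worker processes may otherwise start from the same generator state, in which case several partitions would draw identical samples and add no new information. The sketch below shows the same idea in plain Python (no Spark needed), giving each simulated partition its own `random.Random` instance. `count_hits` is a hypothetical helper; the seed offset is the one used in the cell above.

```python
import random

def count_hits(index, samples):
    """Count points inside the unit quarter circle, using a per-partition generator."""
    rng = random.Random(index + 987236)   # distinct, reproducible stream per partition
    hits = 0
    for _ in range(samples):
        x, y = rng.random(), rng.random()
        if x ** 2 + y ** 2 < 1:
            hits += 1
    return hits

partitions = 100
samples_per_partition = 1000
n = partitions * samples_per_partition

count = sum(count_hits(i, samples_per_partition) for i in range(partitions))
pi_estimate = 4.0 * count / n

print("Pi is roughly %s" % pi_estimate)
```

Because every seed is fixed, rerunning the sketch gives the same estimate, which also makes results easier to compare across runs.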


### Closure and Persistence

In [39]:
A = sc.parallelize(xrange(10))

x = 5
B = A.filter(lambda z: z < x)
B.persist()
print B.take(10)
# print B.collect()
x = 3
print B.take(10) 
#print B.collect()
# collect() doesn't always re-collect data - bad design!

[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
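
The filter lambda refers to `x` by name rather than storing its value. In plain Python this means the predicate sees whatever `x` holds when it is called, so a stored result and a recomputed one can disagree once `x` changes. The sketch below shows only that Python behaviour. Spark adds its own rules about when a closure is serialized and shipped, so treat this as a partial analogy rather than a description of Spark internals.

```python
x = 5
pred = lambda z: z < x          # looks up x at call time, not at definition time
data = list(range(10))

stored = [z for z in data if pred(z)]      # computed while x == 5, then kept

x = 3
recomputed = [z for z in data if pred(z)]  # computed again after x changed

print(stored)      # [0, 1, 2, 3, 4]
print(recomputed)  # [0, 1, 2]
```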


In [3]:
# RDD variables are references
A = sc.parallelize(xrange(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

In [4]:
# Linear-time selection

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data,2)
k = 4

while True:
    x = A.first()
    A1 = A.filter(lambda z: z < x)
    A2 = A.filter(lambda z: z > x)
    mid = A1.count()
    if mid == k:
        print x
        break
    
    if k < mid:
        A = A1
    else:
        A = A2
        k = k - mid - 1
    A.cache()

43


In [5]:
sorted(data)

[12, 21, 26, 34, 43, 44, 47, 56, 67, 74, 89]
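
The loop above is a quickselect: it keeps only the side of the pivot that contains the element of rank `k` (0-based). A plain-Python version of the same logic is sketched below (no Spark needed) so the steps can be traced by hand. `select_kth` is a hypothetical name, and like the cell above it assumes distinct values. Using the first element as pivot gives linear expected work only when the input order is effectively random; already sorted input degrades to quadratic.

```python
def select_kth(values, k):
    """Return the element of 0-based rank k (assumes distinct values, 0 <= k < len)."""
    a = list(values)
    while True:
        pivot = a[0]
        smaller = [z for z in a if z < pivot]
        larger = [z for z in a if z > pivot]
        mid = len(smaller)          # rank of the pivot within a
        if mid == k:
            return pivot
        if k < mid:
            a = smaller             # answer lies among the smaller elements
        else:
            a = larger              # skip the smaller elements and the pivot
            k = k - mid - 1

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
print(select_kth(data, 4))  # 43
```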

### Key-Value Pairs

In [25]:
# reduceByKey
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y)
numFruitsByLength.collect()

[(10, 1), (12, 1), (6, 2), (9, 1), (5, 3)]
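
`reduceByKey` merges all values that share a key with the given function. The plain-Python sketch below (no Spark needed) does the same with a dictionary. `reduce_by_key` is a hypothetical helper, and the list assumes the file starts with `apple`, which is what the counts above imply.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with func, one pair at a time."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# Assumed file contents (first entry taken to be 'apple')
fruits = ['apple', 'banana', 'canary melon', 'grape',
          'lemon', 'orange', 'pineapple', 'strawberry']

counts = reduce_by_key(((len(f), 1) for f in fruits), lambda x, y: x + y)

print(sorted(counts.items()))  # [(5, 3), (6, 2), (9, 1), (10, 1), (12, 1)]
```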

In [23]:
from operator import add

lines = sc.textFile('../data/course.txt')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
counts.collect()

[(u'and', 3), (u'videos', 1), (u'exposes', 1), (u'as', 1), (u'including', 1), (u'frameworks,', 1), (u'cloud', 1), (u'even', 1), (u'managing', 1), (u'data', 4), (u'students', 1), (u'systems,', 1), (u'thousands', 1), (u'mining', 1), (u'This', 1), (u'technologies', 1), (u'hands-on', 1), (u'commodity', 1), (u'this', 1), (u'experience', 1), (u'enabling', 1), (u'centers.', 1), (u'amount', 1), (u'the', 2), (u'Information', 1), (u'computing', 1), (u'servers', 1), (u'course', 1), (u'in', 2), (u'Lecture', 1), (u'Description', 1), (u'Big', 1), (u'to', 1), (u'new', 1), (u'across', 1), (u'theory', 1), (u'processing', 1), (u'hundreds', 1), (u'parallel', 1), (u'both', 1), (u'technology.', 1), (u'of', 3), (u'emerge', 1), (u'Course', 2), (u'massive', 1), (u'or', 1)]

In [24]:
counts.sortBy(lambda x: x[1], False).collect()

[(u'data', 4), (u'and', 3), (u'of', 3), (u'the', 2), (u'in', 2), (u'Course', 2), (u'videos', 1), (u'exposes', 1), (u'as', 1), (u'including', 1), (u'frameworks,', 1), (u'cloud', 1), (u'even', 1), (u'managing', 1), (u'students', 1), (u'systems,', 1), (u'thousands', 1), (u'mining', 1), (u'This', 1), (u'technologies', 1), (u'hands-on', 1), (u'commodity', 1), (u'this', 1), (u'experience', 1), (u'enabling', 1), (u'centers.', 1), (u'amount', 1), (u'Information', 1), (u'computing', 1), (u'servers', 1), (u'course', 1), (u'Lecture', 1), (u'Description', 1), (u'Big', 1), (u'to', 1), (u'new', 1), (u'across', 1), (u'theory', 1), (u'processing', 1), (u'hundreds', 1), (u'parallel', 1), (u'both', 1), (u'technology.', 1), (u'emerge', 1), (u'massive', 1), (u'or', 1)]

In [40]:
# Join simple example

products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
#trans = sc.parallelize([(1, 134, "OK"), (3, 34, "OK"), (5, 162, "Error"), (1, 135, "OK"), (2, 53, "OK"), (1, 45, "OK")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print products.join(trans).collect()

[(1, ('Apple', (134, 'OK'))), (1, ('Apple', (135, 'OK'))), (1, ('Apple', (45, 'OK'))), (2, ('Orange', (53, 'OK'))), (3, ('TV', (34, 'OK'))), (5, ('Computer', (162, 'Error')))]
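
`join` pairs every value on the left with every value on the right that has the same key, and drops keys found on only one side. A minimal plain-Python sketch of that inner join follows (no Spark needed). `inner_join` is a hypothetical helper, and the result order here simply follows the left input, whereas Spark makes no such promise.

```python
from collections import defaultdict

def inner_join(left, right):
    """Return (key, (left_value, right_value)) for every matching pair of records."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]

products = [(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")]
trans = [(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")),
         (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))]

joined = inner_join(products, trans)
print(joined)
```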


### K-means clustering

In [39]:
import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split()])

def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('../data/kmeans_data.txt', 5)  

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)  
# lines is an RDD of strings
K = 3
convergeDist = 0.01
# terminate when the total squared distance from the old centers to the new centers falls below this value

data = lines.map(parseVector).cache() # data is an RDD of arrays

kCenters = data.takeSample(False, K, 1)  # initial centers as a list of arrays
tempDist = 1.0  # total squared distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))
        
    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,
    # (array of sums of coordinates, total number of points assigned))
    
    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers
    
    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total squared distance from old centers to new centers
    
    for (i, p) in newCenters:
        kCenters[i] = p
        
print "Final centers: ", kCenters


Final centers:  [array([ 0.05,  0.3 ,  0.05]), array([ 0.2,  0.4,  0.6]), array([ 9.1       ,  2.76666667,  6.16666667])]
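
Each pass of the loop above does two things: it assigns every point to its closest center, then moves each center to the mean of its points. The sketch below performs one such pass in plain Python (no Spark or NumPy needed) on four hypothetical 2-D points. `closest_center` and `kmeans_step` are illustrative names; a center that receives no points keeps its old position, as in the cell above.

```python
def closest_center(p, centers):
    """Index of the center with the smallest squared distance to p."""
    return min(range(len(centers)),
               key=lambda i: sum((a - b) ** 2 for a, b in zip(p, centers[i])))

def kmeans_step(points, centers):
    """One assignment + update pass; returns the new list of centers."""
    sums = {}  # center index -> (coordinate sums, number of points)
    for p in points:
        i = closest_center(p, centers)
        s, n = sums.get(i, ([0.0] * len(p), 0))
        sums[i] = ([a + b for a, b in zip(s, p)], n + 1)
    new_centers = list(centers)
    for i, (s, n) in sums.items():
        new_centers[i] = tuple(a / n for a in s)
    return new_centers

points = [(0.0, 0.0), (0.2, 0.0), (9.0, 9.0), (9.2, 9.0)]  # hypothetical data
centers = [(0.0, 0.0), (9.0, 9.0)]                         # hypothetical start

new_centers = kmeans_step(points, centers)

# Total squared distance moved, the quantity compared against convergeDist
moved = sum(sum((a - b) ** 2 for a, b in zip(old, new))
            for old, new in zip(centers, new_centers))

print(new_centers)
print(moved)
```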


### PageRank

In [8]:
import re
from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a "URL neighbor-URL" string into a (URL, neighbor URL) pair.
    parts = urls.split(' ')
    return parts[0], parts[1]

# Load the input file. Each line should have the format:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("../data/pagerank_data.txt",2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Load all URLs from the input file and group each URL's neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()

# Take every URL that links to other URL(s)
# and initialize its rank to one.
ranks = links.mapValues(lambda neighbors: 1.0)

# Repeatedly recalculate the URL ranks using the PageRank update.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0],
                                             url_urls_rank[1][1]))
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank))
    
    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    # ranks = contribs.reduceByKey(add).map(lambda (url, rank): (url, rank * 0.85 + 0.15))

print ranks.top(5, lambda x: x[1])

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 59.0 failed 1 times, most recent failure: Lost task 0.0 in stage 59.0 (TID 50, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/rdd.py", line 1945, in combine
    merger.mergeValues(iterator)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-8-c847f4085377>", line 28, in <lambda>
  File "<ipython-input-8-c847f4085377>", line 13, in parseNeighbors
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
	... 1 more
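
The traceback ends with `IndexError: list index out of range` inside `parseNeighbors`. With `split(' ')`, any line that contains no single space (a blank line, or a line whose fields are separated by a tab) produces a one-element list, so `parts[1]` fails. Which case occurred cannot be told from the output alone. Below is a sketch of a more tolerant parser in plain Python (no Spark needed); `parse_neighbors` and `sample_lines` are hypothetical, and skipping malformed lines is an assumption about the desired behaviour.

```python
import re

def parse_neighbors(line):
    """Return (url, neighbor), or None when the line does not hold two fields."""
    parts = re.split(r'\s+', line.strip())   # split on any run of whitespace
    if len(parts) < 2 or not parts[0]:
        return None
    return parts[0], parts[1]

sample_lines = ['1 2', '1\t3', '', '4']      # hypothetical input lines

pairs = [p for p in (parse_neighbors(l) for l in sample_lines) if p is not None]
print(pairs)  # [('1', '2'), ('1', '3')]
```

In the notebook the same idea would amount to mapping with such a parser and then filtering out the `None` results before `groupByKey()`.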


### Join vs. Broadcast Variables

In [41]:
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print trans.join(products).collect()


[(1, ((134, 'OK'), 'Apple')), (1, ((135, 'OK'), 'Apple')), (1, ((45, 'OK'), 'Apple')), (2, ((53, 'OK'), 'Orange')), (3, ((34, 'OK'), 'TV')), (5, ((162, 'Error'), 'Computer'))]


In [1]:
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

broadcasted_products = sc.broadcast(products)

results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
#  results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print results.collect()


[(1, 'Apple', (134, 'OK')), (3, 'TV', (34, 'OK')), (5, 'Computer', (162, 'Error')), (1, 'Apple', (135, 'OK')), (2, 'Orange', (53, 'OK')), (1, 'Apple', (45, 'OK'))]
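
One difference from `join` is worth keeping in mind: a dictionary lookup with `[...]` raises `KeyError` for a transaction whose product is unknown, whereas the join silently drops it. The plain-Python sketch below (no Spark needed) shows two ways to handle that on the lookup side. The transaction with key 4 is a hypothetical addition that has no matching product.

```python
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = [(1, (134, "OK")), (3, (34, "OK")), (4, (99, "OK"))]  # key 4 is hypothetical

# Inner-join behaviour: drop transactions without a product
inner = [(k, products[k], v) for k, v in trans if k in products]

# Left-outer behaviour: keep them, with None as the product name
left_outer = [(k, products.get(k), v) for k, v in trans]

print(inner)       # [(1, 'Apple', (134, 'OK')), (3, 'TV', (34, 'OK'))]
print(left_outer)  # [(1, 'Apple', (134, 'OK')), (3, 'TV', (34, 'OK')), (4, None, (99, 'OK'))]
```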
