Now that you have created a Spark cluster, let's cover some basics of working with Spark on HDInsight. For a detailed discussion of working with Spark, see the [Spark Programming Guide](http://spark.apache.org/docs/2.0.0/sql-programming-guide.html).

----------
## Notebook setup

When using PySpark kernel notebooks on HDInsight, you do not need to create a SparkContext or a SparkSession. A SparkSession, which holds the SparkContext, is created for you automatically when you run the first code cell, and its progress is printed as it starts. The contexts are available under the following variable names:
- SparkSession (spark)
- SparkContext (sc)

To run the cells below, place the cursor in the cell and then press **SHIFT + ENTER**.

## How do I make an RDD?

RDDs can be created from stable storage or by transforming other RDDs. Run the cells below to create RDDs from the sample data files in the storage container associated with your Spark cluster. One such file is available on the cluster at `wasb:///example/data/fruits.txt`. The `///` notation reads data from the default container.

In [None]:
fruits = sc.textFile('wasb:///example/data/fruits.txt')
yellowThings = sc.textFile('wasb:///example/data/yellowthings.txt')
fruits.collect()

In [29]:
# In local mode:
fruits = sc.textFile('../data/fruits.txt')
yellowThings = sc.textFile('../data/yellowthings.txt')
print fruits.collect()
print yellowThings.collect()

[u'apple', u'banana', u'canary melon', u'grape', u'lemon', u'orange', u'pineapple', u'strawberry']
[u'banana', u'bee', u'butter', u'canary melon', u'gold', u'lemon', u'pineapple', u'sunflower']


In [None]:
# You can also read from other containers.
# The 'cluster' container under the storage account 'msbd' has been made public.
# Use the following format to read data from a public container
# The file can also be accessed from the web at: 
# https://msbd.blob.core.windows.net/cluster/data/course.txt

txtfile = sc.textFile('wasb://cluster@msbd.blob.core.windows.net/data/course.txt')
txtfile.collect()

----------
## PySpark magics 

The PySpark kernel provides some predefined “magics”, which are special commands that you call with `%%` (e.g. `%%MAGIC <args>`). A magic command must be the first word in a code cell, and it allows for multiple lines of content. You can’t put comments before a cell magic.

For more information on magics, see the [IPython magics documentation](http://ipython.readthedocs.org/en/stable/interactive/magics.html).

In [None]:
%%info

### Session configuration (%%configure)
 
Use the `%%configure` magic to configure new or existing Livy sessions.
* If a session is already running, you can change the configuration by using the `-f` argument with `%%configure` magic. This will delete the current session and recreate it with the applied configurations. If you don't provide the `-f` argument, an error will be displayed and no configuration changes will be applied.
* If you haven't already started the session, then the `-f` argument is not mandatory. Even if you use it with a session that you are just creating, it will not delete any currently running sessions.

These are some of the session attributes that can be used for configuration:
- **"name"**: Name of the application
- **"driverMemory"**: Memory for driver (e.g. 1000M, 2G) 
- **"executorMemory"**: Memory for executor (e.g. 1000M, 2G) 
- **"executorCores"**: Number of cores used by executor

In [None]:
%%configure -f
{"executorCores":4}

----------

## RDD operations

In [6]:
# map
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])

In [12]:
fruitsReversed.unpersist()
# try changing the file and re-execute with and without cache
fruitsReversed.collect()

[u'elppa', u'ananab', u'nolem yranac', u'eparg', u'nomel', u'egnaro', u'elppaenip', u'yrrebwarts']

In [13]:
# filter
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()

[u'apple', u'grape', u'lemon']

In [14]:
# flatMap
characters = fruits.flatMap(lambda fruit: list(fruit))
characters.collect()

[u'a', u'p', u'p', u'l', u'e', u'b', u'a', u'n', u'a', u'n', u'a', u'c', u'a', u'n', u'a', u'r', u'y', u' ', u'm', u'e', u'l', u'o', u'n', u'g', u'r', u'a', u'p', u'e', u'l', u'e', u'm', u'o', u'n', u'o', u'r', u'a', u'n', u'g', u'e', u'p', u'i', u'n', u'e', u'a', u'p', u'p', u'l', u'e', u's', u't', u'r', u'a', u'w', u'b', u'e', u'r', u'r', u'y']

In [15]:
# union
fruitsAndYellowThings = fruits.union(yellowThings)
fruitsAndYellowThings.collect()

[u'apple', u'banana', u'canary melon', u'grape', u'lemon', u'orange', u'pineapple', u'strawberry', u'banana', u'bee', u'butter', u'canary melon', u'gold', u'lemon', u'pineapple', u'sunflower']

In [16]:
# intersection
yellowFruits = fruits.intersection(yellowThings)
yellowFruits.collect()

[u'lemon', u'canary melon', u'banana', u'pineapple']

In [17]:
# distinct
distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
distinctFruitsAndYellowThings.collect()

[u'orange', u'grape', u'lemon', u'butter', u'canary melon', u'strawberry', u'apple', u'banana', u'sunflower', u'gold', u'bee', u'pineapple']
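The transformations above have close plain-Python analogues, which can help when reasoning about what each one returns. The following is a minimal sketch on ordinary lists, not Spark code: the variable names are illustrative, and the results are sorted only to make them deterministic (Spark does not guarantee an order for `intersection` or `distinct`).

```python
from itertools import chain

fruits = ['apple', 'banana', 'canary melon', 'grape', 'lemon',
          'orange', 'pineapple', 'strawberry']
yellow_things = ['banana', 'bee', 'butter', 'canary melon', 'gold',
                 'lemon', 'pineapple', 'sunflower']

# map: exactly one output element per input element
fruits_reversed = [fruit[::-1] for fruit in fruits]

# filter: keep only the elements that satisfy the predicate
short_fruits = [fruit for fruit in fruits if len(fruit) <= 5]

# flatMap: each element yields zero or more outputs, which are concatenated
characters = list(chain.from_iterable(list(fruit) for fruit in fruits))

# union keeps duplicates; intersection and distinct remove them
fruits_and_yellow = fruits + yellow_things
yellow_fruits = sorted(set(fruits) & set(yellow_things))
distinct_all = sorted(set(fruits_and_yellow))
```

The key difference from the list version is that RDD transformations are lazy: nothing is computed until an action such as `collect()` runs.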

### RDD actions
The following are examples of some common actions. For a detailed list, see [RDD Actions](https://spark.apache.org/docs/2.0.0/programming-guide.html#actions).

Run the actions below to see how they work. Place the cursor in a cell and press **SHIFT + ENTER**.

In [18]:
# collect
fruitsArray = fruits.collect()
yellowThingsArray = yellowThings.collect()
fruitsArray

[u'apple', u'banana', u'canary melon', u'grape', u'lemon', u'orange', u'pineapple', u'strawberry']

In [19]:
# count
numFruits = fruits.count()
numFruits

8

In [20]:
# take
first3Fruits = fruits.take(3)
first3Fruits

[u'apple', u'banana', u'canary melon']

In [23]:
# reduce
letterSet = fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
letterSet

set([u'a', u' ', u'c', u'b', u'e', u'g', u'i', u'm', u'l', u'o', u'n', u'p', u's', u'r', u't', u'w', u'y'])

In [24]:
letterSet = fruits.flatMap(lambda fruit: list(fruit)).distinct().collect()
letterSet

[u'a', u'c', u'e', u'g', u'i', u'm', u'o', u's', u'w', u'y', u' ', u'b', u'l', u'n', u'p', u'r', u't']

### Closure

In [3]:
counter = 0
rdd = sc.parallelize(xrange(10))

# Wrong: don't do this! Each task updates its own copy of counter,
# so the driver's counter is not reliably updated.
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)

print (counter)


In [2]:
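rdd = sc.parallelize(xrange(10))
accum = sc.accumulator(0)   # accumulators are the supported way to aggregate across tasks

def g(x):
    global accum
    accum += x

rdd.foreach(g)   # foreach is an action and returns nothing

print (accum.value)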


In [11]:
from operator import add

rdd = sc.parallelize(xrange(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g)     # map is lazy: g has not run yet
print accum.value  # so the accumulator is still 0

tmp = a.count()
print tmp
print rdd.reduce(add)

tmp = a.count()
print tmp
print rdd.reduce(add)

0
10
45
10
45
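The global-counter cell and the accumulator cells behave differently because Spark serializes the function (with the variables it references) and ships a copy to each task. The following plain-Python sketch imitates that with `pickle`; `run_on_executor` and `driver_state` are hypothetical names used only for illustration, and this is an analogy rather than how Spark is implemented.

```python
import pickle

def run_on_executor(state, partition):
    # Hypothetical stand-in for an executor: it works on a serialized
    # copy of the driver's state, never on the driver's own object.
    local_state = pickle.loads(pickle.dumps(state))
    for x in partition:
        local_state['counter'] += x
    return local_state['counter']

driver_state = {'counter': 0}
partitions = [range(0, 5), range(5, 10)]

# Each "executor" updates its own copy ...
partial_sums = [run_on_executor(driver_state, p) for p in partitions]

# ... so the driver's counter is untouched.
unchanged = driver_state['counter']

# An accumulator sends the per-task updates back to the driver, which
# merges them; summing the partial results plays that role here.
merged = sum(partial_sums)
```

Here `unchanged` stays at 0 while `merged` is 45, which is the value an accumulator over `xrange(10)` would hold after the action completes.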


In [4]:
from operator import add

rdd = sc.parallelize(xrange(10))

print rdd.sum()

45


In [17]:
A = sc.parallelize(xrange(10))

print A.collect()

x = 5
B = A.filter(lambda z: z < x)
print B.take(10)
x = 3
print B.count()
print B.collect() 
#print B.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4]
3
[0, 1, 2]
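The output above suggests that the lambda passed to `filter` reads `x` when an action runs, not when `filter` is called. Plain Python shows the same late-binding behavior with a lazy generator; this is a minimal sketch without Spark, and the names `is_small` and `lazy` are illustrative.

```python
x = 5
is_small = lambda z: z < x                     # x is looked up when the lambda runs
lazy = (z for z in range(10) if is_small(z))   # nothing is evaluated yet

x = 3                  # rebinding x before anything is consumed
result = list(lazy)    # evaluation happens here, with x == 3
```

To pin the value at definition time instead, bind it explicitly, for example with a default argument such as `lambda z, x=x: z < x`.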


In [None]:
A = sc.parallelize(xrange(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)

In [21]:
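# Linear-time selection: find the element of rank k (0-based).
# Assumes the values are distinct.

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data,2)
k = 4

while True:
    x = A.first()                    # pivot
    A1 = A.filter(lambda z: z < x)   # elements smaller than the pivot
    A2 = A.filter(lambda z: z > x)   # elements larger than the pivot
    mid = A1.count()
    if mid == k:
        print x
        break
    if k < mid:
        A = A1
    else:
        A = A2
        k = k - mid - 1
    A.cache()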


43


In [13]:
sorted(data)

[12, 21, 26, 34, 43, 44, 47, 56, 67, 74, 89]
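The selection loop above is a quickselect: it keeps only the side of the pivot that contains the element of rank `k`. The following plain-Python sketch mirrors it step by step on a list so it can be checked against `sorted(data)`; `select_kth` is a hypothetical helper name, and like the RDD version it assumes distinct values and a valid rank.

```python
def select_kth(values, k):
    """Return the element of 0-based rank k, assuming distinct values."""
    items = list(values)
    while True:
        pivot = items[0]                            # like A.first()
        smaller = [z for z in items if z < pivot]   # like A.filter(lambda z: z < x)
        larger = [z for z in items if z > pivot]    # like A.filter(lambda z: z > x)
        mid = len(smaller)                          # like A1.count()
        if mid == k:
            return pivot
        if k < mid:
            items = smaller
        else:
            items = larger
            k = k - mid - 1                         # skip the smaller side and the pivot

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
kth = select_kth(data, 4)
```

Each round discards the pivot and one side, so on average the work shrinks geometrically, which is where the expected linear running time comes from.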

### Computing Pi using Monte Carlo simulation

In [24]:
# From the official spark examples.

import sys
import random

partitions = 1000
n = 100000 * partitions

def f(_):
    x = random.random()
    y = random.random()
    return 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(xrange(1, n + 1), partitions) \
          .map(f).reduce(lambda a,b: a+b)

print "Pi is roughly", 4.0 * count / n

Pi is roughly 3.144


In [28]:
a = sc.parallelize(xrange(0,20),2)
a.map(lambda x: random.random()).glom().collect()

[[0.046309989645488825, 0.040280274054448006, 0.9646375753621728, 0.8907935522970499, 0.21676368527090806, 0.05073359963379376, 0.8263655725746003, 0.04399358736126835, 0.528015502915553, 0.1860305719880666], [0.046309989645488825, 0.040280274054448006, 0.9646375753621728, 0.8907935522970499, 0.21676368527090806, 0.05073359963379376, 0.8263655725746003, 0.04399358736126835, 0.528015502915553, 0.1860305719880666]]

In [26]:
# Correct version

partitions = 1000
n = 100000 * partitions

def f(index, it):
    random.seed(index + 987236)
    for i in it:
        x = random.random()
        y = random.random()
        yield 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(xrange(1, n + 1), partitions) \
          .mapPartitionsWithIndex(f).reduce(lambda a,b: a+b)

print "Pi is roughly", 4.0 * count / n

Pi is roughly 3.14152072
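The idea behind the corrected version is that each partition seeds its own random stream from its index, so no two partitions replay the same sequence. Below is a minimal plain-Python sketch of that idea, with a list of partition indices standing in for `mapPartitionsWithIndex`; `count_hits` is a hypothetical helper, and the sample sizes are chosen only to keep the example fast.

```python
import random

def count_hits(index, samples):
    # A separate generator per partition, seeded by the partition index,
    # so that partitions do not share a random sequence.
    rng = random.Random(index + 987236)
    hits = 0
    for _ in range(samples):
        x, y = rng.random(), rng.random()
        if x ** 2 + y ** 2 < 1:
            hits += 1
    return hits

partitions = 20
samples_per_partition = 10000
n = partitions * samples_per_partition

hits_per_partition = [count_hits(i, samples_per_partition)
                      for i in range(partitions)]
pi_estimate = 4.0 * sum(hits_per_partition) / n
```

Using a dedicated `random.Random` instance per partition, rather than reseeding the module-level generator, also keeps the streams independent of any other code that uses `random`.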


### Key-Value Pairs

In [30]:
# reduceByKey
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y)
numFruitsByLength.collect()

[(10, 1), (12, 1), (6, 2), (9, 1), (5, 3)]

In [31]:
from operator import add

lines = sc.textFile('../data/course.txt')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
counts.collect()

[(u'and', 3), (u'videos', 1), (u'exposes', 1), (u'as', 1), (u'including', 1), (u'frameworks,', 1), (u'cloud', 1), (u'even', 1), (u'managing', 1), (u'data', 4), (u'students', 1), (u'systems,', 1), (u'thousands', 1), (u'mining', 1), (u'This', 1), (u'technologies', 1), (u'hands-on', 1), (u'commodity', 1), (u'this', 1), (u'experience', 1), (u'enabling', 1), (u'centers.', 1), (u'amount', 1), (u'the', 2), (u'Information', 1), (u'computing', 1), (u'servers', 1), (u'course', 1), (u'in', 2), (u'Lecture', 1), (u'Description', 1), (u'Big', 1), (u'to', 1), (u'new', 1), (u'across', 1), (u'theory', 1), (u'processing', 1), (u'hundreds', 1), (u'parallel', 1), (u'both', 1), (u'technology.', 1), (u'of', 3), (u'emerge', 1), (u'Course', 2), (u'massive', 1), (u'or', 1)]

In [33]:
counts.sortBy(lambda x: x[0], False).collect()

[(u'videos', 1), (u'to', 1), (u'thousands', 1), (u'this', 1), (u'theory', 1), (u'the', 2), (u'technology.', 1), (u'technologies', 1), (u'systems,', 1), (u'students', 1), (u'servers', 1), (u'processing', 1), (u'parallel', 1), (u'or', 1), (u'of', 3), (u'new', 1), (u'mining', 1), (u'massive', 1), (u'managing', 1), (u'including', 1), (u'in', 2), (u'hundreds', 1), (u'hands-on', 1), (u'frameworks,', 1), (u'exposes', 1), (u'experience', 1), (u'even', 1), (u'enabling', 1), (u'emerge', 1), (u'data', 4), (u'course', 1), (u'computing', 1), (u'commodity', 1), (u'cloud', 1), (u'centers.', 1), (u'both', 1), (u'as', 1), (u'and', 3), (u'amount', 1), (u'across', 1), (u'This', 1), (u'Lecture', 1), (u'Information', 1), (u'Description', 1), (u'Course', 2), (u'Big', 1)]
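The word-count pipeline above has three stages: split lines into words (`flatMap`), pair each word with 1 (`map`), and sum per word (`reduceByKey`). A plain-Python sketch of the same stages follows; the two input lines are made up for illustration and are not the contents of `course.txt`.

```python
from collections import Counter

lines = ['big data and big ideas', 'data mining and data systems']  # hypothetical input

# flatMap: split every line and concatenate the pieces
words = [word for line in lines for word in line.split()]

# map to (word, 1) followed by reduceByKey(add) amounts to counting
counts = Counter(words)

# sortBy(lambda x: x[0], False): sort by word, descending
by_word_desc = sorted(counts.items(), key=lambda kv: kv[0], reverse=True)
```

Note that `split()` leaves punctuation attached to words, which is why entries such as `frameworks,` and `centers.` appear in the output above.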

In [35]:
# Join simple example

products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
#trans = sc.parallelize([(1, 134, "OK"), (3, 34, "OK"), (5, 162, "Error"), (1, 135, "OK"), (2, 53, "OK"), (1, 45, "OK")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print products.join(trans).collect()

[(1, ('Apple', (134, 'OK'))), (1, ('Apple', (135, 'OK'))), (1, ('Apple', (45, 'OK'))), (2, ('Orange', (53, 'OK'))), (3, ('TV', (34, 'OK'))), (5, ('Computer', (162, 'Error')))]


### K-means clustering

In [36]:
import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split(' ')])

def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('../data/kmeans_data.txt', 5)  

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)  
# lines is an RDD of strings
K = 3
# terminate the algorithm when the total distance from old centers to new centers is less than this value
convergeDist = 0.01

data = lines.map(parseVector).cache() # data is an RDD of arrays

kCenters = data.takeSample(False, K, 1)  # initial centers as a list of arrays
tempDist = 1.0  # total distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))
        
    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,
    # (array of sums of coordinates, total number of points assigned))
    
    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers
    
    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total distance from old centers to new centers
    
    for (i, p) in newCenters:
        kCenters[i] = p
        
print "Final centers: ", kCenters


Final centers:  [array([ 0.05,  0.3 ,  0.05]), array([ 0.2,  0.4,  0.6]), array([ 9.1       ,  2.76666667,  6.16666667])]
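Each pass of the loop above does two things: assign every point to its closest center, then move each center to the mean of its assigned points. The following plain-Python sketch shows a single such step on a tiny made-up data set; `closest_center` and `kmeans_step` are hypothetical helpers, and lists stand in for NumPy arrays.

```python
def closest_center(point, centers):
    """Index of the center with the smallest squared distance to point."""
    distances = [sum((a - b) ** 2 for a, b in zip(point, c)) for c in centers]
    return distances.index(min(distances))

def kmeans_step(points, centers):
    """One assignment + update step; returns the new centers."""
    sums = {}   # center index -> (coordinate sums, number of points)
    for p in points:
        i = closest_center(p, centers)
        total, count = sums.get(i, ([0.0] * len(p), 0))
        sums[i] = ([t + x for t, x in zip(total, p)], count + 1)
    new_centers = list(centers)   # centers with no points keep their position
    for i, (total, count) in sums.items():
        new_centers[i] = [t / count for t in total]
    return new_centers

points = [[0.0, 0.0], [0.0, 2.0], [10.0, 10.0], [10.0, 12.0]]   # hypothetical data
centers = kmeans_step(points, [[0.0, 0.0], [10.0, 10.0]])
```

The `(sums, count)` pair is the same trick as the `(p, 1)` values in the Spark version: sums and counts can be combined in any order, which is what makes `reduceByKey` applicable.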


### PageRank
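In each iteration, every URL splits its current rank evenly among the URLs it links to, and each URL's new rank is 0.15 plus 0.85 times the contributions it receives. Before the Spark cell, here is a minimal plain-Python sketch of that update on a dictionary; the three-page graph and the `pagerank` helper are hypothetical.

```python
def pagerank(links, iterations=10, damping=0.85):
    """links maps each URL to the list of URLs it points to."""
    ranks = {url: 1.0 for url in links}
    for _ in range(iterations):
        contribs = {}
        for url, neighbors in links.items():
            if url not in ranks:       # mirrors the inner join of links and ranks
                continue
            share = ranks[url] / len(neighbors)
            for neighbor in neighbors:
                contribs[neighbor] = contribs.get(neighbor, 0.0) + share
        # new rank = (1 - damping) + damping * (sum of received contributions)
        ranks = {url: (1 - damping) + damping * total
                 for url, total in contribs.items()}
    return ranks

links = {'a': ['b', 'c'], 'b': ['c'], 'c': ['a']}   # hypothetical graph
ranks = pagerank(links)
```

As in the join-based version, a URL that receives no contributions drops out of `ranks` after the first iteration.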

In [21]:
import re
from operator import add
from pyspark import SparkContext

#sc = SparkContext()
def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a urls pair string into a urls pair.
    parts = urls.split(' ')
    return parts[0], parts[1]

# Loads in input file. It should be in format of:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Loads all URLs from the input file and groups the neighbors of each URL.
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()

# Initializes the rank of every URL that links to other URL(s) to one.
ranks = links.mapValues(lambda neighbors: 1.0)

# Calculates and updates URL ranks iteratively using the PageRank algorithm.
for iteration in range(numOfIterations):
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank)); each URL then contributes
    # rank / (number of neighbors) to every neighbor.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0],
                                             url_urls_rank[1][1]))

    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

print (ranks.top(5, lambda x: x[1]))


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 123, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.



### Join vs. Broadcast Variables

In [3]:
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print trans.join(products).collect()


[(1, ((134, 'OK'), 'Apple')), (1, ((135, 'OK'), 'Apple')), (1, ((45, 'OK'), 'Apple')), (2, ((53, 'OK'), 'Orange')), (3, ((34, 'OK'), 'TV')), (5, ((162, 'Error'), 'Computer'))]


In [None]:
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

broadcasted_products = sc.broadcast(products)

results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
# results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print results.collect()
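The broadcast version replaces the shuffle-based join with a local dictionary lookup on each record. Stripped of Spark, the per-record work looks like the sketch below, where the plain dictionary plays the role of `broadcasted_products.value`; this is an illustration of the lookup, not of how broadcasting is implemented.

```python
products = {1: 'Apple', 2: 'Orange', 3: 'TV', 5: 'Computer'}
trans = [(1, (134, 'OK')), (3, (34, 'OK')), (5, (162, 'Error')),
         (1, (135, 'OK')), (2, (53, 'OK')), (1, (45, 'OK'))]

# Map-side lookup: every record consults the small table directly,
# so the large data set never has to be shuffled by key.
results = [(key, products[key], value) for key, value in trans]
```

One difference to keep in mind: `join` silently drops keys that appear on only one side, whereas `products[key]` raises a `KeyError` for a transaction whose product is missing. This approach fits when the lookup table is small enough to be held in memory on every executor.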
