# Datasets

1. Wikipedia page on Python programming language - **../data/python_wiki.html**
2. Enron data in JSON format - **../data/enron.json**

In [1]:
import findspark
findspark.init("../../spark2/")
import pyspark

sc = pyspark.SparkContext(appName="helloworld")

In [2]:
# let's test our setup by counting the number of nonempty lines in a text file
lines = sc.textFile('../data/python_wiki.html')
lines_nonempty = lines.filter(lambda x: len(x) > 0)
lines_nonempty.count()

1246

In [3]:
# the same check on the second dataset: count the nonempty lines
lines = sc.textFile('../data/enron.json')
lines_nonempty = lines.filter(lambda x: len(x) > 0)
lines_nonempty.count()

5929

# Playing with numbers

In [12]:
numRDD = sc.parallelize([1,2,3,4], 2)
numRDD.collect()

[1, 2, 3, 4]

In [13]:
type(numRDD)

pyspark.rdd.RDD

In [14]:
numRDD

ParallelCollectionRDD[13] at parallelize at PythonRDD.scala:475

In [15]:
numRDD.first()

1

In [17]:
numRDD.map(lambda x : x*x).collect()

[1, 4, 9, 16]

In [1]:
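import random
n = 1000000

def inside(p):
    # p is ignored: draw a random point in the unit square
    x, y = random.random(), random.random()
    # True if the point lies inside the quarter circle of radius 1
    return x*x + y*y < 1

# requires the SparkContext `sc` from the first cell; in a fresh kernel
# (as in this recorded run) it raises the NameError shown below
count = sc.parallelize(range(0, n)) \
             .filter(inside).count()
# the fraction of points inside the quarter circle approximates pi/4
print("Pi is roughly %f" % (4.0 * count / n))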

NameError: name 'sc' is not defined

# PairRDDs

* Some Spark operations are available only on RDDs of key-value pairs
* Most of these operations (counting operations excepted) end up shuffling data, because the values for a given key may be spread across several partitions

Let's look at an operation that produces key-value pairs: the Cartesian operation.

The Cartesian operation, applied to a dataset, takes another dataset as a parameter and returns the Cartesian product of the two datasets.

If x has m elements and y has n elements, how many elements does their cartesian product have?

In [3]:
x = sc.parallelize([1,2,3])
y = sc.parallelize([10,11,12])

x.cartesian(y).collect()

[(1, 10),
 (1, 11),
 (1, 12),
 (2, 10),
 (2, 11),
 (2, 12),
 (3, 10),
 (3, 11),
 (3, 12)]
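The output answers the question above: each of the m elements of `x` is paired with each of the n elements of `y`, giving m * n pairs (3 * 3 = 9 here). The short plain-Python sketch below uses `itertools.product` to model what `cartesian()` computes on this small local input; it does not use Spark, and Spark does not guarantee this ordering in general.

```python
from itertools import product

x = [1, 2, 3]
y = [10, 11, 12]

# every element of x paired with every element of y
pairs = list(product(x, y))

# the product of an m-element and an n-element dataset has m * n elements
print(len(pairs))  # 9
print(pairs[:3])   # [(1, 10), (1, 11), (1, 12)]
```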

## The groupByKey and reduceByKey transformations

Let's investigate two additional transformations:
* [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) and
* [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey).

Both of these transformations operate on pair RDDs.

* A pair RDD is an RDD where each element is a pair tuple (key, value).
* For example, `sc.parallelize([('a', 1), ('a', 2), ('b', 1)])` would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.


* `reduceByKey()` gathers together pairs that have the same key and applies a function to two associated values at a time.
* `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions.
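The two phases described above can be modelled in plain Python. The sketch below is an illustration under simplifying assumptions (partitions are just Python lists, and `reduce_by_key_sim` is a hypothetical helper), not Spark's implementation: each partition is reduced per key first, and only those partial results are merged.

```python
def reduce_by_key_sim(partitions, func):
    """Plain-Python model of reduceByKey (not Spark's implementation).

    partitions: list of lists of (key, value) pairs, one inner list per partition.
    """
    # Phase 1: combine values per key inside each partition (map-side combine)
    local_results = []
    for part in partitions:
        combined = {}
        for key, value in part:
            combined[key] = func(combined[key], value) if key in combined else value
        local_results.append(combined)

    # Phase 2: merge the per-partition partial results for each key
    final = {}
    for combined in local_results:
        for key, value in combined.items():
            final[key] = func(final[key], value) if key in final else value
    return final, local_results


partitions = [[('a', 1), ('a', 2)], [('b', 1), ('a', 5)]]
result, local = reduce_by_key_sim(partitions, lambda x, y: x + y)
print(local)   # [{'a': 3}, {'b': 1, 'a': 5}]
print(result)  # {'a': 8, 'b': 1}
```

Only the partial results in `local` (three records instead of four input pairs) would need to cross the network in this model.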


* While both the `groupByKey()` and `reduceByKey()` transformations can often be used to solve the same problem and will produce the same answer, the `reduceByKey()` transformation works much better for large distributed datasets.
* This is because Spark knows it can combine output with a common key on each partition *before* shuffling (redistributing) the data across nodes.
* Only use `groupByKey()` if the operation would not benefit from reducing the data before the shuffle occurs.


Look at the diagram below to understand how `reduceByKey` works.

* Notice how pairs on the same machine with the same key are combined (by using the lambda function passed into `reduceByKey`) before the data is shuffled.
* Then the lambda function is called again to reduce all the values from each partition to produce one final result.

![reduceByKey() figure](http://spark-mooc.github.io/web-assets/images/reduce_by.png)

* On the other hand, when using the `groupByKey()` transformation, all the key-value pairs are shuffled around, causing a lot of unnecessary data to be transferred over the network.
* To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair.
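As a rough illustration of key-based partitioning, the sketch below assigns each pair to a partition by hashing its key and taking the result modulo the number of partitions. `zlib.crc32` and the helper `partition_for` are stand-ins chosen for determinism; PySpark's default partitioner uses its own hash function, so the actual partition numbers will differ. The property that matters is that equal keys always land in the same partition.

```python
import zlib


def partition_for(key, num_partitions):
    # Illustrative stand-in: CRC32 of the key's string form (deterministic across
    # processes), modulo the number of partitions. Not Spark's hash function.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


pairs = [('a', 1), ('b', 1), ('a', 2), ('c', 7), ('b', 4)]
num_partitions = 3

# route every pair to the partition chosen by its key
shuffled = [[] for _ in range(num_partitions)]
for key, value in pairs:
    shuffled[partition_for(key, num_partitions)].append((key, value))

for i, part in enumerate(shuffled):
    print(i, part)
```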


* Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory.
* However, it flushes data to disk one key at a time, so if a single key has more key-value pairs than fit in memory, an out-of-memory exception occurs.
* Later releases of Spark handle this more gracefully, so the job can still proceed, but the situation should still be avoided.
* When Spark needs to spill to disk, performance is severely impacted.

![groupByKey() figure](http://spark-mooc.github.io/web-assets/images/group_by.png)

As your dataset grows, the difference between `reduceByKey()` and `groupByKey()` in the amount of data that needs to be shuffled becomes increasingly pronounced.
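A simplified count makes the difference concrete. The hypothetical helper below assumes that `groupByKey()` shuffles every pair while `reduceByKey()` shuffles one partial result per distinct key per partition; it ignores record sizes, serialization, and compression.

```python
def shuffled_records(partitions):
    """Count records sent to the shuffle under a simplified model (hypothetical helper).

    groupByKey: every (key, value) pair is shuffled.
    reduceByKey: after map-side combining, one record per distinct key per partition.
    """
    group_by_key = sum(len(part) for part in partitions)
    reduce_by_key = sum(len({key for key, _ in part}) for part in partitions)
    return group_by_key, reduce_by_key


# 4 partitions, each holding 1,000 pairs over only 3 distinct keys
partitions = [[(k, 1) for k in ['a', 'b', 'c'] * 333 + ['a']] for _ in range(4)]

print(shuffled_records(partitions))  # (4000, 12)
```

In this model the `groupByKey()` count grows with the number of records, while the `reduceByKey()` count is bounded by the number of partitions times the number of distinct keys.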


Here are more transformations to prefer over `groupByKey()`:
* [combineByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.combineByKey) can be used when you are combining elements but your return type differs from your input value type.
* [foldByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.foldByKey) merges the values for each key using an associative function and a neutral "zero value".
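`combineByKey()` takes three functions: one that creates a combiner from the first value of a key within a partition, one that merges a further value into a combiner, and one that merges two combiners from different partitions. In PySpark they are passed as `rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)`. The plain-Python model below (the helper `combine_by_key_sim` is hypothetical, not Spark code) computes a per-key average, where the combiner type `(sum, count)` differs from the integer input values.

```python
def combine_by_key_sim(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of combineByKey semantics (not Spark's implementation)."""
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            if key in acc:
                acc[key] = merge_value(acc[key], value)  # fold value into existing combiner
            else:
                acc[key] = create_combiner(value)        # first value for this key here
        per_partition.append(acc)

    # merge combiners for the same key coming from different partitions
    merged = {}
    for acc in per_partition:
        for key, comb in acc.items():
            merged[key] = merge_combiners(merged[key], comb) if key in merged else comb
    return merged


# Per-key average: input values are ints, combiners are (sum, count) tuples
partitions = [[('a', 1), ('a', 2)], [('b', 1), ('a', 6)]]
sums_counts = combine_by_key_sim(
    partitions,
    lambda v: (v, 1),
    lambda c, v: (c[0] + v, c[1] + 1),
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),
)
averages = {k: s / n for k, (s, n) in sums_counts.items()}
print(sums_counts)  # {'a': (9, 3), 'b': (1, 1)}
print(averages)     # {'a': 3.0, 'b': 1.0}
```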

Now let's go through a simple `groupByKey()` and `reduceByKey()` example.

In [4]:
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])

# mapValues only used to improve format for printing
pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()

[('b', [1]), ('a', [1, 2])]

In [5]:
# Different ways to sum by key
pairRDD.groupByKey().map(lambda k: (k[0], sum(k[1]))).collect()

[('b', 1), ('a', 3)]

In [6]:
# Using mapValues, which is recommended when the key doesn't change
pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()

[('b', 1), ('a', 3)]


In [7]:
def add(x, y):
    return x + y

# reduceByKey is more efficient / scalable
print(pairRDD.reduceByKey(add).collect())

[('b', 1), ('a', 3)]


## More groupBy examples

In [24]:
a = sc.parallelize(["black", "blue", "white", "green", "grey"], 2)
a.collect()

['black', 'blue', 'white', 'green', 'grey']

In [25]:
b = a.groupBy(lambda x: len(x)).collect()
b

[(4, <pyspark.resultiterable.ResultIterable at 0x117a1a390>),
 (5, <pyspark.resultiterable.ResultIterable at 0x117a1a320>)]

In [26]:
sorted([(x,sorted(y)) for (x,y) in b])

[(4, ['blue', 'grey']), (5, ['black', 'green', 'white'])]
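`groupBy(f)` keys each element by `f(element)` and then groups by that key, which is why the result above is a list of (key, group) pairs. The plain-Python sketch below (a `defaultdict` standing in for the distributed grouping) reproduces the sorted result for this input.

```python
from collections import defaultdict

colors = ["black", "blue", "white", "green", "grey"]

# groupBy(len): key each element by its length, then collect the elements per key
groups = defaultdict(list)
for color in colors:
    groups[len(color)].append(color)

result = sorted((k, sorted(v)) for k, v in groups.items())
print(result)  # [(4, ['blue', 'grey']), (5, ['black', 'green', 'white'])]
```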

## More reduceByKey examples

In [27]:
a = sc.parallelize(["black", "blue", "white", "green", "grey"], 2)
b = a.map(lambda x: (len(x), x))
b.reduceByKey(lambda x,y: x + y).collect()

[(4, 'bluegrey'), (5, 'blackwhitegreen')]

In [28]:
a = sc.parallelize(["black", "blue", "white", "orange"], 2)
b = a.map(lambda x: (len(x), x))
b.reduceByKey(lambda x,y: x + y).collect()

[(4, 'blue'), (6, 'orange'), (5, 'blackwhite')]
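Note how the values for key 5 were concatenated in partition order. The PySpark documentation describes the function passed to `reduceByKey()` as associative and commutative; string concatenation is associative but not commutative, so the result can depend on how the data is partitioned and in which order partial results are merged. The hypothetical plain-Python helper below models this by merging the same per-partition results in two different orders; it assumes `parallelize(..., 2)` put `black, blue` in the first partition and `white, orange` in the second.

```python
def concat_by_partition(partitions):
    """Concatenate values per key: first inside each partition, then across
    partitions in list order (a model, not Spark's implementation)."""
    result = {}
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = local.get(key, "") + value
        for key, value in local.items():
            result[key] = result.get(key, "") + value
    return result


pairs = [(5, "black"), (4, "blue"), (5, "white"), (6, "orange")]

# partial results merged in partition order
print(concat_by_partition([pairs[:2], pairs[2:]]))  # {5: 'blackwhite', 4: 'blue', 6: 'orange'}
# the same partitions merged in the opposite order
print(concat_by_partition([pairs[2:], pairs[:2]]))  # {5: 'whiteblack', 6: 'orange', 4: 'blue'}
```

A commutative function such as `add` gives the same answer for either merge order.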

## The join operation

* Both datasets should be of key-value pair type (pairRDDs)
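* The result of `join()` contains only the keys present in both datasets; for each such key, every value from the first dataset is paired with every value from the second as `(key, (left_value, right_value))`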

In [13]:
a = sc.parallelize(["blue", "green", "orange"], 3)
b = a.keyBy(lambda x: len(x))
c = sc.parallelize(["black", "white", "grey"], 3)
d = c.keyBy(lambda x: len(x))

In [14]:
a.collect()

['blue', 'green', 'orange']

In [15]:
b.collect()

[(4, 'blue'), (5, 'green'), (6, 'orange')]

In [16]:
c.collect()

['black', 'white', 'grey']

In [17]:
d.collect()

[(5, 'black'), (5, 'white'), (4, 'grey')]

In [19]:
b.join(d).collect()

[(4, ('blue', 'grey')), (5, ('green', 'black')), (5, ('green', 'white'))]
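Key 6 is missing from the result because `d` has no value for it, and key 5 appears twice because `d` holds two values for that key. The plain-Python sketch below (the helper `inner_join` is hypothetical) models these inner-join semantics on the same data; Spark itself does not guarantee the output order.

```python
from collections import defaultdict


def inner_join(left, right):
    """Plain-Python model of pair-RDD join: (k, (v, w)) for every matching v, w."""
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    # keys with no match on the right produce no output
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]


b = [(4, 'blue'), (5, 'green'), (6, 'orange')]
d = [(5, 'black'), (5, 'white'), (4, 'grey')]

print(inner_join(b, d))
# [(4, ('blue', 'grey')), (5, ('green', 'black')), (5, ('green', 'white'))]
```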

In [20]:
# leftOuterJoin: keep every key of b; unmatched keys get None for the d value
b.leftOuterJoin(d).collect()

[(6, ('orange', None)),
 (4, ('blue', 'grey')),
 (5, ('green', 'black')),
 (5, ('green', 'white'))]

In [21]:
# rightOuterJoin: keep every key of d; unmatched keys get None for the b value
b.rightOuterJoin(d).collect()

[(4, ('blue', 'grey')), (5, ('green', 'black')), (5, ('green', 'white'))]

In [22]:
# fullOuterJoin: keep keys from either side, with None for the missing value
b.fullOuterJoin(d).collect()

[(6, ('orange', None)),
 (4, ('blue', 'grey')),
 (5, ('green', 'black')),
 (5, ('green', 'white'))]
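The three outer joins differ only in which unmatched keys they keep, filling the missing side with `None`. Here `rightOuterJoin` gives the same result as `join` because every key of `d` also occurs in `b`. A plain-Python model of all three (the helper `outer_join` and its flags are hypothetical), sorted by key for readable output:

```python
from collections import defaultdict


def outer_join(left, right, keep_left=True, keep_right=True):
    """Plain-Python model of left/right/full outer joins on (key, value) lists."""
    left_by_key, right_by_key = defaultdict(list), defaultdict(list)
    for k, v in left:
        left_by_key[k].append(v)
    for k, w in right:
        right_by_key[k].append(w)

    out = []
    for k in set(left_by_key) | set(right_by_key):
        vs = left_by_key.get(k, [])
        ws = right_by_key.get(k, [])
        if vs and ws:
            out.extend((k, (v, w)) for v in vs for w in ws)  # matched on both sides
        elif vs and keep_left:
            out.extend((k, (v, None)) for v in vs)           # only in left
        elif ws and keep_right:
            out.extend((k, (None, w)) for w in ws)           # only in right
    return sorted(out, key=lambda p: p[0])  # stable sort by key only


b = [(4, 'blue'), (5, 'green'), (6, 'orange')]
d = [(5, 'black'), (5, 'white'), (4, 'grey')]

left = outer_join(b, d, keep_left=True, keep_right=False)   # like leftOuterJoin
right = outer_join(b, d, keep_left=False, keep_right=True)  # like rightOuterJoin
full = outer_join(b, d)                                     # like fullOuterJoin
print(full)
# [(4, ('blue', 'grey')), (5, ('green', 'black')), (5, ('green', 'white')), (6, ('orange', None))]
```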