## Types of Spark operations

There are three types of operations on RDDs: Transformations, Actions and Shuffles.

* The most expensive operations are those that require communication between nodes.

**Transformations:** RDD $\to$ RDD.
  * **Examples:** map, filter, sample, [More](http://spark.apache.org/docs/latest/programming-guide.html#transformations)
  * **No** communication needed.

**Actions:** RDD $\to$ Python-object in head node.
  * **Examples:** reduce, collect, count, take, [More](http://spark.apache.org/docs/latest/programming-guide.html#actions)
  * **Some** communication needed.

**Shuffles:** RDD $\to$ RDD, **shuffle** needed
  * **Examples:** repartition, sortByKey, reduceByKey, join, [More](http://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations)
  * **A LOT** of communication needed.
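
The difference in communication cost can be pictured without Spark by modelling an RDD as a list of partitions, each one a plain Python list living on its own "node". This is only a sketch of the idea under that assumption, not how Spark is implemented; the names `partitions`, `partial_sums` and `shuffled` are made up for illustration.

```python
from functools import reduce

# Model an RDD as a list of partitions; each partition lives on one "node".
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

# Transformation (map): every partition is processed locally, nothing moves.
squared = [[x * x for x in part] for part in partitions]

# Action (reduce): each node reduces its own partition and sends a single
# number to the head node, which combines the partial results.
partial_sums = [sum(part) for part in squared]
total = reduce(lambda a, b: a + b, partial_sums)

# Shuffle (repartition by key): elements must travel between nodes so that
# elements with the same key (here: parity) end up in the same partition.
num_partitions = 2
shuffled = [[] for _ in range(num_partitions)]
for part in squared:
    for x in part:
        shuffled[x % num_partitions].append(x)
```

In this model the map step sends nothing, the reduce step sends one value per partition, and the shuffle step may move every element.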

## Key/value pairs

* A Python dictionary is a collection of *key/value* pairs.
* The **key** is used to find a set of pairs with the particular key.
* The **value** can be anything.
* Spark has a set of special operations for *(key,value)* RDDs.

Spark provides specific functions to deal with RDDs in which each element is a key/value pair. Key/value RDDs expose new operations (e.g., aggregating or grouping together data with the same key, and grouping together two different RDDs). Such RDDs are also called pair RDDs. **In Python, each element of a pair RDD is a pair tuple.**

In [None]:
#start the SparkContext
from pyspark import SparkContext
sc = SparkContext(master="local[4]")

### Creating (key,value) RDDs

**Method 1:** `parallelize` a list of pairs.

In [None]:
pair_rdd = sc.parallelize([(1,2), (3,4)])
print pair_rdd.collect()

**Method 2:** `map()` a function that returns a key/value pair.

In [None]:
regular_rdd = sc.parallelize([1, 2, 3, 4, 2, 5, 6])
pair_rdd = regular_rdd.map( lambda x: (x, x*x) )
print pair_rdd.collect()

For this first exercise, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million **network interactions**. First, download and read the gzip file:

In [None]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
print raw_data.count()
print raw_data.take(1)

# Exercise 1
##### Create a function:
```python
def csv_to_kv(raw_data):
    # raw_data: an RDD of lines as defined above.
    # KV: the resulting key-value RDD.
    return KV
```
Map each row in `raw_data` to a key-value pair where the key is the last element in the row (the network interaction type) and the value is a list of all of the elements in the row. Return the resulting RDD. Example:

######  <span style="color:blue">Code:</span>
```python
csv_to_kv(raw_data).take(1)
```
######  <span style="color:magenta">Output:</span>
`(u'normal.', [u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'9', u'9', u'1.00', u'0.00', u'0.11', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.'])`

# Teacher Stuff

In [None]:
def csv_to_kv(raw_data):
    return raw_data.map(lambda x: (x.split(",")[-1], x.split(",")) )

# Student Stuff

In [None]:
import sys
import os 
testPath = '/'.join(os.getcwd().split('/')[:-1]) + "/Tester"
sys.path.insert(0, testPath )

from miniTester import isSpark

In [None]:
sys.stdout.write( "Checking Spark Format: " )
initDebugStr = ' '.join(raw_data.toDebugString().split(' ')[1:])
newDebugStr  = csv_to_kv(raw_data).toDebugString()
assert initDebugStr in newDebugStr
sys.stdout.write("Correct!\n")

sys.stdout.write("Checking Problem: ")
assert csv_to_kv(raw_data).take(10) == [(u'normal.', [u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'9', u'9', u'1.00', u'0.00', u'0.11', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'239', u'486', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'19', u'19', u'1.00', u'0.00', u'0.05', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'235', u'1337', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'29', u'29', u'1.00', u'0.00', u'0.03', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'219', u'1337', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'6', u'6', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'39', u'39', u'1.00', u'0.00', u'0.03', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'217', u'2032', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'6', u'6', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'49', u'49', u'1.00', u'0.00', u'0.02', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'217', u'2032', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'6', u'6', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'59', u'59', u'1.00', u'0.00', u'0.02', u'0.00', u'0.00', u'0.00', 
u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'212', u'1940', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'1', u'2', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'1.00', u'1', u'69', u'1.00', u'0.00', u'1.00', u'0.04', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'159', u'4087', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'5', u'5', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'11', u'79', u'1.00', u'0.00', u'0.09', u'0.04', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'210', u'151', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'8', u'89', u'1.00', u'0.00', u'0.12', u'0.04', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']), (u'normal.', [u'0', u'tcp', u'http', u'SF', u'212', u'786', u'0', u'0', u'0', u'1', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'8', u'99', u'1.00', u'0.00', u'0.12', u'0.05', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.'])]
sys.stdout.write("Correct!\n")


### Some important Key-Value Transformations
#### 1. reduceByKey(func): Apply the reduce function on the values with the same key. 

In [None]:
rdd = sc.parallelize([(1,2), (2,4), (2,6)])
print "Original RDD :", rdd.collect()
print "After transformation : ", rdd.reduceByKey(lambda a,b: a+b).collect()

Note that although it is similar to the reduce function, it is implemented as a transformation and not as an action because the dataset can have a very large number of keys. So, it does not return values to the driver program. Instead, it returns a new RDD.
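
The per-key folding that `reduceByKey` performs can be sketched on a plain Python list of pairs. This is an illustration of the semantics only, written without Spark; `reduce_by_key` is a hypothetical helper, and it returns a list of pairs to mirror the fact that the real operation yields an RDD rather than a single value.

```python
def reduce_by_key(pairs, func):
    """Plain-Python model of reduceByKey: fold the values of each key with func."""
    result = {}
    for key, value in pairs:
        if key in result:
            # Key seen before: combine the running result with the new value.
            result[key] = func(result[key], value)
        else:
            # First value for this key: it becomes the running result.
            result[key] = value
    # One (key, reduced value) pair per distinct key, sorted for readability.
    return sorted(result.items())

reduced = reduce_by_key([(1, 2), (2, 4), (2, 6)], lambda a, b: a + b)
```

The result has one element per distinct key, so its size grows with the number of keys, which is why the real operation keeps the result distributed.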

#### 2. sortByKey(): 
Sort RDD by keys in ascending order. 

In [None]:
rdd = sc.parallelize([(2,2), (1,4), (3,6)])
print "Original RDD :", rdd.collect()
print "After transformation : ", rdd.sortByKey().collect()

**Note:** The output of sortByKey() is an RDD. This means that RDDs do have a meaningful order, which extends across partitions.
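
One way to picture an order that extends across partitions is range partitioning: keys are first assigned to partitions by range, and each partition is then sorted locally. The sketch below is plain Python and only similar in spirit to what Spark does; the split point `boundary` is chosen by hand here, which is an assumption made for illustration.

```python
pairs = [(5, 'e'), (1, 'a'), (4, 'd'), (2, 'b'), (3, 'c'), (6, 'f')]
boundary = 3  # hypothetical split point: keys <= 3 go to partition 0

# Step 1 (the shuffle): send each pair to the partition that owns its key range.
partitions = [[], []]
for key, value in pairs:
    partitions[0 if key <= boundary else 1].append((key, value))

# Step 2: sort every partition locally, with no further communication.
sorted_partitions = [sorted(part) for part in partitions]

# Reading the partitions one after another gives a globally sorted sequence.
flattened = [pair for part in sorted_partitions for pair in part]
```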

#### 3. mapValues(func):
Apply func to each value of RDD without changing the key. 

In [None]:
rdd = sc.parallelize([(1,2), (2,4), (2,6)])
print "Original RDD :", rdd.collect()
print "After transformation : ", rdd.mapValues(lambda x: x*2).collect()

#### 4. groupByKey(): 
Returns a new RDD of `(key,<iterator>)` pairs where the iterator iterates over the values associated with the key.
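
A minimal plain-Python sketch of this grouping, assuming the data fits in one list; `group_by_key` is a hypothetical helper and not part of Spark. Each key is paired with an iterator over its values, which is turned into a list at the end so that it can be inspected.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Plain-Python model of groupByKey: pair each key with an iterator over its values."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    # Hand back iterators rather than lists, mimicking the (key, <iterator>) shape.
    return [(key, iter(values)) for key, values in sorted(groups.items())]

grouped = group_by_key([(1, 2), (2, 4), (2, 6)])

# Materialize each iterator into a list so the values can be displayed.
materialized = [(key, list(values)) for key, values in grouped]
```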

[Iterators](http://anandology.com/python-practice-book/iterators.html) are Python objects that generate a sequence of values. In Python 2, writing a loop over `n` elements as 
```python
for i in range(n):
    pass  # do something
```
is inefficient because `range(n)` first allocates a list of `n` elements and then iterates over it.
Using the iterator `xrange(n)` achieves the same result without materializing the list. Instead, elements are generated on the fly.

To materialize the list of values returned by an iterator we will use the list comprehension command:
```python
[a for a in <iterator>]
```
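
As a small illustration of values being generated on the fly, the sketch below defines a generator and materializes it with a list comprehension. The function `count_up` is a made-up example; the code runs under both Python 2 and Python 3.

```python
def count_up(n):
    """Generator: yields 0, 1, ..., n-1 one at a time, never building a list."""
    i = 0
    while i < n:
        yield i
        i += 1

gen = count_up(5)
first = next(gen)            # pull a single value from the iterator
rest = [a for a in gen]      # materialize the remaining values into a list
leftover = [a for a in gen]  # the iterator is now exhausted
```

Note that an iterator can be consumed only once: after the first comprehension, the second one finds nothing left to iterate over.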

In [None]:
rdd = sc.parallelize([(1,2), (2,4), (2,6)])
print "Original RDD :", rdd.collect()
print "After transformation : ", rdd.groupByKey().mapValues(lambda x:[a for a in x]).collect()

# Exercise 2
Continue with the function created in the last exercise. Create a new function `orderedNet` that returns the network interaction types together with their total durations, sorted in decreasing order of total duration. Duration is the first column of each row, i.e. `x[0]`.

######  <span style="color:blue">Code:</span>
```python
RDD = csv_to_kv(raw_data)
orderedNet(RDD).take(5)
```
######  <span style="color:magenta">Output:</span>
`
[(u'normal.', 21075991.0), (u'portsweep.', 1991911.0), (u'warezclient.', 627563.0), (u'buffer_overflow.', 2751.0), (u'multihop.', 1288.0)]
`

# Teacher Stuff

In [None]:
def orderedNet(RDD):
    return RDD.mapValues(lambda x: int(x[0]) ) \
            .reduceByKey(lambda x,y: x+y)  \
            .map(lambda x: (x[1],x[0]) ) \
            .sortByKey(False) \
            .map(lambda x: (x[1],x[0])) 


# Student Stuff

In [None]:
from miniTester import isSpark

RDD = csv_to_kv(raw_data)

sys.stdout.write( "Checking Spark Format: " )
initDebugStr = RDD.toDebugString() .split('|  ')[1:]
newDebugStr  = orderedNet(RDD).toDebugString()
assert all([phrase in newDebugStr for phrase in initDebugStr]) == True
sys.stdout.write("Correct!\n")

sys.stdout.write( "Checking Problem: " )
assert orderedNet(RDD).take(50) == [(u'normal.', 21075991), (u'portsweep.', 1991911), (u'warezclient.', 627563), (u'buffer_overflow.', 2751), (u'multihop.', 1288), (u'rootkit.', 1008), (u'spy.', 636), (u'loadmodule.', 326), (u'warezmaster.', 301), (u'back.', 284), (u'ftp_write.', 259), (u'guess_passwd.', 144), (u'perl.', 124), (u'imap.', 72), (u'satan.', 64), (u'ipsweep.', 43), (u'phf.', 18), (u'nmap.', 0), (u'smurf.', 0), (u'pod.', 0), (u'neptune.', 0), (u'teardrop.', 0), (u'land.', 0)]
sys.stdout.write("Correct!")

#### 5. flatMapValues(func): 
`func` is a function that takes as input a single value and returns an iterator that generates a sequence of values.
`flatMapValues` operates on a key/value RDD. It applies `func` to each value and gets a list (generated by the iterator) of values. It then combines each of these values with the original key to produce a list of key-value pairs. These lists are concatenated as in `flatMap`.
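
The steps just described can be sketched in plain Python as follows. `flat_map_values` is a hypothetical helper that mimics the semantics on a list of pairs; it is not the Spark implementation.

```python
def flat_map_values(pairs, func):
    """Plain-Python model of flatMapValues on a list of (key, value) pairs."""
    out = []
    for key, value in pairs:
        # func returns an iterable; each generated value is paired with the original key.
        for new_value in func(value):
            out.append((key, new_value))
    return out

# For each number x, generate x and x+1, keeping the key unchanged.
expanded = flat_map_values([(1, 2), (2, 4), (2, 6)], lambda x: range(x, x + 2))
```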

In [None]:
rdd = sc.parallelize([(1,2), (2,4), (2,6)])
print "Original RDD :", rdd.collect()
# the lambda function generates for each number i, an iterator that produces i,i+1
print "After transformation : ", rdd.flatMapValues(lambda x: xrange(x,x+2)).collect()

#### (Advanced) 6. combineByKey(createCombiner, mergeValue, mergeCombiners): 
Combine values with the same key using a different result type.

This is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. 

The elements of the original RDD are considered here *values*.

Values are converted into *combiners* which we will refer to here as "accumulators". An example of such a mapping is the mapping of the value *word* to the accumulator (*word*,1) that is done in WordCount.

Accumulators are then combined with values, and with other accumulators for the same key, to generate a result for each key.

For example, we can use it to calculate per-activity average durations as follows. Consider an RDD of key/value pairs where keys correspond to different activities and values correspond to duration.


In [None]:
rdd = sc.parallelize([("Sleep", 7), ("Work",5), ("Play", 3), 
                      ("Sleep", 6), ("Work",4), ("Play", 4),
                      ("Sleep", 8), ("Work",5), ("Play", 5)])

sum_counts = rdd.combineByKey(
    # createCombiner: map the first value seen for a key to an accumulator (sum, count)
    (lambda x: (x, 1)),
    # mergeValue: merge a new value into an existing accumulator
    # (saves mapping each value to an accumulator first)
    (lambda acc, value: (acc[0]+value, acc[1]+1)),
    # mergeCombiners: combine two accumulators for the same key
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1]))
)

print sum_counts.collect()
duration_means_by_activity = sum_counts.mapValues(lambda value:
                                                  value[0]*1.0/value[1]) \
                                            .collect()
print duration_means_by_activity

To understand combineByKey(), it’s useful to think of how it handles each element it processes. As combineByKey() traverses through the elements in a partition, each element either has a key it hasn’t seen before or has the same key as a previous element.

If it’s a new key, createCombiner() is called to create the initial value for the accumulator on that key. In the above example, the accumulator is a tuple initialized as (x, 1) where x is a value in original RDD. Note that createCombiner() is called only when a key is seen for the first time in **each partition.**

If it is a key we have seen before while processing that partition, it will instead use the provided function, mergeValue(), with the current value for the accumulator for that key and the new value.

Since each partition is processed independently, we can have multiple accumulators for the same key. When we are merging the results from each partition, if two or more partitions have an accumulator for the same key, we merge the accumulators using the user-supplied mergeCombiners() function. In the above example, we are just adding the 2 accumulators element-wise.
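
The three phases described above can be traced in a plain-Python sketch in which an RDD is modelled as a list of partitions. The helper `combine_by_key` and the way the data is split into two partitions are assumptions made for illustration; Spark's actual implementation differs.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of combineByKey over a list of partitions."""
    per_partition = []
    for part in partitions:
        accs = {}
        for key, value in part:
            if key in accs:
                # Key already seen in THIS partition: fold the value in.
                accs[key] = merge_value(accs[key], value)
            else:
                # First time the key is seen in this partition.
                accs[key] = create_combiner(value)
        per_partition.append(accs)

    # Merge the accumulators that different partitions built for the same key.
    merged = {}
    for accs in per_partition:
        for key, acc in accs.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged

partitions = [[("Sleep", 7), ("Work", 5), ("Sleep", 6)],
              [("Work", 4), ("Sleep", 8)]]
totals = combine_by_key(
    partitions,
    lambda x: (x, 1),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))

# Turn each (sum, count) accumulator into a mean.
means = {key: s / float(n) for key, (s, n) in totals.items()}
```

Here "Sleep" appears in both partitions, so `create_combiner` runs twice for it (once per partition) and `merge_combiners` joins the two accumulators at the end.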

### Transformations on two Pair RDDs

In [None]:
rdd1 = sc.parallelize([(1,2),(2,1),(2,2)])
rdd2 = sc.parallelize([(2,5),(3,1)])
a = rdd1.collect()
b = rdd2.collect()
print a,b

#### 1. subtractByKey: 
Remove from RDD1 all elements whose key is present in RDD2.
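
In plain Python, the same filtering can be sketched as below; `subtract_by_key` is a hypothetical helper operating on lists of pairs rather than RDDs.

```python
def subtract_by_key(left, right):
    """Keep the pairs of `left` whose key does not occur in `right`."""
    right_keys = set(key for key, _ in right)
    return [(key, value) for key, value in left if key not in right_keys]

remaining = subtract_by_key([(1, 2), (2, 1), (2, 2)], [(2, 5), (3, 1)])
```

Only the keys of the second dataset matter; its values are ignored.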

In [None]:
print "RDD1:", a
print "RDD2:", b
print "Result:", rdd1.subtractByKey(rdd2).collect()

#### 2. join: 
* A fundamental operation in relational databases.
* Assumes two tables have a **key** column in common. 
* Merges rows with the same key.

Suppose we have two `(key,value)` datasets 


**Dataset 1**

| **key=name** | **(gender,occupation,age)** |
|--------------|-----------------------------|
| John         | (male,cook,21)              |
| Jill         | (female,programmer,19)      |
| John         | (male, kid, 2)              |
| Kate         | (female, wrestler, 54)      |

**Dataset 2**

| **key=name** | **hair color** |
|--------------|----------------|
| Jill         | blond          |
| Grace        | brown          |
| John         | black          |


When `join` is called on datasets of type `(Key, V)` and `(Key, W)`, it returns a dataset of `(Key, (V, W))` pairs with all pairs of elements for each key. Joining the 2 datasets above yields:

|   key = name | (gender,occupation,age),haircolor |
|--------------|-----------------------------------|
| John         | ((male,cook,21),black)             |
| John         | ((male, kid, 2),black)             |
| Jill         | ((female,programmer,19),blond)     |
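
The joined table can be reproduced with a small plain-Python sketch of an inner join. The helper `inner_join` and the variable names `people` and `hair` are made up for this illustration; the data is the example above.

```python
from collections import defaultdict

def inner_join(left, right):
    """Plain-Python model of join: emit (key, (v, w)) for every matching pair."""
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    # Keys missing from `right` contribute nothing; keys missing from `left` are never visited.
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]

people = [("John", ("male", "cook", 21)),
          ("Jill", ("female", "programmer", 19)),
          ("John", ("male", "kid", 2)),
          ("Kate", ("female", "wrestler", 54))]
hair = [("Jill", "blond"), ("Grace", "brown"), ("John", "black")]

joined = inner_join(people, hair)
```

Kate and Grace each appear in only one dataset, so neither shows up in `joined`.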

In [None]:
print "RDD1:", a
print "RDD2:", b
print "Result:", rdd1.join(rdd2).collect()

### Variants of join.
There are four variants of `join` which differ in how they treat keys that appear in one dataset but not the other.
* `join` is an *inner* join which means that keys that appear only in one dataset are eliminated.
* `leftOuterJoin` keeps all keys from the left dataset even if they don't appear in the right dataset. The result of `leftOuterJoin` in our example will contain the keys `John, Jill, Kate`
* `rightOuterJoin` keeps all keys from the right dataset even if they don't appear in the left dataset. The result of `rightOuterJoin` in our example will contain the keys `Jill, Grace, John`
* `fullOuterJoin` keeps all keys from both datasets. The result of `fullOuterJoin` in our example will contain the keys `Jill, Grace, John, Kate`

In outer joins, if a key appears in only one dataset, the element of `(K,(V,W))` that comes from the other dataset is represented by `None`.
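
The role of `None` can be seen in a plain-Python sketch of a left outer join. `left_outer_join` is a hypothetical helper working on lists of pairs; it only illustrates the semantics.

```python
def left_outer_join(left, right):
    """Plain-Python model of leftOuterJoin: every key of `left` is kept."""
    right_by_key = {}
    for key, w in right:
        right_by_key.setdefault(key, []).append(w)
    out = []
    for key, v in left:
        # A key with no match on the right is paired with None.
        for w in right_by_key.get(key, [None]):
            out.append((key, (v, w)))
    return out

result = left_outer_join([(1, 2), (2, 1), (2, 2)], [(2, 5), (3, 1)])
```

Key `1` exists only on the left, so it is paired with `None`; key `3` exists only on the right and is dropped.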

#### 3. rightOuterJoin: 
Perform a right join between two RDDs. Every key in the right/second RDD will be present at least once.

In [None]:
print "RDD1:", a
print "RDD2:", b
print "Result:", rdd1.rightOuterJoin(rdd2).collect()

#### 4. leftOuterJoin: Perform a left join between two RDDs. Every key in the left RDD will be present at least once.

In [None]:
print "RDD1:", a
print "RDD2:", b
print "Result:", rdd1.leftOuterJoin(rdd2).collect()

### Actions on Pair RDDs

In [None]:
rdd = sc.parallelize([(1,2), (2,4), (2,6)])
a = rdd.collect()

#### 1. countByKey(): Count the number of elements for each key. Returns a dictionary for easy access to keys.

In [None]:
print "RDD: ", a
result = rdd.countByKey()
print "Result:", result

#### 2. collectAsMap(): 
Collect the result as a dictionary to provide easy lookup.

In [None]:
print "RDD: ", a
result = rdd.collectAsMap()
print "Result:", result

#### 3. lookup(key): 
Return all values associated with the provided key.
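
The three actions above can be mimicked on a plain Python list of pairs. This sketch only models the results that arrive at the driver; the variable names are made up. One thing it makes visible is that a dictionary holds a single value per key, so when a key is repeated only one of its values survives in the map (the last one in this plain-Python model, which we assume matches `collectAsMap`).

```python
from collections import Counter

pairs = [(1, 2), (2, 4), (2, 6)]

# countByKey: how many elements carry each key.
count_by_key = dict(Counter(key for key, _ in pairs))

# collectAsMap: a dictionary keeps only one value per key.
collect_as_map = dict(pairs)

# lookup(2): all values associated with key 2.
lookup_2 = [value for key, value in pairs if key == 2]
```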

In [None]:
print "RDD: ", a
result = rdd.lookup(2)
print "Result:", result

# Exercise 3

Continue with the function created in exercise 2. Use any of the above transformations/actions to create a function, **avgDuration**, that calculates and returns the average duration for each of the network interaction types. Return the final dataset as a dictionary. You are encouraged to use **combineByKey()**.


######  <span style="color:blue">Code:</span>
```python
RDD =  csv_to_kv(raw_data) 
avgDuration(RDD)

```
######  <span style="color:magenta">Output:</span>
`
{u'guess_passwd.': 2.717, u'nmap.': 0.0, u'loadmodule.': 36.222, u'rootkit.': 100.8, u'warezclient.': 615.258, u'smurf.': 0.0, u'pod.': 0.0, u'neptune.': 0.0, u'normal.': 216.657, u'spy.': 318.0, u'ftp_write.': 32.375, u'phf.': 4.5, u'portsweep.': 1915.299, u'teardrop.': 0.0, u'buffer_overflow.': 91.7, u'land.': 0.0, u'imap.': 6.0, u'warezmaster.': 15.05, u'perl.': 41.333, u'multihop.': 184.0, u'back.': 0.129, u'ipsweep.': 0.034, u'satan.': 0.04}
`

# Teacher

In [None]:
RDD1 =  csv_to_kv(raw_data) 
RDD2 =  orderedNet( csv_to_kv(raw_data) )

In [None]:
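# The values of RDD1 are whole rows (lists of strings), so extract the
# duration (first column) as an integer before aggregating.
sum_counts = RDD1.mapValues(lambda row: int(row[0])).combineByKey(
    (lambda x: (x, 1)),
    (lambda acc, value: (acc[0]+value, acc[1]+1)),
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) )

duration_means_by_activity = sum_counts.mapValues(lambda value: value[0]*1.0/value[1]) 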

In [None]:

checkit = duration_means_by_activity.collectAsMap()
print checkit

In [None]:
RDD1.countByKey()

In [None]:
RDD2.collect()

In [None]:
checkit2 = csv_to_kv(raw_data)
for i in checkit2.take(10):
    print i
    print " "

# Student