In [5]:
sc

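**flatMap** vs. **map**: `map` returns exactly one output element per input element, while `flatMap` flattens the results, so each input element can contribute zero or more output elements.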
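
The difference can be seen without Spark at all. The sketch below uses plain Python list comprehensions as stand-ins for `map` and `flatMap`; the sample `lines` list is made up for illustration and is not taken from the README used later.

```python
# Plain-Python stand-ins for RDD.map and RDD.flatMap (no Spark needed).
lines = ["Apache Spark", "is fast"]  # hypothetical input lines

# map: exactly one output element per input element (here, one list per line).
mapped = [line.split(" ") for line in lines]

# flatMap: each input may yield several outputs, flattened into one sequence.
flat_mapped = [word for line in lines for word in line.split(" ")]

print(mapped)       # [['Apache', 'Spark'], ['is', 'fast']]
print(flat_mapped)  # ['Apache', 'Spark', 'is', 'fast']
```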

## Fold, Aggregate, countByValue, takeSample, and foreach(n)


### Fold - similar to a reduce function
**fold** is similar to reduce, but takes a default (zero) value.
- Takes a function of the same form as the one passed to reduce()
- Takes a default value that is used for the initial call on each partition
- Returns a value of the same type as the elements

---

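If there are 3 partitions:

`.fold(0,lambda x,y : x+y)`

Step 1 : **0** 1 3 5 | **0** 7 9 | **0**

Step 2 : **0** 9 16 0

Step 3 : 25

---

If there are 3 partitions:

`.fold(1,lambda x,y : x+y)`

Step 1 : **1** 1 3 5 | **1** 7 9 | **1**

Step 2 : **1** 10 17 1

Step 3 : 29

The default value is applied once per partition and once more when the partition results are combined, so a non-neutral default such as 1 changes the result (25 + 4 = 29).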

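Fold is often used to avoid handling errors on your own: unlike reduce(), fold() also works on an empty RDD, where it returns the default value instead of raising an error. Use a neutral default (0 for addition) so that it does not change the result.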
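
The walk-through above can be reproduced in plain Python. This is only a sketch of the per-partition behaviour, not Spark's implementation; `simulated_fold` is a hypothetical helper, and the partition layout is the three-partition split assumed above.

```python
from functools import reduce

def simulated_fold(partitions, zero, op):
    """Hypothetical helper: fold each partition from `zero`,
    then fold the per-partition results from `zero` again."""
    per_partition = [reduce(op, part, zero) for part in partitions]
    return reduce(op, per_partition, zero)

add = lambda x, y: x + y
partitions = [[1, 3, 5], [7, 9], []]  # assumed three-partition layout

total_zero = simulated_fold(partitions, 0, add)  # 0 is neutral for addition
total_one = simulated_fold(partitions, 1, add)   # 1 is added 3 + 1 times
empty_total = simulated_fold([[], []], 0, add)   # empty data returns the default

print((total_zero, total_one, empty_total))  # (25, 29, 0)
```

Because the default is applied in every partition and once more at the end, the result with a non-neutral default depends on the number of partitions.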



#### Let's look at odd numbers: we want to calculate the mean (sum / count)

In [2]:
odd_nums = sc.parallelize([1,3,5,7,9],2)

In [3]:
odd_nums.map(lambda x : x+1 if x == 3 else x).collect()

[1, 4, 5, 7, 9]

In [4]:
odd_nums.map(lambda x : x+1 if x == 3 else x+2 if x==5 else 0).collect()

[0, 4, 7, 0, 0]

In [5]:
odd_nums.collect()

[1, 3, 5, 7, 9]

We will use a 2-value tuple `(0,0)` as the accumulator, referred to as `x`.

- `x[0]` will be our count
- `x[1]` will be our sum

```python
.aggregate((0,0),  # default value
    lambda x,y : (x[0]+1,x[1]+y),  # within a partition
    lambda x,y: (x[0]+y[0],x[1]+y[1])) # condensing partitions
```

#### Let's look at the parts

##### Default Value
Each partition starts from `(0,0)` and accumulates from there.

##### Within partition 

`lambda x,y : (x[0]+1,x[1]+y)`

Let's work through this. 

1. We start with `(0,0) = x`
2. We encounter our first value 1 so `y=1`
3. We want `(0,0)` to become `(1,1)`: one element counted, running sum of 1


##### Gathering partition results

`lambda x,y: (x[0]+y[0],x[1]+y[1])`

Let's work through this step by step. Let's say the partition break is `[1,3] [5,7,9]`

Then after applying the function within each partition we should have `(2,4) (3,21)`

So to reiterate:

1. `(2,4) = x`
2. `(3,21) = y`
3. so the lambda function should add these pair-wise together
4. `x[0]+y[0] = 5`
5. `x[1]+y[1] = 25`
6. Returns `(5,25)`, so the mean is 25 / 5 = 5
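
The two stages can be checked in plain Python. This is a sketch under assumptions rather than Spark's own code: `simulated_aggregate` is a hypothetical helper, the partition split `[1,3] [5,7,9]` is the one assumed above, and the accumulator is kept as `(count, sum)`.

```python
from functools import reduce

def simulated_aggregate(partitions, zero, seq_op, comb_op):
    """Hypothetical helper mimicking aggregate(): seq_op runs inside each
    partition, comb_op merges the per-partition accumulators."""
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)

# Accumulator layout: (count, sum)
seq_op = lambda acc, value: (acc[0] + 1, acc[1] + value)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

partitions = [[1, 3], [5, 7, 9]]  # assumed partition break
per_partition = [reduce(seq_op, part, (0, 0)) for part in partitions]
count, total = simulated_aggregate(partitions, (0, 0), seq_op, comb_op)
mean = float(total) / count

print(per_partition)   # [(2, 4), (3, 21)]
print((count, total))  # (5, 25)
print(mean)            # 5.0
```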







#### Example 4-1 for the numbers between 1 and 9 calculate sum of odd numbers

In [6]:
odd_nums.reduce(lambda x,y : x + y)

25

#### Example 4-2 for the numbers between 1 and 9 calculate the sum of the odd numbers using fold

In [7]:
odd_nums.fold(0,lambda x,y : x + y)

25

#### Example 4-3 using aggregate() return (# of elements, sum) of odd numbers

In [8]:
odd_nums.aggregate((0,0),lambda x,y:(x[0]+1,x[1]+y),lambda x,y:(x[0]+y[0],x[1]+y[1]))

(5, 25)

In [9]:
odd_nums.glom().collect()

[[1, 3], [5, 7, 9]]

### What's the difference between take() and first()?

`take(n)` returns a list of the first n elements; `first()` returns the first element itself, not wrapped in a list.

In [41]:
odd_nums = sc.parallelize([1,3,5,7,9],2)

In [42]:
odd_nums.take(1)

[1]

In [43]:
odd_nums.first()

1

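### What's the difference between sample() and takeSample()?

`sample()` is a transformation that returns a new RDD whose size is only approximately `fraction` of the original; `takeSample()` is an action that returns a fixed number (`num`) of elements to the driver.

### sample
sample(withReplacement=True, fraction=0.5, seed=1) : **transformation**
- **CREATES A NEW RDD** with random elements from the calling RDD
- withReplacement - whether an element can be sampled more than once (repeats)
- fraction - expected size of the sample as a fraction of the RDD's size; must be non-negative
- without replacement, fraction is the probability that each element is used
- seed - fixes the randomization so results are reproducible; if omitted, a different seed is chosen on each run

### takeSample
takeSample(withReplacement, num, seed) : **action**
- returns a fixed-size sample of an RDD as an **ARRAY** (a Python list)
- withReplacement - allows an element to be sampled multiple times
- num - exact number of sampled elements
- seed - as for sample()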
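
The size difference between the two can be illustrated in plain Python for the without-replacement case. This is a rough sketch only: Spark uses its own samplers, and `simulated_sample` and `simulated_take_sample` are hypothetical helpers built on the standard `random` module.

```python
import random

def simulated_sample(data, fraction, seed):
    """Hypothetical stand-in for sample(False, fraction, seed):
    keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]

def simulated_take_sample(data, num, seed):
    """Hypothetical stand-in for takeSample(False, num, seed):
    return exactly `num` elements (or all of them if fewer exist)."""
    rng = random.Random(seed)
    return rng.sample(data, min(num, len(data)))

data = [3, 4, 1, 2, 2, 3, 4, 5]
sampled = simulated_sample(data, 0.5, seed=1)    # size varies with the seed
taken = simulated_take_sample(data, 3, seed=1)   # size is always 3 here

print(len(taken))  # 3
```

With the same seed both helpers return the same result on every call, which is the role the `seed` argument plays above.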


### Example 5-1

Try the collect(), count(), countByValue(), top(n), take(n), first(), and takeSample() operations on z

In [13]:
x = sc.parallelize([3,4,1,2])
y = sc.parallelize(range(2,6))
z = x.union(y)

In [14]:
z.collect()

[3, 4, 1, 2, 2, 3, 4, 5]

In [15]:
z.count()

8

In [16]:
z.countByValue()

defaultdict(int, {1: 1, 2: 2, 3: 2, 4: 2, 5: 1})

In [17]:
z.top(3)

[5, 4, 4]

In [18]:
z.take(3)

[3, 4, 1]

In [19]:
z.first()

3

In [20]:
z.takeSample(withReplacement=False,num=3,seed=None)

[2, 1, 3]

In [21]:
z.takeSample(withReplacement=True,num=20,seed=None)

[3, 2, 3, 2, 4, 3, 5, 2, 4, 1, 2, 4, 2, 4, 2, 3, 2, 2, 3, 4]

In [22]:
z.glom().collect()

[[], [3], [], [4], [], [1], [], [2], [], [2], [], [3], [], [4], [], [5]]

In [23]:
res = z.sample(True,0.5)
res.glom().collect()

[[], [], [], [], [], [], [], [], [], [2], [], [], [], [], [], [5]]

In [24]:
# foreach() is an action run only for its side effects; it returns None, so b is None
b = z.foreach(lambda x : x+1)

## Week 3 - Pair RDDs

A pair RDD is an RDD of **key-value pairs**.

|Key|value|
|---|------------------------------|
|001| 'some data that i have added'|

This is a very common structure for schemaless data or NoSQL stores. In NoSQL, a key is unique. In Spark, a key can be duplicated: the same key can show up in multiple places.

#### Keys 
- Could be simple, or complex objects (tuples)
- Could be simple, or a complex JSON

#### Exercise 1 - get words from readme file

In [6]:
path = '/Users/owner/USF/spark/README.md'
readme = sc.textFile(path)

In [7]:
words = readme.flatMap(lambda x: x.split(' '))
word_count = words.map(lambda x: (x,1))
word_count.collect()[:10]

[(u'#', 1),
 (u'Apache', 1),
 (u'Spark', 1),
 (u'', 1),
 (u'Spark', 1),
 (u'is', 1),
 (u'a', 1),
 (u'fast', 1),
 (u'and', 1),
 (u'general', 1)]

## Transformations designed for key value pairs

|function | description |
|-----|------------------|
|keys()| returns an RDD of just the keys|
|values()| returns an RDD of just the values|


Other **transformations on pair RDDs**:
- sortByKey()
- groupByKey()
- mapValues(func)
- flatMapValues(func)
- reduceByKey()
- combineByKey()
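
To get a feel for what these transformations do to each pair, here is a plain-Python sketch on a small list of tuples. It only mirrors the per-element behaviour and is not Spark code; the `pairs` data is made up for illustration.

```python
# A tiny stand-in for a pair RDD: a list of (key, value) tuples.
pairs = [(1, "a"), (2, "is"), (1, "A")]  # hypothetical (length, word) pairs

keys = [k for k, _ in pairs]                    # like keys()
values = [v for _, v in pairs]                  # like values()

# like mapValues(func): the key is untouched, func is applied to the value
map_values = [(k, v.upper()) for k, v in pairs]

# like flatMapValues(func): one output pair per item func yields for a value
flat_map_values = [(k, ch) for k, v in pairs for ch in v]

# like sortByKey(): order by key (Python's sort keeps ties in input order)
sort_by_key = sorted(pairs, key=lambda kv: kv[0])

print(keys)             # [1, 2, 1]
print(map_values)       # [(1, 'A'), (2, 'IS'), (1, 'A')]
print(flat_map_values)  # [(1, 'a'), (2, 'i'), (2, 's'), (1, 'A')]
```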

#### Example 2: Do the same, but pair each word with its length as the key and sort by length

In [11]:
words = readme.flatMap(lambda x: x.split(' '))
word_len = words.map(lambda x: (len(x),x))
sorted_list = word_len.sortByKey(ascending=False)
sorted_list.collect()[:20]

[(115,
  u'[IntelliJ](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IntelliJ).'),
 (112,
  u'[Eclipse](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse)'),
 (96,
  u'Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)'),
 (82,
  u'3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).'),
 (81,
  u'tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).'),
 (65, u'Spark"](http://spark.apache.org/docs/latest/building-spark.html).'),
 (62, u'Guide](http://spark.apache.org/docs/latest/configuration.html)'),
 (57, u'wiki](https://cwiki.apache.org/confluence/display/SPARK).'),
 (49, u'page](http://spark.apache.org/documentation.html)'),
 (35, u'sc.parallelize(range(1000)).count()'),
 (33, u'Maven](http://maven.apache.org/).'),
 (26, u'<http://spark.apache.org/>'),
 (24, u'MASTER=spark://host:707

**groupByKey()**

- group by key
- returns an RDD of (key, ResultIterable) pairs

#### Example 3 

Create a pair RDD with (length of a word, list of words) from README.md

In [14]:
words = readme.flatMap(lambda x: x.split(' '))

In [17]:
len_word = words.map(lambda x: (len(x), x))
len_word.collect()[:5]

[(1, u'#'), (6, u'Apache'), (5, u'Spark'), (0, u''), (5, u'Spark')]

In [20]:
len_list = len_word.groupByKey().map(lambda x: (x[0],list(x[1])))
len_list.sortByKey()

PythonRDD[73] at RDD at PythonRDD.scala:48

In [24]:

for x in len_list.sortByKey().collect()[:5]:
    print x[:2]
    print '----------------'


(0, [u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u''])
----------------
(1, [u'#', u'a', u'a', u'a', u'a', u'a', u'a', u'N', u'a', u'A', u'a'])
----------------
(2, [u'is', u'It', u'in', u'R,', u'an', u'It', u'of', u'##', u'on', u'##', u'is', u'To', u'do', u'to', u'do', u'if', u'by', u'-T', u'in', u'is', u'at', u'an', u'##', u'to', u'is', u'to', u'##', u'if', u'##', u'in', u'To', u'of', u'Pi', u'to', u'to', u'be', u'or', u'to', u'on', u'to', u'or', u'to', u'an', u'if', u'is', u'in', u'of', u'if', u'no', u'##', u'is', u'be', u'on', u'to', u'or', u'##', u'to', u'to', u'in', u'of', u'to', u'at', u'on', u'of', u'##', u'to', u'in', u'an', u'on', u'to'])
----------------
(3, [u'and', u'for', u'Big', u'and'

### mapValues

Pass each value in the key-value pair RDD through a map function without changing the keys.

This also retains the original RDD’s partitioning.

#### Example 4

From the “README.md” file:

- Extract all the words. (space separated)
- Generate key-value pairs of (Word, Occurrence).

In [28]:
words = readme.flatMap(lambda x: x.split(' '))
words = words.map(lambda x: (x,1))
word_count = words.groupByKey().mapValues(sum)
word_count.sortByKey(ascending=False).collect()[:10]

[(u'your', 1),
 (u'you', 4),
 (u'with', 4),
 (u'will', 1),
 (u'wiki](https://cwiki.apache.org/confluence/display/SPARK).', 1),
 (u'which', 2),
 (u'when', 1),
 (u'web', 1),
 (u'way', 1),
 (u'versions', 1)]

In [29]:
word_count.sortBy(lambda x: x[1], ascending= False).collect()[:10]

[(u'', 68),
 (u'the', 22),
 (u'Spark', 15),
 (u'to', 14),
 (u'for', 11),
 (u'and', 11),
 (u'##', 8),
 (u'a', 8),
 (u'run', 7),
 (u'can', 7)]

#### Example 5 - Create (len, [word]) pairs from the len_word pair RDD in Example 3

In [48]:
words = readme.flatMap(lambda x: x.split())
len_word = words.map(lambda x: (len(x), x))
len_word_pair_group = len_word.mapValues(lambda x: [x])

In [49]:
len_word_pair_group.collect()[:10]

[(1, [u'#']),
 (6, [u'Apache']),
 (5, [u'Spark']),
 (5, [u'Spark']),
 (2, [u'is']),
 (1, [u'a']),
 (4, [u'fast']),
 (3, [u'and']),
 (7, [u'general']),
 (7, [u'cluster'])]

### reduceByKey

Similar to reduce(), but applied per key: it runs a separate reduce operation for each key, in parallel, and returns a pair RDD.

#### Example 6 - do the word counts - generate (word, occurrence) pairs again

In [44]:
words = readme.flatMap(lambda x: x.split())
words = words.map(lambda x: (x,1))

In [47]:
word_occurrence = words.reduceByKey(lambda x,y: x+y)
word_occurrence.sortBy(lambda x: x[1], ascending= False).collect()[:10]

[(u'the', 22),
 (u'Spark', 15),
 (u'to', 14),
 (u'for', 11),
 (u'and', 11),
 (u'##', 8),
 (u'a', 8),
 (u'run', 7),
 (u'can', 7),
 (u'is', 6)]

### Which shuffles less data?

    .groupByKey().mapValues(lambda x : sum(x))
    - Sends all the values across the network first, then condenses them with mapValues

    .reduceByKey(lambda x,y: x+y)
    - Accumulates within each partition first, then sends one pair per key per partition to be combined
    - LESS DATA SHUFFLED
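
The saving can be made concrete by counting how many pairs would have to leave each partition. The sketch below is plain Python, not Spark; the two-partition word data and the helpers `combine_locally` and `final_counts` are hypothetical.

```python
from collections import defaultdict

# Hypothetical (word, 1) pairs spread over two partitions.
partitions = [
    [("the", 1), ("the", 1), ("Spark", 1)],
    [("the", 1), ("Spark", 1), ("Spark", 1)],
]

# groupByKey-style: every single pair is sent across the network.
shuffled_group = [pair for part in partitions for pair in part]

def combine_locally(part):
    """reduceByKey-style: add up values per key inside one partition first."""
    local = defaultdict(int)
    for key, value in part:
        local[key] += value
    return list(local.items())

shuffled_reduce = [pair for part in partitions for pair in combine_locally(part)]

def final_counts(shuffled):
    """Merge whatever arrived after the shuffle into final per-key totals."""
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals)

# Same answer either way, but fewer pairs are moved with local combining.
print((len(shuffled_group), len(shuffled_reduce)))  # (6, 4)
```

On real data with many repeated keys per partition the gap is usually much larger than in this toy example.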

### combineByKey(createCombiner, mergeValue, mergeCombiners)

Similar to aggregate(). We use it when we need to transform an RDD into another structure, e.g. from key-value pairs `(key, value)` to `(key, (sum of values, count of values))`.

- __createCombiner__ - creates an initial value for the accumulator on a key. In other words, when we first "see" a key in a partition, we create the initial structure using createCombiner.
- __mergeValue__ - if we encounter the same key again in the same partition, we apply the mergeValue function. Note that it merges two different structures: the accumulator built so far, e.g. `(sum, count)`, and a new raw value. For a (sum, count) accumulator it would be `lambda x,value: (x[0]+value,x[1]+1)`, where `value` is the value from the newly encountered pair.
- __mergeCombiners__ - merges the accumulators for the same key from different partitions.
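
The three functions can be traced in plain Python. This is a sketch of the behaviour described in the list above, not Spark's implementation: `simulated_combine_by_key` is a hypothetical helper, and the two-partition data is made up. It builds a `(sum, count)` accumulator per key.

```python
def simulated_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Hypothetical helper mimicking combineByKey on a list of partitions."""
    # Stage 1: build one accumulator per key inside each partition.
    per_partition = []
    for part in partitions:
        accs = {}
        for key, value in part:
            if key in accs:
                accs[key] = merge_value(accs[key], value)   # key seen before
            else:
                accs[key] = create_combiner(value)          # first time for key
        per_partition.append(accs)
    # Stage 2: merge accumulators for the same key across partitions.
    merged = {}
    for accs in per_partition:
        for key, acc in accs.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged

partitions = [[("a", 1), ("b", 4), ("a", 3)], [("a", 5), ("b", 6)]]
sum_count = simulated_combine_by_key(
    partitions,
    lambda value: (value, 1),                         # createCombiner
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]),          # mergeCombiners
)
averages = {key: float(s) / c for key, (s, c) in sum_count.items()}

print(sum_count["a"], sum_count["b"])
```

Here key `a` ends with `(9, 3)` and key `b` with `(10, 2)`, giving averages of 3.0 and 5.0.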

Note that combineByKey is essentially simi

In [15]:
text = ["I love choco, I love coffee, I love coffee"]
#lines = sc.parallelize(["spark","spark is fun!"])
text = sc.parallelize(text,6)
text.collect()

['I love choco, I love coffee, I love coffee']

In [16]:
len_word = text.flatMap(lambda x: x.split(',')).flatMap(lambda x:x.split(" ")).map(lambda x:(len(x),x))
len_word.glom().collect()

[[],
 [],
 [],
 [],
 [],
 [(1, 'I'),
  (4, 'love'),
  (5, 'choco'),
  (0, ''),
  (1, 'I'),
  (4, 'love'),
  (6, 'coffee'),
  (0, ''),
  (1, 'I'),
  (4, 'love'),
  (6, 'coffee')]]

In [21]:
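def create_combiner(x):
    # first value seen for a key: count of 1 plus the word itself
    return (1,x)

def merge_value(x,y):
    # same key again in the same partition: bump the count, append the word
    return (x[0]+1, x[1] + "," +y)

def merge_combiner(x,y):
    # same key in different partitions: add the counts, join the word strings
    return (x[0]+y[0],x[1]+ "," + y[1])

combo = len_word.combineByKey(create_combiner, merge_value, merge_combiner)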

In [22]:
combo.collect()

[(0, (2, ',')),
 (6, (2, 'coffee,coffee')),
 (1, (3, 'I,I,I')),
 (4, (3, 'love,love,love')),
 (5, (1, 'choco'))]

#### Example 7: 
Using combineByKey(), create pairs (Length of words, (Frequency, a list of words)) from “README.md”

In [28]:
readme = sc.textFile('README.MD')
words = readme.flatMap(lambda x: x.split())
words.collect()[:3]


[u'#', u'Apache', u'Spark']

In [27]:
words_len_pair = words.map(lambda x: (len(x),x))
words_len_pair.collect()[:3]

[(1, u'#'), (6, u'Apache'), (5, u'Spark')]

In [30]:
words_combined = words_len_pair.combineByKey((lambda x: (1,x)), (lambda x,y: (x[0]+1, x[1] + ";" +y)),\
                                             (lambda x,y: (x[0]+y[0], x[1] + "; " + y[1])))

In [32]:
words_combined.sortByKey().collect()[:5]

[(1, (11, u'#;a;a;a;a; a;a;N;a;A;a')),
 (2,
  (70,
   u'is;It;in;R,;an;It;of;##;on;##;is;To;do;to;do;if;by;-T;in;is;at;an;##;to;is;to;##;if; ##;in;To;of;Pi;to;to;be;or;to;on;to;or;to;an;if;is;in;of;if;no;##;is;be;on;to;or;##;to;to;in;of;to;at;on;of;##;to;in;an;on;to')),
 (3,
  (94,
   u'and;for;Big;and;and;for;set;SQL;for;SQL;and;for;for;and;for;You;can;the;the;web;and;and;its;not;you;You;can;one;the;see;the;For;see;and;The;way;the;Try;the;you;you;can;use;the; And;run;the;>>>;the;run;one;use;For;run;the;You;can;set;the;can;run;and;run;one;run;You;can;use;the;the;For;the;are;can;run;see;the;how;for;the;and;the;you;the;the;the;for;for;for;and;the;the;for;how')),
 (4,
  (47,
   u'fast;APIs;that;data;also;rich;find;This;file;only;run:;(You;need;this;more;than;with;More;from;IDE,; also;also;with;will;when;This;URL,;with;with;also;name;Many;help;Once;[run;Note;uses;core;talk;HDFS;have;must;same;that;your;Hive;Hive')),
 (5,
  (60,
   u'Spark;Spark;Data.;Java,;tools;Spark;MLlib;graph;Spark;Spa