# MSAN 604 Sec2Lec2 Notes: RDDs, Transformations, Actions

Note: to run the code, update the file paths to point at your copy of the dataset, and start the notebook from the command line using:

```bash
$pyspark
```

which should open a Jupyter notebook instance.

In [1]:
sc

## What is an RDD?


![alt text](http://backtobazics.com/wp-content/themes/twentyfourteen/images/spark/spark-rdd.png)

An RDD (Resilient Distributed Dataset) is a collection of data split over partitions across a cluster. Spark presents it as a single dataset, the RDD.

**Partition**: a "subset". In computer terms, you may have 1TB, but you can put some walls up to subdivide the drive into 2 drives of 500 GB, or 3 drives of 250, 250, 500 GB.

**By default (running locally), the number of partitions is the number of threads that your computer has.**
**Example: if your computer has a quad-core Intel CPU with 2 threads per core, that gives 4 x 2 = 8 available threads, so 8 partitions.**

Some features:

1. Distributed -> over different partitions
2. Immutable -> read only
3. Resilient -> if one node dies, the lost data is rebuilt (not by replication). The missing partitions are recomputed from the instructions supplied by the master node


# Data Manipulations - Transformations + Actions

![](https://i.stack.imgur.com/3QiV8.png)

### RDD - Transformations: take RDDs and return new RDDs

### RDD - Actions: take RDDs and return values


![](https://trongkhoanguyen.com/assets/post-images/2014/rdd-operations.png)

## Get data 

In [11]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/README.md'

### Read in data from readme markdown file

In [12]:
lines = sc.textFile(filepath)
lines

/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/README.md MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

### Create an RDD from an in-memory list with `parallelize`

In [13]:
lines = sc.parallelize(["spark","spark is fun!"])
lines

ParallelCollectionRDD[9] at parallelize at PythonRDD.scala:480

In [14]:
lines.getNumPartitions()

8

In [15]:
### Shows all the partitions

### Note - **`collect`** is an action that forces an evaluation

In [16]:
lines.glom().collect()

[[], [], [], ['spark'], [], [], [], ['spark is fun!']]

In [17]:
### Shows all the elements as one flat list

In [10]:
lines.collect()

['spark', 'spark is fun!']

#### Let's add more data

- add an option for how many partitions it needs to be divided by
- sc.parallelize( data, no_of_partitions)

In [23]:
lines = sc.parallelize(["spark","spark is fun!","1","2","3","4","5","6","7","8"],4)
lines.glom().collect()

[['spark', 'spark is fun!'], ['1', '2'], ['3', '4'], ['5', '6', '7', '8']]

## Lambda functions

These are one-time functions that are defined inline and executed at run-time. They are also called anonymous functions, since a lot of the time you don't have to name the function.

Example of a regular function
```python
def f(x):
    return x+2
```

Inline equivalent
```python
lambda x : x+2
```
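
As a small illustration of the equivalence above, here is a plain-Python sketch (no Spark needed). The names `add_two`, `add_two_lambda` and `values` are made up for this example.

```python
# A named function and its anonymous (lambda) equivalent
def add_two(x):
    return x + 2

add_two_lambda = lambda x: x + 2

values = [1, 2, 3]

# Using the named function
named_result = [add_two(v) for v in values]

# A lambda is usually passed inline, without ever being given a name
inline_result = list(map(lambda x: x + 2, values))

print(named_result == inline_result)
```

This is the same pattern Spark uses: the lambda is handed straight to a method such as `map` or `filter`.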




## RDD Operations

1. Transformations

**RDDs when changed:** Manipulating an old RDD makes a new RDD (this is because all RDDs are immutable). So any change must be stored in a new dataset.
    
    
    ```python
    # only want lines that have the word 'spark' in them
    lines_with_spark = lines.filter(lambda line: 'spark' in line)
    ```
    
**Lazy evaluation**: the above filtering function has been set up BUT HAS NOT BEEN EXECUTED. It will only be executed when an action (such as `collect`) is called on `lines_with_spark` or on an RDD derived from it.
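
Lazy evaluation can be imitated in plain Python with a generator expression. This is only an analogy, not how Spark is implemented: the helper `has_spark` and the `calls` list are hypothetical names used to record when the filter function actually runs.

```python
calls = []  # records every time the predicate actually runs

def has_spark(line):
    calls.append(line)
    return 'spark' in line

lines = ["spark", "spark is fun!", "hello"]

# Building the pipeline runs nothing yet (like calling .filter on an RDD)
lines_with_spark = (line for line in lines if has_spark(line))
calls_before = len(calls)

# Consuming the generator forces the work (like calling .collect())
result = list(lines_with_spark)
calls_after = len(calls)

print(calls_before, calls_after, result)
```

The predicate is not called at all until the result is requested, which mirrors a transformation waiting for an action.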

**Sample Transformation functions:** 

- map
- filter
- flatMap
- mapPartitions

**Example: Map (on Elements)**


Typical methods that take a function and apply it element-wise:
- map(func)

- flatMap(func)

- filter(func)


These methods work element by element, regardless of which partition an element is in.

**Example**, let's say you have a dataset [1,2,3,4] split over 2 partitions:

    Partition 1 
    1, 2

    Partition 2
    3, 4

`map(f)` will do the following:

    Partition 1 
    f(1), f(2)

    Partition 2
    f(3), f(4)
    
**Example**: let's say you have a dataset:
```python
'I love tacos'
'I love coffee'
```
**map** with `.map(lambda x : x.split())` gives:

```python
[['I','love','tacos'],
 ['I','love','coffee']]
```

Note the nested list structure

**flatMap** with `.flatMap(lambda x : x.split())` gives:


```python
['I','love','tacos','I','love','coffee']
```

Note the flat list structure
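
The difference between the two can be reproduced in plain Python. This is a sketch of the semantics only (no Spark involved); `mapped` and `flat_mapped` are illustrative names.

```python
from itertools import chain

lines = ['I love tacos', 'I love coffee']

# map: one output element per input element (here, one list per line)
mapped = [line.split() for line in lines]

# flatMap: apply the same function, then flatten the results by one level
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```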


## Inclass Tutorial

In [25]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/ignatian_pedagogy'

#### Read in data from disk

In [30]:
# reads in a plain text file, one element per line
lines = sc.textFile(filepath)

# collects all the lines and shows the first 4
lines.collect()[:4]


[u'= Ignatian Values =',
 u'The University of San Francisco enjoys a distinguished heritage and Jesuit tradition.  At the core of this tradition are transcendent values, including the integration of learning, faith and service; care for the whole person; character and conviction; religious truth and interfaith understanding; and a commitment to building a more just world.  The key values of this Jesuit tradition are as follows:',
 u'***********************************************************************************',
 u"1. Contemplative in Action - St. Ignatius Loyola believed that prayer and reflectivity should so guide our choices and actions that our activity itself becomes a way of entering into union with and praising God.  Being a contemplative in action also means seeing beyond the superficial in life to appreciate the mystery, beauty, and sacredness of all life.  It is a means of seeing God in all things and in everyone.  Contemplation is a critical dimension of the spiritual l

#### Split the words using the map function (should have nested lists)

In [37]:
words = lines.map(lambda line: line.split())

# each of the lines is essentially a paragraph
words.collect()[:3]

[[u'=', u'Ignatian', u'Values', u'='],
 [u'The',
  u'University',
  u'of',
  u'San',
  u'Francisco',
  u'enjoys',
  u'a',
  u'distinguished',
  u'heritage',
  u'and',
  u'Jesuit',
  u'tradition.',
  u'At',
  u'the',
  u'core',
  u'of',
  u'this',
  u'tradition',
  u'are',
  u'transcendent',
  u'values,',
  u'including',
  u'the',
  u'integration',
  u'of',
  u'learning,',
  u'faith',
  u'and',
  u'service;',
  u'care',
  u'for',
  u'the',
  u'whole',
  u'person;',
  u'character',
  u'and',
  u'conviction;',
  u'religious',
  u'truth',
  u'and',
  u'interfaith',
  u'understanding;',
  u'and',
  u'a',
  u'commitment',
  u'to',
  u'building',
  u'a',
  u'more',
  u'just',
  u'world.',
  u'The',
  u'key',
  u'values',
  u'of',
  u'this',
  u'Jesuit',
  u'tradition',
  u'are',
  u'as',
  u'follows:'],
 [u'***********************************************************************************']]

### Inclass Problem - do the same exercise as above, but using the `flatMap` command

In [62]:
words = lines.flatMap(lambda line: line.split())
words.collect()[:10]

[u'=',
 u'Ignatian',
 u'Values',
 u'=',
 u'The',
 u'University',
 u'of',
 u'San',
 u'Francisco',
 u'enjoys']

### part 2 : filter by 'USF'

#### using a list comprehension

In [61]:
words = lines.flatMap(lambda line: [x for x in line.split() if 'USF' in x])
words.collect()[:10]

[u"USF's", u'USF', u"USF's", u"USF's", u'USF', u'USF', u'USF', u'USF']

#### using `filter`

In [56]:
words = lines.flatMap(lambda line: line.split())
filtered_words = words.filter(lambda x: 'USF' in x)
filtered_words.collect()

[u"USF's", u'USF', u"USF's", u"USF's", u'USF', u'USF', u'USF', u'USF']

## Partition Based Operations


**Remember the Example**, let's say you have a dataset [1,2,3,4] split over 2 partitions:

    Partition 1 
    1, 2

    Partition 2
    3, 4

`map(f)` will do the following:

    Partition 1 
    f(1), f(2)

    Partition 2
    f(3), f(4)
    
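`map` calls the function once per element, so any setup the function needs (such as opening a connection) is repeated for every element, which is very computationally expensive. **What about doing it at the partition level (one level above)?**

This is done with an **iterator**. An iterator is a data structure that works like a cursor: it starts at the beginning of a list and only feeds the next element when the process is ready for it. `mapPartitions` calls your function once per partition and passes it an iterator over that partition's elements. The simplest Python comparison is a fully built list versus a lazily generated range:

`[0,1,2,3,4,5,6]` vs. `xrange(7)`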
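
The cursor behaviour can be seen with Python's built-in `iter` and `next`. This is a minimal sketch; the variable names are chosen only for illustration.

```python
numbers = [0, 1, 2, 3, 4, 5, 6]

# iter() creates a cursor positioned at the start of the list
cursor = iter(numbers)

# Each next() call hands over exactly one element and moves the cursor forward
first = next(cursor)
second = next(cursor)

# Whatever has not been consumed yet is still available
remaining = list(cursor)

# Once exhausted, the iterator has nothing left (None is the default given here)
exhausted = next(cursor, None)

print(first, second, remaining, exhausted)
```

An iterator can only be walked once, which is why a function passed to `mapPartitions` should read its input in a single pass.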


### Inclass example: parallelize numbers between 1 and 16. Calculate the count and sum in each partition.

For example, the numbers 1 to 8 in 4 partitions:

```python
[1, 2] [3, 4] [5, 6] [7, 8]
```

give one (count, sum) pair per partition:

```python
(2, 3) (2, 7) (2, 11) (2, 15)
```




    


#### Let's make a function for mapPartitions()

Let's make a sample dataset of 16 numbers

In [80]:
numbers = sc.parallelize(range(1,17))
numbers.glom().collect()

[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12], [13, 14], [15, 16]]

In [81]:
def count_sum(nums):
    
    # nums is an iterator over the elements of one partition
    # the result list holds two things:
    # first the count, then the sum
    result = [0, 0]
    
    for num in nums:
        # count the number of elements
        result[0] += 1
        # sum the actual values of the elements
        result[1] += num
        
    # mapPartitions expects an iterable back, so wrap the pair in a list
    return [result]

#### the result has one [count, sum] pair per partition

In [88]:
parse = numbers.mapPartitions(count_sum)
parse.collect()

[[2, 3], [2, 7], [2, 11], [2, 15], [2, 19], [2, 23], [2, 27], [2, 31]]

#### how to get the total count and sum over all partitions

In [93]:
total_count_sum = parse.reduce(lambda x,y: [x[0]+y[0],x[1]+y[1]])
total_count_sum

[16, 136]
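
The same two steps can be imitated without Spark, which may help when checking the logic of a partition function. This is a sketch under the assumption that the data is laid out in the 8 partitions shown above; the nested list `partitions` stands in for the RDD.

```python
from functools import reduce

def count_sum(nums):
    # nums is an iterator over one partition
    count, total = 0, 0
    for num in nums:
        count += 1
        total += num
    return [[count, total]]

# Stand-in for the RDD: one inner list per partition
partitions = [[1, 2], [3, 4], [5, 6], [7, 8],
              [9, 10], [11, 12], [13, 14], [15, 16]]

# mapPartitions: call the function once per partition, then flatten the results
per_partition = []
for part in partitions:
    per_partition.extend(count_sum(iter(part)))

# reduce: merge the [count, sum] pairs into one overall pair
total_count_sum = reduce(lambda x, y: [x[0] + y[0], x[1] + y[1]], per_partition)

print(per_partition)
print(total_count_sum)
```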

## Run Spark program in a Python script (instead of notebooks)

Know this for the test. The submissions are going to be in .py

## Within your python file

#### import your libraries
```python
from pyspark import SparkConf, SparkContext
```

#### Set the configuration

- **AppName** = the name of the program or 'job' when it's sent to the cluster to be run.
- **local[\*]** = the master (cluster) that you are connecting to. `local[*]` runs Spark locally using as many worker threads as your machine has cores. It would be a URL if connecting to a real cluster.

```python
conf = SparkConf().setMaster("local[*]").setAppName("AppName")
```

#### Start your PySpark context (the program environment)
```python
sc = SparkContext(conf = conf)

# when done call
sc.stop()
```

## At the command line

#### Check current setup

```bash
$echo $PYSPARK_DRIVER_PYTHON
jupyter

$echo $PYSPARK_DRIVER_PYTHON_OPTS
notebook

```

#### Unset your environment variables

```bash
$unset PYSPARK_DRIVER_PYTHON
$echo $PYSPARK_DRIVER_PYTHON

$unset PYSPARK_DRIVER_PYTHON_OPTS
$echo $PYSPARK_DRIVER_PYTHON_OPTS
```

#### Run your standalone program
```bash
# prints everything (Spark log messages and program output) to the screen
$spark-submit ex6.py

# the program's output is written to a file; Spark's log messages still print to the console
$spark-submit ex6.py > output.txt
```

#### Reset the environment variables

```bash
$ nano ~/.bash_profile
```

#### Within your bash profile - reset the defaults back to jupyter notebook

```bash
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
```

### Review of HW1 assignment, and submission

Split by word, but if number, add the values together. Going over the assignment PDF

## RDD Operation-Transformations

- distinct()
- union()
- intersection()




## RDD Operation-Action

Compute on RDD, but return non-RDD answers. 

- only those covered in class will be on quiz (see below)
```python
reduce()
collect()
count()
```

others
```
fold()
aggregate()
```
**Example**:

```python
>>> num = sc.parallelize(range(1, 9), 4)  # partitions: [1, 2] [3, 4] [5, 6] [7, 8]
```

```python
>>> num.reduce(lambda x,y: x + y)
```

What happens under the hood

`[1 + 2] [3 + 4] [5 + 6] [7 + 8]`

`[x + y] [x + y] [x + y] [x + y]`

Reduces x's and y's within each partition

`[3] [7] [11] [15]`

Then condenses the partition results (there are a lot of options here, so this is not guaranteed to be the exact order)

`[3] + [7] -> [10]`

`[10] + [11] -> [21]`

`[21] + [15] -> [36]`
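
The two stages of `reduce` can be sketched in plain Python. This is an emulation of the idea only, with a fixed left-to-right merge order that Spark does not promise; `emulate_reduce` is a hypothetical helper, not a Spark function.

```python
from functools import reduce

def emulate_reduce(partitions, func):
    # Stage 1: reduce inside each non-empty partition
    partials = [reduce(func, part) for part in partitions if part]
    # Stage 2: combine the per-partition results into one value
    return partials, reduce(func, partials)

partials, total = emulate_reduce(
    [[1, 2], [3, 4], [5, 6], [7, 8]],
    lambda x, y: x + y)

print(partials)
print(total)
```

Because the merge order is not fixed in a real cluster, the function passed to `reduce` should give the same answer regardless of grouping and order, as addition does.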

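**Fold example**

```python
>>> num.fold(0, lambda x,y: x + y)
```



What happens under the hood: note the empty partition on the right. `fold` starts every partition from the supplied zero value (here 0), so an empty partition still produces a result, and an empty RDD returns the zero value instead of raising an error as `reduce` would.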

`[1 + 2] [3 + 4] [5 + 6] [7 + 8] []`

`[0 + x + y] [0 + x + y] [0 + x + y] [0 + x + y] [0]`

Reduces x's and y's within each partition

`[3] [7] [11] [15] [0]`

Then condenses the partition results

`[3] + [7] -> [10]`

`[10] + [11] -> [21]`

`[21] + [15] -> [36]`

`[36] + [0] -> [36]`


**In class Example 4-1: calculate the sum of odd numbers**

In [94]:
numbers = sc.parallelize(range(1,17))
numbers.glom().collect()

[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12], [13, 14], [15, 16]]

In [99]:
def getOdd(x):
    if x % 2 == 1:
        return x
    else:
        return 0

In [110]:
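# option 1: keep every element, but turn even numbers into 0
temp1 = numbers.map(lambda x: x if x %2==1 else 0) 
# option 2: drop the even numbers entirely (this overwrites option 1)
temp1 = numbers.filter(lambda x: x%2==1) 
temp2 = temp1.reduce(lambda x,y : x+y)
temp2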

64

In [None]:
lambda x,y : 0 if x&2 == 0 else 0 if y

## RDD operation-actions (to be used with reduce)

Numeric RDD action Types

- count()
- collect()
- countByValue()
- top(n)
- take(n)
- first()
- takeSample()
- foreach()
- mean()
- sum()
- max()
- min()
- variance()
- stdev()

Sample calls on an RDD

```python
rdd_variable.sum()
rdd_variable.top(3)   # top(n) needs the number of elements to return
rdd_variable.mean()
```

## Inclass exercise - the number of distinct values 

from a pedagogy file in the data folder

In [123]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/ignatian_pedagogy'

In [124]:
pedagog = sc.textFile(filepath)

In [132]:
words = pedagog.flatMap(lambda x : x.split())
summary = words.countByValue()
summary.items()[:10]

[(u'all', 3),
 (u'enrollment.', 1),
 (u'themes', 1),
 (u'religious', 2),
 (u'Today', 1),
 (u'relationships', 1),
 (u'young', 1),
 (u'to', 17),
 (u'Reflecting', 1),
 (u'discovering', 1)]
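
What `countByValue` returns can be approximated locally with `collections.Counter`. This is a sketch on two made-up lines rather than the pedagogy file; the number of distinct values is then simply the number of keys.

```python
from collections import Counter

# Hypothetical stand-in for the lines of the text file
lines = ['to be or not', 'to be']

# Same idea as flatMap(lambda x: x.split())
words = [w for line in lines for w in line.split()]

# Same idea as countByValue(): a mapping of value -> number of occurrences
summary = Counter(words)

# The number of distinct values is the number of keys
distinct_count = len(summary)

print(sorted(summary.items()))
print(distinct_count)
```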

---

# Additional Exercises from the Slides

### Example 1-1 Load Text file and split line by space

In [93]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/ignatian_pedagogy'
lines = sc.textFile(filepath)
words = lines.map(lambda line : line.split())

# glom() groups the elements by partition; show the first 4 words of each line in the first partition
results = words.glom().collect()
map( lambda x: x[:4],results[0])

[[u'=', u'Ignatian', u'Values', u'='],
 [u'The', u'University', u'of', u'San'],
 [u'***********************************************************************************'],
 [u'1.', u'Contemplative', u'in', u'Action'],
 [],
 [u'2.', u'Academic', u'Excellence', u'-'],
 [],
 [u'3.', u'Educating', u'the', u'Whole'],
 [],
 [u'4.', u'"Cura', u'Personalis"', u'-']]

### Example 1-2 Generate a list of words within one level structure

In [66]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/ignatian_pedagogy'
lines = sc.textFile(filepath)
words = lines.flatMap(lambda line : line.split())

# print sample results: the first 5 words of every partition
results = words.glom().collect()
for x in map( lambda x: x[:5],results):
    print x

# or make flat with collect()
results = words.collect()
print '\n', results[:10]

[u'=', u'Ignatian', u'Values', u'=', u'The']
[u'5.', u'Women', u'and', u'Men', u'for']

[u'=', u'Ignatian', u'Values', u'=', u'The', u'University', u'of', u'San', u'Francisco', u'enjoys']


### Example 1-3 Find words USF

##### note that if we use `map` when breaking lines, each element is a LIST of words, so the filter keeps the lists that contain the exact word 'USF'

In [67]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/ignatian_pedagogy'
lines = sc.textFile(filepath)
words = lines.map(lambda line : line.split())
usf_words = words.filter(lambda w: 'USF' in w)

# print sample results: the first 4 words of each line in the first partition
results = words.glom().collect()
print '----------------- original split into words'
for x in map( lambda x: x[:4],results[0]):
    print x

# same for the filtered RDD: the first 4 words of each matching line in the first partition
results = usf_words.glom().collect()
print '----------------- only collections with USF'
for x in map( lambda x: x[:4],results[0]):
    print x

----------------- original split into words
[u'=', u'Ignatian', u'Values', u'=']
[u'The', u'University', u'of', u'San']
[u'***********************************************************************************']
[u'1.', u'Contemplative', u'in', u'Action']
[]
[u'2.', u'Academic', u'Excellence', u'-']
[]
[u'3.', u'Educating', u'the', u'Whole']
[]
[u'4.', u'"Cura', u'Personalis"', u'-']
----------------- only collections with USF
[u'2.', u'Academic', u'Excellence', u'-']


##### note that if we use `flatMap` when breaking lines, each element is a single WORD, so the filter keeps the words that contain 'USF'

In [54]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/ignatian_pedagogy'
lines = sc.textFile(filepath)
words = lines.flatMap(lambda line : line.split())
usf_words = words.filter(lambda w: 'USF' in w)

usf_words.glom().collect()

[[u"USF's", u'USF', u"USF's"], [u"USF's", u'USF', u'USF', u'USF', u'USF']]
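
The reason the two versions behave differently is how Python's `in` operator works on lists versus strings. A small sketch with two made-up lines (not taken from the pedagogy file):

```python
line_a = "USF's mission"
line_b = "welcome to USF"

# After map(split): each element is a list, so `in` looks for an exact token
list_a = 'USF' in line_a.split()   # no token is exactly 'USF'
list_b = 'USF' in line_b.split()   # the last token is exactly 'USF'

# After flatMap(split): each element is a string, so `in` looks for a substring
words = line_a.split() + line_b.split()
usf_words = [w for w in words if 'USF' in w]

print(list_a, list_b, usf_words)
```

This is why the `map` version can miss a line whose only mention is "USF's", while the `flatMap` version still picks that word up.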

### Example 2: Parallelize numbers between 1 and 16

Calculate the count and sum in each partition

In [75]:
numbers = sc.parallelize(range(1,17),4)
numbers.glom().collect()

[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]

##### instead, if we return a 1 value list, we can get sums by partition

In [94]:
results = numbers.mapPartitions(lambda x: [sum(x)])
results.collect()

[10, 26, 42, 58]

In [76]:
numbers.reduce(lambda x,y: x + y)

136

### Example 3-1 
Find distinct words in ignatian pedagogy

In [100]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/ignatian_pedagogy'
lines = sc.textFile(filepath)
words = lines.flatMap(lambda x: x.split())
distinct31 = words.distinct()
distinct31.collect()[:20]


### Example 3-2
Create a flatmap of distinct words from “README.md”

In [101]:
filepath = '/Users/tlee010/Desktop/github_repos/2017-msan694-example/Data/README.md'
lines = sc.textFile(filepath)
words = lines.flatMap(lambda x: x.split())
distinct32 = words.distinct()
distinct32.collect()[:20]

[u'when',
 u'R,',
 u'including',
 u'computation',
 u'using:',
 u'guidance',
 u'Scala,',
 u'environment',
 u'only',
 u'rich',
 u'Apache',
 u'sc.parallelize(range(1000)).count()',
 u'Building',
 u'And',
 u'guide,',
 u'return',
 u'Please',
 u'[Eclipse](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse)',
 u'Try',
 u'not']

### Example 3-3
What is union, intersection, subtract and cartesian product of the sets from Example 3-1 and Example 3-2?

#### union: joining 2 sets together (duplicates are kept)

In [107]:
distinct32.union(distinct31).collect()[:10]

[u'when',
 u'R,',
 u'including',
 u'computation',
 u'using:',
 u'guidance',
 u'Scala,',
 u'environment',
 u'only',
 u'rich']

#### intersection: only the common elements

In [106]:
distinct32.intersection(distinct31).collect()[:10]

[u'through',
 u'including',
 u'This',
 u'and',
 u'from',
 u'building',
 u'with',
 u'this',
 u'a',
 u'to']

#### subtract: elements of the first set that are not in the second

In [108]:
distinct32.subtract(distinct31).collect()[:10]

[u'when',
 u'environment',
 u'Apache',
 u'Building',
 u'IDE,',
 u'return',
 u'Please',
 u'[Eclipse](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse)',
 u'"yarn"',
 u'"local"']

#### cartesian: every pairing of a word from the first set with a word from the second

In [109]:
distinct32.cartesian(distinct31).collect()[:10]

[(u'when', u'1981,'),
 (u'when', u'all'),
 (u'when', u'just'),
 (u'when', u'Father'),
 (u'when', u'actions'),
 (u'when', u'discovered'),
 (u'when', u'schools'),
 (u'when', u'including'),
 (u'when', u'ecumenical'),
 (u'when', u'human')]
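
The four operations can be tried on small lists in plain Python to see what each one keeps. This is a sketch of the semantics with made-up word lists `a` and `b`; element order in real Spark output is not guaranteed to match.

```python
from itertools import product

# Hypothetical distinct words from two files
a = ['spark', 'is', 'fun']
b = ['fun', 'with', 'spark']

# union: everything from both, duplicates kept
union = a + b

# intersection: only elements present in both (sorted here for a stable order)
intersection = sorted(set(a) & set(b))

# subtract: elements of a that do not appear in b
b_set = set(b)
subtract = [w for w in a if w not in b_set]

# cartesian: every (a, b) pair
cartesian = list(product(a, b))

print(union)
print(intersection)
print(subtract)
print(len(cartesian), cartesian[0])
```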

### Exercise 4-1
For the numbers between 1 and 9, calculate sum of the odd numbers.

In [127]:
numbers = sc.parallelize(range(1,10))
oddnum = numbers.filter(lambda x : x%2==1)
print oddnum.collect()
oddnum.reduce(lambda x,y:x+y)

[1, 3, 5, 7, 9]


25

### Exercise 4-2
For the numbers between 1 and 9, calculate sum of the odd numbers using fold().

In [132]:
oddnum.collect()

[1, 3, 5, 7, 9]

In [128]:
oddnum.reduce(lambda x,y: x+y)

25

In [133]:
oddnum.fold(1,lambda x,y: x+y)

34
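
The result is 34 rather than 25 because the zero value is applied once in every partition and once more when the partition results are merged. With 8 partitions (the default seen earlier in these notes) and a zero value of 1, that adds 8 + 1 = 9 to the true sum. The plain-Python sketch below imitates this; `emulate_fold` is a hypothetical helper, and the exact placement of the numbers in the partitions is an assumption that does not affect the total.

```python
from functools import reduce

def emulate_fold(partitions, zero, func):
    # Every partition, even an empty one, is folded starting from the zero value
    partials = [reduce(func, part, zero) for part in partitions]
    # The partition results are folded again, starting from the zero value once more
    return reduce(func, partials, zero)

# Assumed layout: the odd numbers 1..9 spread over 8 partitions after filter()
partitions = [[1], [], [3], [], [5], [7], [], [9]]
add = lambda x, y: x + y

with_zero_1 = emulate_fold(partitions, 1, add)  # 25 + 1 * (8 + 1)
with_zero_0 = emulate_fold(partitions, 0, add)  # 25 + 0 * (8 + 1)

print(with_zero_1, with_zero_0)
```

To get the plain sum of the odd numbers with `fold`, the zero value should be the identity for the operation, which is 0 for addition.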

### Exercise 4-3

Using aggregate(), return (sum, # of elements) of odd numbers. Note that the solution below returns the pair in the order (# of elements, sum).

In [134]:
oddnum.collect()

[1, 3, 5, 7, 9]

In [135]:
oddnum.aggregate((0,0),(lambda x,y: (x[0]+1,x[1]+y)),(lambda x,y : (x[0]+y[0],x[1]+y[1])))

(5, 25)
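
The roles of the two functions passed to `aggregate` can be sketched in plain Python: the first folds each element into a per-partition accumulator, the second merges accumulators. `emulate_aggregate` is a hypothetical helper and the partition layout is an assumption.

```python
from functools import reduce

def emulate_aggregate(partitions, zero, seq_op, comb_op):
    # seq_op: fold each element of a partition into an accumulator
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # comb_op: merge the per-partition accumulators
    return reduce(comb_op, partials, zero)

# Assumed layout of the odd numbers across 3 partitions
partitions = [[1, 3], [5], [7, 9]]

count_and_sum = emulate_aggregate(
    partitions,
    (0, 0),
    lambda acc, value: (acc[0] + 1, acc[1] + value),   # (count, sum) plus one element
    lambda a, b: (a[0] + b[0], a[1] + b[1]))            # merge two (count, sum) pairs

print(count_and_sum)
```

Unlike `reduce` and `fold`, the accumulator here has a different type (a tuple) from the elements (integers), which is the main reason to reach for `aggregate`.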

### Example 5-1
Try collect(), count(), countByValue(), top(n), take(n), first(), takeSample() operations on z.

In [145]:
x = sc.parallelize([3,4,1,2])
y = sc.parallelize(range(2,6))
z = x.union(y)
print z.glom().collect()
z.collect()

[[], [3], [], [4], [], [1], [], [2], [], [2], [], [3], [], [4], [], [5]]


[3, 4, 1, 2, 2, 3, 4, 5]

In [140]:
z.count()

8

In [141]:
z.countByValue()

defaultdict(int, {1: 1, 2: 2, 3: 2, 4: 2, 5: 1})

In [142]:
z.top(4)

[5, 4, 4, 3]

In [144]:
z.take(5)

[3, 4, 1, 2, 2]

In [146]:
z.first()

3

In [150]:
z.takeSample(False,6,1)

[3, 1, 2, 4, 5, 2]