# RDD Transformations and Actions

+ [Important Terms](#terms)
+ [Creating an RDD](#creating)
+ [RDD Transformations](#transformation)
+ [RDD Actions](#actions)

## <a name='terms'>Important Terms</a>

Let's quickly go over some important terms:

Term                   |Definition
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformation         |Spark operation that produces an RDD
Action                 |Spark operation that produces a local object
Spark Job              |Sequence of transformations on data with a final action

## <a name='creating'>Creating an RDD</a>

There are two common ways to create an RDD:

Method                                   |Result
----------                               |-------
`sc.parallelize(array)`                  |Create RDD of elements of array (or list)
`sc.textFile('path/to/file')`            |Create RDD of lines from file

## <a name='transformation'> RDD Transformations</a>

We can use transformations to build up the set of instructions we want to perform on the RDD. Transformations are lazy: nothing is executed until we call an action.

Transformation Example                          |Result
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` |Create sample of roughly 25% of elements, with replacement
`union(rdd)`                             |Append `rdd` to existing RDD
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order
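
To make the idea of lazy transformations concrete, here is a minimal plain-Python analogy that runs without Spark. It is only a sketch: generator expressions stand in for `filter` and `map`, `list(...)` stands in for an action such as `collect()`, and the names `double` and `calls` are made up for this illustration.

```python
# Plain-Python analogy (not Spark): generator expressions are lazy,
# much like RDD transformations.
calls = []  # records every element the "map" step actually touches


def double(x):
    calls.append(x)
    return x * 2


numbers = range(1, 7)
evens = (x for x in numbers if x % 2 == 0)  # like filter(...): nothing runs yet
doubled = (double(x) for x in evens)        # like map(...): still nothing runs

calls_before_action = list(calls)  # still empty, no element has been processed
result = list(doubled)             # like collect(): the whole pipeline runs now

print(calls_before_action)  # []
print(result)               # [4, 8, 12]
```

The `double` function is only called once `list(doubled)` asks for the results, which mirrors how Spark defers work until an action is called.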

## <a name='actions'> RDD Actions</a>

Once your 'recipe' of transformations is ready, you execute it by calling an action. Here are some common actions:

Action                             |Result
----------                             |-------
`collect()`                            |Convert RDD to in-memory list
`take(3)`                              |First 3 elements of RDD
`top(3)`                               |3 largest elements of RDD, in descending order
`takeSample(withReplacement=True, num=3)` |Create sample of 3 elements with replacement
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element standard deviation (assumes numeric elements)
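
As a rough guide to what these actions return, the sketch below reproduces several of them on an ordinary Python list. It is an analogy rather than Spark code: the sample data is made up, and it assumes `stdev()` is the population standard deviation (`statistics.pstdev`), which is how the RDD method behaves as far as I know.

```python
import heapq
import statistics

data = [5, 1, 4, 2, 3]  # made-up numeric elements

first_three = data[:3]               # like take(3): first elements, existing order
top_three = heapq.nlargest(3, data)  # like top(3): largest elements, descending
total = sum(data)                    # like sum()
average = statistics.mean(data)      # like mean()
spread = statistics.pstdev(data)     # like stdev(): population standard deviation

print(first_three)  # [5, 1, 4]
print(top_three)    # [5, 4, 3]
print(total)        # 15
print(average)      # 3
```

Note the difference between `take` and `top`: the first keeps the existing order, the second sorts by value.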

-------

# Examples

# Creating RDD from text file

In [1]:
%%writefile example2.txt
first
second line
the third line
then a fourth line

Overwriting example2.txt


In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [4]:
# Show RDD
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
# Save a reference to this RDD
text_rdd = sc.textFile('example2.txt')

-------

# Map function

In [6]:
# Map a function (or lambda expression) to each line
# Then collect the results.
words = text_rdd.map(lambda line: line.split())   # transformation only: nothing runs yet

In [7]:
words.collect()  # now actually performing action

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [8]:
text_rdd.collect()

['first', 'second line', 'the third line', 'then a fourth line']

Notice the difference between the two results: collecting `text_rdd` returns the original lines as strings, while collecting `words` returns one list of words per line.

-------

# Map vs flatMap

In [9]:
# Collect everything as a single flat list
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

With `flatMap`, the split words are flattened into one single list when collected, instead of one list per line.
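
The same contrast can be sketched in plain Python, without Spark. This is only an analogy: a list comprehension plays the role of `map`, and `itertools.chain.from_iterable` plays the role of the flattening step in `flatMap`.

```python
from itertools import chain

lines = ['first', 'second line', 'the third line', 'then a fourth line']

# Like map(lambda line: line.split()): one list of words per input line
mapped = [line.split() for line in lines]

# Like flatMap(lambda line: line.split()): all words in a single flat list
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(len(mapped))       # 4
print(len(flat_mapped))  # 10
```

`mapped` keeps one entry per line, while `flat_mapped` has one entry per word.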

-----

# RDDs and Key Value Pairs

Now that we've seen how to create and transform RDDs, we can look at working with key-value pairs. To do this, let's create some fake data as a new text file.

This data represents services sold to customers of a SaaS business.

In [10]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [11]:
services = sc.textFile('services.txt')

## Exploring the file data

In [12]:
# take the first 2 lines of the RDD
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [13]:
services.map(lambda line: line.split())

PythonRDD[9] at RDD at PythonRDD.scala:53

In [14]:
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]
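
The columns in `services.txt` are separated by runs of several spaces, and `split()` with no argument handles that cleanly. The short plain-Python sketch below shows why calling `split(' ')` instead would not work as well; the variable names are made up for this illustration.

```python
row = '201       10/13/2017      100       NY       131          100.00'

# No argument: split on any run of whitespace and drop empty strings
fields = row.split()

# Explicit single space: every extra space yields an empty string
fields_single_space = row.split(' ')

print(fields)                            # ['201', '10/13/2017', '100', 'NY', '131', '100.00']
print(len(fields))                       # 6
print('' in fields_single_space)         # True
```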

### Removing the leading `#` from the header line

The header line starts with a `#`. Let's strip that first character.

In [15]:
services.map(lambda line: line[1:] if line.startswith('#') else line).collect() # drop the first character if it is '#'

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [16]:
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [17]:
clean = clean.map(lambda line: line.split())

In [18]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

-------


## Using Key Value Pairs for Operations

Let us now use methods whose names end in `ByKey`, such as `reduceByKey`, together with lambda expressions. These `ByKey` methods assume that each element of the RDD is a (key, value) tuple.


For example let's find out the total sales per state: 

In [19]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

## Total sales per state
+ First, we grab the State and Amount fields

In [20]:
pairs = clean.map(lambda lst: (lst[3], lst[5]))

In [21]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

### `reduceByKey` treats the first element of each tuple as the key

In [22]:
reduce_key = pairs.reduceByKey(lambda amt1, amt2: amt1+amt2)

In [23]:
reduce_key.collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In this first try, `amt1` and `amt2` are combined, but because they are strings they are concatenated rather than summed.

So the result is not correct yet, but we are heading in the right direction.

#### Now parsing values to float

In [24]:
reduce_key = pairs.reduceByKey(lambda amt1, amt2: float(amt1)+float(amt2))

In [25]:
reduce_key.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

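On our second try, the values add up correctly. Note that the header pair `('State', 'Amount')` passes through untouched: `reduceByKey` never calls the lambda for a key that has only one value, so a state with a single record would likewise keep its amount as a string.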
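
To see why the point at which we convert to `float` matters, here is a small plain-Python sketch. `reduce_by_key` is a hypothetical helper written for this illustration, not a Spark function: it folds values left to right with `functools.reduce`, whereas Spark may combine values in any order. The input is a made-up subset in which `TX` has only one record.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Hypothetical helper: fold the values of each key with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # reduce() returns a lone value unchanged, without ever calling func
    return {key: reduce(func, values) for key, values in grouped.items()}


pairs = [('NY', '100.00'), ('TX', '450.00'), ('NY', '750.00')]

# Strings are concatenated, not summed
as_strings = reduce_by_key(pairs, lambda a, b: a + b)

# Converting inside the lambda: a key with one value stays a string
float_in_lambda = reduce_by_key(pairs, lambda a, b: float(a) + float(b))

# Converting before reducing: every value is a float
parsed_first = reduce_by_key([(k, float(v)) for k, v in pairs], lambda a, b: a + b)

print(as_strings)       # {'NY': '100.00750.00', 'TX': '450.00'}
print(float_in_lambda)  # {'NY': 850.0, 'TX': '450.00'}
print(parsed_first)     # {'NY': 850.0, 'TX': 450.0}
```

Converting the amounts in an earlier `map` step is one way to make the result type consistent for every key.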

-------

### Steps Summary

In [29]:
# Grab (State, Amount)
step1 = clean.map(lambda lst: (lst[3], lst[5]))

# Reduce by Key
step2 = step1.reduceByKey(lambda amt1, amt2: float(amt1)+float(amt2))

# Remove the header line
step3 = step2.filter(lambda x: x[0] != 'State') # check the first item of the tuple

# Sort the results by Amount
step4 = step3.sortBy(lambda state_with_amount: state_with_amount[1], ascending=False) # sort by the second item of the tuple, which is Amount

# ACTION
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

### The same steps as one chained expression

In [30]:
(clean.map(lambda lst: (lst[3], lst[-1]))
      .reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))
      .filter(lambda x: x[0] != 'State')
      .sortBy(lambda state_amount: state_amount[1], ascending=False)
      .collect())

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
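
For comparison, the whole computation can be sketched in plain Python, without Spark. This is an illustrative alternative rather than the notebook's method: the function name `total_sales_by_state` is made up, and the steps are reordered so the header is skipped before any amount is parsed.

```python
from collections import defaultdict

rows = [
    ['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
    ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
    ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
    ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
    ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
    ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
    ['205', '10/19/2017', '202', 'TX', '121', '200.00'],
]


def total_sales_by_state(rows):
    """Sum Amount per State and sort by total, largest first."""
    totals = defaultdict(float)
    for row in rows:
        state, amount = row[3], row[5]
        if state == 'State':  # skip the header row before parsing
            continue
        totals[state] += float(amount)
    return sorted(totals.items(), key=lambda pair: pair[1], reverse=True)


print(total_sales_by_state(rows))  # [('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
```

Filtering the header first means `float()` is only ever applied to real amounts.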

-------

### Using Tuple Unpacking for readability

In [31]:
x = ['ID', 'State', 'Amount']

In [32]:
def func1(lst):
    return lst[-1]

With `func1` above, if we revisit the code after some time we may not remember what index `-1` refers to or why we chose it.

In [36]:
def func2(id_st_amt):
    # unpack the sequence into named variables
    # (record_id avoids shadowing the built-in id)
    (record_id, state, amt) = id_st_amt
    return amt

In [34]:
func1(x)

'Amount'

In [37]:
func2(x)

'Amount'
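
Another option for readable field access, offered here only as a sketch, is `collections.namedtuple` from the standard library. The names `Sale` and `func3` are hypothetical and not part of the original notebook.

```python
from collections import namedtuple

# Hypothetical record type with one name per field
Sale = namedtuple('Sale', ['record_id', 'state', 'amount'])


def func3(record):
    # Build a Sale from the sequence, then read the field by name
    sale = Sale(*record)
    return sale.amount


x = ['ID', 'State', 'Amount']
print(func3(x))  # Amount
```

As with tuple unpacking, the field name documents the intent, so there is no need to remember what a numeric index means.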