# Spark and Python with RDD Transformations and Actions

Delving deeper into using Spark and Python.

## Important Terms

Some important terms:

Term                   |Definition
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformation         |Spark operation that produces an RDD
Action                 |Spark operation that produces a local object
Spark Job              |Sequence of transformations on data with a final action

## Creating an RDD

There are two common ways to create an RDD:

Method                                   |Result
----------                               |-------
`sc.parallelize(array)`                  |Create RDD of elements of array (or list)
`sc.textFile('path/to/file')`            |Create RDD of lines from file

## RDD Transformations

Transformations build up a set of instructions to be performed on the RDD. They are lazy: nothing is actually executed until an action is called.

Transformation Example                          |Result
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` |Create sample of roughly 25% of elements with replacement
`union(rdd)`                             |Append `rdd` to existing RDD
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order
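
As a rough plain-Python analogy (not Spark itself), Python's built-in `filter` and `map` are also lazy: they only describe the work, and nothing runs until something consumes the result. The sketch below uses a hypothetical `double` helper that records each call, to show when the work actually happens. Here `list(...)` stands in for an action such as `collect()`.

```python
# Plain-Python analogy for lazy transformations (this is not Spark code).
calls = []  # records every element that double() actually processes


def double(x):
    """Hypothetical helper: doubles x and logs that it was called."""
    calls.append(x)
    return x * 2


data = [1, 2, 3, 4]

# "Transformations": these two lines only build a recipe.
evens = filter(lambda x: x % 2 == 0, data)
doubled = map(double, evens)

# Snapshot of the call log before anything consumes the recipe.
ran_before_action = list(calls)

# "Action": consuming the iterator finally executes the recipe.
result = list(doubled)

print(ran_before_action)  # no calls have happened at this point
print(result)
```

Spark's laziness goes further than this analogy (it can plan and distribute the whole job), but the order of events is the same: define first, execute on demand.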

## RDD Actions

Once the 'recipe' of transformations is ready, the next step is to execute it by calling an action. Here are some common actions:

Action                             |Result
----------                             |-------
`collect()`                            |Convert RDD to in-memory list 
`take(3)`                              |First 3 elements of RDD 
`top(3)`                               |Largest 3 elements of RDD, in descending order
`takeSample(withReplacement=True, num=3)` |Create sample of 3 elements with replacement
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element standard deviation (assumes numeric elements)
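
To make the results of these actions concrete, here is a minimal plain-Python sketch of what several of them would return for a small list of numbers. It is a local stand-in, not Spark code, and the sample data is made up. One assumption worth flagging: to my understanding `stdev()` on an RDD is the population standard deviation (dividing by N), which is why the sketch uses `statistics.pstdev`.

```python
import heapq
import statistics

# Made-up sample data standing in for the elements of an RDD.
data = [4, 1, 7, 3, 5]

first_three = data[:3]                    # like take(3): first elements, original order
top_three = heapq.nlargest(3, data)       # like top(3): largest elements, descending
total = sum(data)                         # like sum()
average = statistics.mean(data)           # like mean()
population_sd = statistics.pstdev(data)   # like stdev(), assuming population std dev

print(first_three, top_three, total, average, population_sd)
```

Note the difference between `take(3)` and `top(3)`: the first keeps the existing order, the second ranks the elements.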

## Examples with RDDs

#### Creating a text file:

In [1]:
%%writefile text_file_example2.txt
first 
second line
the third line
then a fourth line

Overwriting text_file_example2.txt


### Performing some transformations and actions on this text file:

In [2]:
from pyspark import SparkContext

In [3]:
sc1 = SparkContext()

#### Showing RDD:

In [4]:
sc1.textFile('text_file_example2.txt')

text_file_example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

#### Saving a reference to this RDD:

In [5]:
textRDD = sc1.textFile('text_file_example2.txt')

### Mapping a function (or lambda expression) over each line, then collecting the results:

In [12]:
# Transformation
words = textRDD.map(lambda line: line.split())

In [13]:
# Action
words.first()

['first']

In [14]:
# Action
words.take(3)

[['first'], ['second', 'line'], ['the', 'third', 'line']]

In [15]:
# Action on original text
textRDD.take(3)

['first ', 'second line', 'the third line']

### Map vs. flatMap:

In [17]:
# Transformation
wordsFlat = textRDD.flatMap(lambda line: line.split())

In [20]:
# Action
wordsFlat.take(7)

['first', 'second', 'line', 'the', 'third', 'line', 'then']
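
The difference between the two can be modelled locally without Spark. In this sketch a list comprehension plays the role of `map` and `itertools.chain.from_iterable` plays the role of the flattening step in `flatMap`. The `lines` list simply repeats the contents of the text file created earlier.

```python
from itertools import chain

# The four lines of text_file_example2.txt, as Spark would read them.
lines = ['first ', 'second line', 'the third line', 'then a fourth line']

# map-like: one output element (a list of words) per input line.
mapped = [line.split() for line in lines]

# flatMap-like: the per-line lists are flattened into one sequence of words.
flattened = list(chain.from_iterable(line.split() for line in lines))

print(mapped[:3])
print(flattened[:7])
```

So `map` preserves the number of elements (four lines in, four lists out), while `flatMap` can produce more or fewer elements than it received.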

### RDDs and Key Value Pairs:

Now that I've worked with basic RDD transformations and actions, I can look at working with key-value pairs. To do this, I create some fake data in a new text file.

This data represents some services sold to customers by a SaaS business.

#### Creating a new text file:

In [21]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [22]:
services = sc1.textFile('services.txt')

A more complicated text file like the one above needs some manipulation and cleaning before it can be used.

In [23]:
# Action: taking the first two lines of the RDD
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [25]:
# Transformation: splitting it up
servicesSplit = services.map(lambda line: line.split())

In [26]:
# Action: taking the first three split lines
servicesSplit.take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

#### Removing the hash sign with RDD transformations and actions:

In [30]:
# Transformation
# check the first character of every line. If the line starts with '#', keep everything after it, else return
# the entire line (startswith also handles empty lines, where line[0] would raise an IndexError)
servicesSplitWithoutHashtag = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [31]:
# Action
servicesSplitWithoutHashtag.take(3)

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00']

In [32]:
# Transformation
servicesCleaned = servicesSplitWithoutHashtag.map(lambda line: line.split())

In [33]:
# Action
servicesCleaned.take(4)

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00']]

### Using Key Value Pairs for Operations

Spark provides *ByKey* methods (such as `reduceByKey`) that take a function or lambda expression. These *ByKey* methods assume that the data is in key/value form, i.e. each element is a `(key, value)` tuple.

As an example, I compute the total sales per state:

#### Grabbing fields (here: the total of the Amount column for each state):

STEP 1:

In [35]:
# Transformation
# grabbing the right columns from servicesCleaned RDD:
# taking the 4th element (index 3, State) and the last, 6th element (index 5, Amount) of each line list in a tuple
statesAmounts = servicesCleaned.map(lambda lst: (lst[3], lst[5]))
# or: servicesCleaned.map(lambda lst: (lst[3], lst[-1]))

In [40]:
statesAmounts.take(7)

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

STEP 2:

*Instead of a groupby, RDDs provide the **reduceByKey** method to combine the values of each key. It assumes that the first item of each tuple created earlier is the key, and the second is the value.*

In [41]:
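# Transformation
# I also use float(), so that the amount values are converted from strings to floats.
# Caveat: the lambda only runs for keys that have at least two values, so a key with a
# single value (like the 'State' header) keeps its original string
amountByState = statesAmounts.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))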

In [42]:
amountByState.take(5)

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
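
The output above shows why the header survives as `('State', 'Amount')`: the reducing function is never called for a key that has only one value. The following is a simplified plain-Python model of that behaviour, not Spark's actual implementation. The helper `reduce_by_key` is hypothetical and ignores partitioning entirely.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Hypothetical local model of reduceByKey: group values by key, then fold them."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # reduce() on a single-element list returns that element without calling func.
    return {key: reduce(func, values) for key, values in grouped.items()}


# Same (state, amount) pairs as in statesAmounts, header included.
pairs = [
    ('State', 'Amount'),
    ('NY', '100.00'), ('TX', '450.00'), ('CA', '200.00'),
    ('CA', '500.00'), ('NY', '750.00'), ('TX', '200.00'),
]

totals = reduce_by_key(pairs, lambda a, b: float(a) + float(b))
print(totals)
```

The same effect means a state with only one sale would keep its amount as a string. Converting the values to floats before reducing (after dropping the header) would avoid that.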

STEP 3:

*getting rid of the header --> ('State', 'Amount')*

In [44]:
# Transformation
# getting rid of a tuple item if the first element of that item is equal to the string 'State'
amountByStateCleaned = amountByState.filter(lambda x: x[0] != 'State')

In [45]:
amountByStateCleaned.take(5)

[('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

STEP 4:

*sorting results by Amount*

In [53]:
# Transformation
# sort by the second item (i.e. index 1) in descending order
sortedAmounts = amountByStateCleaned.sortBy(lambda stAmount: stAmount[1], ascending=False)

In [54]:
# Action
sortedAmounts.take(3)

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
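
One way to sanity-check a result like this is to recompute it locally in plain Python on the same small file contents. The sketch below is only a cross-check under that assumption (data small enough to fit in memory), not a replacement for the Spark pipeline. It looks the columns up by header name rather than by position.

```python
# Contents of services.txt, repeated here so the check is self-contained.
raw = """#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00
"""

# Strip the leading '#', split on whitespace, and skip blank lines.
rows = [line.lstrip('#').split() for line in raw.splitlines() if line.strip()]
header, records = rows[0], rows[1:]

state_idx = header.index('State')
amount_idx = header.index('Amount')

# Sum the amounts per state, converting each amount to float up front.
totals = {}
for record in records:
    state = record[state_idx]
    totals[state] = totals.get(state, 0.0) + float(record[amount_idx])

# Sort by amount, largest first.
ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)
```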

### Using tuple unpacking for readability:

It is a good idea to use tuple unpacking instead of indexing for readability. For example:

In [55]:
# a sample list to pass to both functions below:
x = ['ID','State','Amount']

In [56]:
# instead of lambda expressions, named functions can also be used, which helps readability. This one uses indexing:
def func1(lst):
    return lst[-1]

In [58]:
def func2(id_st_amt):
    # unpacking the values
    (Id, st, amt) = id_st_amt
    return amt

In [60]:
func1(x)

'Amount'

In [61]:
func2(x)

'Amount'
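
The same idea can be applied to the sort key used earlier. The sketch below uses Python's built-in `sorted` on a plain list so that it runs without Spark. The helper name `amount_of` is made up for illustration. A named function like this could presumably be passed to `sortBy` in place of the lambda, for example `sortBy(amount_of, ascending=False)`.

```python
# The per-state totals computed earlier, as a plain Python list.
state_totals = [('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]


def amount_of(state_amount):
    """Return the amount from a (state, amount) tuple, using unpacking."""
    state, amount = state_amount  # names document what each position means
    return amount


# Sort by amount, largest first.
ranked = sorted(state_totals, key=amount_of, reverse=True)
print(ranked)
```

Compared with `lambda stAmount: stAmount[1]`, the unpacked version makes it obvious which field drives the ordering.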