# RDD Transformations and Actions

## Important Terms

Let's quickly go over some important terms:

Term                   |Definition
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformation         |Spark operation that produces an RDD
Action                 |Spark operation that produces a local object
Spark Job              |Sequence of transformations on data with a final action

## Creating an RDD

There are two common ways to create an RDD:

Method                                   |Result
----------                               |-------
`sc.parallelize(array)`                  |Create RDD of elements of array (or list)
`sc.textFile('path/to/file')`            |Create RDD of lines from file
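`sc.parallelize` splits a local collection into partitions that executors can process in parallel. Its optional second argument, `numSlices`, sets how many partitions to create. The pure-Python sketch below only illustrates the idea of contiguous slicing. `split_into_partitions` is a hypothetical helper, and Spark's exact split points may differ.

```python
def split_into_partitions(data, num_slices):
    """Hypothetical helper: cut a list into num_slices contiguous, near-equal slices."""
    n = len(data)
    # Slice i covers indices [i*n//num_slices, (i+1)*n//num_slices)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


partitions = split_into_partitions(list(range(10)), 3)
print(partitions)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```

In PySpark, `sc.parallelize(range(10), 3).glom().collect()` returns one list per partition, which is a quick way to inspect the real split.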

## RDD Transformations

We can use transformations to build up a set of instructions we want to perform on the RDD. Transformations are lazy: nothing is computed until we call an action, which actually executes them.

Transformation Example                          |Result
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` |Random sample, with replacement, whose expected size is 25% of the RDD
`union(rdd)`                             |Combine with `rdd` into a new RDD (duplicates are kept)
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order
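To make the table concrete, here are pure-Python analogues of most of these transformations, run eagerly on a small local list. This is only an illustration of the element-wise logic: Spark applies the same functions lazily and across partitions. `sample` is left out because its result is random.

```python
# Pure-Python analogues of the transformations above, run eagerly on a local list
data = [5, 2, 8, 2, 3]

evens = [x for x in data if x % 2 == 0]                   # filter(lambda x: x % 2 == 0)
doubled = [x * 2 for x in data]                           # map(lambda x: x * 2)
unioned = data + [8, 9]                                   # union(rdd): duplicates are kept
uniques = set(data)                                       # distinct(): Spark does not guarantee order
descending = sorted(data, key=lambda x: x, reverse=True)  # sortBy(lambda x: x, ascending=False)

lines = ["first line", "second"]
nested = [line.split() for line in lines]                 # map(lambda x: x.split())
flat = [word for line in lines for word in line.split()]  # flatMap(lambda x: x.split())

print(evens, doubled, descending)  # [2, 8, 2] [10, 4, 16, 4, 6] [8, 5, 3, 2, 2]
print(nested, flat)  # [['first', 'line'], ['second']] ['first', 'line', 'second']
```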

## RDD Actions

Once your 'recipe' of transformations is ready, the next step is to execute it by calling an action. Here are some common actions:

Action                             |Result
----------                             |-------
`collect()`                            |Return all elements to the driver as an in-memory list
`take(3)`                              |First 3 elements of RDD
`top(3)`                               |Largest 3 elements of RDD, in descending order
`takeSample(withReplacement=True, num=3)` |Random sample of 3 elements, drawn with replacement
`sum()`                                |Sum of elements (assumes numeric elements)
`mean()`                               |Mean of elements (assumes numeric elements)
`stdev()`                              |Population standard deviation of elements (assumes numeric elements)
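The split between lazy transformations and eager actions can be modelled with a small toy class. `LazyPipeline` below is hypothetical and not part of PySpark. Its transformations only record a step, and each action replays the recorded recipe from the source data, much as Spark recomputes an RDD's lineage on every action unless it is cached. `stdev` uses the population formula to match PySpark's `stdev()` (PySpark also has `sampleStdev()`).

```python
import heapq
import statistics


class LazyPipeline:
    """Hypothetical toy model of an RDD: transformations are recorded, actions run them."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # the recorded "recipe"

    def _add(self, step):
        # Transformations return a new object and leave this one untouched
        return LazyPipeline(self._data, self._steps + (step,))

    # --- transformations: nothing is computed here ---
    def map(self, f):
        return self._add(lambda it: (f(x) for x in it))

    def filter(self, pred):
        return self._add(lambda it: (x for x in it if pred(x)))

    # --- actions: each one replays the whole recipe from the source data ---
    def collect(self):
        it = iter(self._data)
        for step in self._steps:
            it = step(it)
        return list(it)

    def take(self, n):
        return self.collect()[:n]

    def top(self, n):
        return heapq.nlargest(n, self.collect())  # largest first

    def sum(self):
        return sum(self.collect())  # the builtin sum; class attributes are not in scope here

    def mean(self):
        return statistics.mean(self.collect())

    def stdev(self):
        return statistics.pstdev(self.collect())  # population standard deviation


nums = LazyPipeline(range(1, 7)).map(lambda x: x * 10).filter(lambda x: x > 20)
print(nums.collect())                         # [30, 40, 50, 60]
print(nums.take(2), nums.top(2), nums.sum())  # [30, 40] [60, 50] 180
```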

### Creating an RDD from a text file

#### Creating the text file

In [1]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Overwriting example2.txt


In [2]:
# Performing transformations and actions on this text file

In [3]:
# Importing SparkContext

In [4]:
from pyspark import SparkContext

In [5]:
# Creating an object of SparkContext

In [6]:
sc = SparkContext()

In [7]:
# Show RDD

In [8]:
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [9]:
# Save a reference to this RDD

In [10]:
text_rdd = sc.textFile('example2.txt')

In [11]:
# Map a function (or lambda expression) to each line and collect the results

In [12]:
words = text_rdd.map(lambda line: line.split())

In [13]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [14]:
text_rdd.collect()

['first ', 'second line', 'the third line', 'then a fourth line']

#### Map vs FlatMap

In [15]:
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']
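`flatMap` behaves like `map` followed by flattening one level of nesting. The pure-Python sketch below shows that equivalence on the same four lines as `example2.txt`, typed in by hand rather than read through Spark. Note that `split()` with no argument also drops the trailing space in `'first '`.

```python
from itertools import chain

lines = ['first ', 'second line', 'the third line', 'then a fourth line']

mapped = [line.split() for line in lines]      # like map: one list of words per line
flattened = list(chain.from_iterable(mapped))  # flatten one level, like flatMap

print(len(mapped), len(flattened))  # 4 10
```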

### RDDs and Key Value Pairs

In [16]:
# Creating fake data

In [17]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [18]:
services = sc.textFile('services.txt')

In [19]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [20]:
services.map(lambda line:line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [21]:
# Changing the name of the column from "#EventId" to "EventId"

In [22]:
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [23]:
clean = clean.map(lambda line: line.split())

In [24]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]
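Keeping the header as an ordinary row means later steps have to filter it out again. An alternative is to split the header off first; in PySpark that is commonly done with `header = services.first()` followed by `services.filter(lambda line: line != header)`. The pure-Python sketch below mirrors that idea on the first three lines of `services.txt`, and also looks up column positions by name instead of hard-coding `lst[3]` and `lst[-1]`.

```python
raw_lines = [
    '#EventId    Timestamp    Customer   State    ServiceID    Amount',
    '201       10/13/2017      100       NY       131          100.00',
    '204       10/18/2017      700       TX       129          450.00',
]

header_line, *data_lines = raw_lines       # separate the header from the data rows
columns = header_line.lstrip('#').split()  # column names with the leading '#' removed
state_idx = columns.index('State')         # 3, the position used as lst[3]
amount_idx = columns.index('Amount')       # 5, the position used as lst[-1]

records = [line.split() for line in data_lines]
pairs = [(rec[state_idx], rec[amount_idx]) for rec in records]
print(pairs)  # [('NY', '100.00'), ('TX', '450.00')]
```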

### Using Key Value Pairs for Operations

#### 1. Find out Total Sales Per State

In [25]:
pairs = clean.map(lambda lst: (lst[3],lst[-1]))

In [26]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [27]:
rekey = pairs.reduceByKey(lambda amt1,amt2: float(amt1) + float(amt2))

In [28]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

Important Note: `reduceByKey()` only works on an RDD of (key, value) pairs, i.e. 2-tuples, so each row has to be mapped to a tuple first.
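`reduceByKey` groups values by key and merges them pairwise with the given function. A key that has only one value is passed through without calling the function at all, which is why `('State', 'Amount')` survives unchanged in the output above. The helper `reduce_by_key` below is a hypothetical local stand-in that reproduces that behaviour with `functools.reduce`. Spark itself reduces within each partition and then across partitions, so the function should be associative and commutative, and the order of the output is not guaranteed.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for reduceByKey: group values by key, then fold with func."""
    groups = {}
    for key, value in pairs:  # each element must unpack into (key, value)
        groups.setdefault(key, []).append(value)
    # reduce() on a single value returns it untouched: func is never called
    return [(key, reduce(func, values)) for key, values in groups.items()]


pairs = [('State', 'Amount'), ('NY', '100.00'), ('TX', '450.00'),
         ('CA', '200.00'), ('CA', '500.00'), ('NY', '750.00'), ('TX', '200.00')]
print(reduce_by_key(pairs, lambda amt1, amt2: float(amt1) + float(amt2)))
# [('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
```

The same pass-through applies to real data: a state with a single sale would keep its amount as the string `'100.00'` rather than a float.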

#### Repeating the code above step by step

In [30]:
# Step 1: Grab the State and Amount and put them in tuple format

In [31]:
step1 = clean.map(lambda lst: (lst[3],lst[-1]))

In [32]:
# Step 2: Use reduceByKey()

In [33]:
step2 = step1.reduceByKey(lambda amt1,amt2: float(amt1) + float(amt2))

In [34]:
# Step 3: Get rid of the ('State', 'Amount') header pair

In [35]:
step3 = step2.filter(lambda x: x[0] != 'State')

In [36]:
# Step 4: Sort results by amount

In [37]:
step4 = step3.sortBy(lambda stAmount: stAmount[1],ascending=False)

In [38]:
# Step 5: Perform the action

In [39]:
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
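Because the float conversion happens inside the reducer, a state with only one sale would keep a string amount, and sorting a mix of strings and floats raises a `TypeError` in Python 3. A more robust order is to drop the header and convert amounts first, then reduce with plain addition. In PySpark that would read roughly `clean.filter(lambda lst: lst[0] != 'EventId').map(lambda lst: (lst[3], float(lst[-1]))).reduceByKey(lambda a, b: a + b).sortBy(lambda kv: kv[1], ascending=False)`. The sketch below replays those steps in pure Python on the rows returned by `clean.collect()`; it is an illustration, not a Spark run.

```python
from collections import defaultdict

rows = [
    ['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
    ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
    ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
    ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
    ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
    ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
    ['205', '10/19/2017', '202', 'TX', '121', '200.00'],
]

# Drop the header row and build (state, amount) pairs with numeric amounts
pairs = [(r[3], float(r[-1])) for r in rows if r[0] != 'EventId']

# Sum per key, the local equivalent of reduceByKey(lambda a, b: a + b)
totals = defaultdict(float)
for state, amount in pairs:
    totals[state] += amount

# Sort by total, largest first, like sortBy(lambda kv: kv[1], ascending=False)
result = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(result)  # [('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
```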

#### Using unpacking for readability

In [40]:
x = ['ID','State','Amount']

In [41]:
def func1(lst):
    return lst[-1]

In [42]:
def func2(id_st_amt):
    # Unpack Values
    (Id,st,amt) = id_st_amt
    return amt

In [43]:
func1(x)

'Amount'

In [44]:
func2(x)

'Amount'
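Unpacking matters most for key functions such as the one passed to `sortBy`. Python 3 removed tuple parameters like `lambda (st, amt): amt` (PEP 3113), so a named function that unpacks its argument is one readable alternative to indexing with `[1]`. The sketch below uses plain `sorted` on the totals from above; in PySpark the same function could be passed as `step3.sortBy(by_amount, ascending=False)`. `by_amount` is a hypothetical name chosen for illustration.

```python
state_totals = [('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]


def by_amount(state_amount):
    # Unpack the (state, amount) pair by name instead of indexing with [1]
    _state, amount = state_amount
    return amount


ranked = sorted(state_totals, key=by_amount, reverse=True)
print(ranked)  # [('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
```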