# RDD Transformations and Actions

Create an example file:

In [2]:
%%writefile example2.txt
first
second
the third line
then a fourth line

Overwriting example2.txt


In [3]:
from pyspark import SparkContext

In [4]:
sc = SparkContext()

In [5]:
# Show the RDD
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [6]:
# Create a reference to the RDD
text_rdd = sc.textFile('example2.txt')

We will now map a function to each line and collect the results:

In [7]:
# Transformation that splits every line into a list of words
words = text_rdd.map(lambda line: line.split())

Notice the difference between `words.collect()` and `text_rdd.collect()`:

In [8]:
words.collect()

[['first'],
 ['second'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [9]:
text_rdd.collect()

['first', 'second', 'the third line', 'then a fourth line']

`map()` vs. `flatMap()`

In [10]:
# flatMap() splits each line like map() above, then flattens the per-line lists into one list of words
text_rdd.flatMap(lambda line: line.split()).collect()

['first', 'second', 'the', 'third', 'line', 'then', 'a', 'fourth', 'line']

We can see that `flatMap()` flattens the per-line lists, so `collect()` returns a single list of words rather than a list of lists.
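As a rough local analogue, without Spark, the difference can be sketched with plain Python lists. This only illustrates the semantics: the variable names are made up for this sketch, and `itertools.chain.from_iterable` stands in for the flattening step.

```python
from itertools import chain

lines = ['first', 'second', 'the third line', 'then a fourth line']

# map(): one output element per input element, so each line becomes a list
mapped = [line.split() for line in lines]

# flatMap(): the per-line lists are flattened into one sequence of words
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```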

We will now look at key-value pair RDDs:

In [11]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [12]:
services = sc.textFile('services.txt')

In [13]:
# Get the first 2 lines of the RDD
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [14]:
# Split the lines
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [15]:
# Remove the leading '#' from the header line (startswith() is also safe on empty lines)
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)
clean.collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [16]:
clean = clean.map(lambda line: line.split())

In [17]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

Practice grabbing fields:

In [18]:
# Get only the State and Amount fields
pairs = clean.map(lambda lst: (lst[3], lst[-1]))
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [23]:
# reduceByKey() requires an RDD of key-value pairs, i.e. 2-tuples.
# It treats the first item of each tuple as the key and uses the given function
# to combine the second items (the values) of all tuples that share a key.
reKey = pairs.reduceByKey(lambda amt1, amt2: amt1 + amt2)
reKey.collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

Problem: The amounts are still strings, so `+` concatenates them instead of adding them.

Solution: Convert the amounts with `float()`:

In [24]:
reKey = pairs.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))
reKey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
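One caveat with converting inside the reducer: the reducing function is only called when a key has at least two values, so a key that appears once keeps its original string value (this is why `('State', 'Amount')` is unchanged above). A minimal sketch of the alternative, converting each amount to `float` before reducing, is shown below with a plain-Python stand-in for `reduceByKey()`. The helper `reduce_by_key` and the sample data are hypothetical and only mimic the semantics locally.

```python
from functools import reduce
from operator import add

def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey(): fold the values of each key with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # reduce() on a single value returns it untouched, without calling func
    return [(key, reduce(func, values)) for key, values in grouped.items()]

# Hypothetical sample in which 'TX' appears only once
pairs = [('NY', '100.00'), ('TX', '450.00'), ('NY', '750.00')]

# Converting inside the reducer: the lone 'TX' amount stays a string
in_reducer = reduce_by_key(pairs, lambda a, b: float(a) + float(b))

# Converting first: every amount is a float before the reduction
converted = [(state, float(amount)) for state, amount in pairs]
before_reducer = reduce_by_key(converted, add)

print(in_reducer)      # [('NY', 850.0), ('TX', '450.00')]
print(before_reducer)  # [('NY', 850.0), ('TX', 450.0)]
```

With the `pairs` RDD above, the header tuple would have to be filtered out before converting, since `float('Amount')` raises a `ValueError`.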

Sort the key-value pairs:

In [25]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [31]:
# Grab (State, Amount) as a tuple
step1 = clean.map(lambda lst: (lst[3], lst[-1]))

# Reduce by key
step2 = step1.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))

# Get rid of header tuple
step3 = step2.filter(lambda x: x[0] != 'State')

# Sort results by Amount
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending=False)

# Action: collect() triggers the computation and returns the results
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

Take advantage of tuple unpacking to enhance readability:

In [32]:
x = ['ID', 'State', 'Amount']

In [33]:
# Define a function instead of using a lambda function
def func1(lst):
    return lst[-1]

In [34]:
# Use tuple unpacking
def func2(id_st_amt):
    (id_, st, amt) = id_st_amt  # id_ avoids shadowing the built-in id()
    return amt

In [35]:
func1(x)

'Amount'

In [36]:
func2(x)

'Amount'

`func2()` is more readable than `func1()` because tuple unpacking gives the last element of the list an explicit name. `func1()` simply returns the last element, and we may forget which field that is when we come back to the code later.
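Applied to the rows of `services.txt`, the same idea might look like the sketch below. The function name `state_amount` and the field names are chosen here for illustration; the sample row is one of the split lines shown earlier.

```python
def state_amount(row):
    """Return (state, amount) from one split line of services.txt."""
    # Unpack all six fields so each position has an explicit name
    event_id, timestamp, customer, state, service_id, amount = row
    return (state, float(amount))

row = ['201', '10/13/2017', '100', 'NY', '131', '100.00']
print(state_amount(row))  # ('NY', 100.0)
```

A function like this could be passed to `map()` in place of `lambda lst: (lst[3], lst[-1])`, once the header row has been filtered out.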