 ## RDD transformations and actions

In [1]:
%%writefile example2.txt
first
second line 
the third line 
then a fourth line

Writing example2.txt


In [2]:
from pyspark import SparkContext

In [3]:
# Reuse the active SparkContext if one exists; otherwise create a new one.
sc = SparkContext.getOrCreate()

In [4]:
# Bare expression: displays the RDD handle (its repr), not the file contents.
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
# Keep a handle on the RDD of lines so later cells can transform it.
text_rdd = sc.textFile('example2.txt')

In [7]:
# map() produces exactly one output element per input line,
# so every line becomes its own list of words.
words = text_rdd.map(lambda text_line: text_line.split())

In [8]:
# Bring every element back to the driver as a Python list (fine for this tiny file).
words.collect()

[[u'first'],
 [u'second', u'line'],
 [u'the', u'third', u'line'],
 [u'then', u'a', u'fourth', u'line']]

In [9]:
# flatMap() flattens the per-line word lists into a single flat list of words.
text_rdd.flatMap(lambda text_line: text_line.split()).collect()

[u'first',
 u'second',
 u'line',
 u'the',
 u'third',
 u'line',
 u'then',
 u'a',
 u'fourth',
 u'line']

In [10]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [11]:
# RDD with one element per line of services.txt (header line included).
services = sc.textFile('services.txt')

In [12]:
# Bare expression: shows the RDD handle (its repr), not the file contents.
services

services.txt MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

In [13]:
# Peek at the first two lines: the '#'-prefixed header and one data row.
services.take(2)

[u'#EventId    Timestamp    Customer   State    ServiceID    Amount',
 u'201       10/13/2017      100       NY       131          100.00']

### Some data cleaning is required, since each line is a single string

In [16]:
# Whitespace-split each line into its fields and preview the first three rows.
services.map(lambda raw_line: raw_line.split()).take(3)

[[u'#EventId', u'Timestamp', u'Customer', u'State', u'ServiceID', u'Amount'],
 [u'201', u'10/13/2017', u'100', u'NY', u'131', u'100.00'],
 [u'204', u'10/18/2017', u'700', u'TX', u'129', u'450.00']]

### Now we have separate fields instead of one string per line. Good job!

### Also removing the leading '#' from the header line

In [18]:
# Strip the leading '#' from the header line and leave every other line as is.
# startswith() is used instead of line[0] so an empty line does not raise IndexError.
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [20]:
# Split each cleaned line on whitespace into a list of fields.
# NOTE: this rebinds `clean` to its own output, so re-running only this cell
# would call .split() on lists and fail; re-run the previous cell first.
clean = clean.map(lambda row_text: row_text.split())

In [21]:
# Show all cleaned rows; each row is now a list of string fields.
clean.collect()

[[u'EventId', u'Timestamp', u'Customer', u'State', u'ServiceID', u'Amount'],
 [u'201', u'10/13/2017', u'100', u'NY', u'131', u'100.00'],
 [u'204', u'10/18/2017', u'700', u'TX', u'129', u'450.00'],
 [u'202', u'10/15/2017', u'203', u'CA', u'121', u'200.00'],
 [u'206', u'10/19/2017', u'202', u'CA', u'131', u'500.00'],
 [u'203', u'10/17/2017', u'101', u'NY', u'173', u'750.00'],
 [u'205', u'10/19/2017', u'202', u'TX', u'121', u'200.00']]

### Grabbing fields (State and Amount)

In [23]:
# Keep only State (index 3) and Amount (last field) from each row.
clean.map(lambda fields: (fields[3], fields[-1])).collect()

[(u'State', u'Amount'),
 (u'NY', u'100.00'),
 (u'TX', u'450.00'),
 (u'CA', u'200.00'),
 (u'CA', u'500.00'),
 (u'NY', u'750.00'),
 (u'TX', u'200.00')]

In [24]:
# (State, Amount) tuples: the key/value shape that reduceByKey works on.
pairs = clean.map(lambda fields: (fields[3], fields[-1]))

In [30]:
# reduceByKey treats each element as a (key, value) tuple: the first item is
# the key, and values sharing a key are merged pairwise (similar to GROUP BY).
# Caveat: the lambda only runs for keys that have two or more values, so a key
# that appears once (such as the header pair) keeps its original string value.
rekey = pairs.reduceByKey(lambda left, right: float(left) + float(right))

In [31]:
# Show the per-state totals; the header pair is still present at this point.
rekey.collect()

[(u'State', u'Amount'), (u'CA', 700.0), (u'TX', 650.0), (u'NY', 850.0)]

In [32]:
# Show the cleaned rows again for reference.
clean.collect()

[[u'EventId', u'Timestamp', u'Customer', u'State', u'ServiceID', u'Amount'],
 [u'201', u'10/13/2017', u'100', u'NY', u'131', u'100.00'],
 [u'204', u'10/18/2017', u'700', u'TX', u'129', u'450.00'],
 [u'202', u'10/15/2017', u'203', u'CA', u'121', u'200.00'],
 [u'206', u'10/19/2017', u'202', u'CA', u'131', u'500.00'],
 [u'203', u'10/17/2017', u'101', u'NY', u'173', u'750.00'],
 [u'205', u'10/19/2017', u'202', u'TX', u'121', u'200.00']]

In [35]:
# Project every row to a (State, Amount) tuple.
step1 = clean.map(lambda fields: (fields[3], fields[-1]))

In [43]:
# Show the (State, Amount) pairs; amounts are still strings here.
step1.collect()

[(u'State', u'Amount'),
 (u'NY', u'100.00'),
 (u'TX', u'450.00'),
 (u'CA', u'200.00'),
 (u'CA', u'500.00'),
 (u'NY', u'750.00'),
 (u'TX', u'200.00')]

In [46]:
# Total the amounts per state.
# The header pair is dropped and every amount is converted to float *before*
# reducing: reduceByKey never calls its function for a key that occurs only
# once, so converting inside the reducer would leave such a state's amount as
# a string and make the numeric sort further down unreliable.
step2 = (step1
         .filter(lambda state_amount: state_amount[0] != 'State')
         .mapValues(float)
         .reduceByKey(lambda total, amount: total + amount))

In [47]:
# Drop the header pair, whose key is the column title 'State'.
step3 = step2.filter(lambda pair: pair[0] != 'State')

In [48]:
# Order the states by their total amount, largest first.
step4 = step3.sortBy(lambda state_total: state_total[1], ascending=False)

In [49]:
# Final result: (state, total) pairs sorted by total, descending.
step4.collect()

[(u'NY', 850.0), (u'CA', 700.0), (u'TX', 650.0)]

### Tuple unpacking

In [50]:
# Sample three-item record used to try out the helper functions below.
x = ['ID','State','Amount']

In [51]:
def func1(lst):
    """Return the final element of ``lst`` using plain negative indexing."""
    last_item = lst[-1]
    return last_item

In [54]:
def func2(id_st_amt):
    """Return the amount from an (id, state, amount) triple.

    The argument is unpacked into three named parts purely for readability,
    so it must hold exactly three items; an iterable of any other length
    raises ValueError.
    """
    record_id, state, amount = id_st_amt
    return amount

In [55]:
# Unpacks the sample list and returns its third item.
func2(x)

'Amount'