### RDD (Resilient Distributed Dataset)

In [177]:
from pyspark import SparkConf
from pyspark.context import SparkContext
# "local[*]" runs Spark on the local machine; when running on a cluster,
# pass "yarn" as the master instead.
spark_conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(spark_conf)


In [178]:
%%writefile examplerdd.txt
Line First
Second Line
--NA---
Third Line




Overwriting examplerdd.txt


In [179]:
# Keep this in a separate cell from the %%writefile cell above
# (a %%writefile cell writes its whole body to the file).
text_rdd = sc.textFile('examplerdd.txt')
# collect() brings the contents back as a list with one string per line of the file.
text_rdd.collect()

['Line First', 'Second Line', '--NA---', 'Third Line', '', '']

In [180]:
# map() yields exactly one result per input line, so every line becomes its
# own list of words (blank lines become empty lists).
words = text_rdd.map(lambda text_line: text_line.split())
words.collect()


[['Line', 'First'], ['Second', 'Line'], ['--NA---'], ['Third', 'Line'], [], []]

In [181]:
# flatMap() flattens the per-line word lists into one flat sequence of words,
# so blank lines contribute nothing to the result.
flat_words = text_rdd.flatMap(lambda text_line: text_line.split())
flat_words.collect()

['Line', 'First', 'Second', 'Line', '--NA---', 'Third', 'Line']

In [182]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [183]:
# Load the services file as an RDD of raw text lines and peek at the first
# two of them (the '#'-prefixed header and the first record).
services = sc.textFile('services.txt')
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [184]:
# Strip the leading '#' that marks the header line; every other line passes
# through unchanged.
# startswith() is used instead of indexing line[0] so that an empty line
# (e.g. a trailing blank line in the file) does not raise IndexError.
clean1 = services.map(lambda line: line[1:] if line.startswith('#') else line)
clean1.collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [185]:

# Split each line on runs of whitespace to get one list of column values per row.
clean = clean1.map(lambda record: record.split())
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [186]:
# Build (State, Amount) pairs: column 3 holds State and column 5 holds Amount.
# Amount is also the last column, so fields[-1] would select the same value.
pairs = clean.map(lambda fields: (fields[3], fields[5]))

pairs.collect()


[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [188]:
# reduceByKey treats the first element of each tuple as the key and the second
# as the value, and merges the values of equal keys with the given function.
#
# The reducer is only invoked for keys that occur more than once, so converting
# to float inside the reducer would leave a state with a single record holding
# its raw string amount. Convert every amount up front instead.
def _with_numeric_amount(pair):
    """Return the (state, amount) pair with the amount parsed as a float.

    The header pair (key 'State') is returned unchanged so that it can still
    be filtered out by key afterwards.
    """
    state, amount = pair
    if state == 'State':
        return pair
    return state, float(amount)


rekey = pairs.map(_with_numeric_amount).reduceByKey(lambda total, amount: total + amount)

rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [219]:
# Discard the header pair, whose key is the column title 'State'.
remTitle = rekey.filter(lambda pair: pair[0] != 'State')
remTitle.collect()

[('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [222]:
# Rank the states by their total amount, largest first.
remTitle.sortBy(lambda state_total: state_total[1], ascending=False).collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]