In [None]:
# Actions and Transformations on data

In [1]:
%%writefile services1.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services1.txt


In [2]:
from pyspark import SparkContext

In [3]:
# Reuse the running SparkContext if this cell is executed again; calling
# SparkContext() a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [4]:
# Load the services file as an RDD with one string element per line.
rdd = sc.textFile(name='services1.txt')

In [5]:
# Peek at the first two lines (the header and the first record) without
# pulling the whole RDD back to the driver.
rdd.take(num=2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

# Transformations 

In [None]:
# Strip the leading '#' from the header line

In [7]:
# Strip a leading '#' (the header marker) so the header line splits like a data row.
# str.startswith is safe on empty lines, whereas line[0] raises IndexError on them.
hash_removal = rdd.map(lambda line: line[1:] if line.startswith('#') else line)

In [None]:
# Split each line on whitespace into a list of fields

In [10]:
# Break every line into its whitespace-separated fields.
split = hash_removal.map(str.split)

In [None]:
# Select the State & Amount Columns to make a Key-Pair RDD

In [17]:
# Build (key, value) pairs: key = State (field 3), value = Amount (last field).
st_amt = split.map(lambda fields: (fields[3], fields[-1]))

In [None]:
# Sum the amounts for each key (state) with reduceByKey

In [18]:
# Sum the amounts for each state.
# Values are converted to float *before* reducing: reduceByKey only invokes the
# merge function for keys with two or more values, so a state that appears once
# would otherwise keep its amount as a string, and the later sortBy would then
# compare str with float (TypeError in Python 3).
# The header pair keyed by 'State' is dropped first because its value
# ('Amount') cannot be converted with float().
reduced = (
    st_amt
    .filter(lambda pair: pair[0] != 'State')
    .mapValues(float)
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)
)

In [None]:
# Drop the pair keyed by the header field name 'State'

In [24]:
# Keep every pair except the one keyed by the header field name 'State'.
fields_removed = reduced.filter(lambda pair: pair[0] != 'State')

In [None]:
# Sort by the value of each pair (index 1, the summed amount), ascending.
# Sorting by pair[0] instead would order the data lexicographically by state.

In [39]:
# Order the (state, total) pairs by their total, smallest first.
sorted_values = fields_removed.sortBy(keyfunc=lambda pair: pair[1], ascending=True)

# Actions

In [40]:
# Action: triggers the transformations above and returns the sorted
# (state, total) pairs to the driver as a Python list.
sorted_values.collect()

[('TX', 650.0), ('CA', 700.0), ('NY', 850.0)]

In [None]:
# Sorting by pair[0] instead would give [('CA', 700.0), ('NY', 850.0), ('TX', 650.0)],
# because 'CA' < 'NY' < 'TX' in lexicographical order.