In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext()

In [5]:
with open("rdd_file.txt", mode="w") as f:
    f.write('line number one\nline number two\nline number three\nline number four')

f = open("rdd_file.txt", mode="r")
f.read()

'line number one\nline number two\nline number three\nline number four'

In [6]:
sc.textFile("rdd_file.txt")

rdd_file.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [7]:
text_rdd = sc.textFile("rdd_file.txt")

In [8]:
text_rdd

rdd_file.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
text_rdd.count()

4

In [10]:
words = text_rdd.map(lambda line: line.split())

In [11]:
words.collect()

[['line', 'number', 'one'],
 ['line', 'number', 'two'],
 ['line', 'number', 'three'],
 ['line', 'number', 'four']]

In [12]:
text_rdd.collect()

['line number one', 'line number two', 'line number three', 'line number four']

In [13]:
text_rdd.flatMap(lambda line: line.split()).collect()

['line',
 'number',
 'one',
 'line',
 'number',
 'two',
 'line',
 'number',
 'three',
 'line',
 'number',
 'four']

In [14]:
file = open("services.txt", mode="r")
file.read()

'#EventId    Timestamp    Customer   State    ServiceID    Amount\n201       10/13/2017      100       NY       131          100.00\n204       10/18/2017      700       TX       129          450.00\n202       10/15/2017      203       CA       121          200.00\n206       10/19/2017      202       CA       131          500.00\n203       10/17/2017      101       NY       173          750.00\n205       10/19/2017      202       TX       121          200.00'

In [15]:
services = sc.textFile("services.txt")

In [17]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [19]:
services.map(lambda line : line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [21]:
# remove hashtag
clean = services.map(lambda line: line[1:] if line[0] == "#" else line)

In [22]:
clean = clean.map(lambda line: line.split())

In [23]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [24]:
# how to grab fields (State and Amount)
clean.map(lambda lst: (lst[3], lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [27]:
# how to grab fields (Timestamp and Customer)
clean.map(lambda lst: (lst[1], lst[2])).collect()

[('Timestamp', 'Customer'),
 ('10/13/2017', '100'),
 ('10/18/2017', '700'),
 ('10/15/2017', '203'),
 ('10/19/2017', '202'),
 ('10/17/2017', '101'),
 ('10/19/2017', '202')]

In [28]:
pairs = clean.map(lambda lst: (lst[3], lst[-1]))

In [30]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [34]:
red_key = pairs.reduceByKey(lambda amount_1, amount_2: float(amount_1 )+ float(amount_2))

In [35]:
red_key.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [36]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [59]:
# grab the state and amount as tuple
clean.map(lambda lst: (lst[3],lst[-1]))\
    # reduce by key to get total amount per state
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
    # get rid of State, Amount titles
.filter(lambda x: not x[0]=='State')\
    # sort by amount
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
    # action
.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [60]:
x = ['ID', 'State', 'Amount']

In [61]:
def func(lst):
    return lst[-1]

In [62]:
def func1(id_state_amount):
    # unpack the tuple
    (Id, st, amt) = id_state_amount
    return amt

In [64]:
func(x)

'Amount'

In [65]:
func1(x)

'Amount'