# Spark Introduction

In [1]:
from pyspark import SparkContext

In [2]:
# Reuse the already-running SparkContext if there is one. A bare SparkContext()
# raises when this cell is executed a second time in the same kernel, because
# only one context may be active at a time.
sc = SparkContext.getOrCreate()

In [4]:
%%writefile example.txt
first line 
second line
third line
fourth line

Writing example.txt


In [5]:
# Load the file written above as an RDD of lines.
textFile = sc.textFile(name='example.txt')

In [7]:
# Action: count the elements of the RDD -- every line of the file is one element.
textFile.count()

4

In [8]:
# First element of the RDD, i.e. the first line of the file (trailing space included).
textFile.first()

'first line '

In [13]:
# filter() is a transformation: it returns a new RDD that keeps only the lines
# containing the word 'second'.
secfind = textFile.filter(lambda row: 'second' in row)

In [14]:
# Displaying a transformed RDD shows only its handle, not the filtered lines.
secfind

PythonRDD[6] at RDD at PythonRDD.scala:48

In [15]:
# collect() is the action that actually returns the matching lines as a list.
secfind.collect()

['second line']

## RDD Transformations and Actions

In [22]:
# flatMap() splits every line on whitespace and flattens the pieces, so the
# resulting RDD holds individual words rather than one list per line.
splitted = textFile.flatMap(lambda row: row.split())

In [23]:
# Again only the RDD handle is shown; no words are displayed yet.
splitted

PythonRDD[10] at RDD at PythonRDD.scala:48

In [24]:
# Number of words across all lines (flatMap produced one element per word).
splitted.count()

8

In [25]:
# Bring every word back as one flat Python list.
splitted.collect()

['first', 'line', 'second', 'line', 'third', 'line', 'fourth', 'line']

In [26]:
%%writefile example2.txt
first
second line
third line
then a fourth line

Writing example2.txt


In [28]:
# Load the second example file as an RDD of lines.
text_rdd = sc.textFile(name='example2.txt')

In [30]:
# map() produces exactly one output element per line: the list of that line's words.
words = text_rdd.map(lambda row: row.split())

In [31]:
# map() kept one element per line, so the result is a list of word lists.
words.collect()

[['first'],
 ['second', 'line'],
 ['third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [32]:
# flatMap() flattens the per-line word lists into a single sequence of words;
# collect() then returns them as a plain Python list (not an RDD).
flat_words = (
    text_rdd
    .flatMap(lambda row: row.split())
    .collect()
)

In [33]:
# Already a plain Python list (collect() was called above), flattened across lines.
flat_words

['first', 'second', 'line', 'third', 'line', 'then', 'a', 'fourth', 'line']

In [34]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [36]:
# Load the whitespace-separated services table as an RDD of raw text lines.
services_rdd = sc.textFile(name='services.txt')

In [38]:
# Peek at the first two lines: the '#'-prefixed header and one data row.
services_rdd.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [41]:
# Tokenise each line on whitespace and preview the first three results.
(
    services_rdd
    .map(lambda row: row.split())
    .take(3)
)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [46]:
# Strip the leading '#' from the header line; all other lines pass through
# unchanged. startswith() is used instead of line[0] so that an empty line in
# the file does not raise IndexError in this step.
clean = services_rdd.map(lambda line: line[1:] if line.startswith('#') else line)

In [47]:
# Split every line into its whitespace-separated fields.
# NOTE: this rebinds `clean`, so re-running only this cell would call split()
# on lists; re-run the previous cell first.
clean = clean.map(lambda row: row.split())

In [48]:
# Every row is now a list of string fields; the header no longer starts with '#'.
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [53]:
# Step 1: reduce every row to a (State, Amount) pair.
# State is the fourth field and Amount the last one; both are still strings.
pairs = clean.map(lambda row: (row[3], row[-1]))

In [56]:
# Step 2: total the amounts per state with reduceByKey.
# reduceByKey never calls the reducer for a key that has only one value, so
# doing the float() conversion inside the reducer would leave that state's
# amount as a string and break the numeric sort in Step 4. Convert every
# amount up front instead. The header pair ('State', 'Amount') is passed
# through untouched because Step 3 is what removes it.
rekey = (
    pairs
    .mapValues(lambda amt: amt if amt == 'Amount' else float(amt))
    .reduceByKey(lambda amt1, amt2: amt1 + amt2)
)

In [57]:
# The header pair ('State', 'Amount') is still in the result alongside the per-state totals.
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [60]:
# Step 3: drop the header pair so only real (state, total) rows remain.
step3 = rekey.filter(lambda pair: pair[0] != 'State')

In [61]:
# Step 4: order the (state, total) pairs by total, largest first.
step4 = step3.sortBy(keyfunc=lambda pair: pair[1], ascending=False)

In [62]:
# Per-state totals, largest amount first.
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [63]:
# Sample three-field record used to try out positional unpacking.
x = [
    'ID',
    'State',
    'Amount',
]

In [64]:
def foo(id_st_amt):
    """Return the amount from an ``(id, state, amount)`` triple.

    The argument is unpacked positionally, so it may be any iterable holding
    exactly three items; the third item is returned unchanged.

    Raises:
        ValueError: if the argument does not contain exactly three items.
    """
    # Only the last field is needed; the first two are unpacked and ignored.
    _record_id, _state, amount = id_st_amt
    return amount

In [65]:
# A list works as well as a tuple here: unpacking only needs three items.
foo(x)

'Amount'