### Intro to Spark in Python

In [33]:
!pip install pyspark




In [4]:
from pyspark import SparkContext

In [6]:
# Get the active SparkContext, creating one if none is running yet.
sc = SparkContext.getOrCreate()

In [7]:
%%writefile example.txt
first line
second line
third line
fourth line

Writing example.txt


In [8]:
# Build an RDD from example.txt; each line of the file becomes one element.
textFile = sc.textFile('example.txt')

In [9]:
textFile.count()

4

In [10]:
textFile.first()

'first line'

In [11]:
# Keep only the lines that contain the substring 'second'.
secfind = textFile.filter(lambda text: 'second' in text)

In [12]:
secfind

PythonRDD[4] at RDD at PythonRDD.scala:56

In [13]:
secfind.collect()

['second line']

In [14]:
secfind.count()

1

### Lambda expressions

In [15]:
def square(num):
    """Return ``num`` raised to the second power."""
    squared = num ** 2
    return squared

In [16]:
square(2)

4

In [17]:
def square(num):
    """Return the square of ``num``, computed directly in the return statement."""
    return num ** 2

In [18]:
square(2)

4

In [19]:
def square(num): return num ** 2  # the whole function written on a single line

In [20]:
square(2)

4

In [21]:
lambda num: num**2

<function __main__.<lambda>(num)>

In [22]:
square = lambda num: num ** 2  # bind the anonymous squaring function to a name

In [23]:
square(2)

4

In [24]:
even = lambda x: x % 2 == 0  # True when x leaves no remainder on division by 2

In [25]:
even(3)

False

In [26]:
even(4)

True

In [27]:
first = lambda s: s[0]  # element at index 0 of the sequence

In [28]:
first('hello')

'h'

In [29]:
rev = lambda s: s[::-1]  # a step of -1 walks the sequence back to front

In [30]:
rev('hello')

'olleh'

In [31]:
adder = lambda x, y: x + y  # lambdas may take more than one argument

In [32]:
adder(2,3)

5

### RDD Transformations and Actions

In [35]:
%%writefile example2.txt
first
second line
the third line
then a fourth line

Writing example2.txt


In [36]:
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0

In [37]:
# RDD over the lines of example2.txt, bound to a name so it can be transformed.
text_rdd = sc.textFile('example2.txt')

In [38]:
# map() emits exactly one result per input line, so every line becomes its own
# list of whitespace-separated tokens.
words = text_rdd.map(lambda text: text.split())

In [39]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [40]:
text_rdd.flatMap(lambda line: line.split()).collect()


['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

In [41]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [42]:
# Load services.txt as an RDD of raw text lines (the '#'-prefixed header included).
services = sc.textFile('services.txt')

In [43]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [44]:
services.map(lambda x: x.split())

PythonRDD[15] at RDD at PythonRDD.scala:56

In [45]:
services.map(lambda x: x.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [46]:
# Preview the lines with the leading '#' removed from the header.
# startswith() is used instead of x[0] so that an empty line passes through
# unchanged rather than raising IndexError.
services.map(lambda x: x[1:] if x.startswith('#') else x).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [47]:
# Remove the leading '#' from the header line and leave other lines untouched.
# startswith() is used instead of x[0] so that an empty line passes through
# unchanged rather than raising IndexError.
clean = services.map(lambda x: x[1:] if x.startswith('#') else x)

In [48]:
# Tokenise each line on whitespace. NOTE: this rebinds `clean` to its own
# transformation, so running this cell a second time would apply split() to
# lists instead of strings.
clean = clean.map(lambda line: line.split())

In [49]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [51]:
# Reduce each row to a (State, Amount) pair: column index 3 and the last column.
pairs = clean.map(lambda row: (row[3], row[-1]))

In [52]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [54]:
# Sum the amounts per state. Values arrive as strings, so each merge converts
# both operands with float(). The merge function only runs for keys that have
# two or more values, which is why ('State', 'Amount') comes back unconverted.
rekey = pairs.reduceByKey(lambda left, right: float(left) + float(right))

In [55]:
rekey.collect()

[('NY', 850.0), ('CA', 700.0), ('State', 'Amount'), ('TX', 650.0)]

In [56]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [57]:
# Step 1: project every row to a (state, amount) pair
step1 = clean.map(lambda row: (row[3], row[-1]))

In [58]:
# Step 2: total the amounts per state.
# The header pair is dropped and every amount is converted to float *before*
# reducing. reduceByKey never calls the merge function for a key that has only
# one value, so converting inside the reducer would leave such an amount as a
# string and break any later numeric sort on the totals.
step2 = (
    step1
    .filter(lambda kv: kv[0] != 'State')
    .mapValues(float)
    .reduceByKey(lambda total, amt: total + amt)
)

In [59]:
# Step 3: discard the pair that came from the header row (key 'State')
step3 = step2.filter(lambda pair: pair[0] != 'State')

In [60]:
# Step 4: order the (state, total) pairs from the largest total to the smallest
step4 = step3.sortBy(lambda state_total: state_total[1], ascending=False)

In [61]:
#action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

### Tuple unpacking example

In [62]:
# Sample three-field record used by the two accessor functions.
x = ['ID', 'States', 'Amount']

In [63]:
def func1(lst):
    """Return the final element of ``lst`` using negative indexing."""
    return lst[-1]

In [64]:
def func2(id_st_amt):
    """Return the amount from a three-item (id, state, amount) sequence.

    The value is obtained by unpacking, so a sequence that does not hold
    exactly three items raises ValueError.
    """
    # Only the last field is needed; the first two are unpacked and ignored.
    _, _, amount = id_st_amt
    return amount

In [65]:
func1(x)

'Amount'

In [66]:
func2(x)

'Amount'