# Lambda Expression Review

In [None]:
def square(num):
    """Return ``num`` raised to the second power."""
    return num ** 2

In [None]:
# Call the regular function: 4 squared is 16.
square(4)

In [None]:
# The same squaring operation written as a one-expression anonymous function.
sq = lambda num: num ** 2

In [None]:
# A lambda bound to a name is called like any other function: 3 squared is 9.
sq(3)

In [None]:
# A lambda may take several parameters; this one returns their sum.
sm = lambda a, b: a + b

In [None]:
# 2 + 3 evaluates to 5.
sm(2,3)

In [None]:
# Reverse a sequence with an extended slice [begin:end:step]; with begin and end
# omitted and a step of -1 the whole value is walked backwards.
rev = lambda mystr: mystr[::-1]

In [None]:
# 'manoj' read backwards is 'jonam'.
rev('manoj')

# Introduction to Spark

In [1]:
from pyspark import SparkContext

In [2]:
# Connect to the Spark cluster. getOrCreate() returns the context that is already
# active in this kernel (or creates one if there is none), so re-running this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [3]:
%%writefile example.txt
first line
second line
third line
fourth line

Writing example.txt


In [4]:
# Load the file written above; textFile is an RDD with one element per line of text.
textFile = sc.textFile('example.txt')

In [5]:
# Action: number of elements (lines) in the RDD.
textFile.count()

4

In [6]:
# Action: return the first element (line) of the RDD.
textFile.first()

'first line'

In [8]:
# Transformation: this is only a recipe for the result; nothing is executed on the
# data until an action is called on the RDD.
secfind = textFile.filter(lambda line: 'second' in line)

In [9]:
# Evaluating the RDD object itself only shows its repr, not the matching lines.
secfind

PythonRDD[4] at RDD at PythonRDD.scala:48

In [10]:
# Action: run the filter and return the matching lines as a Python list.
secfind.collect()

['second line']

In [11]:
# Action: number of lines that contain 'second'.
secfind.count()

1

# RDD Transformations and Actions

In [15]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Overwriting example2.txt


In [16]:
# from pyspark import SparkContext  -- not repeated: already imported earlier in this notebook

In [18]:
# sc = SparkContext()  -- not repeated: a notebook cannot hold a second SparkContext, so the existing `sc` is reused

In [19]:
# Load the second example file as an RDD of lines.
text_rdd = sc.textFile('example2.txt')

In [20]:
# map() is a transformation: it returns a new RDD (one list of words per line) and
# displays only the RDD repr because no action is called here.
text_rdd.map(lambda line: line.split())

PythonRDD[8] at RDD at PythonRDD.scala:48

In [21]:
# text_rdd itself is unchanged by the map() above: it still holds the raw lines.
text_rdd.collect()

['first ', 'second line', 'the third line', 'then a fourth line']

In [22]:
# flatMap() splits each line like map() would, but flattens the per-line word lists
# into a single list of words.
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

In [23]:
%%writefile service.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing service.txt


In [24]:
# Load the service log as an RDD of raw text lines (the '#' header line included).
services = sc.textFile('service.txt')

In [25]:
# Action: the first two elements in file order (the header and the first record).
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [30]:
# Action: the two largest elements in descending order. The elements are strings,
# so the ordering is by string comparison of the whole line.
services.top(2)

['206       10/19/2017      202       CA       131          500.00',
 '205       10/19/2017      202       TX       121          200.00']

In [27]:
# Preview the whitespace-split fields of the first three lines; the header field
# still carries its leading '#'.
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [34]:
# Remove the leading '#' from the header line and leave every other line as is.
# startswith() is used instead of line[0] so an empty line does not raise IndexError.
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [35]:
# Split every cleaned line into its whitespace-separated fields.
# NOTE(review): `clean` is rebound from itself, so this cell is not re-runnable on
# its own -- a second run would apply split() to rows that are already lists.
# Re-run the previous cell first, or give the intermediate RDD its own name.
clean = clean.map(lambda line: line.split())

In [36]:
# Action: materialise all rows; each row is now a list of field strings.
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [39]:
# Build (state, amount) pairs: index 3 is the State column, the last field is Amount.
pairs = clean.map(lambda row: (row[3], row[-1]))

In [41]:
# Sum the amounts per state. The values are still strings here, so the reducer
# converts both operands to float before adding them. A key that has only one
# value (the header's 'State') is never passed through the reducer and keeps its
# original string value.
rekey = pairs.reduceByKey(lambda left, right: float(left) + float(right))

In [42]:
# Action: per-state totals; the header pair ('State', 'Amount') is still present.
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [45]:
# The source RDD is unaffected by the transformations above: top(2) still returns
# the two largest raw lines by string comparison.
services.top(2)

['206       10/19/2017      202       CA       131          500.00',
 '205       10/19/2017      202       TX       121          200.00']

In [49]:
# Total amount per state, largest first:
#   1. drop the header row before doing any arithmetic
#   2. build (state, amount) pairs with the amount parsed to float
#   3. add up the amounts of each state
#   4. sort by the total, descending
# The amount is parsed in step 2 rather than inside the reducer because
# reduceByKey never calls the reducer for a key that has a single value; such an
# amount would stay a string and break the numeric sort in step 4.
(
    clean
    .filter(lambda row: row[3] != 'State')
    .map(lambda row: (row[3], float(row[-1])))
    .reduceByKey(lambda total, amount: total + amount)
    .sortBy(lambda state_total: state_total[1], ascending=False)
    .collect()
)

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]


## Remember to try to use unpacking for readability. For example:

In [50]:
# Sample record used to compare positional indexing with tuple unpacking.
x = ['ID', 'State', 'Amount']

In [51]:
def func1(lst):
    """Return the final element of ``lst``, picked by position."""
    last_item = lst[-1]
    return last_item

In [52]:
def func2(id_st_amt):
    """Return the amount from an ``(id, state, amount)`` triple.

    The argument is unpacked into three named fields, so it must contain
    exactly three items; any other length raises ``ValueError``.
    """
    _record_id, _state, amount = id_st_amt
    return amount

In [53]:
# Positional indexing: works, but lst[-1] does not say which field is being read.
func1(x)

'Amount'

In [55]:
# Better way: same result, and the unpacked names document what each position holds.
func2(x)

'Amount'