# SPARK PYTHON & BIG DATA

In [36]:
from pyspark import SparkContext

In [59]:
# Only one SparkContext can be active at a time, so stop the one left over from a previous run (if any)
# before creating a new one. The guard keeps this cell working on a fresh kernel, where `sc` does not
# exist yet and a bare `sc.stop()` would raise NameError.
if 'sc' in globals():
    sc.stop()
# SparkContext represents the connection to the Spark cluster; it is used to create RDDs and
# broadcast variables on that cluster
sc = SparkContext()

# Lambda Expression Review

In [42]:
#UDF (user-defined function): compute the square, keep it in a local, then return it
def square(num):
    squared = pow(num, 2)
    return squared

In [43]:
square(4)  # 4**2

16

In [44]:
#Another way of writing: return the expression directly, with no temporary variable
def square(num):
    return pow(num, 2)

In [45]:
square(5)  # 5**2

25

In [46]:
#Rewriting: the whole definition on a single line
def square(num): return pow(num, 2)

In [47]:
square(2)  # 2**2

4

In [48]:
# Same result obtained from a lambda expression (an anonymous, single-expression function)
sq = lambda n: pow(n, 2)

In [49]:
sq(4)  # same result as square(4)

16

In [50]:
#for checking even numbers: True when dividing by 2 leaves no remainder
even = lambda value: 0 == value % 2

In [51]:
even(3)  # 3 % 2 == 1, so False

False

In [52]:
even(2)  # 2 % 2 == 0, so True

True

In [53]:
#for grabbing the first character of a string (works for the first item of any sequence)
first = lambda seq: seq[0]

In [54]:
first('wow')  # index 0 of the string

'w'

In [55]:
#reverse a string: slicing with a step of -1 returns a reversed copy (nothing is printed)
reverse = lambda text: text[::-1]

In [56]:
reverse('river')  # characters in reverse order

'revir'

In [57]:
#a lambda expression can accept more than one argument
mul = lambda a, b: a * b

In [58]:
mul(2,4)  # 2 * 4

8

# Spark and Python

In [69]:
## Using the Jupyter `%%writefile` cell magic to create a new file: everything in the cell below the magic line is written to the file (an existing file is overwritten)

In [70]:
%%writefile example.txt
Line first
Second line
The line of 3
4th from the start

Overwriting example.txt


In [73]:
#creating an RDD testFile from example.txt with the textFile method (one element per line of the file)
testFile = sc.textFile('example.txt')

# Actions and transformations on the RDD

In [75]:
testFile.count()  # action: number of elements (lines) in the RDD

4

In [76]:
testFile.first()  # action: the first element (line) of the RDD

'Line first'

In [81]:
#transformation: keep only the lines that contain the word 'Second' (lazy until an action runs)
secfind = testFile.filter(lambda text: 'Second' in text)

In [85]:
secfind.collect()  # action: bring the filtered lines back as a Python list

['Second line']

In [87]:
#map produces exactly one output per input line: here, that line's list of words
words = testFile.map(lambda text: text.split())

In [88]:
words.collect()  # one list of words per line

[['Line', 'first'],
 ['Second', 'line'],
 ['The', 'line', 'of', '3'],
 ['4th', 'from', 'the', 'start']]

In [89]:
testFile.collect()  # the raw lines, one string per line

['Line first', 'Second line', 'The line of 3', '4th from the start']

In [91]:
##Comparing the two collect actions, we can see that 'words' holds each line split up into a list of words, while testFile
##returns the whole lines as strings.

In [93]:
#flatMap flattens each line's list of words into one RDD of words, so collect() returns a single flat list
testFile.flatMap(lambda text: text.split()).collect()

['Line',
 'first',
 'Second',
 'line',
 'The',
 'line',
 'of',
 '3',
 '4th',
 'from',
 'the',
 'start']

In [95]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [96]:
services = sc.textFile('services.txt')  # RDD with one element per line of services.txt, header line included

In [97]:
services.take(2)  # first two lines: the '#'-prefixed header and the first record

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [99]:
#preview: split each line on whitespace into its fields and look at the first two
services.map(lambda text: text.split()).take(2)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00']]

In [107]:
#Strip the leading '#' from the header line so its column names look like the data rows.
#str.startswith is used instead of line[0] so an empty line (e.g. a blank line in the file) does not raise IndexError.
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [108]:
#Strip the header's leading '#' and split every line into its whitespace-separated fields.
#Building from `services` (instead of `clean = clean.map(...)`) keeps this cell idempotent: re-running the
#self-referential version would try to call .split() on lists that were already split, and fail.
clean = services.map(lambda line: (line[1:] if line.startswith('#') else line).split())

In [109]:
clean.collect()  # header without '#', then one list of fields per record

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [113]:
#Build a pair RDD of (key, value) tuples: key = State (index 3), value = Amount (last field)
pairs = clean.map(lambda fields: (fields[3], fields[-1]))

In [115]:
pairs.collect()  # note the header pair ('State', 'Amount') is still present and amounts are strings

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [119]:
#reduceByKey groups the pairs by key (the state) and merges the VALUES of each group pairwise with this
#function - both arguments are values for the same key, not keys. The amounts are still strings here,
#so '+' concatenates them rather than adding them.
rekey = pairs.reduceByKey(lambda amt1, amt2: amt1 + amt2)

In [121]:
rekey.collect() #However, here the strings are just concatenated, not summed as numbers

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In [122]:
#Converting the string values to float inside the reduce function itself.
#Caveat: the function only runs for keys with two or more values, so a key with a single value keeps its
#original string - that is why ('State', 'Amount') comes back unchanged.
rekey = pairs.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [125]:
clean.collect()  # re-display the parsed records used by the pipeline below

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [129]:
#Grabbing state and amount
step1 = clean.map(lambda lst: (lst[3], lst[-1]))
#Removing the State and Amount titles first, so every remaining amount can be parsed as a number
step2 = step1.filter(lambda st_amt: st_amt[0] != 'State')
#Converting the amounts to float BEFORE reducing, then summing per state. Converting inside the reduce
#function instead leaves any state with a single record as a string (the function never runs for it),
#and sortBy would then have to compare str with float, which fails in Python 3 (TypeError).
step3 = step2.mapValues(float).reduceByKey(lambda amt1, amt2: amt1 + amt2)
#Sort results by amount, largest first
step4 = step3.sortBy(lambda st_amt: st_amt[1], ascending=False)
#Collecting
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [130]:
#We were using indexing till now. The next example shows how to use tuple unpacking instead.
#Sample record laid out as (id, state, amount):
x = ['ID', 'State', 'Amount']

In [132]:
#Using indexing - compact, but the reader has to know what position -1 holds
def func1(lst):
    """Return the last element of ``lst`` via negative indexing (e.g. the amount of an [id, state, amount] record)."""
    last_item = lst[-1]
    return last_item

In [134]:
#Using tuple unpacking - easier to read, because every position gets a name
def func2(id_st_amt):
    """Return the amount from a 3-item (id, state, amount) record using tuple unpacking.

    Raises ValueError if the record does not contain exactly three items.
    """
    record_id, state, amount = id_st_amt
    return amount

In [136]:
func1(x)  # last element via indexing

'Amount'

In [137]:
func2(x)  # same value, obtained via tuple unpacking

'Amount'