In [1]:
from pyspark import SparkContext

In [2]:
# Reuse the running SparkContext if one already exists so this cell can be
# re-executed; a second bare SparkContext() in the same kernel raises ValueError.
sc = SparkContext.getOrCreate()

In [3]:
%%writefile example.txt
first line
second line
third line
fourth line

Overwriting example.txt


In [4]:
textFile = sc.textFile('example.txt')

In [5]:
#action

In [6]:
textFile.count()

4

In [7]:
textFile.first()

'first line'

In [8]:
#transform
secfind = textFile.filter(lambda line: 'second' in line)

In [9]:
secfind

PythonRDD[4] at RDD at PythonRDD.scala:48

In [10]:
secfind.collect()

['second line']

In [11]:
secfind.count()

1

In [12]:
#There are two kinds of RDD operations: transformations and actions.
#A transformation only builds up a recipe on the RDD; nothing is executed
#until we perform an action.
#This means we can build a complicated recipe without worrying about the cost
#of running every step along the way.
#The recipe only runs when we call a particular action.

In [13]:
#Google RDD Transformation and RDD Action

In [14]:
%%writefile example2.txt
first
second line
thrid line
then a fourth line

Overwriting example2.txt


In [15]:
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:-2

In [16]:
text_rdd = sc.textFile('example2.txt')

In [17]:
words = text_rdd.map(lambda line: line.split())

In [18]:
words.collect()

[['first'],
 ['second', 'line'],
 ['thrid', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [19]:
text_rdd.collect()

['first', 'second line', 'thrid line', 'then a fourth line']

In [20]:
#Map vs Flat Map

In [21]:
text_rdd.flatMap(lambda line: line.split()).collect()

['first', 'second', 'line', 'thrid', 'line', 'then', 'a', 'fourth', 'line']

In [22]:
#flatMap flattens the per-line word lists into one single list

In [23]:
%%writefile services.txt
#EventID   Timestamp     Customer    State     ServiceID    Amount
201        10/13/2017    100         NY        131          100.00
204        10/18/2017    700         TX        129          450.00
202        10/15/2017    203         CA        121          200.00
206        10/19/2017    202         CA        131          500.00
203        10/17/2017    101         NY        173          750.00
205        10/19/2017    202         TX        121          200.00

Overwriting services.txt


In [24]:
services = sc.textFile('services.txt')

In [25]:
#For a more complex file like this one, we may need to clean or manipulate it

In [26]:
services.take(2)

['#EventID   Timestamp     Customer    State     ServiceID    Amount',
 '201        10/13/2017    100         NY        131          100.00']

In [27]:
#splitting using map
services.map(lambda line: line.split()).take(6)

[['#EventID', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00']]

In [28]:
#Remove the leading '#' from the header line.
#Check whether each line starts with a hashtag: if it does, remove it by
#keeping everything after the first character; if not, return the entire line.
#startswith() is used instead of line[0] so an empty line cannot raise IndexError.
services.map(lambda line: line[1:] if line.startswith('#') else line).collect()

['EventID   Timestamp     Customer    State     ServiceID    Amount',
 '201        10/13/2017    100         NY        131          100.00',
 '204        10/18/2017    700         TX        129          450.00',
 '202        10/15/2017    203         CA        121          200.00',
 '206        10/19/2017    202         CA        131          500.00',
 '203        10/17/2017    101         NY        173          750.00',
 '205        10/19/2017    202         TX        121          200.00']

In [29]:
#Drop the leading '#' from any line that has one; startswith() is safe on
#empty lines, whereas indexing line[0] would raise IndexError.
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [30]:
#Build the split rows straight from `services` instead of reassigning
#clean from itself, so re-running this cell never calls split() on lists.
clean = (services
         .map(lambda line: line[1:] if line.startswith('#') else line)
         .map(lambda line: line.split()))

In [31]:
clean.collect()

[['EventID', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [32]:
clean.map(lambda lst: (lst[3], lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [33]:
pairs = clean.map(lambda lst: (lst[3], lst[-1]))

In [34]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [35]:
rekey = pairs.reduceByKey(lambda amt1, amt2 : amt1 + amt2)

In [36]:
rekey.collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In [37]:
#It doesn't work as expected because the amounts are still strings, so + concatenates them

In [38]:
rekey = pairs.reduceByKey(lambda amt1, amt2 : float(amt1) + float(amt2))

In [39]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [40]:
#Now it shows the total sales by keys.

In [41]:
clean.collect()

[['EventID', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [42]:
#Grab (State, Amount) pairs
step1 = clean.map(lambda lst: (lst[3], lst[-1]))

#Remove the title row, then convert every amount to float.
#reduceByKey only calls its function when a key has two or more values, so
#converting inside the reducer would leave a single-row state's amount as a
#string and break the numeric sort below.
step2 = step1.filter(lambda st_amt: st_amt[0] != 'State').mapValues(float)

#Reduce by Key: total amount per state
step3 = step2.reduceByKey(lambda amt1, amt2: amt1 + amt2)

#Sort Results by Amount, largest first
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending=False)

#Action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [43]:
x = ['ID', 'STATE', 'AMOUNT']

In [44]:
def func1(lst):
    """Return the last element of ``lst``, looked up by position."""
    last_item = lst[-1]
    return last_item

In [45]:
#Sometimes we don't know what is stored at a given index and would have to go
#back to check it.
#We can use tuple unpacking instead.

In [46]:
def func2(id_st_amt):
    """Return the amount from an (id, state, amount) triple.

    The argument is unpacked into three names, so it must contain exactly
    three items; any other length raises ValueError.
    """
    _record_id, _state, amount = id_st_amt
    return amount

In [47]:
func1(x)

'AMOUNT'

In [48]:
func2(x)

'AMOUNT'

In [49]:
#Function 2 is much more readable than function 1.