In [2]:
from pyspark import SparkContext
# getOrCreate() reuses the already-running SparkContext if there is one, so this
# cell can be re-run without raising "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [3]:
sc

<pyspark.context.SparkContext at 0x7f6cd0fc0e10>

In [4]:
sc

<pyspark.context.SparkContext at 0x7f6cd0fc0e10>

In [5]:
# Lambda expressions overview: a lambda creates an anonymous (unnamed) function.

In [6]:
# Anonymous function that squares its argument, bound to a name so it can be called below.
ld = lambda x: x ** 2

In [7]:
ld(4)

16

In [8]:
# Using PySpark - Spark + Python

In [9]:
from pyspark import SparkContext

In [10]:
# SparkContext is the entry point that connects to a Spark cluster and is used to create RDDs.
sc

<pyspark.context.SparkContext at 0x7f6cd0fc0e10>

In [11]:
# Reading a text file
# creating a file using jupyter

In [12]:
%%writefile SparkRead.txt
First exercise
Second exercise
Third exercise
Fourth exercise

Overwriting SparkRead.txt


In [13]:
# Create an RDD - Resilient Distributed Dataset

In [14]:
textFileRDD = sc.textFile('SparkRead.txt')
# textFileRDD is an RDD with one element per line of the file; transformations and actions can be applied to it.

In [15]:
# Actions on RDD
textFileRDD.count()

4

In [16]:
textFileRDD.first()

'First exercise'

In [17]:
# Transformations, e.g. filter: apply a predicate function to every element and get back a new RDD containing only the elements that pass.

In [18]:
# Keep only the lines for which the predicate returns True.
secfind_RDD = textFileRDD.filter(lambda text: 'Second' in text)

In [19]:
secfind_RDD

PythonRDD[4] at RDD at PythonRDD.scala:48

In [20]:
# we will need to collect the RDD to see what is in it.
secfind_RDD.collect()

['Second exercise']

In [21]:
secfind_RDD.count()

1

In [22]:
# RDD Transformations and Actions - part 2

In [23]:
%%writefile example2.txt
First Line
Second Line
This is the Third Line
This is the Fourth Line

Overwriting example2.txt


In [24]:
pwd

'/home/ubuntu/mynotebooks/Giri_Work'

In [25]:
ls

SparkRead.txt  Spark_Test.ipynb  Test_Jupyter.ipynb  example2.txt  services.txt


In [26]:
sc

<pyspark.context.SparkContext at 0x7f6cd0fc0e10>

In [27]:
text2_rdd = sc.textFile('example2.txt')

In [28]:
text2_rdd

example2.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:-2

In [29]:
# Map a lambda over the RDD: each line becomes a list of its whitespace-separated words.
words = text2_rdd.map(lambda text: text.split())

In [30]:
words

PythonRDD[8] at RDD at PythonRDD.scala:48

In [31]:
# original RDD
text2_rdd.collect()

['First Line',
 'Second Line',
 'This is the Third Line',
 'This is the Fourth Line']

In [32]:
# RDD produced by the map transformation
words.collect()

[['First', 'Line'],
 ['Second', 'Line'],
 ['This', 'is', 'the', 'Third', 'Line'],
 ['This', 'is', 'the', 'Fourth', 'Line']]

In [33]:
# map vs flatMap: map returns one list of words per line, whereas flatMap
# flattens those lists into a single sequence of words.
text2_rdd.flatMap(lambda text: text.split()).collect()

['First',
 'Line',
 'Second',
 'Line',
 'This',
 'is',
 'the',
 'Third',
 'Line',
 'This',
 'is',
 'the',
 'Fourth',
 'Line']

In [34]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Overwriting services.txt


In [35]:
ls

SparkRead.txt  Spark_Test.ipynb  Test_Jupyter.ipynb  example2.txt  services.txt


In [36]:
text_ser_rdd = sc.textFile('services.txt')

In [37]:
text_ser_rdd

services.txt MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:-2

In [38]:
text_ser_rdd.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [39]:
# Peek at the first three rows after splitting each line on whitespace.
text_ser_rdd.map(lambda row: row.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [40]:
# Strip the leading '#' from the header line. startswith() is also safe on an
# empty line, where indexing x[0] would raise IndexError.
clean = text_ser_rdd.map(lambda x: x[1:] if x.startswith('#') else x)

In [41]:
# Build the tokenised RDD straight from text_ser_rdd (strip the header's leading
# '#', then split on whitespace) rather than from `clean` itself, so re-running
# this cell never tries to split rows that are already lists.
clean = text_ser_rdd.map(
    lambda line: (line[1:] if line.startswith('#') else line).split()
)

In [42]:
# grabbing fields

In [43]:
# Build (State, Amount) pairs so that sales can be totalled per state (e.g. for NY).
pairs = clean.map(lambda fields: (fields[3], fields[-1]))

In [44]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [45]:
# Sum the amounts per state. reduceByKey only invokes the function for keys that
# have two or more values, so a single-valued key (here the header pair) keeps
# its original string value.
rekey = pairs.reduceByKey(lambda left, right: float(left) + float(right))

In [46]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [47]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [48]:
# Applying operations on clean

In [49]:
# Step 1 - Extract a (State, Amount) tuple from every row.
res = clean.map(lambda fields: (fields[3], fields[-1]))

In [50]:
# Step 2 - Reduce by key.
# Drop the header pair and convert amounts to float BEFORE reducing: reduceByKey
# never calls the function for a key that has a single value, so converting
# inside the reducer would leave such an amount as a string, which cannot be
# compared reliably with the float totals when sorting in Step 4.
res = (
    res.filter(lambda kv: kv[0] != 'State')
       .mapValues(float)
       .reduceByKey(lambda amt1, amt2: amt1 + amt2)
)

In [51]:
# Step 3 - Drop the header pair, whose key is the column name 'State'.
res = res.filter(lambda kv: kv[0] != 'State')

In [52]:
# Step 4 - Sort by total amount, largest first.
res = res.sortBy(lambda pair: pair[1], ascending=False)

In [53]:
# Action
res.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [54]:
# Tuple unpacking
x = ['ID', 'State', 'Amount']

def func1(lst):
    """Return the last element of ``lst`` using negative indexing."""
    last_item = lst[-1]
    return last_item

In [58]:
def func2(id_st_amt):
    """Return the amount from an ``(id, state, amount)`` triple via tuple unpacking.

    Unpacking requires exactly three items; a sequence of any other length
    raises ValueError.
    """
    _, _, amount = id_st_amt
    return amount

In [59]:
func1(x)

'Amount'

In [60]:
func2(x)

'Amount'