## RDD Transformations and Actions

### Creating an RRD from a text file

In [1]:
%%writefile example2.txt
first
second line
the third line
then a fourth line

Overwriting example2.txt


In [2]:
from pyspark import SparkContext

sc = SparkContext()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/05 18:24:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
sc.textFile("example2.txt")

example2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [4]:
# save a reference to the RDD
text_rdd = sc.textFile('example2.txt')

In [5]:
words = text_rdd.map(lambda line: line.split())

In [6]:
words.collect()

                                                                                

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [7]:
text_rdd.collect()

['first', 'second line', 'the third line', 'then a fourth line']

### Map vs. flatMap

In [8]:
# collect everything as a single flat map
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

## Key Value Pairs

In [10]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2023      100       NY       131          100.00
204       10/18/2023      700       TX       129          450.00
202       10/15/2023      203       CA       121          200.00
206       10/19/2023      202       CA       131          500.00
203       10/17/2023      101       NY       173          750.00
205       10/19/2023      202       TX       121          200.00

Overwriting services.txt


In [11]:
services = sc.textFile("services.txt")

In [12]:
# show 2 first records -> every row is a single string
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2023      100       NY       131          100.00']

In [13]:
# convert line items into elements within a list for the first 3 lines
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2023', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2023', '700', 'TX', '129', '450.00']]

In [14]:
# remove the hashtag if the line begins with a hashtag
services.map(lambda line: line[1:] if line[0] == "#" else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2023      100       NY       131          100.00',
 '204       10/18/2023      700       TX       129          450.00',
 '202       10/15/2023      203       CA       121          200.00',
 '206       10/19/2023      202       CA       131          500.00',
 '203       10/17/2023      101       NY       173          750.00',
 '205       10/19/2023      202       TX       121          200.00']

In [22]:
# rdd with separated items per line and without the first '#' in the colum descriptions
clean = services.map(lambda line: line[1:] if line[0] == "#" else line)
clean = clean.map(lambda line: line.split())
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2023', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2023', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2023', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2023', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2023', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2023', '202', 'TX', '121', '200.00']]

## Using Key-Value Paris for Operations

In [24]:
# show columns `State` and `Amount`
clean.map(lambda lst: (lst[3], lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [25]:
# show total amount by state
pairs = clean.map(lambda lst: (lst[3], lst[-1]))
rekey = pairs.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))

In [26]:
rekey.collect()

                                                                                

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [27]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2023', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2023', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2023', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2023', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2023', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2023', '202', 'TX', '121', '200.00']]

In [30]:
# retrieve `state` and `amount` in a tuple
step1 = clean.map(lambda lst: (lst[3], lst[-1]))
# reduce by key
step2 = step1.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))
# remove `state` and `amount` titles
step3 = step2.filter(lambda x: not x[0] == 'State')
# sort results by `amount`
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending=False)
# action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [31]:
# unpacking (to avoid using list indices that are not meaningfukl)
x = ['ID', 'State', 'Amount']

def func1(lst):
    return lst[-1]

def func2(id_st_amt):
    (id, st, amt) = id_st_amt
    return amt

'Amount'

In [32]:
func1(x)

'Amount'

In [33]:
func2(x)

'Amount'