This tutorial explains some basic RDD transformations and actions.

In [2]:
# Display the SparkContext. `sc` is not defined in this file; presumably it is
# injected by the pyspark shell / notebook kernel (TODO confirm).
sc

In [3]:
# First create an RDD from a small Python list (note the duplicate 3, which
# later cells rely on for the odd-filter and distinct examples).
data = sc.parallelize([1, 2, 3, 3])

In [4]:
# map transformation: square each element, [1, 2, 3, 3] -> [1, 4, 9, 9]
mapped_data = data.map(lambda value: value ** 2)

In [5]:
# filter transformation: keep only squares above 2, [1, 4, 9, 9] -> [4, 9, 9]
filtered_data = mapped_data.filter(lambda square: square > 2)

In [6]:
# take(4) returns up to 4 elements of filtered_data, here [4, 9, 9]
filtered_data.take(4)

In [7]:
# map with a lambda that cubes each element: [1, 2, 3, 3] -> [1, 8, 27, 27]
data.map(lambda value: value ** 3).take(4)

In [8]:
#use a function instead of a lambda function - we will get the square root of each element
import math
def convert(x):
    """Return the square root of ``x`` as a float.

    Passed to ``RDD.map`` to show that a named function works just like a
    lambda. ``math.sqrt`` raises ``ValueError`` for negative input.
    """
    root = math.sqrt(x)
    return root
# Apply the named function to every element: the square roots of [1, 2, 3, 3]
data.map(convert).take(4)

In [9]:
# filter keeps the elements whose predicate is true - here the odd numbers.
# The result is [1, 3, 3]: duplicates are kept, so there are 3 odd elements.
data.filter(lambda n: n % 2 != 0).take(4)

In [10]:
# Remove duplicates: the two 3s collapse into one, leaving 1, 2 and 3
# (the order of the returned elements is not guaranteed - TODO confirm).
data.distinct().take(4)

In [11]:
# Sample the RDD without replacement (first argument False). The second
# argument, 0.2, is the probability that each element is chosen, NOT a
# count: the number of returned elements is random (anywhere from 0 to 4
# here) and changes between runs. Pass a seed, e.g. data.sample(False, 0.2, 42),
# for repeatable output, or use data.takeSample(False, 2) to get exactly two.
data.sample(False, 0.2).take(4)

In [12]:
from __future__ import print_function
# A few set-like operations on two small RDDs that overlap in 3 and 5.
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([3, 5, 6, 7, 8])
# union: every element of both RDDs; the shared 3 and 5 appear twice.
rdd1.union(rdd2).collect()


In [13]:
# intersection: elements present in both rdd1 and rdd2 -> 3 and 5
rdd1.intersection(rdd2).collect()

In [14]:
# subtract: elements of rdd1 that do not occur in rdd2 -> 1, 2 and 4
rdd1.subtract(rdd2).collect()

In [15]:
# cartesian product: every (a, b) pair with a from rdd1 and b from rdd2
# (5 x 5 = 25 pairs)
rdd1.cartesian(rdd2).collect()

In [16]:
#let us look at some actions
rdd1.collect() #returns all the elements of the RDD as a list: [1, 2, 3, 4, 5]

In [17]:
# count(): number of elements in the RDD (5 for rdd1)
rdd1.count()

In [18]:
# Union the two RDDs and count how many times each value occurs:
# 3 and 5 occur twice, every other value once.
u = rdd1.union(rdd2)
u.countByValue()

In [19]:
# first(): return the first element of u
u.first()

In [20]:
# take(3): return the first 3 elements of u
u.take(3)

In [21]:
# top(n): return the n largest elements of u (here n = 3), largest first
u.top(3)

In [22]:
# reduce() folds the RDD with a binary function; multiplying the elements
# of rdd1 ([1, 2, 3, 4, 5]) gives 1 * 2 * 3 * 4 * 5 = 120.
product = rdd1.reduce(lambda acc, item: acc * item)
print(product)

In [23]:
# Transformations on paired (key, value) RDDs. Assume several sales values
# per day; not every day of the week is present.
paired_RDD = sc.parallelize([
    ("Mon", 200.00),
    ("Tue", 1215.50),
    ("Mon", 300.25),
    ("Wed", 100.00),
    ("Mon", 100.00),
])
# Total sales per day: reduceByKey adds up all values that share a key,
# e.g. Mon -> 200.00 + 300.25 + 100.00 = 600.25.
sales_by_day = paired_RDD.reduceByKey(lambda x, y: x + y)
sales_by_day.collect()

In [24]:
# groupByKey gathers all values for each key. Each result pair holds the key
# and an iterable of its values (a pyspark ResultIterable - iterable, but not
# a list and not an iterator), so collect() shows those objects rather than
# the values themselves.
grouped_RDD = paired_RDD.groupByKey()
grouped_RDD.collect()

In [25]:
# Turn each group's iterable (ResultIterable) into a plain list so the values
# are visible, e.g. a ("Mon", [...]) pair holding all three Monday values
# (the order of values within a group is not guaranteed).
list_RDD = paired_RDD.groupByKey().mapValues(list)
list_RDD.collect()

In [26]:
# flatMapValues expands each (key, [v1, v2, ...]) into (key, v1), (key, v2), ...
# This recovers the same pairs as paired_RDD, possibly in a different order.
original_RDD = list_RDD.flatMapValues(lambda values: values)
original_RDD.collect()

In [27]:
# Sort our (key, value) pairs by key in descending order; sortByKey sorts
# ascending unless ascending=False is passed.
sorted_RDD = original_RDD.sortByKey(ascending=False)
sorted_RDD.collect()

In [28]:
# sortBy orders the pairs by the value (the second item of each pair), in
# ascending order. For descending order use `lambda item: -item[1]` as the
# key, or pass ascending=False.
sort_by_value = original_RDD.sortBy(lambda pair: pair[1])
sort_by_value.collect()

In [29]:
# A simple word count: start from an RDD of words and peek at the first five.
words = [
    "apple", "mango", "orange", "apple", "mango",
    "mango", "peach", "mango", "guava", "apple",
]
words_rdd = sc.parallelize(words)
words_rdd.take(5)

In [30]:
# Pair every word with a 1 and count the pairs per key. countByKey() is an
# action: it returns a dict-like word -> count mapping, not an RDD.
words_rdd.map(lambda w: (w, 1)).countByKey()

In [31]:
# The same word count as a transformation: reduceByKey adds up the 1s per word.
words_rdd.map(lambda w: (w, 1)).reduceByKey(lambda total, one: total + one).collect()

In [32]:
# groupByKey on the (word, 1) pairs: each word is paired with an iterable of its 1s
words_rdd.map(lambda word: (word, 1)).groupByKey().collect()

In [33]:
# Repeat the previous step, then turn each iterable of 1s into a list so the
# values are visible. Pass the list type directly, as in the paired-RDD
# example above, instead of wrapping it in a lambda.
words_rdd.map(lambda word: (word, 1)).groupByKey().mapValues(list).collect()

In [34]:
# Sum the 1s for each word to get its count. sum() consumes the grouped
# iterable directly, so no intermediate list needs to be built first.
words_rdd.map(lambda word: (word, 1)).groupByKey().mapValues(sum).collect()