# Programming with RDDs

## RDD Basics

An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

In [1]:
# Creating an RDD of strings with textFile() in Python
lines = sc.textFile("/Applications/spark-1.6.0-bin-hadoop2.6/README.md") # Create an RDD called lines

RDDs offer two types of operations: transformations and actions.

In [2]:
# Calling the filter transformation
pythonLines = lines.filter(lambda line: "Python" in line)

In [3]:
# Calling the first action
pythonLines.first()

u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

Although you can define new RDDs at any time, Spark computes them lazily, that is, the first time they are used in an action.

In [4]:
pythonLines.count()

3

If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist().

For example, if we knew that we wanted to compute multiple results about the README lines that contain Python, we could write it as:

In [5]:
pythonLines.persist()  # the parentheses are required; a bare pythonLines.persist only references the method

PythonRDD[4] at RDD at PythonRDD.scala:43

In [6]:
pythonLines.count()

3

In [8]:
pythonLines.first()

u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

## Creating RDDs

Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.

The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext's parallelize() method.

In [9]:
# parallelize method
lines = sc.parallelize(["pandas", "I like pandas"])

A more common way to create RDDs is to load data from external storage.

In [11]:
# textFile method
lines = sc.textFile("/Applications/spark-1.6.0-bin-hadoop2.6/README.md")

## RDD Operations

Spark treats transformations and actions very differently, so understanding which type of operation you are performing will be important. If you are ever confused whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs, whereas actions return some other data type.

## Transformations

In [None]:
# filter transformation in Python
inputRDD = sc.textFile("log.txt")  # the path must be a string
errorsRDD = inputRDD.filter(lambda x: "error" in x)

Note that the filter() operation does not mutate the existing inputRDD. Instead it returns a pointer to an entirely new RDD. inputRDD can still be reused later in the program.

In [None]:
# union transformation
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badlinesRDD = errorsRDD.union(warningsRDD)

Finally, as you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between the different RDDs, called the lineage graph.
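
To make the idea of a lineage graph concrete, here is a minimal pure-Python sketch. It is not Spark code: `ToyRDD` and its methods are hypothetical stand-ins that only record which datasets each new dataset was derived from, mirroring the `inputRDD`, `errorsRDD`, `warningsRDD` and `badlinesRDD` names used above.

```python
class ToyRDD(object):
    """Hypothetical stand-in for an RDD that only records its parents."""

    def __init__(self, name, parents=()):
        self.name = name
        self.parents = list(parents)

    def filter(self, name):
        # A filter has a single parent: the dataset it was called on.
        return ToyRDD(name, [self])

    def union(self, other, name):
        # A union depends on both of its inputs.
        return ToyRDD(name, [self, other])

    def lineage(self):
        """Return ancestor names, parents first, without duplicates."""
        seen = []

        def visit(node):
            for parent in node.parents:
                visit(parent)
            if node.name not in seen:
                seen.append(node.name)

        visit(self)
        return seen


input_rdd = ToyRDD("inputRDD")
errors_rdd = input_rdd.filter("errorsRDD")
warnings_rdd = input_rdd.filter("warningsRDD")
badlines_rdd = errors_rdd.union(warnings_rdd, "badlinesRDD")

lineage = badlines_rdd.lineage()
```

In a real Spark session, the lineage of an RDD can be inspected with `toDebugString()`.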

## Actions

Actions are the operations that return a final value to the driver program or write data to an external storage system.

In [15]:
print "Input had " , pythonLines.count() , "Python lines"

 Input had  3 Python lines


In [16]:
for line in pythonLines.take(10):
    print line

high-level APIs in Scala, Java, Python, and R, and an optimized engine that
## Interactive Python Shell
Alternatively, if you prefer Python, you can use the Python shell:


In [17]:
pythonLines.collect()

[u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 u'## Interactive Python Shell',
 u'Alternatively, if you prefer Python, you can use the Python shell:']

In most cases, RDDs are too large to collect() to the driver, because the entire dataset would have to fit in memory on a single machine.

It is important to note that each time we call a new action, the entire RDD must be computed "from scratch". To avoid this inefficiency, users can persist intermediate results.
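
The cost of recomputation can be illustrated with a small pure-Python sketch. This is an analogy rather than Spark code: `ToyLazyDataset` is a hypothetical class that counts how often its underlying computation runs, with and without a `persist()` call.

```python
class ToyLazyDataset(object):
    """Hypothetical lazy dataset that recomputes on every action unless persisted."""

    def __init__(self, compute):
        self._compute = compute      # function that produces the data
        self._persisted = False
        self._cache = None
        self.compute_calls = 0       # how many times the data was computed

    def persist(self):
        # Only marks the dataset; nothing is computed until an action runs.
        self._persisted = True
        return self

    def _materialize(self):
        if self._persisted and self._cache is not None:
            return self._cache
        self.compute_calls += 1
        data = self._compute()
        if self._persisted:
            self._cache = data
        return data

    # "Actions": each one needs the data to be materialized.
    def count(self):
        return len(self._materialize())

    def first(self):
        return self._materialize()[0]


def squares():
    return [x * x for x in range(1, 5)]


unpersisted = ToyLazyDataset(squares)
unpersisted.count()
unpersisted.first()

persisted = ToyLazyDataset(squares).persist()
persisted.count()
persisted.first()
```

After two actions, `unpersisted.compute_calls` is 2 while `persisted.compute_calls` is 1, which is the saving that persisting an RDD aims for.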

## Lazy Evaluation

Transformations on RDDs are lazily evaluated, meaning that Spark will not begin to execute until it sees an action.
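
Python generators behave in a similar way, which makes them a convenient analogy. The sketch below is plain Python, not Spark, and the sample lines are made up for illustration: building the generator does no work, and asking for the first element inspects only as many lines as needed.

```python
calls = []  # records every line the predicate actually inspects


def contains_python(line):
    calls.append(line)
    return "Python" in line


lines = ["Spark with Python", "Scala only", "more Python"]

# "Transformation": builds a lazy pipeline; the predicate has not run yet.
python_lines = (line for line in lines if contains_python(line))
calls_before_action = len(calls)

# "Action": requesting the first match forces just enough evaluation.
first_match = next(python_lines)
calls_after_action = len(calls)
```

Spark applies the same idea at cluster scale: for an action such as first(), it can stop reading as soon as a matching line is found instead of scanning the whole file.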

## Passing functions to Spark

In [18]:
word = lines.filter(lambda s: "Scala" in s)

In [19]:
def containsScala(s):
    return "Scala" in s
word = lines.filter(containsScala)

## Common Transformations and Actions

### Basic RDDs

#### Element-wise transformations

The map() transformation takes in a function and applies it to each element in the RDD, with the result of the function being the new value of each element in the resulting RDD. The filter() transformation takes in a function and returns an RDD that contains only the elements that pass the filter() function.

In [21]:
# Squaring the values in an RDD
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print "%i" %(num)

1
4
9
16


Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap().

In [23]:
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()

'hello'

In [25]:
words.take(3)

['hello', 'world', 'hi']
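
The difference from map() is easiest to see side by side. The following is a plain-Python sketch (no Spark required) that mimics both behaviours on the same two input strings: mapping keeps one output per input, while flat-mapping flattens the per-element results into a single sequence.

```python
from itertools import chain

lines = ["hello world", "hi"]

# map-like: one output element (a list of words) per input element
mapped = [line.split(" ") for line in lines]

# flatMap-like: the per-element lists are flattened into one sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))
```

Here `mapped` is a list of two lists, whereas `flat_mapped` is a single list of three words.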

#### Pseudo set operations

In [26]:
rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])

In [29]:
rdd1.distinct().collect()

['tea', 'coffee', 'panda', 'monkey']

In [30]:
rdd1.union(rdd2).collect()

['coffee', 'coffee', 'panda', 'monkey', 'tea', 'coffee', 'monkey', 'kitty']

In [31]:
rdd1.intersection(rdd2).collect()

['coffee', 'monkey']

In [33]:
rdd1.subtract(rdd2).collect()

['tea', 'panda']

In [34]:
rdd1.cartesian(rdd2).collect()

[('coffee', 'coffee'),
 ('coffee', 'monkey'),
 ('coffee', 'kitty'),
 ('coffee', 'coffee'),
 ('coffee', 'monkey'),
 ('coffee', 'kitty'),
 ('panda', 'coffee'),
 ('panda', 'monkey'),
 ('panda', 'kitty'),
 ('monkey', 'coffee'),
 ('monkey', 'monkey'),
 ('monkey', 'kitty'),
 ('tea', 'coffee'),
 ('tea', 'monkey'),
 ('tea', 'kitty')]

In [50]:
s = rdd1.sample(False, .5)

In [51]:
s.first()

'coffee'

#### Actions

The most common action on basic RDDs you will likely use is reduce(), which takes a function that operates on two elements of the type in your RDD and returns a new element of the same type.

In [52]:
# reduce() in Python
rdd = sc.parallelize([1, 2, 3, 4])
my_sum = rdd.reduce(lambda x, y: x+y)
print my_sum

10


Similar to reduce() is fold(), which takes a function with the same signature as the one needed for reduce(), but in addition takes a "zero value" to be used for the initial call on each partition.
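
Because the zero value is applied once per partition and once more when the partition results are merged, it should be the identity element of the operation (0 for addition, 1 for multiplication, an empty list for concatenation). The sketch below simulates this in plain Python; `toy_fold` is a hypothetical helper, and the split of the data into two partitions is an assumption made for illustration.

```python
from functools import reduce


def toy_fold(partitions, zero, op):
    """Simulate fold: fold each partition from `zero`, then merge the results from `zero`."""
    per_partition = [reduce(op, part, zero) for part in partitions]
    return reduce(op, per_partition, zero)


def add(x, y):
    return x + y


# Assumed layout: [1, 2, 3, 4] split across two partitions.
partitions = [[1, 2], [3, 4]]

sum_with_identity = toy_fold(partitions, 0, add)      # zero value is the identity
sum_with_non_identity = toy_fold(partitions, 1, add)  # the extra 1 is added three times
```

With a zero value of 1, the result is 13 rather than 10, and it would change again with a different number of partitions.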

The aggregate() function frees us from the constraint of having the return value be the same type as the RDD we are working on. With aggregate(), like fold(), we supply an initial value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally.

In [55]:
nums = sc.parallelize([1, 2, 3, 4])
sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
                         )

In [56]:
sumCount

(10, 4)
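
To show how the two functions cooperate, here is a plain-Python simulation of the same computation. `toy_aggregate` is a hypothetical helper, and the two-partition layout is an assumption: the first function builds a (sum, count) accumulator inside each partition, and the second merges the per-partition accumulators.

```python
from functools import reduce


def toy_aggregate(partitions, zero, seq_op, comb_op):
    """Simulate aggregate: seq_op within each partition, comb_op across partitions."""
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)


def seq_op(acc, value):
    # Fold one element into a (sum, count) accumulator.
    return (acc[0] + value, acc[1] + 1)


def comb_op(acc1, acc2):
    # Merge two (sum, count) accumulators.
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


# Assumed layout: [1, 2, 3, 4] split across two partitions.
total, count = toy_aggregate([[1, 2], [3, 4]], (0, 0), seq_op, comb_op)
average = total / float(count)
```

The accumulator is a tuple even though the elements are integers, which is exactly the freedom aggregate() adds over fold(). Dividing the sum by the count gives the average, 2.5.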

Basic actions on an RDD:

In [57]:
nums.collect()

[1, 2, 3, 4]

In [58]:
nums.count()

4

In [59]:
nums.countByValue()

defaultdict(<type 'int'>, {1: 1, 2: 1, 3: 1, 4: 1})

In [61]:
nums.take(2)

[1, 2]

In [62]:
nums.top(2)

[4, 3]

In [65]:
nums.takeSample(False, 1)

[4]

In [67]:
nums.reduce(lambda x, y: x+y)

10

In [72]:
nums.fold(0, lambda x, y: x+y)

10

## Persistence (Caching)

To avoid computing an RDD multiple times, we can ask Spark to persist the data. When we ask Spark to persist an RDD, the nodes that compute the RDD store their partitions.

Spark has many levels of persistence to choose from based on what our goals are: MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, DISK_ONLY.

In Python, stored objects are always serialized with the pickle library, so it does not matter whether you choose a serialized level.

In [83]:
result = nums.map(lambda x: x*x)
result.persist()
result.count()

4

In [84]:
# remove the cache
result.unpersist()

PythonRDD[113] at RDD at PythonRDD.scala:43