RDD: Resilient Distributed Dataset

[Github repo of the book](https://github.com/databricks/learning-spark)

Clone URL: git@github.com:databricks/learning-spark.git

path: ~/repos/databricks/learning-spark

# RDD Basics

* An RDD in Spark is simply an immutable distributed collection of objects.  
* Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. 
* RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

* Users create RDDs in two ways:
  * by loading an external dataset, or 
  * by distributing a collection of objects (e.g., a list or set) in their driver program

```python
lines = sc.textFile("README.md")
type(lines)
```

```
pyspark.rdd.RDD
```

* Once created, RDDs offer two types of operations: transformations and actions.  

* Transformations construct a new RDD from a previous one.
  * For example, one common transformation is filtering data that matches a predicate.
  * In our text file example, we can use this to create a new RDD holding just the strings that contain the word Python.

```python
python_lines = lines.filter(lambda line: "Python" in line)
```

* Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS). 
* One example of an action is first(), which returns the first element in an RDD (demonstrated in Example 3-3 of the book).

```python
python_lines.first()
```

```
u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
```

For the first() action, Spark scans the file only until it finds the first matching line; it doesn’t even read the whole file.


If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using 

```python
RDD.persist()
```

In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly.  
For example, if we knew that we wanted to compute multiple results about the README lines that contain Python, we could write the script as

```python
python_lines.persist()
```

```python
python_lines.persist()
print(python_lines.count())
print(python_lines.first())
```

```
3
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
```


Calling *cache()* is the same as calling *persist()* with the default storage level.

# Creating RDDs

Spark provides two ways to create RDDs:
  * loading an external dataset
  * parallelizing a collection in your driver program

The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext’s parallelize() method, as shown below.  

This approach is very useful when you are learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them.   

Keep in mind, however, that outside of prototyping and testing, this is not widely used since
it requires that you have your entire dataset in memory on one machine.

```python
lines = sc.parallelize(['pandas', 'i like pandas'])
```

A more common way is to load from external storage.  

```python
lines = sc.textFile("/path/to/README.md")
```

# RDD Operations

* Transformation: RDD to RDD  
* Action: RDD to some other return type

## Transformations

Transformed RDDs are computed lazily, only when they are used in an action.

```python
str_lines = " warn: yes it is\ninfo: this is info \n error: this is error\n warn: another warning"
with open("log.txt", "w") as f:
    f.write(str_lines)
```

```python
input_rdd = sc.textFile("log.txt")
errors_rdd = input_rdd.filter(lambda x: "error" in x)
errors_rdd.first()
```

```
u' error: this is error'
```

Note that the filter() operation does not mutate the existing input_rdd.  
Instead, it returns a pointer to an entirely new RDD. input_rdd can still be reused later in the
program, for instance to search for other words.

```python
errors_rdd = input_rdd.filter(lambda x: "error" in x)
warn_rdd = input_rdd.filter(lambda x: "warn" in x)
bad_lines_rdd = errors_rdd.union(warn_rdd)
print("type of errors_rdd:", type(errors_rdd), id(errors_rdd))
print("type of warn_rdd:", type(warn_rdd), id(warn_rdd))
print("type of bad_lines_rdd:", type(bad_lines_rdd), id(bad_lines_rdd))
```

```
('type of errors_rdd:', <class 'pyspark.rdd.PipelinedRDD'>, 140271238701776)
('type of warn_rdd:', <class 'pyspark.rdd.PipelinedRDD'>, 140271238701840)
('type of bad_lines_rdd:', <class 'pyspark.rdd.RDD'>, 140271238703888)
```


Note  
A better way to accomplish the same result would be to filter input_rdd once, looking for either "error" or "warn".
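A minimal sketch of that single-pass filter, assuming the same "error"/"warn" keywords as above. The predicate is plain Python, so it is checked here on a local list; with Spark it would be passed unchanged as `input_rdd.filter(is_bad_line)`. The names `is_bad_line` and `sample_lines` (including the extra line that contains both keywords) are hypothetical. The sketch also shows a subtle difference: union() keeps duplicates, so a line matching both keywords appears twice with two filters plus union(), but only once with a single filter.

```python
def is_bad_line(line):
    # True if the line mentions either keyword: one pass instead of two filters + union()
    return "error" in line or "warn" in line

# Hypothetical local data mirroring log.txt, plus one line containing both keywords
sample_lines = [
    " warn: yes it is",
    "info: this is info ",
    " error: this is error",
    " warn: another warning",
    " error: disk full, warn ops",
]

single_pass = [line for line in sample_lines if is_bad_line(line)]

# Local model of errors_rdd.union(warn_rdd): concatenation keeps duplicates
two_filters_union = ([l for l in sample_lines if "error" in l]
                     + [l for l in sample_lines if "warn" in l])

print(len(single_pass), len(two_filters_union))
```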

## Actions

Actions are the operations that return a final value to the driver program or write data to an external storage system.

Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output.


```python
print("Input had", bad_lines_rdd.count(), " concerning lines")
print("Here are 3 examples:")
for line in bad_lines_rdd.take(3):
    print(line)
```

```
('Input had', 3, ' concerning lines')
Here are 3 examples:
 error: this is error
 warn: yes it is
```


* take() retrieves a small number of elements of the RDD to the driver program.
* collect() retrieves the entire RDD to the driver program.
  * Because the whole dataset must then fit in the driver's memory, collect() shouldn't be used on large datasets.


In most cases RDDs can’t just be collect()ed to the driver because they are too large.  

In these cases, it’s common to write data out to a distributed storage system such as HDFS or Amazon S3.   

You can save the contents of an RDD using the 
* *saveAsTextFile()* action
* *saveAsSequenceFile()* action
* or any of a number of actions for various built-in formats

We will cover the different options for exporting data in Chapter 5.

# Lazy Evaluation

Spark uses lazy evaluation to reduce the number of passes it has to take over our data by grouping operations together.   

In systems like Hadoop MapReduce, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes.   


In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. 

Thus, users are free to organize their program into smaller, more manageable operations.
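To make the idea concrete, here is a toy model in plain Python. It is not how Spark is implemented; `ToyLazyRDD` and `CountingSource` are hypothetical classes. Transformations only record steps, and the action (`collect`) walks the source once, applying every recorded step to each element, so chaining several simple operations costs a single pass over the data.

```python
class CountingSource:
    """Wraps input data and counts how many times it is iterated (i.e. passes)."""
    def __init__(self, data):
        self.data = data
        self.passes = 0

    def __iter__(self):
        self.passes += 1
        return iter(self.data)


class ToyLazyRDD:
    """Toy model of lazy evaluation: transformations are recorded, not run."""
    def __init__(self, source, ops=()):
        self._source = source
        self._ops = list(ops)  # recorded ("map" | "filter", function) steps

    def map(self, fn):
        return ToyLazyRDD(self._source, self._ops + [("map", fn)])

    def filter(self, pred):
        return ToyLazyRDD(self._source, self._ops + [("filter", pred)])

    def _iterate(self):
        # A single pass over the source applies every recorded step per element
        for item in self._source:
            keep = True
            for kind, fn in self._ops:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):  # "filter": drop the element if the predicate fails
                    keep = False
                    break
            if keep:
                yield item

    def collect(self):
        # The "action": only now is any data actually read
        return list(self._iterate())


source = CountingSource(range(1, 11))
pipeline = (ToyLazyRDD(source)
            .map(lambda x: x * x)
            .filter(lambda x: x % 2 == 0)
            .map(lambda x: x + 1))
passes_before_action = source.passes  # 0: building the chain touched no data
result = pipeline.collect()
print(passes_before_action, source.passes, result)  # 0 1 [5, 17, 37, 65, 101]
```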

# Passing functions to Spark

There are three ways to do so in Python:
* as lambdas
* as top-level functions
* as locally defined functions

```python
error_rdd = input_rdd.filter(lambda line: "error" in line)
```

```python
def contains_error(line):
    return "error" in line

error_rdd = input_rdd.filter(contains_error)
```
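The cells above show the first two forms. A minimal sketch of all three, using Python's built-in `filter` over a local list as a stand-in for `rdd.filter` (the same callables could be passed to an RDD); `sample_lines` and `errors_matching` are hypothetical names for illustration:

```python
sample_lines = ["info: ok", " error: this is error", " warn: careful"]

# 1. A lambda
via_lambda = list(filter(lambda line: "error" in line, sample_lines))

# 2. A top-level function
def contains_error(line):
    return "error" in line

via_top_level = list(filter(contains_error, sample_lines))

# 3. A locally defined function, nested inside another function
def errors_matching(data, keyword):
    def contains_keyword(line):
        # Closes over the local variable `keyword`
        return keyword in line
    return list(filter(contains_keyword, data))

via_local = errors_matching(sample_lines, "error")
print(via_lambda, via_top_level, via_local)
```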

## Pitfall: serializing the enclosing object

One issue to watch out for when passing functions is inadvertently serializing the object containing the function.  

When you pass a function that is the member of an object, or contains references to fields in an object (e.g., self.field ), Spark sends the entire object to worker nodes, which can be much larger than the bit of information you need. 

Sometimes this can also cause your program to fail, if your class contains objects that Python can’t figure out how to pickle.

### Wrong way

```python
# Passing a function with field references (don’t do this!)

class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def isMatch(self, s):
        return self.query in s

    def getMatchesFunctionReference(self, rdd):
        # Problem: references all of "self" in "self.isMatch"
        return rdd.filter(self.isMatch)

    def getMatchesMemberReference(self, rdd):
        # Problem: references all of "self" in "self.query"
        return rdd.filter(lambda x: self.query in x)
```
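The cost can be observed without a cluster. This sketch uses the standard-library `pickle` module as a stand-in for PySpark's function serializer (PySpark actually uses cloudpickle, which also handles lambdas and closures). It uses a trimmed copy of the class above with a hypothetical `lookup_table` field that the filter never reads: pickling the bound method `search.isMatch` serializes the whole object, and adding an unpicklable field (a lock here) makes the method itself unpicklable.

```python
import pickle
import threading


class SearchFunctions(object):
    def __init__(self, query, payload_size=100_000):
        self.query = query
        # Hypothetical large field that isMatch() does not need
        self.lookup_table = list(range(payload_size))

    def isMatch(self, s):
        return self.query in s


search = SearchFunctions("error")

# Pickling the bound method drags the whole object (including lookup_table) along
method_bytes = len(pickle.dumps(search.isMatch))
# Pickling only the field the filter actually uses is tiny
query_bytes = len(pickle.dumps(search.query))
print(method_bytes, query_bytes)

# An unpicklable field makes the bound method unpicklable as well
search.lock = threading.Lock()
try:
    pickle.dumps(search.isMatch)
    lock_error = None
except TypeError as exc:
    lock_error = exc
print("pickling failed:", lock_error is not None)
```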


### Right way

Instead, just extract the fields you need from your object into a local variable and pass that in.  

```python
class WordFunctions(object):
    def __init__(self, query):
        self.query = query

    def getMatchesNoReference(self, rdd):
        # Safe: extract only the field we need into a local variable
        query = self.query
        return rdd.filter(lambda x: query in x)
```

I know it is weird, but there is not much to be done about it: copy what you need into local variables and live with it.
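One way to see why the local variable helps: PySpark serializes a function together with whatever its closure captures. Inspecting `__closure__` in plain Python shows what each version captures. `make_unsafe_predicate` and `make_safe_predicate` are hypothetical helpers written for this sketch.

```python
class WordFunctions(object):
    def __init__(self, query):
        self.query = query

    def make_unsafe_predicate(self):
        # The lambda reads self.query, so it closes over the whole object
        return lambda x: self.query in x

    def make_safe_predicate(self):
        # Copy the field into a local; the lambda closes over just this string
        query = self.query
        return lambda x: query in x


wf = WordFunctions("error")
unsafe_captured = [cell.cell_contents for cell in wf.make_unsafe_predicate().__closure__]
safe_captured = [cell.cell_contents for cell in wf.make_safe_predicate().__closure__]
print([type(c).__name__ for c in unsafe_captured])  # ['WordFunctions']
print(safe_captured)                                # ['error']
```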

# Common Transformations and Actions

## Basic RDDs

### map, filter

It is useful to note that map()'s return type does not have to be the same as its input type.  

For example, if our map() function parsed strings and returned Doubles, the input RDD type would be RDD[String] and the resulting RDD type would be RDD[Double].

```python
nums = sc.parallelize(range(1, 5))
print("type of nums:", type(nums))
squared_rdd = nums.map(lambda x: x * x)
print("type of rdd:", type(squared_rdd))
for num in squared_rdd.collect():
    print(num)
```

```
('type of nums:', <class 'pyspark.rdd.RDD'>)
('type of rdd:', <class 'pyspark.rdd.PipelinedRDD'>)
1
4
9
16
```


### flatMap

```python
lines = sc.parallelize(["hello world", "hi"])

words = lines.map(lambda line: line.split(" "))
print("using map:", words.first())

words = lines.flatMap(lambda line: line.split(" "))
print("using flatmap:", words.first())
```

```
('using map:', ['hello', 'world'])
('using flatmap:', 'hello')
```


The flatMap() call is roughly equivalent to this Ruby:
```ruby
lines.map {|line| line.split(" ")}.flatten.first()
```
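The same distinction in Python terms, as a sketch on plain lists standing in for RDDs, with `itertools.chain.from_iterable` playing the role of the flatten step (`sample_lines`, `mapped`, and `flat_mapped` are illustrative names):

```python
from itertools import chain

sample_lines = ["hello world", "hi"]

# map(): exactly one output element per input element (here, a list per line)
mapped = [line.split(" ") for line in sample_lines]

# flatMap(): apply the function, then flatten the resulting iterables
flat_mapped = list(chain.from_iterable(line.split(" ") for line in sample_lines))

print(mapped[0])       # ['hello', 'world']
print(flat_mapped[0])  # hello
```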

### Pseudo set operations

distinct(), union(), intersection(), subtract() (a.k.a. set difference), and cartesian()

```python
rdd_1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd_2 = sc.parallelize(["coffee", "monkey", "kitty"])

print("rdd_1.distinct():", rdd_1.distinct().collect())
print("rdd_1.union(rdd_2):", rdd_1.union(rdd_2).collect())
print("rdd_1.intersection(rdd_2):", rdd_1.intersection(rdd_2).collect())
print("rdd_1.subtract(rdd_2):", rdd_1.subtract(rdd_2).collect())
print("rdd_1.cartesian(rdd_2):", rdd_1.cartesian(rdd_2).collect())
```

```
('rdd_1.distinct():', ['tea', 'coffee', 'panda', 'monkey'])
('rdd_1.union(rdd_2):', ['coffee', 'coffee', 'panda', 'monkey', 'tea', 'coffee', 'monkey', 'kitty'])
('rdd_1.intersection(rdd_2):', ['coffee', 'monkey'])
('rdd_1.subtract(rdd_2):', ['tea', 'panda'])
('rdd_1.cartesian(rdd_2):', [('coffee', 'coffee'), ('coffee', 'monkey'), ('coffee', 'kitty'), ('coffee', 'coffee'), ('coffee', 'monkey'), ('coffee', 'kitty'), ('panda', 'coffee'), ('panda', 'monkey'), ('panda', 'kitty'), ('monkey', 'coffee'), ('monkey', 'monkey'), ('monkey', 'kitty'), ('tea', 'coffee'), ('tea', 'monkey'), ('tea', 'kitty')])
```


Note that distinct() is expensive, however, as it requires shuffling all the data over the network to ensure that we receive only one copy of each element.   

Shuffling, and how to avoid it, is discussed in more detail in Chapter 4.


While intersection() and union() are two similar concepts, the performance of intersection() is much worse since it requires a shuffle over the network to identify common elements.


*subtract(other)*, like intersection(), performs a shuffle.

*cartesian(other)* returns every pair (a, b) where a comes from the source RDD and b from the other RDD. Be warned that the Cartesian product is very expensive for large RDDs.
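The semantics (not the performance) of these operations can be modelled with plain Python lists. This is a local sketch that ignores partitioning and element order, which Spark does not guarantee, so results are compared sorted. Note which operations keep duplicates: union() and subtract() do, while distinct() and intersection() do not.

```python
from itertools import product

rdd_1 = ["coffee", "coffee", "panda", "monkey", "tea"]
rdd_2 = ["coffee", "monkey", "kitty"]

distinct = list(dict.fromkeys(rdd_1))            # duplicates removed
union = rdd_1 + rdd_2                            # duplicates kept
intersection = list(set(rdd_1) & set(rdd_2))     # result has no duplicates
other = set(rdd_2)
subtract = [x for x in rdd_1 if x not in other]  # keeps any duplicates from rdd_1
cartesian = list(product(rdd_1, rdd_2))          # every (a, b) pair: 5 * 3 = 15

print(sorted(distinct), len(union), sorted(intersection), sorted(subtract), len(cartesian))
```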

## Basic Actions

### reduce

```python
rdd_3 = sc.parallelize(range(1, 11))
print('sum:', rdd_3.reduce(lambda x, y: x + y))
```

```
('sum:', 55)
```


```python
import operator
print('sum:', rdd_3.fold(0, operator.add))
```

```
('sum:', 55)
```


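fold() takes a function with the same signature as reduce(), plus a "zero value" that is used for the initial call on each partition. The zero value should be the identity element for your operation (e.g., 0 for +, 1 for *, or an empty list for concatenation), so that applying it multiple times does not change the result.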

You can minimize object creation in fold() by modifying and returning the first of the two parameters in place. However, you should not modify the second parameter.

[reduce vs fold](http://stackoverflow.com/questions/25158780/difference-between-reduce-and-foldleft-fold-in-functional-programming-particula)
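Why the zero value must be an identity: the PySpark documentation describes fold() as folding each partition individually and then folding those partial results into the final result. The sketch below is a local model of that behaviour under an assumed round-robin partitioning (`simulated_fold` is a hypothetical helper, not a Spark API). With a non-identity zero, the answer depends on the number of partitions.

```python
import operator
from functools import reduce


def simulated_fold(data, zero, op, num_partitions):
    """Local model of RDD.fold(): fold each partition, then fold the partials."""
    # Hypothetical partitioning: deal elements round-robin into partitions
    partitions = [data[i::num_partitions] for i in range(num_partitions)]
    # Each partition starts from the zero value...
    partials = [reduce(op, part, zero) for part in partitions]
    # ...and the partial results are combined starting from the zero value again
    return reduce(op, partials, zero)


data = list(range(1, 11))
print(simulated_fold(data, 0, operator.add, 4))  # 55: 0 is the identity for +
print(simulated_fold(data, 1, operator.add, 2))  # 58: zero added once per partition, plus once more
print(simulated_fold(data, 1, operator.add, 4))  # 60
```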

### aggregate

The aggregate() function frees us from the constraint of having the return be the same type as the RDD we are working on.  

With aggregate(), like fold(), we supply an initial zero value of the type we want to return. 

We then supply a function to combine the elements from our RDD with the accumulator.  

Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally.

```python
nums = sc.parallelize(range(20))
sum_count = nums.aggregate((0, 0),
                           (lambda acc, val: (acc[0] + val, acc[1] + 1)),
                           (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
                          )
print(sum_count)
print(sum_count[0] / float(sum_count[1]))
```

```
(190, 20)
9.5
```


### collect

Returns the entire RDD's contents to the driver program.  
All the data must fit in memory on a single machine (the driver), so collect() is unsuitable for large datasets.

### take(n)

Returns n elements from the RDD.  
It attempts to minimize the number of partitions it accesses, so the result may be a biased collection.

### top(n)

Returns the top n elements of the RDD, using the default ordering (largest first) or an optional key function.



```python
import random
nums = list(range(20))  # random.shuffle needs a mutable list (range is immutable in Python 3)
random.shuffle(nums)
print(nums)

nums = sc.parallelize(nums)
print(nums.top(0))
print(nums.top(1))
print(nums.top(2))
```

```
[6, 2, 11, 5, 14, 17, 13, 0, 19, 9, 7, 16, 15, 8, 12, 1, 10, 18, 3, 4]
[]
[19]
[19, 18]
```


### takeSample

Sometimes we need a sample of our data in our driver program.   

The takeSample(withReplacement, num, seed) function allows us to take a sample of our data
either with or without replacement.  
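In PySpark the call looks like `rdd.takeSample(False, 5, seed=42)`. As a local analogy only (Python's `random` module, not Spark's sampling algorithm, so the chosen elements will differ), the two modes behave like this:

```python
import random

data = list(range(100))
rng = random.Random(42)  # fixed seed, analogous to takeSample's optional seed argument

without_replacement = rng.sample(data, 5)  # no element is picked twice
with_replacement = rng.choices(data, k=5)  # the same element may be picked again

print(without_replacement, with_replacement)
```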


### foreach

Sometimes it is useful to perform an action on all of the elements in the RDD, but without returning any result to the driver program.   

A good example of this would be posting JSON to a webserver or inserting records into a database. 

In either case, the foreach() action lets us perform computations on each element in the RDD without bringing it back locally.


### count, countByValue



```python
rdd_1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])

print('rdd_1.count():', rdd_1.count())
print('rdd_1.countByValue():', rdd_1.countByValue())
```

### countByKey

```python
rdd_2 = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print('rdd_2.countByKey():', rdd_2.countByKey())
```

```
('rdd_2.countByKey():', defaultdict(<type 'int'>, {'a': 2, 'b': 1}))
```
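These counting actions return their results to the driver as a dictionary (a defaultdict, per the output above). A local model with `collections.Counter`, using the same data as the two cells above, shows what each one counts; this is an analogy, not PySpark's implementation:

```python
from collections import Counter

values = ["coffee", "coffee", "panda", "monkey", "tea"]
pairs = [("a", 1), ("b", 1), ("a", 1)]

count = len(values)                          # like rdd_1.count()
count_by_value = Counter(values)             # like rdd_1.countByValue()
count_by_key = Counter(k for k, _ in pairs)  # like rdd_2.countByKey(): counts keys, ignores values

print(count, dict(count_by_value), dict(count_by_key))
```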



### takeOrdered

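takeOrdered(num, key=None)

Returns num elements based on the provided ordering: ascending by default, or as specified by the optional key function.

```python
rdd = sc.parallelize([1, 2, 3, 3])
rdd.takeOrdered(2, key=lambda x: -x)  # negating the key gives descending order
```

```
[3, 3]
```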
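For intuition, `top(num, key=None)` and `takeOrdered(num, key=None)` return the same elements as the standard-library `heapq` helpers applied to a local list. This is a local analogy on the same `[1, 2, 3, 3]` data, not a claim about how Spark computes them across partitions:

```python
import heapq

data = [1, 2, 3, 3]

top_2 = heapq.nlargest(2, data)                        # like rdd.top(2): largest first
smallest_2 = heapq.nsmallest(2, data)                  # like rdd.takeOrdered(2): ascending
custom_2 = heapq.nsmallest(2, data, key=lambda x: -x)  # like rdd.takeOrdered(2, key=lambda x: -x)

print(top_2, smallest_2, custom_2)  # [3, 3] [1, 2] [3, 3]
```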


# Converting between RDD Types

# Persistence (Caching)

As discussed earlier, Spark RDDs are lazily evaluated, and sometimes we may wish to
use the same RDD multiple times.  

If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD.

This can be especially expensive for iterative algorithms, which look at the data many times.

```python
rdd = sc.parallelize(range(20)).map(lambda x: str(x**2))
print(rdd.count())
print(",".join(rdd.collect()))
```

```
20
0,1,4,9,16,25,36,49,64,81,100,121,144,169,196,225,256,289,324,361
```
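The cell above calls two actions on an RDD that was never persisted, so the map runs once per action. A toy model in plain Python makes the recomputation visible by counting calls to the map function. `ToyMappedDataset` is hypothetical and not Spark's implementation; it also mirrors the point that persist() on its own computes nothing.

```python
class ToyMappedDataset:
    """Toy model of an RDD built by one map(); not Spark's implementation."""

    def __init__(self, data, fn):
        self._data = list(data)
        self._fn = fn
        self._persisted = False
        self._cached = None
        self.evaluations = 0  # total number of calls to fn

    def persist(self):
        # Like RDD.persist(): only marks the dataset; nothing is computed yet
        self._persisted = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached
        result = []
        for x in self._data:
            self.evaluations += 1
            result.append(self._fn(x))
        if self._persisted:
            self._cached = result  # keep the result for later actions
        return result

    def count(self):
        return len(self._compute())

    def collect(self):
        return list(self._compute())


plain = ToyMappedDataset(range(20), lambda x: str(x ** 2))
plain_count = plain.count()
plain_values = plain.collect()
print(plain.evaluations)  # 40: the map ran once per action

cached = ToyMappedDataset(range(20), lambda x: str(x ** 2)).persist()
evaluations_after_persist = cached.evaluations  # 0: persist() alone computes nothing
cached_count = cached.count()
cached_values = cached.collect()
print(evaluations_after_persist, cached.evaluations)  # 0 20
```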


| Level | Space used | CPU time | In memory | On disk | Comments |
|---|---|---|---|---|---|
| MEMORY_ONLY | High | Low | Y | N | |
| MEMORY_ONLY_SER | Low | High | Y | N | |
| MEMORY_AND_DISK | High | Medium | Some | Some | Spills to disk if there is too much data to fit in memory. |
| MEMORY_AND_DISK_SER | Low | High | Some | Some | Spills to disk if there is too much data to fit in memory. Stores serialized representation in memory. |
| DISK_ONLY | Low | High | N | Y | |

```scala
import org.apache.spark.storage.StorageLevel

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
```

Note

* Notice that we called persist() on the RDD before the first action.   
* The persist() call on its own doesn’t force evaluation.

If you attempt to cache too much data to fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy. 

For the memory-only storage levels, it will recompute these partitions the next time they are accessed, while for the memory-and-disk ones, it will write them out to disk.

In either case, this means that you don’t have to worry about your job breaking if you ask Spark to cache too much data. 

However, caching unnecessary data can lead to eviction of useful data and more recomputation time.
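A toy sketch of the Least Recently Used policy described above, in plain Python with `collections.OrderedDict`. `ToyPartitionCache` is hypothetical, and measuring capacity in partitions (rather than bytes of memory, as Spark does) is a simplifying assumption.

```python
from collections import OrderedDict


class ToyPartitionCache:
    """Toy LRU cache of partitions; capacity is counted in partitions."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._store = OrderedDict()
        self.evicted = []

    def get(self, key):
        if key not in self._store:
            return None  # a memory-only level would have to recompute this partition
        self._store.move_to_end(key)  # mark as most recently used
        return self._store[key]

    def put(self, key, value):
        self._store[key] = value
        self._store.move_to_end(key)
        while len(self._store) > self.capacity:
            old_key, _ = self._store.popitem(last=False)  # drop the least recently used
            self.evicted.append(old_key)


cache = ToyPartitionCache(capacity=2)
cache.put("partition-0", [0, 1])
cache.put("partition-1", [4, 9])
cache.get("partition-0")              # partition-0 becomes most recently used
cache.put("partition-2", [16, 25])    # over capacity: evicts partition-1
print(cache.evicted)                  # ['partition-1']
print(cache.get("partition-1"))       # None
```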

Finally, RDDs come with a method called unpersist() that lets you manually remove them from the cache.