# Apache Spark - Resilient Distributed Dataset (RDD)

In [1]:
%matplotlib inline 
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import stats
import seaborn as sns
import warnings
import random
from datetime import datetime
random.seed(datetime.now().timestamp())  # seed() needs a number or string, not a datetime, in recent Python 3
warnings.filterwarnings('ignore')

# Make plots larger
plt.rcParams['figure.figsize'] = (10, 6)

## What is Apache Spark? 

![What is Apache Spark? ](http://nikbearbrown.com/YouTube/MachineLearning/IMG/spark-stack.png)

* Spark SQL is Apache Spark's module for working with structured data.  
* Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.  
* MLlib is Apache Spark's scalable machine learning library.   
* GraphX is Apache Spark's API for graphs and graph-parallel computation.     
![Apache Spark](http://nikbearbrown.com/YouTube/MachineLearning/IMG/Apache_Spark.png)  

What is Apache Spark? [https://youtu.be/SxAxAhn-BDU ](https://youtu.be/SxAxAhn-BDU)  
    
Apache Spark [http://spark.apache.org/](http://spark.apache.org/)     

## Start pyspark server

Starting the pyspark server will be a command like:  

```bash
~/spark-2.2.0-bin-hadoop2.7/bin/pyspark
```

In [2]:
import pyspark  # Check that PySpark is importable from this kernel

## SparkContext

In order to use Spark and its API we will need to use a `SparkContext`.  When running Spark, you start a new Spark application by creating a [SparkContext](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext).  When the `SparkContext` is created, it asks the master for some cores to use to do work.  The master sets these cores aside just for you; they won't be used for other applications. You can think of the `SparkContext` as you would a database connection object. 

The `SparkContext` is usually created for you as `sc`.

In [3]:
sc

## No SparkContext?

If there is no `SparkContext` you'll have to create it yourself (usually as `sc`).

```python
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
```

##  `SparkContext` attributes

You can use Python's [dir()](https://docs.python.org/2/library/functions.html?highlight=dir#dir) function to get a list of all the attributes (including methods) accessible through the `sc` object.

In [4]:
# List sc's attributes
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 ...]

## help()  

You can use Python's [help()](https://docs.python.org/2/library/functions.html?highlight=help#help) function to get an easier-to-read list of all the attributes, including examples, that the `sc` object has.

In [5]:
# Use help to obtain more detailed information
help(sc)

Help on SparkContext in module pyspark.context object:

class SparkContext(builtins.object)
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create L{RDD} and
 |  broadcast variables on that cluster.
 |  
 |  Methods defined here:
 |  
 |  __enter__(self)
 |      Enable 'with SparkContext(...) as sc: app(sc)' syntax.
 |  
 |  __exit__(self, type, value, trace)
 |      Enable 'with SparkContext(...) as sc: app' syntax.
 |      
 |      Specifically stop the context on exit of the with block.
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
 |      Create a new SparkContext. At least the master and app name should be set,
 |      either through the named parameters here or through C{conf}.
 |      

In [6]:
# Help can be used on any Python object
help(map)

Help on class map in module builtins:

class map(object)
 |  map(func, *iterables) --> map object
 |  
 |  Make an iterator that computes the function using arguments from
 |  each of the iterables.  Stops when the shortest iterable is exhausted.
 |  
 |  Methods defined here:
 |  
 |  __getattribute__(self, name, /)
 |      Return getattr(self, name).
 |  
 |  __iter__(self, /)
 |      Implement iter(self).
 |  
 |  __new__(*args, **kwargs) from builtins.type
 |      Create and return a new object.  See help(type) for accurate signature.
 |  
 |  __next__(self, /)
 |      Implement next(self).
 |  
 |  __reduce__(...)
 |      Return state information for pickling.



In [7]:
nums = list(range(1, 11))

In [8]:
type(nums)

list

In [9]:
nums[0:5]

[1, 2, 3, 4, 5]

## RDD vs DataFrame vs Dataset

### Resilient Distributed Dataset (RDD)
- from [http://spark.apache.org/docs/2.1.1/programming-guide.html#resilient-distributed-datasets-rdds](http://spark.apache.org/docs/2.1.1/programming-guide.html#resilient-distributed-datasets-rdds)   
 

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. RDDs have been the primary user-facing API in Spark since its inception. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

### Parallelized Collections   

Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

```python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
```

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list. We describe operations on distributed datasets later on.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.

### External Datasets  

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc URI) and reads it as a collection of lines. Here is an example invocation:

```python
>>> distFile = sc.textFile("data.txt")
```

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: 

```python
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
```

Some notes on reading files with Spark:

* If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.   

* All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").   

* The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.   

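**Resilient Distributed Datasets (RDDs) are fault tolerant**  

The "Resilient" in RDD comes from the fact that RDDs are fault tolerant. If a partition of an RDD is lost (for example, because a worker node fails), Spark automatically recomputes it using the transformations that originally created it, known as its lineage.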

**Resilient Distributed Datasets (RDDs) can keep data in memory** 

One of the defining features of Spark, compared to other data analytics frameworks (e.g., Hadoop MapReduce), is that it can keep data in memory rather than writing it to disk between processing steps. This allows Spark applications to run much more quickly, because they are not slowed down by needing to read data back from disk.

**When to use RDDs?**  

* You want low-level transformations, actions, and control over your dataset.   
* Your data is unstructured.  
* You want to manipulate your data with functional programming constructs rather than domain-specific expressions.   
* You don't care about imposing a schema.  
* You can forgo some of the optimization and performance benefits that DataFrames and Datasets offer for structured and semi-structured data.   

**What happens to RDDs in Apache Spark 2.0?** 


Are they being deprecated? NO!

You can seamlessly move between DataFrame or Dataset and RDDs at will—by simple API method calls—and DataFrames and Datasets are built on top of RDDs.  


## Datasets and DataFrames
- from [https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)   

Like RDDs, Datasets and DataFrames are immutable distributed collections of data. 

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not support the Dataset API, but due to Python’s dynamic nature, many of its benefits are already available (e.g., you can naturally access a row's field by name with `row.columnName`). The case for R is similar.

Starting in Spark 2.0, Dataset has two distinct API characteristics: a strongly-typed API and an untyped API.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. In the Java API, users need to use Dataset<Row> to represent a DataFrame.

![Unified Apache Spark 2.0](http://nikbearbrown.com/YouTube/MachineLearning/IMG/Unified_Apache_Spark_2.0.png)

Image from [http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin](http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin)




In [10]:
# Parallelize data using 8 partitions
# This operation is a transformation of data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point
rdd = sc.parallelize(nums, 8)
# Cache the RDD
rdd.cache()

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

In [11]:
type(rdd)

pyspark.rdd.RDD

```python
parallelize(c, numSlices=None)
```

Distribute a local Python collection to form an RDD. Using `range` (`xrange` in Python 2) is recommended for performance if the input represents a range.

[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)

In [12]:
help(sc.parallelize)

Help on method parallelize in module pyspark.context:

parallelize(c, numSlices=None) method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using xrange
    is recommended if the input represents a range for performance.
    
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]



In [13]:
# Each RDD gets a unique ID
print ('RDD id: {0}'.format(rdd.id()))

RDD id: 0


```python
collect()
```

Return a list that contains all of the elements in this RDD.

Note: this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)

In [14]:
# View entire RDD
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

```python
take(num)
``` 

Take the first num elements of the RDD.

It works by first scanning one partition, and uses the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Translated from the Scala implementation in RDD#take().

Note: this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

```python
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
>>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
[91, 92, 93]
```

[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)


In [15]:
# View first 7 of the RDD
rdd.take(7)

[1, 2, 3, 4, 5, 6, 7]

```python
map(f, preservesPartitioning=False)
```

Return a new RDD by applying a function to each element of this RDD.

```python
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
```


[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)


In [16]:
plus_one=rdd.map(lambda x: x + 1)

In [17]:
plus_one

PythonRDD[4] at RDD at PythonRDD.scala:48

In [18]:
plus_one.collect()

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

In [19]:
# Define a named function instead of using a lambda expression
def minus_one(x):
    return x - 1
rdd_minus_one = rdd.map(minus_one)
print(rdd_minus_one)
rdd_minus_one.collect()

PythonRDD[5] at RDD at PythonRDD.scala:48


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [20]:
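# saveAsTextFile writes a directory of part files and fails if it already exists
try:
    rdd.saveAsTextFile('data/rez_1.txt')
except Exception:
    print ("File exists")
try:
    # Elements are ints, so convert them to str before concatenating
    rdd.map(lambda x: "Number " + str(x)).saveAsTextFile('data/formated_rez_1.txt')
except Exception:
    print ("File exists")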

File exists


## RDD from text
  


In [21]:
text=sc.textFile('data/Dr_Seuss_Oh_The_Places_Youll_Go.txt')
print (type(text))
print ('RDD id: {0}'.format(text.id()))

<class 'pyspark.rdd.RDD'>
RDD id: 13


In [22]:
text.collect()

['You have brains in your head. You have feet in your shoes. ',
 'You can steer yourself in any direction you choose. ',
 "You're on your own, and you know what you know. ",
 "And you are the guy who'll decide where to go. ",
 "― Dr. Seuss, Oh, The Places You'll Go!"]

In [23]:
text_p=sc.textFile('data/Dr_Seuss_Oh_The_Places_Youll_Go.txt', minPartitions=11)
print (type(text_p))
print ('RDD id: {0}'.format(text_p.id()))

<class 'pyspark.rdd.RDD'>
RDD id: 15


In [24]:
text_p.collect()

['You have brains in your head. You have feet in your shoes. ',
 'You can steer yourself in any direction you choose. ',
 "You're on your own, and you know what you know. ",
 "And you are the guy who'll decide where to go. ",
 "― Dr. Seuss, Oh, The Places You'll Go!"]

In [25]:
linesYou = text.filter(lambda line: "You" in line)
linesYou.collect()

['You have brains in your head. You have feet in your shoes. ',
 'You can steer yourself in any direction you choose. ',
 "You're on your own, and you know what you know. ",
 "― Dr. Seuss, Oh, The Places You'll Go!"]

One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows.

In [26]:
wordCounts = text.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
wordCounts.collect()

[('have', 2),
 ('in', 3),
 ('shoes.', 1),
 ('steer', 1),
 ('direction', 1),
 ('choose.', 1),
 ("You're", 1),
 ('know', 1),
 ('And', 1),
 ('are', 1),
 ('where', 1),
 ('go.', 1),
 ('Seuss,', 1),
 ('Oh,', 1),
 ('The', 1),
 ('Places', 1),
 ("You'll", 1),
 ('You', 3),
 ('brains', 1),
 ('your', 3),
 ('head.', 1),
 ('feet', 1),
 ('can', 1),
 ('yourself', 1),
 ('any', 1),
 ('you', 4),
 ('on', 1),
 ('own,', 1),
 ('and', 1),
 ('what', 1),
 ('know.', 1),
 ('the', 1),
 ('guy', 1),
 ("who'll", 1),
 ('decide', 1),
 ('to', 1),
 ('―', 1),
 ('Dr.', 1),
 ('Go!', 1)]
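To make the three stages of the word count above concrete, here is a pure-Python sketch (no Spark needed) of what `flatMap`, `map`, and `reduceByKey` compute, using two hypothetical input lines. The explicit grouping step stands in for Spark's shuffle. The real `reduceByKey` also pre-combines values within each partition before shuffling, and this sketch does not model that step.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical input standing in for the lines of the text RDD.
lines = ["You have brains in your head.", "You have feet in your shoes."]

# flatMap: each line becomes many words, flattened into one list.
words = [word for line in lines for word in line.split()]

# map: each word becomes a (word, 1) pair.
pairs = [(word, 1) for word in words]

# "Shuffle": bring all values for the same key together.
groups = defaultdict(list)
for word, one in pairs:
    groups[word].append(one)

# reduceByKey: combine each key's values with the same function given to Spark.
counts = {word: reduce(lambda a, b: a + b, ones) for word, ones in groups.items()}
print(counts["You"], counts["have"], counts["head."])   # 2 2 1
```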

**count()**  
Return the number of elements in this RDD.

```python
>>> sc.parallelize([2, 3, 4]).count()
3
```

**countApprox(timeout, confidence=0.95)**   

Note: Experimental.

Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished.

```python
>>> rdd = sc.parallelize(range(1000), 10)
>>> rdd.countApprox(1000, 1.0)
1000
```

**countApproxDistinct(relativeSD=0.05)**  

Return approximate number of distinct elements in the RDD.

The algorithm used is based on streamlib’s implementation of “HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm”.

Parameters: `relativeSD` – Relative accuracy. Smaller values create counters that require more space. It must be greater than 0.000017.

```python
>>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
>>> 900 < n < 1100
True
>>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
>>> 16 < n < 24
True
```

  [http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)


In [27]:
print (rdd.count())
print (rdd.countApprox(1000,0.9))
print (rdd.countApproxDistinct())

10
10
10


**filter(f)**

Return a new RDD containing only the elements that satisfy a predicate.

```python
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
```  

  [http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)



In [28]:
rdd_lt5=rdd.filter(lambda x: x < 5)
rdd_lt5.collect()

[1, 2, 3, 4]

In [29]:
rdd_even=rdd.filter(lambda x: x % 2 == 0)
rdd_even.collect()

[2, 4, 6, 8, 10]

**first()**  

Return the first element in this RDD.

```python
>>> sc.parallelize([2, 3, 4]).first()
2
>>> sc.parallelize([]).first()
Traceback (most recent call last):
    ...
ValueError: RDD is empty
```


**takeOrdered(num, key=None)**   

Get the N elements from an RDD ordered in ascending order or as specified by the optional key function.

Note this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

```python
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
[10, 9, 7, 6, 5, 4]
```


**takeSample(withReplacement, num, seed=None)**      

Return a fixed-size sampled subset of this RDD.

Note: this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.  

```python
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
20
>>> len(rdd.takeSample(False, 5, 2))
5
>>> len(rdd.takeSample(False, 15, 3))
10
```

  [http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)


 
    

In [30]:
print(rdd.first())

1


In [31]:
print(rdd.take(3))
print(rdd.filter(lambda x: x > 3).take(3))
print(rdd.take(33)) # ok to take more elements than the RDD has

[1, 2, 3]
[4, 5, 6]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


In [32]:
print(rdd.takeOrdered(6))
print(rdd.takeOrdered(6, key=lambda x: -x)) # Reverse order

[1, 2, 3, 4, 5, 6]
[10, 9, 8, 7, 6, 5]


In [33]:
print(rdd.takeSample(True, 7, 1))
print(rdd.takeSample(False, 7, 2))
print(rdd.takeSample(False, 7, 2))
print(rdd.takeSample(False, 7, 3))

[5, 1, 3, 1, 10, 5, 1]
[6, 10, 4, 5, 7, 8, 3]
[6, 10, 4, 5, 7, 8, 3]
[2, 6, 7, 1, 10, 5, 8]


**reduce(f)**

Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.

```python
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10
>>> sc.parallelize([]).reduce(add)
Traceback (most recent call last):
    ...
ValueError: Can not reduce() empty RDD
```

**reduceByKey(func, numPartitions=None, partitionFunc=portable_hash)**  

Merge the values for each key using an associative and commutative reduce function.

This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. Default partitioner is hash-partition.

```python
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
```

**reduceByKeyLocally(func)**  

Merge the values for each key using an associative and commutative reduce function, but return the results immediately to the master as a dictionary.

This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

```python
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKeyLocally(add).items())
[('a', 2), ('b', 1)]
```

[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)


In [34]:
from operator import add, sub
print(rdd.reduce(add))
print(rdd.reduce(sub))
print(rdd.reduce(lambda a, b: a + b))
print(rdd.reduce(lambda a, b: b + a))
# Subtraction is neither associative nor commutative, so its result depends on how the data is partitioned
print(rdd.reduce(lambda a, b: a - b))
print(rdd.reduce(lambda a, b: b - a ))

55
-23
55
55
-23
-7
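To see why subtraction gives -23 and -7 above while addition stays at 55, the following pure-Python sketch mimics how `RDD.reduce` works. It reduces each non-empty partition locally, then reduces the partial results on the driver from left to right. The layout in `parts` is an assumption about how `sc.parallelize(nums, 8)` splits the ten numbers. You can check it on your own setup with `rdd.glom().collect()`.

```python
from functools import reduce

def spark_style_reduce(f, partitions):
    """Mimic RDD.reduce: reduce within each non-empty partition,
    then reduce the per-partition results on the driver."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

# Assumed partition layout of 1..10 across 8 partitions.
parts = [[1], [2], [3], [4, 5], [6], [7], [8], [9, 10]]

add_result = spark_style_reduce(lambda a, b: a + b, parts)
sub_result = spark_style_reduce(lambda a, b: a - b, parts)
rev_sub_result = spark_style_reduce(lambda a, b: b - a, parts)
# With everything in a single partition, subtraction gives yet another answer.
one_partition = spark_style_reduce(lambda a, b: a - b, [list(range(1, 11))])

print(add_result, sub_result, rev_sub_result, one_partition)   # 55 -23 -7 -53
```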


In [35]:
rdd_tmp = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd_tmp.reduceByKeyLocally(add).items())

[('a', 2), ('b', 1)]

In [36]:
rdd_tmp = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd_tmp.reduceByKey(add).collect())

[('a', 2), ('b', 1)]

**flatMap(f, preservesPartitioning=False)**   


Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

```python
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
```

**flatMapValues(f)**    

Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD’s partitioning.


```python
>>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
>>> def f(x): return x
>>> x.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
```

If the function passed to map() returns a collection, map() yields a new RDD whose elements are those collections (for example, lists or range objects), each holding zero or more items. Often we instead want an RDD of the individual values inside those collections. The solution is the flatMap() transformation: flatMap() is similar to map(), except that each input item can be mapped to zero or more output elements, which are flattened into the resulting RDD.

  [http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)
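The same distinction can be sketched in plain Python, without Spark, as an analogy. A map-style step produces one output per input, here a `range` object each. A flatMap-style step applies the function and then chains the returned iterables into a single flat sequence. The input `[2, 3, 4]` mirrors the docstring example.

```python
from itertools import chain

def f(x):
    # Returns an iterable with x - 1 items, like the lambda in the examples.
    return range(1, x)

xs = [2, 3, 4]

# map-like: one result per input element (each result is itself an iterable).
mapped = [f(x) for x in xs]
# flatMap-like: apply f, then flatten all returned iterables into one list.
flat = list(chain.from_iterable(f(x) for x in xs))

print([list(r) for r in mapped])  # [[1], [1, 2], [1, 2, 3]]
print(flat)                       # [1, 1, 2, 1, 2, 3]
```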



In [37]:
print(rdd.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


In [38]:
print(rdd.flatMap(lambda x: range(1, x)).collect())
print(sorted(rdd.flatMap(lambda x: range(1, x)).collect()))

[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 7, 8, 8, 9]


In [39]:
print(rdd.flatMap(lambda x: [(x, x)]).collect())

[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10)]


In [40]:
print(rdd.map(lambda x: range(1, x)).collect())
print(rdd.flatMap(lambda x: range(1, x)).collect())

[range(1, 1), range(1, 2), range(1, 3), range(1, 4), range(1, 5), range(1, 6), range(1, 7), range(1, 8), range(1, 9), range(1, 10)]
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [41]:
print(rdd.map(lambda x: [(x, x)]).collect())
print(rdd.flatMap(lambda x: [(x, x)]).collect())

[[(1, 1)], [(2, 2)], [(3, 3)], [(4, 4)], [(5, 5)], [(6, 6)], [(7, 7)], [(8, 8)], [(9, 9)], [(10, 10)]]
[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10)]


## Caching RDDs

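By default, Spark recomputes an RDD each time an action uses it. When you persist (cache) an RDD, Spark keeps its partitions in memory. If memory runs short, Spark automatically drops old cached partitions in a least-recently-used (LRU) fashion to make room, and recomputes them later if they are needed again.

If you plan to use an RDD many times, you can ask Spark to cache it. Use the `cache()` operation to keep the RDD in memory.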

**cache()**

Persist this RDD with the default storage level (MEMORY_ONLY).

[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)



In [42]:
# Name the RDD
rdd.setName('Bears RDD')
# Cache the RDD
rdd.cache()
# Is it cached
print (rdd.is_cached)
print (rdd.name())
# Release the RDD to free memory 
rdd.unpersist()
print (rdd.is_cached)

True
Bears RDD
False


**cartesian(other)**  

Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in self and b is in other.

```python
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
```

[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)



In [43]:
ab = sc.parallelize(['a', 'b'])
rdd.cartesian(ab).collect()

[(1, 'a'),
 (1, 'b'),
 (2, 'a'),
 (2, 'b'),
 (3, 'a'),
 (3, 'b'),
 (4, 'a'),
 (5, 'a'),
 (4, 'b'),
 (5, 'b'),
 (6, 'a'),
 (6, 'b'),
 (7, 'a'),
 (7, 'b'),
 (8, 'a'),
 (8, 'b'),
 (9, 'a'),
 (10, 'a'),
 (9, 'b'),
 (10, 'b')]

**pipe(command, env=None, checkCode=False)**   

Return an RDD created by piping elements to a forked external process.

```python
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
[u'1', u'2', u'', u'3']
```

Parameters: `checkCode` – whether or not to check the return value of the shell command.


[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)



In [44]:
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [45]:
rdd.pipe("grep 5").collect()

['5']

**repartition(numPartitions)**  

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.

```python
>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
>>> len(rdd.repartition(2).glom().collect())
2
>>> len(rdd.repartition(10).glom().collect())
10
```

**repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=portable_hash, ascending=True, keyfunc=lambda x: x)**  

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.

```python
>>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
>>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
>>> rdd2.glom().collect()
[[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
```

**glom()**  

Return an RDD created by coalescing all elements within each partition into a list.

```python
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> sorted(rdd.glom().collect())
[[1, 2], [3, 4]]
```

[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)



In [46]:
rdd_tmp = sc.parallelize([1,2,3,4,5,6,7], 4)
print (rdd_tmp.collect())
print (sorted(rdd_tmp.glom().collect()))
print (len(rdd_tmp.repartition(2).glom().collect()))
print (len(rdd_tmp.repartition(10).glom().collect()))

[1, 2, 3, 4, 5, 6, 7]
[[1], [2, 3], [4, 5], [6, 7]]
2
10


## Checkpointing RDDs

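Checkpointing saves an RDD's data to reliable storage (typically HDFS when running on a cluster), which provides fault-tolerant storage across nodes. Because the saved data can be read back directly, Spark no longer needs the RDD's lineage to rebuild it, which helps when that lineage grows very long.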

**setCheckpointDir(dirName)**

Set the directory under which RDDs are going to be checkpointed. The directory must be an HDFS path if running on a cluster.

**checkpoint()**  

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.


In [47]:
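sc.setCheckpointDir("data")
rdd_tmp = sc.parallelize([1, 2, 3, 4, 5])

for i in range(999):
    # Chain the maps so the lineage actually grows with each iteration
    rdd_tmp = rdd_tmp.map(lambda x: x + 1)

    if i % 100 == 0:
        rdd_tmp.cache()       # avoid recomputing the RDD when the checkpoint is written
        rdd_tmp.checkpoint()  # must be called before any job runs on this RDD
        rdd_tmp.count()       # an action materializes the checkpoint and truncates the lineage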

## Broadcast Variables

Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.   

**class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None)**    

A broadcast variable created with SparkContext.broadcast(). Access its value through `.value`.

Examples:

```python
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
>>> large_broadcast = sc.broadcast(range(10000))
```

[http://spark.apache.org/docs/2.1.0/api/python/pyspark.html](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)



## Accumulators  

**class pyspark.Accumulator(aid, value, accum_param)**

A shared variable that can be accumulated, i.e., has a commutative and associative “add” operation. Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using `value`. Updates from the workers get propagated automatically to the driver program.

While SparkContext supports accumulators for primitive data types like int and float, users can also define accumulators for custom types by providing a custom AccumulatorParam object. Refer to the doctest of this module for an example.

_add(term)_  

Adds a term to this accumulator’s value.

_value_  

Get the accumulator’s value; only usable in the driver program.

**class pyspark.AccumulatorParam**    

Helper object that defines how to accumulate values of a given type.

_addInPlace(value1, value2)_  

Add two values of the accumulator’s data type, returning a new value; for efficiency, can also update value1 in place and return it.

_zero(value)_  

Provide a “zero value” for the type, compatible in dimensions with the provided value (e.g., a zero vector)



## Apache Spark 2.0 Review  

![Apache Spark 2.0 Review](http://nikbearbrown.com/YouTube/MachineLearning/IMG/Apache_Spark_2.0_Review.png)


Apache Spark 2.0 [https://youtu.be/ssPBlqiRJGY](https://youtu.be/ssPBlqiRJGY)
                                                
                                                

Last update October 3, 2017 

The text is released under the [CC-BY-NC-ND license](https://creativecommons.org/licenses/by-nc-nd/3.0/us/legalcode), and code is released under the [MIT license](https://opensource.org/licenses/MIT).