# What is Apache Spark?
* distributed framework
* in-memory data structures 
* data processing
* it improves (most of the times) Hadoop workloads

Spark enables data scientists to tackle problems with larger data sizes than they could before with tools like R or Pandas

## First Steps with Apache Spark Interactive Programming

First of all check that PySpark is running properly. You can check if PySpark is correctly loaded:
In case it is not, you can follow these posts:
    * Windows (IPython): http://jmdvinodjmd.blogspot.com.es/2015/08/installing-ipython-notebook-with-apache.html 
    * Windows (Jupyter): http://www.ithinkcloud.com/tutorials/tutorial-on-how-to-install-apache-spark-on-windows/


In [1]:
sc

<pyspark.context.SparkContext at 0x3b737b8>

The first thing to note is that with Spark all computation is parallelized by means of distributed data structures that are spread through the cluster. These collections are called Resilient Distributed Datasets (RDD). We will talk more about RDD, as they are the main piece in Spark.

As we have successfully loaded the Spark Context, we are ready to do some interactive analysis. We can read a simple file:

In [2]:
lines = sc.textFile("../data/people.csv")
lines.count()

11

In [None]:
lines.first()

This is a very simple first example, where we create an RDD (variable lines) and then we apply some operations (count and first) in a parallel manner. It has to be noted, that as we are running all our examples in a single computer the parallelization is not applied. 

In the next section we will cover the core Spark concepts that allow Spark users to do parallel computation.

## Core Spark Concepts

We will talk about **Spark applications** that are in charge of loading data and applying some distributed computation over it. Every application has a **driver program** that launches parallel operations to the cluster. In the case of interactive programming, the driver program is the shell (or Notebook) itself.

The "access point" to Spark from the driver program is the Spark Context object. As we have previously seen, using the referenced documentation, the sc object, is automatically loaded in the notebook.

Once we have an Spark Context we can use it to build RDDs. In the previous examples we used sc.textFile() to represent the lines of the textFile. Then we run different operations over the RDD lines. 

To run these operations over RDDs, driver programs manage different nodes called executors. For example, for the count operation, it is possible to run count in different ranges of the file. 

Spark's API allows passing functions to its operators to run them on the cluster. For example, we could extend our example by filtering the lines in the file that contain a word, such as individuum.


In [1]:
lines = sc.textFile("../data/people.csv")
filtered_lines = lines.filter(lambda line: "individuum" in line)
filtered_lines.first()

u'individuum 1, 42, female, 52.9, brown, 36.9'

## RDD Basics

An RDD can be defined as a distributed collection of elements. All work done with Spark can be summarized as **creating**, **transforming** and **applying** operations over RDDs to compute a result. Under the hood, Spark automatically **distributes the data contained in RDDs** across your cluster and **parallelizes the operations** you perform on them.

RDD properties:
* it is an **immutable distributed** collection of objects
* it is split into multiple **partitions**
* it is computed on different nodes of the cluster
* it can contain any type of Python object (user defined ones included)

An RDD can be created in **two ways**:
1. loading an external dataset
2. distributing a collection of objects in the driver program

We have already seen the two ways of creating an RDD. 

In [None]:
# loading an external dataset
lines = sc.textFile("../data/people.csv")
print type(lines)
# applying a transformation to an existing RDD
filtered_lines = lines.filter(lambda line: "individuum" in line)
print type(filtered_lines)

It is important to note that once we have an RDD, we can run **two kind of operations**:
* **transformations**: construct a new RDD from a previous one. For example, by filtering lines RDD we create a new RDD that holds the lines that contain "individuum" string. Note that the returning result is an RDD.
* **actions**: *compute* a result based on an RDD, and returns the result to the driver program or stores it to an external storage system (e.g. HDFS). Note that the returning result is not an RDD but another kind of variable.

In [2]:
action_result = lines.first()
print type(action_result)
action_result

<type 'unicode'>


u',Age[years],Sex,Weight[kg],Eye Color,Body Temperature[C]'

Transformations and actions are very different because of the way Spark computes RDDs. 

Transformations are defined in a **lazy** manner this is they are **only computed once they are used in an action**.

In [None]:
# filtered_lines is not computed until the next action is applied over it
# it make sense when working with big data sets, as it is not necessary to 
# transform the whole RDD to get an action over a subset
# Spark doesn't even reads the complete file!
filtered_lines.first()

The drawback is that Spark  **recomputes** again the RDD at **each action application**. 

This means that the computing effort over an already computed RDD may be lost. 

To mitigate this drawback, the user can take the decision of **persisting** the RDD after computing it the first time, **Spark will store the RDD contents in memory**  (partitioned across the machines in your cluster), and reuse them in future actions. 

**Persisting RDDs on disk** instead of memory is also possible.

Let's see an example on the impact of persisting:

In [3]:
import time

lines = sc.textFile("../data/REFERENCE/*")
lines_nonempty = lines.filter( lambda x: len(x) > 0 )
words = lines_nonempty.flatMap(lambda x: x.split())
words_persisted = lines_nonempty.flatMap(lambda x: x.split())

t1 = time.time()
words.count()
print "Word count 1:",time.time() - t1

t1 = time.time()
words.count()
print "Word count 2:",time.time() - t1

t1 = time.time()
words_persisted.persist()
words_persisted.count()
print "Word count persisted 1:",time.time() - t1

t1 = time.time()
words_persisted.count()
print "Word count persisted 2:", time.time() - t1


Word count 1: 17.3249998093
Word count 2: 16.6799998283
Word count persisted 1: 30.1779999733
Word count persisted 2: 14.0679998398


## RDD Operations

We have already seen that RDDs have two basic operations: **transformations** and **actions**.

**Transformations** are operations that return a new RDD. *Examples:* filter, map.

Remember that , transformed RDDs are **computed lazily**, only when you use them in an action.

Lazy evaluation means that when we call a transformation on an RDD (for instance, calling map()), the operation is **not immediately performed**. 

Instead, Spark internally records **metadata** to indicate that this operation has been requested. 

**Loading data** into an RDD is lazily evaluated in the same way trans formations are. So, when we call sc.textFile(), the data is **not loaded** until it is necessary. 

As with transformations, the operation (in this case, reading the data) can occur multiple times. Take in mind that transformations **DO HAVE** impact over computation time.

Many transformations are **element-wise**; that is, they work on one element at a time; but this is not true for all transformations.


In [3]:
lines = sc.textFile("../data/REFERENCE/*")
lines_nonempty = lines.filter( lambda x: len(x) > 0 )
words = lines_nonempty.flatMap(lambda x: x.split())
words_persisted = lines_nonempty.flatMap(lambda x: x.split())
words.take(10)

[u'Newsgroups:',
 u'freenet.shrine.songs',
 u'From:',
 u'aa300',
 u'(Jerry',
 u'Murphy)',
 u'Subject:',
 u'Songs',
 u'from',
 u'the']

* filter applies the lambda function to each line in lines RDD, only lines that accomplish the condition that the length is greater than zero are in lines_nonempty variable (**this RDD is not computed yet!**)
* flatMap applies the lambda function to each element of the RDD and then the result is flattened (i.e. a list of lists would be converted to a simple list)

**Actions** are operations that return an object to the driver program or write to external storage, they kick a computation. *Examples:* first, count.

In [None]:
import time

t1 = time.time()
words.count()
print "Word count 1:",time.time() - t1

t1 = time.time()
words.count()
print "Word count 2:",time.time() - t1

t1 = time.time()
words_persisted.persist()
words_persisted.count()
print "Word count persisted 1:",time.time() - t1

t1 = time.time()
words_persisted.count()
print "Word count persisted 2:", time.time() - t1

Actions are the operations that return a **final value** to the driver program or write data to an external storage system. 

Actions **force the evaluation** of the transformations required for the **RDD** they were called on, since they need to actually produce output.

Returning to the previous example, until we call count over words and words persisted, the RDD are not computed. See that we persisted words_persisted, and until its second computation we cannot see the impact of persisting that RDD in memory.

If we want to see a part of the RDD, we can use take, and to have the full RDD we can use collect.

In [None]:
lines = sc.textFile("../data/people.csv")
print "Three elements", lines.take(3)
print "The whole RDD", lines.collect()

## Passing functions to Spark

Most of Spark’s transformations, and some of its actions, depend on **passing in functions** that are used by Spark to **compute** data.

In Python, we have three options for passing functions into Spark. 
 * For shorter functions, we can pass in lambda expressions
 * We can pass in top-level functions, or 
 * Locally defined functions.

In [2]:
lines = sc.textFile("../data/people.csv")

first_cells = lines.map(lambda x: x.split(",")[0])
print first_cells.collect()

# how to pass extra arguments
def get_cell(x):
    return x.split(",")[0]
first_cells = lines.map(get_cell)
print first_cells.collect()

[u'', u'individuum 1', u'individuum 2', u'individuum 3', u'individuum 4', u'individuum 5', u'individuum 6', u'individuum 7', u'individuum 8', u'individuum 9', u'individuum 10']
[u'', u'individuum 1', u'individuum 2', u'individuum 3', u'individuum 4', u'individuum 5', u'individuum 6', u'individuum 7', u'individuum 8', u'individuum 9', u'individuum 10']


## Working with common Spark transformations

The two most common transformations you will likely be using are map() and filter(). 

The **map()** transformation takes in a function and applies it to each element in the RDD with the result of the function being the new value of each element in the resulting RDD. 

The **filter()** transformation takes in a function and returns an RDD that only has elements that pass the filter() function.

Sometimes ** map() ** returns nested lists, to flatten these nested lists we can use ** flatMap() **. So, ** flatMap() ** is called individually for each element in our input RDD. Instead of returning a single element, we return an iterator with our return values. Rather than producing an RDD of iterators, we get back an RDD that consists of the elements from all of the iterators.

### Set operations

* **distinct()** transformation to produce a new RDD with only distinct items. Note that distinct() is expensive, however, as it requires shuffling all the data over the network to ensure that we receive only one copy of each element. 

* **RDD.union(other)** back an RDD consisting of the data from both sources. Unlike the mathematical union(), if there are duplicates in the input RDDs, the result of Spark’s union() will contain duplicates (which we can fix if desired with distinct()).

* **RDD.intersection(other)**  returns only elements in both RDDs. intersection() also removes all duplicates (including duplicates from a single RDD) while running. While intersection() and union() are two similar concepts, the performance of intersection() is much worse since it requires a shuffle over the network to identify common elements.

* ** RDD.subtract(other)** function takes in another RDD and returns an RDD that has only values present in the first RDD and not the second RDD. Like intersection(), it performs a shuffle.

* ** RDD.cartesian(other) ** transformation returns all possible pairs of (a,b) where a is in the source RDD and b is in the other RDD. The Cartesian product can be useful when we wish to consider the similarity between all possible pairs, such as computing every user’s expected interest in each offer. We can also take the Cartesian product of an RDD with itself, which can be useful for tasks like user similarity. Be warned, however, that the Cartesian product is very expensive for large Dds.

### Actions

* **reduce():** which takes a function that operates on two elements of the type in your RFD and returns a new element of the same type. 

* **aggregate():** takes an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally. To know more:
    * http://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark
    * http://atlantageek.com/2015/05/30/python-aggregate-rdd/
    
* **collect():** returns the entire RDD’s contents. collect() is commonly used in unit tests where the entire contents of the RDD are expected to fit in memory, as that makes it easy to compare the value of our RDD with our expected result.

* **take(n):** returns n elements from the RDD and attempts to minimize the number of partitions it accesses, so it may represent a biased collection

* **top():** will use the default ordering on the data, but we can supply our own comparison function to extract the top elements. 


### Exercises

** Exercise 1: ** Download all books, from books.csv using the map function.

** Exercise 2: ** Identify transformations and actions. When the returned data is calculated?

** Exercise 3: ** Imagine that you only want to download Dickens books, how would you do that? Which is the impact of not persisting dickens_books_content?

** Exercise 4: ** Use flatMap() in the resulting RDD of the previous exercise, how the result is different?

** Exercise 5: ** You want to know the different books authors there are.

** Exercise 6: ** Return Poe's and Dickens' books URLs (use union function).

** Exercise 7: ** Return the list of books without Dickens' and Poe's books.

** Exercise 8: ** Count the number of books using reduce function.

** Exercise 9: ** Compute the mean price of estates from csv containing Sacramento's estate price using aggregate function.

** Exercise 10: ** Get top 5 highest and lowest prices in Sacramento estate's transactions

** Answer 1: **

In [2]:
import urllib3

def download_file(csv_line):
    link = csv_line[0]
    http = urllib3.PoolManager()
    r = http.request('GET', link, preload_content=False)
    response = r.read()
    return response
    
books_info = sc.textFile("../data/books.csv").map(lambda x: x.split(","))
print books_info.take(10)

books_content = books_info.map(download_file)
print books_content.take(1)[0][:100]

[[u'http://www.textfiles.com/etext/REFERENCE/15-songs.txt', u'15-songs.txt', u'17619', u'A Civil War Songbook (January 1990)'], [u'http://www.textfiles.com/etext/REFERENCE/1776-va.rts', u'1776-va.rts', u'5907', u'The Virginia Declaration of Rights'], [u'http://www.textfiles.com/etext/REFERENCE/1mlkd11.txt', u'1mlkd11.txt', u'817486', u'"Project Gutenberg: Martin Luther King\'s ""I have a Dream"" Speech"'], [u'http://www.textfiles.com/etext/REFERENCE/1st_than.txt', u'1st_than.txt', u'2979', u'"The First Thanksgiving Proclomation', u' June 20', u' 1676"'], [u'http://www.textfiles.com/etext/REFERENCE/2sqrt10a.txt', u'2sqrt10a.txt', u'5262079', u'"Project Gutenberg: The Square Root of Two', u' to 5 Million digits"'], [u'http://www.textfiles.com/etext/REFERENCE/32pri10.txt', u'32pri10.txt', u'247391', u'Project Gutenberg: The 32nd Mersenne prime'], [u'http://www.textfiles.com/etext/REFERENCE/all11.txt', u'all11.txt', u'85580', u'Project Gutenberg: The Declaration of Independence of The Unit

** Answer 2: **
If we consider the text reading as a transformation...
Transformations:
* books_info = sc.textFile("../data/books.csv").map(lambda x: x.split(","))
* books_content = books_info.map(lambda x: download_file(x[0]))

Actions:
* print books_info.take(10)
* print books_content.take(1)[0][:100]

Computation is carried out in actions. In this case we take advantage of it, as for downloading data we only apply the function to one element of the books_content RDD

** Answer 3: **

In [4]:
import re

def is_dickens(csv_line):
    link = csv_line[0]
    t = re.match("http://www.textfiles.com/etext/AUTHORS/DICKENS/",link)
    return t != None

dickens_books_info = books_info.filter(is_dickens)
print dickens_books_info.take(4)

dickens_books_content = dickens_books_info.map(download_file)

# take into consideration that each time an action is performed over dickens_book_content, the file is downloaded
# this has a big impact into calculations
print dickens_books_content.take(2)[1][:100]


[[u'http://www.textfiles.com/etext/AUTHORS/DICKENS/dickens-american-631.txt', u'dickens-american-631.txt', u'604047', u'"PROJECT GUTENBERG: American Notes for General Circulation', u' by Charles Dickens"'], [u'http://www.textfiles.com/etext/AUTHORS/DICKENS/dickens-battle-630.txt', u'dickens-battle-630.txt', u'181551', u'"PROJECT GUTENBERG: The Battle of Life', u' by Charles Dickens"'], [u'http://www.textfiles.com/etext/AUTHORS/DICKENS/dickens-childs-629.txt', u'dickens-childs-629.txt', u'934709', u'"PROJECT GUTENBERG: A Child\'s History of England', u' by Charles Dickens"'], [u'http://www.textfiles.com/etext/AUTHORS/DICKENS/dickens-chimes-379.txt', u'dickens-chimes-379.txt', u'170704', u'"The Chimes', u' by Charles Dickens"']]
Project Gutenberg Etext of The Battle of Life by Charles Dickens
#10 in our series by Charles Dicken


** Answer 4: **

In [5]:
flat_content = dickens_books_info.flatMap(lambda x: x)
print flat_content.take(4)

[u'http://www.textfiles.com/etext/AUTHORS/DICKENS/dickens-american-631.txt', u'dickens-american-631.txt', u'604047', u'"PROJECT GUTENBERG: American Notes for General Circulation']


** Answer 5: **

In [6]:
def get_author(csv_line):
    link = csv_line[0]
    t = re.match("http://www.textfiles.com/etext/AUTHORS/(\w+)/",link)
    if t:
        return t.group(1)
    return u'UNKNOWN'

authors = books_info.map(get_author)
authors.distinct().collect()

[u'BURROUGHS',
 u'DICKENS',
 u'STEVENSON',
 u'TWAIN',
 u'EMERSON',
 u'WILDE',
 u'ARISTOTLE',
 u'DOYLE',
 u'KANT',
 u'UNKNOWN',
 u'HAWTHORNE',
 u'PLATO',
 u'IRVING',
 u'KEATS',
 u'JEFFERSON',
 u'SHAKESPEARE',
 u'POE',
 u'MILTON']

** Answer 6 **

In [7]:
import re

def get_author_and_link(csv_line):
    link = csv_line[0]
    t = re.match("http://www.textfiles.com/etext/AUTHORS/(\w+)/",link)
    if t:
        return (t.group(1), link)
    return (u'UNKNOWN',link)

authors_links = books_info.map(get_author_and_link)

# not very efficient
dickens_books = authors_links.filter(lambda x: x[0]=="DICKENS")
poes_books = authors_links.filter(lambda x: x[0]=="POE")

poes_dickens_books = poes_books.union(dickens_books)
poes_dickens_books.sample(True,0.05).collect()

[(u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/blackcat.poe'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/masque_r.txt'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/poe-angel-666.txt'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/poe-dreamland-434.txt'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/poe-literary-454.txt'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/poe-morning-559.txt'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/poe-morning-559.txt'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/poe-to-715.txt'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/poe-to-718.txt'),
 (u'POE', u'http://www.textfiles.com/etext/AUTHORS/POE/telltale.txt'),
 (u'DICKENS',
  u'http://www.textfiles.com/etext/AUTHORS/DICKENS/dickens-pickwick-635.txt')]

** Answer 7 **

In [8]:
authors_links.subtract(poes_dickens_books).map(lambda x: x[0]).distinct().collect()

[u'EMERSON',
 u'WILDE',
 u'JEFFERSON',
 u'MILTON',
 u'STEVENSON',
 u'DOYLE',
 u'ARISTOTLE',
 u'UNKNOWN',
 u'PLATO',
 u'IRVING',
 u'KEATS',
 u'HAWTHORNE',
 u'SHAKESPEARE',
 u'BURROUGHS',
 u'TWAIN',
 u'KANT']

** Answer 8 **

In [9]:
authors_links.map(lambda x: 1).reduce(lambda x,y: x+y) == authors_links.count()

True

**Answer 9**

In [12]:
sacramento_estate_csv = sc.textFile("../data/Sacramentorealestatetransactions.csv")
header = sacramento_estate_csv.first()

sacramento_estate = sacramento_estate_csv.filter(lambda x: x != header)\
        .map(lambda x: x.split(","))\
        .map(lambda x: int(x[9]))

seqOp = (lambda x,y: (x[0] + y, x[1] + 1))
combOp = (lambda x,y: (x[0] + y[0], x[1] + y[1]))

total_sum, number = sacramento_estate.aggregate((0,0),seqOp,combOp)
mean = float(total_sum)/number
mean

234144.26395939087

** Answer 10**

In [13]:
print sacramento_estate.top(5)
print sacramento_estate.top(5, key=lambda x: -x)

[884790, 879000, 839000, 830000, 760000]
[1551, 2000, 4897, 4897, 4897]


## Spark Key/Value Pairs

Spark provides special operations on RDDs containing key/value pairs. 

These RDDs are called pair RDDs, but are simple RDDs with an special structure. In Python, for the functions on keyed data to work we need to return an RDD composed of tuples.

** Exercise 1:** Create a pair RDD from our books information data, having author as key and the rest of the information as value. (Hint: the answer is very similar to the previous section Exercise 6)

** Exercise 2:** Check that pair RDDs are also RDDs and that common RDD operations work as well. Filter elements with author equals to "UNKNOWN" from previous RDD. 

** Exercise 3:** Check mapValue in Spark API (http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapValues) function that works on pair RDDs.

** Answer 1:**

In [14]:
import re

def get_author_data(csv_line):
    link = csv_line[0]
    t = re.match("http://www.textfiles.com/etext/AUTHORS/(\w+)/",link)
    if t:
        return (t.group(1), csv_line)
    return (u'UNKNOWN', csv_line)

books_info = sc.textFile("../data/books.csv").map(lambda x: x.split(","))
authors_info = books_info.map(get_author_data)

print authors_info.take(5)

[(u'UNKNOWN', [u'http://www.textfiles.com/etext/REFERENCE/15-songs.txt', u'15-songs.txt', u'17619', u'A Civil War Songbook (January 1990)']), (u'UNKNOWN', [u'http://www.textfiles.com/etext/REFERENCE/1776-va.rts', u'1776-va.rts', u'5907', u'The Virginia Declaration of Rights']), (u'UNKNOWN', [u'http://www.textfiles.com/etext/REFERENCE/1mlkd11.txt', u'1mlkd11.txt', u'817486', u'"Project Gutenberg: Martin Luther King\'s ""I have a Dream"" Speech"']), (u'UNKNOWN', [u'http://www.textfiles.com/etext/REFERENCE/1st_than.txt', u'1st_than.txt', u'2979', u'"The First Thanksgiving Proclomation', u' June 20', u' 1676"']), (u'UNKNOWN', [u'http://www.textfiles.com/etext/REFERENCE/2sqrt10a.txt', u'2sqrt10a.txt', u'5262079', u'"Project Gutenberg: The Square Root of Two', u' to 5 Million digits"'])]


** Answer 2: **

The operations over pair RDDs will also be slightly different.

But take into account that pair RDDs are just *special* RDDs that some operations can be applied, however common RDDs also fork for them.


In [15]:
authors_info.filter(lambda x: x[0] != "UNKNOWN").take(3)

[(u'WILDE',
  [u'http://www.textfiles.com/etext/AUTHORS/WILDE/wilde-ballad-611.txt',
   u'wilde-ballad-611.txt',
   u'27238',
   u'"The Ballad of Reading Gaol',
   u' by Oscar Wilde (1898)"']),
 (u'WILDE',
  [u'http://www.textfiles.com/etext/AUTHORS/WILDE/wilde-burden-612.txt',
   u'wilde-burden-612.txt',
   u'17887',
   u'"The Burden of Itys',
   u' by Oscar Wilde (1890)"']),
 (u'WILDE',
  [u'http://www.textfiles.com/etext/AUTHORS/WILDE/wilde-charmides-601.txt',
   u'wilde-charmides-601.txt',
   u'34648',
   u'"Charmides',
   u' by Oscar Wilde (1890)"'])]

** Answer 3:** 

Sometimes is awkward to work with pairs, and Spark provides a map function that operates over values.

In [16]:
authors_info.mapValues(lambda x: x[2]).take(5)

[(u'UNKNOWN', u'17619'),
 (u'UNKNOWN', u'5907'),
 (u'UNKNOWN', u'817486'),
 (u'UNKNOWN', u'2979'),
 (u'UNKNOWN', u'5262079')]

## Transformations on Pair RDDs

 Since pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements.
 
 * *reduceByKey(func):* Combine values with the same key.
 * *groupByKey():* Group values with the same key.
 * *combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner):* Combine values with the same key using a different result type.
 * *keys():* return RDD keys
 * *values():* return RDD values
 * *groupBy():* takes a function that it applies to every element in the source RDD and uses the result to determine the key.
 * *cogroup():* over two RDDs sharing the same key type, K, with the respective value types V and W gives us back RDD[(K,(Iterable[V], Iterable[W]))]. If one of the RDDs doesn’t have elements for a given key that is present in the other RDD, the corresponding Iterable is simply empty. cogroup() gives us the power to group data from multiple RDDs.

** Exercise 1: ** Get the total size of files for each author. 

** Exercise 2: ** Get the top 5 authors with more data.

** Exercise 3:** Try the combineByKey() with a randomly generated set of 5 values for 4 keys. Get the average value of the random variable for each key.

** Exercise 4:** Compute the average book size per author using combineByKey(). If you were an English Literature student and your teacher says: "Pick one Author and I'll randomly pick a book for you to read", what would be a Data Scientist answer?

** Exercise 5: ** All Spark books have the word count example. Let's count words over all our books! (This might take some time)

** Exercise 6: ** Group author data by author surname initial. How many authors have we grouped? 

** Exercise 7: ** Generate a pair RDD with alphabet letters in upper case as key, and empty list as value. Then group the previous RDD with this new one.

** Answer 1**

In [21]:
authors_data = authors_info.mapValues(lambda x: int(x[2]))
authors_data.reduceByKey(lambda y,x: y+x).collect()

[(u'BURROUGHS', 10070497),
 (u'DICKENS', 14236826),
 (u'STEVENSON', 6965452),
 (u'TWAIN', 13259786),
 (u'EMERSON', 2655619),
 (u'WILDE', 669926),
 (u'ARISTOTLE', 6219825),
 (u'DOYLE', 8450256),
 (u'KANT', 5901915),
 (u'UNKNOWN', 266421132),
 (u'HAWTHORNE', 1898878),
 (u'PLATO', 3648947),
 (u'IRVING', 2223565),
 (u'KEATS', 360556),
 (u'JEFFERSON', 6565921),
 (u'SHAKESPEARE', 5347823),
 (u'POE', 3395985),
 (u'MILTON', 911458)]

** Answer 2:**

In [24]:
aauthors_data.reduceByKey(lambda y,x: y+x).top(5,key=lambda x: x[1])

[(u'UNKNOWN', 266421132),
 (u'DICKENS', 14236826),
 (u'TWAIN', 13259786),
 (u'BURROUGHS', 10070497),
 (u'DOYLE', 8450256)]

**Answer 3:**

In [55]:
import numpy as np

# generate the data
rdd = sc.parallelize(zip(range(5)*5, np.random.normal(0,1,5*5)))

createCombiner = lambda value: (value,1)
# you can check what createCombiner does
# rdd.mapValues(createCombiner).collect()

# here x is the combiner (sum,count) and value is value in the 
# initial RDD (the random variable)
mergeValue = lambda x, value: (x[0] + value, x[1] + 1)

# here, all combiners are summed (sum,count)
mergeCombiner = lambda x, y: (x[0] + y[0], x[1] + y[1])

sumCount = rdd.combineByKey(createCombiner,
                        mergeValue,
                         mergeCombiner)

sumCount.mapValues(lambda x: x[0]/x[1]).collect()


[(0, 0.36496102496138977),
 (4, 0.11892759636084722),
 (1, -0.66545959341697147),
 (2, 0.30208685716557426),
 (3, -0.30334391176249054)]

** Answer 4:**

In [53]:
createCombiner = lambda value: (value,1)
# you can check what createCombiner does
# rdd.mapValues(createCombiner).collect()

# here x is the combiner (sum,count) and value is value in the 
# initial RDD (the random variable)
mergeValue = lambda x, value: (x[0] + value, x[1] + 1)

# here, all combiners are summed (sum,count)
mergeCombiner = lambda x, y: (x[0] + y[0], x[1] + y[1])

sumCount = authors_data.combineByKey(createCombiner,
                        mergeValue,
                         mergeCombiner)

sumCount.mapValues(lambda x: x[0]/x[1]).collect()
# I would choose the author with lowest average book size
sumCount.mapValues(lambda x: x[0]/x[1]).top(5,lambda x: -x[1])

[(u'KEATS', 10604),
 (u'POE', 24431),
 (u'MILTON', 30381),
 (u'WILDE', 39407),
 (u'IRVING', 55589)]

** Answer 5: **

In [12]:
import urllib3
import re

def download_file(csv_line):
    link = csv_line[0]
    http = urllib3.PoolManager()
    r = http.request('GET', link, preload_content=False)
    response = r.read()
    return response
    
books_info = sc.textFile("../data/books.csv").map(lambda x: x.split(","))
books_content = books_info.map(download_file)
#books_content = sc.parallelize(books_info.map(download_file).take(2))

words_rdd = books_content.flatMap(lambda x: x.split(" ")).\
                          flatMap(lambda x: x.split("\r\n")).\
                          map(lambda x: re.sub('[^0-9a-zA-Z]+', '', x).lower()).\
                          filter(lambda x: x != '')

words_rdd.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).top(5, key=lambda x: x[1])

[('the', 3278274),
 ('of', 1881320),
 ('and', 1853270),
 ('to', 1467214),
 ('a', 1139576)]

** Answer 6: **

In [44]:
print authors_info.groupBy(lambda x: x[0][0]).collect()

authors_info.map(lambda x: x[0]).distinct().\
                map(lambda x: (x[0],1)).\
                reduceByKey(lambda x,y: x+y).\
                filter(lambda x: x[1]>1).\
                collect()

[(u'A', <pyspark.resultiterable.ResultIterable object at 0x0000000006E90B70>), (u'E', <pyspark.resultiterable.ResultIterable object at 0x0000000006E90EB8>), (u'I', <pyspark.resultiterable.ResultIterable object at 0x0000000006E90630>), (u'K', <pyspark.resultiterable.ResultIterable object at 0x0000000006E90080>), (u'M', <pyspark.resultiterable.ResultIterable object at 0x0000000006E2F080>), (u'S', <pyspark.resultiterable.ResultIterable object at 0x0000000006E2F198>), (u'U', <pyspark.resultiterable.ResultIterable object at 0x0000000006E50198>), (u'W', <pyspark.resultiterable.ResultIterable object at 0x0000000006E507F0>), (u'B', <pyspark.resultiterable.ResultIterable object at 0x0000000006E50940>), (u'D', <pyspark.resultiterable.ResultIterable object at 0x0000000006E50E80>), (u'H', <pyspark.resultiterable.ResultIterable object at 0x000000000664E2E8>), (u'J', <pyspark.resultiterable.ResultIterable object at 0x000000000664E630>), (u'P', <pyspark.resultiterable.ResultIterable object at 0x00000

[(u'K', 2), (u'S', 2), (u'D', 2), (u'P', 2)]

** Answer 7:**

In [32]:
import string

sc.parallelize(list(string.ascii_uppercase)).\
                                map(lambda x: (x,[])).\
                                cogroup(authors_info.groupBy(lambda x: x[0][0])).\
                                take(5)

[('A',
  (<pyspark.resultiterable.ResultIterable at 0x6dcad68>,
   <pyspark.resultiterable.ResultIterable at 0x6dcaf28>)),
 ('C',
  (<pyspark.resultiterable.ResultIterable at 0x6dd62e8>,
   <pyspark.resultiterable.ResultIterable at 0x6dd6320>)),
 ('E',
  (<pyspark.resultiterable.ResultIterable at 0x6dd6358>,
   <pyspark.resultiterable.ResultIterable at 0x6dd6390>)),
 ('G',
  (<pyspark.resultiterable.ResultIterable at 0x6dd6780>,
   <pyspark.resultiterable.ResultIterable at 0x6dd67b8>)),
 ('I',
  (<pyspark.resultiterable.ResultIterable at 0x6dd67f0>,
   <pyspark.resultiterable.ResultIterable at 0x6dd6828>))]

## Joins

Some of the most useful operations we get with keyed data comes from using it together with other keyed data. 

Joining data together is probably one of the most common operations on a pair RDD, and we have a full range of options including right and left outer joins, cross joins, and inner joins.

### Inner Join

Only keys that are present in both pair RDDs are output. 

When there are multiple values for the same key in one of the inputs, the resulting pair RDD will have an entry for every possible pair of values with that key from the two input RDDs


**Exercise:**

Take countries_data_clean.csv and countries_GDP_clean.csv and join them using country name as key. Before doing the join, please, check how many element should the resulting pair RDD have. After the join, check if the initial hypothesis was true. In case it is not, what is the reason? How would you resolve that problem?

In [38]:
#more info: https://www.worlddata.info/downloads/
rdd_countries = sc.textFile("../data/countries_data_clean.csv").map(lambda x: x.split(","))
#more info: http://data.worldbank.org/data-catalog/GDP-ranking-table
rdd_gdp = sc.textFile("../data/countries_GDP_clean.csv").map(lambda x: x.split(","))

# check rdds size
hyp_final_rdd_num = rdd_gdp.count() if rdd_countries.count() > rdd_gdp.count() else rdd_countries.count()
print "The final number of elements in the joined rdd should be: ", hyp_final_rdd_num
p_rdd_gdp = rdd_gdp.map(lambda x: (x[3],x))
p_rdd_countries = rdd_countries.map(lambda x: (x[1],x))

print p_rdd_countries.take(1)
print p_rdd_gdp.take(1)

p_rdd_contry_data = p_rdd_countries.join(p_rdd_gdp)

final_join_rdd_size = p_rdd_contry_data.count()
hyp = hyp_final_rdd_num == final_join_rdd_size
print "The initial hypothesis is ", hyp
if not hyp:
    print "The final joined rdd size is ", final_join_rdd_size



The final number of elements in the joined rdd should be:  229
[(u'Afghanistan', [u'0', u'Afghanistan', u'Afghanistan', u'Afganistan/Afqanestan', u'AF', u'Asia', u'', u'32564342', u'652230', u'0', u'islamic republic', u'Afghani', u'AFN', u'93', u'38.6', u'13.9', u'50.9', u'https://www.laenderdaten.info/Asien/Afghanistan/index.php'])]
[(u'United States', [u'0', u'USA', u'1', u'United States', u'17946996.0'])]
The initial hypothesis is  False
The final joined rdd size is  185


## Left and Right outer Joins

Sometimes we don’t need the key to be present in both RDDs to want it in our result.

For example, imagine that our list of countries is not complete, and we don't want to miss data if it a country is not present in both RDDs.

** leftOuterJoin(other)** and **rightOuterJoin(other)** both join pair RDDs together by key, where one of the pair RDDs can be missing the key.

With **leftOuterJoin()** the resulting pair RDD has entries for each key in the source RDD. The value associated with each key in the result is a tuple of the value from the source RDD and an Option for the value from the other pair RDD. In Python, if a value isn’t present None is used; and if the value is present the regular value, without any wrapper, is used. As with join(), we can have multiple entries for each key; when this occurs, we get the Cartesian product between the two lists of values.

**rightOuterJoin()** is almost identical to leftOuterJoin() except the key must be present in the other RDD and the tuple has an option for the source rather than the other RDD.

** Exercise: **

Use two simple RDDs to show the results of left and right outer join.

In [115]:
n = 5
rdd_1 = sc.parallelize([(x,1) for x in range(n)])
rdd_2 = sc.parallelize([(x*2,1) for x in range(n)])

print "rdd_1: ",rdd_1.collect()
print "rdd_2: ",rdd_2.collect()

print "leftOuterJoin: ",rdd_1.leftOuterJoin(rdd_2).collect()
print "rightOuterJoin: ",rdd_1.rightOuterJoin(rdd_2).collect()

print "join: ", rdd_1.join(rdd_2).collect()

#explore what hapens if a key is present twice or more
rdd_3 = sc.parallelize([(x*2,1) for x in range(n)] + [(4,2),(6,4)])
print "rdd_3: ",rdd_3.collect()
print "join: ", rdd_2.join(rdd_3).collect()


rdd_1:  [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1)]
rdd_2:  [(0, 1), (2, 1), (4, 1), (6, 1), (8, 1)]
leftOuterJoin:  [(0, (1, 1)), (1, (1, None)), (2, (1, 1)), (3, (1, None)), (4, (1, 1))]
rightOuterJoin:  [(0, (1, 1)), (8, (None, 1)), (2, (1, 1)), (4, (1, 1)), (6, (None, 1))]
join:  [(0, (1, 1)), (2, (1, 1)), (4, (1, 1))]
rdd_3:  [(0, 1), (2, 1), (4, 1), (6, 1), (8, 1), (4, 2), (6, 4)]
join:  [(0, (1, 1)), (8, (1, 1)), (2, (1, 1)), (4, (1, 1)), (4, (1, 2)), (6, (1, 1)), (6, (1, 4))]


** Exercise: **

Generate two pair RDDs with country info:
1. A first one with country code and GDP
2. A second one with country code and life expectancy

Then join them to have a pair RDD with country code plus GDP and life expentancy.

** Answer: **

Inspect the dataset with GDP.

In [109]:
rdd_gdp = sc.textFile("../data/countries_GDP_clean.csv").map(lambda x: x.split(";"))
rdd_gdp.take(2)
#generate a pair rdd with countrycode and GDP
rdd_cc_gdp = rdd_gdp.map(lambda x: (x[1],x[4]))
rdd_cc_gdp.take(2)

[u'330779.0']

Inspect the dataset with life expectancy.

In [103]:
rdd_countries = sc.textFile("../data/countries_data_clean.csv").map(lambda x: x.split(","))
print rdd_countries.take(2)
#generate a pair rdd with countrycode and lifexpectancy 
#(more info in https://www.worlddata.info/downloads/)
#we don't have countrycode in this dataset, but let's try to add it
#we have a dataset with countrynames and countrycodes
#let's take countryname and ISO 3166-1 alpha3 code
rdd_cc = sc.textFile("../data/countrycodes.csv").\
                    map(lambda x: x.split(";")).\
                    map(lambda x: (x[0].strip("\""),x[4].strip("\""))).\
                    filter(lambda x: x[0] != 'Country (en)')
print rdd_cc.take(2)
rdd_cc_info = rdd_countries.map(lambda x: (x[1],x[16]))
rdd_cc_info.take(2)
#let's count and see if something is missing
print rdd_cc.count()
print rdd_cc_info.count()
#take only values, the name is no longer needed
rdd_name_cc_le = rdd_cc_info.leftOuterJoin(rdd_cc)
rdd_cc_le = rdd_name_cc_le.map(lambda x: x[1])
print rdd_cc_le.count()
#what is missing?
rdd_name_cc_le.filter(lambda x: x[1][1] == None).collect()
#how can we solve this problem??

[[u'0', u'Afghanistan', u'Afghanistan', u'Afganistan/Afqanestan', u'AF', u'Asia', u'', u'32564342', u'652230', u'0', u'islamic republic', u'Afghani', u'AFN', u'93', u'38.6', u'13.9', u'50.9', u'https://www.laenderdaten.info/Asien/Afghanistan/index.php'], [u'1', u'Egypt', u'\xc4gypten', u'Misr', u'EG', u'Africa', u'', u'88487396', u'1001450', u'2450', u'republic', u'Pfund', u'EGP', u'20', u'22.9', u'4.8', u'73.7', u'https://www.laenderdaten.info/Afrika/Aegypten/index.php']]
[(u'Afghanistan', u'AFG'), (u'Egypt', u'EGY')]
247
247
247


[(u'"Saint Helena', (u'7.4', None)), (u'"Micronesia', (u'4.2', None))]

We have some missing data, that we have to complete, but we have quite a lot of data, let's follow.

Inspect the results of GDP and life expectancy and join them. ** Is there some data missing? **

In [111]:
print "Is there some data missing?", rdd_cc_gdp.count() != rdd_cc_le.count()
print "GDP dataset: ", rdd_cc_gdp.count()
print "Life expectancy dataset: ", rdd_cc_le.count()
#lets try to see what happens
print rdd_cc_le.take(10)
print  rdd_cc_gdp.take(10)
rdd_cc_gdp_le = rdd_cc_le.map(lambda x: (x[1],x[0])).leftOuterJoin(rdd_cc_gdp)
#we have some countries that the data is missing
# we have to check if this data is available
# or there is any error

Is there some data missing? True
GDP dataset:  229
Life expectancy dataset:  247
[(u'81.8', u'CAN'), (u'73.5', u'BRA'), (u'82.1', u'ITA'), (u'80.2', u'FRO'), (u'68.1', u'IND'), (u'74.7', u'LTU'), (u'78.5', u'CYP'), (u'77.6', u'SXM'), (u'82.7', u'AND'), (u'75.1', u'VCT')]
[(u'USA', u'17946996.0'), (u'CHN', u'10866444.0'), (u'JPN', u'4123258.0'), (u'DEU', u'3355772.0'), (u'GBR', u'2848755.0'), (u'FRA', u'2421682.0'), (u'IND', u'2073543.0'), (u'ITA', u'1814763.0'), (u'BRA', u'1774725.0'), (u'CAN', u'1550537.0')]


## Sort Data

sortByKey(): We can sort an RDD with key/value pairs provided that there is an ordering defined on the key.

Once we have sorted our data, any subsequent call on the sorted data to collect() or save() will result in ordered data.

** Exercise: **
Sort country data by key.

In [46]:
p_rdd_contry_data.sortByKey().take(2)

[(u'"Micronesia',
  ([u'138',
    u'"Micronesia',
    u' Federated States of"',
    u'Mikronesien',
    u'Micronesia',
    u'FM',
    u'Oceania',
    u'',
    u'105216',
    u'702',
    u'6112',
    u'federal republic',
    u'Dollar',
    u'USD',
    u'691',
    u'20.5',
    u'4.2',
    u'72.6',
    u'https://www.laenderdaten.info/Ozeanien/Mikronesien/index.php'],
   [u'190', u'FSM', u'191', u'"Micronesia', u' Fed. Sts."', u'318.0'])),
 (u'Afghanistan',
  ([u'0',
    u'Afghanistan',
    u'Afghanistan',
    u'Afganistan/Afqanestan',
    u'AF',
    u'Asia',
    u'',
    u'32564342',
    u'652230',
    u'0',
    u'islamic republic',
    u'Afghani',
    u'AFN',
    u'93',
    u'38.6',
    u'13.9',
    u'50.9',
    u'https://www.laenderdaten.info/Asien/Afghanistan/index.php'],
   [u'109', u'AFG', u'110', u'Afghanistan', u'19199.0']))]

## Actions over Pair RDDs

* countByKey(): Count the number of elements for each key.
* collectAsMap(): Collect the result as a map to provide easy lookup.
* lookup(key): Return all values associated with the provided key.

** Exercises: **
    1. Count countries RDD by key
    2. Collect countries RDD as map
    3. Lookup Andorra info in countries RDD

In [55]:
p_rdd_contry_data.countByKey()["Andorra"]

1

In [59]:
p_rdd_contry_data.collectAsMap()["Andorra"]

([u'5',
  u'Andorra',
  u'Andorra',
  u'Andorra',
  u'AD',
  u'Europe',
  u'',
  u'85580',
  u'468',
  u'0',
  u'constitutional monarchy',
  u'Euro',
  u'EUR',
  u'376',
  u'8.1',
  u'7.0',
  u'82.7',
  u'https://www.laenderdaten.info/Europa/Andorra/index.php'],
 [u'161', u'ADO', u'162', u'Andorra', u'3249.0'])

In [60]:
p_rdd_contry_data.lookup("Andorra")

[([u'5',
   u'Andorra',
   u'Andorra',
   u'Andorra',
   u'AD',
   u'Europe',
   u'',
   u'85580',
   u'468',
   u'0',
   u'constitutional monarchy',
   u'Euro',
   u'EUR',
   u'376',
   u'8.1',
   u'7.0',
   u'82.7',
   u'https://www.laenderdaten.info/Europa/Andorra/index.php'],
  [u'161', u'ADO', u'162', u'Andorra', u'3249.0'])]

## Data Partitioning

(*from: Learning Spark - O'Reilly*)

Spark programs can choose to control their RDDs’ partitioning to reduce communication. 

Partitioning will not be helpful in all applications— for example, if a given RDD is scanned only once, there is no point in partitioning it in advance. 

It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.

Spark’s partitioning is **available on all RDDs of key/value pairs**, and causes the system to **group elements based on a function of each key**.

Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node. 

** Example:**

As a simple example, consider an application that keeps a large table of user information in memory—say, an RDD of (UserID, UserInfo) pairs, where UserInfo contains a list of topics the user is subscribed to. 

In [107]:
rdd_userinfo = sc.textFile("../data/users_events_example/user_info_1000users_20topics.csv")\
                    .filter(lambda x: len(x)>0)\
                    .map(lambda x: (x.split(",")[0],x.split(",")[1].split("|")))
rdd_userinfo.take(2)

[(u'user0',
  [u'healthcare',
   u'sportscasters',
   u'elections',
   u'running',
   u'environment',
   u'internet',
   u'airlines',
   u'automotive',
   u'directors',
   u'history',
   u'grammys']),
 (u'user1',
  [u'stock exchange', u'directors', u'basketball', u'running', u'automotive'])]

The application periodically combines this table with a smaller file representing events that happened in the past five minutes—say, a table of (UserID, LinkInfo) pairs for users who have clicked a link on a website in those five minutes.

In [114]:
rdd_userevents = sc.textFile("../data/users_events_example/userevents_*.log")\
                                .filter(lambda x: len(x))\
                                .map(lambda x: (x.split(",")[1], [x.split(",")[2]]))
print rdd_userevents.take(2)

[(u'user67', [u'movies']), (u'user77', [u'stock exchange'])]


For example, we may wish to count how many users visited a link that was not to one of their subscribed topics. We can perform this combination with Spark’s join() operation, which can be used to group the User Info and LinkInfo pairs for each UserID by key.

In [120]:
rdd_joined = rdd_userinfo.join(rdd_userevents)
print rdd_joined.count()
print rdd_joined.filter(lambda x: (x[1][1][0] not in x[1][0])).count()
print rdd_joined.filter(lambda x: (x[1][1][0] in x[1][0])).count()

10000
8492
1508


Imagine that we want to count the number of visits to non-subscribed visits using a function. 

In [125]:
rdd_userinfo = sc.textFile("../data/users_events_example/user_info_1000users_20topics.csv")\
                    .filter(lambda x: len(x)>0)\
                    .map(lambda x: (x.split(",")[0],x.split(",")[1].split("|"))).persist()
        
def process_new_logs(event_fite_path):
    rdd_userevents = sc.textFile(event_fite_path)\
                                .filter(lambda x: len(x))\
                                .map(lambda x: (x.split(",")[1], [x.split(",")[2]]))
    rdd_joined = rdd_userinfo.join(rdd_userevents)
    print "Number of visits to non-subscribed topics: " ,\
            rdd_joined.filter(lambda x: (x[1][1][0] not in x[1][0])).count()
        
process_new_logs("../data/users_events_example/userevents_01012016000500.log")

Number of visits to non-subscribed topics:  852


This code will run fine as is, but it will be inefficient. This is because the join() operation, called each time process_new_logs() is invoked, does not know anything about how the keys are partitioned in the datasets. By default, this operation will hash all  the keys of both datasets, sending elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine (see figure below). 
![user partitioning example rdds](https://github.com/f-guitart/data_mining/blob/master/notes/img/user_partitioning_example_rdds.png?raw=true "user partitioning example rdds")

Because we expect the rdd_userinfo table to be much larger than the small log of events seen every five minutes, this wastes a lot of work: the rdd_userinfo table is hashed and shuffled across the network on every call, even though it doesn’t change.

Fixing this is simple: just use the **partitionBy()** transformation on rdd_userinfo to hash-partition it at the start of the program. We do this by passing a spark.HashPartitioner object to partitionBy.

In [129]:
rdd_userinfo = sc.textFile("../data/users_events_example/user_info_1000users_20topics.csv")\
                    .filter(lambda x: len(x)>0)\
                    .map(lambda x: (x.split(",")[0],x.split(",")[1].split("|"))).partitionBy(10)    
rdd_userinfo

MapPartitionsRDD[614] at mapPartitions at PythonRDD.scala:422

The process_new_logs() method can remain unchanged: the rdd_userevents RDD is local to process_new_logs(), and is used only once within this method, so there is no advantage in specifying a partitioner for events. Because we called partitionBy() when building userData, Spark will now know that it is hash-partitioned, and calls to join() on it will take advantage of this information. In particular, when we call user rdd_userinfo.join(rdd_userevents), Spark will shuffle only the events RDD, sending events with each particular UserID to the machine that contains the corresponding hash partition of rdd_userinfo. The result is that a lot less data is communicated over the network, and the program runs significantly faster.

![user partitioning example rdds 2](https://github.com/f-guitart/data_mining/blob/master/notes/img/user_partitioning_example_rdds_partitionBy.png?raw=true "user partitioning example rdds 2")

More on partitioning:
 * http://stackoverflow.com/questions/35973590/pyspark-partioning-data-using-partitionby
 * http://stackoverflow.com/questions/31424396/apache-spark-hashpartitioner-how-does-it-work

Note that partitionBy() is a transformation, so it always returns a new RDD—it does not change the original RDD in place. RDDs can never be modified once created. Therefore it is important to persist and save as rdd_userinfo the result of partitionBy(), not the original textFile().

Also, the 100 passed to partitionBy() represents the number of partitions, which will control how many parallel tasks perform further operations on the RDD (e.g., joins); in general, **make this at least as large as the number of cores in your cluster**.

In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information.

For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. 

On the other hand, operations like map() cause the new RDD to forget the parent’s partitioning information, because such operations could theoretically modify the key of each record. 

## Summarizing Example: PageRank Algorithm

** from: *Learning Spark - O'Reilly* **

As an example of a more involved algorithm that can benefit from RDD partitioning, we consider PageRank. 

The PageRank algorithm, named after Google’s Larry Page, aims to assign a measure of importance (a “rank”) to each document in a set based on how many documents have links to it.

It can be used to rank web pages, of course, but also scientific articles, or influential users in a social network.

PageRank is an iterative algorithm that performs many joins, so it is a good use case for RDD partitioning. 

The algorithm maintains two datasets: one of (pageID, link List) elements containing the list of neighbors of each page, and one of (pageID, rank) elements containing the current rank for each page. 

In our implementation we will use two datasets:
   * A dataset containing papers (../data/countries_data.csv)
   * A dataset containing the references of each paper (../data/countries_data_clean.csv)

The Page Rank algorithm proceeds as follows:
1. Initialize each page’s rank to 1.0.
2. On each iteration, have page p send a contribution of rank(p)/numNeighbors(p) to its neighbors (the pages it has links to).
3. Set each page’s rank to 0.15 + 0.85 * contributionsReceived

The last two steps repeat for several iterations, during which the algorithm will converge
to the correct PageRank value for each page. 

In practice, it’s typical to run about 10 iterations.

In [8]:
def compute_contribs(refs, rank):
    num_refs = len(refs)
    contr = []
    for ref in refs: 
        contr.append((ref, rank / float(num_refs)))
    return contr

        
refs_rdd = sc.textFile("../data/papers_references.csv").\
                    map(lambda x: (x.split(",")[0],x.split(",")[1].split(";"))).\
                    partitionBy(100).\
                    persist()
refs_rdd = sc.parallelize(refs_rdd.take(500))
#print refs_rdd.take(5)
ranks_rdd = refs_rdd.mapValues(lambda x: 1)
#print ranks_rdd.take(5)

print refs_rdd.join(ranks_rdd).take(10)
#for i in range(10):
    contribs_rdd = refs_rdd.join(ranks_rdd).flatMap(
                    lambda (ref, (refs, rank)): compute_contribs(refs, rank))
    ranks_rdd = contribs_rdd.reduceByKey(lambda x,y: x+y).\
                    mapValues(lambda rank: rank * 0.85 + 0.15)
#ranks_rdd.sortBy(lambda x: -x[1]).take(10)


[(u'1788', ([u'36956', u'5023', u'65053', u'14944', u'104357', u'107505', u'62679', u'26733', u'28602', u'16911', u'121268', u'77385', u'5406', u'51132', u'121026', u'99015', u'5872', u'54118', u'58266', u'44125', u'80108'], 1)), (u'13834', ([u'54799', u'72705', u'60562', u'97164', u'15494', u'6992', u'14100', u'112255', u'23192', u'21951', u'73127', u'34825', u'19154', u'64302', u'16479', u'82663', u'85660', u'12745', u'99806'], 1)), (u'11199', ([u'67234', u'29579', u'101522', u'110', u'21470'], 1)), (u'33928', ([u'58100', u'69677', u'79148', u'6155', u'112131', u'34467', u'18186', u'42629', u'87966', u'2983', u'115035', u'25221', u'117390', u'83158', u'16906', u'66112', u'112365', u'12644', u'115306', u'19512', u'12136', u'38764', u'23412', u'120334', u'102685', u'57538', u'46203', u'78161'], 1)), (u'4026', ([u'38375', u'51512', u'96056', u'105971', u'121452', u'46407', u'92839', u'60730', u'7723', u'119645'], 1)), (u'4372', ([u'87991', u'68001', u'75437', u'13938', u'78211', u'3382'