In [1]:
import numpy as np
import urllib.request

# PySpark Tuto 2 - Operations Basics

Now that we know how to load data into an RDD, it is time to look at the _operations_ we can perform on it.

In Spark there are two types of _operations_: __transformations__ and __actions__.  

__Transformations__ are operations that create a new RDD from an existing RDD, with the data transformed.  
_Examples of transformations_: `map` and `filter`

__Actions__ are operations that return values to the driver program after running all the transformations they depend on.  
_Examples of actions_: `take` and `collect`

### Two Technical Notes

Before going any further, it is important to mention two key aspects of Spark: RDDs are immutable, and evaluation is lazy.

_RDDs are immutable_.  
In Spark all RDDs are immutable. In other words, you never actually modify an RDD; instead you create a new RDD containing the transformed data.

_Lazy Evaluation_.  
This is a key concept in Spark and it can be summarized as follows: no transformation is actually performed until an action is called.
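As a rough analogy in plain Python (this is not Spark code), the built-in `map` is lazy too: it only describes the work, and nothing is computed until the result is consumed. The helper `traced_upper` and the `calls` list are made up for this illustration.

```python
# Plain-Python analogy of lazy evaluation (not Spark code).
calls = []  # records every element the function actually processes


def traced_upper(text):
    """Hypothetical helper: upper-case a string and log the call."""
    calls.append(text)
    return text.upper()


# Like a transformation: this only describes the work, nothing runs yet.
lazy_result = map(traced_upper, ['a', 'b', 'c'])
calls_before = list(calls)  # snapshot taken before anything is consumed

# Like an action: consuming the iterator triggers the computation.
result = list(lazy_result)

print(calls_before)  # []
print(result)        # ['A', 'B', 'C']
```

The source list `['a', 'b', 'c']` is left untouched, which loosely mirrors the immutability of RDDs.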

### The ADULT Dataset

In the following we will be using the `adult` dataset.
If you haven't downloaded it yet, just run the following cell:

In [None]:
# You don't need to run this cell if you already have the data
urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data', 'adult.data')
urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.names', 'adult.names')

If you want to know more about the dataset, I recommend spending a few minutes reading `adult.names` now. It will help you understand what we do later in this tutorial.

# Transformations

## The `map` transformation

We will now take a closer look at some simple examples of transformations and actions, starting with the __`map`__ transformation.  
But first we need to reload the adult dataset.

In [2]:
data_file = 'adult.data'
adult_rdd = sc.textFile(data_file)

In [3]:
# recall what the data looks like
adult_rdd.take(5)
# note: 'take' is an action

['39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K',
 '50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K',
 '38, Private, 215646, HS-grad, 9, Divorced, Handlers-cleaners, Not-in-family, White, Male, 0, 0, 40, United-States, <=50K',
 '53, Private, 234721, 11th, 7, Married-civ-spouse, Handlers-cleaners, Husband, Black, Male, 0, 0, 40, United-States, <=50K',
 '28, Private, 338409, Bachelors, 13, Married-civ-spouse, Prof-specialty, Wife, Black, Female, 0, 0, 40, Cuba, <=50K']

As we can see, each line of the original data file is a single string in our RDD.
This is not very useful for later analysis, so we want to break each line into individual elements.
This can be done easily with the `split()` method on a single string, except that we want to do it for every element of our RDD.
This is where the `map` transformation comes in handy.  

The `map(func)` transformation "returns a new distributed dataset formed by passing each element of the source through a function `func`." (see [spark guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations)).  
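To make the quoted definition concrete, here is a minimal local sketch in plain Python (not Spark) of what "passing each element through a function" means. The two strings are copied from the `take(5)` output shown earlier; the helper name `split_line` is only illustrative.

```python
# Two raw lines copied from the adult dataset output shown earlier.
lines = [
    '39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, '
    'Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K',
    '50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, '
    'Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K',
]


def split_line(line):
    """Illustrative helper: turn one comma-separated line into a list of fields."""
    return line.split(', ')


# Local equivalent of mapping: one output element per input element.
rows = [split_line(line) for line in lines]

print(len(rows), len(rows[0]))  # 2 15
print(rows[1][1])               # Self-emp-not-inc
```

On an RDD the same function would be applied to every element across the partitions, instead of in a local loop.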

What we want is to split each line, so let's first define `func` to do so:

In [4]:
# 'func' takes a line and returns a list of the split elements
def func(line):
    return line.split(', ')

In [5]:
# map the function to the RDD
adult_rdd_split = adult_rdd.map(func)
# take the first two entries
adult_rdd_split.take(2)

[['39',
  'State-gov',
  '77516',
  'Bachelors',
  '13',
  'Never-married',
  'Adm-clerical',
  'Not-in-family',
  'White',
  'Male',
  '2174',
  '0',
  '40',
  'United-States',
  '<=50K'],
 ['50',
  'Self-emp-not-inc',
  '83311',
  'Bachelors',
  '13',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'White',
  'Male',
  '0',
  '0',
  '13',
  'United-States',
  '<=50K']]

It worked!  
Note that instead of defining a function and passing it to `map`, it is possible to do the split and the mapping in a single line using a `lambda` (very useful for small, simple functions) as follows:

In [6]:
adult_rdd_split2 = adult_rdd.map(lambda line: line.split(', '))
# take the first two entries
adult_rdd_split2.take(2)

[['39',
  'State-gov',
  '77516',
  'Bachelors',
  '13',
  'Never-married',
  'Adm-clerical',
  'Not-in-family',
  'White',
  'Male',
  '2174',
  '0',
  '40',
  'United-States',
  '<=50K'],
 ['50',
  'Self-emp-not-inc',
  '83311',
  'Bachelors',
  '13',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'White',
  'Male',
  '0',
  '0',
  '13',
  'United-States',
  '<=50K']]

You have probably noticed that the whole operation returns nested lists (lists within a list). If we ask for the shape of the output, we see that it is two-dimensional rather than `flat` (i.e. one-dimensional).

In [7]:
np.shape(adult_rdd_split2.take(2))

(2, 15)

It is possible to make Spark return a `flat` object using the __`flatMap`__ transformation, but keep in mind that it returns one long flattened list of all elements, so the boundaries between the original lines are lost.

In [8]:
adult_rdd_split_flat = adult_rdd.flatMap(lambda line: line.split(', '))
# take the first entry
adult_rdd_split_flat.take(1)

['39']

In [9]:
# take the first fifteen entries
adult_rdd_split_flat.take(15)

['39',
 'State-gov',
 '77516',
 'Bachelors',
 '13',
 'Never-married',
 'Adm-clerical',
 'Not-in-family',
 'White',
 'Male',
 '2174',
 '0',
 '40',
 'United-States',
 '<=50K']
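The difference between `map` and `flatMap` can be sketched locally in plain Python (not Spark), assuming `itertools.chain.from_iterable` as a stand-in for the flattening step. The two input strings are shortened, made-up versions of the dataset lines.

```python
from itertools import chain

# Shortened, made-up lines in the same format as the dataset.
lines = ['39, State-gov, 77516', '50, Self-emp-not-inc, 83311']

# map-like: one list per input line, so the result is nested.
nested = [line.split(', ') for line in lines]

# flatMap-like: the per-line lists are concatenated into a single flat list.
flat = list(chain.from_iterable(line.split(', ') for line in lines))

print(nested)  # [['39', 'State-gov', '77516'], ['50', 'Self-emp-not-inc', '83311']]
print(flat)    # ['39', 'State-gov', '77516', '50', 'Self-emp-not-inc', '83311']
```

In the flat version there is no way to tell where one line ended and the next began.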

## The `filter` transformation

Now that we know how to split all the elements of our RDD, we may want to keep only a subset of the original dataset based on a condition.
For instance, if we want to keep only the entries corresponding to people earning more than 50K, we need to `filter` our data.
Here is how to do it:

In [10]:
# Doing it in a single line
adult_50K = adult_rdd_split2.filter(lambda x: x[-1] == '>50K')
adult_50K.take(2)

[['52',
  'Self-emp-not-inc',
  '209642',
  'HS-grad',
  '9',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'White',
  'Male',
  '0',
  '0',
  '45',
  'United-States',
  '>50K'],
 ['31',
  'Private',
  '45781',
  'Masters',
  '14',
  'Never-married',
  'Prof-specialty',
  'Not-in-family',
  'White',
  'Female',
  '14084',
  '0',
  '50',
  'United-States',
  '>50K']]

Here I know that the elements of my RDD are lists of strings and that the information I want is in the last position.  
Try applying the same `filter` function to the flattened RDD (`adult_rdd_split_flat`) and see if the result makes sense to you...
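The predicate passed to `filter` is an ordinary Python function that returns `True` for the elements to keep. As a local sketch (plain Python, not Spark), the same test can be tried on a few hand-written rows shortened to three fields. The `['']` row mimics what `split` returns for an empty line, should the file contain one; the name `earns_over_50k` is only illustrative.

```python
# Hand-written rows, shortened to three fields for readability.
rows = [
    ['39', 'State-gov', '<=50K'],
    ['52', 'Self-emp-not-inc', '>50K'],
    ['31', 'Private', '>50K'],
    [''],  # what ''.split(', ') produces for an empty line
]


def earns_over_50k(row):
    """Illustrative predicate: keep rows whose last field is '>50K'."""
    return row[-1] == '>50K'


# Built-in filter keeps only the rows for which the predicate is True.
kept = list(filter(earns_over_50k, rows))

print(kept)  # [['52', 'Self-emp-not-inc', '>50K'], ['31', 'Private', '>50K']]
```

Indexing with `[-1]` is safe here because even the row built from an empty line still has one element.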

# Actions

So far in this notebook we have seen three _transformations_ (`map()`, `flatMap()` and `filter()`) and already one _action_: `take()`.  
Below I will quickly (re-)introduce three basic _actions_: `take()`, `first()` and `collect()`.

## The `take` and `first` actions

The __`take(n)`__ action simply returns the first _`n`_ elements of the RDD as a list.
There is not much more to say about it, except that if you only want the first element there is a close relative of __`take(1)`__: __`first()`__, which returns the element itself rather than a one-element list.  
A short illustration:

In [11]:
adult_rdd_split2.take(3)

[['39',
  'State-gov',
  '77516',
  'Bachelors',
  '13',
  'Never-married',
  'Adm-clerical',
  'Not-in-family',
  'White',
  'Male',
  '2174',
  '0',
  '40',
  'United-States',
  '<=50K'],
 ['50',
  'Self-emp-not-inc',
  '83311',
  'Bachelors',
  '13',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'White',
  'Male',
  '0',
  '0',
  '13',
  'United-States',
  '<=50K'],
 ['38',
  'Private',
  '215646',
  'HS-grad',
  '9',
  'Divorced',
  'Handlers-cleaners',
  'Not-in-family',
  'White',
  'Male',
  '0',
  '0',
  '40',
  'United-States',
  '<=50K']]

In [12]:
adult_rdd_split2.first()

['39',
 'State-gov',
 '77516',
 'Bachelors',
 '13',
 'Never-married',
 'Adm-clerical',
 'Not-in-family',
 'White',
 'Male',
 '2174',
 '0',
 '40',
 'United-States',
 '<=50K']

Simple, isn't it?  

If we just want to look at the data without storing the RDD in a new variable, we can chain `take` or `first` directly after the transformation:

In [13]:
adult_rdd.map(lambda line: line.split(', ')).first()

['39',
 'State-gov',
 '77516',
 'Bachelors',
 '13',
 'Never-married',
 'Adm-clerical',
 'Not-in-family',
 'White',
 'Male',
 '2174',
 '0',
 '40',
 'United-States',
 '<=50K']
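The shapes of the two results differ: `take(1)` gives a list containing one element, while `first()` gives the element itself. This can be mimicked locally in plain Python (not Spark); the helpers `take` and `first` below are hypothetical stand-ins written for this sketch, not the RDD methods.

```python
from itertools import islice


def take(iterable, n):
    """Hypothetical stand-in: return the first n items as a list."""
    return list(islice(iterable, n))


def first(iterable):
    """Hypothetical stand-in: return the first item itself."""
    return next(iter(iterable))


# Made-up rows, shortened to two fields.
rows = [['39', 'State-gov'], ['50', 'Self-emp-not-inc'], ['38', 'Private']]

print(take(rows, 1))  # [['39', 'State-gov']]
print(first(rows))    # ['39', 'State-gov']
```

Note the extra pair of brackets in the first result.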

## The `collect` action

What if I want to return the whole transformed dataset?  
There is no need to know the size or the number of elements of the RDD; the __`collect()`__ action is here for you:

In [14]:
adult_collect = adult_50K.collect()
adult_collect[:2]

[['52',
  'Self-emp-not-inc',
  '209642',
  'HS-grad',
  '9',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'White',
  'Male',
  '0',
  '0',
  '45',
  'United-States',
  '>50K'],
 ['31',
  'Private',
  '45781',
  'Masters',
  '14',
  'Never-married',
  'Prof-specialty',
  'Not-in-family',
  'White',
  'Female',
  '14084',
  '0',
  '50',
  'United-States',
  '>50K']]

In [15]:
np.shape(adult_collect)

(7841, 15)
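Since `collect()` returns an ordinary Python list held in the driver's memory, the result can be processed with standard Python tools; for the same reason it is best kept for results that fit comfortably in memory. As a small sketch, assuming the two rows shown above stand in for the collected list, here is a count of the education field (index 3, following the column order in `adult.names`).

```python
from collections import Counter

# The first two rows of the collected result, copied from the output above.
collected = [
    ['52', 'Self-emp-not-inc', '209642', 'HS-grad', '9', 'Married-civ-spouse',
     'Exec-managerial', 'Husband', 'White', 'Male', '0', '0', '45',
     'United-States', '>50K'],
    ['31', 'Private', '45781', 'Masters', '14', 'Never-married',
     'Prof-specialty', 'Not-in-family', 'White', 'Female', '14084', '0', '50',
     'United-States', '>50K'],
]

# Count how often each education level (4th column) appears.
education_counts = Counter(row[3] for row in collected)

print(type(collected).__name__)  # list
print(dict(education_counts))    # {'HS-grad': 1, 'Masters': 1}
```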

# Summary and Neat Code

In this notebook we have introduced the two types of _operations_ that can be performed on an RDD, i.e. _transformations_ and _actions_. More specifically, we have described:  

* 3 _transformations_ : `map`, `flatMap` and `filter`
* 3 _actions_ : `take`, `first` and `collect`

__Don't forget that no _transformation_ is actually performed until an _action_ is called.__

The aforementioned operations come in addition to the _transformations_ `repartition()` and `coalesce()` we mentioned in the previous notebook.

## Cleaned code

Above, I have tried to detail each step, making sure that each line corresponds to a single operation.
However, it is possible to write more compact code to split and filter the data.  
Below is the short, commented version of the code:

In [16]:
# reading the data
adult_RDD = sc.textFile('adult.data')

# perform 'map', 'filter' and 'collect'
adult_RDD\
    .map(lambda line: line.split(', '))\
    .filter(lambda x: x[-1] == '>50K')\
    .collect()

[['52',
  'Self-emp-not-inc',
  '209642',
  'HS-grad',
  '9',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'White',
  'Male',
  '0',
  '0',
  '45',
  'United-States',
  '>50K'],
 ['31',
  'Private',
  '45781',
  'Masters',
  '14',
  'Never-married',
  'Prof-specialty',
  'Not-in-family',
  'White',
  'Female',
  '14084',
  '0',
  '50',
  'United-States',
  '>50K'],
 ['42',
  'Private',
  '159449',
  'Bachelors',
  '13',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'White',
  'Male',
  '5178',
  '0',
  '40',
  'United-States',
  '>50K'],
 ['37',
  'Private',
  '280464',
  'Some-college',
  '10',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'Black',
  'Male',
  '0',
  '0',
  '80',
  'United-States',
  '>50K'],
 ['30',
  'State-gov',
  '141297',
  'Bachelors',
  '13',
  'Married-civ-spouse',
  'Prof-specialty',
  'Husband',
  'Asian-Pac-Islander',
  'Male',
  '0',
  '0',
  '40',
  'India',
  '>50K'],
 ['40',
  'Private',
  '121772',
  'Assoc-vo