# Resilient Distributed Datasets (RDD)

Think of an RDD as a distributed dataset. From a Python point of view, imagine a list of integers. This list is one dataset and sits on one computer.

```python
data = [18, 19, 21, 17]
```

Now, imagine we could somehow split this dataset into two parts and place them on different computers.

```python
data_part_1 = [18, 19] # goes to computer 1
data_part_2 = [21, 17] # goes to computer 2
```

At its most basic level, an RDD is conceptually a collection of elements that is spread around different computers.
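To make the idea of splitting concrete, here is a minimal pure-Python sketch. The helper `split_into_partitions` is a hypothetical name used only for illustration; Spark's own slicing logic is more involved, so treat this as a mental model rather than how Spark actually does it.

```python
def split_into_partitions(data, num_partitions):
    """Split a list into contiguous chunks of near-equal size (illustrative only)."""
    size, extra = divmod(len(data), num_partitions)
    partitions = []
    start = 0
    for i in range(num_partitions):
        # The first `extra` chunks receive one additional element.
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


partitions = split_into_partitions([18, 19, 21, 17], 2)
print(partitions)  # [[18, 19], [21, 17]]
```

Each inner list plays the role of the data held by one computer.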

## Acquiring an RDD

How do we create an RDD, or where does an RDD come from? RDDs may be created programmatically or by reading files.

### Creating an RDD

The easiest way to programmatically create an RDD is to use the `parallelize()` method from the **spark context** `sc`. Note that we pass in a list of numbers; the list of numbers is generated using a `list comprehension`.

In [1]:
num_rdd = sc.parallelize([i for i in range(10)])
type(num_rdd)

pyspark.rdd.RDD

### Create a pair RDD

Just think of a `pair RDD` as a distributed dataset whose records are key-value pairs. From a Pythonic point of view, think about a list of tuples. This list of tuples is one dataset and sits on one computer. For each tuple in this list, 
* the first element is a name and plays the role of the `key`, and 
* the second element is an age and plays the role of the `value`. 

```python
data = [('john', 18), ('jack', 19), ('jill', 21), ('jenn', 17)]
```

Now, imagine we could somehow split this dataset into two parts and place them on different computers.

```python
data_part_1 = [('john', 18), ('jack', 19)] # goes to computer 1
data_part_2 = [('jill', 21), ('jenn', 17)] # goes to computer 2
```

At its most basic level, a pair RDD is conceptually a collection of 2-tuples that is spread around different computers. Below, we create a pair RDD where the key is a number and the value is the key multiplied by itself.

In [2]:
pair_rdd = sc.parallelize([(i, i*i) for i in range(10)])
type(pair_rdd)

pyspark.rdd.RDD

### Read an RDD from HDFS

If we store a `CSV` file in `HDFS` (Hadoop Distributed File System), we can read the contents into an RDD via `sc.textFile()`. Each record of the resulting RDD is one line of the file, as a string.

In [3]:
data_rdd = sc.textFile('hdfs://localhost/data.csv')
type(data_rdd)

pyspark.rdd.RDD

## Transformations

After we acquire an RDD, we can perform two broad categories of operations.

* Transformation: an operation that derives a new RDD from an existing one
* Action: an operation that triggers computation and returns a result to the driver

Transformation operations are `lazily` evaluated. Applying a transformation to an RDD only records what should be done; nothing is computed yet. Only when you execute an action against the RDD does computation actually start. Let's look at some types of transformations that we may perform against RDDs.
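Python's built-in `map()` is lazy in a similar spirit, which makes it a handy way to get a feel for lazy evaluation without a cluster. The sketch below is only an analogy; the names `square`, `calls_before` and `calls_after` are made up for this illustration.

```python
calls = []


def square(x):
    calls.append(x)  # record that work was actually done
    return x * x


# Like a transformation: this only builds a recipe, nothing runs yet.
lazy = map(square, range(5))
calls_before = list(calls)

# Like an action: consuming the iterator forces the computation.
result = list(lazy)
calls_after = list(calls)

print(calls_before)  # []
print(result)        # [0, 1, 4, 9, 16]
print(calls_after)   # [0, 1, 2, 3, 4]
```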

### Map

The ``map()`` function transforms each element into something else. Below, we transform each original number into a new one by

* multiplying that number by itself,
* adding one to that number,
* subtracting one from that number, and
* dividing that number by ten.

In [4]:
num_rdd.map(lambda x: x * x).collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [5]:
num_rdd.map(lambda x: x + 1).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [6]:
num_rdd.map(lambda x: x - 1).collect()

[-1, 0, 1, 2, 3, 4, 5, 6, 7, 8]

In [7]:
num_rdd.map(lambda x: x / 10).collect()

[0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

### Filter

The ``filter()`` method removes elements from an RDD. We must supply it with a function that returns `True` (to keep) or `False` (to remove) for each element. Below, we first keep only the even elements and then only the odd elements.

In [8]:
num_rdd.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4, 6, 8]

In [9]:
num_rdd.filter(lambda x: x % 2 != 0).collect()

[1, 3, 5, 7, 9]

### Flat map

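The `flatMap()` function applies a function to each element, where the function returns a collection, and then flattens the results into a single list of elements. In the simplest case, it flattens a list of lists. Let's say we have the following list of lists.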

In [10]:
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

What we want to do is to flatten this list `data` so that the resulting list is as follows.

```python
[1, 2, 3, 4, 5, 6, 7, 8, 9]
```

How do we flatten a list of lists in Python? We can use the `chain()` function from the `itertools` module.

In [11]:
from itertools import chain

list(chain(*data))

[1, 2, 3, 4, 5, 6, 7, 8, 9]

When using PySpark, the `flatMap()` function does the flattening for us.

In [12]:
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
sc.parallelize(data).flatMap(lambda x: x).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]
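The identity function `lambda x: x` is the simplest case. More generally, the function passed to `flatMap()` may produce several outputs per input. Here is a pure-Python sketch of that behavior; `flat_map` is a hypothetical helper, not part of PySpark.

```python
from itertools import chain


def flat_map(func, items):
    """Apply func to every item, then flatten the results by one level."""
    return list(chain.from_iterable(func(item) for item in items))


lines = ['hello world', 'resilient distributed datasets']
words = flat_map(lambda line: line.split(' '), lines)
print(words)  # ['hello', 'world', 'resilient', 'distributed', 'datasets']
```

Splitting lines into words like this is a typical use of `flatMap()`, since each line yields a different number of words.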

### Sample

If we want to take samples from an RDD, we can use the `sample()` method. The arguments to `sample()` are as follows.

* `withReplacement` indicates whether we want to sample with replacement (records can be selected multiple times)
* `fraction` specifies the expected fraction of the data we want to bring back; the actual number of records returned may vary (below, a fraction of 0.2 on 10 records returns 3)
* `seed` is the seed used to make the random sampling reproducible

In [13]:
num_rdd.sample(withReplacement=False, fraction=0.2, seed=37).collect()

[0, 3, 7]
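One rough way to picture why the sample size varies is to imagine each record being kept independently with probability `fraction`. The sketch below is a pure-Python illustration under that assumption; `bernoulli_sample` is a hypothetical helper, and Spark's actual sampler and random number generator differ, so the selected records will not match the output above.

```python
import random


def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]


# The same seed always gives the same sample.
first = bernoulli_sample(range(10), 0.2, seed=37)
second = bernoulli_sample(range(10), 0.2, seed=37)

# Across many seeds the sample size varies, but averages about 10 * 0.2 = 2.
sizes = [len(bernoulli_sample(range(10), 0.2, seed)) for seed in range(1000)]
average_size = sum(sizes) / len(sizes)
print(first == second, round(average_size))
```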

### Union

If we have two RDDs, we can bring them together through `union()`.

In [14]:
num_rdd.union(num_rdd).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

### Intersection

Between two RDDs, if we want only the elements they share in common, we can apply the `intersection()` function.

In [15]:
a = sc.parallelize([1, 2, 3])
b = sc.parallelize([3, 4, 5])

a.intersection(b).collect()

[3]

### Distinct

The `distinct()` function will bring back only unique elements.

In [16]:
a = sc.parallelize([1, 2, 2, 3, 4])
a.distinct().collect()

[1, 2, 3, 4]

### Group by key

If we have a pair RDD, we can group data by the key using `groupByKey()`. After we apply `groupByKey()` a new pair RDD is created where 
* the key is the key as before and 
* the value is an `iterable`.

Below, we convert the `iterable` to a list using the `list()` function. `groupByKey()` is an expensive operation because it shuffles data: every value for a key must be moved across the network to the same partition. In the Spark framework, we work hard to keep data from moving, since there is a lot of it and we do not want to congest the network; ideally, the only thing that moves is the compute code. Try to avoid `groupByKey()` when you can.

In [17]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])

for key, it in a.groupByKey().collect():
    print(key, list(it))

1 [1, 2]
2 [4, 3]


### Reduce by key

If we want to collapse all the values associated with a key in a pair RDD into a single value, we use the `reduceByKey()` function. `reduceByKey()` is usually much more efficient than `groupByKey()` because it combines values within each partition before shuffling, so less data moves across the network. It is worth the effort to rewrite logic built on `groupByKey()` so that it uses `reduceByKey()` instead.

Below, we simply sum over all the values associated with a key.

In [18]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.reduceByKey(lambda a, b: a + b).collect()

[(1, 3), (2, 7)]

Here is an `anti-pattern` that uses `groupByKey()` to add the elements associated with each key. We get the same result as with `reduceByKey()`, but every record is shuffled before any addition happens.

In [19]:
add_elements = lambda tup: (tup[0], sum(list(tup[1])))

sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])\
    .groupByKey()\
    .map(add_elements)\
    .collect()

[(1, 3), (2, 7)]
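To see where the extra overhead comes from, we can count how many records would have to cross the network in each approach. This is a pure-Python sketch with made-up partition contents; `combine_locally` is a hypothetical helper standing in for the per-partition combining that `reduceByKey()` performs before the shuffle.

```python
partitions = [
    [(1, 1), (1, 2), (2, 4)],  # records held by worker 1
    [(2, 3), (1, 5), (2, 6)],  # records held by worker 2
]


def add(a, b):
    return a + b


def combine_locally(records, func):
    """Merge values that share a key, leaving one record per key."""
    combined = {}
    for key, value in records:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())


# groupByKey-style: every record is sent across the network as-is.
shuffled_group = [rec for part in partitions for rec in part]

# reduceByKey-style: each partition is combined first, then sent.
shuffled_reduce = [rec for part in partitions for rec in combine_locally(part, add)]

# Either way, merging the shuffled records gives the same totals.
totals_group = sorted(combine_locally(shuffled_group, add))
totals_reduce = sorted(combine_locally(shuffled_reduce, add))

print(len(shuffled_group), len(shuffled_reduce))  # 6 4
print(totals_group, totals_reduce)  # [(1, 8), (2, 13)] [(1, 8), (2, 13)]
```

With only two keys the saving is small, but when each partition holds many records per key the difference in shuffled data can be large.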

Here's an interesting dataset. It's a list of 2-tuples, where the first element is the key or unique identifier of a person, and the second element is a piece of information (stored in a dictionary) about the person. How do we use `reduceByKey()` to merge all the information according to the unique identifier? If you look below, you will notice that we merge the dictionaries using the **dictionary unpacking operator** `**`.

In [20]:
data = [
    (1, {'name': 'john'}),
    (2, {'name': 'jack'}),
    (1, {'age': 23}),
    (2, {'age': 24}),
]

sc.parallelize(data).reduceByKey(lambda a, b: {**a, **b}).collect()

[(1, {'name': 'john', 'age': 23}), (2, {'name': 'jack', 'age': 24})]

### Aggregate by key

In a pair RDD, we can specify how to aggregate values by keys **within** and **between** partitions. There are three arguments required.

* an initial value
* a combining function to aggregate within a partition
* a merging function to aggregate between partitions

Below are some examples of how to aggregate by key.

In [21]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.aggregateByKey('value', lambda s, d: f'{s} {d}', lambda s1, s2: f'{s1}, {s2}').collect()

[(1, 'value 1, value 2'), (2, 'value 4, value 3')]

In [22]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.aggregateByKey(0, lambda s, d: s + d, lambda s1, s2: s1 + s2).collect()

[(1, 3), (2, 7)]
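The interplay of the three arguments is easier to follow in a pure-Python sketch. The function `aggregate_by_key` below is a hypothetical stand-in that mimics the within-partition and between-partition steps; the partition layout is an assumption chosen so that each key appears once in each partition.

```python
def aggregate_by_key(partitions, zero, seq_func, comb_func):
    """Mimic aggregateByKey: combine within partitions, then merge between them."""
    # Step 1: within each partition, fold values into the initial value.
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_func(acc.get(key, zero), value)
        per_partition.append(acc)

    # Step 2: between partitions, merge the partial results per key.
    merged = {}
    for acc in per_partition:
        for key, partial in acc.items():
            merged[key] = comb_func(merged[key], partial) if key in merged else partial
    return sorted(merged.items())


partitions = [[(1, 1), (2, 4)], [(1, 2), (2, 3)]]

strings = aggregate_by_key(
    partitions, 'value',
    lambda s, d: f'{s} {d}',
    lambda s1, s2: f'{s1}, {s2}')
print(strings)  # [(1, 'value 1, value 2'), (2, 'value 4, value 3')]

# A (sum, count) pair per key, which is enough to compute an average.
sums_counts = aggregate_by_key(
    partitions, (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(sums_counts)  # [(1, (3, 2)), (2, (7, 2))]
```

Note that the initial value is used once per key in every partition where that key appears, which is why the word `value` shows up twice for each key in the string example.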

### Sort by key

We can also sort records by key in a pair RDD using `sortByKey()`.

In [23]:
a = sc.parallelize([(1, 1), (3, 2), (5, 4), (4, 3)])
a.sortByKey().collect()

[(1, 1), (3, 2), (4, 3), (5, 4)]

### Join

If we have two pair RDDs, we can perform a join based on the keys using `join()`.

In [24]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (3, 4)])
a.join(b).collect()

[(1, (1, 2)), (2, (2, 3)), (3, (3, 4))]

Note that `join()` is like a SQL inner join; only records with keys in both RDDs will be returned.

In [25]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3), (5, 5)])
b = sc.parallelize([(1, 2), (2, 3), (3, 4), (6, 6)])
a.join(b).collect()

[(1, (1, 2)), (2, (2, 3)), (3, (3, 4))]

### Left outer join

The `leftOuterJoin()` will join two pair RDDs like a SQL left-outer join. All records on the left will be returned even if there is not a corresponding matching record on the right.

In [26]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.leftOuterJoin(b).collect()

[(1, (1, 2)), (2, (2, 3)), (3, (3, None))]

### Right outer join

The `rightOuterJoin()` will join two pair RDDs like a SQL right-outer join. All records on the right will be returned even if there is not a corresponding matching record on the left.

In [27]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.rightOuterJoin(b).collect()

[(1, (1, 2)), (2, (2, 3)), (4, (None, 5))]

### Full outer join

The `fullOuterJoin()` will join two pair RDDs like a SQL full-outer join. All records on the left and right will be returned.

In [28]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.fullOuterJoin(b).collect()

[(1, (1, 2)), (2, (2, 3)), (3, (3, None)), (4, (None, 5))]
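The four joins differ only in which keys they keep. A pure-Python sketch makes that explicit; `join_pairs` and its `how` argument are hypothetical names for this illustration and are not part of the PySpark API.

```python
from collections import defaultdict


def join_pairs(left, right, how='inner'):
    """Join two lists of (key, value) pairs; missing sides become None."""
    left_map, right_map = defaultdict(list), defaultdict(list)
    for key, value in left:
        left_map[key].append(value)
    for key, value in right:
        right_map[key].append(value)

    # The join type only decides which keys survive.
    if how == 'inner':
        keys = left_map.keys() & right_map.keys()
    elif how == 'left':
        keys = left_map.keys()
    elif how == 'right':
        keys = right_map.keys()
    else:  # 'full'
        keys = left_map.keys() | right_map.keys()

    result = []
    for key in sorted(keys):
        for lv in left_map.get(key, [None]):
            for rv in right_map.get(key, [None]):
                result.append((key, (lv, rv)))
    return result


a = [(1, 1), (2, 2), (3, 3)]
b = [(1, 2), (2, 3), (4, 5)]

print(join_pairs(a, b, 'inner'))  # [(1, (1, 2)), (2, (2, 3))]
print(join_pairs(a, b, 'left'))   # [(1, (1, 2)), (2, (2, 3)), (3, (3, None))]
print(join_pairs(a, b, 'right'))  # [(1, (1, 2)), (2, (2, 3)), (4, (None, 5))]
print(join_pairs(a, b, 'full'))   # [(1, (1, 2)), (2, (2, 3)), (3, (3, None)), (4, (None, 5))]
```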

### Cogroup

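The `cogroup()` function groups the values of two pair RDDs by key. For each key, the result holds one iterable with the values from the first RDD and one iterable with the values from the second.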

In [29]:
a = sc.parallelize([(1, 1), (1, 2), (2, 3), (2, 4), (3, 5), (3, 6)])
b = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (3, 'e'), (3, 'f')])

for key, (it1, it2) in a.cogroup(b).collect():
    print(key, list(it1), list(it2))

1 [1, 2] ['a', 'b']
2 [3, 4] ['c', 'd']
3 [5, 6] ['e', 'f']


### Cartesian

In Python, if we had two lists as follows and wanted their Cartesian product, we could use `product` from the `itertools` module.

In [30]:
from itertools import product

a = [1, 2, 3, 4]
b = ['a', 'b', 'c', 'd']

list(product(a, b))

[(1, 'a'),
 (1, 'b'),
 (1, 'c'),
 (1, 'd'),
 (2, 'a'),
 (2, 'b'),
 (2, 'c'),
 (2, 'd'),
 (3, 'a'),
 (3, 'b'),
 (3, 'c'),
 (3, 'd'),
 (4, 'a'),
 (4, 'b'),
 (4, 'c'),
 (4, 'd')]

We can achieve the same using the `cartesian()` function on two RDDs. 

In [31]:
a = sc.parallelize([1, 2, 3, 4])
b = sc.parallelize(['a', 'b', 'c', 'd'])
a.cartesian(b).collect()

[(1, 'a'),
 (1, 'b'),
 (1, 'c'),
 (1, 'd'),
 (2, 'a'),
 (2, 'b'),
 (2, 'c'),
 (2, 'd'),
 (3, 'a'),
 (3, 'b'),
 (3, 'c'),
 (3, 'd'),
 (4, 'a'),
 (4, 'b'),
 (4, 'c'),
 (4, 'd')]

### Repartition

We can force our distributed dataset (the RDD) into a specified number of partitions using `repartition()`.

In [32]:
a = sc.parallelize(['hello', 'world'])
print(a.getNumPartitions())

a = a.repartition(2)
print(a.getNumPartitions())

8
2


### Coalesce

We can also force our RDD into a specified number of partitions using `coalesce()`.

In [33]:
a = sc.parallelize(['hello', 'world'])
print(a.getNumPartitions())

a = a.coalesce(2)
print(a.getNumPartitions())

8
2


So, `repartition()` and `coalesce()` seem to do the same thing: forcing the data into a specified number of partitions. What's the difference? 

* `repartition()` always performs a full shuffle and creates new partitions; in return, the resulting partitions are roughly equal in size.
* `coalesce()` avoids a shuffle by default and merges existing partitions, so it can only reduce the number of partitions, and the resulting partitions will most likely not be equal in size.

Which should you use? It depends on your goals. If you want computation to be evenly distributed, go for `repartition()`; otherwise, save time by **not** shuffling data and use `coalesce()`.
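The trade-off can be pictured with a small pure-Python sketch. Both functions below are hypothetical simplifications: Spark's real `coalesce()` also takes data locality into account, and `repartition()` distributes records differently, so only the overall shape of the result is meant to carry over.

```python
def coalesce_sketch(partitions, n):
    """Merge whole neighboring partitions into n groups; no record is redistributed."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged


def repartition_sketch(partitions, n):
    """Deal every record out round-robin, as a full shuffle would allow."""
    out = [[] for _ in range(n)]
    records = [rec for part in partitions for rec in part]
    for i, rec in enumerate(records):
        out[i % n].append(rec)
    return out


# A skewed starting point: one large partition and three tiny ones.
skewed = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]

coalesced = coalesce_sketch(skewed, 2)
repartitioned = repartition_sketch(skewed, 2)

print([len(p) for p in coalesced])      # [7, 2]
print([len(p) for p in repartitioned])  # [5, 4]
```

The coalesced result inherits the skew of its inputs, while the repartitioned result is balanced at the price of moving every record.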

### Pipe

The `pipe()` function enables you to specify an external script or program to transform the data. The script or program must be able to receive the data as input and should return an output. The script must be accessible on all the compute nodes; a common mistake is that the script only exists on the driver node and your piping fails. 

In the code below, the RDD has 8 partitions by default, which is why we see 8 outputs of `One-Off Coder`: the script runs once per partition. Only 2 of the partitions hold data (one for **hello** and one for **world**); the other 6 are empty. The script is just a simple echo and looks like the following.

```bash
#!/bin/sh
echo 'One-Off Coder'
while read -r LINE; do
    echo "${LINE}"
done
```

In [34]:
a = sc.parallelize(['hello', 'world'])
a.pipe('/root/ipynb/echo.sh').collect()

['One-Off Coder',
 'One-Off Coder',
 'One-Off Coder',
 'One-Off Coder',
 'hello',
 'One-Off Coder',
 'One-Off Coder',
 'One-Off Coder',
 'One-Off Coder',
 'world']

If we force the number of partitions to 2, then we get more sensible output.

In [35]:
a = sc.parallelize(['hello', 'world']).repartition(2)
a.pipe('/root/ipynb/echo.sh').collect()

['One-Off Coder', 'hello', 'One-Off Coder', 'world']

If we force the number of partitions to 1, then all the data will be fed to one instance of the script (that's why we see `One-Off Coder` once only).

In [36]:
a = sc.parallelize(['hello', 'world']).repartition(1)
a.pipe('/root/ipynb/echo.sh').collect()

['One-Off Coder', 'hello', 'world']

### Repartition and sort within partitions

For a pair RDD, we can control how many partitions we want and which records go into which partitions with `repartitionAndSortWithinPartitions()`. What we get for free is sorting within each partition. The arguments for `repartitionAndSortWithinPartitions()` are as follows.

* `numPartitions` specifies the number of desired partitions
* `partitionFunc` is a function of the key that decides which partition a record is assigned to
* `ascending` specifies whether to sort in ascending order
* `keyfunc` is a function applied to each key to produce the value used for sorting

In [37]:
sc.parallelize([(1, 5), (2, 15), (1, 4), (2, 14), (1, 3), (2, 13)])\
    .map(lambda tup: (tup, tup[1]))\
    .repartitionAndSortWithinPartitions(
        numPartitions=2, 
        partitionFunc=lambda tup: tup[0] % 2)\
    .map(lambda tup: tup[0])\
    .collect()

[(2, 13), (2, 14), (2, 15), (1, 3), (1, 4), (1, 5)]
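The order of the output is easier to understand once the two steps are separated: first assign each record to a partition by its key, then sort each partition by key. Here is a pure-Python sketch of that idea; `repartition_and_sort` is a hypothetical helper, not the Spark implementation.

```python
def repartition_and_sort(records, num_partitions, partition_func):
    """Assign (key, value) records to partitions by key, then sort each partition by key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return [sorted(part, key=lambda kv: kv[0]) for part in partitions]


data = [(1, 5), (2, 15), (1, 4), (2, 14), (1, 3), (2, 13)]

# As in the cell above, the whole tuple becomes the key.
keyed = [(tup, tup[1]) for tup in data]
parts = repartition_and_sort(keyed, 2, lambda tup: tup[0] % 2)

# Read the partitions back in order, keeping only the original tuples.
flat = [key for part in parts for key, _ in part]
print(flat)  # [(2, 13), (2, 14), (2, 15), (1, 3), (1, 4), (1, 5)]
```

Records whose first element is even land in partition 0 and come out first, and within each partition the tuples are sorted, which gives a secondary sort on the second element.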

## Actions

Remember, **transformations** on RDDs create other RDDs and are lazily evaluated: no computation happens until an action is called. When an **action** is applied to an RDD, the computation runs on the worker nodes and the result is not an RDD; it is typically a plain Python value returned to the driver node.

### Reduce

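The `reduce()` function collapses all the elements into one by repeatedly applying a two-argument function. Because partial results are computed per partition and then merged, the function should be commutative and associative.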

In [38]:
a = sc.parallelize([1, 2, 3])
a.reduce(lambda a, b: a + b)

6

Reducing data does not have to be math operations like adding. Below, we merge the dictionaries into one.

In [39]:
a = sc.parallelize([{'fname': 'john'}, {'lname': 'doe'}, {'age': 32}])
a.reduce(lambda a, b: {**a, **b})

{'fname': 'john', 'lname': 'doe', 'age': 32}

We can also reduce data by selecting the smallest value.

In [40]:
from random import randint

a = sc.parallelize([randint(10, 1000) for _ in range(100)])
a.reduce(lambda a, b: min(a, b))

24

### Collect

The `collect()` function is an action that we have been using all along. This function simply brings back the distributed data into one list on the driver. Be careful: if the data does not fit in the driver's memory, this operation may fail.

In [41]:
a = sc.parallelize([1, 2, 3])
a.collect()

[1, 2, 3]

### Count

The `count()` function counts the number of elements in an RDD.

In [42]:
a = sc.parallelize([1, 2, 3])
a.count()

3

Below, we generate 1,000 random numbers in the range $[1, 10]$. We then perform a `map()` operation that turns each number $x$ into a list of length $x$, flatten those lists with `flatMap()`, and `count()` the resulting elements.

In [43]:
from random import randint

a = sc.parallelize([randint(1, 10) for _ in range(1000)])
a.map(lambda x: [x for _ in range(x)]).flatMap(lambda x: x).count()

5639

### First

The function `first()` returns the first record of an RDD.

In [44]:
a = sc.parallelize([1, 2, 3])
a.first()

1

### Take

We can bring back the first $n$ records using `take()`.

In [45]:
a = sc.parallelize([1, 2, 3])
a.take(2)

[1, 2]

### Take sample

We can bring back a fixed number of random records using `takeSample()`.

In [46]:
a = sc.parallelize([i for i in range(100)])
a.takeSample(withReplacement=False, num=10, seed=37)

[21, 86, 84, 83, 26, 59, 92, 0, 48, 44]

### Take ordered

We can bring back the $n$ smallest records, in ascending order, using `takeOrdered()`.

In [47]:
from random import randint

a = sc.parallelize([randint(1, 10000) for _ in range(1000)])
a.takeOrdered(10)

[8, 40, 40, 55, 63, 68, 69, 81, 83, 106]
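Finding the smallest records does not require sorting or collecting the whole dataset. One plausible strategy, sketched below in pure Python, is to pick the $n$ smallest candidates in each partition and then pick the $n$ smallest among those candidates. The partition contents are made up, and this is a sketch of the idea rather than a statement of how Spark implements it.

```python
import heapq

# Made-up partitions standing in for data spread over three workers.
partitions = [[40, 8, 106, 63], [55, 40, 81], [69, 83, 68, 500]]
n = 3

# Step 1: each partition independently keeps only its n smallest values.
candidates = [heapq.nsmallest(n, part) for part in partitions]

# Step 2: the driver picks the n smallest among the few candidates it received.
top = heapq.nsmallest(n, (x for cand in candidates for x in cand))

print(candidates)  # [[8, 40, 63], [40, 55, 81], [68, 69, 83]]
print(top)         # [8, 40, 40]
```

At most $n$ values per partition reach the driver, no matter how large the partitions are.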

### Count by key

Counting the number of records associated with a key is accomplished through `countByKey()`.

In [48]:
a = sc.parallelize([(randint(1, 10), 1) for _ in range(10000)])
a.countByKey()

defaultdict(int,
            {2: 1045,
             1: 972,
             7: 1005,
             8: 975,
             10: 1044,
             5: 1017,
             4: 1008,
             6: 904,
             9: 999,
             3: 1031})

## Chaining transformations and actions

The power of transformations and actions emerges from chaining them together. 

### Map, filter, reduce

The three basic functions introduced when we start to adopt `functional programming` are `map()`, `filter()` and `reduce()`. Below, we map each number $x$ to $x \times x$, filter for only even numbers, and then add the results.

In [49]:
num_rdd = sc.parallelize([i for i in range(10)])

num_rdd\
    .map(lambda x: x * x)\
    .filter(lambda x: x % 2 == 0)\
    .reduce(lambda a, b: a + b)

120
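For comparison, the same pipeline can be written on a single machine with Python's built-in `map()` and `filter()` plus `functools.reduce()`. This is only a local analogy to show that the chain mirrors ordinary functional Python; nothing here is distributed.

```python
from functools import reduce

numbers = range(10)

squares = map(lambda x: x * x, numbers)         # 0, 1, 4, 9, ..., 81
evens = filter(lambda x: x % 2 == 0, squares)   # 0, 4, 16, 36, 64
total = reduce(lambda a, b: a + b, evens)

print(total)  # 120
```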

### Filter, map, take

Here's an example of parsing out a `CSV` file. Note that we have to filter out the row starting with `x` since that indicates the header (for this CSV file). We then split (or tokenize) the line specifying the delimiter as a comma `,`. We finally convert all the tokens, which are strings, to integers. We take the first 10 records (rows) to see if we parsed the CSV file correctly.

In [50]:
data_rdd = sc.textFile('hdfs://localhost/data.csv')

data_rdd\
    .filter(lambda s: not s.startswith('x'))\
    .map(lambda s: s.split(','))\
    .map(lambda arr: [int(s) for s in arr])\
    .take(10)

[[14, 22, 25, 63, 47, 52, 13, 14, 23, 27],
 [35, 80, 38, 28, 73, 69, 21, 16, 76, 53],
 [46, 37, 46, 55, 78, 68, 61, 62, 81, 82],
 [19, 12, 45, 50, 71, 63, 94, 7, 10, 77],
 [50, 94, 94, 87, 67, 89, 73, 17, 39, 7],
 [47, 97, 64, 7, 47, 40, 77, 63, 50, 21],
 [33, 0, 99, 46, 43, 32, 47, 20, 4, 67],
 [46, 100, 28, 8, 34, 49, 62, 77, 4, 51],
 [14, 12, 50, 96, 57, 59, 40, 87, 44, 48],
 [13, 48, 30, 62, 88, 99, 65, 94, 13, 34]]

### Merging dictionaries

We already saw some examples of merging dictionaries. Here's another example.

In [51]:
sc.parallelize([(randint(1, 10), 1) for _ in range(10000)])\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda tup: {tup[0]: tup[1]})\
    .reduce(lambda a, b: {**a, **b})

{8: 1001,
 1: 985,
 9: 1006,
 10: 1002,
 2: 969,
 3: 980,
 4: 984,
 5: 1009,
 6: 1010,
 7: 1054}

## Broadcasting variables

If we have read-only data that needs to be shared across the worker nodes, we can `broadcast` that data. Below, we have a dictionary `m` that is local to the driver and we want to make it available to all the worker nodes. We broadcast `m` with `sc.broadcast()` and assign the reference to `b`; note that `b` wraps the data `m` and we can access the dictionary through the `value` property of `b` (e.g. `b.value`). Now our parallel operations can access the dictionary.

In [52]:
from random import randint

m = {i: randint(1, 10) for i in range(101)}
b = sc.broadcast(m)

sc.parallelize([randint(1, 100) for _ in range(20000)])\
    .map(lambda num: (b.value[num], 1))\
    .reduceByKey(lambda x, y: x + y)\
    .collect()

[(8, 2197),
 (1, 620),
 (9, 1600),
 (2, 1734),
 (10, 1630),
 (3, 2427),
 (4, 1920),
 (5, 1996),
 (6, 2664),
 (7, 3212)]

## Accumulator

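If we want to keep count of things or collect metrics on our operations, we can use an `accumulator`. The `accumulator` is created on the driver; tasks on the worker nodes can only add to it, and only the driver can read its `value`. Below, we use an accumulator to keep track of the number of map operations. Note that updates made inside a transformation such as `map()` may be applied more than once if a task is re-executed, so treat such counts as approximate unless the update happens inside an action.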

In [53]:
accum = sc.accumulator(0)

sc.parallelize([i for i in range(10000)])\
    .map(lambda num: accum.add(1))\
    .count()

accum.value

10000