# RDD Transformations and Actions

## Introduction
In the previous lab, we saw how to create an RDD from a Python list object, set the number of logical partitions, and change basic RDD properties. We also looked at how RDD operations can be split into actions and transformations. In this lesson, we will practice some basic actions and transformations on the RDD we created in the previous lab.

Note: You are advised to visit all the included external links to see more examples of the stated concepts.

## Objectives

* Use the `map(func)` transformation to apply a given function to all elements of an RDD across its partitions.
* Use the `collect()` action to trigger the processing stage of Spark's lazy evaluation.
* Use the `count()` action to calculate the number of elements in a parallelized RDD.
* Use `filter(func)` to filter unwanted data from RDDs.
* Develop an understanding of Python's lambda functions for RDD processing.

## Start with a parallelized RDD

Let's first import the `intRDD` we created earlier and transform its elements using `map()`.

- Import the code from the previous lab

In [2]:
# Include the code from previous lab to create an intRDD[1,2,..,1000]

# Code here 


At this point, we have created an RDD containing a sequence of numbers and split it into a number of partitions. We shall now execute basic data manipulation operations on this RDD and inspect the outcome. Data analysis usually requires the analyst to perform certain operations on every element of a dataset. In Spark, such analysis tasks run in parallel: each task processes one subset of the data while other tasks process the other subsets.

## The `map(func)` "Transformation" Operation 

`map(func)` is one of the most basic and commonly used transformations in Spark. It applies a function `func` to each data element of an RDD and outputs a resulting dataset. Running `map(func)` on a dataset launches a **single stage** of tasks. A stage is a group of tasks that all perform the same computation, but on different subsets of data. One task is launched for each partition, as shown below:

![](tasks.png)

A **task** is a unit of execution that runs on a single machine. When we execute `map(func)` on a partition, a new task applies `func` to all entries of data in that partition, outputting a new partition. In the example above, the dataset has been broken into four partitions, so four `map(func)` tasks are launched, one per partition.

The following figure shows this mechanism for a smaller example, similar to our `intRDD`.

![](map.png)
#### `map(func)` : Each task creates a new partition by calling `func(e)` on each element `e` from the original partition. 
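
The per-partition behaviour described above can be mimicked without Spark. What follows is a plain-Python sketch, not Spark code: the helper names `split_into_partitions` and `map_partition` are hypothetical, the "tasks" run one after another in a loop rather than in parallel, and the way Spark actually slices data into partitions may differ.

```python
# Plain-Python illustration only; no Spark is involved.
# split_into_partitions and map_partition are hypothetical helper names.

def split_into_partitions(data, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions

def map_partition(func, partition):
    """One 'task': apply func to every element of a single partition."""
    return [func(e) for e in partition]

data = list(range(1, 11))
partitions = split_into_partitions(data, 4)

# One task per partition; each task produces a new partition of the same size.
new_partitions = [map_partition(lambda e: e * 2, p) for p in partitions]
```

Every input partition yields an output partition with the same number of elements, which is the one-to-one property of `map(func)`.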



### Applying Map transformation

When applying a `map(func)` transformation, each item in the parent RDD maps to exactly one element in the resulting RDD. For example, if the parent RDD `intRDD` has 1000 elements, the new RDD produced by the map transformation will also have 1000 elements.

Let's try to subtract 1 from each value in the intRDD.

1. Create a function `subtract()` that subtracts 1 from an input integer.
2. Pass each element of `intRDD` to `map(func)`, where `func` refers to the `subtract()` function from step 1.
3. Use `toDebugString()` to see the transformation lineage.

In [3]:
# Create the subtract function to subtract 1

def subtract(value):
    
# Code here 

    pass

# Transform intRDD through the map transformation using the subtract function
# Because map is a transformation and Spark uses lazy evaluation, no jobs, stages,
# or tasks will be launched when we run this code.


# Code here 


# Let's see the RDD transformation hierarchy


# Code here 




# b'(5) PythonRDD[1] at RDD at PythonRDD.scala:49 []\n 
# |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []'

b'(4) PythonRDD[1] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []'


## The `collect()` "action" method

If we want to view the contents of the resulting RDD, i.e. `subtractRDD`, we need to create a new list on the driver from the data distributed across partitions. The `RDD.collect()` method is used for this purpose. Be careful when using `collect()`: the driver must have enough memory to hold the collected data, or it may crash.

- Consult the official documentation and use `RDD.collect()` to view the contents of `subtractRDD`.

In [3]:
# Let's collect the data using .collect() on subtractRDD


# Code here 


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221,

We can see that 1 has been subtracted from each element of our original list of 1 to 1000, so the result now contains the values 0 to 999.

>The `collect()` method is the first **action** operation that we have encountered. Action operations cause Spark to perform the (lazy) transformation operations that are required to compute the RDD returned by the action. In our example, this means that tasks will now be launched to perform the parallelize, map, and collect operations.
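
The idea that transformations are only recorded and that an action triggers the actual work can be sketched in plain Python. `LazyDataset` below is a hypothetical toy class written for this illustration; it is not a Spark API and leaves out partitions, stages, and tasks entirely.

```python
# Plain-Python illustration only; LazyDataset is a hypothetical class,
# not part of Spark.

class LazyDataset:
    def __init__(self, data, steps=None):
        self._data = data
        self._steps = steps or []   # recorded functions, not yet applied

    def map(self, func):
        # "Transformation": record the step and return a new dataset; do no work.
        return LazyDataset(self._data, self._steps + [func])

    def collect(self):
        # "Action": only now apply every recorded step to every element.
        result = []
        for element in self._data:
            for func in self._steps:
                element = func(element)
            result.append(element)
        return result

calls = []  # records each time the mapped function actually runs

def add_one(value):
    calls.append(value)
    return value + 1

lazy = LazyDataset([1, 2, 3]).map(add_one)
calls_before_collect = len(calls)   # nothing has run yet
collected = lazy.collect()          # the recorded step runs here
calls_after_collect = len(calls)    # add_one ran once per element
```

In this toy model, `add_one` is not called at all until `collect()` runs, which mirrors how a transformation alone launches no tasks.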

Have a look at the following figure to further strengthen the intuition around the `collect()` method.

![](collect.png)




## The `count()` "action" method

`count()` is another basic action, used to count the number of elements in an RDD. Since `map()` creates a new RDD with the same number of elements as the base RDD, applying `count()` to the base and resulting RDDs should return the same result. Remember that `count()` is an action: like `collect()`, it triggers evaluation of the pending transformations, and unless the RDD has been cached, Spark recomputes them for each action.

- Use `RDD.count()` to count elements in `intRDD` and `subtractRDD` to see if it meets our expectations.   

In [4]:
# Count the elements in base and resulting RDDs

# Code here 

# 1000 1000

1000 1000


Each task counts the entries in its partition and sends the result to the SparkContext, which adds up all of the counts. The figure below shows what would happen if we ran `count()` on a small example dataset with just four partitions.
![](count.png)
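
The "count per partition, then add up" pattern can be sketched in plain Python. This is an illustration under assumptions, not Spark code: the four partitions are made up, and the per-partition counts are computed in a simple loop rather than by parallel tasks.

```python
# Plain-Python illustration only; no Spark is involved.
partitions = [[1, 2, 3], [4, 5], [6, 7, 8], [9, 10]]  # four made-up partitions

# Each "task" counts the entries in its own partition.
partial_counts = [len(p) for p in partitions]

# The driver side adds the per-partition counts together.
total = sum(partial_counts)
```

Only one small number per partition has to travel back to the driver, which is why `count()` is far cheaper on driver memory than `collect()`.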


## The `filter(func)` Transformation

We shall now create another RDD that contains only values less than a threshold, say 10. A `filter(func)` operation is used for filtering out unwanted data elements. `filter(func)` is a transformation that creates a new RDD by applying `func` to each element of the parent RDD, keeping only those elements for which `func` returns `True` and dropping the rest.
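
To make the keep-or-drop behaviour concrete, here is a plain-Python sketch with a different predicate from the one used in the exercise. It is not Spark code: `is_multiple_of_three` and the partitions are made up for illustration.

```python
# Plain-Python illustration only; no Spark is involved.

def is_multiple_of_three(value):
    """Predicate: return True for elements that should be kept."""
    return value % 3 == 0

partitions = [[1, 2, 3], [4, 5, 6], [7, 8], [9, 10]]  # made-up partitions

# Each "task" keeps only the elements of its partition that pass the predicate.
filtered_partitions = [
    [e for e in p if is_multiple_of_three(e)] for p in partitions
]

# Unlike map, the output can hold fewer elements than the input,
# and some partitions may end up empty.
kept = [e for p in filtered_partitions for e in p]
```

Note that the predicate only returns `True` or `False`; the kept elements themselves are passed through unchanged.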

Let's try to apply the filter transformation using steps similar to `map(func)` seen earlier. 

1. Create a function `lessThanTen()` that takes an integer value and checks whether it is less than 10.
2. Pass the `subtractRDD` elements as input to `lessThanTen()` to check the condition from step 1.
3. Return a `True` or `False` value to `filter()`, which stores the elements that pass in `filteredRDD`.
4. Collect to trigger the transformation and view the results of `filteredRDD`.


In [4]:
# Define a function to filter a single value
def lessThanTen(value):

    pass

# Pass the lessThanTen function to the filter transformation
# Filter is a transformation so no tasks are run


# Code here 


# View the results using collect()
# Collect is an action and triggers the filter transformation to run

# Code here 




# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

The following image shows the distinction between the map and filter transformations described above.
![](mf.png)

## Lambda Functions

Python supports small, one-line anonymous functions that are not bound to a name at runtime. Borrowed from LISP, these lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression. Remember that lambda functions are a matter of style, and using them is never mandatory. You can always define a separate named function instead; a lambda function is simply an equivalent, more compact form.
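
The equivalence between a named function and a lambda can be seen with Python's built-in `map()` and `filter()`. These built-ins operate on ordinary Python iterables and are not the RDD methods of the same name; the function name `square` and the sample list are made up for illustration.

```python
# Plain-Python illustration using the built-in map() and filter(),
# not the RDD methods of the same name.

def square(value):
    """A normal named function."""
    return value * value

numbers = [1, 2, 3, 4, 5]

# Passing the named function and passing an equivalent lambda give the same result.
squares_named = list(map(square, numbers))
squares_lambda = list(map(lambda value: value * value, numbers))

# A lambda used as a predicate: keep only the odd numbers.
odds = list(filter(lambda value: value % 2 == 1, numbers))
```

The lambda form saves a separate `def` statement when the function is a single short expression and is used only once.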

- Let's implement the filter shown above in a single line, using a lambda function.

In [5]:
# Apply the above function with filter(func) transformation using Python's lambda functions


# Code here 


# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Similarly, we can apply other filters.

- Implement a filter that only outputs even values greater than 10 and less than 30.

In [7]:
# Let's collect the even values greater than 10 and less than 30

# Code here 


# [12, 14, 16, 18, 20, 22, 24, 26, 28]

[12, 14, 16, 18, 20, 22, 24, 26, 28]

## Further Reading 

- [Programming with RDDs](https://runawayhorse001.github.io/LearningApacheSpark/rdd.html)

## Summary

In this lab, we saw how to apply basic transformations and actions to the data stored within RDDs. We also looked at the lazy evaluation performed by Spark and learned to differentiate between transformations and actions. We saw how lambda functions can be used as a one-line approach to declaring functions in Spark. In upcoming lessons, we shall see more transformations and actions on RDDs.