---
---

<center> <h1> Transformations & Actions on RDDs </h1> </center>

In this notebook, we will apply several RDD transformations and actions using PySpark, Spark's Python API.



### `Transformations`


We will cover the following transformations in this notebook:

* 1. **Map**
* 2. **FlatMap**
* 3. **Filter**
* 4. **Distinct**
* 5. **Union**
* 6. **Intersection**


### `Actions`


We will cover the following actions in this notebook:

* 1. **Collect**
* 2. **Take**
* 3. **Count**



---


### `IMPORTING THE REQUIRED LIBRARIES`

---

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=c300891e79e70e95f2418a4a0d1bd2b9814d2eae196efccb3f1be8c523768b5a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.context import SparkContext

In [5]:
# Reuse the running SparkContext if there is one; calling SparkContext() twice raises
# "ValueError: Cannot run multiple SparkContexts at once"
sc = SparkContext.getOrCreate()

In [6]:
sc

---
---
#### `Problem Statement`

Suppose there is an organization named **`Analytics 20`**. It has two branches, one in **`India`** and one in **`Dubai`**. We have generated random data about the employees of this organization: the file **`analytics_20_india.txt`** contains the data of the employees of the India branch, and **`analytics_20_dubai.txt`** contains the data of the employees of the Dubai branch.

Each line of both files contains three space-separated columns: the `Name of the employee`, the `Department Name` in which he/she works, and the `Place Name` to which the employee belongs. The data looks as shown below:

<center><img src="images/rdd_op_dataset.png"></center>
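To make the record layout concrete, here is a minimal plain-Python sketch (not Spark code) that parses one line into its three columns. It assumes each line has exactly three fields separated by single spaces, with multi-word values joined by underscores as in `Data_Science`; the `Employee` type and the `parse_employee` helper are hypothetical names used only for illustration.

```python
from typing import NamedTuple


class Employee(NamedTuple):
    """One record: employee name, department, and place of origin (hypothetical type)."""
    name: str
    department: str
    place: str


def parse_employee(line: str) -> Employee:
    # Assumes exactly three single-space-separated fields; anything else raises ValueError
    name, department, place = line.split(' ')
    return Employee(name, department, place)


record = parse_employee('Keaton Data_Science India')
print(record)  # Employee(name='Keaton', department='Data_Science', place='India')
```

A malformed line (for example, one with a two-word name) would fail loudly at the tuple unpacking rather than silently shifting columns.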

---
---

#### `Reading the file - analytics_20_india.txt`

---

In [16]:
analytics_india=sc.textFile('/content/analytics_20_india.txt')

In [17]:
type(analytics_india)

---
##### Once we read the file into Spark, it is represented as an RDD, with one element per line of the file.

---
---

#### `Action - collect`

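The **collect** action returns all the elements of the RDD to the driver as a Python list. On large datasets this can exhaust the driver's memory, so prefer **take** when you only need to inspect a few records.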

---

In [9]:
# Collect all Records
analytics_india.collect()

['Keaton Data_Science India',
 'Idona Data_Science Australia',
 'Janna HR India',
 'Damon Data_Science India',
 'Rahim Marketing India',
 'Audrey Data_Science India',
 'Irma HR Dubai',
 'Tatum HR India',
 'Acton Data_Science India',
 'Ainsley Data_Science India',
 'Phillip Data_Science India',
 'Maite Marketing India',
 'Kevyn Marketing Australia',
 'Vielka HR India',
 'Risa Operations India',
 'Jael Accounts Dubai',
 'Erich Data_Science India',
 'Pearl Operations Australia',
 'Francesca Data_Science India',
 'Ross Sales India',
 'Tarik HR Dubai',
 'Lev HR India',
 'Nerea Accounts India',
 'Halla Sales India',
 'Daquan Legal India',
 'Ivan HR India',
 'Venus HR India',
 'Lareina Legal India',
 'Orlando Sales Australia',
 'Denise Accounts India',
 'Alvin Accounts Dubai',
 'Rafael Data_Science Australia',
 'Whoopi Data_Science Australia',
 'Norman Legal Dubai',
 'Forrest Sales Dubai',
 'Sigourney Legal India',
 'Stone Legal Scotland',
 'Todd Sales India',
 'Jerome Sales India',
 'Signe H

---
---
#### `Action - take`

The **take** action returns the first n elements of the RDD, where n is passed as an integer argument. It is similar to the `head` function of pandas.
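As a rough plain-Python analogy (this is not how Spark implements it), `take(n)` behaves like slicing the first n elements of a sequence: elements come back in their existing order, not sorted, and asking for more elements than exist simply returns all of them. The `take` helper below is hypothetical; the sample lines are the first five records shown in this notebook.

```python
from itertools import islice

# First five lines of analytics_20_india.txt, as returned by take(5) in this notebook
lines = [
    'Keaton Data_Science India',
    'Idona Data_Science Australia',
    'Janna HR India',
    'Damon Data_Science India',
    'Rahim Marketing India',
]


def take(elements, n):
    """Hypothetical plain-Python analogy of RDD.take(n): the first n elements, in order."""
    return list(islice(elements, n))


print(take(lines, 2))   # ['Keaton Data_Science India', 'Idona Data_Science Australia']
print(take(lines, 10))  # n larger than the data: all five lines are returned
```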


---

In [10]:
# Take 5 records
analytics_india.take(5)

['Keaton Data_Science India',
 'Idona Data_Science Australia',
 'Janna HR India',
 'Damon Data_Science India',
 'Rahim Marketing India']

---
---
#### `Transformation - map`

The **map** transformation applies the same function to every element of the RDD and returns a new RDD with exactly one output element per input element.

We will split each line into a list of words using **map**.
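The one-to-one behaviour of **map** can be mirrored on a local Python list. This is a plain-Python analogy of `analytics_india.map(lambda x: x.split(' '))`, not Spark code; the sample lines are taken from the notebook's own output.

```python
lines = [
    'Keaton Data_Science India',
    'Idona Data_Science Australia',
    'Janna HR India',
]

# Analogy of rdd.map(lambda x: x.split(' ')): each line becomes one list of tokens
tokenized = [line.split(' ') for line in lines]

print(tokenized)
# [['Keaton', 'Data_Science', 'India'], ['Idona', 'Data_Science', 'Australia'], ['Janna', 'HR', 'India']]
print(len(tokenized) == len(lines))  # True: map keeps the number of elements unchanged
```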

---

In [22]:
# Map transformation-Tokenize all records
analytics_india_map = analytics_india.map(lambda x: x.split(' '))

In [12]:
analytics_india_map

PythonRDD[3] at RDD at PythonRDD.scala:53

In [23]:
analytics_india_map.take(5)

[['Keaton', 'Data_Science', 'India'],
 ['Idona', 'Data_Science', 'Australia'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India']]

---
---
#### `Transformation - distinct`

**distinct** is used to find the unique elements in the RDD.

Find out the list of unique places of origin of the employees in the India branch.
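Conceptually, **distinct** keeps one copy of each value, much like a Python `set`. The sketch below is a plain-Python analogy (not Spark code) using the first six places returned by `take(6)` in this section. One caveat: Spark's `distinct` shuffles data between partitions, so the order of its output is not guaranteed, which is why the sketch sorts before printing.

```python
# First six places of the India-branch file, as returned by take(6) in this notebook
places = ['India', 'Australia', 'India', 'India', 'India', 'India']

# Analogy of rdd.distinct(): duplicates removed, no ordering guarantee
unique_places = set(places)

print(sorted(unique_places))  # ['Australia', 'India']
print(len(unique_places))     # 2, the analogue of calling count() after distinct()
```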

---

In [24]:
# Create an RDD containing only the place (third column) of each record
analytics_india_places= analytics_india_map.map(lambda x : x[2])

In [25]:
analytics_india_places

PythonRDD[12] at RDD at PythonRDD.scala:53

In [27]:
analytics_india_places.take(6)

['India', 'Australia', 'India', 'India', 'India', 'India']

In [28]:
# Apply the distinct transformation
analytics_india_distinct_places =analytics_india_places.distinct()

In [29]:
# Distinct places of origin of the people working for the India branch
analytics_india_distinct_places.collect()

['India', 'Australia', 'Dubai', 'Scotland']

In [30]:
# Use the count action to find the number of distinct places
analytics_india_distinct_places.count()

4

---
---

#### `Transformation - filter`

The **filter** transformation returns only the elements that satisfy the given condition.

Find out the data of the people who belong to **India**.
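In plain Python, **filter** corresponds to a list comprehension with an `if` clause; the lambda passed to `rdd.filter` plays the role of that condition and must return `True` for the elements to keep. This is an analogy rather than Spark code, using a few tokenized records from this notebook. Several conditions are combined with Python's `and`.

```python
records = [
    ['Keaton', 'Data_Science', 'India'],
    ['Idona', 'Data_Science', 'Australia'],
    ['Janna', 'HR', 'India'],
    ['Irma', 'HR', 'Dubai'],
]

# Analogy of rdd.filter(lambda x: x[2] == 'India')
india = [r for r in records if r[2] == 'India']

# Analogy of rdd.filter(lambda x: x[1] == 'HR' and x[2] == 'Dubai')
hr_dubai = [r for r in records if r[1] == 'HR' and r[2] == 'Dubai']

print(india)     # [['Keaton', 'Data_Science', 'India'], ['Janna', 'HR', 'India']]
print(hr_dubai)  # [['Irma', 'HR', 'Dubai']]
```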


---

In [31]:
analytics_india_map.take(5)

[['Keaton', 'Data_Science', 'India'],
 ['Idona', 'Data_Science', 'Australia'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India']]

In [32]:
# Filter the people who belong to India
analytics_india_employee_india = analytics_india_map.filter(lambda x :x[2]== 'India')

In [35]:
#analytics_india_employee_india.take(5)
analytics_india_employee_india.collect()

[['Keaton', 'Data_Science', 'India'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India'],
 ['Audrey', 'Data_Science', 'India'],
 ['Tatum', 'HR', 'India'],
 ['Acton', 'Data_Science', 'India'],
 ['Ainsley', 'Data_Science', 'India'],
 ['Phillip', 'Data_Science', 'India'],
 ['Maite', 'Marketing', 'India'],
 ['Vielka', 'HR', 'India'],
 ['Risa', 'Operations', 'India'],
 ['Erich', 'Data_Science', 'India'],
 ['Francesca', 'Data_Science', 'India'],
 ['Ross', 'Sales', 'India'],
 ['Lev', 'HR', 'India'],
 ['Nerea', 'Accounts', 'India'],
 ['Halla', 'Sales', 'India'],
 ['Daquan', 'Legal', 'India'],
 ['Ivan', 'HR', 'India'],
 ['Venus', 'HR', 'India'],
 ['Lareina', 'Legal', 'India'],
 ['Denise', 'Accounts', 'India'],
 ['Sigourney', 'Legal', 'India'],
 ['Todd', 'Sales', 'India'],
 ['Jerome', 'Sales', 'India'],
 ['Signe', 'HR', 'India'],
 ['Xavier', 'Legal', 'India'],
 ['Kevin', 'Customer_Support', 'India'],
 ['Michelle', 'Customer_Support', 'India'],
 ['Ign

---

Let's find out the data of the people who belong to **Dubai** and work in the **HR** department.

---

In [36]:
# Apply the filter transformation: HR department and place Dubai
analytics_india_filtered_data = analytics_india_map.filter(lambda x: x[1] == 'HR' and x[2] == 'Dubai')

In [37]:
analytics_india_filtered_data.collect()

[['Irma', 'HR', 'Dubai'], ['Tarik', 'HR', 'Dubai']]

---
---

#### `Transformation - flatmap`

* We saw that the **map** function performs a one-to-one transformation.
> * It transforms each element of a collection into exactly one element of the resulting collection.

<center><img src ="images/map_transformation.png"></center>

* The **flatMap** transformation flattens the results instead: rather than returning one list per line, it returns a single RDD containing all the individual words.
> * Spark's **flatMap** function expresses a one-to-many transformation: each input element can produce zero or more output elements.

Let's see the difference.
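The difference can be reproduced on a local list as a plain-Python analogy (not Spark code): `map` yields one list per line, while `flatMap` concatenates those lists into a single flat sequence, which `itertools.chain.from_iterable` does here.

```python
from itertools import chain

lines = ['Keaton Data_Science India', 'Idona Data_Science Australia']

# Like rdd.map(lambda x: x.split(' ')): 2 lines -> 2 lists
mapped = [line.split(' ') for line in lines]

# Like rdd.flatMap(lambda x: x.split(' ')): 2 lines -> 6 words
flat = list(chain.from_iterable(line.split(' ') for line in lines))

print(mapped)  # [['Keaton', 'Data_Science', 'India'], ['Idona', 'Data_Science', 'Australia']]
print(flat)    # ['Keaton', 'Data_Science', 'India', 'Idona', 'Data_Science', 'Australia']
```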

---

In [38]:
analytics_india.take(5)

['Keaton Data_Science India',
 'Idona Data_Science Australia',
 'Janna HR India',
 'Damon Data_Science India',
 'Rahim Marketing India']

In [39]:
# Flatmap transformation - tokenize
analytics_india_flatmap = analytics_india.flatMap(lambda x : x.split(' '))

In [40]:
analytics_india_flatmap.take(10)

['Keaton',
 'Data_Science',
 'India',
 'Idona',
 'Data_Science',
 'Australia',
 'Janna',
 'HR',
 'India',
 'Damon']

In [42]:
analytics_india_map.take(5)

[['Keaton', 'Data_Science', 'India'],
 ['Idona', 'Data_Science', 'Australia'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India']]

---
---

#### `Transformation - union`

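Use the **union** transformation to combine the places of origin from both branches, India and Dubai. Note that **union** does not remove duplicates; apply **distinct** to the result if each place should appear only once.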
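As a plain-Python analogy (not Spark code), **union** behaves like list concatenation: every element of both inputs is kept, including duplicates. The two lists below are the distinct places of each branch as printed by this notebook; wrapping the result in a `set` plays the role of calling `.distinct()` after the union.

```python
india_places = ['India', 'Australia', 'Dubai', 'Scotland']
dubai_places = ['Scotland', 'India', 'Australia', 'Dubai', 'South_Africa']

# Analogy of india_rdd.union(dubai_rdd): concatenation, duplicates kept
union_places = india_places + dubai_places

# Analogy of india_rdd.union(dubai_rdd).distinct()
all_places = set(union_places)

print(len(union_places))   # 9
print(sorted(all_places))  # ['Australia', 'Dubai', 'India', 'Scotland', 'South_Africa']
```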

---

In [43]:
analytics_dubai = sc.textFile('/content/analytics_20_dubai.txt')

In [44]:
analytics_dubai.take(5)

['Leo Customer_Support Scotland',
 'Cyrus Customer_Support India',
 'Jolie Sales India',
 'Susan HR Australia',
 'Azalia Customer_Support Dubai']

In [45]:
# Map Transformation - Tokenize
analytics_dubai_map = analytics_dubai.map(lambda x: x.split(' '))

In [46]:
analytics_dubai_map.take(5)

[['Leo', 'Customer_Support', 'Scotland'],
 ['Cyrus', 'Customer_Support', 'India'],
 ['Jolie', 'Sales', 'India'],
 ['Susan', 'HR', 'Australia'],
 ['Azalia', 'Customer_Support', 'Dubai']]

In [47]:
# Extract the place (third column) of each record of analytics_20_dubai
analytics_dubai_places = analytics_dubai_map.map(lambda x: x[2])

In [48]:
analytics_dubai_places.take(5)

['Scotland', 'India', 'India', 'Australia', 'Dubai']

In [49]:
#Get distinct places
analytics_dubai_distinct_places=analytics_dubai_places.distinct()

In [50]:
analytics_dubai_distinct_places.collect()

['Scotland', 'India', 'Australia', 'Dubai', 'South_Africa']

In [51]:
analytics_india_distinct_places.collect()

['India', 'Australia', 'Dubai', 'Scotland']

In [52]:
# Union places from two branches
union_places = analytics_india_distinct_places.union(analytics_dubai_distinct_places)

In [53]:
# Union keeps the duplicates from both RDDs
union_places.collect()

['India',
 'Australia',
 'Dubai',
 'Scotland',
 'Scotland',
 'India',
 'Australia',
 'Dubai',
 'South_Africa']

---
---

#### `Transformation - intersection`

Use the **intersection** transformation to find the places of origin common to the employees of both branches, India and Dubai. The result contains no duplicates, even if the input RDDs do.
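In plain Python, **intersection** corresponds to set intersection (`&`): only values present in both inputs survive, each exactly once. This analogy (not Spark code) reuses the distinct places printed for each branch in this notebook; as with `distinct`, Spark does not guarantee the output order, so the sketch sorts before printing.

```python
india_places = ['India', 'Australia', 'Dubai', 'Scotland']
dubai_places = ['Scotland', 'India', 'Australia', 'Dubai', 'South_Africa']

# Analogy of india_rdd.intersection(dubai_rdd): common values, no duplicates
common = set(india_places) & set(dubai_places)

print(sorted(common))  # ['Australia', 'Dubai', 'India', 'Scotland']
```

`South_Africa` is absent because it appears only among the Dubai branch's places.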

---

In [54]:
# Intersection of the two RDDs of unique places
common_places= analytics_india_distinct_places.intersection(analytics_dubai_distinct_places)

In [55]:
# Collect the results
common_places.collect()

['Dubai', 'India', 'Australia', 'Scotland']