<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


## ETL using Spark


Estimated time needed: **30** minutes


<p style='color: red'>The purpose of this lab is to show you how to use Spark for ETL jobs.</p>


## __Table of Contents__

<ol>
  <li>
    <a href="#Objectives">Objectives
    </a>
  </li>
  <li>
    <a href="#Datasets">Datasets
    </a>
  </li>
  <li>
    <a href="#Setup">Setup
    </a>
    <ol>
      <li>
        <a href="#Installing-Required-Libraries">Installing Required Libraries
        </a>
      </li>
      <li>
        <a href="#Importing-Required-Libraries">Importing Required Libraries
        </a>
      </li>
    </ol>
  </li>
  <li> 
    <a href="#Examples">Examples
    </a>
    <ol>
    <li>
      <a href="#Task-1---Create-a-Dataframe-from-the-raw-data-and-write-to-CSV-file.">Task 1 - Create a Dataframe from the raw data and write to CSV file.
      </a>
    </li>
    <li>
      <a href="#Task-2---Read-from-a-csv-file-and-write-to-parquet-file">Task 2 - Read from a csv file and write to parquet file
      </a>
    </li>
    <li>
      <a href="#Task-3---Condense-PARQUET-to-a-single-file.">Task 3 - Condense PARQUET to a single file.
      </a>
    </li>
    <li>
      <a href="#Task-4---Read-from-a-parquet-file-and-write-to-csv-file">Task 4 - Read from a parquet file and write to csv file
      </a>
    </li>
    </ol>
  </li>
  <li>
    <a href="#Exercises">Exercises
    </a>
    <ol>
      <li>
        <a href="#Exercise-1---Extract">Exercise 1 - Extract
        </a>
      </li>
      <li>
        <a href="#Exercise-2---Transform">Exercise 2 - Transform
        </a>
      </li>
      <li>
        <a href="#Exercise-3---Load">Exercise 3 - Load
        </a>
      </li>
    </ol>
  </li>
</ol>


## Objectives

After completing this lab you will be able to:
 
 - Create a Spark DataFrame from raw data and write it to a CSV file.
 - Read from a CSV file and write to a Parquet file.
 - Condense Parquet output to a single file.
 - Read from a Parquet file and write to a CSV file.


----


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

Spark Cluster is pre-installed in the Skills Network Labs environment. However, you need libraries like pyspark and findspark to connect to this cluster.

If you wish to download this Jupyter notebook and run it on your local computer, follow the instructions <a href="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/labs/Connecting_to_spark_cluster_using_Skills_Network_labs.ipynb">here</a>.



The following required libraries are __not__ pre-installed in the Skills Network Labs environment. __You will need to run the following cell__ to install them:


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [3]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

from pyspark.sql import SparkSession

In [4]:
#Create SparkSession

spark = SparkSession \
    .builder \
    .appName("ETL using Spark") \
    .getOrCreate()

24/01/28 21:38:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Task 1 - Create a Dataframe from the raw data and write to CSV file.


In [5]:
#create a list of tuples
#each tuple contains the student id, height and weight
data = [("student1",64,90),
        ("student2",59,100),
        ("student3",69,95),
        ("",70,110),
        ("student5",60,80),
        ("student3",69,95),
        ("student6",62,85),
        ("student7",65,80),
        ("student7",65,80)]

# some rows are intentionally duplicated

In [6]:
#create a dataframe using createDataFrame and pass the data and the column names.
df = spark.createDataFrame(data, ["student", "height_inches", "weight_pounds"])

In [7]:
# show the data frame
df.show(5)

                                                                                

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1|           64|           90|
|student2|           59|          100|
|student3|           69|           95|
|        |           70|          110|
|student5|           60|           80|
+--------+-------------+-------------+
only showing top 5 rows



Write to csv file


In [8]:
df.write.mode("overwrite").csv("student-hw.csv", header=True)

                                                                                

In [None]:
# If you do not wish to overwrite, use df.write.csv("student-hw.csv", header=True) instead.
# Without mode("overwrite"), the write raises an error if the path already exists.

Verify the csv file


In [9]:
# Load student dataset
df = spark.read.csv("student-hw.csv", header=True, inferSchema=True)
df.show(5)

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
+--------+-------------+-------------+
only showing top 5 rows



## Task 2 - Read from a csv file and write to parquet file


In [18]:
# Load student dataset
df = spark.read.csv("student-hw.csv", header=True, inferSchema=True)
# display dataframe
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|    null|           70|          110|
+--------+-------------+-------------+



In [19]:
# print the number of rows in the dataframe
df.count()

9

Drop Duplicates


In [20]:
df = df.dropDuplicates()
df.show()

                                                                                

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|    null|           70|          110|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+



In [21]:
# Notice that the duplicates are removed
# print the number of rows in the dataframe
df.count()

                                                                                

7

Drop Null values


In [22]:
#Observe the rows with null values getting dropped
df = df.dropna()
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+



In [23]:
df.count()

                                                                                

6
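The row counts above (9, then 7 after `dropDuplicates()`, then 6 after `dropna()`) can be cross-checked without Spark. The following is a minimal plain-Python sketch, not Spark's implementation. It assumes the empty student name is represented as `None`, mirroring the `null` that appears once the CSV file is read back.

```python
# Plain-Python cross-check of the row counts; no Spark required.
# None stands in for the null that Spark shows for the empty student field.
rows = [
    ("student1", 64, 90),
    ("student2", 59, 100),
    ("student3", 69, 95),
    (None, 70, 110),
    ("student5", 60, 80),
    ("student3", 69, 95),
    ("student6", 62, 85),
    ("student7", 65, 80),
    ("student7", 65, 80),
]

# Rough equivalent of dropDuplicates(): keep one copy of each identical row.
deduplicated = list(dict.fromkeys(rows))

# Rough equivalent of dropna() with default arguments:
# drop every row that contains at least one null value.
complete = [row for row in deduplicated if all(value is not None for value in row)]

print(len(rows), len(deduplicated), len(complete))  # 9 7 6
```

Unlike this sketch, Spark does not guarantee row order after `dropDuplicates()`, which is why the order in the output above differs from the input order.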

Save to parquet file


In [24]:
# Write the data to a Parquet file
# if you do not wish to overwrite use the command df.write.parquet("student-hw.parquet")
df.write.mode("overwrite").parquet("student-hw.parquet")

                                                                                

In [26]:
# verify that the parquet file(s) are created
!!ls -l student-hw.parquet

['total 7',
 '-rw-r--r-- 1 jupyterlab resources   0 Jan 28 21:44 _SUCCESS',
 '-rw-r--r-- 1 jupyterlab resources 467 Jan 28 21:44 part-00000-cb3ed2cb-1b85-4838-a3de-ac7d1aba04ff-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Jan 28 21:44 part-00003-cb3ed2cb-1b85-4838-a3de-ac7d1aba04ff-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Jan 28 21:44 part-00010-cb3ed2cb-1b85-4838-a3de-ac7d1aba04ff-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Jan 28 21:44 part-00054-cb3ed2cb-1b85-4838-a3de-ac7d1aba04ff-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Jan 28 21:44 part-00132-cb3ed2cb-1b85-4838-a3de-ac7d1aba04ff-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Jan 28 21:44 part-00172-cb3ed2cb-1b85-4838-a3de-ac7d1aba04ff-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Jan 28 21:44 part-00186-cb3ed2cb-1b85-4838-a3de-ac7d1aba04ff-c000.snappy.parquet']

Notice that there are a lot of .parquet files in the output.
- To improve parallelism, Spark stores each dataframe in multiple partitions.
- When the data is saved in Parquet format, each non-empty partition is written as a separate file inside the output directory.
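The number of part files can also be checked from Python instead of the shell. The sketch below assumes the `part-*` file naming visible in the listing above. `count_part_files` is a hypothetical helper written for this illustration, not part of PySpark, and the demo directory is a throwaway stand-in for a real Spark output directory.

```python
import tempfile
from pathlib import Path


def count_part_files(output_dir):
    """Count the part-* data files in a Spark output directory (hypothetical helper)."""
    return sum(1 for path in Path(output_dir).glob("part-*") if path.is_file())


# Demo against a temporary directory that mimics the layout listed above:
# one _SUCCESS marker plus one file per written partition.
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "demo.parquet"
    out.mkdir()
    (out / "_SUCCESS").touch()
    for index in (0, 3, 10):
        (out / f"part-{index:05d}-demo-c000.snappy.parquet").touch()
    demo_count = count_part_files(out)

print(demo_count)  # 3
```

In the lab environment, `count_part_files("student-hw.parquet")` should report the same number of data files as the `ls` listing, with the `_SUCCESS` marker excluded.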


## Task 3 - Condense PARQUET to a single file.


Reduce the number of partitions in the dataframe to one.


In [27]:
df = df.repartition(1)

Save to parquet file


In [28]:
# Write the data to a Parquet file
# If you do not wish to overwrite use the command df.write.parquet("student-hw-single.parquet")
df.write.mode("overwrite").parquet("student-hw-single.parquet")

                                                                                

In [30]:
# Verify that the parquet file(s) are created
# Notice that there is only one .parquet file
!ls -l student-hw-single.parquet

total 2
-rw-r--r-- 1 jupyterlab resources   0 Jan 28 21:45 _SUCCESS
-rw-r--r-- 1 jupyterlab resources 944 Jan 28 21:45 part-00000-b1286f7b-e0c7-44c4-b2e3-27d5dd704e73-c000.snappy.parquet


## Task 4 - Read from a parquet file and write to csv file


In [31]:
df = spark.read.parquet("student-hw-single.parquet")
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+



Transform the data


In [33]:
# import the expr function that helps in transforming the data
from pyspark.sql.functions import expr

Convert inches to centimeters


In [34]:
# Convert inches to centimeters
# Multiply the column height_inches with 2.54 to get a new column height_centimeters
df = df.withColumn("height_centimeters", expr("height_inches * 2.54"))
df.show()

+--------+-------------+-------------+------------------+
| student|height_inches|weight_pounds|height_centimeters|
+--------+-------------+-------------+------------------+
|student6|           62|           85|            157.48|
|student3|           69|           95|            175.26|
|student2|           59|          100|            149.86|
|student7|           65|           80|            165.10|
|student1|           64|           90|            162.56|
|student5|           60|           80|            152.40|
+--------+-------------+-------------+------------------+



Convert pounds to kilograms


In [36]:
# Convert pounds to kilograms
# Multiply weight_pounds with 0.453592 to get a new column weight_kg
df = df.withColumn("weight_kg", expr("weight_pounds * 0.453592"))
df.show()

+--------+-------------+-------------+------------------+---------+
| student|height_inches|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+-------------+------------------+---------+
|student6|           62|           85|            157.48|38.555320|
|student3|           69|           95|            175.26|43.091240|
|student2|           59|          100|            149.86|45.359200|
|student7|           65|           80|            165.10|36.287360|
|student1|           64|           90|            162.56|40.823280|
|student5|           60|           80|            152.40|36.287360|
+--------+-------------+-------------+------------------+---------+
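The two conversions above can be checked by hand with Python's `decimal` module. This is a sketch for verification only. The trailing zeros in the Spark output (`165.10`, `38.555320`) are consistent with exact fixed-scale decimal arithmetic, which is the assumption this example makes. The helper names `inches_to_cm` and `pounds_to_kg` are hypothetical.

```python
from decimal import Decimal

# Conversion factors used in the lab, kept as exact decimals.
INCH_TO_CM = Decimal("2.54")
POUND_TO_KG = Decimal("0.453592")


def inches_to_cm(inches):
    """Convert a whole number of inches to centimeters (hypothetical helper)."""
    return Decimal(inches) * INCH_TO_CM


def pounds_to_kg(pounds):
    """Convert a whole number of pounds to kilograms (hypothetical helper)."""
    return Decimal(pounds) * POUND_TO_KG


print(inches_to_cm(65))  # 165.10
print(pounds_to_kg(85))  # 38.555320
```

The results keep two and six decimal places respectively because the scale of a decimal product is the sum of the scales of its factors.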



Drop the columns


In [37]:
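# drop the columns "height_inches" and "weight_pounds"
# NOTE: "heihgt_inches" below is misspelled. drop() silently ignores names that
# match no column, so height_inches remains in the output; only weight_pounds is dropped.
df = df.drop("heihgt_inches", "weight_pounds")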
df.show()

+--------+-------------+------------------+---------+
| student|height_inches|height_centimeters|weight_kg|
+--------+-------------+------------------+---------+
|student6|           62|            157.48|38.555320|
|student3|           69|            175.26|43.091240|
|student2|           59|            149.86|45.359200|
|student7|           65|            165.10|36.287360|
|student1|           64|            162.56|40.823280|
|student5|           60|            152.40|36.287360|
+--------+-------------+------------------+---------+



Rename a column


In [38]:
# rename the lengthy column name "height_centimeters" to "height_cm"
df = df.withColumnRenamed("height_centimeters", "height_cm")
df.show()

+--------+-------------+---------+---------+
| student|height_inches|height_cm|weight_kg|
+--------+-------------+---------+---------+
|student6|           62|   157.48|38.555320|
|student3|           69|   175.26|43.091240|
|student2|           59|   149.86|45.359200|
|student7|           65|   165.10|36.287360|
|student1|           64|   162.56|40.823280|
|student5|           60|   152.40|36.287360|
+--------+-------------+---------+---------+



Save to csv file


In [39]:
df.write.mode("overwrite").csv("student_transformed.csv", header=True)

                                                                                

Verify the csv file


In [40]:
# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)
df.show()

+--------+-------------+---------+---------+
| student|height_inches|height_cm|weight_kg|
+--------+-------------+---------+---------+
|student6|           62|   157.48| 38.55532|
|student3|           69|   175.26| 43.09124|
|student2|           59|   149.86|  45.3592|
|student7|           65|    165.1| 36.28736|
|student1|           64|   162.56| 40.82328|
|student5|           60|    152.4| 36.28736|
+--------+-------------+---------+---------+



Stop Spark Session


In [41]:
spark.stop()

## Exercises


Create Spark Session


In [44]:
#Create SparkSession
spark = SparkSession \
    .builder \
    .appName("Exercise - ETL using Spark") \
    .getOrCreate()

### Exercise 1 - Extract


Load data from student_transformed.csv into a dataframe


In [45]:
# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)
df.show(5)

+--------+-------------+---------+---------+
| student|height_inches|height_cm|weight_kg|
+--------+-------------+---------+---------+
|student6|           62|   157.48| 38.55532|
|student3|           69|   175.26| 43.09124|
|student2|           59|   149.86|  45.3592|
|student7|           65|    165.1| 36.28736|
|student1|           64|   162.56| 40.82328|
+--------+-------------+---------+---------+
only showing top 5 rows



### Exercise 2 - Transform


Convert cm to meters


In [46]:
#import the expr function that helps in transforming the data
from pyspark.sql.functions import expr

In [48]:
# Convert centimeters to meters
# Divide the column height_cm by 100 to get a new column height_meters
df = df.withColumn("height_meters", expr("height_cm / 100"))
df.show()

+--------+-------------+---------+---------+------------------+
| student|height_inches|height_cm|weight_kg|     height_meters|
+--------+-------------+---------+---------+------------------+
|student6|           62|   157.48| 38.55532|            1.5748|
|student3|           69|   175.26| 43.09124|            1.7526|
|student2|           59|   149.86|  45.3592|1.4986000000000002|
|student7|           65|    165.1| 36.28736|             1.651|
|student1|           64|   162.56| 40.82328|            1.6256|
|student5|           60|    152.4| 36.28736|             1.524|
+--------+-------------+---------+---------+------------------+
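The value `1.4986000000000002` for student2 is ordinary binary floating-point behavior, not a data error. It suggests the column was read back from CSV as a double rather than a decimal, which is an inference from the output, not something the lab states. A small standard-library sketch of the difference:

```python
from decimal import Decimal

# Binary floating point: 149.86 has no exact double representation,
# so the quotient can carry a tiny representation error.
as_float = 149.86 / 100

# Exact decimal arithmetic on the same inputs.
as_decimal = Decimal("149.86") / Decimal(100)

print(as_float)
print(as_decimal)          # 1.4986
print(round(as_float, 4))  # 1.4986
```

Rounding to a fixed number of decimal places before display or export is usually enough to hide such artifacts.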



Create a column named BMI


In [49]:
# compute bmi using the below formula
# BMI = weight/(height * height)
# weight must be in kgs
# height must be in meters
df = df.withColumn("BMI", expr("weight_kg / (height_meters * height_meters)"))
df.show()

+--------+-------------+---------+---------+------------------+------------------+
| student|height_inches|height_cm|weight_kg|     height_meters|               BMI|
+--------+-------------+---------+---------+------------------+------------------+
|student6|           62|   157.48| 38.55532|            1.5748|15.546531093062187|
|student3|           69|   175.26| 43.09124|            1.7526|14.028892161964118|
|student2|           59|   149.86|  45.3592|1.4986000000000002|20.197328530250278|
|student7|           65|    165.1| 36.28736|             1.651|13.312549228648752|
|student1|           64|   162.56| 40.82328|            1.6256|15.448293591899683|
|student5|           60|    152.4| 36.28736|             1.524|15.623755691955827|
+--------+-------------+---------+---------+------------------+------------------+
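The BMI formula used above can be verified for a single row in plain Python. This is a minimal sketch: the function name `bmi` and the input check are illustrative additions, and the values are taken from the student6 row.

```python
import math


def bmi(weight_kg, height_m):
    """Body mass index: weight in kilograms divided by height in meters squared."""
    if height_m <= 0:
        raise ValueError("height_m must be positive")
    return weight_kg / (height_m * height_m)


# Values for student6 from the table above.
student6_bmi = bmi(38.55532, 1.5748)

print(round(student6_bmi, 2))  # 15.55
print(math.isclose(student6_bmi, 15.546531093062187, rel_tol=1e-9))  # True
```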



Drop the columns height_inches, weight_pounds


In [50]:
# drop "height_inches"; "weight_pounds" was already dropped in Task 4,
# and drop() ignores names that match no column
df = df.drop("height_inches", "weight_pounds")
df.show()

+--------+---------+---------+------------------+------------------+
| student|height_cm|weight_kg|     height_meters|               BMI|
+--------+---------+---------+------------------+------------------+
|student6|   157.48| 38.55532|            1.5748|15.546531093062187|
|student3|   175.26| 43.09124|            1.7526|14.028892161964118|
|student2|   149.86|  45.3592|1.4986000000000002|20.197328530250278|
|student7|    165.1| 36.28736|             1.651|13.312549228648752|
|student1|   162.56| 40.82328|            1.6256|15.448293591899683|
|student5|    152.4| 36.28736|             1.524|15.623755691955827|
+--------+---------+---------+------------------+------------------+



In [51]:
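# Round the BMI column to the nearest whole number
# Note: importing round from pyspark.sql.functions shadows Python's built-in round()
from pyspark.sql.functions import col, round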
df = df.withColumn("BMI_rounded", round(col("BMI")))
df.show()

+--------+---------+---------+------------------+------------------+-----------+
| student|height_cm|weight_kg|     height_meters|               BMI|BMI_rounded|
+--------+---------+---------+------------------+------------------+-----------+
|student6|   157.48| 38.55532|            1.5748|15.546531093062187|       16.0|
|student3|   175.26| 43.09124|            1.7526|14.028892161964118|       14.0|
|student2|   149.86|  45.3592|1.4986000000000002|20.197328530250278|       20.0|
|student7|    165.1| 36.28736|             1.651|13.312549228648752|       13.0|
|student1|   162.56| 40.82328|            1.6256|15.448293591899683|       15.0|
|student5|    152.4| 36.28736|             1.524|15.623755691955827|       16.0|
+--------+---------+---------+------------------+------------------+-----------+
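Spark's `round` function rounds halves away from zero (HALF_UP), whereas Python's built-in `round` rounds halves to the nearest even number. The BMI values above contain no exact halves, so both rules give the same results here, but the difference matters for values such as 2.5. A standard-library sketch of the two rules, where `round_half_up` is a hypothetical helper:

```python
from decimal import Decimal, ROUND_HALF_UP


def round_half_up(value):
    """Round to the nearest integer, with halves rounded away from zero."""
    return int(Decimal(str(value)).quantize(Decimal("1"), rounding=ROUND_HALF_UP))


print(round(2.5))          # 2  (Python built-in: half to even)
print(round_half_up(2.5))  # 3  (HALF_UP, the rule Spark's round uses)
print(round_half_up(15.546531093062187))  # 16
```

If half-to-even rounding is wanted in Spark, `pyspark.sql.functions.bround` provides it.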



### Exercise 3 - Load


Save the dataframe into a parquet file


In [52]:
# Write the data to a Parquet file, set the mode to overwrite
df.write.mode("overwrite").parquet("student_transformed.parquet")

                                                                                

Stop Spark Session


In [53]:
spark.stop()

Congratulations! You have completed this lab.


## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


### Other Contributors


## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-21|0.1|Ramesh Sannareddy|Initial Version Created|


Copyright © 2023 IBM Corporation. All rights reserved.
