<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


## ETL using Spark


Estimated time needed: **30** minutes


<p style='color: red'>The purpose of this lab is to show you how to use Spark for ETL jobs.</p>


## __Table of Contents__

<ol>
  <li>
    <a href="#Objectives">Objectives
    </a>
  </li>
  <li>
    <a href="#Datasets">Datasets
    </a>
  </li>
  <li>
    <a href="#Setup">Setup
    </a>
    <ol>
      <li>
        <a href="#Installing-Required-Libraries">Installing Required Libraries
        </a>
      </li>
      <li>
        <a href="#Importing-Required-Libraries">Importing Required Libraries
        </a>
      </li>
    </ol>
  </li>
  <li> 
    <a href="#Examples">Examples
    </a>
    <ol>
    <li>
      <a href="#Task-1---Create-a-Dataframe-from-the-raw-data-and-write-to-CSV-file.">Task 1 - Create a Dataframe from the raw data and write to CSV file.
      </a>
    </li>
    <li>
      <a href="#Task-2---Read-from-a-csv-file-and-write-to-parquet-file">Task 2 - Read from a csv file and write to parquet file
      </a>
    </li>
    <li>
      <a href="#Task-3---Condense-PARQUET-to-a-single-file.">Task 3 - Condense PARQUET to a single file.
      </a>
    </li>
    <li>
      <a href="#Task-4---Read-from-a-parquet-file-and-write-to-csv-file">Task 4 - Read from a parquet file and write to csv file
      </a>
    </li>
    </ol>
  </li>
  <li>
    <a href="#Exercises">Exercises
    </a>
    <ol>
      <li>
        <a href="#Exercise-1---Extract">Exercise 1 - Extract
        </a>
      </li>
      <li>
        <a href="#Exercise-2---Transform">Exercise 2 - Transform
        </a>
      </li>
      <li>
        <a href="#Exercise-3---Load">Exercise 3 - Load
        </a>
      </li>
    </ol>
  </li>
</ol>


## Objectives

After completing this lab, you will be able to:
 
 - Create a Spark DataFrame from raw data and write it to a CSV file.
 - Read from a CSV file and write to a Parquet file.
 - Condense Parquet output to a single file.
 - Read from a Parquet file and write to a CSV file.


----


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

A Spark cluster is pre-installed in the Skills Network Labs environment. However, you need the `pyspark` and `findspark` libraries to connect to this cluster.

If you wish to download this Jupyter notebook and run it on your local computer, follow the instructions <a href="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/labs/Connecting_to_spark_cluster_using_Skills_Network_labs.ipynb">here</a>.



The following required libraries are __not__ pre-installed in the Skills Network Labs environment. __You will need to run the following cell__ to install them:


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [2]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

from pyspark.sql import SparkSession


In [3]:
#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("ETL using Spark").getOrCreate()

24/11/05 15:28:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Task 1 - Create a Dataframe from the raw data and write to CSV file.


In [6]:
#create a list of tuples
#each tuple contains the student id, height and weight
data = [("student1",64,90),
        ("student2",59,100),
        ("student3",69,95),
        ("",70,110),
        ("student5",60,80),
        ("student3",69,95),
        ("student6",62,85),
        ("student7",65,80),
        ("student7",65,80)]

# some rows are intentionally duplicated

In [8]:
#create a dataframe using createDataFrame and pass the data and the column names.

df = spark.createDataFrame(data, ["student","height_inches","weight_pounds"])

In [9]:
# show the data frame

df.show()

                                                                                

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1|           64|           90|
|student2|           59|          100|
|student3|           69|           95|
|        |           70|          110|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|student7|           65|           80|
|student7|           65|           80|
+--------+-------------+-------------+



Write to csv file

>**Note: When you use the `write` method to save a DataFrame as CSV, Spark creates a directory rather than a single file. Spark is designed to run in a distributed manner across multiple nodes, so it saves the output as multiple part files within that directory. The CSV data is in the part files inside the `student-hw.csv` directory.**


In [10]:
df.write.mode("overwrite").csv("student-hw.csv", header=True)

                                                                                

In [None]:
# If you do not wish to overwrite, use df.write.csv("student-hw.csv", header=True); this fails if the directory already exists

Verify the csv file


In [12]:
# Load student dataset
df = spark.read.csv("student-hw.csv", header=True, inferSchema=True)

# display dataframe
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|    null|           70|          110|
+--------+-------------+-------------+



## Task 2 - Read from a csv file and write to parquet file


In [13]:
# Load student dataset
df = spark.read.csv("student-hw.csv", header=True, inferSchema=True)

# display dataframe
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|    null|           70|          110|
+--------+-------------+-------------+



In [14]:
# print the number of rows in the dataframe
df.count()

9

Drop Duplicates


In [15]:
df = df.dropDuplicates()

In [16]:
df.show()

                                                                                

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|    null|           70|          110|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+



                                                                                

In [None]:
#Notice that the duplicates are removed

In [17]:
# print the number of rows in the dataframe
df.count()

                                                                                

7

Drop Null values


In [18]:
df=df.dropna()

In [19]:
#Observe the rows with null values getting dropped
df.show()

                                                                                

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+



                                                                                

Save to parquet file


In [20]:
#Write the data to a Parquet file
df.write.mode("overwrite").parquet("student-hw.parquet")

                                                                                

In [None]:
# If you do not wish to overwrite, use df.write.parquet("student-hw.parquet"); this fails if the directory already exists

In [None]:
# verify that the parquet file(s) are created

In [21]:
!!ls -l student-hw.parquet

['total 7',
 '-rw-r--r-- 1 jupyterlab resources   0 Nov  5 15:36 _SUCCESS',
 '-rw-r--r-- 1 jupyterlab resources 467 Nov  5 15:36 part-00000-5aaf4f87-b8e6-4abc-b237-500629aa692d-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov  5 15:36 part-00003-5aaf4f87-b8e6-4abc-b237-500629aa692d-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov  5 15:36 part-00010-5aaf4f87-b8e6-4abc-b237-500629aa692d-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov  5 15:36 part-00054-5aaf4f87-b8e6-4abc-b237-500629aa692d-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov  5 15:36 part-00132-5aaf4f87-b8e6-4abc-b237-500629aa692d-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov  5 15:36 part-00172-5aaf4f87-b8e6-4abc-b237-500629aa692d-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov  5 15:36 part-00186-5aaf4f87-b8e6-4abc-b237-500629aa692d-c000.snappy.parquet']

Notice that there are several .parquet files in the output.
- To improve parallelism, Spark stores each DataFrame in multiple partitions.
- When the data is saved in Parquet format, each partition is saved as a separate file.


## Task 3 - Condense PARQUET to a single file.


Reduce the number of partitions in the dataframe to one.


In [22]:
df = df.repartition(1)

Save to parquet file


In [24]:
#Write the data to a Parquet file
df.write.mode("overwrite").parquet("student-hw-single.parquet")

                                                                                

In [None]:
# If you do not wish to overwrite, use df.write.parquet("student-hw-single.parquet"); this fails if the directory already exists

In [None]:
# verify that the parquet file(s) are created

In [26]:
!ls -l student-hw-single.parquet

total 2
-rw-r--r-- 1 jupyterlab resources   0 Nov  5 15:38 _SUCCESS
-rw-r--r-- 1 jupyterlab resources 944 Nov  5 15:38 part-00000-d48a1cc7-c2f8-4465-839f-f0ff393f5be9-c000.snappy.parquet


In [None]:
#Notice that there is only one .parquet file

## Task 4 - Read from a parquet file and write to csv file


In [36]:
df = spark.read.parquet("student-hw-single.parquet")

In [28]:
df.show()

+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+



Transform the data


In [30]:
#import the expr function that helps in transforming the data
from pyspark.sql.functions import expr

Convert inches to centimeters


In [31]:
# Convert inches to centimeters
# Multiply the column height_inches with 2.54 to get a new column height_centimeters
df = df.withColumn("height_centimeters", expr("height_inches * 2.54"))
df.show()

+--------+-------------+-------------+------------------+
| student|height_inches|weight_pounds|height_centimeters|
+--------+-------------+-------------+------------------+
|student6|           62|           85|            157.48|
|student3|           69|           95|            175.26|
|student2|           59|          100|            149.86|
|student7|           65|           80|            165.10|
|student1|           64|           90|            162.56|
|student5|           60|           80|            152.40|
+--------+-------------+-------------+------------------+



Convert pounds to kilograms


In [32]:
# Convert pounds to kilograms
# Multiply weight_pounds with 0.453592 to get a new column weight_kg
df = df.withColumn("weight_kg", expr("weight_pounds * 0.453592"))
df.show()

+--------+-------------+-------------+------------------+---------+
| student|height_inches|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+-------------+------------------+---------+
|student6|           62|           85|            157.48|38.555320|
|student3|           69|           95|            175.26|43.091240|
|student2|           59|          100|            149.86|45.359200|
|student7|           65|           80|            165.10|36.287360|
|student1|           64|           90|            162.56|40.823280|
|student5|           60|           80|            152.40|36.287360|
+--------+-------------+-------------+------------------+---------+



Drop the columns


In [33]:
# drop the columns "height_inches","weight_pounds"
df = df.drop("height_inches","weight_pounds")
df.show()

+--------+------------------+---------+
| student|height_centimeters|weight_kg|
+--------+------------------+---------+
|student6|            157.48|38.555320|
|student3|            175.26|43.091240|
|student2|            149.86|45.359200|
|student7|            165.10|36.287360|
|student1|            162.56|40.823280|
|student5|            152.40|36.287360|
+--------+------------------+---------+



Rename a column


In [34]:
# rename the lengthy column name "height_centimeters" to "height_cm"
df = df.withColumnRenamed("height_centimeters","height_cm")
df.show()

+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6|   157.48|38.555320|
|student3|   175.26|43.091240|
|student2|   149.86|45.359200|
|student7|   165.10|36.287360|
|student1|   162.56|40.823280|
|student5|   152.40|36.287360|
+--------+---------+---------+



Save to csv file


In [39]:
df.write.mode("overwrite").csv("student_transformed.csv", header=True)

24/11/05 15:53:14 ERROR util.Utils: Aborting task
java.io.FileNotFoundException: File file:/resources/labs/authoride/IBMSkillsNetwork+BD0231EN/labs/student_transformed.csv/part-00000-feeb90a6-022e-4684-bd2d-373371fad053-c000.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
	...

Py4JJavaError: An error occurred while calling o119.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 64.0 failed 1 times, most recent failure: Lost task 0.0 in stage 64.0 (TID 1339, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	...
Caused by: java.io.FileNotFoundException: File file:/resources/labs/authoride/IBMSkillsNetwork+BD0231EN/labs/student_transformed.csv/part-00000-feeb90a6-022e-4684-bd2d-373371fad053-c000.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	...


Verify the csv file


In [40]:
# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)
# display dataframe
df.show()

AnalysisException: 'Unable to infer schema for CSV. It must be specified manually.;'

Stop Spark Session


In [41]:
spark.stop()

## Exercises


Create Spark Session


In [42]:
#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("Exercises - ETL using Spark").getOrCreate()

### Exercise 1 - Extract


Load data from student_transformed.csv into a dataframe


In [None]:
# Load student dataset
df = #TODO
# display dataframe
#TODO

<details>
    <summary>Click here for a Hint</summary>
    
Use spark.read.csv

</details>


<details>
    <summary>Click here for Solution</summary>

```
# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)
# display dataframe
df.show()
```

</details>


### Exercise 2 - Transform


Convert cm to meters


In [None]:
#import the expr function that helps in transforming the data


In [None]:
# Convert centimeters to meters
# Divide the column height_cm by 100 to get a new column height_meters
df = #TODO
# display dataframe
#TODO

<details>
    <summary>Click here for a Hint</summary>
    
Use df.withColumn() method
</details>


<details>
    <summary>Click here for Solution</summary>

```
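# import the expr function that helps in transforming the data
from pyspark.sql.functions import expr
# Divide the column height_cm by 100 to get a new column height_meters
df = df.withColumn("height_meters", expr("height_cm / 100"))
# display dataframe
df.show()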
```

</details>


Create a column named bmi


In [None]:
# compute bmi using the below formula
# BMI = weight/(height * height)
# weight must be in kgs
# height must be in meters
df = #TODO
# display dataframe
#TODO

<details>
    <summary>Click here for a Hint</summary>
    
Use df.withColumn() method
</details>


<details>
    <summary>Click here for Solution</summary>

```
df = df.withColumn("bmi", expr("weight_kg/(height_meters*height_meters)"))
# display dataframe
df.show()
```

</details>
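
As a sanity check on the formula, the same arithmetic can be done in plain Python for a single row. The sketch below uses the `weight_kg` and `height_cm` values shown for student1 in the Task 4 output; the variable names are chosen only for this illustration.

```python
# Values for student1 from the Task 4 output.
weight_kg = 40.823280
height_cm = 162.56

# Height must be in meters before it is squared.
height_meters = height_cm / 100
bmi = weight_kg / (height_meters * height_meters)

print(round(bmi, 2))
```

The `bmi` column computed by Spark for student1 should agree with this value up to rounding.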


Drop the columns height_cm, weight_kg and height_meters


In [None]:
# Drop the columns height_cm, weight_kg and height_meters
df = #TODO
# display dataframe
#TODO

<details>
    <summary>Click here for a Hint</summary>
    
Use df.drop()
</details>


<details>
    <summary>Click here for Solution</summary>

```
# Drop the columns height_cm, weight_kg and height_meters
df = df.drop("height_cm","weight_kg","height_meters")
# display dataframe
df.show()
```

</details>


In [43]:
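# Round the bmi column. This needs the bmi column from Exercise 2; the AnalysisException below shows the cell being run without it.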
from pyspark.sql.functions import col, round
df = df.withColumn("bmi_rounded", round(col("bmi")))
df.show()

AnalysisException: "cannot resolve '`bmi`' given input columns: [student, height_inches, weight_pounds];;\n'Project [student#283, height_inches#284, weight_pounds#285, round('bmi, 0) AS bmi_rounded#308]\n+- Relation[student#283,height_inches#284,weight_pounds#285] csv\n"

### Exercise 3 - Load


Save the dataframe into a parquet file


In [None]:
#Write the data to a Parquet file, set the mode to overwrite
#TODO

<details>
    <summary>Click here for a Hint</summary>
    
Use df.write

</details>


<details>
    <summary>Click here for Solution</summary>

```
df.write.mode("overwrite").parquet("student_transformed.parquet")
```

</details>


Stop Spark Session


In [None]:
spark.stop()

Congratulations! You have completed this lab.<br>


## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


### Other Contributors


Copyright © 2023 IBM Corporation. All rights reserved.


<!--
## Change Log
-->


<!--
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-21|0.1|Ramesh Sannareddy|Initial Version Created|
-->
