# ETL-PySpark Project


This  project focuses on data transformation and integration using PySpark. You will work with two datasets, perform various transformations such as adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and finally, writing the results into both a Hive warehouse and an HDFS file system.


### Prerequisites 

For this project, you will use wget, Python and Spark (PySpark). Therefore, it's essential to make sure that the below-specified libraries are installed in your lab environment.  

 


In [1]:
# Installing required packages

!pip install wget pyspark  findspark

Collecting pyspark
  Downloading pyspark-3.4.3.tar.gz (311.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m311.4/311.4 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m24.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.3-py2.py3-none-any.whl size=311885504 sha256=681352eae57410733bef76ed9a6a9171cbffe74def7f94bf83cfde3f3d0a76f5
  Stored in directory: /home/jupyterlab/.cache/pip/wheels/37/bc/bb/77785f6fcd2c83e663647f73225b76f3a3d5fd00762d7daf6f
Successfully built pyspark
Installing colle

#### Prework - Initiate the Spark Session


In [2]:
import findspark

findspark.init()

In [3]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.   

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [4]:
# Creating a SparkContext object

sc = SparkContext.getOrCreate()

# Creating a Spark Session

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

24/08/23 11:22:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Task 1: Load datasets into PySpark DataFrames

Download the datasets from the folloing links using `wget` and load it in a Spark Dataframe.

1. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv  
2. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv  




In [5]:
#download dataset using wget
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)

'dataset2.csv'

In [6]:
#load the data into a pyspark dataframe

df1= spark.read.csv('dataset1.csv',header=True, inferSchema=True)
df2= spark.read.csv('dataset2.csv',header=True, inferSchema=True)

                                                                                

### Task 2: Display the schema of both dataframes

Display the schema of `df1` and `df2` to understand the structure of the datasets.


In [7]:
#print the schema of df1 and df2

df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



#### Task 3: Add a new column to each dataframe

Add a new column named **year** to `df1` and **quarter** to `df2` representing the year and quarter of the data.





In [12]:
from pyspark.sql.functions import year, quarter, to_date

#Add new column year to df1

df1 = df1.withColumn('year',year(to_date('date_column','dd/MM/yyyy')))

#Add new column quarter to df2    
df2 = df2.withColumn('quarter',quarter(to_date('transaction_date','dd/MM/yyyy')))


df1.show(5)
df2.show(5)

+-----------+-----------+------+-----------+--------+----+
|customer_id|date_column|amount|description|location|year|
+-----------+-----------+------+-----------+--------+----+
|          1|   1/1/2022|  5000| Purchase A| Store A|2022|
|          2|  15/2/2022|  1200| Purchase B| Store B|2022|
|          3|  20/3/2022|   800| Purchase C| Store C|2022|
|          4|  10/4/2022|  3000| Purchase D| Store D|2022|
|          5|   5/5/2022|  6000| Purchase E| Store E|2022|
+-----------+-----------+------+-----------+--------+----+
only showing top 5 rows

+-----------+----------------+-----+------+-------+
|customer_id|transaction_date|value| notes|quarter|
+-----------+----------------+-----+------+-------+
|          1|        1/1/2022| 1500|Note 1|      1|
|          2|       15/2/2022| 2000|Note 2|      1|
|          3|       20/3/2022| 1000|Note 3|      1|
|          4|       10/4/2022| 2500|Note 4|      2|
|          5|        5/5/2022| 1800|Note 5|      2|
+-----------+---------------

#### Task 4: Rename columns in both dataframes

Rename the column **amount** to **transaction_amount** in `df1` and **value** to **transaction_value** in `df2`.




In [13]:
#Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount','transaction_amount')

#Rename df2 column value to transaction_value

df2 = df2.withColumnRenamed('value','transaction_value')


#### Task 5: Drop unnecessary columns

Drop the columns **description** and **location** from `df1` and **notes** from `df2`.




In [15]:
#Drop columns description and location from df1
df1 = df1.drop('description','location')

#Drop column notes from df2
df2 = df2.drop('notes')


#### Task 6: Join dataframes based on a common column

Join `df1` and `df2` based on the common column **customer_id** and create a new dataframe named `joined_df`.




In [16]:
#join df1 and df2 based on common column customer_id

joined_df = df1.join(df2,'customer_id','inner')

In [17]:
joined_df.show(5)

+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|      1|
|          3|  20/3/2022|               800|2022|       20/3/2022|             1000|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
only showing top 5 rows



#### Task 7: Filter data based on a condition

Filter `joined_df` to include only transactions where "transaction_amount" is greater than 1000 and create a new dataframe named `filtered_df`.





In [18]:
# filter the dataframe for transaction amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000")

#### Task 8: Aggregate data by customer

Calculate the total transaction amount for each customer in `filtered_df` and display the result.


In [19]:
from pyspark.sql.functions import sum
# group by customer_id and aggregate the sum of transaction amount

total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))

#display the result
total_amount_per_customer.show(5)


+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
+-----------+------------+
only showing top 5 rows



#### Task 9: Write the result to a Hive table

Write `total_amount_per_customer` to a Hive table named **customer_totals**.


In [20]:
# Write total_amount_per_customer to a Hive table named customer_totals
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")


                                                                                

#### Task 10: Write the filtered data to HDFS

Write `filtered_df` to HDFS in parquet format to a file named **filtered_data**.


In [21]:
#Write filtered_df to HDFS in parquet format file filtered_data.parquet

filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")

                                                                                

#### Task 11: Add a new column based on a condition

Add a new column named **high_value** to `df1` indicating whether the transaction_amount is greater than 5000. When the value is greater than 5000, the value of the column should be **Yes**. When the value is less than or equal to 5000, the value of the column should be **No**. 


In [22]:
# Add new column with value indicating whether transaction amount is > 5000 or not
from pyspark.sql.functions import when, lit
df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))


#### Task 12: Calculate the average transaction value per quarter

Calculate and display the average transaction value for each quarter in `df2` and create a new dataframe named `average_value_per_quarter` with column `avg_trans_val`.



In [23]:
from pyspark.sql.functions import avg

#calculate the average transaction value for each quarter in df2
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))

#show the average transaction value for each quarter in df2    
average_value_per_quarter.show(5)


                                                                                

+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|      1| 1111.111111111111|
|      3|1958.3333333333333|
|      4| 816.6666666666666|
|      2|            1072.0|
+-------+------------------+



                                                                                

#### Task 13: Write the result to a Hive table

Write `average_value_per_quarter` to a Hive table named **quarterly_averages**.


In [24]:
#Write average_value_per_quarter to a Hive table named quarterly_averages
average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")



                                                                                

#### Task 14: Calculate the total transaction value per year

Calculate and display the total transaction value for each year in `df1` and create a new dataframe named `total_value_per_year` with column `total_transaction_val`.
> **Note:** The provided DataFrame `df1` does not explicitly have a `year` column initially. However, in Task 3, a new column named `year` is added to `df1` by extracting the year from the date column. Additionally, in Task 4, the column `amount` is renamed to `transaction_amount`.



In [25]:
# calculate the total transaction value for each year in df1.
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

# show the total transaction value for each year in df1.
total_value_per_year.show(5)


+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2025|                25700|
|2027|                25700|
|2023|                28100|
|2022|                29800|
|2026|                25700|
+----+---------------------+
only showing top 5 rows



#### Task 15: Write the result to HDFS

Write `total_value_per_year` to HDFS in the CSV format to file named **total_value_per_year**.



In [26]:
#Write total_value_per_year to HDFS in the CSV format
total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")



                                                                                

<!--## Change Log -->
