<p style="text-align:center">
    <a href="https://skills.network" target="_blank">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


# Practice Project

Estimated time needed: **60** minutes

This practice project focuses on data transformation and integration using PySpark. You will work with two datasets, perform transformations such as adding, renaming, and dropping columns, join the dataframes, and finally write the results to both a Hive warehouse and an HDFS file system.


### Prerequisites 

This lab uses wget, Python, and Spark (PySpark). Make sure the libraries below are installed in your lab environment or in Skills Network (SN) Labs.

 


In [1]:
# Installing required packages

!pip install wget pyspark findspark

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Building wheels for collected packages: wget
  Building wheel for wget (pyproject.toml): started
  Building wheel for wget (pyproject.toml): finished with status 'done'
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9680 sha256=3bfe1f6154e037c3aace917fa554826a29b993a316b63ecd42092c3e71f47f75
  Stored in directory: c:\users\test\appdata\local\pip\cache\wheels\04\5f\3e\46cc37c5d698415694d83f607f833f83f0149e49b3af9d0f38
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2




#### Prework - Initiate the Spark Session


In [2]:
import findspark

findspark.init()

In [3]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.   

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [4]:
# Creating a SparkContext object

sc = SparkContext.getOrCreate()

# Creating a Spark Session

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Task 1: Load datasets into PySpark DataFrames

Download the datasets from the following links using `wget` and load them into Spark DataFrames.

1. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv  
2. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv  

*Hint: Import wget*


In [None]:
#download dataset using wget



<details>
    <summary>Click here for Solution</summary>

```python
# download the dataset using wget
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)
```

</details>
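
Re-running the download cell can leave duplicate copies on disk, since the `wget` package typically saves a renamed copy when the target file already exists. The following is a minimal standard-library sketch that skips the download when the file is already present. The helper names `local_name` and `download_once` are hypothetical and not part of the lab.

```python
import os
from urllib.parse import urlparse
from urllib.request import urlretrieve


def local_name(url):
    """Return the file name at the end of a URL path."""
    return os.path.basename(urlparse(url).path)


def download_once(url, dest_dir="."):
    """Download url into dest_dir unless a file with that name already exists."""
    target = os.path.join(dest_dir, local_name(url))
    if not os.path.exists(target):
        urlretrieve(url, target)  # network access happens only on the first run
    return target
```

Calling `download_once` with one of the dataset links returns the local path, which can then be passed to `spark.read.csv`.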


In [10]:
df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 =  spark.read.csv("dataset2.csv", header=True, inferSchema=True)


<details>
    <summary>Click here for Solution</summary>

```python

#load the data into a pyspark dataframe
    
df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)
```

</details>




### Task 2: Display the schema of both dataframes

Display the schema of `df1` and `df2` to understand the structure of the datasets.


In [11]:
#print the schema of df1 and df2
df1.printSchema()
df2.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)




<details>
    <summary>Click here for Solution</summary>

```python

#print the schema of df1 and df2
    
df1.printSchema()
df2.printSchema()
```

</details>




#### Task 3: Add a new column to each dataframe

Add a new column named **year** to `df1` and **quarter** to `df2` representing the year and quarter of the data.

*Hint: use withColumn. Convert the date columns which are present as string to date before extracting the year and quarter information*
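
To sanity-check the values Spark produces, the same derivation can be sketched in plain Python. This is only an illustration of the arithmetic, assuming the day/month/year layout seen in the data (for example `1/1/2022`); the function name `year_and_quarter` is hypothetical.

```python
from datetime import datetime


def year_and_quarter(date_string):
    """Parse a day/month/year string and return (year, quarter)."""
    parsed = datetime.strptime(date_string, "%d/%m/%Y")
    # Months 1-3 map to quarter 1, 4-6 to quarter 2, and so on
    return parsed.year, (parsed.month - 1) // 3 + 1


print(year_and_quarter("10/4/2022"))
```

A date such as `10/4/2022` is 10 April 2022, so it falls in quarter 2, not quarter 4.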




In [22]:
from pyspark.sql.functions import year, quarter, to_date

# The dates use single-digit days and months (e.g. 1/1/2022);
# the legacy parser accepts them with the dd/MM/yyyy pattern
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Add new column year to df1
df1 = df1.withColumn("year", year(to_date("date_column", "dd/MM/yyyy")))

# Add new column quarter to df2
df2 = df2.withColumn("quarter", quarter(to_date("transaction_date", "dd/MM/yyyy")))
df2.show()



+-----------+----------------+-----+-------+-------+
|customer_id|transaction_date|value|  notes|quarter|
+-----------+----------------+-----+-------+-------+
|          1|        1/1/2022| 1500| Note 1|      1|
|          2|       15/2/2022| 2000| Note 2|      1|
|          3|       20/3/2022| 1000| Note 3|      1|
|          4|       10/4/2022| 2500| Note 4|      2|
|          5|        5/5/2022| 1800| Note 5|      2|
|          6|       10/6/2022| 1200| Note 6|      2|
|          7|       15/7/2022|  700| Note 7|      3|
|          8|       20/8/2022| 3000| Note 8|      3|
|          9|       25/9/2022|  600| Note 9|      3|
|         10|      30/10/2022| 1200|Note 10|      4|
|         11|       5/11/2022| 1500|Note 11|      4|
|         12|      10/12/2022|  800|Note 12|      4|
|         13|       15/1/2023| 2000|Note 13|      1|
|         14|       20/2/2023|  700|Note 14|      1|
|         15|       25/3/2023| 1800|Note 15|      1|
|         16|       30/4/2023| 1000|Note 16|  


<details>
    <summary>Click here for Solution</summary>

```python
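from pyspark.sql.functions import year, quarter, to_date

# The legacy parser accepts single-digit days and months (e.g. 1/1/2022) with the dd/MM/yyyy pattern
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

#Add new column year to df1
df1 = df1.withColumn('year', year(to_date('date_column','dd/MM/yyyy')))
    
#Add new column quarter to df2    
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','dd/MM/yyyy')))
```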

</details>





#### Task 4: Rename columns in both dataframes

Rename the column **amount** to **transaction_amount** in `df1` and **value** to **transaction_value** in `df2`.

*Hint: Use withColumnRenamed*


In [25]:
#Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed("amount","transaction_amount")
df1.show()
#Rename df2 column value to transaction_value
df2 = df2.withColumnRenamed("value","transaction_value")
df2.show()

+-----------+-----------+------------------+-----------+--------+----+
|customer_id|date_column|transaction_amount|description|location|year|
+-----------+-----------+------------------+-----------+--------+----+
|          1|   1/1/2022|              5000| Purchase A| Store A|2022|
|          2|  15/2/2022|              1200| Purchase B| Store B|2022|
|          3|  20/3/2022|               800| Purchase C| Store C|2022|
|          4|  10/4/2022|              3000| Purchase D| Store D|2022|
|          5|   5/5/2022|              6000| Purchase E| Store E|2022|
|          6|  10/6/2022|              4500| Purchase F| Store F|2022|
|          7|  15/7/2022|               200| Purchase G| Store G|2022|
|          8|  20/8/2022|              3500| Purchase H| Store H|2022|
|          9|  25/9/2022|               700| Purchase I| Store I|2022|
|         10| 30/10/2022|              1800| Purchase J| Store J|2022|
|         11|  5/11/2022|              2200| Purchase K| Store K|2022|
|     



<details>
    <summary>Click here for Solution</summary>

```python

#Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount', 'transaction_amount')
    
#Rename df2 column value to transaction_value
df2 = df2.withColumnRenamed('value', 'transaction_value')
```

</details>





#### Task 5: Drop unnecessary columns

Drop the columns **description** and **location** from `df1` and **notes** from `df2`.




In [26]:
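#Drop columns description and location from df1
#drop() returns a new DataFrame, so assign the result back
df1 = df1.drop("description", "location")

#Drop column notes from df2
df2 = df2.drop("notes")

#Inspect the remaining columns of df2
df2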

DataFrame[customer_id: int, transaction_date: string, transaction_value: int, quarter: int]




<details>
    <summary>Click here for Solution</summary>

```python

#Drop columns description and location from df1
df1 = df1.drop('description', 'location')
    
#Drop column notes from df2
df2 = df2.drop('notes')
```

</details>





#### Task 6: Join dataframes based on a common column

Join `df1` and `df2` based on the common column **customer_id** and create a new dataframe named `joined_df`.
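
An inner join keeps only the `customer_id` values that appear in both dataframes and merges the matching rows. The plain-Python sketch below illustrates that behavior on a few rows taken from the sample output; it is not how Spark implements joins. The `inner_join` helper and the unmatched row with `customer_id` 99 are hypothetical.

```python
def inner_join(left_rows, right_rows, key):
    """Return merged rows for key values present on both sides."""
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[key], []).append(row)

    joined = []
    for left in left_rows:
        for right in right_by_key.get(left[key], []):
            joined.append({**left, **right})
    return joined


left = [
    {"customer_id": 1, "transaction_amount": 5000},
    {"customer_id": 2, "transaction_amount": 1200},
    {"customer_id": 99, "transaction_amount": 10},  # hypothetical row with no match
]
right = [
    {"customer_id": 1, "transaction_value": 1500},
    {"customer_id": 2, "transaction_value": 2000},
]

result = inner_join(left, right, "customer_id")
print(result)
```

The row with `customer_id` 99 is absent from the result because it has no counterpart on the right side.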




In [31]:
#join df1 and df2 based on common column customer_id

joined_df = df1.join(df2, on="customer_id", how="inner")
joined_df.show()

<details>
    <summary>Click here for Solution</summary>

```python

#join df1 and df2 based on common column customer_id
joined_df = df1.join(df2, 'customer_id', 'inner')
    
```

</details>


#### Task 7: Filter data based on a condition

Filter `joined_df` to include only transactions where "transaction_amount" is greater than 1000 and create a new dataframe named `filtered_df`.





In [None]:
# filter the dataframe for transaction amount > 1000


<details>
    <summary>Click here for Solution</summary>

```python

# filter the dataframe for transaction amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000")    
```

</details>


#### Task 8: Aggregate data by customer

Calculate the total transaction amount for each customer in `filtered_df` and display the result.

*Hint: Use sum from pyspark.sql.functions*
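
As a rough cross-check for Tasks 7 and 8 combined, the filter and the per-customer sum can be reproduced in plain Python. The rows below are a subset of the `df1` sample output, and the variable names are illustrative only.

```python
from collections import defaultdict

# (customer_id, transaction_amount) pairs copied from the df1 sample output
rows = [(1, 5000), (2, 1200), (3, 800), (4, 3000), (7, 200)]

# Task 7: keep only amounts strictly greater than 1000
filtered = [(customer_id, amount) for customer_id, amount in rows if amount > 1000]

# Task 8: total amount per customer
totals = defaultdict(int)
for customer_id, amount in filtered:
    totals[customer_id] += amount

print(dict(totals))
```

Customers 3 and 7 drop out at the filter step, so they never reach the aggregation.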


In [None]:

# group by customer_id and aggregate the sum of transaction amount


#display the result



<details>
    <summary>Click here for Solution</summary>

```python

from pyspark.sql.functions import sum
   
# group by customer_id and aggregate the sum of transaction amount

total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))

#display the result
total_amount_per_customer.show()
```

</details>


#### Task 9: Write the result to a Hive table

Write `total_amount_per_customer` to a Hive table named **customer_totals**.


In [None]:
# Write total_amount_per_customer to a Hive table named customer_totals




<details>
    <summary>Click here for Solution</summary>

```python

# Write total_amount_per_customer to a Hive table named customer_totals
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")
```

</details>


#### Task 10: Write the filtered data to HDFS

Write `filtered_df` to HDFS in Parquet format at the path **filtered_data.parquet**.


In [None]:
#Write filtered_df to HDFS in parquet format file filtered_data.parquet




<details>
    <summary>Click here for Solution</summary>

```python

#Write filtered_df to HDFS in parquet format file filtered_data

filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")
```

</details>
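
Spark writes each output as a directory of part files rather than as a single file, so `filtered_data.parquet` is a directory name. The sketch below shows one way to inspect such a directory on a local file system. `list_part_files` is a hypothetical helper, and on an actual HDFS cluster a command such as `hdfs dfs -ls` would be used instead.

```python
import os


def list_part_files(output_path):
    """Return the sorted data file names inside a Spark output directory."""
    return sorted(
        name for name in os.listdir(output_path) if name.startswith("part-")
    )
```

Marker files such as `_SUCCESS` are skipped because they carry no data.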


#### Task 11: Add a new column based on a condition

Add a new column named **high_value** to `df1` indicating whether the transaction_amount is greater than 5000. When the value is greater than 5000, the value of the column should be **Yes**. When the value is less than or equal to 5000, the value of the column should be **No**. 

*Hint: Use when and lit from pyspark.sql.functions*
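
The boundary case is worth checking: an amount of exactly 5000 is not greater than 5000, so it should be labeled **No**. A plain-Python sketch of the rule, using the hypothetical function name `high_value_label`:

```python
def high_value_label(transaction_amount):
    """Return "Yes" for amounts strictly greater than 5000, otherwise "No"."""
    return "Yes" if transaction_amount > 5000 else "No"


# Amounts taken from the df1 sample output
for amount in (5000, 6000, 1200):
    print(amount, high_value_label(amount))
```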


In [None]:
# Add new column with value indicating whether transaction amount is > 5000 or not




<details>
    <summary>Click here for Solution</summary>

```python

from pyspark.sql.functions import when, lit

# Add new column with value indicating whether transaction amount is > 5000 or not
df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))
```

</details>


#### Task 12: Calculate the average transaction value per quarter

Calculate and display the average transaction value for each quarter in `df2` and create a new dataframe named `average_value_per_quarter` with column `avg_trans_val`.

*Hint: Use avg from pyspark.sql.functions*


In [None]:
#calculate the average transaction value for each quarter in df2


#show the average transaction value for each quarter in df2    




<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.sql.functions import avg

#calculate the average transaction value for each quarter in df2
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))

    
#show the average transaction value for each quarter in df2    
average_value_per_quarter.show()

```

</details>


#### Task 13: Write the result to a Hive table

Write `average_value_per_quarter` to a Hive table named **quarterly_averages**.


In [None]:
#Write average_value_per_quarter to a Hive table named quarterly_averages




<details>
    <summary>Click here for Solution</summary>

```python

#Write average_value_per_quarter to a Hive table named quarterly_averages

average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")

```

</details>


#### Task 14: Calculate the total transaction value per year

Calculate and display the total transaction value for each year in `df1` and create a new dataframe named `total_value_per_year` with column `total_transaction_val`.


In [None]:
# calculate the total transaction value for each year in df1.


# show the total transaction value for each year in df1.




<details>
    <summary>Click here for Solution</summary>

```python

from pyspark.sql.functions import sum

# calculate the total transaction value for each year in df1.
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

# show the total transaction value for each year in df1.
total_value_per_year.show()

```

</details>


#### Task 15: Write the result to HDFS

Write `total_value_per_year` to HDFS in CSV format at the path **total_value_per_year.csv**.



In [None]:
#Write total_value_per_year to HDFS in the CSV format




<details>
    <summary>Click here for Solution</summary>

```python

#Write total_value_per_year to HDFS in the CSV format

total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")

```

</details>


### Congratulations! You have completed the lab.
This practice project provides hands-on experience with data transformation and integration using PySpark. You've performed various tasks, including adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and writing the results into both a Hive warehouse and an HDFS file system.


## Authors

Raghul Ramesh

Lavanya T S


<!--## Change Log -->


<!--|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-09-01|0.1|Lavanya T S|Initial version|
|2023-09-08|0.2|Pornima More|QA pass with edits|-->


<h3 align="center"> &#169; IBM Corporation. All rights reserved. </h3>
