In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, when, quarter, to_date, col, avg, sum as _sum

spark = SparkSession.builder \
    .appName("TestApp") \
    .master("spark://spark-master:7077") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/06 03:33:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

### Task 1: Load datasets into PySpark DataFrames

Download the datasets from the folloing links using `wget` and load it in a Spark Dataframe.

1. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv  
2. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv  

I used make file, and wget in bash to download the files

In [3]:
df_Raw_ds1=spark.read.csv(r"/opt/data/imports/dataset1.csv", header=True, inferSchema=True)

                                                                                

In [4]:
df_Raw_ds2=spark.read.csv(r"/opt/data/imports/dataset2.csv", header=True, inferSchema=True)

### Task 2: Display the schema of both dataframes

Display the schema of `df1` and `df2` to understand the structure of the datasets.


In [5]:
df_Raw_ds1.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)



In [6]:
df_Raw_ds2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



#### Task 3: Add a new column to each dataframe

Add a new column named **year** to `df1` and **quarter** to `df2` representing the year and quarter of the data.

*Hint: use withColumn. Convert the date columns which are present as string to date before extracting the year and quarter information*




In [7]:
df_Raw_ds1=\
    df_Raw_ds1\
    .withColumn("year", year(to_date("date_column", "dd/MM/yyyy")))

In [8]:
df_Raw_ds1.show(5)

+-----------+-----------+------+-----------+--------+----+
|customer_id|date_column|amount|description|location|year|
+-----------+-----------+------+-----------+--------+----+
|          1|   1/1/2022|  5000| Purchase A| Store A|2022|
|          2|  15/2/2022|  1200| Purchase B| Store B|2022|
|          3|  20/3/2022|   800| Purchase C| Store C|2022|
|          4|  10/4/2022|  3000| Purchase D| Store D|2022|
|          5|   5/5/2022|  6000| Purchase E| Store E|2022|
+-----------+-----------+------+-----------+--------+----+
only showing top 5 rows



In [9]:
df_Raw_ds2=\
    df_Raw_ds2\
    .withColumn("quarter", quarter(to_date("transaction_date", "dd/MM/yyyy")))

In [10]:
df_Raw_ds2.show(5)

+-----------+----------------+-----+------+-------+
|customer_id|transaction_date|value| notes|quarter|
+-----------+----------------+-----+------+-------+
|          1|        1/1/2022| 1500|Note 1|      1|
|          2|       15/2/2022| 2000|Note 2|      1|
|          3|       20/3/2022| 1000|Note 3|      1|
|          4|       10/4/2022| 2500|Note 4|      2|
|          5|        5/5/2022| 1800|Note 5|      2|
+-----------+----------------+-----+------+-------+
only showing top 5 rows



#### Task 4: Rename columns in both dataframes

Rename the column **amount** to **transaction_amount** in `df1` and **value** to **transaction_value** in `df2`.

*Hint: Use withColumnRenamed*


In [11]:
df_Raw_ds1=\
    df_Raw_ds1\
    .withColumnRenamed("amount", "transaction_amount")

In [12]:
df_Raw_ds1.show(5)

+-----------+-----------+------------------+-----------+--------+----+
|customer_id|date_column|transaction_amount|description|location|year|
+-----------+-----------+------------------+-----------+--------+----+
|          1|   1/1/2022|              5000| Purchase A| Store A|2022|
|          2|  15/2/2022|              1200| Purchase B| Store B|2022|
|          3|  20/3/2022|               800| Purchase C| Store C|2022|
|          4|  10/4/2022|              3000| Purchase D| Store D|2022|
|          5|   5/5/2022|              6000| Purchase E| Store E|2022|
+-----------+-----------+------------------+-----------+--------+----+
only showing top 5 rows



In [13]:
df_Raw_ds2=\
    df_Raw_ds2\
    .withColumnRenamed("value", "transaction_value")

In [14]:
df_Raw_ds2.show(5)

+-----------+----------------+-----------------+------+-------+
|customer_id|transaction_date|transaction_value| notes|quarter|
+-----------+----------------+-----------------+------+-------+
|          1|        1/1/2022|             1500|Note 1|      1|
|          2|       15/2/2022|             2000|Note 2|      1|
|          3|       20/3/2022|             1000|Note 3|      1|
|          4|       10/4/2022|             2500|Note 4|      2|
|          5|        5/5/2022|             1800|Note 5|      2|
+-----------+----------------+-----------------+------+-------+
only showing top 5 rows



#### Task 5: Drop unnecessary columns

Drop the columns **description** and **location** from `df1` and **notes** from `df2`.




In [15]:
# Decided to drop the raw df here, for no specific reason other
# than realizing I was still using the raw naming

df_1 = df_Raw_ds1.drop("description", 'location')
df_2 = df_Raw_ds2.drop("note")

#### Task 6: Join dataframes based on a common column

Join `df1` and `df2` based on the common column **customer_id** and create a new dataframe named `joined_df`.




In [16]:
joined_df = df_1.join(df_2, on="customer_id", how='inner')

In [17]:
joined_df.show(5)

+-----------+-----------+------------------+----+----------------+-----------------+------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value| notes|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|Note 1|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|Note 2|      1|
|          3|  20/3/2022|               800|2022|       20/3/2022|             1000|Note 3|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|Note 4|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|Note 5|      2|
+-----------+-----------+------------------+----+----------------+-----------------+------+-------+
only showing top 5 rows



In [18]:
filtered_df=\
    joined_df\
    .filter(col("transaction_amount")>1000)

In [19]:
filtered_df.show(5)

+-----------+-----------+------------------+----+----------------+-----------------+------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value| notes|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|Note 1|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|Note 2|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|Note 4|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|Note 5|      2|
|          6|  10/6/2022|              4500|2022|       10/6/2022|             1200|Note 6|      2|
+-----------+-----------+------------------+----+----------------+-----------------+------+-------+
only showing top 5 rows



In [20]:
Agg_amount_by_customer=\
    filtered_df\
    .groupBy("customer_id")\
    .agg(_sum("transaction_amount").alias("Total_Transactions"))

In [21]:
Agg_amount_by_customer.show(5)

+-----------+------------------+
|customer_id|Total_Transactions|
+-----------+------------------+
|         31|              3200|
|         85|              1800|
|         78|              1500|
|         34|              1200|
|         81|              5500|
+-----------+------------------+
only showing top 5 rows



#### Task 10: Write the filtered data to HDFS

Write `filtered_df` to HDFS in parquet format to a file named **filtered_data**.


In [22]:
filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")

                                                                                

In [23]:
df_1.show(5)

+-----------+-----------+------------------+----+
|customer_id|date_column|transaction_amount|year|
+-----------+-----------+------------------+----+
|          1|   1/1/2022|              5000|2022|
|          2|  15/2/2022|              1200|2022|
|          3|  20/3/2022|               800|2022|
|          4|  10/4/2022|              3000|2022|
|          5|   5/5/2022|              6000|2022|
+-----------+-----------+------------------+----+
only showing top 5 rows



#### Task 11: Add a new column based on a condition

Add a new column named **high_value** to `df1` indicating whether the transaction_amount is greater than 5000. When the value is greater than 5000, the value of the column should be **Yes**. When the value is less than or equal to 5000, the value of the column should be **No**. 

*Hint: Use when and lit from pyspark.sql.functions


In [24]:
df_1=\
     df_1\
    .withColumn("high_value", when(col("transaction_amount") > 5000, "Yes").otherwise("No"))

In [25]:
df_1.show(5)

+-----------+-----------+------------------+----+----------+
|customer_id|date_column|transaction_amount|year|high_value|
+-----------+-----------+------------------+----+----------+
|          1|   1/1/2022|              5000|2022|        No|
|          2|  15/2/2022|              1200|2022|        No|
|          3|  20/3/2022|               800|2022|        No|
|          4|  10/4/2022|              3000|2022|        No|
|          5|   5/5/2022|              6000|2022|       Yes|
+-----------+-----------+------------------+----+----------+
only showing top 5 rows



#### Task 12: Calculate the average transaction value per quarter

Calculate and display the average transaction value for each quarter in `df2` and create a new dataframe named `average_value_per_quarter` with column `avg_trans_val`.

*Hint: Use avg from pyspark.sql.functions*


In [26]:
average_value_per_quarter=\
    df_2\
    .withColumn("year", year(to_date(col("transaction_date"), "dd/MM/yyyy")))\
    .groupBy("year", "quarter")\
    .agg(avg(col("transaction_value")).alias('avg_trans_val'))\
    .select("year", "quarter", "avg_trans_val")\
    .orderBy(col("year").asc(), col("quarter").asc())

In [27]:
average_value_per_quarter.show(5)

+----+-------+------------------+
|year|quarter|     avg_trans_val|
+----+-------+------------------+
|2022|      1|            1500.0|
|2022|      2|1833.3333333333333|
|2022|      3|1433.3333333333333|
|2022|      4|1166.6666666666667|
|2023|      1|            1500.0|
+----+-------+------------------+
only showing top 5 rows



In [28]:
average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")

SparkRuntimeException: [LOCATION_ALREADY_EXISTS] Cannot name the managed table as `spark_catalog`.`default`.`quarterly_averages`, as its associated location 'file:/opt/notebooks/spark-warehouse/quarterly_averages' already exists. Please pick a different table name, or remove the existing location first.

#### Task 14: Calculate the total transaction value per year

Calculate and display the total transaction value for each year in `df1` and create a new dataframe named `total_value_per_year` with column `total_transaction_val`.
> **Note:** The provided DataFrame `df1` does not explicitly have a `year` column initially. However, in Task 3, a new column named `year` is added to `df1` by extracting the year from the date column. Additionally, in Task 4, the column `amount` is renamed to `transaction_amount`.



In [None]:
total_value_per_year=\
     df_1\
    .groupBy("year")\
    .agg(_sum("transaction_amount").alias("total_transaction_val"))\
    .orderBy(col("year").asc())

In [None]:
total_value_per_year.show(5)

+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2022|                29800|
|2023|                28100|
|2024|                25700|
|2025|                25700|
|2026|                25700|
+----+---------------------+
only showing top 5 rows



#### Task 15: Write the result to HDFS

Write `total_value_per_year` to HDFS in the CSV format to file named **total_value_per_year**.



In [None]:
total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")