In [1]:
# Installing required packages

!pip install wget pyspark  findspark

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9655 sha256=d34e4bbddfdc4d1d3b94871eb4c345b467cf4ad556026d3dfce6d99637270dcb
  Stored in directory: /root/.cache/pip/wheels/40/b3/0f/a40dbd1c6861731779f62cc4babcb234387e11d697df70ee97
Successfully built wget
Installing collected packages: wget, findspark
Successfully installed findspark-2.0.1 wget-3.2


Initiate the Spark Session

In [2]:
import findspark

findspark.init()

In [3]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [4]:
# Creating a SparkContext object

sc = SparkContext.getOrCreate()

# Creating a Spark Session

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [5]:
#download dataset using wget
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv


--2025-05-02 00:51:05--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.45.118.108
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.45.118.108|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4115 (4.0K) [text/csv]
Saving to: ‘dataset1.csv’


2025-05-02 00:51:05 (1.23 GB/s) - ‘dataset1.csv’ saved [4115/4115]

--2025-05-02 00:51:05--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.45.118.108
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-c

In [9]:
# load the data into a pyspark dataframe
df1 = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dataset1.csv")
df2 = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dataset2.csv")

In [10]:
#print the schema of df1 and df2

df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



## Task 3: Add a new column to each dataframe

In [13]:
from pyspark.sql.functions import year, quarter, to_date

# Add year column to df1
df1 = df1.withColumn('year', year(to_date('date_column','M/d/yyyy'))) # Changed the date format to 'M/d/yyyy' to accommodate single-digit months

# Add quarter column to df2
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','M/d/yyyy'))) # Changed the date format to 'M/d/yyyy' to accommodate single-digit months

# Optional: display results
df1.show(5)
df2.show(5)

+-----------+-----------+------+-----------+--------+----+
|customer_id|date_column|amount|description|location|year|
+-----------+-----------+------+-----------+--------+----+
|          1|   1/1/2022|  5000| Purchase A| Store A|2022|
|          2|  15/2/2022|  1200| Purchase B| Store B|NULL|
|          3|  20/3/2022|   800| Purchase C| Store C|NULL|
|          4|  10/4/2022|  3000| Purchase D| Store D|2022|
|          5|   5/5/2022|  6000| Purchase E| Store E|2022|
+-----------+-----------+------+-----------+--------+----+
only showing top 5 rows

+-----------+----------------+-----+------+-------+
|customer_id|transaction_date|value| notes|quarter|
+-----------+----------------+-----+------+-------+
|          1|        1/1/2022| 1500|Note 1|      1|
|          2|       15/2/2022| 2000|Note 2|   NULL|
|          3|       20/3/2022| 1000|Note 3|   NULL|
|          4|       10/4/2022| 2500|Note 4|      4|
|          5|        5/5/2022| 1800|Note 5|      2|
+-----------+---------------

In [14]:
# Rename 'amount' to 'transaction_amount' in df1
df1 = df1.withColumnRenamed("amount", "transaction_amount")

# Rename 'value' to 'transaction_value' in df2
df2 = df2.withColumnRenamed("value", "transaction_value")

In [15]:
# Drop 'description' and 'location' from df1
df1 = df1.drop("description", "location")

# Drop 'notes' from df2
df2 = df2.drop("notes")

In [16]:
# Inner join based on 'customer_id'
joined_df = df1.join(df2, on='customer_id', how='inner')

# Show the result
joined_df.show()

+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|NULL|       15/2/2022|             2000|   NULL|
|          3|  20/3/2022|               800|NULL|       20/3/2022|             1000|   NULL|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      4|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
|          6|  10/6/2022|              4500|2022|       10/6/2022|             1200|      4|
|          7|  15/7/2022|               200|NULL|       15/7/2022|              700|   NULL|
|          8|  20/8/2022|              3500|NULL|       20/8/2022|    

In [17]:
# Filter rows where transaction_amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000")

# Show result
filtered_df.show()

+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|NULL|       15/2/2022|             2000|   NULL|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      4|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
|          6|  10/6/2022|              4500|2022|       10/6/2022|             1200|      4|
|          8|  20/8/2022|              3500|NULL|       20/8/2022|             3000|   NULL|
|         10| 30/10/2022|              1800|NULL|      30/10/2022|             1200|   NULL|
|         11|  5/11/2022|              2200|2022|       5/11/2022|    

In [19]:
from pyspark.sql.functions import sum as _sum # import sum function and rename

# Group by customer_id and calculate total transaction_amount
total_per_customer = filtered_df.groupBy("customer_id").agg(
    _sum(filtered_df["transaction_amount"].cast("double")).alias("total_transaction_amount") # Cast 'transaction_amount' to numeric type (double)
)

# Show result
total_per_customer.show()

+-----------+------------------------+
|customer_id|total_transaction_amount|
+-----------+------------------------+
|         31|                  3200.0|
|         85|                  1800.0|
|         78|                  1500.0|
|         34|                  1200.0|
|         81|                  5500.0|
|         28|                  2600.0|
|         76|                  2600.0|
|         27|                  4200.0|
|         91|                  3200.0|
|         22|                  1200.0|
|         93|                  5500.0|
|          1|                  5000.0|
|         52|                  2600.0|
|         13|                  4800.0|
|          6|                  4500.0|
|         16|                  2600.0|
|         40|                  2600.0|
|         94|                  1200.0|
|         57|                  5500.0|
|         54|                  1500.0|
+-----------+------------------------+
only showing top 20 rows



## Task 9: Write the result to a Hive table

In [22]:
# Write to Hive table named customer_totals
total_per_customer.write.mode("overwrite").saveAsTable("customer_totals")


## Task 10: Write the filtered data to HDFS

In [23]:
#Write filtered_df to HDFS in parquet format file filtered_data

filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")

## Task 11: Add a new column based on a condition

In [25]:
from pyspark.sql.functions import when, col # Import when and col

# Add high_value column to indicate if transaction_amount > 5000
df1 = df1.withColumn(
    "high_value",
    when(col("transaction_amount") > 5000, "Yes").otherwise("No")
)

# Show result
df1.show()

+-----------+-----------+------------------+----+----------+
|customer_id|date_column|transaction_amount|year|high_value|
+-----------+-----------+------------------+----+----------+
|          1|   1/1/2022|              5000|2022|        No|
|          2|  15/2/2022|              1200|NULL|        No|
|          3|  20/3/2022|               800|NULL|        No|
|          4|  10/4/2022|              3000|2022|        No|
|          5|   5/5/2022|              6000|2022|       Yes|
|          6|  10/6/2022|              4500|2022|        No|
|          7|  15/7/2022|               200|NULL|        No|
|          8|  20/8/2022|              3500|NULL|        No|
|          9|  25/9/2022|               700|NULL|        No|
|         10| 30/10/2022|              1800|NULL|        No|
|         11|  5/11/2022|              2200|2022|        No|
|         12| 10/12/2022|               900|2022|        No|
|         13|  15/1/2023|              4800|NULL|        No|
|         14|  20/2/2023

## Task 12: Calculate the average transaction value per quarter

In [26]:
from pyspark.sql.functions import avg

# Group by 'quarter' and calculate the average of 'transaction_value'
average_value_per_quarter = df2.groupBy("quarter").agg(
    avg("transaction_value").alias("avg_trans_val")
)

# Show result
average_value_per_quarter.show()


+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|   NULL|1357.5757575757575|
|      1|            1500.0|
|      4|1376.4705882352941|
|      2|            556.25|
+-------+------------------+



## Task 13: Write the result to a Hive table

In [27]:
# Write to Hive table named quarterly_averages
average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")


## Task 14: Calculate the total transaction value per year

In [30]:
total_value_per_year = df1.groupBy('year').agg(
    _sum(col("transaction_amount").cast("double")).alias("total_transaction_val")  # Cast 'transaction_amount' to numeric type (double or integer)
)


# show the total transaction value for each year in df1.
total_value_per_year.show()

+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2025|               5300.0|
|2027|               5300.0|
|2023|               5300.0|
|2022|              21600.0|
|NULL|             162900.0|
|2026|               5300.0|
|2029|               5300.0|
|2028|               5300.0|
|2024|               5300.0|
+----+---------------------+



## Task 15: Write the result to HDFS

In [31]:
total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")