In [0]:
spark.sql("DROP DATABASE IF EXISTS bronze CASCADE ")
spark.sql("DROP DATABASE IF EXISTS silver CASCADE ")
spark.sql("DROP DATABASE IF EXISTS gold CASCADE ")

Out[87]: DataFrame[]

In [0]:
spark.sql("CREATE DATABASE bronze")
spark.sql("CREATE DATABASE silver")
spark.sql("CREATE DATABASE gold")

Out[88]: DataFrame[]

Section A: Data Ingestion and Bronze Layer

In [0]:
# dbutils.fs.ls('dbfs:/FileStore/tables')
dbutils.fs.ls('dbfs:/FileStore/tables')

Out[89]: [FileInfo(path='dbfs:/FileStore/tables/66a262dd3659f_customer_dataset.csv', name='66a262dd3659f_customer_dataset.csv', size=81951758, modificationTime=1722183027000),
 FileInfo(path='dbfs:/FileStore/tables/66a2634026e86_transactions_dataset.csv', name='66a2634026e86_transactions_dataset.csv', size=75105439, modificationTime=1722183054000)]

In [0]:
# transactions_df=spark.read.csv()
spark.sql("""USE schema bronze""")
spark.sql(""" CREATE TABLE IF NOT EXISTS transactions_bronze using CSV options (path="dbfs:/FileStore/tables/66a2634026e86_transactions_dataset.csv",header="True",mode="FAILFAST",inferSchema "True")""")

Out[90]: DataFrame[]

In [0]:
# transactions_df=spark.read.csv()
spark.sql("""USE schema bronze""")
spark.sql(""" CREATE TABLE IF NOT EXISTS customer_bronze using CSV options (path="dbfs:/FileStore/tables/66a262dd3659f_customer_dataset.csv",header="True",mode="FAILFAST",inferSchema "True")""")

Out[91]: DataFrame[]

In [0]:
# Preview first 5 rows
#Display the first 5 rows of both transactions_bronze and customer_bronze tables.
spark.sql("SELECT * FROM bronze.transactions_bronze LIMIT 5").show()
spark.sql("SELECT * FROM bronze.customer_bronze LIMIT 5").show()

# Count total records
transactions_count = spark.sql("SELECT COUNT(*) FROM bronze.transactions_bronze").show()
customers_count = spark.sql("SELECT COUNT(*) FROM bronze.customer_bronze").show()
 

+--------------+-----------+------------------+-------------------+----------------+-----------------+------------------+
|transaction_id|customer_id|transaction_amount|   transaction_date|transaction_type|merchant_category|transaction_status|
+--------------+-----------+------------------+-------------------+----------------+-----------------+------------------+
|             0| cust_02732|           1014.73|2023-01-01 00:00:00|        purchase|         clothing|         completed|
|             1| cust_43567|           3138.73|2023-01-01 01:00:00|          refund|         clothing|            failed|
|             2| cust_42613|           1647.88|2023-01-01 02:00:00|          refund|           travel|         completed|
|             3| cust_45891|           4864.71|2023-01-01 03:00:00|        purchase|         clothing|         completed|
|             4| cust_21243|           3623.23|2023-01-01 04:00:00|        purchase|         clothing|            failed|
+--------------+--------

Questions:

In [0]:
# 1.1 Extract records where balance is greater than 15000 and loan is 'yes' from customers_bronze.
customers_filtered = spark.sql("""
SELECT *
FROM bronze.customer_bronze
WHERE balance > 15000 AND lower(trim(loan)) = 'yes'
""")
customers_filtered.show()


+-----------+---+------------+--------------+---------+--------+----+------------+------------------+-----------------------+-------------------------+
|customer_id|age|         job|marital_status|education| balance|loan|contact_type|last_contact_month|days_since_last_contact|previous_campaign_outcome|
+-----------+---+------------+--------------+---------+--------+----+------------+------------------+-----------------------+-------------------------+
| cust_00000| 62| blue-collar|       married| tertiary|15148.37| yes|   telephone|           October|                     82|                  unknown|
| cust_00006| 21|     student|      divorced|secondary|19600.01| yes|   telephone|           October|                     94|                  failure|
| cust_00011| 68|  technician|        single| tertiary|17575.37| yes|   telephone|               May|                     41|                  success|
| cust_00020| 56|  technician|       married|  primary|15840.76| yes|   telephone|      

In [0]:

# 1.2 Extract transactions from transactions_bronze where transaction_type is 'purchase' and merchant_category is 'travel'.
transactions_filtered = spark.sql("""
SELECT *
FROM bronze.transactions_bronze
WHERE transaction_type = 'purchase' AND merchant_category = 'travel'
""")
transactions_filtered.show()


+--------------+-----------+------------------+-------------------+----------------+-----------------+------------------+
|transaction_id|customer_id|transaction_amount|   transaction_date|transaction_type|merchant_category|transaction_status|
+--------------+-----------+------------------+-------------------+----------------+-----------------+------------------+
|             9| cust_46884|            195.86|2023-01-01 09:00:00|        purchase|           travel|         completed|
|            13| cust_39512|           2514.84|2023-01-01 13:00:00|        purchase|           travel|         completed|
|            28| cust_37619|           4956.73|2023-01-02 04:00:00|        purchase|           travel|            failed|
|            31| cust_01871|           4019.53|2023-01-02 07:00:00|        purchase|           travel|            failed|
|            45| cust_19129|           2460.44|2023-01-02 21:00:00|        purchase|           travel|            failed|
|            47| cust_49

In [0]:
# 1.3 Find the job type of the customer who have done max amount of transaction 
max_transaction = spark.sql("""
SELECT customer_id, SUM(transaction_amount) as total_amount
FROM bronze.transactions_bronze
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 1
""").collect()[0]

max_transaction_customer_id = max_transaction['customer_id']

customer_job = spark.sql(f"""
SELECT job
FROM bronze.customer_bronze
WHERE customer_id = '{max_transaction_customer_id}'
""")
customer_job.show()


+----------+
|       job|
+----------+
|management|
+----------+



Section B: Create Silver Tables with Transformations

In [0]:
# Load bronze tables
transactions_bronze_df = spark.table("bronze.transactions_bronze")
customers_bronze_df = spark.table("bronze.customer_bronze")


In [0]:
# Clean transactions_bronze table
transactions_silver_df = transactions_bronze_df.dropna(subset=["customer_id"]).dropDuplicates()
transactions_silver_df.write.format("delta").mode("overwrite").save("/mnt/dataset/transactions_silver_delta")
spark.sql("""
    drop table if exists silver.transactions_silver
""")
spark.sql("""
    CREATE TABLE silver.transactions_silver
    USING DELTA
    LOCATION '/mnt/dataset/transactions_silver_delta'
""")

# Clean customer_bronze table
customers_silver_df = customers_bronze_df.dropDuplicates()
customers_silver_df.write.format("delta").mode("overwrite").save("/mnt/dataset/customers_silver_delta")
spark.sql("""
    drop table if exists silver.customers_silver
""")
spark.sql("""
    CREATE TABLE silver.customers_silver
    USING DELTA
    LOCATION '/mnt/dataset/customers_silver_delta'
""")


Out[97]: DataFrame[]

In [0]:
from pyspark.sql.functions import date_format

# Convert the timestamp string to timestamp type if needed
transactions_silver_df = transactions_silver_df.withColumn("timestamp_col", transactions_silver_df["transaction_date"].cast("timestamp"))  

# Extract the date from the timestamp
transactions_silver_df = transactions_silver_df.withColumn("date_col", date_format(transactions_silver_df["timestamp_col"], "yyyy-MM-dd"))

# Show the result
# transactions_silver_df.select("transaction_date","timestamp_col","date_col").show()
# transactions_silver_df["date_col"].distinct().count()

In [0]:
# Optimization Techniques:
# Apply partitioning on the transactions_silver table by transaction_date. 
transactions_silver_df.write.partitionBy("transaction_type").mode("overwrite").format("delta").save("/mnt/dataset/transactions_silver_partitioned_delta")
spark.sql("""
    CREATE TABLE silver.transactions_silver
    USING DELTA
    LOCATION '/mnt/dataset/transactions_silver_partitioned_delta'
""")


Out[113]: DataFrame[]

In [0]:
# Apply bucketing on the bank_marketing_silver table by customer_id.
customers_silver_df.write.bucketBy(8, "customer_id").mode("overwrite").format("parquet").saveAsTable("silver.customers_silver_bucketed")


Questions:

In [0]:
# 2.1 Calculate the cumulative transaction amount for each merchant_category ordered by transaction_date available in transactions_silver. 
spark.sql("""
SELECT merchant_category, transaction_date, SUM(transaction_amount) OVER (PARTITION BY merchant_category ORDER BY transaction_date) AS cumulative_amount
FROM silver.transactions_silver
""").show()


+-----------------+-------------------+------------------+
|merchant_category|   transaction_date| cumulative_amount|
+-----------------+-------------------+------------------+
|           travel|2023-01-01 02:00:00|           1647.88|
|           travel|2023-01-01 09:00:00|1843.7400000000002|
|           travel|2023-01-01 13:00:00|           4358.58|
|           travel|2023-01-01 21:00:00|           5516.74|
|           travel|2023-01-01 22:00:00| 7683.049999999999|
|           travel|2023-01-01 23:00:00|          10263.43|
|           travel|2023-01-02 03:00:00|11653.810000000001|
|           travel|2023-01-02 04:00:00|          16610.54|
|           travel|2023-01-02 05:00:00|          21604.33|
|           travel|2023-01-02 07:00:00|          25623.86|
|           travel|2023-01-02 08:00:00|          29675.96|
|           travel|2023-01-02 15:00:00|          31186.64|
|           travel|2023-01-02 16:00:00|          35223.04|
|           travel|2023-01-02 21:00:00|          37683.4

In [0]:
# 2.2 Calculate the total number of transactions for each combination of transaction_status and merchant_category in transactions_silver. 
spark.sql("""
SELECT transaction_status, merchant_category, COUNT(*) AS total_transactions
FROM silver.transactions_silver
GROUP BY transaction_status, merchant_category
""").show()


+------------------+-----------------+------------------+
|transaction_status|merchant_category|total_transactions|
+------------------+-----------------+------------------+
|         completed|           travel|            250148|
|            failed|           travel|            249848|
|         completed|         clothing|            249892|
|            failed|         clothing|            250112|
+------------------+-----------------+------------------+



In [0]:
# 2.3 Calculate Aggregates (Join based on Customer_ID)
# 2.3.1 Select top 10 customers who have done maximum transactions per week
spark.sql("""
SELECT customer_id, weekofyear(transaction_date) AS week, COUNT(*) AS transaction_count
FROM silver.transactions_silver
GROUP BY customer_id, week
ORDER BY transaction_count DESC
LIMIT 10
""").show()


+-----------+----+-----------------+
|customer_id|week|transaction_count|
+-----------+----+-----------------+
| cust_05153|  35|                6|
| cust_23289|  29|                6|
| cust_26569|  15|                6|
| cust_23438|  33|                6|
| cust_35219|  50|                6|
| cust_45315|  27|                6|
| cust_38698|  48|                6|
| cust_44721|  19|                6|
| cust_23480|   3|                5|
| cust_32450|  22|                5|
+-----------+----+-----------------+



In [0]:
# 2.3.2 Calculate the percentage contribution of each segment to the total monthly transactions.

monthly_transactions = spark.sql("""
SELECT month(transaction_date) AS month, COUNT(*) AS total_transactions
FROM silver.transactions_silver
GROUP BY month
""")

total_transactions = monthly_transactions.groupBy().sum("total_transactions").collect()[0][0]

monthly_transactions.withColumn("percentage_contribution", (monthly_transactions["total_transactions"] / total_transactions) * 100).show()


+-----+------------------+-----------------------+
|month|total_transactions|percentage_contribution|
+-----+------------------+-----------------------+
|   12|             84816|                 8.4816|
|    1|             85504|                 8.5504|
|    6|             82080|                  8.208|
|    3|             84816|                 8.4816|
|    5|             84816|                 8.4816|
|    9|             82080|                  8.208|
|    4|             82080|                  8.208|
|    8|             84816|                 8.4816|
|    7|             84816|                 8.4816|
|   10|             84816|                 8.4816|
|   11|             82080|                  8.208|
|    2|             77280|                  7.728|
+-----+------------------+-----------------------+



In [0]:
# 2.3.2 Calculate the percentage contribution of each segment to the total monthly transactions. 
from pyspark.sql.functions import when,col,month
monthly_spend = spark.sql("""
SELECT customer_id, month(transaction_date) AS month, SUM(transaction_amount) AS monthly_amount
FROM silver.transactions_silver
GROUP BY customer_id, month
""")

monthly_spend.withColumn("spender_type", when(monthly_spend["monthly_amount"] > 5000, "High").otherwise("Low")).show()


+-----------+-----+------------------+------------+
|customer_id|month|    monthly_amount|spender_type|
+-----------+-----+------------------+------------+
| cust_18873|    9|           8023.45|        High|
| cust_24571|    4|          13564.42|        High|
| cust_31309|    2|12019.619999999999|        High|
| cust_41750|    2|8025.7300000000005|        High|
| cust_18867|    6|            6698.1|        High|
| cust_04378|    2|12922.019999999999|        High|
| cust_37763|    7|           4499.63|         Low|
| cust_16545|   12|2313.5099999999998|         Low|
| cust_09946|    5| 5119.219999999999|        High|
| cust_01851|   11|           4015.53|         Low|
| cust_29332|   12|            395.64|         Low|
| cust_42247|    4|           2879.36|         Low|
| cust_45743|    8|           9691.91|        High|
| cust_42057|    3|           3550.65|         Low|
| cust_24966|   11| 4833.150000000001|         Low|
| cust_06378|   12|           3962.12|         Low|
| cust_12620

In [0]:
# 2.3.3 Segment customers into high and low spenders based on monthly transactions.
monthly_spend = spark.sql("""
SELECT customer_id, month(transaction_date) AS month, SUM(transaction_amount) AS monthly_amount
FROM silver.transactions_silver
GROUP BY customer_id, month
""")

monthly_spend.withColumn("spender_type", when(monthly_spend["monthly_amount"] > 5000, "High").otherwise("Low")).show()


+-----------+-----+------------------+------------+
|customer_id|month|    monthly_amount|spender_type|
+-----------+-----+------------------+------------+
| cust_18873|    9|           8023.45|        High|
| cust_24571|    4|          13564.42|        High|
| cust_31309|    2|12019.619999999999|        High|
| cust_41750|    2|8025.7300000000005|        High|
| cust_18867|    6|            6698.1|        High|
| cust_04378|    2|12922.019999999999|        High|
| cust_37763|    7|           4499.63|         Low|
| cust_16545|   12|2313.5099999999998|         Low|
| cust_09946|    5| 5119.219999999999|        High|
| cust_01851|   11|           4015.53|         Low|
| cust_29332|   12|            395.64|         Low|
| cust_42247|    4|           2879.36|         Low|
| cust_45743|    8|           9691.91|        High|
| cust_42057|    3|           3550.65|         Low|
| cust_24966|   11| 4833.150000000001|         Low|
| cust_06378|   12|           3962.12|         Low|
| cust_12620

In [0]:
# 2.3.4 Rank job categories based on the total transaction_amount in the joined dataset.

spark.sql("""
SELECT c.job, SUM(t.transaction_amount) AS total_amount
FROM silver.transactions_silver t
JOIN silver.customers_silver c ON t.customer_id = c.customer_id
GROUP BY c.job
ORDER BY total_amount DESC
""").show()



+------------+--------------------+
|         job|        total_amount|
+------------+--------------------+
|    services|4.3250783742000055E8|
|  management| 4.278125024100009E8|
|     student| 4.267906247700003E8|
| blue-collar| 4.259000806900003E8|
|entrepreneur|4.2264909784999996E8|
|  technician|4.1520509505000085E8|
+------------+--------------------+



Section C: Gold Tables and Final Aggregations 



In [0]:
spark.sql("""
    select count(distinct transaction_type),count(distinct merchant_category ) from bronze.transactions_bronze
""").show()


+--------------------------------+---------------------------------+
|count(DISTINCT transaction_type)|count(DISTINCT merchant_category)|
+--------------------------------+---------------------------------+
|                               2|                                2|
+--------------------------------+---------------------------------+



In [0]:
# Create Fact/Dim Tables on Gold layer: 
# Task:  
# Create Fact and Dimensions tables via Silver tables on Gold Schema and apply partitioning on respective column. 

# Create Fact table
transactions_silver_df.write.partitionBy("transaction_type").mode("overwrite").format("delta").save("/mnt/dataset/transactions_fact_delta")
spark.sql("""
    CREATE TABLE gold.transactions_fact
    USING DELTA
    LOCATION '/mnt/dataset/transactions_fact_delta'
""")

# Create Dimension table
customers_silver_df.write.bucketBy(8, "customer_id").sortBy("customer_id").mode("overwrite").format("parquet").saveAsTable("gold.customers_dim")


In [0]:
# Create view named Customer_transactions_vw based on Joining Fact/Dimension tables containing functional and attribute columns from both the tables. 
customer_transactions_vw = spark.sql("""
SELECT 
    t.*, 
    c.job, 
    c.balance
FROM gold.transactions_fact t
JOIN gold.customers_dim c
ON t.customer_id = c.customer_id
""")
customer_transactions_vw.createOrReplaceTempView("Customer_transactions_vw")


Question

In [0]:
#3.1 Calculate the total balance and the number of customers in each job category (Based on View). 
spark.sql("""
SELECT 
    job, 
    SUM(balance) AS total_balance, 
    COUNT(customer_id) AS num_customers
FROM Customer_transactions_vw
GROUP BY job
""").show()


+------------+--------------------+-------------+
|         job|       total_balance|num_customers|
+------------+--------------------+-------------+
|  management|1.5897315279700124E9|       167591|
|     student|1.5878093077100046E9|       167365|
| blue-collar|1.5801121299199913E9|       166910|
|entrepreneur|1.5882998554299977E9|       165466|
|  technician|1.5523407036499972E9|       163183|
|    services|1.6095491742800062E9|       169485|
+------------+--------------------+-------------+



In [0]:
# 3.2 Create Visual Chart for Total transactions done based on Merchant Category, Job_type. 
visual_df = spark.sql("""
SELECT 
    job, 
    merchant_category, 
    COUNT(transaction_id) AS total_transactions
FROM Customer_transactions_vw
GROUP BY job, merchant_category
""")
# Use Databricks visualization tools to create the chart


In [0]:
# 3.3 Derive logic for -  
# 3.3.1 Considering only the customers who made purchases, what was the average purchase amount per customer. 
spark.sql("""
SELECT 
    AVG(transaction_amount) AS avg_purchase_amount
FROM gold.transactions_fact
WHERE transaction_type = 'purchase'
""").show()


+-------------------+
|avg_purchase_amount|
+-------------------+
|  2550.024903197417|
+-------------------+



In [0]:
# 3.3.2 Which customer had the highest total transaction amount (sum of purchases minus sum of refunds) on Weekly basis, and what was that amount? 
spark.sql("""
SELECT 
    customer_id, 
    weekofyear(transaction_date) AS week, 
    (SUM(CASE WHEN transaction_type = 'purchase' THEN transaction_amount ELSE 0 END) - 
    SUM(CASE WHEN transaction_type = 'refund' THEN transaction_amount ELSE 0 END)) AS net_amount
FROM gold.transactions_fact
GROUP BY customer_id, week
ORDER BY net_amount DESC
LIMIT 1
""").show()


+-----------+----+------------------+
|customer_id|week|        net_amount|
+-----------+----+------------------+
| cust_28869|  36|15569.910000000002|
+-----------+----+------------------+



In [0]:
# 3.4 Build logic for Reconcile the total number of records in the bronze, silver, and gold tables for customer and transactions datasets.  
bronze_counts = spark.sql("""
SELECT 'bronze' AS layer, 'transactions' AS table, COUNT(*) AS count FROM bronze.transactions_bronze
UNION ALL
SELECT 'bronze', 'customers', COUNT(*) FROM bronze.customer_bronze
""")

silver_counts = spark.sql("""
SELECT 'silver' AS layer, 'transactions' AS table, COUNT(*) AS count FROM silver.transactions_silver
UNION ALL
SELECT 'silver', 'customers', COUNT(*) FROM silver.customers_silver
""")

gold_counts = spark.sql("""
SELECT 'gold' AS layer, 'transactions' AS table, COUNT(*) AS count FROM gold.transactions_fact
UNION ALL
SELECT 'gold', 'customers', COUNT(*) FROM gold.customers_dim
""")

bronze_counts.union(silver_counts).union(gold_counts).show()


+------+------------+-------+
| layer|       table|  count|
+------+------------+-------+
|bronze|transactions|1000000|
|bronze|   customers|1000000|
|  gold|   customers|1000000|
|silver|transactions|1000000|
|silver|   customers|1000000|
|  gold|transactions|1000000|
+------+------------+-------+

