In [0]:
spark.conf.set(
"fs.azure.account.key.myrgstore.dfs.core.windows.net",
"Cdhaz7zz+7n6gOYQq7TH36+rB8P+5HMV5yf=====================================")

# Read Data from Silver Container:
accounts_df = spark.read.format("delta").load("abfss://silver@myrgstore.dfs.core.windows.net/delta/accounts_delta")
customers_df = spark.read.format("delta").load("abfss://silver@myrgstore.dfs.core.windows.net/delta/customers_delta")
loan_payments_df = spark.read.format("delta").load("abfss://silver@myrgstore.dfs.core.windows.net/delta/loan_payments_delta")
loans_df = spark.read.format("delta").load("abfss://silver@myrgstore.dfs.core.windows.net/delta/loans_delta")
transactions_df = spark.read.format("delta").load("abfss://silver@myrgstore.dfs.core.windows.net/delta/transactions_delta")

In [0]:
# TROUBLESHOOT PARTITION Data Shuffling with groupBy()

from pyspark.sql.functions import sum as _sum

# Example of a `groupBy()` operation in PySpark
transactions_grouped = transactions_df.groupBy("account_id").agg(
    _sum("transaction_amount").alias("total_transactions")
)

# Check the current number of partitions
print(transactions_df.rdd.getNumPartitions())

# Repartition data by `account_id` to minimize shuffling
transactions_repartitioned = transactions_df.repartition(10, "account_id")

# Perform the GroupBy and Aggregation
transactions_grouped = transactions_repartitioned.groupBy("account_id").agg(
    _sum("transaction_amount").alias("total_transactions")
)

display(transactions_grouped.explain(True))

1
== Parsed Logical Plan ==
'Aggregate ['account_id], ['account_id, 'sum('transaction_amount) AS total_transactions#7012]
+- RepartitionByExpression [account_id#6976], 10
   +- Relation [transaction_id#6975,account_id#6976,transaction_date#6977,transaction_amount#6978,transaction_type#6979] parquet

== Analyzed Logical Plan ==
account_id: int, total_transactions: double
Aggregate [account_id#6976], [account_id#6976, sum(transaction_amount#6978) AS total_transactions#7012]
+- RepartitionByExpression [account_id#6976], 10
   +- Relation [transaction_id#6975,account_id#6976,transaction_date#6977,transaction_amount#6978,transaction_type#6979] parquet

== Optimized Logical Plan ==
Aggregate [account_id#6976], [account_id#6976, sum(transaction_amount#6978) AS total_transactions#7012]
+- RepartitionByExpression [account_id#6976], 10
   +- Project [account_id#6976, transaction_amount#6978]
      +- Relation [transaction_id#6975,account_id#6976,transaction_date#6977,transaction_amount#6978,tran

In [0]:
#Identify Data Skew - group transactions by account_id and calculate the total transaction amount for each customer. As customers have many more transactions than others, this result in a skewed dataset.

from pyspark.sql.functions import sum

transactions_grouped = transactions_df.groupBy("account_id").agg(
    sum("transaction_amount").alias("total_transaction_amount")
)
display(transactions_grouped)

account_id,total_transaction_amount
1,100.0
6,-300.0
3,150.0
5,250.0
4,-200.0
8,-150.0
7,400.0
2,250.0


In [0]:
# Salting 

from pyspark.sql.functions import col, rand, sum

# Step 1: Add a random number (salt) to the account_id
salted_transactions_df = transactions_df.withColumn(
    "salted_account_id", 
    (col("account_id") + (rand() * 10).cast("int"))  # Adding random number between 0 and 9
)

# Step 2: Repartition the DataFrame based on the salted account_id
salted_transactions_repartitioned = salted_transactions_df.repartition(10, "salted_account_id")

# Verify the number of partitions
print(salted_transactions_repartitioned.rdd.getNumPartitions())

# Step 3: Perform aggregation after salting (example: summing transactions per account_id)
transactions_grouped_salted = salted_transactions_repartitioned.groupBy("salted_account_id").agg(
    sum("transaction_amount").alias("total_transaction_amount")
)

display(transactions_grouped_salted)

10


salted_account_id,total_transaction_amount
4,150.0
10,700.0
11,-50.0
8,450.0
15,-300.0
9,-250.0
6,-200.0


In [0]:
# Join Tables Using Salted Keys
from pyspark.sql.functions import col, rand

# Salt the customers DataFrame
salted_customers_df = customers_df.withColumn(
    "salted_customer_id", 
    (col("customer_id") + (rand() * 10).cast("int"))
)

# Salt the transactions DataFrame using the correct column name
salted_transactions_repartitioned = transactions_df.withColumn(
    "salted_customer_id", 
    (col("account_id") + (rand() * 10).cast("int"))
)

# Perform the join using the salted_customer_id
joined_df = salted_transactions_repartitioned.join(
    salted_customers_df, 
    salted_transactions_repartitioned["salted_customer_id"] == salted_customers_df["salted_customer_id"],
    "inner"
)

# Show the results of the join
display(joined_df)

transaction_id,account_id,transaction_date,transaction_amount,transaction_type,salted_customer_id,customer_id,first_name,last_name,address,city,state,postal_code,salted_customer_id.1
7,5,2024-09-07,250.0,Deposit,14,9,Olivia,Davis,606 Fir St,Boston,MA,2101,14
3,2,2024-09-02,300.0,Deposit,6,6,David,Jones,303 Cedar St,Los Angeles,CA,90001,6
5,3,2024-09-05,150.0,Deposit,5,2,Jane,Smith,456 Oak St,Chicago,IL,60614,5
7,5,2024-09-07,250.0,Deposit,14,7,Laura,Garcia,404 Willow St,San Francisco,CA,94101,14
6,4,2024-09-06,-200.0,Withdrawal,4,4,Michael,Williams,101 Maple St,Seattle,WA,98101,4


In [0]:
joined_df.show(truncate=False)

+--------------+----------+----------------+------------------+----------------+------------------+-----------+----------+---------+-------------+-------------+-----+-----------+------------------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|salted_customer_id|customer_id|first_name|last_name|address      |city         |state|postal_code|salted_customer_id|
+--------------+----------+----------------+------------------+----------------+------------------+-----------+----------+---------+-------------+-------------+-----+-----------+------------------+
|7             |5         |2024-09-07      |250.0             |Deposit         |14                |9          |Olivia    |Davis    |606 Fir St   |Boston       |MA   |2101       |14                |
|3             |2         |2024-09-02      |300.0             |Deposit         |6                 |6          |David     |Jones    |303 Cedar St |Los Angeles  |CA   |90001      |6                 |
|5        

In [0]:
# Remove duplicate columns

joined_df = joined_df.toDF(*(col if col not in joined_df.columns[:i] else f"{col}_duplicate" for i, col in enumerate(joined_df.columns)))

# Define the path for gold container

gold_delta = "abfss://gold@myrgstore.dfs.core.windows.net/delta/gold_delta"

# Save the DataFrame in Delta format, overwriting if it exists

joined_df.write.format("delta").mode("overwrite").save(gold_delta)