In [8]:
from pyspark.sql.functions import *

# Storage locations (abfss:// URIs). Both values end with "/" because the
# transformations below append the dataset folder/file names directly.
# Bronze layer: the source parquet files are read from under this prefix.
base_path = "abfss://fintech@airbnbdatagds.dfs.core.windows.net/bronze/fintech/"
# Silver layer: the transformed Delta outputs are written under this prefix.
output_base_path = "abfss://fintech@airbnbdatagds.dfs.core.windows.net/silver/fintech/"

# Enable Delta's automatic schema merging on the active Spark session so that
# columns added by the transformations can be merged into existing tables.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Transformation for Accounts dataset
def transform_accounts():
    """Add ``AccountAgeYears`` to the bronze Accounts data and write it to silver.

    Reads ``Accounts/Accounts.parquet`` under ``base_path``, appends the
    derived column and overwrites the Delta output at ``Accounts/`` under
    ``output_base_path``. Takes no arguments and returns ``None``.
    """
    accounts = spark.read.parquet(f"{base_path}Accounts/Accounts.parquet")

    # Days between OpenDate and today, converted to years (365.25 days per
    # year) and rounded to two decimals. `round` is the Spark column function
    # brought in by the wildcard import, not the Python builtin.
    days_since_open = datediff(current_date(), col("OpenDate"))
    account_age_years = round(days_since_open / 365.25, 2)

    accounts_with_age = accounts.withColumn("AccountAgeYears", account_age_years)

    writer = accounts_with_age.write.format("delta").mode("overwrite")
    writer.save(f"{output_base_path}Accounts/")

# Transformation for Customers dataset
def transform_customers():
    """Add ``FullName`` and ``MaskedEmail`` to the bronze Customers data.

    Reads ``Customers/Customers.parquet`` under ``base_path``, appends the two
    derived columns and overwrites the Delta output at ``Customers/`` under
    ``output_base_path``. Takes no arguments and returns ``None``.

    NOTE(review): the original ``Email`` column is not dropped, so the
    unmasked address is still written alongside ``MaskedEmail``.
    """
    customers = spark.read.parquet(f"{base_path}Customers/Customers.parquet")

    # First and last name joined with a single space.
    full_name = concat_ws(" ", col("FirstName"), col("LastName"))

    # Keep only the text after the last "@" and put a fixed "***@" in front.
    email_domain = substring_index(col("Email"), "@", -1)
    masked_email = concat(lit("***@"), email_domain)

    customers_enriched = customers.withColumn("FullName", full_name)
    customers_enriched = customers_enriched.withColumn("MaskedEmail", masked_email)

    writer = customers_enriched.write.format("delta").mode("overwrite")
    writer.save(f"{output_base_path}Customers/")

# Transformation for Loans dataset with explicit casting
def transform_loans():
    """Add ``TotalInterest`` and ``LoanDurationYears`` to the bronze Loans data.

    Reads ``Loans/Loans.parquet`` under ``base_path``, appends the two derived
    columns and overwrites the Delta output at ``Loans/`` under
    ``output_base_path``. Takes no arguments and returns ``None``.
    """
    loans = spark.read.parquet(f"{base_path}Loans/Loans.parquet")

    # LoanAmount * InterestRate / 100, cast explicitly to decimal(28,8) so the
    # column type is fixed rather than inferred.
    # NOTE(review): the loan duration is not part of this formula — confirm
    # that InterestRate is meant as a flat percentage of the amount.
    interest_amount = col("LoanAmount") * col("InterestRate") / 100
    total_interest = interest_amount.cast("decimal(28,8)")

    # Days between start and end date, converted to years (365.25 days per
    # year) and rounded to two decimals with Spark's `round`.
    duration_days = datediff(col("LoanEndDate"), col("LoanStartDate"))
    duration_years = round(duration_days / 365.25, 2)

    loans_enriched = loans.withColumn("TotalInterest", total_interest)
    loans_enriched = loans_enriched.withColumn("LoanDurationYears", duration_years)

    writer = loans_enriched.write.format("delta").mode("overwrite")
    writer.save(f"{output_base_path}Loans/")

# Transformation for Payments dataset
def transform_payments():
    """Add ``DaysSinceLastPayment`` to the bronze Payments data.

    Reads ``Payments/Payments.parquet`` under ``base_path``, appends the
    derived column and overwrites the Delta output at ``Payments/`` under
    ``output_base_path``. Takes no arguments and returns ``None``.

    The value is computed per row as the number of days between that row's
    ``PaymentDate`` and the current date; no grouping is applied.
    """
    payments = spark.read.parquet(f"{base_path}Payments/Payments.parquet")

    days_since_payment = datediff(current_date(), col("PaymentDate"))
    payments_with_age = payments.withColumn("DaysSinceLastPayment", days_since_payment)

    writer = payments_with_age.write.format("delta").mode("overwrite")
    writer.save(f"{output_base_path}Payments/")

# Transformation for Transactions dataset
def transform_transactions():
    """Add ``TransactionCategory`` to the bronze Transactions data.

    Reads ``Transactions/Transactions.parquet`` under ``base_path``, appends
    the derived column and overwrites the Delta output at ``Transactions/``
    under ``output_base_path``. Takes no arguments and returns ``None``.
    """
    transactions = spark.read.parquet(f"{base_path}Transactions/Transactions.parquet")

    # "Deposit" -> "Income", "Withdrawal" -> "Expense"; every row that matches
    # neither condition gets "Other".
    transaction_type = col("TransactionType")
    category = (
        when(transaction_type == "Deposit", "Income")
        .when(transaction_type == "Withdrawal", "Expense")
        .otherwise("Other")
    )

    categorized = transactions.withColumn("TransactionCategory", category)

    writer = categorized.write.format("delta").mode("overwrite")
    writer.save(f"{output_base_path}Transactions/")

# Run every bronze-to-silver transformation one after another, in the order
# listed; an exception in any step stops the run before the final message.
for _transform in (
    transform_accounts,
    transform_customers,
    transform_loans,
    transform_payments,
    transform_transactions,
):
    _transform()

print("Bronze To Silver Completed !!")


StatementMeta(gdssparkpool, 2, 9, Finished, Available, Finished)

Bronze To Silver Completed !!
