In [7]:
from pyspark.sql.functions import *

# Storage roots for the medallion layers on ADLS Gen2 (abfss scheme).
# Both end with "/" so table folder names can be appended directly.
silver_base_path = "abfss://fintech@airbnbdatagds.dfs.core.windows.net/silver/fintech/"
output_base_path = "abfss://fintech@airbnbdatagds.dfs.core.windows.net/gold/fintech/"


def _read_silver_table(table_name):
    """Return the silver-layer Delta table stored under ``<silver_base_path><table_name>/``."""
    table_path = f"{silver_base_path}{table_name}/"
    return spark.read.format("delta").load(table_path)


# Source tables consumed by the gold-layer cells below.
accounts_df = _read_silver_table("Accounts")
customers_df = _read_silver_table("Customers")
loans_df = _read_silver_table("Loans")
payments_df = _read_silver_table("Payments")
transactions_df = _read_silver_table("Transactions")



StatementMeta(gdssparkpool, 6, 8, Finished, Available, Finished)

In [8]:
# Gold customer dimension: project the silver Customers columns and rename them
# from PascalCase to snake_case. Dict insertion order fixes the output column order.
_customer_column_map = {
    "CustomerID": "customer_id",
    "FirstName": "first_name",
    "LastName": "last_name",
    "Email": "email",
    "PhoneNumber": "phone_number",
    "Address": "address",
    "City": "city",
    "State": "state",
    "Country": "country",
    "ZipCode": "zip_code",
    "SignupDate": "signup_date",
}

dim_customers_df = customers_df.select(
    [col(source_name).alias(gold_name) for source_name, gold_name in _customer_column_map.items()]
)

# Full rebuild: the Delta table at this path is replaced on every run.
dim_customers_path = f"{output_base_path}dim_customers/"
dim_customers_df.write.format("delta").mode("overwrite").save(dim_customers_path)


StatementMeta(gdssparkpool, 6, 9, Finished, Available, Finished)

In [9]:
# Gold account dimension: project the silver Accounts columns and rename them
# to snake_case. Dict insertion order fixes the output column order.
_account_column_map = {
    "AccountID": "account_id",
    "AccountType": "account_type",
    "Balance": "balance",
    "OpenDate": "open_date",
    "AccountAgeYears": "account_age_years",
}

dim_accounts_df = accounts_df.select(
    [col(source_name).alias(gold_name) for source_name, gold_name in _account_column_map.items()]
)

# Full rebuild: the Delta table at this path is replaced on every run.
dim_accounts_path = f"{output_base_path}dim_accounts/"
dim_accounts_df.write.format("delta").mode("overwrite").save(dim_accounts_path)


StatementMeta(gdssparkpool, 6, 10, Finished, Available, Finished)

In [10]:
# Gold loan dimension: project the silver Loans columns and rename them
# to snake_case. Dict insertion order fixes the output column order.
_loan_column_map = {
    "LoanID": "loan_id",
    "LoanType": "loan_type",
    "LoanAmount": "loan_amount",
    "InterestRate": "interest_rate",
    "LoanStartDate": "loan_start_date",
    "LoanEndDate": "loan_end_date",
    "TotalInterest": "total_interest",
    "LoanDurationYears": "loan_duration_years",
}

dim_loans_df = loans_df.select(
    [col(source_name).alias(gold_name) for source_name, gold_name in _loan_column_map.items()]
)

# Full rebuild: the Delta table at this path is replaced on every run.
dim_loans_path = f"{output_base_path}dim_loans/"
dim_loans_df.write.format("delta").mode("overwrite").save(dim_loans_path)


StatementMeta(gdssparkpool, 6, 11, Finished, Available, Finished)

In [11]:
# Gold payments fact: one row per silver payment, tagged with the customer who owns the loan.
# - Left join (was inner): a payment whose LoanID has no match in Loans is kept with a
#   null customer_id. An inner join would silently drop it and under-report payment totals.
# - The lookup is deduplicated on LoanID so a repeated loan row cannot duplicate payments.
#   If duplicate rows disagree on CustomerID, which one survives is arbitrary.
#   NOTE(review): confirm LoanID is meant to be unique in silver Loans.
loan_customer_lookup = loans_df.select("LoanID", "CustomerID").dropDuplicates(["LoanID"])

fact_payments_df = (
    payments_df
    .join(loan_customer_lookup, on="LoanID", how="left")
    .select(
        col("PaymentID").alias("payment_id"),
        col("LoanID").alias("loan_id"),
        col("CustomerID").alias("customer_id"),
        col("PaymentDate").alias("payment_date"),
        col("PaymentAmount").alias("payment_amount"),
        col("PaymentMethod").alias("payment_method"),
    )
)

# Full rebuild: the Delta table at this path is replaced on every run.
fact_payments_df.write.format("delta").mode("overwrite").save(f"{output_base_path}fact_payments/")


StatementMeta(gdssparkpool, 6, 12, Finished, Available, Finished)

In [12]:
# Gold transactions fact: one row per silver transaction, tagged with the customer who owns
# the account that posted it.
# - Left join (was inner): a transaction whose AccountID has no match in Accounts is kept
#   with a null customer_id. An inner join would silently drop it and under-report totals.
# - The lookup is deduplicated on AccountID so a repeated account row cannot duplicate
#   transactions. If duplicate rows disagree on CustomerID, which one survives is arbitrary.
#   NOTE(review): confirm AccountID is meant to be unique in silver Accounts.
account_customer_lookup = accounts_df.select("AccountID", "CustomerID").dropDuplicates(["AccountID"])

fact_transactions_df = (
    transactions_df
    .join(account_customer_lookup, on="AccountID", how="left")
    .select(
        col("TransactionID").alias("transaction_id"),
        col("AccountID").alias("account_id"),
        col("CustomerID").alias("customer_id"),
        col("TransactionDate").alias("transaction_date"),
        col("Amount").alias("amount"),
        col("TransactionType").alias("transaction_type"),
        col("Description").alias("description"),
    )
)

# Full rebuild: the Delta table at this path is replaced on every run.
fact_transactions_df.write.format("delta").mode("overwrite").save(f"{output_base_path}fact_transactions/")


StatementMeta(gdssparkpool, 6, 13, Finished, Available, Finished)