## Account Dataframe

In [0]:
from pyspark.sql.functions import *

In [0]:
# Raw account extract from the bronze container (header row, inferred types).
# NOTE(review): every bronze path in this notebook uses the container name "broze" — confirm it is not meant to be "bronze".
Account_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://broze@sabankingkd.dfs.core.windows.net/account/account.csv")
)


In [0]:
# Keep the columns published to silver; `parseddate` is exposed as `issuance_date`.
Final_Account_df = Account_df.select(
    "account_id",
    "district_id",
    "frequency",
    col("parseddate").alias("issuance_date"),
)

In [0]:
# Silver-layer Delta location for accounts; target of the incremental load in this section.
account_path = "abfss://silver@sabankingkd.dfs.core.windows.net/account/"

In [0]:
def path_exists(account_path):
    """Return True if ``dbutils.fs.ls`` can list ``account_path``, else False.

    The caller uses the result to choose between the first full (overwrite)
    write and an incremental append. Only ``Exception`` subclasses count as
    "missing"; interrupts such as KeyboardInterrupt propagate to the caller.

    NOTE(review): any listing failure (e.g. permissions or network), not only
    a missing path, yields False and so sends the caller down the overwrite
    branch — confirm that is acceptable.
    """
    try:
        dbutils.fs.ls(account_path)
    except Exception:
        return False
    return True

In [0]:
# Incremental loading keyed on account_id: the first run creates the silver table,
# later runs append only accounts whose account_id is not already present.
# NOTE(review): rows are not de-duplicated within Final_Account_df itself, so a
# repeated account_id inside one batch would still be written twice.
if not path_exists(account_path):
    Final_Account_df.write.format("delta").mode("overwrite").save(account_path)
else:
    silver_df = (
        spark.read.format("delta")
        .load(account_path)
        .select("account_id")
    )
    incremental_account_df = Final_Account_df.join(
        silver_df, on="account_id", how="left_anti"
    )
    (
        incremental_account_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(account_path)
    )

In [0]:
# df = spark.read.format("delta").load("abfss://silver@sabankingkd.dfs.core.windows.net/account/")
# df_count = df.count()
# print(df_count)

## Card Dataframe

In [0]:
# Card extract: de-duplicate on card_id, shorten the known VISA tier labels,
# and write a full snapshot to silver (overwritten on every run).
Card_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://broze@sabankingkd.dfs.core.windows.net/card/card.csv")
)
drop_duplicate_card = Card_df.dropDuplicates(["card_id"])

# Map the three known tiers to their short names; any other value is kept as-is.
card_type = col("type")
visa_card = drop_duplicate_card.withColumn(
    "type",
    when(card_type == "VISA Infinite", "Infinite")
    .when(card_type == "VISA Signature", "Signature")
    .when(card_type == "VISA Standard", "Standard")
    .otherwise(card_type),
)

Card_final_df = visa_card.select(
    "card_id", "disp_id", "type", col("fulldate").alias("card_date")
)
(
    Card_final_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://silver@sabankingkd.dfs.core.windows.net/card/")
)

In [0]:
# df = spark.read.format("delta").load("abfss://silver@sabankingkd.dfs.core.windows.net/card/")
# df_count = df.count()
# print(df_count)

## Client Dataframe

In [0]:
# Client extract: build a single full-name column, de-duplicate on client_id and
# keep the columns published to silver (`fulldate` is exposed as `birthdate`).
client_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://broze@sabankingkd.dfs.core.windows.net/client/client.csv")
)
# concat_ws skips null parts, so a null middle name does not leave a double space.
client_concat_df = client_df.withColumn(
    "fullname", concat_ws(" ", col("first"), col("middle"), col("last"))
)
# Client-specific name: reusing the card section's `drop_duplicate_card` would
# overwrite the card frame in the session with client rows.
drop_duplicate_client = client_concat_df.dropDuplicates(["client_id"])
client_final_df = drop_duplicate_client.select(
    "client_id",
    "fullname",
    "sex",
    col("fulldate").alias("birthdate"),
    "age",
    "social",
    "phone",
    "email",
    "address_1",
    "address_2",
    "city",
    "state",
    "zipcode",
    "district_id",
)

In [0]:
# Incremental load using a watermark column: the highest client_id already in the
# silver table. The incremental write below appends only rows above this value.
client_path = "abfss://silver@sabankingkd.dfs.core.windows.net/client/"
try:
    # Read via format("delta"): PySpark's DataFrameReader has no `.delta()` shortcut,
    # and the aggregate value is taken straight from the collected Row.
    last_client_id = (
        spark.read.format("delta")
        .load(client_path)
        .selectExpr("max(client_id)")
        .collect()[0][0]
    )
except Exception:
    # Presumably the silver table does not exist yet (first run).
    # NOTE(review): any other read failure also falls back to the sentinel below.
    last_client_id = None
if last_client_id is None:
    # No watermark available (missing or empty table): use the '0' sentinel.
    last_client_id = '0'
# NOTE(review): if client_id is a string column, max() is lexicographic — confirm
# the ids are fixed-width so the watermark ordering matches numeric order.
print(last_client_id)

In [0]:

def path_exists(client_path):
    """Return True if ``dbutils.fs.ls`` can list ``client_path``, else False.

    The caller uses the result to choose between the first full (overwrite)
    write and an incremental append. Only ``Exception`` subclasses count as
    "missing"; interrupts such as KeyboardInterrupt propagate to the caller.

    NOTE(review): any listing failure (e.g. permissions or network), not only
    a missing path, yields False and so sends the caller down the overwrite
    branch — confirm that is acceptable.
    NOTE(review): an identical helper is defined in several sections of this
    notebook; the definition executed last is the one in effect.
    """
    try:
        dbutils.fs.ls(client_path)
    except Exception:
        return False
    return True

In [0]:
# Watermark-based incremental write: when the silver table exists, append only
# clients whose client_id is greater than `last_client_id`; otherwise write the
# full snapshot. (The existing table does not need to be re-read here; the
# watermark was computed earlier in this section.)
if path_exists(client_path):
    incremental_client_df = client_final_df.filter(col("client_id") > last_client_id)
    (
        incremental_client_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(client_path)
    )
else:
    client_final_df.write.format("delta").mode("overwrite").save(client_path)

In [0]:
# df = spark.read.format("delta").load(client_path)
# df_count = df.count()
# print(df_count)

## Position Dataframe

In [0]:
# Position extract. Unlike the other sections, this loads every CSV under the
# `position/` directory rather than one named file.
position_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://broze@sabankingkd.dfs.core.windows.net/position/")
)

In [0]:
# Silver-layer Delta location for positions; target of the incremental load in this section.
position_path = "abfss://silver@sabankingkd.dfs.core.windows.net/position/"

In [0]:
def path_exists(position_path):
    """Return True if ``dbutils.fs.ls`` can list ``position_path``, else False.

    The caller uses the result to choose between the first full (overwrite)
    write and an incremental append. Only ``Exception`` subclasses count as
    "missing"; interrupts such as KeyboardInterrupt propagate to the caller.

    NOTE(review): any listing failure (e.g. permissions or network), not only
    a missing path, yields False and so sends the caller down the overwrite
    branch — confirm that is acceptable.
    NOTE(review): an identical helper is defined in several sections of this
    notebook; the definition executed last is the one in effect.
    """
    try:
        dbutils.fs.ls(position_path)
    except Exception:
        return False
    return True

In [0]:
# Incremental loading keyed on disp_id: the first run creates the silver table,
# later runs append only rows whose disp_id is not already present.
if not path_exists(position_path):
    position_df.write.format("delta").mode("overwrite").save(position_path)
else:
    silver_df = (
        spark.read.format("delta")
        .load(position_path)
        .select("disp_id")
    )
    incremental_position_df = position_df.join(
        silver_df, on="disp_id", how="left_anti"
    )
    (
        incremental_position_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(position_path)
    )

In [0]:
# df = spark.read.format("delta").load(position_path)
# df_count = df.count()
# print(df_count)

## District Dataframe

In [0]:
# District reference data: copied to silver unchanged as a full snapshot
# (overwritten on every run).
district_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://broze@sabankingkd.dfs.core.windows.net/district/district.csv")
)
(
    district_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://silver@sabankingkd.dfs.core.windows.net/district/")
)

In [0]:
# df = spark.read.format("delta").load("abfss://silver@sabankingkd.dfs.core.windows.net/district/")
# df_count = df.count()
# print(df_count)

## Loan Dataframe

In [0]:
# Loan extract: project the silver columns (`fulldate` cast to date as `loandate`)
# and display the result. Nothing is written to silver in this cell.
loan_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://broze@sabankingkd.dfs.core.windows.net/loan/loan.csv")
)
loan_final_df = loan_df.select(
    "loan_id",
    "account_id",
    "amount",
    "duration",
    "payments",
    "status",
    col("fulldate").cast("date").alias("loandate"),
    "location",
    "purpose",
)
display(loan_final_df)
# TODO: apply SCD type 2 logic before persisting to silver.

## Order Dataframe

In [0]:
# Order extract: read and display only; nothing is written to silver in this cell.
order_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://broze@sabankingkd.dfs.core.windows.net/order/order.csv")
)
display(order_df)
# TODO: load this dataset incrementally, keyed on order_id.
# TODO: check for null values.

## Transactions Dataframe

In [0]:
# The bronze transactions file name embeds the year (e.g. transactions2013.csv);
# set the year to load here instead of editing the path.
# TODO: drive this from a Databricks widget / job parameter instead of a constant.
transactions_year = 2013
transactions_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(
        "abfss://broze@sabankingkd.dfs.core.windows.net/transactions/"
        f"transactions{transactions_year}.csv"
    )
)
transactions_final_df = (
    transactions_df.select(
        "trans_id",
        "account_id",
        "type",
        "operation",
        "amount",
        "balance",
        "k_symbol",
        "bank",
        "account",
        col("fulldate").cast("date").alias("transdate"),
        col("fulldatewithtime").alias("trans_datetime"),
    )
    # `year` is the partition column of the silver transactions table.
    .withColumn("year", year(col("transdate")))
)

In [0]:
# Silver-layer Delta location for transactions (partitioned by `year` when written in this section).
transactions_path = "abfss://silver@sabankingkd.dfs.core.windows.net/transactions/"

In [0]:
def path_exists(transactions_path):
    """Return True if ``dbutils.fs.ls`` can list ``transactions_path``, else False.

    The caller uses the result to choose between the first full (overwrite)
    write and an incremental append. Only ``Exception`` subclasses count as
    "missing"; interrupts such as KeyboardInterrupt propagate to the caller.

    NOTE(review): any listing failure (e.g. permissions or network), not only
    a missing path, yields False and so sends the caller down the overwrite
    branch — confirm that is acceptable.
    NOTE(review): an identical helper is defined in several sections of this
    notebook; the definition executed last is the one in effect.
    """
    try:
        dbutils.fs.ls(transactions_path)
    except Exception:
        return False
    return True

In [0]:
# Incremental loading keyed on trans_id; the silver table is partitioned by year.
# The first run creates the table, later runs append only unseen trans_ids.
if not path_exists(transactions_path):
    (
        transactions_final_df.write
        .format("delta")
        .partitionBy("year")
        .mode("overwrite")
        .save(transactions_path)
    )
else:
    silver_df = (
        spark.read.format("delta")
        .load(transactions_path)
        .select("trans_id")
    )
    incremental_transactions_df = transactions_final_df.join(
        silver_df, on="trans_id", how="left_anti"
    )
    (
        incremental_transactions_df.write
        .format("delta")
        .partitionBy("year")
        .mode("append")
        .option("mergeSchema", "true")
        .save(transactions_path)
    )

In [0]:
# df = spark.read.format("delta").load(transactions_path)
# df_count = df.count()
# print(df_count)