In [0]:
cards_data_df = spark.read.format("delta").load("/mnt/adls21s/bronze/cards_data/")
display(cards_data_df)

transactions_data_df = spark.read.format("delta").load("/mnt/adls21s/bronze/transactions_data/")
transactions_data_df.show()

users_data_df = spark.read.format("delta").load("/mnt/adls21s/bronze/users_data/")
display(users_data_df.head(5))

In [0]:
from pyspark.sql.functions import regexp_replace, col, count, when

'''Remove special characters'''
cards_data_df = cards_data_df.withColumn("credit_limit_in_$", regexp_replace('credit_limit', '\$', ''))
cards_data_df = cards_data_df.drop("credit_limit")
transactions_data_df = transactions_data_df.withColumn("amount_in_$", regexp_replace("amount", "\$", ""))
transactions_data_df = transactions_data_df.drop("amount")
users_data_df = users_data_df.withColumn("per_capita_income_in_$", regexp_replace("per_capita_income", "\$", ""))
users_data_df = users_data_df.drop("per_capita_income")
users_data_df = users_data_df.withColumn("yearly_income_in_$", regexp_replace("yearly_income", "\$", ""))
users_data_df = users_data_df.drop("yearly_income")
users_data_df = users_data_df.withColumn("total_debt_in_$", regexp_replace("total_debt", "\$", ""))
users_data_df = users_data_df.drop("total_debt")

'''Changing NULL values in transactions_data[error] column to No Errors'''
transactions_data_df = transactions_data_df.fillna({"errors": "No Errors"})

'''Checking NULL values in each dataframe'''
null_counts_cards_data_df = display(cards_data_df.select([count(when(col(c).isNull(), c)).alias(c) for c in cards_data_df.columns]))
null_counts_transactions_data_df = display(transactions_data_df.select([count(when(col(c).isNull(), c)).alias(c) for c in transactions_data_df.columns]))
null_counts_users_data_df = display(users_data_df.select([count(when(col(c).isNull(), c)).alias(c) for c in users_data_df.columns]))

'''Replacing NULL values in zip where merchant_city is Not ONLINE but zip is NULL to ABROAD'''
transactions_data_df = transactions_data_df.withColumn("zip", when((col("merchant_city") != "ONLINE")&col("zip").isNull(), "ABROAD").otherwise(col("zip")))

'''Replacing NULL values in zip where merchant_city is ONLINE and merchant_state, zip are NULL to ONLINE'''
transactions_data_df = transactions_data_df.fillna({"zip": "ONLINE", "merchant_state": "ONLINE"})

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

'''Masking card number and cvv columns'''
def mask_card_number(card):
    charlist = list(str(card))
    charlist[1:-1] = "X"*len(charlist[1:-1])
    return "".join(charlist)

def mask_cvv(cvv):
    charlist = list(str(cvv))
    charlist[:] = "X"*len(charlist)
    return "".join(charlist)

mask_card_udf = udf(mask_card_number, StringType())
mask_cvv_udf = udf(mask_cvv, StringType())

cards_data_df_masked = cards_data_df.withColumn("masked_card_number", mask_card_udf(cards_data_df["card_number"])).withColumn("masked_cvv", mask_cvv_udf(cards_data_df["cvv"]))

cards_data_df_masked.show()

'''Masking Address, latitude and longitude columns'''
def mask_address(address):
    charlist = list(address)
    charlist[1:-1] = "X"*len(charlist[1:-1])
    return "".join(charlist)

def mask_latitude(latitude):
    charlist = list(str(latitude))
    charlist[:] = "X"*len(charlist)
    return "".join(charlist)

def mask_longitude(longitude):
    charlist = list(str(longitude))
    charlist[:] = "X"*len(charlist)
    return "".join(charlist)

mask_address_udf = udf(mask_address, StringType())
mask_latitude_udf = udf(mask_latitude, StringType())
mask_longitude_udf = udf(mask_longitude, StringType())

users_data_df_masked = users_data_df.withColumn("masked_address", mask_address_udf(users_data_df["address"])).withColumn("masked_latitude", mask_latitude_udf(users_data_df["latitude"])).withColumn("masked_longitude", mask_longitude_udf(users_data_df["longitude"]))


In [0]:
from pyspark.sql.functions import date_format, year, to_timestamp, month, to_date

cards_data_df_cleaned = cards_data_df_masked.withColumn("id", col("id").cast("int"))\
                                            .withColumn("client_id", col("client_id").cast("int"))\
                                            .withColumn("card_brand", col("card_brand").cast("string"))\
                                            .withColumn("card_type", col("card_type").cast("string"))\
                                            .drop("card_number")\
                                            .withColumn("expires", date_format("expires", "MM/yyyy"))\
                                            .drop("cvv")\
                                            .withColumn("year_pin_last_changed", year(to_date(col("year_pin_last_changed"), 'yyyy')))\
                                            .withColumn("has_chip", col("has_chip").cast("string"))\
                                            .withColumn("num_cards_issued", col("num_cards_issued").cast("int"))\
                                            .withColumn("acct_open_date", date_format("acct_open_date", "MM/yyyy"))\
                                            .withColumn("card_on_dark_web", col("card_on_dark_web").cast("string"))\
                                            .withColumn("credit_limit_in_$", col("credit_limit_in_$").cast("long"))\
                                            .withColumn("masked_card_number", col("masked_card_number").cast("string"))\
                                            .withColumn("masked_cvv", col("masked_cvv").cast("string"))\
                                            .withColumn("ingested_at", to_timestamp("ingested_at", "dd/MM/yyyy HH:mm:ss"))

users_data_df_cleaned = users_data_df_masked.withColumn("id", col("id").cast("int"))\
                                            .withColumn("current_age", col("current_age").cast("int"))\
                                            .withColumn("retirement_age", col("retirement_age").cast("int"))\
                                            .withColumn("birth_year", year(to_date(col("birth_year"), 'yyyy')))\
                                            .withColumn("birth_month", month(to_date(col("birth_month"), 'MM')))\
                                            .withColumn("gender", col("gender").cast("string"))\
                                            .drop("address")\
                                            .drop("latitude")\
                                            .drop("longitude")\
                                            .withColumn("credit_score", col("credit_score").cast("int"))\
                                            .withColumn("num_credit_cards", col("num_credit_cards").cast("int"))\
                                            .withColumn("per_capita_income_in_$", col("per_capita_income_in_$").cast("long"))\
                                            .withColumn("yearly_income_in_$", col("yearly_income_in_$").cast("long"))\
                                            .withColumn("total_debt_in_$", col("total_debt_in_$").cast("long"))\
                                            .withColumn("masked_address", col("masked_address").cast("string"))\
                                            .withColumn("masked_latitude", col("masked_latitude").cast("string"))\
                                            .withColumn("masked_longitude", col("masked_longitude").cast("string"))\
                                            .withColumn("ingested_at", to_timestamp("ingested_at", "dd/MM/yyyy HH:mm:ss"))

transactions_data_df_cleaned = transactions_data_df.withColumn("id", col("id").cast("int"))\
                                                   .withColumn("client_id", col("client_id").cast("int"))\
                                                   .withColumn("card_id", col("card_id").cast("int"))\
                                                   .withColumn("use_chip", col("use_chip").cast("string"))\
                                                   .withColumn("amount_in_$", col("amount_in_$").cast("long"))\
                                                   .withColumn("merchant_id", col("merchant_id").cast("int"))\
                                                   .withColumn("merchant_city", col("merchant_city").cast("string"))\
                                                   .withColumn("merchant_state", col("merchant_state").cast("string"))\
                                                   .withColumn("zip", col("zip").cast("string"))\
                                                   .withColumn("mcc", col("mcc").cast("int"))\
                                                   .withColumn("errors", col("errors").cast("string"))\
                                                   .withColumn("ingested_at", to_timestamp("ingested_at", "dd/MM/yyyy HH:mm:ss"))

cards_data_df_cleaned.printSchema()
users_data_df_cleaned.printSchema()
transactions_data_df_cleaned.printSchema()
transactions_data_df_cleaned.show()

In [0]:
from delta.tables import DeltaTable

# Path to Delta table
users_data_delta_table_path = "/mnt/adls21s/silver/users_data/"
cards_data_delta_table_path = "/mnt/adls21s/silver/cards_data/"
transactions_data_delta_table_path = "/mnt/adls21s/silver/transactions_data/"

# Create Delta table (if not already created)
if not DeltaTable.isDeltaTable(spark, users_data_delta_table_path):
    users_data_df_cleaned.write.format("delta").mode("overwrite").save(users_data_delta_table_path)
    if not DeltaTable.isDeltaTable(spark,cards_data_delta_table_path):
        cards_data_df_cleaned.write.format("delta").mode("overwrite").save(cards_data_delta_table_path)
        if not DeltaTable.isDeltaTable(spark, transactions_data_delta_table_path):
            transactions_data_df_cleaned.write.format("delta").mode("overwrite").save(transactions_data_delta_table_path)
        else:
            pass

# Load the existing Delta table
users_data_delta_table = DeltaTable.forPath(spark, (users_data_delta_table_path))
cards_data_delta_table = DeltaTable.forPath(spark,(cards_data_delta_table_path))
transactions_data_delta_table = DeltaTable.forPath(spark,(transactions_data_delta_table_path))

# Define the condition for the merge: match based on a unique key
merge_condition = "t1.id = t2.id"

# Perform the merge (upsert operation)
users_data_delta_table.alias("t1").merge(
    users_data_df_masked.alias("t2"),
    merge_condition
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

cards_data_delta_table.alias("t1").merge(
    cards_data_df_masked.alias("t2"),
    merge_condition
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

transactions_data_delta_table.alias("t1").merge(
    transactions_data_df_cleaned.alias("t2"),
    merge_condition
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()