In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, regexp_extract
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, LongType
import urllib

In [0]:
# Initialize Spark Session
spark = SparkSession.builder.appName("Silver Layer").getOrCreate()

In [0]:
# ACCESS_KEY=dbutils.secrets.get("aws", "aws_access_key")
# SECRET_KEY= dbutils.secrets.get("aws", "aws_secret_access_key")

In [0]:
# ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# AWS_S3_BUCKET = "frauddetection-etl"

# # Mount name for the bucket
# MOUNT_NAME = "/mnt/frauddetection-etl"
# # Source url
# SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# # Mount the drive
# dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [0]:
# Set silver path
SILVER_PATH = "/mnt/frauddetection-etl/silver"

In [0]:
# Read the transactions data from the external table
transactions_df = spark.sql("SELECT * FROM bronze.transactions")

# Read the fraud reports data from the external table
fraud_reports_df = spark.sql("SELECT * FROM bronze.fraud_reports")

In [0]:
# Define schemas
customer_schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("email", StringType(), True),
    StructField("address", StringType(), True),
    StructField("country", StringType(), True),
    StructField("last_country_logged", StringType(), True),
    StructField("creation_date", StringType(), True),
    StructField("last_activity_date", StringType(), True),
    StructField("age_group", StringType(), True),
    StructField("id", StringType(), True)
])

country_schema = StructType([
    StructField("country", StringType(), True),
    StructField("alpha2_code", StringType(), True),
    StructField("alpha3_code", StringType(), True),
    StructField("numeric_code", LongType(), True),
    StructField("lat_avg", FloatType(), True),
    StructField("long_avg", FloatType(), True)
])

In [0]:
# Reading banking and Country data
banking_customers_df = spark.read.format("csv").schema(customer_schema).load('/dbdemos/fsi/fraud-detection/customers', header=True, multiLine=True)
country_coordinates_df = spark.read.format("csv").schema(country_schema).load("/dbdemos/fsi/fraud-detection/country_code")

In [0]:
banking_customers_df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- country: string (nullable = true)
 |-- last_country_logged: string (nullable = true)
 |-- creation_date: string (nullable = true)
 |-- last_activity_date: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- id: string (nullable = true)



In [0]:
# Data transformations and cleaning

# Clean fraud data
fraud_reports_df = fraud_reports_df.withColumn("cleaned_id", regexp_extract(col("id"), r"([A-Z]{2}\s\d+)", 0))

# Drop the old 'id' column and rename 'cleaned_id' to 'id'
fraud_reports_df = fraud_reports_df.drop("id").withColumnRenamed("cleaned_id", "id")

# Filter to keep only rows where 'id' is not null or empty
fraud_reports_df = fraud_reports_df.filter(col("id")!= "")

# Join Fraud and Transactions 
transactions_df = transactions_df.join(fraud_reports_df, "id", "left")

# Cleaning Transaction
transactions_df = transactions_df.withColumn("amount", col("amount").cast("float"))

cleaned_transactions = transactions_df.withColumn("countryOrig", regexp_replace("countryOrig", "--", "")) \
    .withColumn("countryDest", regexp_replace("countryDest", "--", "")) \
    .dropna(subset=["customer_id"])

# Clean customers data
cleaned_banking_customers = banking_customers_df.dropna(subset=["email", "id"])

# Clean country coordinates
cleaned_country_coordinates = country_coordinates_df \
    .withColumn("lat_avg", col("lat_avg").cast("float")) \
    .withColumn("long_avg", col("long_avg").cast("float"))

In [0]:
# Save cleaned data to Silver layer and create external tables#
cleaned_transactions.write.format("delta").mode("append").save(f"{SILVER_PATH}/transactions")

banking_customers_df.write.format("delta").mode("append").save(f"{SILVER_PATH}/banking_customers")

country_coordinates_df.write.format("delta").mode("append").save(f"{SILVER_PATH}/country_coordinates")

# Register external tables
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

# Set the current schema
spark.sql("USE silver")

# Register the external tables
spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS transactions
USING delta
LOCATION 's3://frauddetection-etl/silver/transactions'
""")

spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS banking_customers
USING delta
LOCATION 's3://frauddetection-etl/silver/banking_customers'
""")

spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS country_coordinates
USING delta
LOCATION 's3://frauddetection-etl/silver/country_coordinates'
""")

DataFrame[]

In [0]:
# Showing written data
transactions_df_output = spark.sql("SELECT * FROM silver.transactions")
transactions_df_output.show()

+--------------------+---------+-----------+-----------+--------------------+-----------------------+------------+-----------+--------------+--------------+--------------+--------------+----+--------+--------+
|                  id|   amount|countryDest|countryOrig|         customer_id|isUnauthorizedOverdraft|    nameDest|   nameOrig|newBalanceDest|newBalanceOrig|oldBalanceDest|oldBalanceOrig|step|    type|is_fraud|
+--------------------+---------+-----------+-----------+--------------------+-----------------------+------------+-----------+--------------+--------------+--------------+--------------+----+--------+--------+
|9db8db71-126b-4fe...| 10452.98|        REU|        REU|a0e67d95-b19d-487...|                      0| M3479766470|C9695372314|     363539.07|      67009.83|     353086.09|      77462.82| 146| PAYMENT|    NULL|
|30f56e89-0e4e-4eb...| 11230.51|        BRA|        TUR|0796aa96-5c08-4d3...|                      0| M9181931898|C2700694787|    1314521.73|    1973961.84|    

In [0]:
# Showing written data
banking_customers_df_output = spark.sql("SELECT * FROM silver.banking_customers")
banking_customers_df_output.show()

+---------+--------+--------------------+--------------------+-------+-------------------+-------------------+-------------------+---------+--------------------+
|firstname|lastname|               email|             address|country|last_country_logged|      creation_date| last_activity_date|age_group|                  id|
+---------+--------+--------------------+--------------------+-------+-------------------+-------------------+-------------------+---------+--------------------+
| Jonathan|   Scott|jessica82@mitchel...|553 James Fork Su...|    POL|                POL|12-30-2021 00:00:00|03-01-2023 11:52:05|      5.0|f3ec479d-f416-4ef...|
|    James|  Harris|james86@atkinson.com|3632 Debra Inlet\...|    GNB|                GNB|07-01-2021 00:00:00|03-02-2023 03:16:40|      5.0|ad794800-ac9a-49d...|
|   Ronald| Simmons|stevensjulie@good...|86754 Yesenia Inl...|    JPN|                JPN|04-04-2022 00:00:00|03-01-2023 17:17:08|      5.0|b0e7c97f-b6dc-4cc...|
|    Wendy|   Cowan|dcarrill

In [0]:
# Showing written data
country_coordinates_df_output = spark.sql("SELECT * FROM silver.country_coordinates")
country_coordinates_df_output.show()

+--------------------+-----------+-----------+------------+--------+--------+
|             country|alpha2_code|alpha3_code|numeric_code| lat_avg|long_avg|
+--------------------+-----------+-----------+------------+--------+--------+
|             country|alpha2_code|alpha3_code|        NULL|    NULL|    NULL|
|               Aruba|         AW|        ABW|         533|    12.5|-69.9667|
|         Afghanistan|         AF|        AFG|           4|    33.0|    65.0|
|              Angola|         AO|        AGO|          24|   -12.5|    18.5|
|            Anguilla|         AI|        AIA|         660|   18.25|-63.1667|
|             Albania|         AL|        ALB|           8|    41.0|    20.0|
|             Andorra|         AD|        AND|          20|    42.5|     1.6|
|Netherlands Antilles|         AN|        ANT|         530|   12.25|  -68.75|
|United Arab Emirates|         AE|        ARE|         784|    24.0|    54.0|
|           Argentina|         AR|        ARG|          32|   -3

In [0]:
# spark.sql("DROP SCHEMA IF EXISTS silver CASCADE")

DataFrame[]

In [0]:
# dbutils.fs.unmount("/mnt/frauddetection-etl")