In [0]:
%sql
-- List the catalogs visible to the current session.
SHOW CATALOGS;

In [0]:
%sql
-- Make glk_dbx_gcp_ext_catalog the current catalog so the schema-qualified
-- names used in the following cells resolve inside it.
USE CATALOG glk_dbx_gcp_ext_catalog;

In [0]:
%sql
-- Create the practice schema in the current catalog (no-op when it already
-- exists) and point its managed storage at the GCS prefix below.
CREATE SCHEMA IF NOT EXISTS creditcard_schema
  COMMENT 'Schema for credit card ETL practice'
  MANAGED LOCATION 'gs://databricks-glk-dbx-ext-storage/creditcard/';

In [0]:
%sql
-- Bronze layer: expose the raw CSV file as a table, reading it in place.
--
-- IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE fails on
-- the second run because the table is already registered.
--
-- Options:
--   header      'true' -> the first line of the file supplies column names
--   inferSchema 'true' -> column types are inferred from the file contents,
--                         so downstream cells must not assume fixed types
--
-- NOTE(review): the CSV path lies under the schema's MANAGED LOCATION prefix
-- (gs://databricks-glk-dbx-ext-storage/creditcard/). Confirm the metastore
-- allows a path-based table inside a managed storage location.
CREATE TABLE IF NOT EXISTS creditcard_schema.creditcard_bronze
USING CSV
OPTIONS (
  path 'gs://databricks-glk-dbx-ext-storage/creditcard/creditcard.csv',
  header 'true',
  inferSchema 'true'
);

In [0]:
%sql
-- Quick sanity check: peek at a handful of bronze rows.
-- There is no ORDER BY, so which five rows come back is not defined.
SELECT *
FROM creditcard_schema.creditcard_bronze
LIMIT 5;

In [0]:
from pyspark.sql.functions import col

# Silver layer: a typed, null-filled and de-duplicated copy of the bronze table.
df_bronze = spark.table("creditcard_schema.creditcard_bronze")

df_silver = (
    df_bronze
    # Cast first. Bronze column types come from CSV schema inference, so
    # Amount/Class are not guaranteed to be numeric yet, and fillna(0) only
    # fills numeric columns -- filling before the cast could skip them.
    .withColumn("Amount", col("Amount").cast("double"))
    .withColumn("Class", col("Class").cast("int"))
    # Replace nulls in every numeric column with 0.
    # NOTE(review): this also turns a missing Class into 0; the gold tables
    # compute fraud counts as SUM(Class), so such rows count as non-fraud.
    # Confirm that is the intended handling.
    .fillna(0)
    # De-duplicate last so rows that only become identical after the
    # normalisation above are collapsed as well.
    .dropDuplicates()
)

# Print the first five rows for a visual check.
df_silver.show(5)


In [0]:
# Persist the Silver DataFrame in Delta format at the GCS path below.
# mode("overwrite") replaces whatever was previously stored at that path.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .save("gs://databricks-glk-dbx-ext-storage/creditcard/silver_creditcard")
)

In [0]:
%sql
-- Register the Silver Delta files as a table in Databricks.
--
-- The table points at the path the Silver DataFrame is written to with
-- mode("overwrite"), so the data is refreshed by that write and the
-- registration only needs to happen once. IF NOT EXISTS makes this cell safe
-- to re-run; a plain CREATE TABLE fails once the table is registered.
CREATE TABLE IF NOT EXISTS creditcard_schema.creditcard_silver
USING DELTA
LOCATION 'gs://databricks-glk-dbx-ext-storage/creditcard/silver_creditcard';

In [0]:
%sql
-- Gold: transaction and fraud counts per transaction-amount bucket.
--
-- Buckets are half-open on the upper bound: '<50' is Amount < 50, '50-100' is
-- 50 <= Amount < 100, and so on. The ELSE branch ('>1000') receives
-- everything not matched above, i.e. Amount >= 1000 and any NULL Amount.
-- fraud_txns is SUM(Class) and fraud_percentage is its share of the bucket's
-- rows, rounded to two decimals.
CREATE OR REPLACE TABLE creditcard_schema.gold_fraud_amount_bucket
USING DELTA
AS
WITH bucketed AS (
  -- Assign each Silver row to its bucket once, so the CASE expression is not
  -- repeated in the GROUP BY.
  SELECT
    CASE
      WHEN Amount < 50 THEN '<50'
      WHEN Amount < 100 THEN '50-100'
      WHEN Amount < 500 THEN '100-500'
      WHEN Amount < 1000 THEN '500-1000'
      ELSE '>1000'
    END AS amount_bucket,
    Class
  FROM creditcard_schema.creditcard_silver
)
SELECT
  amount_bucket,
  COUNT(*) AS total_txns,
  SUM(Class) AS fraud_txns,
  ROUND(SUM(Class) * 100.0 / COUNT(*), 2) AS fraud_percentage
FROM bucketed
GROUP BY amount_bucket
ORDER BY amount_bucket;

In [0]:
%sql
-- Show the amount buckets from smallest to largest.
--
-- Ordering by the label text is lexicographic and would yield
-- '100-500', '50-100', '500-1000', '<50', '>1000', so each label is mapped to
-- an explicit rank instead. Unknown labels sort last.
SELECT
  amount_bucket,
  total_txns,
  fraud_txns,
  fraud_percentage
FROM creditcard_schema.gold_fraud_amount_bucket
ORDER BY
  CASE amount_bucket
    WHEN '<50' THEN 1
    WHEN '50-100' THEN 2
    WHEN '100-500' THEN 3
    WHEN '500-1000' THEN 4
    WHEN '>1000' THEN 5
    ELSE 6
  END ASC;

In [0]:
%sql
-- Gold: the ten Silver rows with Class = 1 that have the largest Amount.
-- All Silver columns are carried over unchanged. Rows tied on Amount at the
-- cut-off are picked arbitrarily because the sort has no tie-breaker.
CREATE OR REPLACE TABLE creditcard_schema.gold_top_risk_txns
USING DELTA
AS
SELECT s.*
FROM creditcard_schema.creditcard_silver AS s
WHERE s.Class = 1
ORDER BY s.Amount DESC
LIMIT 10;

In [0]:
%sql
-- Gold: transaction and fraud counts per hour bucket.
--
-- hour_bucket is FLOOR(Time / 3600) as an integer, i.e. the number of whole
-- 3600-unit intervals contained in Time.
-- NOTE(review): this presumes Time is expressed in seconds, which would make
-- hour_bucket "hours elapsed" rather than hour of day -- confirm against the
-- dataset description.
CREATE OR REPLACE TABLE creditcard_schema.gold_fraud_hourly
USING DELTA
AS
WITH hourly AS (
  -- Derive the bucket once per row so it can be grouped on by name.
  SELECT
    CAST(FLOOR(Time/3600) AS INT) AS hour_bucket,
    Class
  FROM creditcard_schema.creditcard_silver
)
SELECT
  hour_bucket,
  COUNT(*) AS total_txns,
  SUM(Class) AS fraud_txns,
  ROUND(SUM(Class) * 100.0 / COUNT(*), 2) AS fraud_percentage
FROM hourly
GROUP BY hour_bucket
ORDER BY hour_bucket;

In [0]:
%sql
-- Show the hourly fraud statistics in bucket order.
-- A table read has no defined row order, so the ordering is requested
-- explicitly here rather than relying on how the table was written.
SELECT
  hour_bucket,
  total_txns,
  fraud_txns,
  fraud_percentage
FROM creditcard_schema.gold_fraud_hourly
ORDER BY hour_bucket;

In [0]:
%sql
-- Gold: one row per Class value with the row count and the sum, mean,
-- minimum and maximum of Amount for that class.
CREATE OR REPLACE TABLE creditcard_schema.gold_fraud_summary
USING DELTA
AS
SELECT
  s.Class,
  COUNT(*)      AS total_txns,
  SUM(s.Amount) AS total_amount,
  AVG(s.Amount) AS avg_amount,
  MIN(s.Amount) AS min_amount,
  MAX(s.Amount) AS max_amount
FROM creditcard_schema.creditcard_silver AS s
GROUP BY s.Class;

In [0]:
%sql
-- Show the per-class summary. Ordered by Class so the rows always appear in
-- the same sequence; a table read has no defined row order otherwise.
SELECT
  Class,
  total_txns,
  total_amount,
  avg_amount,
  min_amount,
  max_amount
FROM creditcard_schema.gold_fraud_summary
ORDER BY Class;

In [0]:
from pyspark.sql.functions import col

# Reload Silver from the registered table so this cell does not depend on the
# DataFrame built earlier in the notebook.
df_silver = spark.table("creditcard_schema.creditcard_silver")

# Correlation between the Amount and Class columns, printed as a single number.
amount_class_corr = df_silver.stat.corr("Amount", "Class")
print(amount_class_corr)