In [0]:
%sql
-- Rule catalogue for the workflows defined in this notebook: one row per rule.
-- IF NOT EXISTS makes this a no-op when the table is already present.
CREATE TABLE IF NOT EXISTS metadata_table (
  workflow_id      STRING,  -- workflow key, e.g. 'wf01-01'
  rule_id          STRING,  -- rule number stored as text, e.g. '001'
  input_dataset    STRING,  -- source the rule reads, e.g. 'Base.csv' or 'bank_fraud'
  output_dataset   STRING,  -- target the rule writes, e.g. 'bank_fraud'
  filter_condition STRING   -- SQL expression text (predicates and a CASE expression are stored below)
) USING DELTA;

In [0]:
%sql
-- Switch metadata_table to Delta column mapping by name, together with the
-- reader/writer protocol versions set alongside it.
-- Delta requires column mapping before a column can be dropped; the statement
-- previously used here was:
--   ALTER TABLE metadata_table DROP COLUMN created_at;
ALTER TABLE metadata_table
SET TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.minReaderVersion'   = '2',
  'delta.minWriterVersion'   = '5'
);

# Selection

### 1st Rule

In [0]:
%sql
-- Selection rule wf01-01 / 001: reads Base.csv, writes bank_fraud, and stores
-- the predicate payment_type!='AA' as the rule's filter condition.
-- The target columns are listed explicitly so the insert does not depend on
-- the table's current column count or order (a later cell in this notebook
-- adds a sixth column to metadata_table).
-- The filter text stays in double quotes because it contains a single-quoted
-- SQL literal.
INSERT INTO metadata_table
  (workflow_id, rule_id, input_dataset, output_dataset, filter_condition)
VALUES (
  'wf01-01',
  '001',
  'Base.csv',
  'bank_fraud',
  "payment_type!='AA'"
);

num_affected_rows,num_inserted_rows
1,1


### 2nd Rule

In [0]:
%sql
-- Selection rule wf01-02 / 002: reads and writes bank_fraud, storing an
-- inclusive age-range predicate as the rule's filter condition.
-- The target columns are listed explicitly so the insert does not depend on
-- the table's current column count or order (a later cell in this notebook
-- adds a sixth column to metadata_table).
INSERT INTO metadata_table
  (workflow_id, rule_id, input_dataset, output_dataset, filter_condition)
VALUES (
  'wf01-02',
  '002',
  'bank_fraud',
  'bank_fraud',
  "customer_age BETWEEN 10 AND 65"
);


num_affected_rows,num_inserted_rows
1,1


### 3rd Rule

In [0]:
%sql
-- Selection rule wf01-03 / 003: reads and writes bank_fraud, storing an upper
-- bound on days_since_request as the rule's filter condition.
-- The target columns are listed explicitly so the insert does not depend on
-- the table's current column count or order (a later cell in this notebook
-- adds a sixth column to metadata_table).
INSERT INTO metadata_table
  (workflow_id, rule_id, input_dataset, output_dataset, filter_condition)
VALUES (
  'wf01-03',
  '003',
  'bank_fraud',
  'bank_fraud',
  "days_since_request <= 70"
);


num_affected_rows,num_inserted_rows
1,1


# Validation

In [0]:
%sql
-- Validation rule wf02-01 / 001 on bank_fraud.
-- NOTE(review): evaluated as a SQL predicate, the stored expression is TRUE
-- for every non-NULL fraud_bool and NULL when fraud_bool is NULL, so it looks
-- equivalent to "fraud_bool IS NOT NULL" -- confirm that this is the intent.
-- The target columns are listed explicitly so the insert does not depend on
-- the table's current column count or order (a later cell in this notebook
-- adds a sixth column to metadata_table).
INSERT INTO metadata_table
  (workflow_id, rule_id, input_dataset, output_dataset, filter_condition)
VALUES (
  'wf02-01',
  '001',
  'bank_fraud',
  'bank_fraud',
  "fraud_bool = COALESCE(fraud_bool, 1)"
);


num_affected_rows,num_inserted_rows
1,1


# Transformation

In [0]:
%sql
-- Transformation rule wf03-01 / 001 on bank_fraud.
--
-- Step 1: widen metadata_table with an income_category column.
-- NOTE: ADD COLUMN raises an error if the column already exists, so this
-- statement must be skipped when the cell is re-run against a table that was
-- already widened.
ALTER TABLE metadata_table
ADD COLUMN income_category STRING;

-- Step 2: register the rule. The CASE expression is stored as text in
-- filter_condition, and the literal 'income_category' is stored in the new
-- column. The target columns are listed explicitly so the six values cannot
-- be misaligned if the table's column order or count changes.
INSERT INTO metadata_table
  (workflow_id, rule_id, input_dataset, output_dataset, filter_condition, income_category)
VALUES (
  'wf03-01',
  '001',
  'bank_fraud',
  'bank_fraud',
  "CASE WHEN income >= 0.1 AND income < 0.4 THEN 'Low' 
        WHEN income >= 0.4 AND income < 0.7 THEN 'Average' 
        WHEN income >= 0.7 AND income <= 0.9 THEN 'High' 
        ELSE 'Unknown' 
   END",
  'income_category'
);



num_affected_rows,num_inserted_rows
1,1


In [0]:
# Load every registered rule into a DataFrame and preview it.
query = "SELECT * FROM metadata_table"
metadata_df = spark.sql(query)
# DataFrame.show() prints the table itself and returns None, so wrapping it
# in print() only appended a stray "None" line to the cell output.
metadata_df.show()

+-----------+-------+-------------+--------------+--------------------+---------------+
|workflow_id|rule_id|input_dataset|output_dataset|    filter_condition|income_category|
+-----------+-------+-------------+--------------+--------------------+---------------+
|    wf03-01|    001|   bank_fraud|    bank_fraud|CASE WHEN income ...|income_category|
|    wf02-01|    001|   bank_fraud|    bank_fraud|fraud_bool = COAL...|           NULL|
|    wf01-02|    002|   bank_fraud|    bank_fraud|customer_age BETW...|           NULL|
|    wf01-03|    003|   bank_fraud|    bank_fraud|days_since_reques...|           NULL|
|    wf01-01|    001|     Base.csv|    bank_fraud|  payment_type!='AA'|           NULL|
+-----------+-------+-------------+--------------+--------------------+---------------+

None


In [0]:
%sql
-- Row count of the bank_fraud table.
SELECT COUNT(*)
FROM bank_fraud;

count(1)
734093
