# Data Engineering - Demo
## Build a Lakehouse

- John is a Data Engineer working for an energy retailer.
- He’s been asked to build a dataset that helps analysts manage the aged debt of their **3 million** customers.
- John has decided to use a **Lakehouse** pattern because having all of the RAW data in one place will give him the most flexibility in the future.

### Lakehouse on Snowflake
✅ Snowflake storage scales elastically, so you never need to provision capacity or worry about running out of storage.

✅ All data is stored **COMPRESSED** (by up to 8-9 times) and **ENCRYPTED** by default.

✅ Snowflake can work with structured, semi-structured and unstructured data.

🥉🥉🥉🎬 Let's build the **BRONZE** layer, which will hold all of the **raw** data as-is.

In [None]:
USE ROLE ACCOUNTADMIN;
-- Allow SYSADMIN to run both warehouse-backed and serverless (managed) tasks.
GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO ROLE SYSADMIN;

-- Recreate all the base objects needed.
use role SYSADMIN;

-- Create the warehouses first, so that USE WAREHOUSE below cannot fail.
create warehouse if not exists COMPUTE_WH
    warehouse_size = 'XSMALL'
    initially_suspended = true
;

create or replace warehouse SYS_DATA_ORCHESTRATION_WH
    warehouse_size = 'XSMALL'
    initially_suspended = true
;

-- Database and schemas.
use warehouse COMPUTE_WH;
create or replace database ENERGY_RETAILER;
create or replace schema ENERGY_RETAILER.GOLD;
create or replace schema ENERGY_RETAILER.SILVER;
create or replace schema ENERGY_RETAILER.BRONZE;

-- Internal stage that holds the raw files before they are loaded.
create or replace stage ENERGY_RETAILER.BRONZE.RAW_DATA_STAGING;

-- Get ready.
use role SYSADMIN;
use warehouse COMPUTE_WH;
use schema ENERGY_RETAILER.BRONZE;

In [None]:
-- Create a table to hold the customer details.
create or replace transient table BRONZE.CUSTOMER(
    C_CUSTKEY number(38,0) not null,
    C_NAME varchar(255),
    C_MPAN varchar(255),
    C_ADDRESS varchar(255),
    C_PHONE varchar(255),
    C_COMMENT varchar(255)
);



-- Create a table to hold the charges a customer will incur for electricity.
create or replace transient table BRONZE.CHARGE(
    CHARGE_JSON_DATA variant
);



-- Create a table to hold the payments a customer will make against the charges.
create or replace transient table BRONZE.PAYMENT(
    PAYMENT_JSON_DATA variant
);



-- Create a Stream on the customer table so we can easily process only the rows that have changed since the last run.
create or replace stream BRONZE.CUSTOMER_STREAM on table BRONZE.CUSTOMER;
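A standard stream can be thought of as a saved offset into the table's change history: querying it returns the changes made since that offset, and the offset only advances when the stream is read by a DML statement (such as the MERGE used later) that commits. The Python below is a deliberately simplified, hypothetical model of that behavior for an append-only table like BRONZE.CUSTOMER; `SimpleStream` is our own illustrative class, not a Snowflake API, and it ignores updates and deletes.

```python
from dataclasses import dataclass


@dataclass
class SimpleStream:
    """Hypothetical model of a stream over an append-only table."""
    table: list        # rows appended by loads (e.g. COPY INTO)
    offset: int = 0    # position up to which changes have been consumed

    def has_data(self) -> bool:
        # Analogous to SYSTEM$STREAM_HAS_DATA: are there unconsumed rows?
        return self.offset < len(self.table)

    def read(self) -> list:
        # A plain SELECT shows the pending rows but does not move the offset.
        return self.table[self.offset:]

    def consume(self) -> list:
        # A committed DML statement that reads the stream advances the offset.
        rows = self.read()
        self.offset = len(self.table)
        return rows


bronze = [{"C_CUSTKEY": 1}, {"C_CUSTKEY": 2}]
stream = SimpleStream(bronze)
print(stream.has_data())      # True
print(len(stream.consume()))  # 2
print(stream.has_data())      # False
bronze.append({"C_CUSTKEY": 3})
print(stream.read())          # [{'C_CUSTKEY': 3}]
```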

🥈🥈🥈 🎬 Let's build the SILVER layer, which will hold all of the **clean data**. This could be used, for example, by power users who know what they are looking for.

In [None]:
-- Create a table to hold the clean customer details. This will be populated using
-- a MERGE INTO statement so we have a "current view" of each customer, while the
-- full history of every change remains in BRONZE.CUSTOMER. We're using this approach
-- in case we need to do any data cleansing, since this data comes from a CRM tool
-- where significant cleansing may be needed.
create or replace table SILVER.CUSTOMER(
    C_CUSTKEY number(38,0) not null,
    C_NAME varchar(255),
    C_MPAN varchar(255),
    C_ADDRESS varchar(255),
    C_PHONE varchar(255),
    C_COMMENT varchar(255)
);

In [None]:
-- For the charges we're taking a slightly different approach. In this case the data
-- comes from automated systems, so data quality will be good. We may still want to
-- make it more human-readable, so we can simply put a view on top.

-- 💡 Even though the data is in JSON format, Snowflake still gathers metadata to
--    improve query performance, and other features such as clustering are
--    still supported.
create or replace view SILVER.CHARGE as
    select 
        CHARGE_JSON_DATA:amount::number(30,2) as CH_AMOUNT,
        CHARGE_JSON_DATA:charge_id::varchar as CH_CHARGE_ID,
        CHARGE_JSON_DATA:charge_timestamp::timestamp_ntz as CH_TIMESTAMP,
        CHARGE_JSON_DATA:charge_type::varchar as CH_CHARGE_TYPE,
        CHARGE_JSON_DATA:cust_id::varchar as CH_CUSTKEY,
        CHARGE_JSON_DATA:mpan::varchar as CH_MPAN,
        CHARGE_JSON_DATA:mpid::varchar as CH_MPID,
        -- Cast the VARIANT to VARCHAR before comparing it with string literals.
        decode( CHARGE_JSON_DATA:mpid::varchar, 
            'EELC', 'UK Power Networks - Eastern England',
            'EMEB', 'National Grid - East Midlands',
            'LOND', 'UK Power Networks - London',
            'MANW', 'SP Energy Networks - Merseyside and Northern Wales',
            'MIDE', 'National Grid - West Midlands'
        ) as CH_MARKET_PARTICIPANT,
        CHARGE_JSON_DATA as CH_RAW
        
    from BRONZE.CHARGE
;
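The `:` path syntax and `::` casts in SILVER.CHARGE pull typed columns out of the raw JSON. As a rough, hypothetical Python analogue (not how Snowflake evaluates it), the function below parses one charge document and applies similar conversions: `Decimal.quantize` stands in for `number(30,2)`, and a missing key yields `None`, mirroring how a missing path returns NULL. The helper names and the ISO-8601 timestamp in the sample are our own assumptions.

```python
import json
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional


def to_number_2dp(value) -> Optional[Decimal]:
    # Rough analogue of ::number(30,2): round to two decimal places.
    if value is None:
        return None
    return Decimal(str(value)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def to_varchar(value) -> Optional[str]:
    # Rough analogue of ::varchar for the string and integer values used here.
    return None if value is None else str(value)


def flatten_charge(raw: str) -> dict:
    doc = json.loads(raw)
    ts = doc.get("charge_timestamp")
    return {
        "CH_AMOUNT": to_number_2dp(doc.get("amount")),
        "CH_CHARGE_ID": to_varchar(doc.get("charge_id")),
        # Assumes an ISO-8601 string; real unloaded timestamps may be formatted differently.
        "CH_TIMESTAMP": datetime.fromisoformat(ts) if ts else None,
        "CH_CHARGE_TYPE": to_varchar(doc.get("charge_type")),
        "CH_CUSTKEY": to_varchar(doc.get("cust_id")),
        "CH_MPAN": to_varchar(doc.get("mpan")),
        "CH_MPID": to_varchar(doc.get("mpid")),
    }


# Hypothetical sample document shaped like the generated charge JSON.
sample = ('{"amount": -123.456, "charge_id": 123456789, '
          '"charge_timestamp": "2024-01-15T10:30:00", "charge_type": "Electricity", '
          '"cust_id": 42, "mpan": "12345678", "mpid": "LOND"}')
print(flatten_charge(sample))
```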

In [None]:
-- We'll do the same for payments...
-- 👀 Notice the DECODE function: it's similar to CASE WHEN, but for simple equality comparisons.
create or replace view SILVER.PAYMENT as
    select 
        PAYMENT_JSON_DATA:amount::number(30,2) as P_AMOUNT,
        PAYMENT_JSON_DATA:cust_id::varchar as P_CUSTKEY,
        PAYMENT_JSON_DATA:mpan::varchar as P_MPAN,
        PAYMENT_JSON_DATA:payment_id::varchar as P_PAYMENT_ID,
        PAYMENT_JSON_DATA:payment_timestamp::timestamp_ntz as P_TIMESTAMP,
        PAYMENT_JSON_DATA:payment_type::varchar as P_PAYMENT_TYPE,
        PAYMENT_JSON_DATA:mpid::varchar as P_MPID,
        -- Cast the VARIANT to VARCHAR before comparing it with string literals.
        decode( PAYMENT_JSON_DATA:mpid::varchar, 
            'EELC', 'UK Power Networks - Eastern England',
            'EMEB', 'National Grid - East Midlands',
            'LOND', 'UK Power Networks - London',
            'MANW', 'SP Energy Networks - Merseyside and Northern Wales',
            'MIDE', 'National Grid - West Midlands'
        ) as P_MARKET_PARTICIPANT,
        PAYMENT_JSON_DATA as P_RAW
        
    from ENERGY_RETAILER.BRONZE.PAYMENT
;
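DECODE compares one expression against a list of search values and returns the result paired with the first match, or the optional default (NULL if there is none). Here is a simplified Python model of that behavior, written for illustration only; `decode` and `PARTICIPANTS` are hypothetical names, and the tuple simply mirrors the mapping used in the views.

```python
from typing import Any


def decode(expr: Any, *args: Any) -> Any:
    """Simplified model of DECODE(expr, search1, result1, ..., [default])."""
    has_default = len(args) % 2 == 1
    default = args[-1] if has_default else None
    pairs = args[:-1] if has_default else args
    for search, result in zip(pairs[0::2], pairs[1::2]):
        # Python's None == None is True, matching DECODE's rule that a NULL
        # expression matches a NULL search value (unlike CASE WHEN expr = NULL).
        if expr == search:
            return result
    # No match: return the default, or None (SQL NULL) when there is no default.
    return default


PARTICIPANTS = (
    "EELC", "UK Power Networks - Eastern England",
    "EMEB", "National Grid - East Midlands",
    "LOND", "UK Power Networks - London",
    "MANW", "SP Energy Networks - Merseyside and Northern Wales",
    "MIDE", "National Grid - West Midlands",
)
print(decode("LOND", *PARTICIPANTS))             # UK Power Networks - London
print(decode("XXXX", *PARTICIPANTS))             # None
print(decode("XXXX", *PARTICIPANTS, "Unknown"))  # Unknown
```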

🥇🥇🥇🎬 Finally, we'll build a **GOLD** layer, which will hold business-ready data. This means going one step further than simply providing a set of tables and views for Scottish Power's collections team to use.

We'll use two CTEs to create a **view** that consolidates all the customer, charge and payment data into one timeline, with an `EVENT_DATETIME` column so charges and payments can be read in chronological order.
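Conceptually, the view is two inner joins (one per CTE) stacked with UNION ALL. The plain-Python model below uses hypothetical in-memory rows and a `customer_story` helper of our own to show why payments for unknown customers drop out, and why you still need an explicit sort on EVENT_DATETIME to read the result chronologically. For brevity it omits the MPAN, MPID and market-participant columns.

```python
from datetime import datetime

# Hypothetical rows shaped like SILVER.CUSTOMER, SILVER.CHARGE and SILVER.PAYMENT.
customers = [{"C_CUSTKEY": 1, "C_NAME": "Customer#1"}]
charges = [{"CH_CUSTKEY": "1", "CH_CHARGE_TYPE": "Electricity", "CH_CHARGE_ID": "c1",
            "CH_AMOUNT": -100.0, "CH_TIMESTAMP": datetime(2024, 1, 10)}]
payments = [{"P_CUSTKEY": "1", "P_PAYMENT_TYPE": "Direct Debit", "P_PAYMENT_ID": "p1",
             "P_AMOUNT": 40.0, "P_TIMESTAMP": datetime(2024, 1, 5)},
            {"P_CUSTKEY": "99", "P_PAYMENT_TYPE": "Credit/Debit Card", "P_PAYMENT_ID": "p2",
             "P_AMOUNT": 10.0, "P_TIMESTAMP": datetime(2024, 1, 6)}]


def customer_story(customers, charges, payments):
    # Index customers by key; str() is our simplification for comparing the
    # numeric C_CUSTKEY with the varchar CH_CUSTKEY / P_CUSTKEY.
    by_key = {str(c["C_CUSTKEY"]): c for c in customers}
    events = []
    for ch in charges:  # first CTE: CUSTOMER_CHARGE (inner join)
        cust = by_key.get(ch["CH_CUSTKEY"])
        if cust is not None:
            events.append({**cust, "TYPE": "Charge", "SUBTYPE": ch["CH_CHARGE_TYPE"],
                           "ID": ch["CH_CHARGE_ID"], "AMOUNT": ch["CH_AMOUNT"],
                           "EVENT_DATETIME": ch["CH_TIMESTAMP"]})
    for p in payments:  # second CTE: CUSTOMER_PAYMENT (inner join)
        cust = by_key.get(p["P_CUSTKEY"])
        if cust is not None:
            events.append({**cust, "TYPE": "Payment", "SUBTYPE": p["P_PAYMENT_TYPE"],
                           "ID": p["P_PAYMENT_ID"], "AMOUNT": p["P_AMOUNT"],
                           "EVENT_DATETIME": p["P_TIMESTAMP"]})
    # UNION ALL: every matched row is kept, duplicates included.
    return events


# As in SQL, the combined rows have no guaranteed order until we sort them.
timeline = sorted(customer_story(customers, charges, payments),
                  key=lambda e: e["EVENT_DATETIME"])
for e in timeline:
    print(e["EVENT_DATETIME"].date(), e["TYPE"], e["AMOUNT"])
```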

In [None]:
-- We'll use two CTEs to create a view that brings all the customer, charge
-- and payment data together into a single timeline (sort by EVENT_DATETIME when querying).
create or replace secure view GOLD.CUSTOMER_STORY
    change_tracking = true
    as
    with CUSTOMER_CHARGE as (
    
        select 
            CUSTOMER.*,
            'Charge' as TYPE,
            CH_CHARGE_TYPE as SUBTYPE,
            CH_CHARGE_ID as ID,
            CH_AMOUNT as AMOUNT,
            CH_TIMESTAMP as EVENT_DATETIME,
            CH_MPAN as MPAN,
            CH_MPID as MPID,
            CH_MARKET_PARTICIPANT as MARKET_PARTICIPANT
            
        from SILVER.CUSTOMER
        inner join SILVER.CHARGE on C_CUSTKEY = CH_CUSTKEY
        
    )
    , CUSTOMER_PAYMENT as (
    
        select
            CUSTOMER.*,
            'Payment',
            P_PAYMENT_TYPE,
            P_PAYMENT_ID,
            P_AMOUNT,
            P_TIMESTAMP,
            P_MPAN,
            P_MPID,
            P_MARKET_PARTICIPANT
            
        from SILVER.CUSTOMER
        inner join SILVER.PAYMENT on C_CUSTKEY = P_CUSTKEY
    
    )
    select * from CUSTOMER_CHARGE
    union all
    select * from CUSTOMER_PAYMENT
;


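-- Create a dynamic table that keeps each customer's balance up to date
-- (charges are stored as negative amounts, payments as positive ones).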
create or replace dynamic table GOLD.ACCT_BALANCES
    target_lag = '1 minute'
    warehouse = SYS_DATA_ORCHESTRATION_WH
    refresh_mode = incremental
    initialize = on_create
    as
    
    select 
        C_CUSTKEY,
        C_NAME,
        MPID,
        MARKET_PARTICIPANT,
        sum( AMOUNT ) as ACT_BALANCE
        
    from GOLD.CUSTOMER_STORY
    
    group by all
;
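ACCT_BALANCES is a GROUP BY ALL over the story view, so each balance is the sum of negative charge amounts and positive payment amounts per customer and market participant. The Python below recomputes that aggregation from scratch over hypothetical rows, whereas the dynamic table itself is configured for incremental refresh within its one-minute target lag; `account_balances` is our own helper name.

```python
from collections import defaultdict
from decimal import Decimal


def account_balances(story):
    """Sum AMOUNT per (C_CUSTKEY, C_NAME, MPID, MARKET_PARTICIPANT), like GROUP BY ALL."""
    totals = defaultdict(Decimal)  # Decimal() starts each group at 0
    for e in story:
        key = (e["C_CUSTKEY"], e["C_NAME"], e["MPID"], e["MARKET_PARTICIPANT"])
        totals[key] += e["AMOUNT"]
    return dict(totals)


# Hypothetical rows shaped like GOLD.CUSTOMER_STORY.
story = [
    {"C_CUSTKEY": 1, "C_NAME": "Customer#1", "MPID": "LOND",
     "MARKET_PARTICIPANT": "UK Power Networks - London", "AMOUNT": Decimal("-100.00")},
    {"C_CUSTKEY": 1, "C_NAME": "Customer#1", "MPID": "LOND",
     "MARKET_PARTICIPANT": "UK Power Networks - London", "AMOUNT": Decimal("40.00")},
    {"C_CUSTKEY": 1, "C_NAME": "Customer#1", "MPID": "EELC",
     "MARKET_PARTICIPANT": "UK Power Networks - Eastern England", "AMOUNT": Decimal("-25.50")},
]
for key, balance in account_balances(story).items():
    print(key, balance)
```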

### ⏸️ To recap...

💬 We've just built the data model for our Data Lakehouse.

💡 Everything we've just done manually can be integrated into a CI/CD pipeline and completely automated. Many customers use Terraform, Jenkins, GitLab, GitHub Actions, Bamboo, Microsoft Azure DevOps and others.

📺 Let's take a look in the Object Browser and see what we've created.

This Task will take the raw data in the 🥉BRONZE🥉 layer and merge it into the data held in the 🥈SILVER🥈 layer. It is a Triggered Task, which means it only runs when new data has arrived in the stream.

💡 If no data is in the stream, Snowflake skips the run WITHOUT using compute resources.

💡 The shortest interval at which a Triggered Task can be set to run is 10 seconds; below we use 15 seconds via `USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS`.
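As a rough mental model (not Snowflake's actual scheduler), a triggered task fires when its stream has unconsumed data and the minimum interval since its previous run has elapsed, and each run consumes the stream. The toy simulation below, with a hypothetical `trigger_runs` function and a one-second polling loop that is purely our assumption, shows how bursts of arriving data collapse into fewer runs.

```python
def trigger_runs(arrival_times, min_interval=15, horizon=60, check_every=1):
    """Return the times (seconds) at which a simplified triggered task would run."""
    arrivals = sorted(arrival_times)
    pending = 0          # unconsumed rows in the stream
    last_run = None
    runs = []
    i = 0
    for t in range(0, horizon + 1, check_every):
        # New data lands in the stream.
        while i < len(arrivals) and arrivals[i] <= t:
            pending += 1
            i += 1
        if pending == 0:
            continue     # no data: the run is skipped
        if last_run is not None and t - last_run < min_interval:
            continue     # too soon after the previous run
        runs.append(t)
        last_run = t
        pending = 0      # the MERGE consumes the stream
    return runs


print(trigger_runs([0, 5, 40]))  # [0, 15, 40]
```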

In [None]:
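-- UPSERT logic: merge new or changed customer rows from the Bronze stream into Silver.
create or replace task BRONZE.CUSTOMER_UPDATE_TASK
    warehouse = SYS_DATA_ORCHESTRATION_WH
    -- Run at most once every 15 seconds, and only while the stream has data.
    user_task_minimum_trigger_interval_in_seconds = 15
    when
    system$stream_has_data( 'BRONZE.CUSTOMER_STREAM' )
    as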

    merge into SILVER.CUSTOMER as TGT using (
    
        select *
        from BRONZE.CUSTOMER_STREAM
        where METADATA$ACTION = 'INSERT'
    
    ) as SRC
    
        on TGT.C_CUSTKEY = SRC.C_CUSTKEY
        
    when matched then update set
        TGT.C_NAME = SRC.C_NAME,
        TGT.C_MPAN = SRC.C_MPAN,
        TGT.C_ADDRESS = SRC.C_ADDRESS,
        TGT.C_PHONE = SRC.C_PHONE,
        TGT.C_COMMENT = SRC.C_COMMENT
        
    when not matched then insert (
        C_CUSTKEY,
        C_NAME,
        C_MPAN,
        C_ADDRESS,
        C_PHONE,
        C_COMMENT
        
    ) values (
        SRC.C_CUSTKEY,
        SRC.C_NAME,
        SRC.C_MPAN,
        SRC.C_ADDRESS,
        SRC.C_PHONE,
        SRC.C_COMMENT
    )
;
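The MERGE above is an upsert keyed on C_CUSTKEY: rows whose key already exists in SILVER.CUSTOMER are updated, and the rest are inserted. Here is a minimal Python sketch of the same idea, using a dict as a hypothetical stand-in for the Silver table and assuming each key appears at most once per stream batch (Snowflake's handling of duplicate source keys is not modelled).

```python
COLUMNS = ("C_NAME", "C_MPAN", "C_ADDRESS", "C_PHONE", "C_COMMENT")


def merge_customers(target: dict, source_rows: list) -> dict:
    """Upsert source_rows into target (C_CUSTKEY -> row) and report what happened."""
    counts = {"updated": 0, "inserted": 0}
    for row in source_rows:
        key = row["C_CUSTKEY"]
        values = {col: row.get(col) for col in COLUMNS}
        if key in target:   # WHEN MATCHED THEN UPDATE SET ...
            target[key].update(values)
            counts["updated"] += 1
        else:               # WHEN NOT MATCHED THEN INSERT ...
            target[key] = {"C_CUSTKEY": key, **values}
            counts["inserted"] += 1
    return counts


silver = {1: {"C_CUSTKEY": 1, "C_NAME": "Old name", "C_MPAN": "111",
              "C_ADDRESS": "A", "C_PHONE": "P", "C_COMMENT": ""}}
batch = [
    {"C_CUSTKEY": 1, "C_NAME": "New name", "C_MPAN": "111",
     "C_ADDRESS": "A", "C_PHONE": "P", "C_COMMENT": ""},
    {"C_CUSTKEY": 2, "C_NAME": "Customer#2", "C_MPAN": "222",
     "C_ADDRESS": "B", "C_PHONE": "Q", "C_COMMENT": ""},
]
print(merge_customers(silver, batch))  # {'updated': 1, 'inserted': 1}
print(silver[1]["C_NAME"])             # New name
```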

🎬 Now let's start populating our Lakehouse with data! We'll be cheating a little: I don't have a CRM, billing system or payment system handy to integrate with, so we're going to generate the data instead.

These 2 Tasks simulate new JSON data arriving, ready to be ingested, by generating JSON files and saving them into the Stage.

💡 There are various ways to make a Task run only when certain conditions are met or after other Task(s) have completed. We only want these to run after we've got new customer data, so they run AFTER `CUSTOMER_UPDATE_TASK`.

- Load fake data for **CHARGES** to the stage (uses `C_CUSTKEY` from the SILVER.CUSTOMER table)
- Load fake data for **PAYMENTS** to the stage (uses `C_CUSTKEY` from the SILVER.CUSTOMER table)
- **CUSTOMER** data is taken from `SNOWFLAKE_SAMPLE_DATA.TPCH_SF10.CUSTOMER`

In [None]:
-- Unload one JSON document per customer into the stage (files prefixed "chg").
create or replace task BRONZE.CHARGE_DATA_GENERATION_TASK
after BRONZE.CUSTOMER_UPDATE_TASK
as
    copy into @RAW_DATA_STAGING/chg from (
    
        select
            object_construct(
                'cust_id', C_CUSTKEY,
                'mpan', C_MPAN,
                'mpid', decode( uniform( 1, 5, random() ), 
                    1, 'EELC', 
                    2, 'EMEB',
                    3, 'LOND',
                    4, 'MANW',
                    5, 'MIDE'
                ),
                'amount', round( uniform( 20.01, 299.99, random() ), 2 ) * -1,
                'charge_type', decode( uniform( 1, 4, random() ), 
                    1, 'Electricity', 
                    2, 'Electricity',
                    3, 'Electricity',
                    4, 'Correction'
                ),
                'charge_timestamp', dateadd( day, uniform( 0, 180, random() ) * -1, current_timestamp() ),
                'charge_id', uniform( 100000000, 999999999, random() )
            )
        
        from SILVER.CUSTOMER sample(75)
        
    )
    file_format = ( type = json )
    overwrite = true
;

-- Let's take the same approach for Payments, generate the data...
create or replace task BRONZE.PAYMENT_DATA_GENERATION_TASK
after BRONZE.CUSTOMER_UPDATE_TASK
as
    copy into @RAW_DATA_STAGING/pay from (
    
        select
            object_construct(
                'amount', round( uniform( 10.01, 150.99, random() ), 2 ),
                'cust_id', C_CUSTKEY,
                'mpan', C_MPAN,
                'mpid', decode( uniform( 1, 5, random() ), 
                    1, 'EELC', 
                    2, 'EMEB',
                    3, 'LOND',
                    4, 'MANW',
                    5, 'MIDE'
                ),
                'payment_type', decode( uniform( 1, 2, random() ), 
                    1, 'Direct Debit', 
                    2, 'Credit/Debit Card'
                ),
                'payment_timestamp', dateadd( day, uniform( 0, 30, random() ) * -1, current_timestamp() ),
                'payment_id', uniform( 100000000, 999999999, random() )
            )
        
        from ENERGY_RETAILER.SILVER.CUSTOMER sample(60)
    
    )
    file_format = ( type = json )
    overwrite = true
;

In [None]:
-- Now create tasks to load the JSON files into the Bronze tables.
create or replace task BRONZE.CHARGE_INGEST_TASK
after BRONZE.CHARGE_DATA_GENERATION_TASK
as
    copy into BRONZE.CHARGE
    from @RAW_DATA_STAGING
    pattern = '.*chg.*'
    file_format = ( type = json )
;


-- ...and the same for payments.
create or replace task BRONZE.PAYMENT_INGEST_TASK
after BRONZE.PAYMENT_DATA_GENERATION_TASK
as
    copy into BRONZE.PAYMENT
    from @RAW_DATA_STAGING
    pattern = '.*pay.*'
    file_format = ( type = json )
;
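The generation tasks lean on UNIFORM, DECODE and SAMPLE to fake realistic-looking rows. The Python below is a hypothetical local re-creation of the charge generator, useful for reasoning about the resulting distributions: four equally likely outcomes map to three 'Electricity' and one 'Correction', so about 75% of charges are electricity, and SAMPLE(75) keeps each customer with roughly 75% probability. `random.Random` stands in for RANDOM(), the fixed "now" keeps output reproducible, and the names `fake_charge`/`fake_charges` are our own.

```python
import json
import random
from datetime import datetime, timedelta

MPIDS = ["EELC", "EMEB", "LOND", "MANW", "MIDE"]
# Three of the four equally likely outcomes are 'Electricity' (about 75%).
CHARGE_TYPES = ["Electricity", "Electricity", "Electricity", "Correction"]


def fake_charge(cust_key, mpan, rng, now):
    return {
        "cust_id": cust_key,
        "mpan": mpan,
        "mpid": rng.choice(MPIDS),
        # Charges are negative amounts, mirroring "* -1" in the task.
        "amount": -round(rng.uniform(20.01, 299.99), 2),
        "charge_type": rng.choice(CHARGE_TYPES),
        "charge_timestamp": (now - timedelta(days=rng.randint(0, 180))).isoformat(),
        "charge_id": rng.randint(100000000, 999999999),
    }


def fake_charges(customers, sample_pct, seed=0):
    rng = random.Random(seed)
    now = datetime(2024, 1, 1)  # fixed "current time" so output is reproducible
    # Row-level sampling: keep each customer with probability sample_pct / 100.
    return [
        fake_charge(c["C_CUSTKEY"], c["C_MPAN"], rng, now)
        for c in customers
        if rng.random() < sample_pct / 100
    ]


customers = [{"C_CUSTKEY": k, "C_MPAN": str(10000000 + k)} for k in range(1, 101)]
rows = fake_charges(customers, sample_pct=75)
print(len(rows))
print(json.dumps(rows[0]))  # one JSON document, like a line in the unloaded files
```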

In [None]:
-- All we need to do now is kick the Tasks off and we'll be ready to test.
alter task BRONZE.CHARGE_DATA_GENERATION_TASK resume;
alter task BRONZE.CHARGE_INGEST_TASK resume;
alter task BRONZE.PAYMENT_DATA_GENERATION_TASK resume;
alter task BRONZE.PAYMENT_INGEST_TASK resume;
alter task BRONZE.CUSTOMER_UPDATE_TASK resume;

show tasks in database;

-- CUSTOMER_UPDATE_TASK runs will be SKIPPED until the customer stream has data.
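The AFTER clauses turn these five tasks into a small task graph (DAG) rooted at CUSTOMER_UPDATE_TASK: the two generation tasks can run once the merge finishes, and each ingest task waits for its generator. A quick way to sanity-check that dependency structure locally is Python's standard-library `graphlib`; the dictionary below is transcribed from the AFTER clauses, and grouping tasks into "stages" is our own illustration, not how Snowflake schedules them.

```python
from graphlib import TopologicalSorter

# Predecessors of each task, transcribed from the AFTER clauses.
TASK_GRAPH = {
    "CUSTOMER_UPDATE_TASK": set(),
    "CHARGE_DATA_GENERATION_TASK": {"CUSTOMER_UPDATE_TASK"},
    "PAYMENT_DATA_GENERATION_TASK": {"CUSTOMER_UPDATE_TASK"},
    "CHARGE_INGEST_TASK": {"CHARGE_DATA_GENERATION_TASK"},
    "PAYMENT_INGEST_TASK": {"PAYMENT_DATA_GENERATION_TASK"},
}


def run_stages(graph):
    """Group tasks into stages whose predecessors have all completed."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    stages = []
    while ts.is_active():
        ready = sorted(ts.get_ready())
        stages.append(ready)
        ts.done(*ready)
    return stages


order = list(TopologicalSorter(TASK_GRAPH).static_order())
for stage in run_stages(TASK_GRAPH):
    print(stage)
```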

In [None]:
// 🎬 Now let's do an end-to-end test.
// --------------------------------------------------------------------------------


-- Use COPY to generate sample data to put into the Stage ready to be loaded.
-- Addresses and comments can contain commas, so string fields are enclosed in quotes.
copy into @RAW_DATA_STAGING/cust from (
    select C_CUSTKEY, C_NAME, to_char( uniform( 10000000, 99999999, random() ) ), C_ADDRESS, C_PHONE, C_COMMENT
    from SNOWFLAKE_SAMPLE_DATA.TPCH_SF10.CUSTOMER
    sample(25)
)
file_format = ( type = csv field_optionally_enclosed_by = '"' )
overwrite = true
;

In [None]:
-- Load it into our Bronze Customer table (columns are matched by position here).
copy into BRONZE.CUSTOMER
from @RAW_DATA_STAGING
pattern = '.*cust.*'
file_format = ( type = csv field_optionally_enclosed_by = '"' );
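Free-text columns such as addresses and comments can contain commas, which is why a comma-delimited CSV round trip through a stage needs field enclosure (in Snowflake, the FIELD_OPTIONALLY_ENCLOSED_BY file format option). The Python below uses the standard `csv` module to illustrate the failure mode with a hypothetical row; it is an analogy, not Snowflake's own parser.

```python
import csv
import io

# Hypothetical customer row whose address and comment contain commas.
row = ["1", "Customer#1", "12345678", "12 High Street, Leeds",
       "25-989-741-2988", "pays late, call first"]

# Unquoted, comma-delimited output: splitting it back finds too many fields.
unquoted_line = ",".join(row)
print(len(unquoted_line.split(",")))  # 8, not 6

# Quoting the fields that need it keeps the row intact when read back.
buf = io.StringIO()
csv.writer(buf, quoting=csv.QUOTE_MINIMAL).writerow(row)
parsed = next(csv.reader(io.StringIO(buf.getvalue())))
print(len(parsed))  # 6
```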

In [None]:
select '0. Bronze CUSTOMER Stream Records' as DESCRIPTION, count(*) as RECORD_COUNT from BRONZE.CUSTOMER_STREAM
union all
select '1. Bronze CUSTOMER Records' as DESCRIPTION, count(*) as RECORD_COUNT from BRONZE.CUSTOMER
union all
select '2. Bronze CHARGE Records' as DESCRIPTION, count(*) as RECORD_COUNT from BRONZE.CHARGE
union all
select '3. Bronze PAYMENT Records' as DESCRIPTION, count(*) as RECORD_COUNT from BRONZE.PAYMENT
union all
select '4. Silver CUSTOMER Records' as DESCRIPTION, count(*) as RECORD_COUNT from SILVER.CUSTOMER
union all
select '5. Silver CHARGE Records' as DESCRIPTION, count(*) as RECORD_COUNT from SILVER.CHARGE
union all
select '6. Silver PAYMENT Records' as DESCRIPTION, count(*) as RECORD_COUNT from SILVER.PAYMENT
union all
select '7. Gold STORY Records' as DESCRIPTION, count(*) as RECORD_COUNT from GOLD.CUSTOMER_STORY
union all
select '8. Gold ACCT_BALANCES Records' as DESCRIPTION, count(*) as RECORD_COUNT from GOLD.ACCT_BALANCES

order by 1 asc
;

In [None]:
-- Check for repeated customer keys in Bronze (every load appends new rows).
select C_CUSTKEY, count(*)
from BRONZE.CUSTOMER
group by all
order by 2 desc
;

In [None]:
-- Structured data in BRONZE.CUSTOMER 🥉
select * from BRONZE.CUSTOMER limit 10;

In [None]:
-- Semi-Structured data in BRONZE.CHARGE🥉
select * from BRONZE.CHARGE limit 10;

In [None]:
-- Flattened Semi-Structured data in SILVER.CHARGE
select * from SILVER.CHARGE limit 10;

In [None]:
-- Everything combined into a single view in GOLD.CUSTOMER_STORY
select * from GOLD.CUSTOMER_STORY limit 10;

In [None]:
-- Summarised information in GOLD.ACCT_BALANCES
select * from GOLD.ACCT_BALANCES limit 10;

In [None]:
-- But importantly, all of Snowflake's features, like clustering, still work.

-- Remember that ACCT_BALANCES is our dynamic table!

SELECT SYSTEM$CLUSTERING_INFORMATION( 'GOLD.ACCT_BALANCES', '(MPID)' );

📺 But hang on, we've gone through a lot of code: where did the field MPID come from?
Let's take a look in Snowsight to see what we can find:

- ✅ Lineage
- ✅ AI-generated column descriptions
- ✅ Monitoring Tasks
- ✅ Monitoring Dynamic Tables

In [None]:
-- Demo End
alter task BRONZE.CUSTOMER_UPDATE_TASK suspend;
alter task BRONZE.CHARGE_DATA_GENERATION_TASK suspend;
alter task BRONZE.CHARGE_INGEST_TASK suspend;
alter task BRONZE.PAYMENT_DATA_GENERATION_TASK suspend;
alter task BRONZE.PAYMENT_INGEST_TASK suspend;
alter dynamic table GOLD.ACCT_BALANCES suspend;

-- Drop the objects.
drop database ENERGY_RETAILER;