# Data Engineering Exercise 3:

Exercise: The ingest step currently does nothing to prevent loading duplicate copies of the same data. Additionally, the transform/load steps process the entire `RAW` table on every run. The ML Ops team plans to use Zero Copy Cloning (ZCC) for reproducibility and to keep data from changing during model training, but ZCC loses its benefits if the entire base table is overwritten.
  
With Change Data Capture (CDC) using Snowflake Streams and Tasks, we can avoid ingesting duplicates, avoid fully overwriting the target table, and avoid the storage duplication in clones that a full overwrite would cause.

### Add Change Data Capture to the ELT Pipeline


In [None]:
import snowflake.snowpark as snp
import json
import getpass 

# Read the Snowflake account settings from a local creds.json file.
# (The password could instead be prompted for with getpass.getpass().)
with open('creds.json') as f:
    data = json.load(f)

# Snowpark connection key -> creds.json key that supplies it; only the
# user name is stored under a different key ("username").
connection_parameters = {
    param: data[creds_key]
    for param, creds_key in (
        ('account', 'account'),
        ('user', 'username'),
        ('password', 'password'),
        ('role', 'role'),
        ('schema', 'schema'),
        ('database', 'database'),
        ('warehouse', 'warehouse'),
    )
}

session = snp.Session.builder.configs(connection_parameters).create()

In [None]:
from citibike_ml import elt as ELT

import snowflake.snowpark as snp
from datetime import datetime
import uuid 

start = datetime.now()
print("Start Time =", start.strftime("%H:%M:%S"))

download_base_url = 'https://s3.amazonaws.com/tripdata/'
# The CDC tasks created later run on the warehouse this session is using.
cdc_task_warehouse_name = session.sql('SELECT CURRENT_WAREHOUSE()').collect()[0][0]
trips_table_name = 'TRIPS'
# Prefix for the RAW tables; presumably load_trips_to_raw appends a schema
# tag to produce RAW_schema1 / RAW_schema2 -- confirm in citibike_ml.elt.
load_table_name = 'RAW_'

# uuid (imported at the top of this cell) gives each run a unique stage name.
# uuid4 is random, whereas uuid1 embeds this host's MAC address and clock.
stage_id = str(uuid.uuid4()).replace('-', '_')
load_stage_name = f'load_stage_{stage_id}'

session.sql(f'CREATE OR REPLACE TEMPORARY STAGE {load_stage_name}').collect()

In [None]:
# Citi Bike trip-data archives to fetch (used together with download_base_url).
files_to_download = [
    '201701-citibike-tripdata.csv.zip',
    '201911-citibike-tripdata.csv.zip',
    '202102-citibike-tripdata.csv.zip',
]

In [None]:
%%time
# Stage the trip archives; the helper returns the stage name it used and the
# list of staged files to load next.
load_stage_name, files_to_load = ELT.extract_trips_to_stage(
    session=session,
    load_stage_name=load_stage_name,
    download_base_url=download_base_url,
    files_to_download=files_to_download,
)

In [None]:
%%time
# Load the staged files into the RAW tables; the result lists the RAW table names.
stage_table_names = ELT.load_trips_to_raw(
    session,
    files_to_load,
    load_stage_name,
    load_table_name,
)

In [None]:
# Display the RAW table names returned by load_trips_to_raw.
stage_table_names

In [None]:
# Print each RAW table name, followed on the next line by its row count.
# (Two statements instead of `print(a), print(b)`, which built a throwaway tuple.)
for table in stage_table_names:
    print(table)
    print(session.table(table).count())

In [None]:
# Column layout of each supported RAW table schema, used to generate the CDC
# MERGE: schema tag -> (merge key columns, all data columns). Generating the
# UPDATE SET / INSERT / VALUES lists from one column tuple keeps them in sync.
_CDC_SCHEMA_COLUMNS = {
    'schema1': (
        ('BIKEID', 'STARTTIME', 'STOPTIME'),
        ('TRIPDURATION', 'STARTTIME', 'STOPTIME',
         'START_STATION_ID', 'START_STATION_NAME',
         'START_STATION_LATITUDE', 'START_STATION_LONGITUDE',
         'END_STATION_ID', 'END_STATION_NAME',
         'END_STATION_LATITUDE', 'END_STATION_LONGITUDE',
         'BIKEID', 'USERTYPE', 'BIRTH_YEAR', 'GENDER'),
    ),
    'schema2': (
        ('RIDE_ID',),
        ('RIDE_ID', 'RIDEABLE_TYPE', 'STARTTIME', 'STOPTIME',
         'START_STATION_NAME', 'START_STATION_ID',
         'END_STATION_NAME', 'END_STATION_ID',
         'START_STATION_LATITUDE', 'START_STATION_LONGITUDE',
         'END_STATION_LATITUDE', 'END_STATION_LONGITUDE',
         'USERTYPE'),
    ),
}


def _build_cdc_merge_procedure_sql(procedure_name, target_table_name, stream_name,
                                   key_columns, columns):
    """Return a CREATE PROCEDURE statement that MERGEs a stream into a table.

    The generated procedure joins the stream (S) to the target (T) on equality
    of every key column, then:
      * updates the matched row for stream rows with action INSERT and isupdate;
      * deletes the matched row for stream rows with action DELETE;
      * inserts stream rows with action INSERT whose key is not in the target.
    A plain INSERT whose key already exists matches no clause and is skipped,
    which is what keeps re-loaded RAW rows out of the target.
    """
    # Per-column equality rather than concat(): concatenating values can make
    # different keys collide, and it prevents a simple equi-join.
    on_clause = ' AND '.join(f'T.{col} = S.{col}' for col in key_columns)
    set_clause = ', '.join(f'T.{col} = S.{col}' for col in columns)
    insert_columns = ', '.join(columns)
    insert_values = ', '.join(f'S.{col}' for col in columns)
    merge_sql = (
        f"MERGE INTO {target_table_name} AS T "
        f"USING (SELECT * FROM {stream_name}) AS S "
        f"ON {on_clause} "
        "WHEN MATCHED AND S.metadata$action = 'INSERT' AND S.metadata$isupdate "
        f"THEN UPDATE SET {set_clause} "
        "WHEN MATCHED AND S.metadata$action = 'DELETE' "
        "THEN DELETE "
        "WHEN NOT MATCHED AND S.metadata$action = 'INSERT' "
        f"THEN INSERT ({insert_columns}) VALUES ({insert_values});"
    )
    return (
        f"CREATE OR REPLACE PROCEDURE {procedure_name}() "
        "RETURNS VARCHAR "
        "LANGUAGE SQL "
        "AS "
        "$$ "
        f"    BEGIN {merge_sql} END; "
        "$$"
    )


def load_trips_from_raw_to_interim_target_cdc(session,
                                              stage_table_names: list,
                                              cdc_task_warehouse_name: str):
    """Set up stream/procedure/task CDC from each RAW table into an INTERIM table.

    For every RAW table name of the form ``<prefix>_<schema>`` (``<schema>``
    being ``schema1`` or ``schema2``, case-insensitive) this:

    1. (re)creates ``STREAM_<schema>`` on the RAW table with
       ``SHOW_INITIAL_ROWS = TRUE`` so rows already in the RAW table are also
       part of the stream's first batch;
    2. (re)creates ``INTERIM_<schema>`` with the RAW table's columns
       (``CREATE OR REPLACE TABLE ... LIKE``, i.e. empty);
    3. (re)creates ``TRIPSCDCPROC_<schema>``, a SQL procedure that MERGEs the
       stream into the interim table;
    4. (re)creates and resumes ``TRIPSCDCTASK_<schema>``, scheduled every
       minute and gated on ``SYSTEM$STREAM_HAS_DATA``, which calls the procedure.

    Because every object is created with CREATE OR REPLACE, calling this again
    rebuilds the interim tables from scratch rather than continuing incrementally.

    Args:
        session: Snowpark session used to run the DDL.
        stage_table_names: RAW table names, e.g. ``['RAW_schema1', 'RAW_schema2']``.
        cdc_task_warehouse_name: warehouse the CDC tasks run on.

    Returns:
        The interim table names, in the same order as ``stage_table_names``.

    Raises:
        ValueError: if a table name does not encode a supported schema. All
            names are checked before any SQL is executed.
    """
    # Resolve every table's schema first so an unsupported name fails before
    # any Snowflake objects have been (re)created.
    resolved = []
    for stage_table_name in stage_table_names:
        name_parts = stage_table_name.split("_")
        schema = name_parts[1].lower() if len(name_parts) > 1 else ''
        if schema not in _CDC_SCHEMA_COLUMNS:
            raise ValueError(
                f"Cannot determine CDC schema for table {stage_table_name!r}; "
                f"expected a name like 'RAW_<schema>' with <schema> in "
                f"{sorted(_CDC_SCHEMA_COLUMNS)}")
        resolved.append((stage_table_name, schema))

    interim_target_table_names = []
    for stage_table_name, schema in resolved:
        interim_target_table_name = f'INTERIM_{schema}'
        stream_name = f'STREAM_{schema}'
        task_name = f'TRIPSCDCTASK_{schema}'
        procedure_name = f'TRIPSCDCPROC_{schema}'
        key_columns, columns = _CDC_SCHEMA_COLUMNS[schema]

        statements = [
            f'CREATE OR REPLACE STREAM {stream_name} ON TABLE {stage_table_name} '
            'APPEND_ONLY = FALSE SHOW_INITIAL_ROWS = TRUE',
            f'CREATE OR REPLACE TABLE {interim_target_table_name} LIKE {stage_table_name}',
            _build_cdc_merge_procedure_sql(procedure_name, interim_target_table_name,
                                           stream_name, key_columns, columns),
            f"CREATE OR REPLACE TASK {task_name} WAREHOUSE='{cdc_task_warehouse_name}' "
            f"SCHEDULE = '1 minute' WHEN SYSTEM$STREAM_HAS_DATA('{stream_name}') "
            f"AS CALL {procedure_name}()",
            # Tasks are created suspended; resume so the schedule takes effect.
            f'ALTER TASK {task_name} RESUME',
        ]
        for statement in statements:
            session.sql(statement).collect()
        interim_target_table_names.append(interim_target_table_name)

    return interim_target_table_names
    

In [None]:
%%time
# Create the streams, interim tables, MERGE procedures and CDC tasks.
interim_target_table_names = load_trips_from_raw_to_interim_target_cdc(
    session,
    stage_table_names=stage_table_names,
    cdc_task_warehouse_name=cdc_task_warehouse_name,
)

In [None]:
# Show how many change records each CDC stream currently holds.
schema1streamsdf, schema2streamsdf = [session.table(f"STREAM_{tag}") for tag in ("schema1", "schema2")]
print(f'schema1 streams count: {schema1streamsdf.count()}, '
      f'schema2 streams count: {schema2streamsdf.count()}')

In [None]:
# Recent runs (last hour, at most 10 rows) of the TRIPSCDCTASK_* tasks.
task_history_statement = (
    "select state, scheduled_time, query_id, query_start_time, next_scheduled_time, completed_time, name, query_text, condition_text "
    "from table(information_schema.task_history( "
    "scheduled_time_range_start=>dateadd('hour',-1,current_timestamp()), "
    "result_limit => 10)) s where s.name like '%TRIPSCDCTASK%' order by s.name asc, s.scheduled_time desc"
)
taskdf = session.sql(task_history_statement)
taskdf.show()

In [None]:
# Compare RAW vs INTERIM row counts per schema. The interim tables are looked
# up by name rather than by position in interim_target_table_names, whose
# order only follows the order of stage_table_names.
schema1rawdf = session.table('RAW_schema1')
schema1interimtargetdf = session.table('INTERIM_schema1')
schema2rawdf = session.table('RAW_schema2')
schema2interimtargetdf = session.table('INTERIM_schema2')
print(f'schema1 raw table count: {schema1rawdf.count()} and interim table count: {schema1interimtargetdf.count()}')
print(f'schema2 raw table count: {schema2rawdf.count()} and interim table count: {schema2interimtargetdf.count()}')

In [None]:
# Demonstrate that loading duplicate rows into the RAW table does not propagate them to the INTERIM table.

In [None]:
# Re-load a month that was already loaded above, producing duplicate RAW rows.
file_name_end = '201701-citibike-tripdata.csv.zip'
files_to_download = []
files_to_download.append(file_name_end)

In [None]:
# Recreate the temporary stage under the same name before the second extract.
session.sql(f'CREATE OR REPLACE TEMPORARY STAGE {load_stage_name}').collect()

In [None]:
%%time
# Stage the duplicate month; returns the stage name and the staged files.
load_stage_name, files_to_load = ELT.extract_trips_to_stage(
    session=session,
    load_stage_name=load_stage_name,
    download_base_url=download_base_url,
    files_to_download=files_to_download,
)

In [None]:
%%time
# Load the duplicate month into its RAW table; the result lists the RAW table names.
stage_table_names = ELT.load_trips_to_raw(
    session,
    files_to_load,
    load_stage_name,
    load_table_name,
)

In [None]:
# Row count of the first RAW table returned by the second load.
rawdf = session.table(stage_table_names[0])
rawdf.count()

In [None]:
# Show how many change records each CDC stream holds after the duplicate load.
schema1streamsdf, schema2streamsdf = [session.table(f"STREAM_{tag}") for tag in ("schema1", "schema2")]
print(f'schema1 streams count: {schema1streamsdf.count()}, '
      f'schema2 streams count: {schema2streamsdf.count()}')

In [None]:
# Recent runs (last hour, at most 10 rows) of the TRIPSCDCTASK_* tasks.
task_history_statement = (
    "select state, scheduled_time, query_id, query_start_time, next_scheduled_time, completed_time, name, query_text, condition_text "
    "from table(information_schema.task_history( "
    "scheduled_time_range_start=>dateadd('hour',-1,current_timestamp()), "
    "result_limit => 10)) s where s.name like '%TRIPSCDCTASK%' order by s.name asc, s.scheduled_time desc"
)
taskdf = session.sql(task_history_statement)
taskdf.show()

In [None]:
# Compare RAW vs INTERIM row counts after the duplicate load. The interim
# tables are looked up by name rather than by position in
# interim_target_table_names, whose order only follows stage_table_names.
schema1rawdf = session.table('RAW_schema1')
schema1interimtargetdf = session.table('INTERIM_schema1')
schema2rawdf = session.table('RAW_schema2')
schema2interimtargetdf = session.table('INTERIM_schema2')
print(f'schema1 raw table count: {schema1rawdf.count()} and interim table count: {schema1interimtargetdf.count()}')
print(f'schema2 raw table count: {schema2rawdf.count()} and interim table count: {schema2interimtargetdf.count()}')

In [None]:
%%time
# Transform the deduplicated INTERIM tables into the trips table.
trips_table_name = ELT.transform_trips(
    session,
    interim_target_table_names,
    trips_table_name,
)

In [None]:
# Row count of the transformed trips table.
tripsdf = session.table(trips_table_name)
tripsdf.count()

In [None]:
%%writefile citibike_ml/cdc.py

# Column layout of each supported RAW table schema, used to generate the CDC
# MERGE: schema tag -> (merge key columns, all data columns). Generating the
# UPDATE SET / INSERT / VALUES lists from one column tuple keeps them in sync.
_CDC_SCHEMA_COLUMNS = {
    'schema1': (
        ('BIKEID', 'STARTTIME', 'STOPTIME'),
        ('TRIPDURATION', 'STARTTIME', 'STOPTIME',
         'START_STATION_ID', 'START_STATION_NAME',
         'START_STATION_LATITUDE', 'START_STATION_LONGITUDE',
         'END_STATION_ID', 'END_STATION_NAME',
         'END_STATION_LATITUDE', 'END_STATION_LONGITUDE',
         'BIKEID', 'USERTYPE', 'BIRTH_YEAR', 'GENDER'),
    ),
    'schema2': (
        ('RIDE_ID',),
        ('RIDE_ID', 'RIDEABLE_TYPE', 'STARTTIME', 'STOPTIME',
         'START_STATION_NAME', 'START_STATION_ID',
         'END_STATION_NAME', 'END_STATION_ID',
         'START_STATION_LATITUDE', 'START_STATION_LONGITUDE',
         'END_STATION_LATITUDE', 'END_STATION_LONGITUDE',
         'USERTYPE'),
    ),
}


def _build_cdc_merge_procedure_sql(procedure_name, target_table_name, stream_name,
                                   key_columns, columns):
    """Return a CREATE PROCEDURE statement that MERGEs a stream into a table.

    The generated procedure joins the stream (S) to the target (T) on equality
    of every key column, then:
      * updates the matched row for stream rows with action INSERT and isupdate;
      * deletes the matched row for stream rows with action DELETE;
      * inserts stream rows with action INSERT whose key is not in the target.
    A plain INSERT whose key already exists matches no clause and is skipped,
    which is what keeps re-loaded RAW rows out of the target.
    """
    # Per-column equality rather than concat(): concatenating values can make
    # different keys collide, and it prevents a simple equi-join.
    on_clause = ' AND '.join(f'T.{col} = S.{col}' for col in key_columns)
    set_clause = ', '.join(f'T.{col} = S.{col}' for col in columns)
    insert_columns = ', '.join(columns)
    insert_values = ', '.join(f'S.{col}' for col in columns)
    merge_sql = (
        f"MERGE INTO {target_table_name} AS T "
        f"USING (SELECT * FROM {stream_name}) AS S "
        f"ON {on_clause} "
        "WHEN MATCHED AND S.metadata$action = 'INSERT' AND S.metadata$isupdate "
        f"THEN UPDATE SET {set_clause} "
        "WHEN MATCHED AND S.metadata$action = 'DELETE' "
        "THEN DELETE "
        "WHEN NOT MATCHED AND S.metadata$action = 'INSERT' "
        f"THEN INSERT ({insert_columns}) VALUES ({insert_values});"
    )
    return (
        f"CREATE OR REPLACE PROCEDURE {procedure_name}() "
        "RETURNS VARCHAR "
        "LANGUAGE SQL "
        "AS "
        "$$ "
        f"    BEGIN {merge_sql} END; "
        "$$"
    )


def load_trips_from_raw_to_interim_target_cdc(session,
                                              stage_table_names: list,
                                              cdc_task_warehouse_name: str):
    """Set up stream/procedure/task CDC from each RAW table into an INTERIM table.

    For every RAW table name of the form ``<prefix>_<schema>`` (``<schema>``
    being ``schema1`` or ``schema2``, case-insensitive) this:

    1. (re)creates ``STREAM_<schema>`` on the RAW table with
       ``SHOW_INITIAL_ROWS = TRUE`` so rows already in the RAW table are also
       part of the stream's first batch;
    2. (re)creates ``INTERIM_<schema>`` with the RAW table's columns
       (``CREATE OR REPLACE TABLE ... LIKE``, i.e. empty);
    3. (re)creates ``TRIPSCDCPROC_<schema>``, a SQL procedure that MERGEs the
       stream into the interim table;
    4. (re)creates and resumes ``TRIPSCDCTASK_<schema>``, scheduled every
       minute and gated on ``SYSTEM$STREAM_HAS_DATA``, which calls the procedure.

    Because every object is created with CREATE OR REPLACE, calling this again
    rebuilds the interim tables from scratch rather than continuing incrementally.

    Args:
        session: Snowpark session used to run the DDL.
        stage_table_names: RAW table names, e.g. ``['RAW_schema1', 'RAW_schema2']``.
        cdc_task_warehouse_name: warehouse the CDC tasks run on.

    Returns:
        The interim table names, in the same order as ``stage_table_names``.

    Raises:
        ValueError: if a table name does not encode a supported schema. All
            names are checked before any SQL is executed.
    """
    # Resolve every table's schema first so an unsupported name fails before
    # any Snowflake objects have been (re)created.
    resolved = []
    for stage_table_name in stage_table_names:
        name_parts = stage_table_name.split("_")
        schema = name_parts[1].lower() if len(name_parts) > 1 else ''
        if schema not in _CDC_SCHEMA_COLUMNS:
            raise ValueError(
                f"Cannot determine CDC schema for table {stage_table_name!r}; "
                f"expected a name like 'RAW_<schema>' with <schema> in "
                f"{sorted(_CDC_SCHEMA_COLUMNS)}")
        resolved.append((stage_table_name, schema))

    interim_target_table_names = []
    for stage_table_name, schema in resolved:
        interim_target_table_name = f'INTERIM_{schema}'
        stream_name = f'STREAM_{schema}'
        task_name = f'TRIPSCDCTASK_{schema}'
        procedure_name = f'TRIPSCDCPROC_{schema}'
        key_columns, columns = _CDC_SCHEMA_COLUMNS[schema]

        statements = [
            f'CREATE OR REPLACE STREAM {stream_name} ON TABLE {stage_table_name} '
            'APPEND_ONLY = FALSE SHOW_INITIAL_ROWS = TRUE',
            f'CREATE OR REPLACE TABLE {interim_target_table_name} LIKE {stage_table_name}',
            _build_cdc_merge_procedure_sql(procedure_name, interim_target_table_name,
                                           stream_name, key_columns, columns),
            f"CREATE OR REPLACE TASK {task_name} WAREHOUSE='{cdc_task_warehouse_name}' "
            f"SCHEDULE = '1 minute' WHEN SYSTEM$STREAM_HAS_DATA('{stream_name}') "
            f"AS CALL {procedure_name}()",
            # Tasks are created suspended; resume so the schedule takes effect.
            f'ALTER TASK {task_name} RESUME',
        ]
        for statement in statements:
            session.sql(statement).collect()
        interim_target_table_names.append(interim_target_table_name)

    return interim_target_table_names
    
    

def cdc_elt(session, load_stage_name, files_to_download, download_base_url, load_table_name, trips_table_name,
            cdc_task_warehouse_name=None) -> str:
    """Run the CDC-based ELT: stage files, load RAW, set up CDC into INTERIM, transform to TRIPS.

    Args:
        session: Snowpark session.
        load_stage_name: stage the downloaded files are uploaded to.
        files_to_download: archive file names to fetch (used with download_base_url).
        download_base_url: base URL the archives are fetched from.
        load_table_name: RAW table name prefix passed to ``load_trips_to_raw``.
        trips_table_name: name of the final trips table.
        cdc_task_warehouse_name: warehouse for the CDC tasks; when None, the
            session's current warehouse is used.

    Returns:
        The trips table name returned by ``ELT.transform_trips``.
    """
    from citibike_ml import elt as ELT

    if cdc_task_warehouse_name is None:
        cdc_task_warehouse_name = session.sql('SELECT CURRENT_WAREHOUSE()').collect()[0][0]

    load_stage_name, files_to_load = ELT.extract_trips_to_stage(session=session,
                                                                files_to_download=files_to_download,
                                                                download_base_url=download_base_url,
                                                                load_stage_name=load_stage_name)
    # Positional arguments, matching how the notebook calls these helpers.
    stage_table_names = ELT.load_trips_to_raw(session, files_to_load, load_stage_name, load_table_name)

    # The CDC setup takes the RAW table names produced by the load step.
    interim_target_table_names = load_trips_from_raw_to_interim_target_cdc(session,
                                                                           stage_table_names,
                                                                           cdc_task_warehouse_name)

    # NOTE(review): the CDC tasks run asynchronously on a 1-minute schedule and
    # the INTERIM tables are recreated empty, so they may not be populated yet
    # when transform_trips reads them. Confirm whether a wait (or a synchronous
    # CALL of the CDC procedures) is needed here.
    trips_table_name = ELT.transform_trips(session, interim_target_table_names, trips_table_name)
    return trips_table_name



In [None]:
# Close the Snowpark session.
# NOTE(review): the TRIPSCDCTASK_* tasks resumed earlier are not suspended here;
# presumably they keep running on their schedule after the session closes --
# suspend them (ALTER TASK ... SUSPEND) if that is not wanted.
session.close()

In [None]:
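# TODO: How to avoid duplicates that arrive within a single stream batch? The MERGE only skips rows whose key already exists in the INTERIM table, so duplicate keys arriving together are presumably all inserted.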