In [None]:
%python
import datetime
import sys

# Run setup: remember when the job started and which control-table entry
# (target/source pair) this notebook owns.
start_time = datetime.datetime.now()

src_objectname = '[ods].[ecc_tvaut]'
tgt_objectname = '[dwd_ms].[sales_transaction_reason]'

# Highest watermark stored for this pair; max() yields NULL (None in Python)
# when the control table has no matching row yet.
watermark_query = f"""
                             select max(data_watermark_value) as version from dwd_ms.edw_loading_control where tgt_objectname = '{tgt_objectname}' and src_objectname = '{src_objectname}'
                         """
watermark_row = spark.sql(watermark_query).collect()[0]
version_str = watermark_row['version']

# Default only; overwritten with the stored commit_time when a watermark exists.
last_commit_time = datetime.datetime.now()

# Resolve the change-feed version this run starts from.
#   version == 0 -> full load (no watermark yet, or the source table looks
#                   re-created since the last run)
#   version  > 0 -> incremental load starting at the first commit after the
#                   stored watermark
# Both names are pre-set so the prints and checks below also work on the very
# first run, when the watermark branch is skipped (previously a NameError).
this_commit_time = None
new_version = None

if version_str is None:
    version_str = 0
else:
    #Get last end_time in edw_loading_control list
    last_commit_time = spark.sql(f"""
                                      select commit_time from dwd_ms.edw_loading_control where tgt_objectname = '{tgt_objectname}' and src_objectname = '{src_objectname}'
                                  """).collect()[0]['commit_time']

    #Get this commit_timestamp in change_log list
    # first() fetches a single row instead of collecting the whole change feed
    # to the driver, and returns None (rather than raising IndexError) when no
    # commit precedes the watermark version.
    earlier_change = spark.sql(f"""
                                      select _commit_timestamp as commit_timestamp from table_changes('dwd_ms.ecc_tvaut',0)
                                      where _commit_version < int({version_str})
                                  """).first()
    if earlier_change is not None:
        this_commit_time = earlier_change['commit_timestamp']

    # First commit after the watermark that carries usable rows; NULL (None)
    # when nothing new has been committed.
    new_version = spark.sql(f"""
                                 select min(_commit_version) as new_version from table_changes('dwd_ms.ecc_tvaut',0)
                                 where _commit_version > int({version_str})
                                   and _change_type <> 'update_preimage'
                                 ;
                             """).collect()[0]['new_version']

print('last_commit_time = ' + str(last_commit_time))
print('this_commit_time = ' + str(this_commit_time))

version = 0

if version_str == 0:
    print('No data in the edw_loading_control list!')
else:
    print('Have the data in the edw_loading_control list!')
    #Determines whether the source table initializes data
    # A commit that is older by version than the watermark but newer by
    # timestamp than the recorded commit_time is treated as a re-created source.
    # NOTE(review): when no earlier commit exists the check cannot be made and
    # the run proceeds incrementally - confirm this is acceptable.
    source_recreated = (
        this_commit_time is not None and this_commit_time > last_commit_time
    )
    if source_recreated:
        print('The source table was drop, full capacity run number')
    elif new_version is None:
        print('No new data')
        sys.exit()
    else:
        version = int(new_version)
            
            
# Upper bound of the processing window: the newest commit version the change
# feed reports when read from `version` onwards.
max_version_query = f"""
                                 select max(_commit_version) as max_version from table_changes('dwd_ms.ecc_tvaut',{version})
                                 ;
                             """
max_version_row = spark.sql(max_version_query).collect()[0]
max_version = int(max_version_row['max_version'])

print('version = %s' % version)
print('max_version = %s' % max_version)

#Logic processing section
# Full-load statement: (re)creates temp view tmp_ecc_tvaut_full from the current
# contents of dwd_ms.ecc_tvaut, restricted to mandt = '301'.
#   * source columns augru / spras / bezei are renamed to order_rsn_cd /
#     order_rsn_lang_type / order_rsn_desc
#   * dw_source_id falls back to -1 when __dl_source_id is NULL
#   * dw_insert_dt and dw_update_dt are both now(); dw_del_flg is always ''
#   * change_type is 'delete' when __src_record_del_status = 'D', otherwise ''
# The string contains no placeholders, so it reaches Spark exactly as written.
run_full = f"""
                 create or replace temp view tmp_ecc_tvaut_full
                 as
                 select  
                     a.augru                     as order_rsn_cd,
                     a.spras                     as order_rsn_lang_type,
                     a.bezei                     as order_rsn_desc, 
                     nvl(a.__dl_source_id,-1)    as dw_source_id,
                     now()                       as dw_insert_dt,
                     now()                       as dw_update_dt,
                     ''                          as dw_del_flg,
                     case when a.__src_record_del_status = 'D' then 'delete'
                          else '' end            as change_type
                 from dwd_ms.ecc_tvaut a
                 where a.mandt = '301' 
                 ;
             """;
    
# Incremental-load statement: (re)creates temp view tmp_ecc_tvaut_incr from the
# change feed of dwd_ms.ecc_tvaut between {version} and {max_version}.
#   * only mandt = '301' rows committed at or after last_commit_time are read
#   * update_preimage rows are discarded
#   * change_type is 'delete' when __src_record_del_status = 'D', otherwise the
#     feed's own _change_type
#   * row_number() keeps one row per a.augru: the one with the latest
#     _commit_timestamp (rn = 1)
# NOTE(review): the dedup partitions by augru only although each row also
# carries spras (order_rsn_lang_type), and ties within one commit are ordered
# arbitrarily - confirm a single row per augru is the intended grain.
run_increment = f"""
                  create or replace temp view tmp_ecc_tvaut_incr
                  as 
                  select 
                      order_rsn_cd,
                      order_rsn_lang_type,
                      order_rsn_desc, 
                      dw_source_id,
                      dw_insert_dt,
                      dw_update_dt,
                      dw_del_flg,
                      change_type
                  from (
                      select  
                          a.augru                     as order_rsn_cd,
                          a.spras                     as order_rsn_lang_type,
                          a.bezei                     as order_rsn_desc, 
                          nvl(a.__dl_source_id,-1)    as dw_source_id,
                          now()                       as dw_insert_dt,
                          now()                       as dw_update_dt,
                          ''                          as dw_del_flg,
                          case when a.__src_record_del_status = 'D' then 'delete'
                               else a._change_type 
                          end                         as change_type,
                          row_number() over (partition by a.augru order by _commit_timestamp desc) as rn
                      from table_changes('dwd_ms.ecc_tvaut',{version},{max_version}) a
                      where a.mandt = '301' 
                        and _commit_timestamp >= cast('{last_commit_time}' as timestamp)
                        and _change_type <> 'update_preimage'
                       ) t
                  where rn = 1
                  ;
              """;


# Materialise the staging view for this run and remember its name in
# source_data: incremental when a start version was resolved, full otherwise.
if version != 0:
    print('More than two runs!')
    spark.sql(run_increment)
    source_data = 'tmp_ecc_tvaut_incr'
else:
    print('For the first time to run or the source table was dropped!')
    spark.sql(run_full)
    source_data = 'tmp_ecc_tvaut_full'

# Staging statement: (re)creates temp view tmp_sales_transaction_reason, the
# input of the merge into dwd_ms.sales_transaction_reason.
#   * non-delete rows from {source_data} are kept only when no target row has
#     the same business values, and are emitted with change_type = ''
#   * delete rows from {source_data} are passed through unchanged
# The unchanged-row filter compares business columns only. The previous EXCEPT
# also compared dw_insert_dt / dw_update_dt, which the staging views set to
# now(); those never equal stored values, so nothing was ever filtered out.
# `<=>` is null-safe equality, matching the NULL handling EXCEPT had.
common_run1 = f"""
                   create or replace temp view tmp_sales_transaction_reason
                   as
                   select
                       s.order_rsn_cd,
                       s.order_rsn_lang_type,
                       s.order_rsn_desc,
                       s.dw_source_id,
                       s.dw_insert_dt,
                       s.dw_update_dt,
                       s.dw_del_flg,
                       '' as change_type
                   from {source_data} s
                   left anti join dwd_ms.sales_transaction_reason t
                     on  t.order_rsn_cd        <=> s.order_rsn_cd
                     and t.order_rsn_lang_type <=> s.order_rsn_lang_type
                     and t.order_rsn_desc      <=> s.order_rsn_desc
                     and t.dw_source_id        <=> s.dw_source_id
                     and t.dw_del_flg          <=> s.dw_del_flg
                   where s.change_type <> 'delete'
                   union
                   select
                       order_rsn_cd,
                       order_rsn_lang_type,
                       order_rsn_desc,
                       dw_source_id,
                       dw_insert_dt,
                       dw_update_dt,
                       dw_del_flg,
                       change_type
                   from {source_data}
                   where change_type = 'delete'
                   ;
               """

# Merge statement: applies tmp_sales_transaction_reason to
# dwd_ms.sales_transaction_reason, pairing rows on order_rsn_cd.
#   * matched, change_type = 'delete' -> the target row is deleted
#   * matched, change_type = ''       -> every non-key column is overwritten
#   * not matched, change_type = ''   -> the row is inserted
# NOTE(review): the update branch also overwrites dw_insert_dt with the
# staging value - confirm the original insert time is not meant to be kept.
# NOTE(review): rows are paired on order_rsn_cd alone; presumably the staging
# view holds one row per order_rsn_cd - verify for the full-load path, which
# does not deduplicate.
common_run2 = f"""
                   merge into dwd_ms.sales_transaction_reason as target
                   using tmp_sales_transaction_reason as source
                   on target.order_rsn_cd = source.order_rsn_cd
                   when matched and source.change_type = 'delete' then
                     delete
                   when matched and source.change_type = '' then
                     update set target.order_rsn_lang_type = source.order_rsn_lang_type
                                ,target.order_rsn_desc     = source.order_rsn_desc
                                ,target.dw_source_id       = source.dw_source_id
                                ,target.dw_insert_dt       = source.dw_insert_dt
                                ,target.dw_update_dt       = source.dw_update_dt
                                ,target.dw_del_flg         = source.dw_del_flg
                   when not matched and source.change_type = ''
                     then insert(target.order_rsn_cd, target.order_rsn_lang_type, target.order_rsn_desc, target.dw_source_id, target.dw_insert_dt, target.dw_update_dt, target.dw_del_flg) 
                          values(source.order_rsn_cd, source.order_rsn_lang_type, source.order_rsn_desc, source.dw_source_id, source.dw_insert_dt, source.dw_update_dt, source.dw_del_flg)
                   ;
               """;

# Create the staging view first; the merge statement reads from it.
spark.sql(common_run1)
spark.sql(common_run2)
    
# Record the outcome of this run in dwd_ms.edw_loading_control so that the
# stored watermark becomes max_version.
end_time = datetime.datetime.now()

# Newest commit timestamp inside the version window that was just processed.
max_commit_query = f"""
                                 select max(_commit_timestamp) as commit_time from table_changes('dwd_ms.ecc_tvaut',{version},{max_version})
                             """
max_commit_time = spark.sql(max_commit_query).collect()[0]['commit_time']

# Statement used when this target/source pair has no control row yet.
sql_insert = f"""
                  insert into dwd_ms.edw_loading_control(tgt_objectname,src_objectname,start_time,end_time,data_watermark_type,data_watermark_value,process_status,error_number,error_message,commit_time)
                  values('{tgt_objectname}','{src_objectname}','{start_time}','{end_time}',null,'{max_version}',null,null,null,'{max_commit_time}')
                  ;
              """

# Statement used when a control row for the pair already exists.
sql_update = f"""
                  update dwd_ms.edw_loading_control 
                  set 
                      start_time = '{start_time}'
                      ,end_time = '{end_time}'
                      ,data_watermark_value = '{max_version}'
                      ,commit_time = '{max_commit_time}'
                  where tgt_objectname = '{tgt_objectname}'
                    and src_objectname = '{src_objectname}'
                  ;
              """

# Number of control rows currently held for the pair.
count_query = f"""
                         select count(1) from dwd_ms.edw_loading_control 
                         where tgt_objectname = '{tgt_objectname}' and src_objectname = '{src_objectname}'
                     """
numbers = spark.sql(count_query).collect()[0]['count(1)']

print('---------update the edw_loading_control list start---------')

# Update the existing row when there is one, otherwise insert the first row.
spark.sql(sql_update if numbers != 0 else sql_insert)

print('---------update the edw_loading_control list end---------')