In [30]:
import pandas as pd
from pathlib import Path
import os
import sqlite3
import hashlib
import datetime

#### Program variables

In [62]:
ODATE = datetime.datetime(2022, 3, 20)
ODATE_MINUS_1 = ODATE - datetime.timedelta(days=1)

DBNAME = "DPC_IQ.db"

DROP_INITIAL_DATABASE = False

### Connect to sqlite

In [32]:
file_path = Path(os.getcwd()) / "../" / "data"

In [56]:
conn = sqlite3.connect(file_path / DBNAME)

In [34]:
def execute_query(connection, query):

    cur = connection.cursor()
    cur.execute(query)

def initialise_db(connection, drop_tables=False):
    
    if drop_tables:
        for _t in ["country_dim"]:
            execute_query(connection, f"DROP TABLE IF EXISTS {_t}")
          
    _dim_create = """
    create table if not exists country_dim (
        country_id INTEGER,
        key TEXT,
        place_id TEXT,
        country_code TEXT,
        country_name TEXT,
        subregion1_code TEXT,
        subregion1_name TEXT,
        subregion2_name TEXT,
        effective_start_date DATE,
        effective_end_date DATE,
        hash_val TEXT,
        current_indicator INTEGER
    )    
    """
    
    execute_query(connection, _dim_create)

### Some pandas helper functions

In [35]:
def add_hash(df):
    # better to drop any existing hash_val incase this is run multiple times.
    df.drop(['__str','hash_val'], axis=1, errors='ignore', inplace=True)
    df['__str'] = df.astype(str).values.sum(axis=1)
    df['hash_val'] = df['__str'].apply(lambda x: hashlib.md5(str(x).encode('utf-8')).hexdigest())

# main()

In [36]:
initialise_db(conn, drop_tables=DROP_INITIAL_DATABASE)

In [37]:
customer_dimension = pd.read_sql_query("select key, hash_val from country_dim where current_indicator='1'", conn)

customer_dimension.set_index('key', inplace=True)

print(customer_dimension.shape)

(10, 1)


In [38]:
customer_dimension

Unnamed: 0_level_0,hash_val
key,Unnamed: 1_level_1
,aee73a5003fd6f3714887492e38dd166
AU,ea006a33d07849ac46cb238b5ecd6e09
AU_ACT,928da08b75184b5932056b98c62e8b54
AU_NSW,1409a631858768c4bf24b91a439417b6
AU_NT,2cf4a65dab52fcb03c874fa3e2ceb27d
AU_QLD,2aa46942c6ea82839fe8fb3a32b30b95
AU_SA,818e4a41ce7ac6b7f79967526291df5f
AU_TAS,42a788615067d27c47e9beda35629ffd
AU_VIC,1
AU_WA,9c3d6def3b4f38ffd327c7dae0063a8b


In [39]:
ref = file_path / "reference.csv"

In [40]:
df = pd.read_csv(filepath_or_buffer=ref, header=0)

In [41]:
add_hash(df)

In [42]:
df.iloc[0]

key                                                             AD
place_id                               ChIJlfCemC71pRIRkn_qeNc-yQc
country_code                                                    AD
country_name                                               Andorra
subregion1_code                                                NaN
subregion1_name                                                NaN
subregion2_name                                                NaN
__str              ADChIJlfCemC71pRIRkn_qeNc-yQcADAndorranannannan
hash_val                          8fe6c3bdc1bf3065d9925832b5fefd50
Name: 0, dtype: object

### Check for duplicate 'key' values

In [43]:
duplicate_keys_df = df[df.duplicated(keep=False)]

In [44]:
print(f"Found {duplicate_keys_df.shape[0]} records that are duplicates")

Found 4 records that are duplicates


#### Show the duplicates (for debug)

In [45]:
df[df.duplicated(keep='first')]

Unnamed: 0,key,place_id,country_code,country_name,subregion1_code,subregion1_name,subregion2_name,__str,hash_val
13,AU,ChIJ38WHZwf9KysRUhNblaFnglM,AU,Australia,,,,AUChIJ38WHZwf9KysRUhNblaFnglMAUAustralianannannan,ea006a33d07849ac46cb238b5ecd6e09
21,AU_VIC,ChIJT5UYfksx1GoRNJWCvuL8Tlo,AU,Australia,VIC,Victoria,,AU_VICChIJT5UYfksx1GoRNJWCvuL8TloAUAustraliaVI...,334bcfa977f2febffd35e738a2f6fd9c


#### Drop the duplicates

In [46]:
df.drop_duplicates(keep='first', inplace=True)

In [47]:
customer_load_pre = df.join(customer_dimension, how='left', on=['key'], rsuffix='_existing')

In [48]:
# create insert
customer_dim_insert = customer_load_pre[customer_load_pre['hash_val_existing'].isnull()]
del customer_dim_insert['hash_val_existing']

In [49]:
customer_matched = customer_load_pre[~customer_load_pre['hash_val_existing'].isnull()]

In [50]:
# create update
customer_matched = customer_load_pre[~customer_load_pre['hash_val_existing'].isnull()]
customer_dim_upsert = customer_matched.query('hash_val != hash_val_existing')

In [51]:
customer_dim_insert.to_sql('stg_country_dim_insert', conn, if_exists='replace')
customer_dim_upsert.to_sql('stg_country_dim_upsert', conn, if_exists='replace')

In [66]:
update_query = f"""
update country_dim
   set effective_end_date = '{ODATE_MINUS_1}',
       current_indicator = 0
 where current_indicator = 1
   and exists (
         select 1 
           from stg_country_dim_upsert
          where key = country_dim.key
          );
"""
execute_query(conn, update_query)
conn.commit()

In [67]:
insert_query = f"""
insert into country_dim
(country_id, key, place_id, country_code, country_name, subregion1_code, subregion1_name, subregion2_name, effective_start_date, effective_end_date, hash_val, current_indicator)
select row_number() over () + id_tbl.min_id as country_id,
       stg.key,
       stg.place_id,
       stg.country_code,
       stg.country_name,
       stg.subregion1_code,
       stg.subregion1_name,
       stg.subregion2_name,
       '{ODATE}' as 'effective_start_date',
       '9999-12-31 00:00:00' as 'effective_end_date',
       stg.hash_val,
       1 as current_indicator
from (
select key,
       place_id,
       country_code,
       country_name,
       subregion1_code,
       subregion1_name,
       subregion2_name,
       hash_val
 from stg_country_dim_insert
union
select key,
       place_id,
       country_code,
       country_name,
       subregion1_code,
       subregion1_name,
       subregion2_name,
       hash_val
  from stg_country_dim_upsert
     ) stg
cross join (select coalesce(max(country_id),0) as min_id from country_dim) id_tbl
"""

execute_query(conn, insert_query)
conn.commit()

In [54]:
print("Done.")

Done.


In [55]:
conn.commit()

In [27]:
conn.close()

In [None]:
customer_dim_insert