In [1]:
from dotenv import load_dotenv
import logging
import os

from data_cleaning import DataCleaning
from data_extraction import DataExtractor
from database_utils import DatabaseConnector

# Send every record (DEBUG and above) to pipeline.log. The `.40s` precision on
# funcName caps long function names at 40 characters in each log line.
logging.basicConfig(
    filename='pipeline.log',
    encoding='utf-8',
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s - %(funcName).40s - %(message)s",
)
logger = logging.getLogger(__name__)

# Populate the process environment from a .env file before reading any variable.
load_dotenv()

# Location of the card-details PDF.
CARD_DATA_PDF_PATH = 'https://data-handling-public.s3.eu-west-1.amazonaws.com/card_details.pdf'

# NOTE(review): presumably the key sent to the store endpoints below - confirm.
API_KEY = os.getenv('x-api-key')
if not API_KEY:
    # os.getenv returns None for a missing variable; surface that here instead
    # of letting it show up later as an opaque authentication failure.
    logger.warning("Environment variable 'x-api-key' is not set; API_KEY is empty")

# Both store endpoints share one base URL; keep it in a single place.
_API_ROOT = 'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod'
NUMBER_STORES_ENDPOINT_URL = f'{_API_ROOT}/number_stores'
STORE_ENDPOINT_URL = f'{_API_ROOT}/store_details/'


def setup_database(filename):
    """Build a database connector and its engine from a credentials file.

    Args:
        filename: Path passed to ``DatabaseConnector.read_db_creds``.

    Returns:
        A ``(connector, engine)`` tuple: the ``DatabaseConnector`` instance and
        the result of ``init_db_engine`` for the credentials that were read.
    """
    connector = DatabaseConnector()
    # Read the credentials first, then hand them straight to the engine factory.
    return connector, connector.init_db_engine(connector.read_db_creds(filename))


In [2]:
logger.info('****************************** Starting pipeline ******************************')

# One connector/engine pair per credentials file (source and target).
src_db, source_engine = setup_database(filename='config/db_creds.yaml')
tgt_db, tgt_engine = setup_database(filename='config/db_creds_target.yaml')

extractor = DataExtractor()
cleaner = DataCleaning()

# Read the raw users table over a connection opened with AUTOCOMMIT isolation;
# the `with` block scopes the connection to the extraction step.
with source_engine.execution_options(isolation_level='AUTOCOMMIT').connect() as conn:
    # Uncomment below to list all database tables
    # src_db.list_db_tables(source_engine)
    extracted = extractor.read_rds_table(conn, 'legacy_users')

# Cleaning happens after the connection has been released.
df_users = cleaner.clean_user_data(df=extracted)

In [3]:
# Preview the cleaned user table (the rendered output elides the middle rows).
# NOTE(review): the output below shows names, emails and addresses - clear or
# redact it before sharing if these are real users.
# Ad hoc check for addresses that contain a comma:
# df_users[df_users['address'].str.contains(',')]
df_users

Unnamed: 0_level_0,first_name,last_name,date_of_birth,company,email_address,address,country,country_code,phone_number,join_date,user_uuid
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,Sigfried,Noack,1990-09-30,Heydrich Junitz KG,rudi79@winkler.de,Zimmerstr. 1/0\n59015 Gießen,Germany,DE,+49(0) 047905356,2018-10-10,93caf182-e4e9-4c6e-bebb-60a1a9dcf9b8
1,Guy,Allen,1940-12-01,Fox Ltd,rhodesclifford@henderson.com,Studio 22a\nLynne terrace\nMcCarthymouth\nTF0 9GH,United Kingdom,GB,(0161) 496 0674,2001-12-20,8fe96c3a-d62d-4eb5-b313-cf12d9126a49
2,Harry,Lawrence,1995-08-02,"Johnson, Jones and Harris",glen98@bryant-marshall.co.uk,92 Ann drive\nJoanborough\nSK0 6LR,United Kingdom,GB,+44(0)121 4960340,2016-12-16,fc461df4-b919-48b2-909e-55c95a03fe6b
3,Darren,Hussain,1972-09-23,Wheeler LLC,daniellebryan@thompson.org,19 Robinson meadow\nNew Tracy\nW22 2QG,United Kingdom,GB,(0306) 999 0871,2004-02-23,6104719f-ef14-4b09-bf04-fb0c4620acb0
4,Garry,Stone,1952-12-20,Warner Inc,billy14@long-warren.com,3 White pass\nHunterborough\nNN96 4UE,United Kingdom,GB,0121 496 0225,2006-09-01,9523a6d3-b2dd-4670-a51a-36aebc89f579
...,...,...,...,...,...,...,...,...,...,...,...
15315,Marta,Rogge,1981-03-03,Dehmel,baererklothilde@trubin.com,Ziegertstr. 60\n93330 Stollberg,Germany,DE,(05917) 549662,2000-03-29,8a77629e-7ca1-409f-b22c-c24056bd4eb1
15316,Erna,Hoffmann,1967-10-28,Atzler Seifert AG & Co. KGaA,dunjafischer@vollbrecht.de,Henkallee 186\n33456 Sankt Goarshausen,Germany,DE,+49(0)7384 51073,2018-03-13,5f57209e-8695-4863-b9e7-084a4ba02808
15317,Konstantinos,Thanel,1954-08-05,Fritsch Ehlert GmbH,rpruschke@gotthard.com,Steffi-Rose-Platz 16\n12365 Apolda,Germany,DE,(07856) 050049,2007-07-21,6f16e0ce-9b07-4479-a151-7efdd35408aa
15318,Caroline,Fisher,1975-09-27,Coleman Ltd,wardshaun@miah.org,826 Hollie park\nKhanberg\nM9J 1GP,United Kingdom,GB,(0115) 496 0754,2016-11-26,1a202edd-20aa-4787-b3b3-622fc01a9d08


In [43]:
# Scratch work on splitting store addresses into a locality and a post code.
# NOTE(review): the original note here read only "Remove" - presumably this
# cell is meant to be deleted once the logic lives in DataCleaning; confirm.

# Attempt 1: keep the text before the first comma in `address_locality`.
# df_stores['address_locality'] = df_stores.loc[
#     :,
#     'address'
# ].apply(lambda x: x.split(',')[0])

# ...then take the last newline-separated row of that as the post code.
# df_stores['post_code'] = df_stores.loc[
#     :,
#     'address_locality'
# ].apply(lambda x: x.split('\n')[-1])

# Per-country spot checks (author's notes on which ones parse correctly).
# df_stores[df_stores['country_code'] == 'GB']  # works
# df_stores[df_stores['country_code'] == 'DE'] # Needs work!!!
# df_stores[df_stores['country_code'] == 'US']


# Distinct post codes extracted for GB stores.
# df_stores.loc[df_stores['country_code'] == 'GB', 'post_code'].unique()



# Attempt 2: work on a GB-only copy (the post-code step repeats the one above).
# df_gb_stores = df_stores.loc[df_stores['country_code'] == 'GB', :].copy()
# df_stores['post_code'] = df_stores.loc[
#     :,
#     'address_locality'
# ].apply(lambda x: x.split('\n')[-1])
# df_gb_stores


# Sample GB address used while prototyping.
# gb_address = "Flat 72W\nSally isle\nEast Deantown\nE7B 8EB, High Wycombe"

# NOTE(review): the recorded output of this cell is an AttributeError - the
# loaded DataCleaning object has no `_clean_address`. `df_stores` is also not
# assigned in any cell visible here, and this line rebinds its own input, so
# re-running the cell is not idempotent.
df_stores = cleaner._clean_address(df_stores)

AttributeError: 'DataCleaning' object has no attribute '_clean_address'

In [36]:


# Commented-out prototype: nothing in this cell executes as written.
# The function drops the text after the last comma, splits what is left on
# newlines, pops the final row as the post code and prints the intermediate
# values; it returns nothing.
# Edge cases to handle before reviving it:
#   - an address with no comma becomes an empty string, because
#     `split(',')[:-1]` is then an empty list;
#   - an address with several commas loses them, because the pieces are
#     re-joined with "" rather than ",".
# NOTE(review): the output recorded under this cell presumably comes from an
# earlier run made before the code was commented out - confirm or clear it.
# def gb_address_constituents(address):
#     """
#     Remove locality, which is after the comma
#     """
#     locality_removed = address.split(',')[:-1]
#     address = "".join(locality_removed)
#     # print(f"locality removed {address}")

#     rows = address.split('\n')
#     print(f"rows {rows}")

#     post_code = rows.pop()
#     print(f"post code is {post_code}")

#     print(f"rows {rows}")

# gb_address = "Flat 72W\nSally isle\nEast Deantown\nE7B 8EB, High Wycombe"
# gb_address_constituents(gb_address)


rows ['Flat 72W', 'Sally isle', 'East Deantown', 'E7B 8EB']
post code is E7B 8EB
rows ['Flat 72W', 'Sally isle', 'East Deantown']
