# Cyndx DE Challenge

In [1]:
import os
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import reflection

## Prepare `company_contacts.csv` 
Load `company_contacts.csv` into Memory

In [2]:
df_contacts = pd.read_csv('company_contacts.csv')
print(df_contacts.shape)
df_contacts.head(10)

(13551, 4)


Unnamed: 0,homepage_url,email,phone,uuid
0,http://www.intel.com/,,1 4087658080,1e4f199c-363b-451b-a164-f94571075ee5
1,http://www.gruppoespresso.it/en/nc.html,,390684781,e74463f7-94a8-ab7b-ba31-f4cd57a16570
2,http://www.cypress.com,customercare@cypress.com,4089432600,2b76e227-6f8d-1a90-a86a-fdb291245333
3,http://www.microsoft.com,mdcc@microsoft.com,1800 1 441 0158,fd80725f-53fc-7009-9878-aeecf1e9ffbb
4,http://www.ea.com,contact@ea.com,,5a6207e2-b868-9f41-5178-8d51d2c40a93
5,https://www.oracle.com,oraclesales_us@oracle.com,,bef5bd4b-72c6-7877-d7ab-8bbe43f7bda7
6,http://www.kainos.com,info@kainos.com,617 217 2704,a307ed6d-2605-328a-e50a-d83f82871ac7
7,http://www.cisco.com,,8005536387,e0906c05-fae5-9591-ba5f-2142d8b0065a
8,http://www.cleanharbors.com,,17817925949,a59a37db-f621-afb1-6d1d-ab30d5d4eeb8
9,,,,97e3dae0-6b82-cd1e-b450-d54aa4a68dc7


### Parse the Phone Numbers in `company_contacts.csv`

In [3]:
import re
from phonenumbers import parse, NumberParseException

def phone_number_to_string(phone_number):
    """Normalise a raw phone value (str, int, float or missing) to a plain string.

    Args:
        phone_number: Raw cell value. May be a string, a number, ``None`` or NaN.

    Returns:
        ``None`` for missing or falsy input (``None``, NaN, ``''``, ``0``).
        Otherwise the value as a string, with scientific notation
        (``'1.14088E+11'``) and integral floats (``4089432600.0``) expanded to
        their plain integer digits.
    """
    # NaN is truthy, so missing values must be tested explicitly before the falsy check.
    if phone_number is None or pd.isna(phone_number) or not phone_number:
        return None
    # An integral float would otherwise render as '4089432600.0' and leak a stray
    # trailing digit once non-digits are stripped downstream.
    if isinstance(phone_number, float) and phone_number.is_integer():
        return str(int(phone_number))
    phone_str = str(phone_number)
    # Spreadsheet exports use either 'E+' or 'e+' for scientific notation.
    if 'e+' in phone_str.lower():
        try:
            return str(int(float(phone_str)))
        except (ValueError, OverflowError):
            # Not actually a number (or infinite): fall back to the raw text.
            return phone_str
    return phone_str

def clean_phone_number_naive(phone_number):
    """Clean a phone number by splitting its digits from right to left.

    All non-digit characters are dropped. The last ten digits (or fewer, when
    the number is shorter) are treated as the national number and anything to
    their left as the country prefix.

    Args:
        phone_number: Raw cell value; normalised via ``phone_number_to_string``.

    Returns:
        ``'<country> <national>'`` when a country prefix is present, just
        ``'<national>'`` otherwise, or ``None`` when the input is missing or
        contains fewer than four digits.
    """
    phone_str = phone_number_to_string(phone_number)
    if not phone_str:
        return None  # Short circuit for null phone numbers
    digits = re.sub('[^0-9]', '', phone_str)
    # Fewer than four digits cannot even fill the final group: treat as unusable.
    if len(digits) < 4:
        return None
    country, national = digits[:-10], digits[-10:]
    # Only emit the separator when there is a prefix; previously a number
    # without one came back with a stray leading space.
    return f'{country} {national}' if country else national

def clean_phone_number_lib(phone_number):
    """Parse a phone number with the ``phonenumbers`` library.

    The value is normalised with ``phone_number_to_string`` and handed to
    ``parse`` with ``'US'`` as the region argument.

    Returns:
        ``'<country_code> <national_number>'`` on success, or ``None`` when the
        input is missing or ``parse`` raises ``NumberParseException``.
    """
    raw = phone_number_to_string(phone_number)
    if not raw:
        return None  # nothing to parse
    try:
        number = parse(raw, 'US')
    except NumberParseException:
        return None
    return f'{number.country_code} {number.national_number}'
    
# Combine methods, prefer clean_phone_number_lib method
def combined_clean(phone_number):
    """Clean a phone number using both strategies.

    The library-based parser is tried first; the naive right-to-left parser is
    used only when the library parser yields nothing.
    """
    return clean_phone_number_lib(phone_number) or clean_phone_number_naive(phone_number)

## Phone Clean Results Comparison

Both methods have few false negatives -- they don't increase the null count by more than 10 compared to the original data in either case (naive r-l parse vs. phonenumbers parser)

Without going too deeply into accuracy, we can apply the phonenumbers parser first and then fall back to the naive method for any value it fails to parse, which removes some additional false negatives. We could then isolate the remaining false negatives and investigate their source.

In [4]:
print('Nulls in original CSV ', len(df_contacts[pd.isnull(df_contacts['phone'])]))
print('Null values in phonenumbers parser method: '
    ,len([ phone_number for phone_number in df_contacts['phone'].apply(clean_phone_number_lib) if not phone_number])
)
df_contacts['phone'].apply(clean_phone_number_lib)[0:10]

Nulls in original CSV  1818
Null values in phonenumbers parser method:  1826


0     1 4087658080
1      1 390684781
2     1 4089432600
3    1 80014410158
4             None
5             None
6     1 6172172704
7     1 8005536387
8     1 7817925949
9             None
Name: phone, dtype: object

In [5]:
print('Null values in naive r-l method: '
    ,len([ phone_number for phone_number in df_contacts['phone'].apply(clean_phone_number_naive) if not phone_number])
)
df_contacts['phone'].apply(clean_phone_number_naive)[0:10]

Null values in naive r-l method:  1823


0     1 4087658080
1        390684781
2       4089432600
3    18 0014410158
4             None
5             None
6       6172172704
7       8005536387
8     1 7817925949
9             None
Name: phone, dtype: object

In [6]:
combined_clean_phone_numbers = df_contacts['phone'].apply(combined_clean)
print( 'Null values in combined method: ',
    len([ phone_number for phone_number in combined_clean_phone_numbers if not phone_number])
)
combined_clean_phone_numbers[0:10]

Null values in combined method:  1820


0     1 4087658080
1      1 390684781
2     1 4089432600
3    1 80014410158
4             None
5             None
6     1 6172172704
7     1 8005536387
8     1 7817925949
9             None
Name: phone, dtype: object

## Parse the `homepage_url` field to match formatting of `domain`

In [7]:
from urllib.parse import urlparse
def get_domain(homepage, strip_www=False):
    """Extract the host name from a homepage URL.

    Args:
        homepage: URL string, or a missing value (``None`` / NaN).
        strip_www: When true, drop a leading ``'www.'`` from the host. Off by
            default so existing callers get the same host as before.
            NOTE(review): the `companies.domain` samples shown in this notebook
            carry no ``www.`` prefix -- confirm whether matching should enable this.

    Returns:
        The lower-cased host without port or credentials, or ``None`` when the
        value is missing, has no host part (e.g. no ``scheme://``), is
        malformed, or the host is shorter than three characters.
    """
    # Handle missing values explicitly instead of relying on str(nan) having no netloc.
    if homepage is None or pd.isna(homepage):
        return None
    try:
        # .hostname lower-cases the host and drops any ':port' / 'user:pw@' part.
        host = urlparse(str(homepage).strip()).hostname
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. unbalanced IPv6 brackets).
        return None
    if not host:
        return None
    if strip_www and host.startswith('www.'):
        host = host[len('www.'):]
    # domain needs at least 3 characters
    return host if len(host) >= 3 else None
df_contacts['homepage_url'].apply(get_domain)[0:15]

0             www.intel.com
1     www.gruppoespresso.it
2           www.cypress.com
3         www.microsoft.com
4                www.ea.com
5            www.oracle.com
6            www.kainos.com
7             www.cisco.com
8      www.cleanharbors.com
9                      None
10       www.stericycle.com
11         www.progress.com
12                     None
13         www.rovicorp.com
14                     None
Name: homepage_url, dtype: object

## Connect to Postgres and Inspect the Database

In [8]:
# Credentials are read from the environment so that no secret is stored in the
# notebook. A missing variable raises KeyError naming the variable.
pg_user = os.environ['PGUSER']
pg_password = os.environ['PGPASSWORD']
# quote_plus keeps characters such as '@', ':' or '/' in the credentials from
# corrupting the connection URL.
eng = create_engine(
    'postgresql://{}:{}@data-eng-challenge.cyndx.io:5432/test'.format(
        quote_plus(pg_user), quote_plus(pg_password)
    )
)
insp = inspect(eng)
insp

<sqlalchemy.dialects.postgresql.base.PGInspector at 0x1200c6c50>

In [9]:
insp.get_table_names()

['dubs',
 'companies',
 'solution_artyom',
 'solution_patrick',
 'jbaehne_solution']

### Read `companies` Into Pandas

In [10]:
sql_all_companies = 'SELECT * FROM companies'
df_company_db = pd.read_sql(sql_all_companies, con=eng)
print(df_company_db.shape)
df_company_db.head()

(1577896, 2)


Unnamed: 0,name,domain
0,Value Creation Fonds-Services GmbH,value-creation-gmbh.de
1,ValueCreationTeam,valuecreationteam.com
2,Value Creative International Ltd.,valuecreative.com
3,"valuedesign, Inc.",valuedesign.jp
4,Valuedge Partners BV,valuedge.nl


## Create the `solution_patrick` table

Structure:

CREATE TABLE solution_patrick (
  contact_id uuid DEFAULT uuid_generate_v4()
  , name text
  , domain text
  , email text
  , phone text
  , PRIMARY KEY (contact_id)
)


In [11]:
# Leverage the existing data from `companies`
# The script below is sent as one multi-statement string. It is re-runnable:
# the target table is dropped if present and then rebuilt as a copy of
# `companies` with three extra columns (email, phone, contact_id).
# `contact_id` is added as a plain uuid column with no DEFAULT, so every row
# copied from `companies` starts with a NULL contact_id.
create_table_sql = """
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
DROP TABLE IF EXISTS solution_patrick;
CREATE TABLE solution_patrick AS TABLE companies;
ALTER TABLE solution_patrick ADD email text;
ALTER TABLE solution_patrick ADD phone text;
ALTER TABLE solution_patrick ADD contact_id uuid;
"""
# Will need to add primary key constraint later after cleaning data and considering normalization / data model
# NOTE(review): Engine.execute() is legacy API (removed in SQLAlchemy 2.0) --
# confirm the pinned SQLAlchemy version before upgrading.
execution = eng.execute(create_table_sql)
execution

<sqlalchemy.engine.result.ResultProxy at 0x12034b550>

In [12]:
# Show where companies.domain and contacts.domain overlap
df_contacts[df_contacts['homepage_url'].apply(get_domain).isin(df_company_db['domain'])].head(10)

Unnamed: 0,homepage_url,email,phone,uuid
9,,,,97e3dae0-6b82-cd1e-b450-d54aa4a68dc7
12,,,5087782000,01915de2-7ae6-9787-e397-855a71b1fafe
14,,,,5fe5f05b-16ed-9344-0234-1e1f2f848dd9
19,http://netapp.com,xdl-uspr@netapp.com,1.14088E+11,bbd3ae58-58e2-d81c-42bf-97d4a07ac0d1
20,https://veon.com,pr@vimpelcom.com,31 20 797 7200,3565d953-9352-8281-302d-721fd05a0f6a
26,http://cariboucoffee.com,,1 7635922200,383f5008-b628-a2e9-21e6-111b7bab9242
35,,,,4b5e9b11-fde4-94f3-a630-b7835c122048
38,http://ferro.com,,7242283170,92a7f058-61c2-0b8d-986c-376fb982e5b6
41,http://ebay.com,,41313590590,e56b0ceb-bb30-bbec-805e-d5dc7412dcb1
44,http://kodak.com,,,bf190e5b-9223-10d8-93d3-34c592db9f82


In [13]:
df_company_db.head()

Unnamed: 0,name,domain
0,Value Creation Fonds-Services GmbH,value-creation-gmbh.de
1,ValueCreationTeam,valuecreationteam.com
2,Value Creative International Ltd.,valuecreative.com
3,"valuedesign, Inc.",valuedesign.jp
4,Valuedge Partners BV,valuedge.nl


In [14]:
df_contacts.head()

Unnamed: 0,homepage_url,email,phone,uuid
0,http://www.intel.com/,,1 4087658080,1e4f199c-363b-451b-a164-f94571075ee5
1,http://www.gruppoespresso.it/en/nc.html,,390684781,e74463f7-94a8-ab7b-ba31-f4cd57a16570
2,http://www.cypress.com,customercare@cypress.com,4089432600,2b76e227-6f8d-1a90-a86a-fdb291245333
3,http://www.microsoft.com,mdcc@microsoft.com,1800 1 441 0158,fd80725f-53fc-7009-9878-aeecf1e9ffbb
4,http://www.ea.com,contact@ea.com,,5a6207e2-b868-9f41-5178-8d51d2c40a93


## Merge the `company_contacts.csv` and `companies` using Pandas

### Part 1 `df_solution`: entries in `company_contacts.csv` with no matching `domain` in `companies`

In [15]:
# Write non-overlapping fields to temp_table in db as insert operation
df_contacts_unique = df_contacts[~df_contacts['homepage_url'].apply(get_domain).isin(df_company_db['domain'])]
len(df_contacts_unique)

10554

In [16]:
# Shape the unmatched contacts into the columns of the solution table.
# `assign` returns a new frame, so `df_contacts_unique` is left untouched and
# the cell gives the same result every time it is re-run.
df_new_entries = (
    df_contacts_unique
    .assign(
        # No company name is known for these contacts. float('nan') replaces the
        # deprecated `pd.np.nan` alias and yields the same NaN column.
        name=float('nan'),
        domain=df_contacts_unique['homepage_url'].apply(get_domain),
        phone=df_contacts_unique['phone'].apply(combined_clean),
        contact_id=df_contacts_unique['uuid'],
    )
    .drop(columns=['homepage_url', 'uuid'])
)
len(df_new_entries)

10554

In [17]:
df_new_entries.head()

Unnamed: 0,email,phone,name,domain,contact_id
0,,1 4087658080,,www.intel.com,1e4f199c-363b-451b-a164-f94571075ee5
1,,1 390684781,,www.gruppoespresso.it,e74463f7-94a8-ab7b-ba31-f4cd57a16570
2,customercare@cypress.com,1 4089432600,,www.cypress.com,2b76e227-6f8d-1a90-a86a-fdb291245333
3,mdcc@microsoft.com,1 80014410158,,www.microsoft.com,fd80725f-53fc-7009-9878-aeecf1e9ffbb
4,contact@ea.com,,,www.ea.com,5a6207e2-b868-9f41-5178-8d51d2c40a93


### Perform Insert in Postgres

In [18]:
df_insert = df_new_entries.copy()

In [19]:
def create_insert_query(table, columns=None):
    """Build a parameterised single-row INSERT statement.

    Args:
        table: Target table name. It is interpolated directly into the SQL, so
            it must be a trusted identifier, never user input.
        columns: Column names to insert. Defaults to the module-level
            ``DATABASE_COLUMNS``. These are also interpolated directly.

    Returns:
        A one-line statement using pyformat placeholders, e.g.
        ``INSERT INTO t (a, b) VALUES (%(a)s, %(b)s)``. The values are bound by
        the driver at execute time.
    """
    if columns is None:
        columns = DATABASE_COLUMNS
    column_list = ', '.join(str(col) for col in columns)
    placeholders = ', '.join(f'%({col})s' for col in columns)
    return f'INSERT INTO {table} ({column_list}) VALUES ({placeholders})'

def load_inserts(df, table, connection):
    """Insert every row of ``df`` into ``table`` in a single transaction.

    Args:
        df: DataFrame whose columns provide the values named by the
            placeholders of ``create_insert_query``. Null cells are sent as
            SQL NULL.
        table: Target table name (trusted identifier).
        connection: Open DB-API connection. It is closed before returning,
            whether or not the load succeeded.

    Raises:
        Any driver error is re-raised after the transaction is rolled back, so
        a failed load leaves no partially inserted rows.
    """
    records = df.applymap(lambda x: None if pd.isnull(x) else x).to_dict(orient='records')
    # The statement is identical for every row, so build it once.
    query = create_insert_query(table=table)
    cursor = connection.cursor()
    try:
        cursor.executemany(query, records)
        # Commit on the connection that was passed in, not on a notebook global.
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        cursor.close()
        connection.close()
    print(f'Inserted {len(records)} rows.')

In [21]:
DATABASE_COLUMNS = ['contact_id','domain','email','name','phone']
PRIMARY_KEY = ['domain']

In [22]:
DATABASE_COLUMNS = ['contact_id','domain','email','name','phone']
PRIMARY_KEY = ['domain']
conn = eng.raw_connection()
load_inserts(df_insert.applymap(lambda x: None if pd.isnull(x) else x), 'solution_patrick', conn)

Inserted 10554 rows.


### Part 2 `df_update`: Entries in `company_contacts.csv` with a matching `domain` in `companies`

In [24]:
# Update data with matching domains from company
# Parse the contact domains once and reuse them for every filter below.
contact_domains = df_contacts['homepage_url'].apply(get_domain)
is_known_domain = contact_domains.isin(df_company_db['domain'])
# NOTE(review): contacts without a homepage_url appear to land in this set (see
# the overlap preview above). Their UPDATE ... WHERE domain = NULL cannot match
# any row -- confirm whether they should be inserted instead.

# Restrict `companies` to the domains we need, to keep the lookup small.
df_overlaps = df_company_db[df_company_db['domain'].isin(contact_domains)].copy()
# First company name per domain, as the previous `.values[0]` lookup returned.
first_by_domain = df_overlaps.drop_duplicates(subset='domain', keep='first')
name_by_domain = dict(zip(first_by_domain['domain'], first_by_domain['name']))

def get_name_via_domain(row):
    """Return the company name for ``row['domain']``, or None when the domain is missing."""
    domain = row['domain']
    if pd.isnull(domain) or not domain:
        return None  # Short circuit
    # Dictionary lookup replaces a full boolean-mask scan of `df_overlaps` per row.
    return name_by_domain.get(domain)

# Initially load the data from `company_contacts.csv` that has a domain name in
# the `companies` table, then shape it into the solution table's columns.
df_update = (
    df_contacts[is_known_domain]
    .assign(domain=contact_domains[is_known_domain])
    .assign(
        name=lambda frame: frame.apply(get_name_via_domain, axis=1),
        phone=lambda frame: frame['phone'].apply(combined_clean),
        contact_id=lambda frame: frame['uuid'],
    )
    .drop(columns=['homepage_url', 'uuid'])
)
len(df_update)

2997

In [25]:
df_update[df_update['contact_id'].apply(len)<2]

Unnamed: 0,email,phone,domain,name,contact_id


### Perform Update in Postgres

In [26]:
def create_update_query(table, condition, row_dict):
    """Build a parameterised UPDATE statement from the keys of ``row_dict``.

    Every key of ``row_dict`` becomes a ``key = %(key)s`` assignment. The row
    is selected with ``WHERE condition = %(condition)s``. Only the keys are
    used; the values are bound later by the driver.
    """
    assignments = ', '.join(f" {column} = %({column})s " for column in row_dict)
    where_clause = f" WHERE {condition} = %({condition})s "
    return f"UPDATE {table} SET" + assignments + where_clause

def load_updates(df, table, connection, condition='domain'):
    """Apply one UPDATE per row of ``df`` to ``table`` in a single transaction.

    Args:
        df: DataFrame whose columns are the fields to set. It must include the
            ``condition`` column. Null cells are sent as SQL NULL.
        table: Target table name (trusted identifier).
        connection: Open DB-API connection. It is closed before returning,
            whether or not the load succeeded.
        condition: Column used in the WHERE clause to select the row to update.

    Raises:
        Any driver error is re-raised after the transaction is rolled back.

    Note:
        The printed count is the number of UPDATE statements issued, not the
        number of table rows that actually matched.
    """
    records = df.applymap(lambda x: None if pd.isnull(x) else x).to_dict(orient='records')
    cursor = connection.cursor()
    try:
        if records:
            # Every record has the same keys, so one statement serves all rows.
            query = create_update_query(table, condition, records[0])
            cursor.executemany(query, records)
        # Commit on the connection that was passed in, not on a notebook global.
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        cursor.close()
        connection.close()
    print(f'Updated {len(records)} rows.')

In [27]:
update_row = df_update.to_dict(orient='records')[0]
create_update_query('solution_patrick', 'domain', update_row)

'UPDATE solution_patrick SET email = %(email)s ,  phone = %(phone)s ,  domain = %(domain)s ,  name = %(name)s ,  contact_id = %(contact_id)s  WHERE domain = %(domain)s '

In [28]:
conn = eng.raw_connection()
load_updates(df_update.applymap(lambda x: None if pd.isnull(x) else x), 'solution_patrick', conn)

Updated 2997 rows.


## Check the Result

In [29]:
df_result = pd.read_sql('SELECT * FROM solution_patrick',con=eng)
print(df_result.shape)
df_result.head()

(1588450, 5)


Unnamed: 0,name,domain,email,phone,contact_id
0,KOBE Law Office,kobe-law.jp,,,
1,"KOBELCO Advanced Coating (America), Inc.",kobac-us.com,,,
2,Kobelco Business Support Co. Ltd.,kobelco-kbs.co.jp,,,
3,Kobelco Compressors Corp.,kobelco-comp.co.jp,,,
4,KOBELCO Compressors & Machinery Philippines Corp.,kobelco.com.ph,,,


In [30]:
df_result.describe()

Unnamed: 0,name,domain,email,phone,contact_id
count,1577896,1588350,7901,11622,13064
unique,1577886,1588344,7860,11083,13064
top,Marufuji KK (Tokyo),www.hsbc.com,info@amstock.com,1 9999999999,5f053b76-0f2a-4092-9162-1868d58dfb53
freq,2,2,6,17,1


Solution checks out -- we expect a domain for all of our entries, with some duplicates. Some manual cleaning is likely required for bogus entries. From here we have to address normalizing this table into 3NF, as a company / contact pair of tables, to reduce upsert complexity and anomalies. Once that rigor is introduced to the data model, we can explore the process that generates this data and potentially build the process into a data pipeline.