In [None]:
import configparser
import logging
import os
import sys
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
import requests
from fuzzywuzzy import fuzz, process
from sqlalchemy import create_engine, MetaData, Table

In [None]:
# Configure logging: one INFO-level log file per calendar day.
# `today` is reused by later cells to name the per-day error CSV.
today = datetime.today().date().isoformat()
log_path = f'config/{today}.log'
logging.basicConfig(
    filename=log_path,
    level=logging.INFO,
    format='%(asctime)s:%(levelname)s:%(message)s',
)


In [None]:
# Read the credential config file.
# ConfigParser.read() returns the list of files it managed to parse, so a
# single call is enough both to load the settings and to detect a missing file.
config = configparser.ConfigParser()
loaded_files = config.read('config/credential.ini')
if not loaded_files:
    logging.error("No Credential")
    sys.exit(1)
else:
    logging.info("Credential obtained")

In [None]:
# Salesforce: exchange the configured credentials for an OAuth access token
# using the 'password' grant type.
url = 'https://login.salesforce.com/services/oauth2/token'

payload = {
    'grant_type': 'password',
    'client_id': config['Salesforce']['client_id'],
    'client_secret': config['Salesforce']['client_secret'],
    'username': config['Salesforce']['username'],
    'password': config['Salesforce']['password']
}

headers = {
    'Content-Type': 'application/x-www-form-urlencoded',
    'Accept': 'application/json'
}

try:
    # Bounded wait so a stalled connection cannot hang the run indefinitely.
    response = requests.post(url, data=payload, headers=headers, timeout=30)
except requests.RequestException:
    # Inside an except block, so logging.exception records the real traceback.
    logging.exception("Unable to reach Salesforce token endpoint")
    sys.exit(1)

if response.status_code == 200:
    access_token = response.json()['access_token']
    logging.info("Saleforce Access Token obtained")
else:
    # No exception is active here, so use error(); exception() would append
    # a meaningless "NoneType: None" traceback to the log.
    logging.error(f"Error {response.status_code}: {response.text}")
    sys.exit(1)


In [None]:
# MySQL connection settings come from the [MySQL] section of the credential file.
host = config['MySQL']['host']
database = config['MySQL']['database']
user = config['MySQL']['user']
password = config['MySQL']['password']
# Percent-encode user and password so characters such as '@', ':' or '/'
# cannot corrupt the connection URL.
url = f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}/{database}"

# create_engine() is lazy and does not open a connection; the first real
# connection happens when the table is reflected. Reflection is therefore
# done inside the try block so connection failures are actually caught.
try:
    # Create an SQLAlchemy engine object
    engine = create_engine(url)

    # Metadata container for the reflected table. `autoload_with=engine`
    # replaces MetaData(bind=...) / autoload=True, which SQLAlchemy 2.0 removed.
    metadata = MetaData()

    # Reflect the table structure from the database
    table = Table('Contacts', metadata, autoload_with=engine)
    logging.info("Open MySQL connection")
except Exception:
    logging.exception("Unable to connect to MySQL database")
    sys.exit(1)

# Get the list of column names
column_names_mysql = [column.name for column in table.columns]

In [None]:
# The token response also carries the instance URL that all further REST
# calls must be sent to.
json_res = response.json()
instance_url = json_res['instance_url']

headers = {
    'Authorization': f'Bearer {access_token}',
    'Content-Type': 'application/json'
}

# Get the column names of the Contact object from its describe endpoint
contact_describe_url = instance_url + '/services/data/v51.0/sobjects/Contact/describe/'
try:
    # Bounded wait so a stalled connection cannot hang the run indefinitely.
    response_describe = requests.get(contact_describe_url, headers=headers, timeout=30)
except requests.RequestException:
    logging.exception("Unable to reach Salesforce describe endpoint")
    sys.exit(1)

# Fail with the HTTP status and body instead of a KeyError on 'fields'
# when Salesforce returns an error payload.
if response_describe.status_code != 200:
    logging.error(f"Error {response_describe.status_code}: {response_describe.text}")
    sys.exit(1)

column_names_saleforce = [field['name'] for field in response_describe.json()['fields']]


In [None]:
# For every MySQL column, pick the Salesforce field with the highest
# token-sort similarity. Columns whose best score is below `threshold`
# get an empty-string placeholder so the two lists stay aligned by position.
threshold = 50
corresponding_columns = []
for mysql_column in column_names_mysql:
    candidates = process.extract(
        mysql_column,
        column_names_saleforce,
        scorer=fuzz.token_sort_ratio,
    )
    best_match, score = max(candidates, key=lambda pair: pair[1])
    if score < threshold:
        print(f"'{mysql_column}' does not have a good match")
        corresponding_columns.append('')
        continue
    print(f"'{mysql_column}' matches '{best_match}' with score {score}")
    corresponding_columns.append(best_match)

In [None]:
# The fuzzy match picks OtherStreet, OtherCity, ... but MailingStreet,
# MailingCity, ... appear to hold the more accurate address data, so swap them.
# The swap is applied only when the resulting Mailing* name is a real Salesforce
# field: a blanket replace would also turn e.g. 'OtherPhone' into a nonexistent
# 'MailingPhone' and break the query.
salesforce_fields = set(column_names_saleforce)
mailing_variants = [name.replace('Other', 'Mailing') for name in corresponding_columns]
corresponding_columns = [
    mailing if mailing in salesforce_fields else original
    for original, mailing in zip(corresponding_columns, mailing_variants)
]

# The fuzzy lookup finds no counterpart for the suffix column; after manual
# inspection Salutation was chosen for every unmatched (empty) entry.
corresponding_columns = ['Salutation' if name == '' else name for name in corresponding_columns]

In [None]:
# Query Salesforce contacts through the REST query endpoint, keeping only
# rows where DoNotCall is False and Email is not null.
all_col = ','.join(corresponding_columns)
soql = f'SELECT+{all_col}+FROM+contact+WHERE+DoNotCall=False+AND+Email!=NULL'
contact_query_url = f'{instance_url}/services/data/v51.0/query?q={soql}'
response_query = requests.get(contact_query_url, headers=headers)

In [None]:
# Create a pandas DataFrame from the JSON data.
# Salesforce returns query results in pages: while a page reports
# done == False, the next page is fetched from its 'nextRecordsUrl'.
# Reading only the first page would silently drop the remaining contacts.
query_response = response_query
json_output = []
while True:
    if query_response.status_code != 200:
        logging.error(f"Error {query_response.status_code}: {query_response.text}")
        sys.exit(1)
    page = query_response.json()
    json_output.extend(page["records"])
    if page.get("done", True):
        break
    # nextRecordsUrl is relative to the instance URL.
    query_response = requests.get(instance_url + page["nextRecordsUrl"], headers=headers, timeout=30)

df = pd.json_normalize(json_output)
# Keep only the queried fields (drops the per-record 'attributes.*' metadata);
# copy so later column assignments do not act on a view of the full frame.
df = df[corresponding_columns].copy()

In [None]:
# Collect every selected column whose name contains 'Mailing',
# leaving out the street column itself.
mailing_col = []
for column_name in corresponding_columns:
    if "Mailing" in column_name and "Street" not in column_name:
        mailing_col.append(column_name)

In [None]:
# In some records MailingStreet also contains the city, state, zip code and
# country. Regular expressions pull those parts out to fill the gaps in the
# dedicated columns.
# Raw strings keep the patterns byte-identical while avoiding the
# invalid-escape warnings that '\w', '\s', '\d' and '\,' raise in plain literals.
city_regex = r'(?<=\n)(\w+\s*\w*),'
state_regex = r'(?<=\, )(\w+)'
zip_regex = r'(\d+)(?=\n)'
country_regex = r'(?<=\n)(\w+\s*\w+)$'
all_regex = [city_regex, state_regex, zip_regex, country_regex]

# Pair each pattern with its target column by name rather than by position,
# so the result does not depend on the order (or number) of Mailing columns.
regex_by_column = {
    'MailingCity': city_regex,
    'MailingState': state_regex,
    'MailingPostalCode': zip_regex,
    'MailingCountry': country_regex,
}

for col in mailing_col:
    regex = regex_by_column.get(col)
    if regex is None:
        # No extraction pattern for this Mailing column; leave it untouched.
        continue
    extracted = df["MailingStreet"].str.extract(regex)[0].str.strip()
    # Only fill values that are missing; existing column data wins.
    df[col] = df[col].fillna(extracted)

In [None]:
# Strip the city/state/zip/country text out of MailingStreet so it contains
# only the street, then drop newlines and commas.
for col in mailing_col:
    cleaned_streets = []
    for street, component in df[['MailingStreet', col]].to_numpy():
        if pd.isna(street):
            # Missing street (None or NaN) stays missing.
            cleaned_streets.append(None)
            continue
        if not pd.isna(component):
            # Skip missing components: str(NaN) / str(None) would otherwise
            # delete the substrings 'nan' / 'None' from real street names.
            street = street.replace(str(component), '')
        cleaned_streets.append(street.replace('\n', '').replace(',', ''))
    df["MailingStreet"] = cleaned_streets

In [None]:
# Keep one row per email. Rows are sorted by Phone first and the first row of
# each email group is kept; the index is then renumbered from zero.
df = (
    df.sort_values(by='Phone')
      .drop_duplicates(subset=['Email'], keep='first')
      .reset_index(drop=True)
)

In [None]:
# Assume every phone number has 10 digits: remove spaces, parentheses and
# hyphens, then keep the last 10 characters.
# The .str accessor skips missing values instead of raising on None/NaN;
# missing phones end up as empty strings, which are not numeric.
df["Phone"] = (
    df["Phone"]
    .str.replace(r"[ ()-]", "", regex=True)
    .str[-10:]
    .fillna("")
)

In [None]:
# Check that Phone contains only digits. Rows that fail are removed from df
# and exported to the per-day error CSV.
# .eq(True) turns a missing result (NaN) into False so the mask is strictly
# boolean and can be negated / used for indexing.
phone_is_numeric = df["Phone"].str.isnumeric().eq(True)
# Copies, so adding the "Error" column below does not write to a slice of df.
test_phone = df[~phone_is_numeric].copy()
df = df[phone_is_numeric].copy()

if len(test_phone) > 0:
    test_phone["Error"] = "Wrong Phone"
    logging.error(f"Wrong Phone in error/{today}.csv")
    # Make sure the output directory exists before writing.
    os.makedirs('error', exist_ok=True)
    error_path = f'error/{today}.csv'
    # Write the header only when the file is being created; otherwise append.
    write_header = not os.path.isfile(error_path)
    test_phone.to_csv(error_path, mode='a', header=write_header, index=False)

In [None]:
# Check that Email matches the regex pattern. Rows that fail are removed from
# df and exported to the per-day error CSV.
# NOTE(review): the final group {2,4} rejects top-level domains longer than
# four characters -- confirm this is intended.
pattern = r'^[\w\.-]+@([\w-]+\.)+[\w-]{2,4}$'
# na=False counts a missing email as invalid and keeps the mask strictly boolean.
is_valid = df['Email'].str.match(pattern, na=False)

# Copies, so adding the "Error" column below does not write to a slice of df.
test_email = df[~is_valid].copy()
df = df[is_valid].copy()
if len(test_email) > 0:
    test_email["Error"] = "Wrong Email"
    logging.error(f"Wrong Email in error/{today}.csv")
    # Make sure the output directory exists before writing.
    os.makedirs('error', exist_ok=True)
    error_path = f'error/{today}.csv'
    # Write the header only when the file is being created; otherwise append.
    write_header = not os.path.isfile(error_path)
    test_email.to_csv(error_path, mode='a', header=write_header, index=False)

In [None]:
# Some MailingCountry values are spelled out in full instead of abbreviated;
# print the distinct values for inspection, then map the known full names to
# their two-letter codes.
print(set(df["MailingCountry"]))

names_by_code = {
    'SG': ['Singapore'],
    'FR': ['France'],
    'US': ['USA', 'United States'],
    'GB': ['United Kingdom'],
}
replacements = {
    full_name: code
    for code, full_names in names_by_code.items()
    for full_name in full_names
}

country_codes = df['MailingCountry'].replace(replacements)
df['MailingCountry'] = country_codes
logging.info("Data preprocessed")

In [None]:
# Final gate before loading into MySQL. Every check must pass:
#   - all emails are unique
#   - no row is missing an email
#   - no row has DoNotCall set to anything other than False
failed_checks = []
if not df["Email"].is_unique:
    failed_checks.append("duplicate emails")
if df["Email"].isnull().any():
    failed_checks.append("missing emails")
if (df["DoNotCall"] != False).any():
    failed_checks.append("DoNotCall is not False")

if failed_checks:
    # Record why nothing was inserted instead of skipping the load silently.
    logging.error("Pre-insert checks failed: " + ", ".join(failed_checks))
else:
    # df columns are in the same order as the MySQL columns they were matched
    # from, so pairing them by position yields the rename mapping.
    col_dict = dict(zip(df.columns, column_names_mysql))
    df = df.rename(columns=col_dict)
    logging.info("All conditions checked")
    logging.info("Inserting to MySQL database")
    try:
        df.to_sql(name='Contacts', con=engine, if_exists='append', index=False)
    except Exception:
        # Log the failure with its traceback, then let it propagate unchanged.
        logging.exception("Insert into MySQL database failed")
        raise
    logging.info("Inserted")

In [None]:
# Known caveat: some State values contain digits; nothing above validates or cleans that column.