# eDataSource Fuzzy Match Process

In [None]:
import pandas as pd
import os
import cx_Oracle
import time
import requests

import numpy as np
import re
#from fuzzywuzzy import fuzz, process
from dask import dataframe as dd, delayed, compute, distributed
from dask.multiprocessing import get
import multiprocess as mp
from multiprocessing.dummy import Pool as ThreadPool

## Testing RapidFuzz for a performance improvement over FuzzyWuzzy
from rapidfuzz import fuzz, process


# Import the shared DB-connection helper from the directory where db_connection.py
# resides, then return to the notebook's working directory.
# Update the 'Directory Name' paths to where the function file resides.
# os.getcwd() replaces the IPython shell escapes (`!cd` / `!pwd`): it is plain Python,
# works on every platform, and guarantees the directory to restore is always defined.
notebook_dir = os.getcwd()

try:
    if os.name == 'nt':
        # Load DB Connection File from Windows Machine
        os.chdir(r'Directory Name')
        from db_connection import oracle_connection

    elif os.name == 'posix':
        # Load DB Connection File from Mac Machine
        os.chdir('Directory Name')
        from db_connection import oracle_connection

    else:
        print('No OS!')
finally:
    # Change directory back to the working Jupyter Notebook directory, even if the
    # import above failed, so later relative paths are not resolved in the wrong place.
    os.chdir(notebook_dir)

pd.options.display.max_columns = None

# Create DB Connection String

In [None]:
# Open the Oracle connection and a cursor from the credentials config file.
# Windows and macOS/Linux use identical logic, so a single branch serves both.
if os.name in ('nt', 'posix'):
    # Update path to where config file resides.
    # NOTE(review): the relative path is appended verbatim to the home directory, so it
    # must begin with a path separator.
    db_creds = os.path.expanduser('~') + 'Directory Name'
    creds = oracle_connection(db_creds)

    # host:port/service — assumes oracle_connection returns these values as strings
    # (they are concatenated directly); TODO confirm for 'port'.
    url = creds['host'] + ":" + creds['port'] + "/" + creds['database']

    db = cx_Oracle.connect(creds['user'], creds['password'], url, encoding = 'UTF-8')

    cursor = db.cursor()
else:
    print('No OS!')

## Read in the CSV files for processing.
#### NOTE: This process may be automated via API access

In [None]:
FILE_PATH = r'Directory Name'

# Read every .csv export in FILE_PATH and stack them into one DataFrame.
# os.path.join inserts the separator, so FILE_PATH works with or without a trailing
# slash. Sorting the listing makes the concatenation order reproducible.
# NOTE(review): 'ANSI' is a Windows-only codec alias in Python (alias of 'mbcs'); on
# macOS/Linux this read will fail — confirm the intended file encoding.
df_list = [
    pd.read_csv(os.path.join(FILE_PATH, filename), keep_default_na = False, encoding = 'ANSI')
    for filename in sorted(os.listdir(FILE_PATH))
    if filename.endswith('.csv')
]

# Fail with a clear message instead of pandas' generic "No objects to concatenate"
if not df_list:
    raise FileNotFoundError('No .csv files found in ' + FILE_PATH)

df = pd.concat(df_list, axis = 0, ignore_index = False, sort = False)
df.head()

In [None]:
## Add additional columns to enhance the fuzzy match logic
# NOTE(review): the column names in this cell appear to be redacted placeholders
# ('column' / 'COLUMN NAME'); the comments describe the operations, not the real
# columns — confirm against the unredacted notebook.

# Cast to str so the string concatenations below cannot fail on numeric values
df['column'] = df['column'].astype(str)

# Build space-separated composite match strings, using '-' for missing parts.
# The CSVs are read with keep_default_na=False, so empty cells arrive as '' rather than
# NaN; presumably NaN only appears for columns missing from some of the concatenated files.
df['column'] = df['column'].fillna('-') + ' ' + df['column'].fillna('-') + ' ' + df['column'].fillna('-') + ' ' + df['column'].fillna('-')
df['column'] = df['column'].fillna('-') + ' ' + df['column'].fillna('-') + ' ' + df['column'].fillna('-') + ' ' + df['column'].fillna('-') + ' ' + df['column'].fillna('-')

sub_df = df[['COLUMN NAME']]

# Sum the selected column(s) per group key (group keys kept as columns)
agg_df = sub_df.groupby(['COLUMN NAME'], as_index = False)[['COLUMN NAME']].agg('sum')

# Drop rows whose key is the empty string
agg_df = agg_df.drop(index = agg_df[agg_df['COLUMN NAME'] == ''].index)

# 75th percentile of a value per group.
# The first resulting row is then dropped (presumably the '' group, which sorts first
# under groupby's default sorting — TODO confirm).
esp_df = agg_df.groupby(['column'], as_index = False)[['column']].quantile(q = 0.75)
esp_df = esp_df.drop(esp_df.index[0]).reset_index(drop = True)
# Rename the percentile column so it does not collide on merge (names redacted here)
esp_df.rename(columns = {'column': 'column'}, inplace = True)

# Attach each group's percentile, keep rows at or above it (or rows where the compared
# column is ''), then drop the helper percentile column
agg_df = pd.merge(agg_df, esp_df, left_on = 'column', right_on = 'column', how = 'left')
agg_df = agg_df[(agg_df['column'] >= agg_df['column']) | (agg_df['column'] == '')]
agg_df = agg_df.drop(['column'], axis = 1).reset_index(drop = True)

agg_df.head()

In [None]:
# Frequency of each value in the column (rendered as this cell's output)
agg_df['column'].value_counts()

### Read in the list of accounts from the System for the fuzzy matching process

In [None]:
# Pull the System account list to match against (SQL is a placeholder here).
# NOTE(review): pandas >= 2.0 emits a UserWarning when read_sql is given a raw DBAPI
# connection other than sqlite3 rather than a SQLAlchemy engine; the read still proceeds.
query = """
"""

org_df = pd.read_sql(query, cursor.connection)
org_df.head(20)

### Using Dask for parallel computation

In [None]:
# Prepare the candidate strings and the key -> account-ID lookups used by the
# fuzzy matcher. All match inputs are cast to str so scoring never sees NaN/numbers.

# Create the list for ACCT/ADDR compare
org_acct_list = org_df['column'].astype(str)
agg_df['column'] = agg_df['column'].astype(str)

# Create the list for the ACCT/ADDR/URL compare
org_acct_url_list = org_df['column'].astype(str)
agg_df['column'] = agg_df['column'].astype(str)

# Account IDs aligned row-for-row with org_df
acct_id_list = org_df['column']

# Create the dictionaries to return the correct account ID.
# Keys are org_df index labels: when the choices passed to rapidfuzz's extractOne are
# a pandas Series, the third element of its result is the matched row's index label.
org_acct_id_dict = dict(zip(org_acct_list.index, acct_id_list))
org_acct_url_id_dict = dict(zip(org_acct_url_list.index, acct_id_list))

In [None]:
# One dask partition per CPU core
num_cpu = mp.cpu_count()

dask_df = dd.from_pandas(agg_df, npartitions = num_cpu)

# Start a local dask.distributed client.
# NOTE(review): the compute calls later pass scheduler='processes', which presumably
# selects the local multiprocessing scheduler instead of this client — confirm the client
# is actually used.
client = distributed.Client()

In [None]:
## Fuzzy matching uses RapidFuzz (imported above) as a faster replacement for FuzzyWuzzy

def fuzzy_match_top_list_dask(value_to_compare, list_to_compare_against, dictionary_with_ids, cutoff_thresh):
    """Return the best fuzzy match for one string, or a sentinel row if none is good enough.

    Args:
        value_to_compare: String to look up.
        list_to_compare_against: Candidate strings. For a list, the reported key is the
            position; for a mapping such as a pandas Series, it is the index label.
        dictionary_with_ids: Maps a candidate's key to the account ID to report.
        cutoff_thresh: Minimum ``fuzz.ratio`` score (0-100) a match must reach.

    Returns:
        ``[matched_string, score, key, account_id]`` for the best candidate scoring at
        least ``cutoff_thresh`` (``account_id`` is None if the key is not in
        ``dictionary_with_ids``), otherwise ``['No Value', -1, -1, 'No Acct ID']``.
    """
    # score_cutoff keeps the same ">= cutoff_thresh" rule but lets RapidFuzz skip
    # hopeless candidates early. extractOne returns None when nothing reaches the cutoff
    # or the candidate list is empty, so that case must be handled explicitly.
    best = process.extractOne(
        value_to_compare,
        list_to_compare_against,
        scorer = fuzz.ratio,
        score_cutoff = cutoff_thresh,
    )

    if best is None:
        return ['No Value', -1, -1, 'No Acct ID']

    comp_value, match_pct, record_index = best
    return [comp_value, match_pct, record_index, dictionary_with_ids.get(record_index)]

In [None]:
# Best ACCT/ADDR match for every row, scored per partition in parallel processes.
# Each row gets the 4-item list returned by fuzzy_match_top_list_dask (cutoff 85).
# NOTE(review): .compute() materialises a pandas Series that is assigned back onto the
# dask frame — confirm the installed dask version accepts that; otherwise keep it lazy.
# No `meta` is passed, so dask infers the output type by running the function on
# sample data.
dask_df['fuzzy_match_id_acct_addr'] = dask_df.map_partitions(
    lambda df: df.apply(
        lambda x: fuzzy_match_top_list_dask(x.NAME_ADDR_STRING, org_acct_list, org_acct_id_dict, 85), axis = 1
    )
).compute(scheduler = 'processes')

In [None]:
# Same as the ACCT/ADDR match, but against the ACCT/ADDR/URL strings and their
# ID lookup (cutoff 85).
# NOTE(review): as above, a computed pandas Series is assigned back onto the dask frame.
dask_df['fuzzy_match_id_acct_addr_url'] = dask_df.map_partitions(
    lambda df: df.apply(
        lambda x: fuzzy_match_top_list_dask(x.NAME_ADDR_URL_STRING, org_acct_url_list, org_acct_url_id_dict, 85), axis = 1
    )
).compute(scheduler = 'processes')

In [None]:
# Shut down the dask.distributed client started during setup
client.close()

In [None]:
# Preview the first rows (dask computes only what head() needs)
dask_df.head()

In [None]:
# Materialise the full dask frame as a pandas DataFrame
pd_df = dask_df.compute()
pd_df.head()

In [None]:
# Select the columns to persist (names redacted) and materialise them as pandas
full_match_db_df = dask_df[['COLUMN NAME']].compute()
full_match_db_df.head()

In [None]:
# Expand each 4-item match list ([matched string, score, key, account ID]) into
# separate columns, one set per match strategy, aligned on the existing index.
# NOTE(review): target column names are redacted ('COLUMN NAME'); the left-hand side
# presumably lists four new columns per strategy.
full_match_db_df[['COLUMN NAME']] = pd.DataFrame(full_match_db_df['fuzzy_match_id_acct_addr'].tolist(), index=full_match_db_df.index)
full_match_db_df[['COLUMN NAME']] = pd.DataFrame(full_match_db_df['fuzzy_match_id_acct_addr_url'].tolist(), index=full_match_db_df.index)

# Keep only the columns that will be written to the database
full_match_db_df = full_match_db_df[['COLUMN NAME']]

full_match_db_df.head(20)

### Drop and recreate the table, then insert the matched data

In [None]:
# Drop the fuzzy-match results table before recreating it (SQL is a placeholder here).
# NOTE(review): in Oracle, DROP TABLE on a table that does not exist raises ORA-00942,
# so a first-ever run would fail here — consider guarding it.
drop_table_sql = """
"""

cursor.execute(drop_table_sql)

In [None]:
# Recreate the fuzzy-match results table (SQL is a placeholder here)
create_table_sql = """
"""

cursor.execute(create_table_sql)

In [None]:
# One tuple per DataFrame row, inserted as a single executemany batch and committed.
# NOTE(review): the INSERT statement is a placeholder here; NaN floats may be rejected by
# Oracle NUMBER columns — confirm the data has none.
records = list(map(tuple, full_match_db_df.values))

cursor.executemany('''''', records)
db.commit()

#### Run basic comparison on percentage score to get correct ID

In [None]:
# Drop the score-comparison table before recreating it (SQL is a placeholder here).
# NOTE(review): fails with ORA-00942 if the table does not exist yet.
drop_table_sql = """
"""

cursor.execute(drop_table_sql)

In [None]:
# Recreate the table that picks the account ID by comparing match percentages
# (SQL is a placeholder here)
create_table_sql = """
"""

cursor.execute(create_table_sql)

## Get list of accounts and data and create aggregation of all data names and usage for an account.

In [None]:
# Pull per-account data names and usage (SQL is a placeholder here).
# Note: this rebinds `df`; the CSV-based df loaded at the start of the notebook is no
# longer available after this cell.
query = """
"""

df = pd.read_sql(query, cursor.connection)
df.head(20)

In [None]:
# Save a CSV copy of the query results (output path is a placeholder)
df.to_csv(r'DIRECTORY NAME', index = False)

In [None]:
# Build one output frame per account whose description lists every ESP name for that
# account with its volume, e.g.
#   - Overall ESP Volume for <name>: 1,234
#   - Overall ESP Volume for <name>: 56
# NOTE(review): column names are redacted placeholders ('column'); ESP names are
# assumed to be strings (they are passed to str.join).
tmp_output_df_list = []

# groupby(sort=False) yields each account's rows in first-appearance order (the same
# order as Series.unique()) in one pass, instead of re-scanning the whole frame with a
# boolean mask for every account. Rows whose account key is NaN are skipped (dropna=True).
for _, tmp_df in df.groupby('column', sort = False):
    tmp_output_df = pd.DataFrame()

    esp_names = tmp_df['column'].to_list()
    # Thousands separators, e.g. 1234567 -> '1,234,567' (format() already returns str)
    esp_volume = [format(val, ",") for val in tmp_df['column'].to_list()]

    combo_list = ['Overall ESP Volume for ' + ': '.join(val) for val in zip(esp_names, esp_volume)]

    # Bullet list, one line per ESP
    oppty_sub_desc = '- ' + '\n- '.join(combo_list)

    # The first assignment fixes the row count (one row per distinct value); the scalar
    # description is then broadcast to every row.
    tmp_output_df['column'] = tmp_df['column'].unique()
    tmp_output_df['column'] = tmp_df['column'].unique()
    tmp_output_df['column'] = tmp_df['column'].unique()
    tmp_output_df['column'] = oppty_sub_desc

    tmp_output_df_list.append(tmp_output_df)

output_df = pd.concat(tmp_output_df_list)
output_df.head()

### Load data aggregated at account level back into DB for last processing step 

In [None]:
# Drop the account-level aggregation table before recreating it (SQL is a placeholder).
# NOTE(review): fails with ORA-00942 if the table does not exist yet.
drop_table_sql = """
"""

cursor.execute(drop_table_sql)

In [None]:
# Recreate the account-level aggregation table (SQL is a placeholder here)
create_table_sql = """
"""

cursor.execute(create_table_sql)

In [None]:
# Turn every row of output_df into a tuple and insert them in one executemany batch,
# then commit. NOTE(review): the INSERT statement is a placeholder here.
records = list(map(tuple, output_df.values))

cursor.executemany('''''', records)
db.commit()

### Create final target list

In [None]:
# Drop the final target-list table before recreating it (SQL is a placeholder here).
# NOTE(review): fails with ORA-00942 if the table does not exist yet.
drop_table_sql = """
"""

cursor.execute(drop_table_sql)

In [None]:
# Recreate the final target-list table (SQL is a placeholder here)
create_table_sql = """
"""

cursor.execute(create_table_sql)

In [None]:
# Read back the final target list (SQL is a placeholder here).
# Note: `final_df` is reassigned by the TF-IDF cell further down.
query = """
"""

final_df = pd.read_sql(query, cursor.connection)
final_df.head()

# TF-IDF implementation

In [None]:
# t = term (word)
# d = document (set of words)
# N = number of documents in the corpus
# corpus = the total document set

# TF-IDF = Term Frequency (TF) * Inverse Document Frequency (IDF)
# TF grows with how often a word appears in a document; IDF shrinks for words that
# appear in many documents, so common words (e.g. "inc") contribute little.
# TF is specific to each (word, document) pair:
# tf(t, d) = count of t in d / number of words in d

# df(t) = number of documents in the corpus that contain term t
# idf(t) = log(N / df(t)) + 1
# (sklearn's TfidfVectorizer, with the default smooth_idf=True, uses
#  idf(t) = ln((1 + N) / (1 + df(t))) + 1, takes raw counts as tf, and L2-normalizes each row)

# tf-idf(t, d) = tf(t, d) * idf(t)

# In this problem there are two name lists: the eData company names and the org62 account names.
# We want to pair org62 account names with their most similar eData company names.

# Documents are the individual account/company names
# Words or terms are the words in an account/company name
# Corpus is the entire list of company names or account names

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

### Used for straight name match
# eData company names (one "document" per name) and System account names
edata_company_names_list = list(agg_df['column'].astype(str))
# Also kept under its earlier name
company_names_list = edata_company_names_list
org_acct_names_list = list(org_df['column'].astype(str))
org_acct_names = org_df['column'].astype(str)

acct_id_list = org_df['column']

# Keyed by org_df index label. The loop below looks up by row position, which agrees
# only while org_df keeps the default RangeIndex (pd.read_sql's default without index_col).
org_acct_id_dict = dict(zip(org_acct_names.index, acct_id_list))

# Generic tokens that should not drive similarity
vectorizer = TfidfVectorizer(stop_words = ['llc', 'inc', 'corp', 'main', 'dupe'])

# Rows in matrices represent the documents (Company Names)
# Columns represent unique tokens (or words)
# Vocabulary and IDF weights are learned from the System account names only; eData words
# outside that vocabulary are ignored by transform().
org_mat = vectorizer.fit_transform(org_acct_names_list)
edata_mat = vectorizer.transform(edata_company_names_list)
#print(vectorizer.get_feature_names())

# Dense (n_org_accounts x n_edata_companies) similarity matrix
cos_sim_mat = cosine_similarity(org_mat, edata_mat)

output_list = []

# For each System account, keep its most similar eData company if similarity >= 0.90
for source_index, cos_sim in enumerate(cos_sim_mat):
    cos_sim = cos_sim.flatten()
    # argmax is O(n) instead of a full sort; on ties it picks the first best column
    target_index = cos_sim.argmax()
    sim = cos_sim[target_index]
    if sim >= 0.90:
        # NOTE(review): column keys are redacted placeholders; duplicate dict keys
        # collapse to the last value, so restore the real column names.
        tmp_df = pd.DataFrame(data = {'column': [org_acct_names_list[source_index]]
                                      , 'coluumn': [edata_company_names_list[target_index]]
                                      , 'column': [sim]
                                      , 'column': [org_acct_id_dict.get(source_index)]
                                     }
                              , columns = ['COLUMN NAME'])

        output_list.append(tmp_df)

# Explain an empty result instead of pandas' generic "No objects to concatenate"
if not output_list:
    raise ValueError('No account/company pairs reached the 0.90 cosine-similarity threshold')

final_df = pd.concat(output_list)
final_df.head()

In [None]:
# (rows, columns) of the TF-IDF match result
final_df.shape

# Get Target ID for @ Mention in Chatter Post

In [None]:
# Update the target table in this query to the final list of accounts that the
# opportunities will be loaded to (SQL is a placeholder here). The result presumably
# holds the IDs to @-mention in the Chatter post, per this section's heading.

query = """        
        """

manager_df = pd.read_sql(query, cursor.connection)

manager_df.head()

In [None]:
# Check the join columns before running.
# Left-join the account data (`df` as last assigned by the account/usage query) onto
# manager_df. suffixes=(False, False) disables suffixing, so pandas raises a ValueError
# if any non-key column name appears in both frames — an implicit "no overlap" check.

oppty_chatter_df = df.merge(manager_df, how = 'left', left_on = 'column', right_on = 'column', suffixes = (False, False))
oppty_chatter_df.head()

# Create the Chatter Message Body for POST

In [None]:
# Format one Chatter message per row from its campaign name and opportunity amount.
# NOTE(review): the template is a placeholder ('') here, so every message is currently
# empty; the real template takes {camp_nm} and {amt}.
chatter_msg_list = [
    ''.format(
        camp_nm = oppty_chatter_df.loc[row_label, 'column'],
        amt = oppty_chatter_df.loc[row_label, 'column'],
    )
    for row_label in oppty_chatter_df.index
]

oppty_chatter_df['column'] = chatter_msg_list

oppty_chatter_df.head()

## After a successful load, read in the success file to get the IDs

In [None]:
# glob is not among the notebook's top-level imports, so bring it in here
import glob

# Locate the success file written by the load step (glob pattern is a placeholder)
success_files = sorted(glob.glob('directory'))

# Fail loudly rather than silently reusing a success_df left over from an earlier run
if not success_files:
    raise FileNotFoundError("No success file matched the glob pattern 'directory'")

# NOTE(review): if several files match, only the last one in sorted order is used —
# confirm whether they should be concatenated instead.
success_df = pd.read_csv(success_files[-1])

success_df.head()

In [None]:
# Check column names before running.
# Inner-join with the success file so only successfully loaded records remain (join keys
# are redacted), then drop every column not in the keep-list.

oppty_chatter_df = oppty_chatter_df.merge(success_df, how = 'inner', left_on = ['column'], right_on = ['column'], suffixes = ['','_y'])
oppty_chatter_df = oppty_chatter_df.drop(oppty_chatter_df.columns.difference(['COLUMN NAME']), axis = 1)
oppty_chatter_df.head()

# Push Chatter Message to AE

In [None]:
# Testing Purposes
#oppty_chatter_df = oppty_chatter_df.head(1)
#oppty_chatter_df['column'] = ''
#oppty_chatter_df['column'] = ''
#oppty_chatter_df['column'] = ''

#oppty_chatter_df[['COLUMN NAME']].head()

In [None]:
# POST one Chatter message per row of oppty_chatter_df and collect the rows that failed.
# NOTE(review): `native`, `headers` and `create_oppty_chatter_payload_hyperlink` are not
# defined in this notebook — confirm where they come from before running.
chatter_url = "URL".format(sf = native['instance'])

bad_record_list = []

start_time = time.time()

for idx in oppty_chatter_df.index:

    # Rebuilt per row in case the payload builder mutates it
    hyperlink_dict = {
        'hyperlink_1': ['URL', ' Monetization Strategy Team.', 1]
    }

    # Read the opportunity ID from the frame being iterated: `idx` is a label of
    # oppty_chatter_df, not of `df`, which has different rows.
    oppty_id = oppty_chatter_df.loc[idx, 'column']
    data_for_payload = {
        'mention_1': oppty_chatter_df.loc[idx, 'column']
        , 'body_1': oppty_chatter_df.loc[idx, 'column']
        , 'mention_2': oppty_chatter_df.loc[idx, 'column']
        , 'hyperlink': hyperlink_dict
    }

    chatter_json_payload = create_oppty_chatter_payload_hyperlink(oppty_id
                                                                 , data_for_payload
                                                                 , hyperlink_key = 'hyperlink'
                                                                 , inline_hyperlink = True)

    try:
        # TODO(review): consider passing timeout= so a stalled request cannot hang the loop
        chatter_response = requests.request("POST", chatter_url, data = chatter_json_payload, headers = headers)
        # Check the HTTP status first so error responses are reported as HTTP errors
        # (not as JSON decode failures), then confirm the body parses as JSON.
        chatter_response.raise_for_status()
        chatter_response.json()
        print('Chatter Message Post Successful for index: ' + str(idx))

    except Exception as msg:
        # Best-effort: record the failure and continue with the remaining posts
        print('Issue with POST request for index: ' + str(idx) + ' with error: ' + str(msg))
        bad_record_list.append(idx)

end_time = time.time()
print('Total Time Taken: ' + str(end_time - start_time) + ' seconds')

# bad_record_list holds index labels, so select by label rather than by position
bad_record_df = oppty_chatter_df.loc[bad_record_list, :]