# Key Notes

1. The goal is to find outliers in a corpus of addresses.
2. This notebook contains unsupervised approaches: clustering and rule-based models.
3. DBSCAN and KModes clustering are applied.

In [None]:
import pandas as pd
import numpy as np
import ast
import optimus
from optimus.functions import filter_row_by_data_type as fbdt
import pyspark.sql.functions as F
import dqtool
import requests
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from kmodes.kmodes import KModes
import math

In [None]:
# Load the offices table; every check below adds its result to it as a new column.
# NOTE(review): with default dtype inference, numeric-looking ZIP columns are parsed
# as numbers, which would drop leading zeros — confirm this is acceptable for the
# ZIP length checks further down.
offices_data_processed = pd.read_csv("offices_data.csv")

# Relationships

In [None]:
def find_binary_relation(df, col1, col2, thres_1_1=85):
    """Classify the cardinality relation between two columns of ``df``.

    Only distinct, fully populated ``(col1, col2)`` pairs are considered.

    :param df: DataFrame containing both columns
    :param col1: name of the first column
    :param col2: name of the second column
    :param thres_1_1: percentage of values on one side that must map to exactly
        one value on the other side for that direction to count as "to one"
    :return: dict with key ``'relation'`` set to one of

        * ``'one_to_one'`` - both directions exceed the threshold; ``'flagged'``
          lists, per column, the values mapped to more than one partner
        * ``'parent_col1_child_col2'`` - col1 values map to several col2 values
          on average while col2 values map back to a single col1 value;
          ``'flagged'`` lists the col2 values that break this
        * ``'parent_col2_child_col1'`` - the mirror case; ``'flagged'`` lists
          the offending col1 values
        * ``'none'`` - no relation detected (no ``'flagged'`` key)
    """
    data = df[[col1, col2]].drop_duplicates([col1, col2])
    data = data.dropna()
    num_cat_col1 = data[col1].nunique()
    num_cat_col2 = data[col2].nunique()

    # With no complete pairs left there is nothing to relate, and the
    # percentages below would divide by zero.
    if num_cat_col1 == 0 or num_cat_col2 == 0:
        return {'relation': 'none'}

    # per value of col1, distinct count of col2 values
    df_gb_col1 = pd.DataFrame(data.groupby(col1)[col2].nunique()).reset_index()
    # per value of col2, distinct count of col1 values
    df_gb_col2 = pd.DataFrame(data.groupby(col2)[col1].nunique()).reset_index()

    # % of categories having one to one mapping
    # 1 -> 2
    one_one_col1_perc = len(df_gb_col1[df_gb_col1[col2] == 1]) * 100 / num_cat_col1
    # 2 -> 1
    one_one_col2_perc = len(df_gb_col2[df_gb_col2[col1] == 1]) * 100 / num_cat_col2

    # list of categories having one to many relation
    # 1 -> 2
    list_flagged_entries_df1 = list(df_gb_col1[df_gb_col1[col2] != 1][col1])
    # 2 -> 1
    list_flagged_entries_df2 = list(df_gb_col2[df_gb_col2[col1] != 1][col2])

    if one_one_col1_perc > thres_1_1 and one_one_col2_perc > thres_1_1:
        return {
            'relation': 'one_to_one',
            'flagged': {col1: list_flagged_entries_df1, col2: list_flagged_entries_df2},
        }

    # The ratio guard (< 100) rejects pairs whose category counts are wildly
    # unbalanced in the parent's favour.
    if (df_gb_col1[col2].mean() > 1
            and one_one_col2_perc >= thres_1_1
            and num_cat_col1 / num_cat_col2 < 100):
        return {'relation': 'parent_col1_child_col2', 'flagged': {col2: list_flagged_entries_df2}}

    if (df_gb_col2[col1].mean() > 1
            and one_one_col1_perc >= thres_1_1
            and num_cat_col2 / num_cat_col1 < 100):
        return {'relation': 'parent_col2_child_col1', 'flagged': {col1: list_flagged_entries_df1}}

    return {'relation': 'none'}

def find_all_binary_relations(df, columns):
    '''
    Run find_binary_relation over every pair of entries in columns.

    Each column acts as col1 and is paired with every column listed before
    it (as col2). Pairs whose relation is 'none' are left out.
    :param df: table handed to find_binary_relation
    :param columns: sequence of column names to pair up
    :return: list of dicts with keys 'col1', 'col2', 'relation', 'flagged'
    '''
    rules = []
    for position, col1 in enumerate(columns):
        for col2 in columns[:position]:
            outcome = find_binary_relation(df, col1, col2)
            if outcome['relation'] == 'none':
                continue
            rules.append({
                'col1': col1,
                'col2': col2,
                'relation': outcome['relation'],
                'flagged': outcome['flagged'],
            })

    return rules

In [None]:
# Infer the pairwise relations among the zip_cd, van_cty_nm and st_cd columns.
rules = find_all_binary_relations(offices_data_processed,['zip_cd', 'van_cty_nm', 'st_cd'])

In [None]:
# For every flagged value, record which partner-column values look suspicious:
# all partners except the most frequent one, or every partner when the counts
# are tied and no single value dominates.
for rule in rules:
    flagged_col = list(rule['flagged'].keys())[0]
    # The partner is whichever column of the pair is not the flagged one.
    # (Subtracting set(flagged_col) would subtract the *characters* of the
    # column name and therefore remove nothing.)
    col = rule['col2'] if flagged_col == rule['col1'] else rule['col1']
    print(flagged_col, col)
    rule['pairs'] = dict()
    for val in rule['flagged'][flagged_col]:
        counts = offices_data_processed[offices_data_processed[flagged_col] == val][col].value_counts()
        max_times = counts.idxmax()
        if counts.max() == counts.min():
            others = list(counts.keys())
        else:
            others = list(set(list(counts.keys())) - set([max_times]))
        rule['pairs'][val] = others

In [None]:
def get_rule_results(zp, cty, st):
    """Describe which relation rules the given zip / city / state values break.

    For each entry of the module-level ``rules`` list, the three values are
    looked up (in the order zip_cd, van_cty_nm, st_cd) in that rule's flagged
    lists; every hit appends ``"<col1> - <col2>; "`` to the result.

    :return: the concatenated rule descriptions, or ``np.nan`` when no rule
        is broken
    """
    observed = (('zip_cd', zp), ('van_cty_nm', cty), ('st_cd', st))
    broken = []

    for rule in rules:
        flagged = rule['flagged']
        for column, value in observed:
            if column in flagged and value in flagged[column]:
                broken.append(rule['col1'] + " - " + rule['col2'] + "; ")

    if not broken:
        return np.nan
    return "".join(broken)

In [None]:
# Add one 0/1 column per rule: 1 when the row's flagged-column value is a known
# offender and its partner value is one of the suspicious partners recorded in
# rule['pairs'] for that value.
for rule in rules:
    flagged_col = list(rule['flagged'].keys())[0]
    # The partner is whichever column of the pair is not the flagged one.
    # (Subtracting set(flagged_col) would subtract the *characters* of the
    # column name and therefore remove nothing.)
    col = rule['col2'] if flagged_col == rule['col1'] else rule['col1']
    offices_data_processed["Relation "+ rule['col1']+"-"+rule['col2']+" Check"] = \
     offices_data_processed.apply(
        lambda x: 1 if x[flagged_col] in rule['pairs'].keys() and x[col] in rule['pairs'][x[flagged_col]] else 0, axis=1)

# Datatype Mismatch

In [None]:
# Start the Optimus engine, asking Spark for 7g of driver and 7g of executor memory.
# The executor property is spelled 'spark.executor.memory'; the previous
# 'spark.executer.memory' is not a Spark property name.
engine = optimus.Optimus(options={'spark.driver.memory' : '7g', 'spark.executor.memory' : '7g'})

In [None]:
# Describe the offices CSV as a local dqtool data source.
# NOTE(review): absolute, machine-specific path; the pandas cell reads
# "offices_data.csv" relatively — presumably the same file, confirm.
ds = dqtool.datasource.DataSourceLocal("/Users/avarshn5/Desktop/Projects/DataQ/Adr_out/ml_ai_quest_providers/offices_data.csv")

In [None]:
# Load the data source through the engine.
# Presumably yields a Spark DataFrame (pyspark functions are applied to it below) — confirm.
df = dqtool.helpers.get_df(engine, ds)

In [None]:
# Per-column data type statistics. The consumer below reads
# data_type_counts['columns'][<col>]['dtype'] and a ['details'] mapping of
# type name -> count.
data_type_counts = dqtool.pyspark.infer.count_category_dtype(engine, df, df.columns)

In [None]:
# For every column, remember its reported dtype and, for each other type that
# has a positive count, the distinct values that pass the filter for that type.
results_data_type = {}

for col, col_stats in data_type_counts['columns'].items():
    typ = col_stats['dtype']
    mismatches = {}
    results_data_type[col] = {'type': typ, 'mismatch': mismatches}

    for other_type, n_values in col_stats['details'].items():
        if other_type != typ and n_values > 0:
            # fbdt presumably marks the rows whose value parses as `other_type`
            # — confirm against the optimus helper.
            checked = (df.h_repartition(col_name=col)
                       .select(col)
                       .withColumn("check", fbdt(col, data_type=other_type)))
            rows = (checked
                    .filter(F.col("check"))
                    .select(col)
                    .distinct()
                    .collect())
            mismatches[other_type] = [row[col] for row in rows]
    

In [None]:
def get_mismatch_results(x):
    """List the mismatching data types found among the values of one row.

    Every column in the module-level ``results_data_type`` is checked: when the
    row's value for that column appears among the values recorded for a
    mismatching type, ``"<type>; "`` is appended to the result.

    :param x: row-like object indexable by column name
    :return: the concatenated type names, or ``np.nan`` when nothing matched
    """
    found = []

    for col, info in results_data_type.items():
        for type_name, bad_values in info['mismatch'].items():
            if x[col] in bad_values:
                found.append(type_name + "; ")

    joined = "".join(found)
    return np.nan if joined == "" else joined

In [None]:
# Row by row, store the mismatching type names (or NaN when there are none).
offices_data_processed["Data Type Mismatch Check"] = offices_data_processed.apply(
    get_mismatch_results, axis=1)

# DBScan

In [None]:
def identify_addr(addr):
    """Send one address to the parser service and return its labelled parts.

    The address is POSTed as ``{"query": addr}`` with a 10 second timeout.

    :param addr: address text to parse
    :return: dict mapping each component's ``label`` to its ``value``; when a
        label occurs more than once, the last value wins
    """
    endpoint = 'http://apsrp06825:5008/parser'
    payload = {'query': addr}
    response = requests.request(method='POST', url=endpoint, json=payload, timeout=10.0)
    return {part['label']: part['value'] for part in response.json()}
  

In [None]:
# Parse all addresses concurrently. Executor.map returns results in input
# order, so `i` is the position of the address within the column.
# NOTE(review): 1000 worker threads all target one parser host — confirm the
# service tolerates that level of concurrency.
addr_resp = []
with ThreadPoolExecutor(max_workers=1000) as ex:
    for i, resp in enumerate(ex.map(identify_addr, offices_data_processed.adr_ln_1_txt)):
        resp['id'] = i
        addr_resp.append(resp)

In [None]:
# Sequential variant of the address-parsing pass: one request per address.
addr_resp = []
for i, addr in enumerate(offices_data_processed.adr_ln_1_txt):
    resp = identify_addr(addr)
    # Tag each response with the position of its address. The loop must bind
    # `i` itself; otherwise every record gets whatever value `i` last held.
    resp['id'] = i
    addr_resp.append(resp)

In [None]:
# NOTE(review): this Process is only constructed — it is never started in the
# visible code, and no `args` are given although identify_addr requires an
# address. Looks like leftover experimentation; confirm before relying on it.
p = Process(target=identify_addr)

# Feature creation and Clustering

In [None]:
def get_numerical_count(lt):
    """Return how many strings in ``lt`` are purely numeric (``str.isnumeric``)."""
    return sum(1 for token in lt if token.isnumeric())

def get_feature(addr):
    """Turn one address string into a list of five numeric features.

    After stripping surrounding whitespace the features are, in order:

    1. number of characters
    2. number of single-space separated tokens
    3. number of purely numeric tokens
    4. 1 if the first token is numeric, is numeric once its last character is
       removed (e.g. ``12A``), or is ``PO`` / ``P.O.``; otherwise 0
    5. number of characters that are neither letters, digits, nor one of
       ``" - # & / . ,"``

    :param addr: address text
    :return: list ``[length, tokens, numeric_tokens, starts_flag, odd_chars]``
    """
    text = addr.strip()
    tokens = text.split(" ")
    first = tokens[0]

    starts_like_address = int(
        first.isnumeric()
        or first[:-1].isnumeric()
        or first in ["PO", "P.O."]
    )

    expected_punctuation = [" ", "-", "#", "&", "/", ".", ","]
    odd_chars = sum(
        1
        for ch in text
        if not (ch.isalpha() or ch.isnumeric() or ch in expected_punctuation)
    )

    return [
        len(text),
        len(tokens),
        get_numerical_count(tokens),
        starts_like_address,
        odd_chars,
    ]

In [None]:
# One feature vector per address line. Values are stringified first, so a
# missing value such as NaN is featurized as the literal text "nan".
adr_ln_1_txt_features = [
    get_feature(str(raw_addr)) for raw_addr in offices_data_processed['adr_ln_1_txt']
]

In [None]:
# Standardize each feature column before the distance-based clustering below.
X = StandardScaler().fit_transform(adr_ln_1_txt_features)

In [None]:
# Sweep several eps radii and keep the cluster-size distribution of each fit
# for comparison. `db` and `labels` are overwritten on every iteration, so
# after the loop they hold the eps=10 fit.
counts = []
for i in [2, 4, 6, 8, 10]:
    db = DBSCAN(eps=i, min_samples=10).fit(X)
    labels = db.labels_

    counts.append(pd.Series(labels).value_counts())

In [None]:
# Final fit with eps=10; `labels` from this fit is what the outlier flag reads.
# NOTE(review): this appends another eps=10 entry to `counts` when the sweep
# cell has already been run.
db = DBSCAN(eps=10, min_samples=10).fit(X)
labels = db.labels_

counts.append(pd.Series(labels).value_counts())

In [None]:
# Plain list of the address lines, used to print addresses by position.
out_data = offices_data_processed['adr_ln_1_txt'].values.tolist()

In [None]:
# Print every address whose DBSCAN label is not 0, i.e. anything outside
# cluster 0.
for i,l in enumerate(labels):
    if l != 0:
        print(i,l, out_data[i])

In [None]:
def is_dbscan_outlier(row, i):
    """Return 1 when DBSCAN marked the sample at position ``i`` as noise, else 0.

    DBSCAN assigns the label ``-1`` to noisy samples; non-negative labels are
    cluster ids. Reads the module-level ``labels`` array.

    :param row: the DataFrame row (accepted for use with ``apply``; not read)
    :param i: position of the sample in ``labels``
    :return: 1 for a noise point, 0 otherwise
    """
    return 1 if labels[i] == -1 else 0

In [None]:
# Flag each row using its index label (x.name) as the position in `labels`.
# Assumes the frame still has its default 0..n-1 index — TODO confirm.
offices_data_processed['DBScan_Outlier'] = offices_data_processed.apply(lambda x: is_dbscan_outlier(x, x.name), axis=1)

# KModes

In [None]:
# Rebuild the standardized feature matrix for the KModes run.
X = StandardScaler().fit_transform(adr_ln_1_txt_features)

In [None]:
# Fit KModes with 15 clusters on the standardized features and show the
# cluster sizes. Note that `db` now refers to the KModes model.
# NOTE(review): KModes targets categorical data but receives standardized
# floats here — confirm this is intended.
db = KModes(n_clusters=15).fit(X)
labels_kmode = db.labels_

pd.Series(labels_kmode).value_counts()

In [None]:
# Print the addresses assigned to KModes cluster 14 for manual inspection.
for i,l in enumerate(labels_kmode):
    if l == 14:
        print(i,l, out_data[i])

In [None]:
def is_kmode_outlier(row, i):
    """Return 1 when the sample at position ``i`` is in KModes cluster 1, else 0.

    Reads the module-level ``labels_kmode`` array.

    :param row: the DataFrame row (accepted for use with ``apply``; not read)
    :param i: position of the sample in ``labels_kmode``
    """
    # NOTE(review): the inspection loop in this notebook prints cluster 14,
    # while this flag tests cluster 1 — confirm which cluster id is meant.
    return int(labels_kmode[i] == 1)

In [None]:
# Flag each row using its index label (x.name) as the position in `labels_kmode`.
# Assumes the frame still has its default 0..n-1 index — TODO confirm.
offices_data_processed['KMODE_Outlier'] = offices_data_processed.apply(lambda x: is_kmode_outlier(x, x.name), axis=1)

# Zip Code Length 5

In [None]:
def is_zip_len_5(zp):
    """Check that a ZIP code has exactly five digits.

    The value is converted with ``int`` before counting, so ``12345.0`` counts
    as five digits and a numeric ZIP that lost a leading zero counts as four.

    :param zp: ZIP code as a number, or a missing value
    :return: 0 when the integer form has five characters, 1 when it does not,
        ``np.nan`` when ``zp`` is missing (NaN, ``None`` or ``pd.NA``)
    """
    # pd.isna also recognizes pd.NA (produced by the nullable 'Int32' dtype)
    # and None, which math.isnan rejects with a TypeError.
    if pd.isna(zp):
        return np.nan

    return 0 if len(str(int(zp))) == 5 else 1

In [None]:
# Store ZIP codes as nullable 32-bit integers, then record the five-digit
# check for each of them (0 = passes, 1 = fails, NaN = missing).
offices_data_processed['zip_cd'] = offices_data_processed['zip_cd'].astype('Int32')
offices_data_processed["Zip Length 5"] = offices_data_processed['zip_cd'].apply(is_zip_len_5)

# Zip Code Length 4

In [None]:
def is_zip_4(zp):
    """Check that a ZIP+4 add-on code has exactly four digits.

    The value is converted with ``int`` before counting, so a numeric add-on
    that lost a leading zero counts as fewer than four digits.

    :param zp: ZIP+4 add-on as a number, or a missing value
    :return: 0 when the integer form has four characters, 1 when it does not,
        ``np.nan`` when ``zp`` is missing (NaN, ``None`` or ``pd.NA``)
    """
    # pd.isna also recognizes pd.NA and None, which math.isnan rejects with a
    # TypeError.
    if pd.isna(zp):
        return np.nan

    return 0 if len(str(int(zp))) == 4 else 1

In [None]:
# Record the four-digit check for every ZIP+4 add-on (0 = passes, 1 = fails,
# NaN = missing).
offices_data_processed["ZIP + 4 Code is length 4"] = (
    offices_data_processed['zip_pls_4_cd'].apply(is_zip_4)
)

# If address line 1 contains a PO Box, adr_typ_desc should not be PLACE OF SERVICE

In [None]:
def po_adr_type_check(addr, adr_typ):
    """Flag a PO Box address that is registered as a place of service.

    An address counts as a PO Box when it contains the word ``BOX`` and one of
    the PO spellings ``PO``, ``P0`` (zero typo), ``P.O.``, ``P.O`` or ``P O``
    followed by a space. Matching ignores letter case for both parts.

    :param addr: address line text; non-string values (e.g. NaN) never match
    :param adr_typ: address type description
    :return: 1 when the address is a PO Box and ``adr_typ`` is
        ``'PLACE OF SERVICE'``, otherwise 0
    """
    # Missing addresses arrive as float NaN; the substring test would raise.
    if not isinstance(addr, str):
        return 0

    # Normalize once so the PO spelling and 'BOX' are matched the same way.
    text = addr.upper()
    po_markers = ('PO ', 'P0 ', 'P.O. ', 'P.O ', 'P O ')
    is_po_box = 'BOX' in text and any(marker in text for marker in po_markers)

    return 1 if is_po_box and adr_typ == 'PLACE OF SERVICE' else 0

In [None]:
# Row by row, flag PO Box addresses whose address type is PLACE OF SERVICE.
offices_data_processed['PO Box with Place of service check'] = offices_data_processed.apply(
    lambda row: po_adr_type_check(row['adr_ln_1_txt'], row['adr_typ_desc']),
    axis=1,
)

In [None]:
def po_adr(addr):
    """Return 1 when the address contains a PO spelling, else 0.

    Recognized spellings are ``PO``, ``P0`` (zero typo), ``P.O.``, ``P.O`` and
    ``P O``, each followed by a space. Matching ignores letter case.

    :param addr: address line text; non-string values (e.g. NaN) never match
    :return: 1 if any spelling is found, otherwise 0
    """
    # Missing addresses arrive as float NaN; the substring test would raise.
    if not isinstance(addr, str):
        return 0

    text = addr.upper()
    po_markers = ('PO ', 'P0 ', 'P.O. ', 'P.O ', 'P O ')
    return 1 if any(marker in text for marker in po_markers) else 0

In [None]:
# Mark the address lines that contain a PO spelling.
offices_data_processed['PO Box'] = offices_data_processed['adr_ln_1_txt'].apply(po_adr)

In [None]:
# Remove the 'PO Box' helper column again; the failed-check total below does
# not use it.
offices_data_processed = offices_data_processed.drop(columns=['PO Box'])

In [None]:
# Per row, add up the 0/1 results of the relation, PO Box and DBSCAN checks.
check_columns = [
    'Relation van_cty_nm-zip_cd Check',
    'Relation st_cd-zip_cd Check',
    'Relation st_cd-van_cty_nm Check',
    'PO Box with Place of service check',
    'DBScan_Outlier',
]
offices_data_processed['Number of check failed'] = offices_data_processed.apply(
    lambda row: sum(row[name] for name in check_columns), axis=1)

In [None]:
# Show only the rows that failed at least one check.
offices_data_processed[offices_data_processed['Number of check failed'] > 0]