## 1. Initialization

> a. Read the excel/csv containing the entire query-output

> b. Initialize the parameters of thresholds, directories, and fields to concat for generating individual as well as combined match-score

> c. Get the unique list of countries in present dataframe

In [1]:
import time, numpy as np, pandas as pd, recordlinkage

_STATIC_FILE_NAME="SM_Temp_Shortlist.xlsx"
_RAW_SCORES_DIRECTORY='Raw_Scores'
_CLEANED_SCORES_DIRECTORY='Cleaned_Scores'
_MASTER_DATA_DIRECTORY='Master_Data'
_FIELDS_TO_CONCAT={ 
    'CONCAT_ADDRESS':   ['ADDRESS_LINE_1','ADDRESS_LINE_2','ADDRESS_LINE_3'],
    'CONCAT_SRC':       ['SITE_NAME','STATE','CITY','CONCAT_ADDRESS','POSTAL_CODE']
                }
_THRESHOLD_FOR_INDIVIDUAL=0.85
_THRESHOLD_FOR_COMBINED=0.55
_SCALING_FACTOR=3

_TOTAL_MATCHES_THRESHOLD=2


def write_df_to_csv(df, root_dir='', curr_country='', file_suffix='_temp.csv', index_flag=False):
    try:
        abs_path=os.path.join(root_dir, curr_country+file_suffix)
        df.to_csv(abs_path, index=index_flag)
        print(f'\nSuccessfully created \{abs_path}!')
    except:
        print(f'\nSomething went wrong while writing the file. Please check if it is currently in use.')

site_master_df=pd.read_excel(_STATIC_FILE_NAME, index_col=0)
#site_master_df.replace(np.nan, '', inplace=True)
for c in site_master_df.columns.values:
    site_master_df[c]=site_master_df[c].astype(str)


print('\nColumns: ', site_master_df.columns.values,'\n')
countries=site_master_df['COUNTRY'].unique()
print('\nUnique Countries: ',countries)


Columns:  ['SOURCE_IDENTIFIER' 'DATA_SOURCE_NAME' 'PROTOCOL_NUMBER' 'SITE_NUM'
 'UNIQUE_SITE_ID' 'COUNTRY' 'SITE_NAME' 'STATE' 'CITY' 'ADDRESS_LINE_1'
 'ADDRESS_LINE_2' 'ADDRESS_LINE_3' 'POSTAL_CODE' 'SITE_STATUS'] 


Unique Countries:  ['Italy']


## 2. Process data in batches: generating all the candidate-pairs with this library consumes very high RAM
> a. Partition the entire input dataset based on country-name. For-loop to do the entire process for all countries.

> b. Create a concatenated (address) field and drop the individual 3 address fields

> c. Create a concatenated (name, address) field

> c. Initialize the candidate-pairs (indexes) for comparison

In [2]:
# todo: Research on multithreading to speed up country-wise batches. RAM might crash for incoming batch-size>1800.

i=0
curr_country=countries[i]
country_df=site_master_df[site_master_df['COUNTRY']==curr_country]

for colname, cols_to_concat in _FIELDS_TO_CONCAT.items():
    country_df[colname]=country_df[cols_to_concat].apply(lambda single_row: ' '.join(single_row.values), axis=1)

country_df.drop(labels=_FIELDS_TO_CONCAT['CONCAT_ADDRESS'], axis=1, inplace=True)

# todo: Imputation of address values where Site-name is exactly the same, otherwise it'll result in 2 separate master-records

print(f'{curr_country} has {country_df.shape[0]} records')
indexer=recordlinkage.Index()
indexer.block(left_on='COUNTRY')
candidates=indexer.index(country_df)
print('Number of pairs for match consideration=',len(candidates))

# todo: Read source code to figure out how is number of pairs reduced? Possibly uses only unique combinations rather than permutations

Italy has 1371 records
Number of pairs for match consideration= 939135


## 3. Initialize Comparer() object with set of fields to compare amongst the candidate pairs

### Currently configured such that: 

> if individual fields' match-score > 85% , then set the *_col_*_COMPARISON_SCORE column to 1, else 0

> if combined fields' match-score > 55% , then set the CONCAT_SRC_COMPARISON_SCORE column to 1, else 0

### Comparer algorithm can be: 'jaro', 'jarowinkler', 'levenshtein', 'damerau_levenshtein', 'qgram', 'cosine', 'smith_waterman', 'lcs'

### You can also set n_job=-1 to use up all cores available for parallel-computation of scores for the candidate-pairs.

In [3]:
# todo: check if non-latin scripts can be handled. It seems to be able to handle UTF-8 encoding data
def add_field_to_compare(comparer_obj, field_name, threshold, method='levenshtein'):
    comparer_obj.string( field_name, field_name, method=method, threshold=threshold, label=field_name+'_COMPARISON_SCORE' )

comparer=recordlinkage.Compare(n_jobs=-1)
add_field_to_compare(comparer, 'CONCAT_SRC', _THRESHOLD_FOR_COMBINED)

for column in _FIELDS_TO_CONCAT['CONCAT_SRC']:
    add_field_to_compare(comparer, column, _THRESHOLD_FOR_INDIVIDUAL)

print('Comparer created with individual-fields\' threshold=',_THRESHOLD_FOR_INDIVIDUAL, ' and combined-field threshold=',_THRESHOLD_FOR_COMBINED)

print('\n\n Starting computation for match-scores... \n')
start=time.time()
score_features=comparer.compute(candidates, country_df)
print(time.time()-start,' seconds needed for ',country_df.shape[0],' records.')
print('\n\nScore_features generated: ', score_features.shape)

Comparer created with individual-fields' threshold= 0.85  and combined-field threshold= 0.55


 Starting computation for match-scores... 

3780.1232018470764  seconds needed for  1371  records.


Score_features generated:  (939135, 6)


## 4. Get the set of potential duplicates where TOTAL_SCORE > THRESHOLD

In [4]:
_COLS_FOR_TOTAL_MATCH_CALC=[colname+'_COMPARISON_SCORE' for colname in _FIELDS_TO_CONCAT['CONCAT_SRC']]
_COLS_FOR_TOTAL_MATCH_CALC.append('CONCAT_SRC'+'_COMPARISON_SCORE')

def scale_up_comparison_score(df, colname='SITE_NAME_COMPARISON_SCORE', scaling_factor=_SCALING_FACTOR):
    print(f'\nScaling up {colname} by {scaling_factor}')
    df[colname]=df[colname].apply(lambda x: x*scaling_factor)

scale_up_comparison_score(score_features,'SITE_NAME_COMPARISON_SCORE', _SCALING_FACTOR)
scale_up_comparison_score(score_features,'CONCAT_ADDRESS_COMPARISON_SCORE', _SCALING_FACTOR)

write_df_to_csv(df=score_features, root_dir=_RAW_SCORES_DIRECTORY, curr_country=curr_country, file_suffix='_Raw_Scores.csv', index_flag=True)

duplicates=score_features[score_features.sum(axis=1) > _TOTAL_MATCHES_THRESHOLD].reset_index()
duplicates['NUM_OF_MATCHES_FOUND']=duplicates[_COLS_FOR_TOTAL_MATCH_CALC].sum(axis=1)

duplicates.head(30)


Scaling up SITE_NAME_COMPARISON_SCORE by 3

Scaling up CONCAT_ADDRESS_COMPARISON_SCORE by 3

Successfully created \Raw_Scores\Italy_Raw_Scores.csv!


Unnamed: 0,SR_NUM_1,SR_NUM_2,CONCAT_SRC_COMPARISON_SCORE,SITE_NAME_COMPARISON_SCORE,STATE_COMPARISON_SCORE,CITY_COMPARISON_SCORE,CONCAT_ADDRESS_COMPARISON_SCORE,POSTAL_CODE_COMPARISON_SCORE,NUM_OF_MATCHES_FOUND
0,6,1,1.0,0.0,0.0,1.0,3.0,1.0,6.0
1,7,3,1.0,0.0,0.0,1.0,0.0,1.0,3.0
2,8,3,1.0,0.0,0.0,1.0,0.0,1.0,3.0
3,8,7,1.0,0.0,0.0,1.0,0.0,1.0,3.0
4,10,4,1.0,3.0,1.0,1.0,3.0,1.0,10.0
5,16,15,1.0,0.0,0.0,1.0,3.0,1.0,6.0
6,17,15,1.0,0.0,0.0,1.0,3.0,1.0,6.0
7,17,16,1.0,0.0,0.0,1.0,3.0,1.0,6.0
8,20,19,1.0,0.0,0.0,0.0,3.0,1.0,5.0
9,22,1,1.0,3.0,0.0,1.0,3.0,1.0,9.0


## 5. Choose the best match for incoming child records based on highest total-score

In [5]:
def return_top_match(df, child_column, score_key_column):
    normalized_duplicates=df.sort_values(by=[child_column]).sort_values(by=[score_key_column],ascending=False)
    normalized_duplicates=normalized_duplicates.groupby(child_column).head(1).sort_values(by=[child_column])
    return normalized_duplicates

normalized_duplicates=return_top_match(df=duplicates, child_column='SR_NUM_1', score_key_column='NUM_OF_MATCHES_FOUND')
normalized_duplicates.head(30)

Unnamed: 0,SR_NUM_1,SR_NUM_2,CONCAT_SRC_COMPARISON_SCORE,SITE_NAME_COMPARISON_SCORE,STATE_COMPARISON_SCORE,CITY_COMPARISON_SCORE,CONCAT_ADDRESS_COMPARISON_SCORE,POSTAL_CODE_COMPARISON_SCORE,NUM_OF_MATCHES_FOUND
0,6,1,1.0,0.0,0.0,1.0,3.0,1.0,6.0
1,7,3,1.0,0.0,0.0,1.0,0.0,1.0,3.0
2,8,3,1.0,0.0,0.0,1.0,0.0,1.0,3.0
4,10,4,1.0,3.0,1.0,1.0,3.0,1.0,10.0
5,16,15,1.0,0.0,0.0,1.0,3.0,1.0,6.0
7,17,16,1.0,0.0,0.0,1.0,3.0,1.0,6.0
8,20,19,1.0,0.0,0.0,0.0,3.0,1.0,5.0
9,22,1,1.0,3.0,0.0,1.0,3.0,1.0,9.0
11,26,25,1.0,0.0,1.0,1.0,0.0,1.0,4.0
12,35,9,1.0,3.0,0.0,1.0,0.0,1.0,6.0


## 5. Reusable function to replace the cyclic matches
### For example:

>   Record45 matches with Record44

>   Record67 matches with Record45

### In this case we should maintain:
>   Record67 matches with Record44

In [6]:
def replace_cyclic_dependencies(df, child_indicator, master_indicator):
    arr=set(df[child_indicator].array)
    for val in df[master_indicator]:
        if val in arr:
            replace_val=df[df[child_indicator]==val][master_indicator].values[0]
            print(val,' found in normalized_duplicates[',child_indicator,']. Replacement: ', replace_val)
            df[master_indicator].replace(val, replace_val, inplace=True)
    return df

normalized_duplicates=replace_cyclic_dependencies(df=normalized_duplicates, child_indicator='SR_NUM_1', master_indicator='SR_NUM_2')
normalized_duplicates

lized_duplicates[ SR_NUM_1 ]. Replacement:  76
415  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  327
461  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  18
640  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  72
637  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  327
204  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  36
661  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  327
675  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  18
462  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  9
465  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  9
664  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  340
648  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  422
665  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  76
698  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  36
683  found in normalized_duplicates[ SR_NUM_1 ]. Replacement:  327
673  found in normalized_

Unnamed: 0,SR_NUM_1,SR_NUM_2,CONCAT_SRC_COMPARISON_SCORE,SITE_NAME_COMPARISON_SCORE,STATE_COMPARISON_SCORE,CITY_COMPARISON_SCORE,CONCAT_ADDRESS_COMPARISON_SCORE,POSTAL_CODE_COMPARISON_SCORE,NUM_OF_MATCHES_FOUND
0,6,1,1.0,0.0,0.0,1.0,3.0,1.0,6.0
1,7,3,1.0,0.0,0.0,1.0,0.0,1.0,3.0
2,8,3,1.0,0.0,0.0,1.0,0.0,1.0,3.0
4,10,4,1.0,3.0,1.0,1.0,3.0,1.0,10.0
5,16,15,1.0,0.0,0.0,1.0,3.0,1.0,6.0
...,...,...,...,...,...,...,...,...,...
22967,1367,76,1.0,3.0,0.0,0.0,3.0,0.0,7.0
23041,1368,76,1.0,0.0,0.0,0.0,3.0,0.0,4.0
23235,1369,76,0.0,0.0,0.0,0.0,3.0,0.0,3.0
23477,1370,76,1.0,0.0,0.0,0.0,3.0,0.0,4.0


## 6. CSV for static-analysis of matches

In [7]:
write_df_to_csv(df=normalized_duplicates, root_dir=_CLEANED_SCORES_DIRECTORY, curr_country=curr_country, file_suffix='_Cleaned_Feature_Scores.csv')
print('\n"SR_NUM_2" will be the master record')


Successfully created \Cleaned_Scores\Italy_Cleaned_Feature_Scores.csv!

"SR_NUM_2" will be the master record


## 7. Get the unique set of Master-Records and create a master CSV file for each country

> a. Think of 'SR_NUM_1' as the list of incoming Primary-keys, and 'SR_NUM_2' as the value to which it should be mapped based on match-score

> b. Hence, union of 'SR_NUM_1' & 'SR_NUM_2' will be entire set of duplicates

> c. Stand-alone records in the current country_batch_dataframe will not fall in this entire set of duplicates

> d. Master-records set wil be the sets of 'SR_NUM_2' & #c above.

In [8]:
a1=set(normalized_duplicates['SR_NUM_1'].values.tolist())
a2=set(normalized_duplicates['SR_NUM_2'].values.tolist())
country_set=set(country_df.index.values.tolist())
entire_duplicates_set=a1.union(a2)
no_match_set=country_set.difference(entire_duplicates_set)
master_record_ids=no_match_set.union(a2)

print(f'{site_master_df.shape[0]} records get merged into {len(master_record_ids)}')
country_master_df=country_df.loc[master_record_ids].drop('CONCAT_SRC',axis=1)
write_df_to_csv(df=country_master_df, root_dir=_MASTER_DATA_DIRECTORY, curr_country=curr_country, index_flag=True, file_suffix='_Master.csv')

1371 records get merged into 192

Successfully created \Master_Data\Italy_Master.csv!


## 8. Get the normalized-duplicates into a CSV to show translation of incoming record into single golden record

> a. Create a master_cross_reference_df for the master_record_ids with relevant scores scaled up by _SCALING_FACTOR, and other comparison scores set to 1

> b. concat it with the normalized_duplicates dataframe

In [9]:
master_record_score_array=[1.0]*len(master_record_ids)
master_record_df_dict={'SR_NUM_1': list(master_record_ids), 'SR_NUM_2': list(master_record_ids), 'SITE_NAME_COMPARISON_SCORE': master_record_score_array, 'STATE_COMPARISON_SCORE': master_record_score_array, 'CITY_COMPARISON_SCORE': master_record_score_array, 'CONCAT_ADDRESS_COMPARISON_SCORE': master_record_score_array, 'POSTAL_CODE_COMPARISON_SCORE': master_record_score_array, 'CONCAT_SRC_COMPARISON_SCORE': master_record_score_array}

cross_ref_df=pd.DataFrame(master_record_df_dict)

scale_up_comparison_score(cross_ref_df,'SITE_NAME_COMPARISON_SCORE',_SCALING_FACTOR)
scale_up_comparison_score(cross_ref_df,'CONCAT_ADDRESS_COMPARISON_SCORE',_SCALING_FACTOR)
cross_ref_df['NUM_OF_MATCHES_FOUND']=cross_ref_df[_COLS_FOR_TOTAL_MATCH_CALC].sum(axis=1)


cross_ref_df=cross_ref_df.append(normalized_duplicates)
cross_ref_df.sort_values(by=['SR_NUM_1'], axis=0, inplace=True)

write_df_to_csv(df=cross_ref_df, root_dir=_MASTER_DATA_DIRECTORY, curr_country=curr_country, file_suffix='_Raw_Cross_Ref.csv')


Scaling up SITE_NAME_COMPARISON_SCORE by 3

Scaling up CONCAT_ADDRESS_COMPARISON_SCORE by 3

Successfully created \Master_Data\Italy_Raw_Cross_Ref.csv!


## 9. Generate the report to display name & address fields of match-and-merge combinations

> a. Merge the master_cross_reference_df with the country_batch_dataframe as a left-outer-join on Primary-key='SR_NUM_1'

> b. Merge this master_cross_reference_df with the country_batch_dataframe as a left-outer-join on Primary-key='SR_NUM_2'

In [10]:
country_df_copy=country_df[_FIELDS_TO_CONCAT['CONCAT_SRC']].copy(deep=True).reset_index()
country_df_colnames=country_df_copy.columns.values

country_df_copy.columns=[colname+'_1' for colname in country_df_colnames]
cross_ref_df=cross_ref_df.merge(country_df_copy, how='left', on='SR_NUM_1')

country_df_copy.columns=[colname+'_2' for colname in country_df_colnames]
cross_ref_df=cross_ref_df.merge(country_df_copy, how='left', on='SR_NUM_2')
cross_ref_df=cross_ref_df[['SR_NUM_1', 'SR_NUM_2', 'SITE_NAME_1','SITE_NAME_2','SITE_NAME_COMPARISON_SCORE','STATE_1','STATE_2','STATE_COMPARISON_SCORE', 'CITY_1','CITY_2','CITY_COMPARISON_SCORE','CONCAT_ADDRESS_1','CONCAT_ADDRESS_2','CONCAT_ADDRESS_COMPARISON_SCORE', 'POSTAL_CODE_1','POSTAL_CODE_2','POSTAL_CODE_COMPARISON_SCORE','NUM_OF_MATCHES_FOUND']]

write_df_to_csv(df=cross_ref_df, root_dir=_MASTER_DATA_DIRECTORY, curr_country=curr_country, file_suffix='_Cross_Ref_Full_Report.csv')


Successfully created \Master_Data\Italy_Cross_Ref_Full_Report.csv!
