## 1. Initialization

> a. Read the Excel/CSV file containing the full query output.

> b. Initialize the thresholds, directories, and the fields to concatenate when generating the individual and combined match scores.

> c. Get the unique list of countries in the current dataframe.

In [3]:
import time, numpy as np, pandas as pd, re, string, subprocess, os
from subprocess import Popen, PIPE

_STATIC_FILE_NAME="SM_Temp_Shortlist.xlsx"
# Alternative source (disabled): read the records straight from SQL Server instead of the Excel extract.
# Column aliases must match the names used below (_COLUMNS_TO_CLEAN, _THRESHOLDS_DICT), e.g. "SITE_STATE STATE".
# _STATIC_QUERY="SELECT SITE_INFO_ID SR_NUM, SOURCE_IDENTIFIER, DATA_SOURCE_NAME, PROTOCOL_NUMBER, SITE_NUMBER SITE_NUM, UNIQUE_SITE_ID, COUNTRY,SITE_NAME, SITE_STATE STATE, SITE_CITY CITY, SITE_ADDRESS_LINE_1 ADDRESS_LINE_1, SITE_ADDRESS_LINE_2 ADDRESS_LINE_2, SITE_ADDRESS_LINE_3 ADDRESS_LINE_3, SITE_ZIP_CODE POSTAL_CODE, NULL SITE_STATUS FROM RnD_MART.DM_SITE_INFORMATION WHERE (SITE_STATE+SITE_CITY+SITE_ADDRESS_LINE_1+SITE_ZIP_CODE) IS NOT NULL ORDER BY SITE_ZIP_CODE, SITE_NAME, SITE_STATE, SITE_CITY, (SITE_STATE+SITE_CITY+SITE_ADDRESS_LINE_1+SITE_ZIP_CODE);"
# Credentials are read from the environment - never commit a UID/PWD into this file
# (any password that was ever committed here should be treated as leaked and rotated).
# _SQL_SERVER_CONN=pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};'+'SERVER=takdev-rnd.rda.onetakeda.com;'+'DATABASE=RnD_DB;'+'UID='+os.environ['RND_DB_UID']+';'+'PWD='+os.environ['RND_DB_PWD']+';', charset='utf8')


# Working directories (relative to the current working directory).
_RAW_SCORES_DIRECTORY='Raw_Scores'
_CLEANED_SCORES_DIRECTORY='Cleaned_Scores'
_MASTER_DATA_DIRECTORY='Master_Data'
_STAGING_AREA_DIRECTORY=os.path.join(_MASTER_DATA_DIRECTORY, 'Recursive_Staging_Area')
# New column -> source columns joined (no separator) into it; the source columns are then dropped.
_FIELDS_TO_CONCAT={ 'CONCAT_ADDRESS':   ['ADDRESS_LINE_1','ADDRESS_LINE_2','ADDRESS_LINE_3'] }

# Columns stripped of punctuation and lower-cased before scoring.
_COLUMNS_TO_CLEAN=['ADDRESS_LINE_1','ADDRESS_LINE_2','ADDRESS_LINE_3','SITE_NAME','STATE','CITY','POSTAL_CODE']
# Native library loaded by the R script ("levenshtein" + extension); use ".so" on Linux, ".dll" on Windows.
_BINARIES_NAME="levenshtein"
#_BINARIES_EXTENSION=".so"
_BINARIES_EXTENSION=".dll"
# Maximum number of rows per mini-batch handed to the Rscript.
_MAXSIZE=5000

# Similarity thresholds passed to the R script: per individual field, and for the combined address.
_THRESHOLD_FOR_INDIVIDUAL=0.85
_THRESHOLD_FOR_ADDRESS_COMBINED=0.75

_THRESHOLDS_DICT={
    'CONCAT_ADDRESS': _THRESHOLD_FOR_ADDRESS_COMBINED,
    'SITE_NAME': _THRESHOLD_FOR_INDIVIDUAL,
    'STATE': _THRESHOLD_FOR_INDIVIDUAL,
    'CITY': _THRESHOLD_FOR_INDIVIDUAL,
    'POSTAL_CODE': _THRESHOLD_FOR_INDIVIDUAL
    }
# Per-field score columns that are summed into NUM_OF_MATCHES_FOUND.
_COLS_FOR_TOTAL_MATCH_CALC=[colname+'_COMPARISON_SCORE' for colname in _THRESHOLDS_DICT]

# Score multiplier passed to the R script (also applied to the address score of master rows).
_SCALING_FACTOR=3

# Total-score threshold passed to the R script for writing a pair into <country>_Score_Features.csv.
_TOTAL_MATCHES_THRESHOLD=4

# Rscript executable (32-bit R 3.4.4 on Windows); use plain "Rscript" when it is on PATH.
_RSCRIPT_CMD="C:/Program Files/R/R-3.4.4/bin/i386/Rscript"
#_RSCRIPT_CMD="Rscript"
_SCRIPT_NAME="Site_Master_Record_Linkage.R"
# Mode argument for the R script: deduplicate one file, or link two master files.
_DEDUP_METHOD='Dedup'
_LINKAGE_METHOD='Linkage'

def write_df_to_csv(df, root_dir='', curr_country='', file_suffix='_temp.csv', index_flag=False):
    """
        DOCSTRING:  Writes the dataframe to <root_dir>/<curr_country><file_suffix>.
                    Best-effort: a failed write is reported on stdout (together with the underlying error) instead of raised.
        INPUT:      Dataframe, Target-Directory, Country-name, Suffix-of-csv-file, Index-Flag
        OUTPUT:     Path of the written csv, or None if writing failed.
    """
    abs_path=os.path.join(root_dir, curr_country+file_suffix)
    try:
        df.to_csv(abs_path, index=index_flag)
    except Exception as err:
        # `except Exception` rather than a bare except so Ctrl-C / SystemExit still propagate.
        # The error is printed because "file in use" is only one of several possible causes
        # (missing directory, permissions, ...).
        print('\nSomething went wrong while writing the file. Please check if it is currently in use.')
        print('Error: {}'.format(err))
        return None
    # '\\' keeps the historical leading backslash in the message without an invalid escape sequence.
    print('\nSuccessfully created \\{}!'.format(abs_path))
    return abs_path


def preprocess_dataframe(df):
    """
        DOCSTRING:  Cleans the dataframe in place:
                        - blank (NaN) cells become '',
                        - every cell is cast to str and stripped of surrounding whitespace,
                        - in COUNTRY, the remaining inner spaces are replaced by underscores (e.g. 'United States' -> 'United_States').
        INPUT:      Dataframe
        OUTPUT:     None; df is modified in place.
    """
    df.replace(np.nan, '', inplace=True)
    for colname in df.columns.values:
        # Strip BEFORE replacing spaces in COUNTRY: otherwise leading/trailing blanks would be
        # turned into underscores that strip() can no longer remove. Casting first also avoids
        # calling str.replace on non-string cells.
        df[colname]=df[colname].astype(str).str.strip()
        if colname=='COUNTRY':
            df[colname]=df[colname].str.replace(' ', '_', regex=False)


def clean_dataframe(df, columns_to_clean=_COLUMNS_TO_CLEAN, fields_to_concat=_FIELDS_TO_CONCAT, replace_punctuations=True):
    """
        DOCSTRING:  Returns a cleaned deep-copy of df (df itself is not modified).
                    If replace_punctuations is True, removes every string.punctuation character from the columns in
                    columns_to_clean and lower-cases them.
                    For every entry of fields_to_concat, builds the new column by joining (without separator) the listed
                    source columns, then drops all source columns that are not themselves concat targets.
        INPUT:      Dataframe, columns-to-clean, {new-column: [source-columns]}, flag-to-replace-punctuations
        OUTPUT:     Cleaned dataframe with the concatenated columns.
    """
    copy_df=df.copy(deep=True)
    if replace_punctuations:
        # NOTE(review): an older comment mentioned an extra character added for an Italy CSV that R failed to read;
        # no extra character is added here - confirm whether one is still needed.
        special_chars=re.escape(string.punctuation)
        print('\nSpecial Character that will be replaced are:  {}'.format(special_chars))
        punctuation_pattern=r'['+special_chars+']'
        for colname in copy_df.columns.values:
            if colname in columns_to_clean:
                copy_df[colname]=copy_df[colname].replace(punctuation_pattern, '', regex=True).str.lower()

    source_columns=[]
    for colname, cols_to_concat in fields_to_concat.items():
        # A list comprehension over row tuples also works on an empty frame (row-wise apply would not).
        copy_df[colname]=[''.join(values) for values in copy_df[cols_to_concat].itertuples(index=False, name=None)]
        source_columns.extend(col for col in cols_to_concat if col not in source_columns)
    # Drop the sources of EVERY concat group (previously only the 'CONCAT_ADDRESS' group, which raised
    # KeyError when that key was absent).
    copy_df.drop(columns=[col for col in source_columns if col not in fields_to_concat], inplace=True)
    return copy_df



def deduplicate_dataset_R(rscript_command, script_name, args):
    """
        DOCSTRING:  Invokes the R-code from Python through a subprocess pipe (e.g. the 32-bit Rscript 3.4.4 set in _RSCRIPT_CMD).
                    `args` is passed as ONE command-line argument; the R script presumably splits it itself
                    (its echo prints the whole string as a single element).
                    Always prints the decoded R stdout; on a non-zero exit code also prints the decoded stderr.
        INPUT:      Path-of-Rscript-command, Script-to-invoke, Space-separated-args-string
        OUTPUT:     Return code of the Rscript process (0 on success). The Rscript itself writes the score_features csv.
    """
    cmd = [rscript_command, script_name, args]
    # The context manager closes the pipes and reaps the process.
    with Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) as pipe:
        output, error = pipe.communicate()

    print('R OUTPUT:\n')
    # Decode on the success path too (it used to print the raw bytes repr b'...'); errors='replace'
    # keeps a non-UTF-8 console encoding from crashing the notebook.
    print(output.decode(errors='replace'))
    if pipe.returncode != 0:
        print('R ERROR:\n')
        print(error.decode(errors='replace'))
    return pipe.returncode



def scale_up_comparison_score(df, colname='SITE_NAME_COMPARISON_SCORE', scaling_factor=_SCALING_FACTOR):
    """
        DOCSTRING:  Multiplies every value of one score column by a constant factor, in place.
        INPUT:      Dataframe, name-of-score-column, scaling-factor
        OUTPUT:     None; df[colname] is overwritten with the scaled values.
    """
    def _scale(value):
        return value * scaling_factor

    df[colname] = df[colname].map(_scale)



def return_top_match(df, child_column, score_key_column):
    """
        DOCSTRING:  Keeps, for every value of the child-column (e.g. SR_NUM_1 matched against several SR_NUM_2),
                    only its best-scoring row: rows are ranked by child asc / score desc, the first row of each
                    child-group survives, and the survivors are returned ordered by the child-column.
        INPUT:      Dataframe-of-score-features-above-a-total-threshold, index-column (SR_NUM_1), total-score-column (NUM_OF_MATCHES_FOUND)
        OUTPUT:     Dataframe of normalized-score-features (one row per child, original index labels kept).
    """
    ranking_keys = [child_column, score_key_column]
    ranked = df.sort_values(by=ranking_keys, ascending=[True, False])
    first_per_child = ranked.groupby(child_column).head(1)
    return first_per_child.sort_values(by=[child_column])



def replace_cyclic_dependencies_backup(df, child_indicator, master_indicator, verbose=True):
    """
        DOCSTRING:  Legacy variant of replace_cyclic_dependencies (not called anywhere in the visible notebook code).
                    Input Dataframe has cases like-     Record45 matches with Record44, and Record67 matches with Record45.
                    In this case we should maintain-    Record67 matches with Record44.
                    Applies a for-loop and replaces values in master-column whenever such a cyclic-occurence observed.
                    Unlike replace_cyclic_dependencies it never deletes pairs and does not look at match-scores.
        INPUT:      Dataframe-of-score-features-with-cyclic-indexes, child-column, master-column, verbose-flag
        OUTPUT:     The same Dataframe object, with master-column values re-pointed.
    """
    # Snapshot of every record-id that appears as a child.
    arr=set(df[child_indicator].array)
    # NOTE(review): iterates the master-column while the loop body replaces values in that same column;
    # whether later iterations observe the replaced values depends on pandas' view/copy behaviour.
    for val in df[master_indicator]:
        if val in arr:
            # Master of `val` itself (first match if `val` occurs more than once as a child).
            replace_val=df[df[child_indicator]==val][master_indicator].values[0]
            if verbose: print('{} found in normalized_duplicates[{}]. Replacement: {}'.format(val, child_indicator, replace_val))
            # NOTE(review): chained in-place replace; under pandas Copy-on-Write this does NOT modify df.
            df[master_indicator].replace(val, replace_val, inplace=True)
    return df



def replace_cyclic_dependencies(df, country_df, child_indicator, master_indicator, verbose=True):
    """
        DOCSTRING:  Input Dataframe has cases like-     Record45 matches with Record44, and Record67 matches with Record45.
                    In this case we should maintain-    Record67 matches with Record44.
                    For every record-id that appears both in the master-column and in the child-column:
                        - if the best pair pointing at it as a master scores higher than its own child-pair AND the two
                          site-names differ, its own child-pair is deleted;
                        - otherwise every occurrence of it in the master-column is re-pointed to its own master.
                    df is sorted in place by (master-column asc, NUM_OF_MATCHES_FOUND desc) and deleted rows are dropped in place.
        INPUT:      Dataframe-of-score-features-with-cyclic-indexes (with NUM_OF_MATCHES_FOUND),
                    Dataframe-for-country-with-original-info (indexed by record-id, with SITE_NAME),
                    child-column, master-column, verbose-flag
        OUTPUT:     Dataframe of normalized-score-features, sorted by the child-column.
    """
    score_column='NUM_OF_MATCHES_FOUND'
    # Highest score first within each master, so `.values[0]` below picks the best pair.
    df.sort_values(by=[master_indicator, score_column], ascending=[True, False], inplace=True)
    children=set(df[child_indicator].array)
    masters=set(df[master_indicator].array)
    # Rows to drop at the end (used instead of overwriting the COUNTRY column with a sentinel value).
    delete_mask=pd.Series(False, index=df.index)

    for val in masters:
        if val not in children:
            continue
        as_child=df[child_indicator]==val
        replace_val=df.loc[as_child, master_indicator].values[0]
        score_for_original=df.loc[df[master_indicator]==val, score_column].values[0]
        score_for_replacement=df.loc[as_child, score_column].values[0]
        original_sitename=country_df.loc[val, 'SITE_NAME']
        replacement_sitename=country_df.loc[replace_val, 'SITE_NAME']
        if score_for_original>score_for_replacement and original_sitename!=replacement_sitename:
            delete_mask=delete_mask | as_child
            if verbose: print("Delete from df where SR_NUM_1={} and SR_NUM_2={}".format(val,replace_val))
        else:
            # Plain assignment instead of a chained in-place replace, which pandas Copy-on-Write ignores.
            df[master_indicator]=df[master_indicator].replace(val, replace_val)
            if verbose: print("{} [{}] will be replaced with {} [{}]".format(val,score_for_original,replace_val,score_for_replacement))

    indexes_to_delete=df.index[delete_mask.to_numpy()]
    print("\n\n{} raw-score pairs will be deleted off as their cyclic dependecies have lower score than existing.".format(len(indexes_to_delete)))
    df.drop(indexes_to_delete, inplace=True)
    # The result of this sort used to be discarded, so the returned frame was never ordered by child.
    return df.sort_values(by=child_indicator)





def replace_cyclic_dependencies_bkp(df, child_indicator, master_indicator, verbose=True):
    """
        DOCSTRING:  Backup variant of replace_cyclic_dependencies that ignores site-names.
                    Input Dataframe has cases like-     Record45 matches with Record44, and Record67 matches with Record45.
                    In this case we should maintain-    Record67 matches with Record44.
                    For every record-id that appears both in the master-column and in the child-column:
                        - if the best pair pointing at it as a master scores higher than its own child-pair, its own
                          child-pair is deleted;
                        - otherwise every occurrence of it in the master-column is re-pointed to its own master.
                    df is sorted in place by (master-column asc, NUM_OF_MATCHES_FOUND desc) and deleted rows are dropped in place.
        INPUT:      Dataframe-of-score-features-with-cyclic-indexes (with NUM_OF_MATCHES_FOUND), child-column, master-column, verbose-flag
        OUTPUT:     Dataframe of normalized-score-features, sorted by the child-column.
    """
    score_column='NUM_OF_MATCHES_FOUND'
    # Highest score first within each master, so `.values[0]` below picks the best pair.
    df.sort_values(by=[master_indicator, score_column], ascending=[True, False], inplace=True)
    children=set(df[child_indicator].array)
    masters=set(df[master_indicator].array)
    # Rows to drop at the end (used instead of overwriting the COUNTRY column with a sentinel value).
    delete_mask=pd.Series(False, index=df.index)

    for val in masters:
        if val not in children:
            continue
        as_child=df[child_indicator]==val
        replace_val=df.loc[as_child, master_indicator].values[0]
        score_for_original=df.loc[df[master_indicator]==val, score_column].values[0]
        score_for_replacement=df.loc[as_child, score_column].values[0]
        if score_for_original>score_for_replacement:
            delete_mask=delete_mask | as_child
            if verbose: print("Delete from df where SR_NUM_1={} and SR_NUM_2={}".format(val,replace_val))
        else:
            # Plain assignment instead of a chained in-place replace, which pandas Copy-on-Write ignores.
            df[master_indicator]=df[master_indicator].replace(val, replace_val)
            if verbose: print("{} [{}] will be replaced with {} [{}]".format(val,score_for_original,replace_val,score_for_replacement))

    indexes_to_delete=df.index[delete_mask.to_numpy()]
    print("\n\n{} raw-score pairs will be deleted off as their cyclic dependecies have lower score than existing.".format(len(indexes_to_delete)))
    df.drop(indexes_to_delete, inplace=True)
    # The result of this sort used to be discarded, so the returned frame was never ordered by child.
    return df.sort_values(by=child_indicator)


def clean_score_features(curr_country, country_df, source_dir=_RAW_SCORES_DIRECTORY, target_dir=_CLEANED_SCORES_DIRECTORY, verbose=True):
    """
        DOCSTRING:  Loads <source_dir>/<curr_country>_Score_Features.csv (written by the Rscript), keeps the best match per
                    SR_NUM_1 (return_top_match), resolves chained matches (replace_cyclic_dependencies) and writes the result
                    to <target_dir>/<curr_country>_Cleaned_Feature_Scores.csv.
                    A lone (0, 0) row in the raw file means "no potential duplicates": an empty frame is returned and nothing is written.
        INPUT:      country-name, Dataframe-for-country-with-original-info, source-directory, target-directory, verbose-flag
        OUTPUT:     Dataframe of cleaned-normalized-score-features ("SR_NUM_2" is the master record).
    """
    raw_scores_path = os.path.join(source_dir, curr_country + '_Score_Features.csv')
    raw_scores = pd.read_csv(raw_scores_path)

    only_sentinel_row = (
        raw_scores.shape[0] == 1
        and raw_scores['SR_NUM_1'][0] == 0
        and raw_scores['SR_NUM_2'][0] == 0
    )
    if only_sentinel_row:
        return raw_scores.head(0)

    raw_scores['COUNTRY'] = curr_country
    best_matches = return_top_match(df=raw_scores, child_column='SR_NUM_1', score_key_column='NUM_OF_MATCHES_FOUND')
    cleaned_scores = replace_cyclic_dependencies(
        df=best_matches,
        country_df=country_df,
        child_indicator='SR_NUM_1',
        master_indicator='SR_NUM_2',
        verbose=verbose,
    )
    write_df_to_csv(df=cleaned_scores, root_dir=target_dir, curr_country=curr_country, file_suffix='_Cleaned_Feature_Scores.csv')
    print('\n"SR_NUM_2" will be the master record')
    return cleaned_scores


def get_deduplicated_master_records(normalized_duplicates, country_df):
    """
        DOCSTRING:  Uses set-theory on the cleaned-normalized-score-features to find the unique list of masters.
                        - SR_NUM_1 holds incoming record-ids, SR_NUM_2 the record each one is mapped to.
                        - Records appearing in neither column had no match and are their own masters.
                        >   Universe                 = {country_df.index}
                        >   anymatch                 = {SR_NUM_1} U {SR_NUM_2}
                        >   nomatch                  = Universe - anymatch
                        >   Total masters            = nomatch U {SR_NUM_2}
        INPUT:      Dataframe-of-cleaned-normalized-score_features, Dataframe-for-country (indexed by record-id)
        OUTPUT:     Set of master-record-ids (SR_NUM).
    """
    mapped_children = set(normalized_duplicates['SR_NUM_1'].tolist())
    mapped_masters = set(normalized_duplicates['SR_NUM_2'].tolist())
    all_records = set(country_df.index.tolist())

    unmatched_records = all_records - (mapped_children | mapped_masters)
    return unmatched_records | mapped_masters


def generate_deduplicated_master(country_df, master_record_ids, target_dir=_MASTER_DATA_DIRECTORY, write_csv=True, curr_country=None):
    """
        DOCSTRING:  Selects the master rows from the original country-df. If write_csv is set, also writes them to
                    <target_dir>/<curr_country>_Master.csv (with the index).
        INPUT:      Original-country-Dataframe, master-record-ids (SR_NUM; list, array or set), target-directory,
                    write-csv-flag, country-name (defaults to the notebook-level `curr_country` variable)
        OUTPUT:     Master-Dataframe for the country.
    """
    if isinstance(master_record_ids, (set, frozenset)):
        # pandas >= 2.0 rejects a set as a .loc indexer; sorting also gives a deterministic row order.
        master_record_ids=sorted(master_record_ids)
    country_master_df=country_df.loc[master_record_ids]
    if write_csv:
        if curr_country is None:
            # Backward-compatible default: the notebook defines `curr_country` at module level.
            if 'curr_country' not in globals():
                raise NameError("name 'curr_country' is not defined; pass curr_country explicitly")
            curr_country=globals()['curr_country']
        write_df_to_csv(df=country_master_df, root_dir=target_dir, curr_country=curr_country, index_flag=True, file_suffix='_Master.csv')
    print('{} records get merged into {}'.format(country_df.shape[0],len(master_record_ids)))
    return country_master_df



def generate_dummy_cross_refs_for_masters(master_record_ids, curr_country=None):
    """
        DOCSTRING:  Creates a dummy cross-reference dataframe for master-records: every master matches itself
                    (SR_NUM_1 == SR_NUM_2) with every per-field comparison score set to 1.0, the address score scaled
                    by _SCALING_FACTOR, and NUM_OF_MATCHES_FOUND = sum of _COLS_FOR_TOTAL_MATCH_CALC.
        INPUT:      master-record-ids (SR_NUM), country-name (defaults to the notebook-level `curr_country` variable)
        OUTPUT:     Dataframe-of-dummy-entries-for-master-cross-references.
    """
    if curr_country is None:
        # Backward-compatible default: the notebook defines `curr_country` at module level.
        if 'curr_country' not in globals():
            raise NameError("name 'curr_country' is not defined; pass curr_country explicitly")
        curr_country=globals()['curr_country']

    # Materialize once so both id columns are guaranteed to come from the same iteration.
    master_ids=list(master_record_ids)
    perfect_scores=[1.0]*len(master_ids)
    score_columns=['SITE_NAME_COMPARISON_SCORE', 'STATE_COMPARISON_SCORE', 'CITY_COMPARISON_SCORE',
                   'CONCAT_ADDRESS_COMPARISON_SCORE', 'POSTAL_CODE_COMPARISON_SCORE']
    master_record_df_dict={'SR_NUM_1': master_ids, 'SR_NUM_2': master_ids}
    master_record_df_dict.update({colname: perfect_scores for colname in score_columns})

    cross_ref_df=pd.DataFrame(master_record_df_dict)
    cross_ref_df['COUNTRY']=curr_country
    scale_up_comparison_score(cross_ref_df,'CONCAT_ADDRESS_COMPARISON_SCORE',_SCALING_FACTOR)
    cross_ref_df['NUM_OF_MATCHES_FOUND']=cross_ref_df[_COLS_FOR_TOTAL_MATCH_CALC].sum(axis=1)
    return cross_ref_df


def generate_final_cross_refs(cross_ref_df, normalized_duplicates, target_dir=_MASTER_DATA_DIRECTORY, write_csv=True, curr_country=None):
    """
        DOCSTRING:  Stacks the dummy cross-reference of masters on top of the cleaned-normalized-feature-scores and
                    orders the result by SR_NUM_1. If write_csv is set, writes <target_dir>/<curr_country>_Raw_Cross_Ref.csv.
        INPUT:      Dataframe-of-dummy-entries-for-master-cross-references, Dataframe-of-cleaned-normalized-score_features,
                    target-directory, write-csv-flag, country-name (defaults to the notebook-level `curr_country` variable)
        OUTPUT:     Dataframe-of-cross-references.
    """
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0 (same row/column semantics).
    combined_df=pd.concat([cross_ref_df, normalized_duplicates])
    # Stable sort: for equal SR_NUM_1 the master's own row stays ahead of the duplicate rows.
    combined_df.sort_values(by=['SR_NUM_1'], axis=0, inplace=True, kind='mergesort')
    if write_csv:
        if curr_country is None:
            # Backward-compatible default: the notebook defines `curr_country` at module level.
            if 'curr_country' not in globals():
                raise NameError("name 'curr_country' is not defined; pass curr_country explicitly")
            curr_country=globals()['curr_country']
        write_df_to_csv(df=combined_df, root_dir=target_dir, curr_country=curr_country, file_suffix='_Raw_Cross_Ref.csv')
    return combined_df



def update_entire_country_cross_ref(new_depth_cross_ref_df, entire_country_cross_ref_df):
    """
        DOCSTRING:  --Specific to recursively processing a huge country-batch--
                    Updates the entire cross-reference for a country (built at depth=0) with the merges observed at depth=d:
                    every row whose SR_NUM_1 got merged (SR_NUM_1 != SR_NUM_2) at the new depth overwrites the matching
                    row of the entire cross-reference (left-join on SR_NUM_1).
                    entire_country_cross_ref_df is modified in place; new_depth_cross_ref_df is left untouched.
        INPUT:      Dataframe-of-cross-references-at-new-depth, Dataframe-of-existing-cross-references
        OUTPUT:     None (entire_country_cross_ref_df is updated in place).
    """
    is_merge=new_depth_cross_ref_df['SR_NUM_1'] != new_depth_cross_ref_df['SR_NUM_2']
    # Non-inplace set_index: the filtered frame is a copy, and in-place ops on it raise SettingWithCopyWarning.
    cross_refs_with_merges=new_depth_cross_ref_df.loc[is_merge].set_index('SR_NUM_1')

    # DataFrame.update aligns on the index, hence SR_NUM_1 is temporarily made the index.
    entire_country_cross_ref_df.set_index('SR_NUM_1', inplace=True)
    entire_country_cross_ref_df.update(cross_refs_with_merges, join='left', overwrite=True)
    # update() may change the column dtype; convert the master ids back to numbers (vectorized).
    entire_country_cross_ref_df['SR_NUM_2']=pd.to_numeric(entire_country_cross_ref_df['SR_NUM_2'])
    entire_country_cross_ref_df.reset_index(inplace=True)
    # NOTE(review): rows whose SR_NUM_2 is a record that was itself merged away at this depth are not
    # re-pointed transitively here - confirm whether the final cross-reference should follow the chain.




def generate_cross_ref_report(cross_ref_df, country_df, target_dir=_MASTER_DATA_DIRECTORY, curr_country=None):
    """
        DOCSTRING:  Creates the cross-reference report by left-joining the cross-reference-dataframe with the original info in country-df:
                        a. columns of country_df suffixed "_1", joined on SR_NUM_1 (the child record)
                        b. columns of country_df suffixed "_2", joined on SR_NUM_2 (the master record)
                    Writes <target_dir>/<curr_country>_Cross_Ref_Full_Report.csv. country_df itself is not modified.
                    Assumes country_df's index is named SR_NUM (its reset column becomes the SR_NUM_1/SR_NUM_2 join keys).
        INPUT:      Dataframe-of-cross-references, Dataframe-for-country-with-original-info, target-directory,
                    country-name (defaults to the notebook-level `curr_country` variable)
        OUTPUT:     Dataframe-of-cross-references-with-original-info (the report that was written).
    """
    if curr_country is None:
        # Backward-compatible default: the notebook defines `curr_country` at module level.
        if 'curr_country' not in globals():
            raise NameError("name 'curr_country' is not defined; pass curr_country explicitly")
        curr_country=globals()['curr_country']

    # reset_index() returns a new frame, so the caller's country_df keeps its index and column names
    # (an in-place reset/rename would silently alter the caller's dataframe).
    country_details=country_df.reset_index()

    report_df=cross_ref_df.merge(country_details.add_suffix('_1'), how='left', on='SR_NUM_1')
    report_df=report_df.merge(country_details.add_suffix('_2'), how='left', on='SR_NUM_2')

    columns_in_report_format=['SR_NUM_1', 'SR_NUM_2', 'SITE_NAME_1','SITE_NAME_2','SITE_NAME_COMPARISON_SCORE','STATE_1','STATE_2','STATE_COMPARISON_SCORE', 'CITY_1', 'CITY_2','CITY_COMPARISON_SCORE','CONCAT_ADDRESS_1','CONCAT_ADDRESS_2','CONCAT_ADDRESS_COMPARISON_SCORE', 'POSTAL_CODE_1','POSTAL_CODE_2',   'POSTAL_CODE_COMPARISON_SCORE','NUM_OF_MATCHES_FOUND']
    report_df=report_df[columns_in_report_format]
    write_df_to_csv(df=report_df, root_dir=target_dir, curr_country=curr_country, file_suffix='_Cross_Ref_Full_Report.csv')
    return report_df


# Load the raw site records (first column = record-id index) and normalize them in place.
# Alternatives kept for reference:
#   site_master_df=pd.read_csv(_STATIC_FILE_NAME, index_col=0)
#   site_master_df=pd.read_sql_query(_STATIC_QUERY, _SQL_SERVER_CONN); site_master_df.set_index('SR_NUM', inplace=True)
site_master_df = pd.read_excel(_STATIC_FILE_NAME, index_col=0)
print("Read the Source-file {}".format(_STATIC_FILE_NAME))

preprocess_dataframe(site_master_df)
print('\nColumns: {}\n'.format(site_master_df.columns.values))

# One entry per distinct (underscore-normalized) country, in order of first appearance.
distinct_countries = site_master_df['COUNTRY'].unique()
countries = list(distinct_countries)
print('\nCountries : {}'.format(countries))

Read the Source-file SM_Temp_Shortlist.xlsx

Columns: ['SOURCE_IDENTIFIER' 'DATA_SOURCE_NAME' 'PROTOCOL_NUMBER' 'SITE_NUM'
 'UNIQUE_SITE_ID' 'COUNTRY' 'SITE_NAME' 'STATE' 'CITY' 'ADDRESS_LINE_1'
 'ADDRESS_LINE_2' 'ADDRESS_LINE_3' 'POSTAL_CODE' 'SITE_STATUS']


Countries : ['Algeria', 'Argentina', 'Australia', 'Turkey', 'United_Kingdom', 'United_States']


# TESTING OUT THE RECURSIVE APPROACH WITH MINI-BATCHES OF MASTERS

In [4]:
# for c in range(len(countries)):
c=2
curr_country=countries[c]
# Filter once; clean_dataframe deep-copies its input, so both cleaned variants are independent.
country_rows=site_master_df[site_master_df['COUNTRY']==curr_country]
# Punctuation-stripped, lower-cased variant: this is what gets scored.
entire_country_df=clean_dataframe(country_rows, columns_to_clean=_COLUMNS_TO_CLEAN, fields_to_concat=_FIELDS_TO_CONCAT, replace_punctuations=True)
# Variant keeping the original spelling: used for the master files and the human-readable reports.
entire_country_df_copy=clean_dataframe(country_rows, columns_to_clean=_COLUMNS_TO_CLEAN, fields_to_concat=_FIELDS_TO_CONCAT, replace_punctuations=False)
nrows=entire_country_df.shape[0]
# Number of mini-batches = ceil(nrows / _MAXSIZE), at least 1; `nrows//_MAXSIZE + 1` would add an
# empty trailing batch whenever nrows is an exact multiple of _MAXSIZE.
m=max(1, -(-nrows // _MAXSIZE))
# Per-batch cross-references, concatenated once after the loop (DataFrame.append was removed in pandas 2.0).
cross_ref_parts=[]
queue_of_csvs=list()
_CREATE_MASTER_MINIBATCHES=True

for i in range(m):
    print("\n\nStarting Batch[{}]...".format(i))
    batch_rows=slice(i*_MAXSIZE, (i+1)*_MAXSIZE)
    country_df=entire_country_df.iloc[batch_rows]
    country_df_copy=entire_country_df_copy.iloc[batch_rows]
    # Only the scored fields (plus the index) are handed to the Rscript.
    write_df_to_csv(df=country_df[list(_THRESHOLDS_DICT)], curr_country=curr_country, file_suffix='_country_df.csv', index_flag=True)

    if not _CREATE_MASTER_MINIBATCHES or country_df.shape[0]==1:
        print("\n\nGet the unique set of all record-ids, since Layer-zero cannot create mastered mini-batches.\n")
        # Every record is its own master.
        master_record_ids = country_df.index.values.astype(list)
        country_master_df = generate_deduplicated_master(country_df=country_df, master_record_ids=master_record_ids, target_dir=_STAGING_AREA_DIRECTORY, write_csv=False)
        cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids)

    else:
        # Invoke the Rscript and generate the Raw_score_features csv file for this mini-batch
        args="{} {} {} {} {} {} {} {} {} NA NA".format(_BINARIES_NAME, _BINARIES_EXTENSION, _THRESHOLD_FOR_INDIVIDUAL, _THRESHOLD_FOR_ADDRESS_COMBINED, _SCALING_FACTOR, curr_country, _RAW_SCORES_DIRECTORY, _TOTAL_MATCHES_THRESHOLD, _DEDUP_METHOD)
        print('\n{}_{} has {} records.\n\nInvoking the Rscript now...'.format(curr_country, i, country_df.shape[0]))
        deduplicate_dataset_R( rscript_command=_RSCRIPT_CMD,  script_name=_SCRIPT_NAME, args=args )

        # Clean and normalize the score features
        normalized_duplicates = clean_score_features(curr_country=curr_country, country_df=country_df, source_dir=_RAW_SCORES_DIRECTORY, target_dir=_CLEANED_SCORES_DIRECTORY, verbose=False)

        if normalized_duplicates.shape[0]!=0:
            print("\n\nFound potential duplicates. Processing their master and cross-reference...\n")
            # Get the unique set of master-record-ids
            master_record_ids = get_deduplicated_master_records(normalized_duplicates=normalized_duplicates, country_df=country_df)
            # Get the country-master-df
            country_master_df = generate_deduplicated_master(country_df=country_df, master_record_ids=master_record_ids, target_dir=_STAGING_AREA_DIRECTORY, write_csv=False)
            # Create a dummy set of cross-refs for masters
            cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids)
            # Create full set of cross-refs for country-df
            cross_ref_df = generate_final_cross_refs(cross_ref_df=cross_ref_df, normalized_duplicates=normalized_duplicates, target_dir=_STAGING_AREA_DIRECTORY, write_csv=False)
            # Create the csv for the cross-ref report
            generate_cross_ref_report(cross_ref_df=cross_ref_df, country_df=country_df_copy, target_dir=_STAGING_AREA_DIRECTORY)
        else:
            print("\n\nGet the unique set of all record-ids since there aren't any potential duplicates.\n")
            # No potential duplicates: every record is its own master.
            master_record_ids = country_df.index.values.astype(list)
            country_master_df = generate_deduplicated_master(country_df=country_df, master_record_ids=master_record_ids, target_dir=_STAGING_AREA_DIRECTORY, write_csv=False)
            cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids)

    # Common tail for both branches: persist this batch's masters for the recursive linkage cell
    # and keep its cross-references.
    new_file_name='_{}_Master.csv'.format(i)
    write_df_to_csv(df=country_master_df, root_dir=_STAGING_AREA_DIRECTORY, curr_country=curr_country, file_suffix=new_file_name, index_flag=True)
    queue_of_csvs.append(curr_country+new_file_name)
    cross_ref_parts.append(cross_ref_df)

entire_country_cross_ref_df=pd.concat(cross_ref_parts) if cross_ref_parts else pd.DataFrame()
print("{} csvs generated are: {}".format(len(queue_of_csvs), queue_of_csvs))


Special Character that will be replaced are:  !"\#\$%\&'\(\)\*\+,\-\./:;<=>\?@\[\\\]\^_`\{\|\}\~


Starting Batch[0]...

Successfully created \Australia_country_df.csv!

Australia_0 has 576 records.

Invoking the Rscript now...
R OUTPUT:

b'[1] "levenshtein .dll 0.85 0.75 3 Australia Raw_Scores 4 Dedup NA NA"\r\n[1] "Loading levenshtein.dll !"\r\n         used (Mb) gc trigger (Mb) max used (Mb)\r\nNcells 120133  3.3     350000  9.4   302969  8.1\r\nVcells 169834  1.3     786432  6.0   697518  5.4\r\n[1] "NRows= 576 , Candidate-pairs= 165600 , Columns are "\r\n[1] "SR_NUM"         "CONCAT_ADDRESS" "SITE_NAME"      "STATE"         \r\n[5] "CITY"           "POSTAL_CODE"   \r\n[1] "N_combinations= 165600 , Columns are "\r\n[1] "id1"            "id2"            "CONCAT_ADDRESS" "SITE_NAME"     \r\n[5] "STATE"          "CITY"           "POSTAL_CODE"    "is_match"      \r\n[1] "Scaling up column scores if threshold crossed"\r\n[1] "SITE_NAME  :  0.85"\r\n[1] "STATE  :  0.85"\r\n[1] "CITY  : 

In [5]:
# Number of levels for the recursive computations: each level links the master files pairwise
# (0+1, 2+3, ...), halving their count, so ceil(log2(m)) levels leave one file; at least one level always runs.
d=max(1, (m-1).bit_length())
print("\nMax-depth for {} will be {}".format(curr_country,d))

for j in range(1,d+1):
    depth_cross_ref_parts=[]
    n_csvs_to_read=len(queue_of_csvs)
    # File count rounded up to even; only reported in the progress message.
    length=n_csvs_to_read+n_csvs_to_read%2
    print("{} csvs need to be processed: {} , length={}".format(n_csvs_to_read, queue_of_csvs, length))
    for i in range(0, n_csvs_to_read, 2):
        M_df_file1=os.path.join(_STAGING_AREA_DIRECTORY, queue_of_csvs[i])
        M_df_1=pd.read_csv(M_df_file1, index_col=0)

        if i+1<n_csvs_to_read:
            M_df_file2=os.path.join(_STAGING_AREA_DIRECTORY, queue_of_csvs[i+1])
            M_df_2=pd.read_csv(M_df_file2, index_col=0)

            # Invoke the Rscript in linkage mode on the two master files
            print('\n{} has {} records, and {} has {} records.\n\nInvoking the Rscript now...\n'.format(M_df_file1, M_df_1.shape[0],M_df_file2, M_df_2.shape[0]))
            args="{} {} {} {} {} {} {} {} {} {} {}".format(_BINARIES_NAME, _BINARIES_EXTENSION, _THRESHOLD_FOR_INDIVIDUAL, _THRESHOLD_FOR_ADDRESS_COMBINED, _SCALING_FACTOR, curr_country, _RAW_SCORES_DIRECTORY, _TOTAL_MATCHES_THRESHOLD, _LINKAGE_METHOD, M_df_file1, M_df_file2)
            deduplicate_dataset_R( rscript_command=_RSCRIPT_CMD,  script_name=_SCRIPT_NAME, args=args )

            # Stack the pair once (DataFrame.append was removed in pandas 2.0).
            pair_df=pd.concat([M_df_1, M_df_2])
            # Clean and normalize the score features
            normalized_duplicates = clean_score_features(curr_country=curr_country, country_df=pair_df, source_dir=_RAW_SCORES_DIRECTORY, target_dir=_CLEANED_SCORES_DIRECTORY, verbose=False)

            if normalized_duplicates.shape[0]!=0:
                print("\n\nFound potential duplicates. Processing their master and cross-reference...\n")
                # Get the unique set of master-record-ids
                master_record_ids = get_deduplicated_master_records(normalized_duplicates=normalized_duplicates, country_df=pair_df)
                # Get the country-master-df
                country_master_df = generate_deduplicated_master(country_df=pair_df, master_record_ids=list(master_record_ids), target_dir=_STAGING_AREA_DIRECTORY, write_csv=False)
                # Create a dummy set of cross-refs for masters
                cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids)
                # Create full set of cross-refs for country-df
                cross_ref_df = generate_final_cross_refs(cross_ref_df=cross_ref_df, normalized_duplicates=normalized_duplicates, target_dir=_STAGING_AREA_DIRECTORY, write_csv=False)
                # Create the csv for the cross-ref report; pass a copy because the report function may
                # reset/rename the frame it receives.
                generate_cross_ref_report(cross_ref_df=cross_ref_df, country_df=pair_df.copy(), target_dir=_STAGING_AREA_DIRECTORY)
            else:
                print("\n\nGet the unique set of all record-ids since there aren't any potential duplicates.\n")
                # No potential duplicates: every record of the pair stays a master.
                master_record_ids = pair_df.index.values.astype(list)
                country_master_df = generate_deduplicated_master(country_df=pair_df, master_record_ids=master_record_ids, target_dir=_STAGING_AREA_DIRECTORY, write_csv=False)
                cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids)

        else:
            print("\n\nGet the unique set of all record-ids since there isn't a second file to compare.\n")
            # Odd file out: carried over unchanged to the next level.
            master_record_ids = M_df_1.index.values.astype(list)
            country_master_df = generate_deduplicated_master(country_df=M_df_1, master_record_ids=master_record_ids, target_dir=_STAGING_AREA_DIRECTORY, write_csv=False)
            cross_ref_df = generate_dummy_cross_refs_for_masters(master_record_ids=master_record_ids)

        depth_cross_ref_parts.append(cross_ref_df)
        new_file_name="_d{}_{}_Master.csv".format(j,i)
        write_df_to_csv(df=country_master_df, root_dir=_STAGING_AREA_DIRECTORY, curr_country=curr_country, file_suffix=new_file_name, index_flag=True)
        # Appended after the files of this level; the slice at the end of the level keeps only these.
        queue_of_csvs.append(curr_country+new_file_name)

    M_CR_df=pd.concat(depth_cross_ref_parts) if depth_cross_ref_parts else pd.DataFrame()
    write_df_to_csv(df=M_CR_df, root_dir=_STAGING_AREA_DIRECTORY, curr_country=curr_country, file_suffix="_d{}_Raw_Cross_Ref.csv".format(j), index_flag=False)
    print("\n\nDepth[{}] processed successfully.".format(j))
    update_entire_country_cross_ref(new_depth_cross_ref_df=M_CR_df, entire_country_cross_ref_df=entire_country_cross_ref_df)
    queue_of_csvs=queue_of_csvs[n_csvs_to_read:]


if len(queue_of_csvs)==1:
    print("\n\n\n\nProcessed all {} levels. Generating the master and cross-reference at the final-layer...".format(d))
    # The single remaining file is at position 0 (the leftover inner-loop variable `i` is only 0
    # when the last level contained a single pair).
    M_df_file1=os.path.join(_STAGING_AREA_DIRECTORY, queue_of_csvs[0])
    M_df_1=pd.read_csv(M_df_file1, index_col=0)
    # Get the unique set of master-record-ids
    master_record_ids = M_df_1.index.values.astype(list)
    # Get the country-master-df (with the original spelling) and write it
    country_master_df = generate_deduplicated_master(country_df=entire_country_df_copy, master_record_ids=master_record_ids, target_dir=_MASTER_DATA_DIRECTORY, write_csv=True)
    # Write the final raw-cross-ref to a csv
    write_df_to_csv(df=entire_country_cross_ref_df, root_dir=_MASTER_DATA_DIRECTORY, curr_country=curr_country, file_suffix="_Raw_Cross_Ref.csv", index_flag=False)
    # Create the csv for the cross-ref report
    generate_cross_ref_report(cross_ref_df=entire_country_cross_ref_df, country_df=entire_country_df_copy, target_dir=_MASTER_DATA_DIRECTORY)
else:
    print("\n\nExpected a single master file after {} levels but found {}: {}".format(d, len(queue_of_csvs), queue_of_csvs))


Max-depth for Australia will be 1
1 csvs need to be processed: ['Australia_0_Master.csv'] , length=2


Get the unique set of all record-ids since there isn't a second file to compare.

178 records get merged into 178

Successfully created \Master_Data\Recursive_Staging_Area\Australia_d1_0_Master.csv!

Successfully created \Master_Data\Recursive_Staging_Area\Australia_d1_Raw_Cross_Ref.csv!


Depth[1] processed successfully.




Processed all 1 levels. Generating the master and cross-reference at the final-layer...

Successfully created \Master_Data\Australia_Master.csv!
576 records get merged into 178

Successfully created \Master_Data\Australia_Raw_Cross_Ref.csv!

Successfully created \Master_Data\Australia_Cross_Ref_Full_Report.csv!
