In [1]:
import pandas as pd
from thefuzz import process
import re
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import multiprocessing as mp

DIR = "/Users/tlahtolli/dev/drone_warfare/data/cleanup"

In [2]:
af = pd.read_csv(f'{DIR}/3_manual_cleanup/af.csv',)
pk = pd.read_csv(f'{DIR}/3_manual_cleanup/pk.csv')
so = pd.read_csv(f'{DIR}/3_manual_cleanup/so.csv')
ye = pd.read_csv(f'{DIR}/3_manual_cleanup/ye.csv')

af_nga = pd.read_csv(f'{DIR}/nga/Afghanistan/Afghanistan.txt', sep='\t')
pk_nga = pd.read_csv(f'{DIR}/nga/Pakistan/Pakistan.txt', sep='\t')
so_nga = pd.read_csv(f'{DIR}/nga/Somalia/Somalia.txt', sep='\t')
ye_nga = pd.read_csv(f'{DIR}/nga/Yemen/Yemen.txt', sep='\t')

admin_nga = pd.read_csv(f'{DIR}/nga/Administrative_Regions/Administrative_Regions.txt', sep='\t')

  exec(code_obj, self.user_global_ns, self.user_ns)
  exec(code_obj, self.user_global_ns, self.user_ns)
  exec(code_obj, self.user_global_ns, self.user_ns)
  exec(code_obj, self.user_global_ns, self.user_ns)


In [3]:
class LocationNormalizer:
    def __init__(self, source_df, ref_df):
        self.source_df = source_df.copy()
        self.ref_df = ref_df.copy()
        self.target_df = None
        self.source_subset = None
        self.ref_subset = None
        self.adm1_codes = []

    def create_sort_name(self, name):
        name = re.sub(r'[^a-zA-Z0-9\s]', '', name)
        return name.upper().replace(' ', '')
    
    def create_sort_columns(self, df, col):
        df[f'sort_{col}'] = df[col].apply(self.create_sort_name)
        return df
    
    def get_unique_source_locations(self, df, col='Adm_1', adm1_code=None):
        unwanted_values = ["unknown", "unclear", "various", "multiple", 'Unknown', 'Unclear', 'Various', 'Multiple']
        filtered_df = df[~df[col].isin(unwanted_values) & df[col].notnull()]

        if adm1_code:
            adm1_col = 'matched_sort_Adm_1_adm1'
            filtered_df = filtered_df[filtered_df[adm1_col] == adm1_code]
        result = filtered_df[[col]].drop_duplicates()

        return result
    
    def get_unique_ref_location_data(self, desig_cd='ADM1', adm1_code=None):
        if desig_cd == '':
            filter_condition = (~self.ref_df['desig_cd'].isin(['ADM1', 'ADM2', 'ADM3', 'ADM4']))
        else:
            filter_condition = (self.ref_df['desig_cd'] == desig_cd) & (self.ref_df['name_rank'] == 1)

        if adm1_code:
            filter_condition &= (self.ref_df['adm1'] == adm1_code)

        unique_adm1_df = self.ref_df.loc[filter_condition, ['ufi', 'adm1', 'sort_name', 'full_name','full_nm_nd', 'desig_cd','lat_dd', 'long_dd']].drop_duplicates(subset=['sort_name'])
        return unique_adm1_df

    def fuzzy_search(self, query, choices):
        try:
            match = process.extractOne(query, choices)
            if match[1] >= 75:
                return match[0]
            else:
                return 'unclear'
        except:
            return 'unclear'
        

    def get_matches(self, source_subset, ref_subset, col='Adm_1'):
        target_col = f'matched_sort_{col}'
        # Rename the sort_name column to match the col
        ref_subset = ref_subset.rename(columns={'sort_name': f'{target_col}'})
        # Convert the Pandas DataFrames to Dask DataFrames
        source_ddf = dd.from_pandas(source_subset, npartitions=8)
        ref_ddf = dd.from_pandas(ref_subset, npartitions=8)

        def fuzzy_search_sort_name(query, ref_ddf):
            return self.fuzzy_search(query, ref_ddf[f'{target_col}'])

        # Apply the fuzzy search function on the source subset searching the reference subset
        source_ddf[f'{target_col}'] = source_ddf[f'sort_{col}'].apply(fuzzy_search_sort_name, ref_ddf=ref_ddf, meta=(f'{target_col}', 'object'))

        # Convert the Dask DataFrame back to a Pandas DataFrame
        source_df = source_ddf.compute()

        # Rename columns in ref_subset by prepending target_col
        renamed_columns = {col: f'{target_col}_{col}' for col in ref_subset.columns}
        ref_subset = ref_subset.rename(columns=renamed_columns)

        # Perform the left join between source_df and ref_subset
        result_df = pd.merge(source_df, ref_subset, left_on=f'{target_col}', right_on=f'{target_col}_{target_col}', how='left')
        
        # Return the original columns along with the matched columns
        return result_df[[col, *result_df.columns[result_df.columns.str.startswith(target_col)]]]


    def get_normalized_loc_data(self, col='Adm_1'):
        self.create_sort_columns(self.source_subset, col)

        normalized_names = self.get_matches(self.source_subset, self.ref_subset, col)
        return normalized_names

    def normalize(self, df, col="Adm_1", desig_cd='ADM1'):
        normalized_data = self.get_normalized_loc_data(col)
        merged_df = pd.merge(df, normalized_data, on=col, how='left', suffixes=('', '_matched'))

        # Update the original columns with matched values
        for col in df.columns:
            if f"{col}_matched" in merged_df.columns:
                # Update the original column with the matched values when they exist (i.e., not NaN)
                merged_df[col].update(merged_df[f"{col}_matched"].dropna())

        # Drop the extra columns created due to the merge
        cols_to_keep = [col for col in merged_df.columns if not col.endswith('_matched')]
        merged_df = merged_df[cols_to_keep]

        return merged_df

    def get_unique_adm1_codes(self, df):
        return df['matched_sort_Adm_1_adm1'].dropna().drop_duplicates().tolist()
    
    def prepare_data(self, df, col='Adm_1', desig_cd='ADM1', adm1_code=None):
        self.source_subset = self.get_unique_source_locations(df, col, adm1_code)
        self.ref_subset = self.get_unique_ref_location_data(desig_cd, adm1_code)

    def normalize_adm_1(self):
        self.prepare_data(self.source_df, 'Adm_1', 'ADM1')
        self.target_df = self.normalize(self.source_df, 'Adm_1', 'ADM1')
        self.adm1_codes = self.get_unique_adm1_codes(self.target_df)
    
    def normalize_remaining(self, col='Adm_2', desig_cd='ADM2'):
        for adm1_code in self.adm1_codes:
            self.prepare_data(self.target_df, col, desig_cd, adm1_code)
            self.target_df = self.normalize(self.target_df, col, desig_cd)

    def output(self, filename):
        self.target_df.to_csv(filename, index=False)



In [5]:
with LocalCluster(n_workers=int(0.9 * mp.cpu_count()),
    processes=True,
    threads_per_worker=1,
    memory_limit='2GB',
) as cluster, Client(cluster) as client:
    # Do something using 'client'
    af_norm = LocationNormalizer(af, af_nga)
    af_norm.normalize_adm_1()
    af_norm.normalize_remaining('Adm_2', 'ADM2')
    af_norm.normalize_remaining('Loc', '')
    af_norm.output(f'{DIR}/4_nga_normalize/af.csv')
    print('AF Done')

with LocalCluster(n_workers=int(0.9 * mp.cpu_count()),
    processes=True,
    threads_per_worker=1,
    memory_limit='2GB',
) as cluster, Client(cluster) as client:
    # Do something using 'client'
    pk_norm = LocationNormalizer(pk, pk_nga)
    pk_norm.normalize_adm_1()
    pk_norm.normalize_remaining('Adm_2', 'ADM2')
    pk_norm.normalize_remaining('Adm_3', 'ADM3')
    pk_norm.normalize_remaining('Adm_4', '') # not designated as ADM4 in NGA
    pk_norm.normalize_remaining('Loc', '')
    pk_norm.output(f'{DIR}/4_nga_normalize/pk.csv')
    print('PK Done')

with LocalCluster(n_workers=int(0.9 * mp.cpu_count()),
    processes=True,
    threads_per_worker=1,
    memory_limit='2GB',
) as cluster, Client(cluster) as client:
    # Do something using 'client'
    so_norm = LocationNormalizer(so, so_nga)
    so_norm.normalize_adm_1()
    so_norm.normalize_remaining('Adm_2', 'ADM2')
    so_norm.normalize_remaining('Loc', '')
    so_norm.output(f'{DIR}/4_nga_normalize/so.csv')
    print('SO Done')

with LocalCluster(n_workers=int(0.9 * mp.cpu_count()),
    processes=True,
    threads_per_worker=1,
    memory_limit='2GB',
) as cluster, Client(cluster) as client:
    # Do something using 'client'
    ye_norm = LocationNormalizer(ye, ye_nga)
    ye_norm.normalize_adm_1()
    ye_norm.normalize_remaining('Adm_2', 'ADM2')
    ye_norm.normalize_remaining('Loc', '')
    ye_norm.output(f'{DIR}/4_nga_normalize/ye.csv')
    print('YE Done')

2023-03-21 21:31:48,260 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:58870 -> tcp://127.0.0.1:58834
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 1797, in get_data
    response = await comm.read(deserializers=serializers)
  File "/usr/local/lib/python3.9/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/usr/local/lib/python3.9/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed)  local=tcp://127.0.0.1

KeyboardInterrupt: 