In [None]:
import dremio_client.lib as dlib
import numpy as np

import pandas as pd
import h3pandas
from datetime import datetime
import time

from pyarrow import flight
from pyarrow.flight import FlightClient
import pyarrow.dataset as ds
import polars as pl

from rapidfuzz import fuzz
import datetime
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd


In [None]:
def get_bakong_data():
    """Fetch the distinct Bakong merchant names for December 2023.

    Sends one fixed SQL statement to ``dlib.simple_query`` and returns whatever
    that call returns. The statement reads ``bk_outgoing`` and keeps rows whose
    ``created_at`` month truncates to ``2023-12-01``, whose ``type`` is ``'QR'``,
    whose ``MerchantType`` is ``'29'`` and whose ``src_name`` differs from
    ``MerchantName``; the names come back distinct and ordered.
    """
    # A plain literal is enough: the statement contains no placeholders.
    sql = """
        SELECT 
        
            DISTINCT MerchantName
            
        FROM "DataScience_DB"."General_DB".dbo.bk_outgoing
        where date_trunc('month',created_at) = '2023-12-01'
        and type = 'QR' AND MerchantType = '29'
        AND src_name <> MerchantName
        
        ORDER BY 1
        
    """

    return dlib.simple_query(sql)

In [None]:
def get_aba_data():
    """Fetch the distinct (CIF, AC_NAME) pairs of ABA customer accounts.

    Sends one fixed SQL statement against ``dwh.FCCBOREP.VW_CUSTACC`` to
    ``dlib.simple_query`` and returns whatever that call returns. Rows are
    ordered by the first selected column.
    """
    # A plain literal is enough: the statement contains no placeholders.
    sql = """
        SELECT DISTINCT CIF, AC_NAME
        FROM dwh.FCCBOREP.VW_CUSTACC
        ORDER BY 1
    """
    return dlib.simple_query(sql)



In [None]:
# The query-based loaders are disabled here; the ABA accounts are read from a
# local CSV file instead.
#bakong_data = get_bakong_data()
#aba_data_set = get_aba_data()
aba_data_set = pd.read_csv('dataset/aba_data_1m.csv')

In [None]:
# Work on an independent copy: plain assignment would only alias the same
# DataFrame object, so the in-place AC_NAME cleaning further down would also
# rewrite the freshly loaded aba_data_set.
aba_data = aba_data_set.copy()

In [None]:
#bakong_data.shape
# Shape of the ABA frame before cleaning.
aba_data.shape

<hr>
Clean ABA Data
<hr>

In [None]:
# Peek at the last rows of the ABA frame.
aba_data.tail()

In [None]:
# Print the DataFrame.info() summary of the ABA frame.
aba_data.info()

In [None]:
# Normalise account names for matching: upper-case, trim both ends, then
# remove every space character.
aba_data['AC_NAME'] = (
    aba_data['AC_NAME']
    .str.upper()
    .str.strip()
    .str.replace(" ", "")
)

In [None]:
# Remove duplicate rows first, then any row that still has a missing value.
aba_data = aba_data.drop_duplicates().dropna()

In [None]:
# Shape of the ABA frame after de-duplication and null removal.
aba_data.shape

<hr>
Clean Bakong Data
<hr>


In [None]:
# Load the Bakong merchant names from a local CSV file and show the frame's shape.
bakong_data = pd.read_csv('dataset/unique_merchantname_bk_500001_lastrow.csv')
bakong_data.shape

In [None]:
# Discard the 'Unnamed: 0' column that came in with the CSV. Rebinding the name
# (instead of dropping in place) together with errors='ignore' lets this cell
# be re-run without a KeyError once the column is already gone.
bakong_data = bakong_data.drop(columns='Unnamed: 0', errors='ignore')
bakong_data.head()

In [None]:
# Normalise merchant names for matching: upper-case, trim both ends, then
# remove every space character.
bakong_data['Unique_MerchantName'] = (
    bakong_data['Unique_MerchantName']
    .str.upper()
    .str.strip()
    .str.replace(" ", "")
)

bakong_data.shape

In [None]:
# Work on leading subsets only: the first 1000 Bakong rows and the first
# 10000 ABA rows.
bk_cust = bakong_data.head(1000)
aba_cust = aba_data.head(10000)

In [None]:
# Confirm the sizes of the two subsets.
print(bk_cust.shape)
print(aba_cust.shape)

In [None]:
# Put the two name samples side by side as columns of one frame.
# pandas aligns Series on their index labels when building a DataFrame, and
# aba_cust still carries the gapped labels left behind by
# drop_duplicates()/dropna(). Renumber both samples from 0 first, so that no
# Bakong merchant lands on a label with a missing 'aba_cust' (such rows are
# discarded by the dropna on 'aba_cust' that follows).
data = pd.DataFrame({
    "bakong_cust": bk_cust['Unique_MerchantName'].reset_index(drop=True),
    "aba_cust": aba_cust['AC_NAME'].reset_index(drop=True),
})
data.head()

In [None]:
# Keep only the rows that have an ABA account name.
data = data.dropna(subset=['aba_cust'])
# The equivalent filter on 'bakong_cust' is left disabled:
#data.dropna(subset=['bakong_cust'], inplace=True)

data.head()

<hr>
Dask Python - Data Frame 
<hr>

In [None]:
# Shape of the combined frame that will be handed to Dask.
data.shape

In [None]:

def fuzzy_match_partition(df, aba_cust_list):
    """Find, for each Bakong name in a partition, the first close ABA match.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition; only its ``bakong_cust`` column is read.
    aba_cust_list : iterable
        Candidate ABA names, scanned in the order given.

    Returns
    -------
    pandas.DataFrame
        Columns ``bakong_cust``, ``aba_cust`` and ``score`` (float). There is
        one row per Bakong name for which some candidate reached
        ``fuzz.token_set_ratio(...) >= 95.0``; the scan stops at the first
        such candidate, so it is not necessarily the best-scoring one. The
        three columns are present even when nothing matched.
    """
    columns = ['bakong_cust', 'aba_cust', 'score']
    results = []
    # Skip missing names: str(NaN) / str(None) would otherwise be scored as
    # the literal texts "nan" / "None" against every candidate.
    for name in df['bakong_cust'].dropna():
        name_text = str(name)  # invariant across the inner scan
        for item_b in aba_cust_list:
            score = fuzz.token_set_ratio(name_text, str(item_b))
            if score >= 95.0:
                results.append({'bakong_cust': name, 'aba_cust': item_b, 'score': score})
                break
    # Explicit columns and score dtype keep an empty result shaped like a
    # non-empty one.
    return pd.DataFrame(results, columns=columns).astype({'score': 'float'})

In [None]:
# Convert the pandas DataFrame to a Dask DataFrame split into 10 partitions
dask_df = dd.from_pandas(data, npartitions=10)
dask_df

In [None]:
# Start a Dask distributed client with 2 workers and a "5GB" memory limit.
# NOTE(review): presumably the limit applies per worker — confirm against the
# dask.distributed documentation.
client = Client(n_workers=2, memory_limit="5GB")
client

In [None]:
# Materialise the ABA names as a plain Python list.
aba_cust_list = dask_df['aba_cust'].compute().tolist()

# Ship that list to every worker once. Client.scatter treats a list argument
# as a collection and returns one future per element, so the list is wrapped in
# an outer list to obtain a single future for the whole thing.
[distributed_aba_cust_list] = client.scatter([aba_cust_list], broadcast=True)

# Output schema that map_partitions is told to expect from fuzzy_match_partition.
meta = {'bakong_cust': 'object', 'aba_cust': 'object', 'score': 'float'}

In [None]:

# Set up the per-partition matching job; the work is triggered by compute() below.
result_1 = dask_df.map_partitions(fuzzy_match_partition, distributed_aba_cust_list, meta=meta)

# `datetime` is the module here: the later `import datetime` in the imports
# cell rebinds the name first bound by `from datetime import datetime`.
start = datetime.datetime.now()
print("Start time:", start)

# Execute the job and collect the result.
results_df_1 = result_1.compute()

end = datetime.datetime.now()

print("End time:", end)
print("Duration:", end - start)

In [None]:
# Display the matches that were found.
results_df_1

<hr>
Terminate Cluster
<hr>

In [None]:
#close the cluster 
# client.close()