## Group matched pid pairs into clusters of matched pids using BFS

### 1. Import relevant packages

In [1]:
from collections import deque
import pandas as pd
import gc

### 2. Import helper functions

In [2]:
import group_pids

### 3. Resolve duplicate entities

#### Create clusters of matched pids

In [3]:
# Linked pid pairs produced by the deterministic and probabilistic linking passes,
# both taken after manual inspection.
# (Earlier runs read ./linked_data/deterministic_linking_pairs.parquet and
#  ./linked_data/post_inspection_linked_pairs.parquet instead.)
DETERMINISTIC_PAIRS_PATH = "./post_inspection_linked_pairs/post_inspection_dem_link.parquet"
PROBABILISTIC_PAIRS_PATH = "./post_inspection_linked_pairs/post_inspection_prob_link.parquet"

# Every column is cast to pandas' "string" dtype so pids compare consistently.
deterministic_table = pd.read_parquet(DETERMINISTIC_PAIRS_PATH, engine="pyarrow").astype("string")
probabilistic_table = pd.read_parquet(
    PROBABILISTIC_PAIRS_PATH,
    columns=["pid_l", "pid_r"],
    engine="pyarrow",
).astype("string")

# Stack both sets of pairs into one table (original row indices are kept).
concat_table = pd.concat([deterministic_table, probabilistic_table])

# Turn the pair table into a pid lookup, then group it into clusters of matched pids.
pid_map = group_pids.table_to_dict(concat_table)
print(len(pid_map))

clusters = group_pids.matched_pids(pid_map)
print(len(clusters))

599810
294729


#### Select a representative pid for each cluster based on vacdate

In [4]:
# Read only the pid and vacdate columns of the personal info table.
personal_info = pd.read_parquet("/cluster_data/vrdata/standardized/personal_info.parquet", columns=["pid", "vacdate"], engine = "pyarrow")
personal_info["pid"] = personal_info["pid"].astype("string")

# Build a plain pid -> vacdate dict for fast per-pid lookups when choosing cluster
# representatives. Zipping the two columns avoids DataFrame.iterrows(), which
# builds a Series object for every row and is very slow on large tables.
# As with a row-by-row loop, if a pid appears more than once the last row wins.
pid_vacdate = dict(zip(personal_info["pid"], personal_info["vacdate"]))
print(len(pid_vacdate))

# Release the DataFrame now that the lookup dict holds everything needed.
del personal_info
gc.collect()

In [5]:
# Choose the pid with the latest vacdate as the representative pid of each matched cluster.
#
# Null vacdates (NaT / NA / NaN) are skipped. A plain `>` comparison mishandles them:
# - anything > NaT is always False, so a NaT on cluster[0] pinned the representative
#   to that undated pid;
# - `pd.NA > x` is NA, and using it in an `if` raises TypeError.
# Ties keep the earliest pid in cluster order, as before.
# NOTE(review): if vacdate is stored as text, "latest" is a lexicographic comparison,
# which is only correct for ISO-formatted dates — confirm the column dtype.
pid_cluster = dict()  # representative pid -> list of pids in its cluster
for cluster in clusters:
    rep_pid = cluster[0]  # fallback when no pid in the cluster has a usable vacdate
    latest_vacdate = None
    for pid in cluster:
        # A KeyError here means a linked pid is missing from personal_info.
        vacdate = pid_vacdate[pid]
        if pd.isna(vacdate):
            continue
        if latest_vacdate is None or vacdate > latest_vacdate:
            latest_vacdate = vacdate
            rep_pid = pid
    pid_cluster[rep_pid] = cluster

print(len(pid_cluster))

298730


#### Create pandas dataframe for pid mapping

In [8]:
# --- Build the pid mapping as two parallel column lists:
#   old_pid     - a (possibly duplicated) pid from the original dataset
#   unified_pid - the representative pid of the cluster that old_pid belongs to
# Filling plain lists first and building the DataFrame once is far cheaper than
# appending DataFrame rows one at a time.
old_pid_column = []
unified_pid_column = []
for rep_pid, old_pids in pid_cluster.items():
    old_pid_column.extend(old_pids)
    unified_pid_column.extend([rep_pid] * len(old_pids))

pid_mapping = {"old_pid": old_pid_column, "unified_pid": unified_pid_column}
len(pid_mapping["old_pid"])

608091

In [10]:
# ---- Turn the column lists into a two-column DataFrame (old_pid, unified_pid)
pid_mapping = pd.DataFrame.from_dict(pid_mapping, orient="columns")
pid_mapping.shape

(608091, 2)

#### Save result data frame as a parquet

In [None]:

# Persist the old_pid -> unified_pid mapping. The engine matches the read_parquet
# calls above, and the meaningless positional index is not stored.
pid_mapping.to_parquet("./unified_pid.parquet", engine="pyarrow", index=False)