In [12]:
import pandas as pd
from rapidfuzz import process, fuzz

In [13]:
from char_converter import CharConverter
converter = CharConverter('v2t')
def convert(text):
    return converter.convert(text)

In [None]:
class sort_titles:
    def __init__(self, dataframe, column_names=["c_title_chn", "c_textid", "c_name_chn", "c_personid", "row_id"]):
        # must have at least the five columns
        # c_textid is in fact not needed, but keep it for now
        # c_textid is NOT unique (one text with multiple authors), and thus the row_id is necessary
        self.dtf = dataframe
        self.cols = column_names
        
    def get_sorted_choices(self, byid=False):
        self.dict_by_author = {} 
        # a nested dictionary
        # first layer: key=author or authorid, val=dict
        # second layer: key=(rowid, textid), val=title
        for _, row in self.dtf.iterrows():
            author = str(row[self.cols[2]])
            authorid = int(row[self.cols[3]])
            title = str(row[self.cols[0]])
            titleid = int(row[self.cols[1]])
            rowid = int(row[self.cols[4]])
            if byid:
                if authorid not in self.dict_by_author:
                    self.dict_by_author[authorid] = {}
                self.dict_by_author[authorid][(rowid, titleid)] = title
            else:
                if author not in self.dict_by_author:
                    self.dict_by_author[author] = {}
                self.dict_by_author[author][(rowid, titleid)] = title
        return self.dict_by_author
    
    def get_all_choices(self):
        self.dtf = self.dtf.astype({self.cols[4]: int, self.cols[1]: int, self.cols[0]: str})
        self.dict_all_titles = self.dtf.set_index([self.cols[4], self.cols[1]])[self.cols[0]].to_dict()
        return self.dict_all_titles # a simple dictionary, not nested
    

In [15]:
# use fuzzy match by char as an alternative scorer

def compare_chars(char_1, char_2):
    char_1_set = set(char_1)
    char_1_set_len = len(char_1_set)
    char_2_set = set(char_2)
    char_2_set_len = len(char_2_set)
    intersection_set = char_1_set.intersection(char_2_set)
    intersection_set_len = len(intersection_set)
    if(char_1_set_len != 0 and char_2_set_len != 0):
        max_rate = max(intersection_set_len/char_1_set_len, intersection_set_len/char_2_set_len)
    else:
        max_rate = 0
    return [intersection_set_len, max_rate, ";".join(intersection_set)]

def compare_chars_scorer(s1, s2, **kwargs):
    result = compare_chars(s1, s2)
    
    return result[1] * 100

In [16]:
# look up by author name, not id
# filter the compare table by dynasty first
def fuzzy_lookup_by_name(dfleft, dfright, right_cols=["c_title_chn", "c_textid", "c_name_chn", "c_personid", "row_id"], left_cols=["internalindex","title","author"], scorer=fuzz.WRatio, score_cutoff=75):
    # try also scorer = fuzz.token_set_ratio, compare_chars_scorer
    results = [] # the final values to be converted to df

    # sort the compare table
    df2 = sort_titles(dfright, column_names=right_cols)
    # create the nested dict, key=author, val=dict{key=(rowid,textid),val=title}
    titles_by_author = df2.get_sorted_choices(byid=False)
    choices = {} # decide by condition which dict to create

    for _, row in dfleft.iterrows():
        index = row[left_cols[0]]
        title = row[left_cols[1]]
        author = row[left_cols[2]]
        flag = ""
        if author in titles_by_author:
            if author == "未詳":
                flag = "anon work" # excluded from the automated doublecheck
            else:
                flag = "author matched"
            choices = titles_by_author[author]
        else:
            flag = "no match of author"
            choices = df2.get_all_choices() 
            # here the rowid matters
            # it may match titles by the same author, but the author names do not match due to character variants or errors

        # returns a list of tuples: [(match_title, score, (rowid,textid)),etc], or NoneType
        matches = process.extract(title, choices, scorer=scorer, limit=None, score_cutoff=score_cutoff)

        # doublecheck whether an authored title == an anon title in CBDB
        if (not matches) and (flag == "author matched"):
            doublecheck = titles_by_author["未詳"]
            matches = process.extract(title, doublecheck, scorer=scorer, limit=None, score_cutoff=score_cutoff)
            if matches:
                flag = "author matched but text from anon works"
        
        # write values
        if matches:
            for match_title, score, ids in matches:
                rowid = ids[0]
                textid = ids[1]
                results.append({
                    "internalindex": index, 
                    "title": title,
                    "authorname": author,
                    "best_match_title": match_title,
                    "best_match_textid": textid,
                    "row_id": rowid,
                    "notes": flag,
                    "score": score
                    })
        else:
            results.append({
                "internalindex": index,
                "title": title,
                "authorname": author,
                "best_match_title": None,
                "best_match_textid": None,
                "row_id": None,
                "notes": flag,
                "score": None
            })
    
    return pd.DataFrame(results)

In [17]:
# look up by author id, and check against anon works
def fuzzy_lookup_by_id(dfleft, dfright, right_cols=["c_title_chn", "c_textid", "c_name_chn", "c_personid", "row_id"], left_cols=["internalindex","title","authorid"], scorer=fuzz.WRatio, score_cutoff=75):
    # try also scorer = fuzz.token_set_ratio, compare_chars_scorer
    df2 = sort_titles(dfright, column_names=right_cols)
    results = []
    titles_by_author = df2.get_sorted_choices(byid=True) # the nested dict
    choices = {} # decide by condition which dict to create
    for _, row in dfleft.iterrows():
        index = row[left_cols[0]]
        title = row[left_cols[1]]
        author = row[left_cols[2]]
        flag = ""
        if author in titles_by_author:
            if author == 0:
                flag = "check" # excluded from the automated doublecheck
            else:
                flag = "author matched"
            choices = titles_by_author[author]
        else:
            flag = "no match of author"
            choices = titles_by_author[0] # unlike filter by author name, no need to check against everything

        # returns a list of tuples: [(match_title, score, (rowid,textid)), etc], or NoneType
        matches = process.extract(title, choices, scorer=scorer, limit=None, score_cutoff=score_cutoff)

        # doublecheck whether an authored title == an anon title in CBDB
        if (not matches) and (flag == "author matched"):
            doublecheck = titles_by_author[0]
            matches = process.extract(title, doublecheck, scorer=scorer, limit=None, score_cutoff=score_cutoff)
            if matches:
                flag = "author matched but text from anon works"
        
        # write values
        if matches:
            for match_title, score, ids in matches:
                rowid = ids[0] 
                textid = ids[1]
                results.append({
                    "internalindex": index, 
                    "title": title,
                    "authorid": author,
                    "best_match_title": match_title,
                    "best_match_textid": textid,
                    "row_id": rowid,
                    "notes": flag,
                    "score": score
                    })
        else:
            results.append({
                "internalindex": index,
                "title": title,
                "authorid": author,
                "best_match_title": None,
                "best_match_textid": None,
                "row_id": None,
                "notes": flag,
                "score": None
            })
    
    return pd.DataFrame(results)

In [18]:
# simple fuzzy match, do not check against anon works
def fuzzy_lookup_simple_id(dfleft, dfright, right_cols=["c_title_chn", "c_textid", "c_name_chn", "c_personid", "row_id"], left_cols=["internalindex","title","authorid"], scorer=fuzz.WRatio, score_cutoff=75):
    # try also scorer = fuzz.token_set_ratio, compare_chars_scorer
    df2 = sort_titles(dfright, column_names=right_cols)
    results = []
    titles_by_author = df2.get_sorted_choices(byid=True) # the nested dict
    choices = {} # decide by condition which dict to create
    for _, row in dfleft.iterrows():
        index = row[left_cols[0]]
        title = row[left_cols[1]]
        author = row[left_cols[2]]
        flag = ""
        if author in titles_by_author:
            flag = "author matched"
            choices = titles_by_author[author]
            # returns a list of tuples: [(match_title, score, (rowid,textid))], or NoneType
            matches = process.extract(title, choices, scorer=scorer, limit=None, score_cutoff=score_cutoff)
        else:
            matches = None
            flag = "no match of author"
        
        # write values
        if matches:
            for match_title, score, ids in matches:
                rowid = ids[0]
                textid = ids[1]
                results.append({
                    "internalindex": index,
                    "title": title,
                    "authorid": author,
                    "best_match_title": match_title,
                    "best_match_textid": textid,
                    "row_id": rowid,
                    "notes": flag,
                    "score": score
                    })
        else:
            results.append({
                "internalindex": index,
                "title": title,
                "authorid": author,
                "best_match_title": None,
                "best_match_textid": None,
                "row_id": None,
                "notes": flag,
                "score": None
            })
    
    return pd.DataFrame(results)

In [19]:
def reorderdf (df1, df2, targetcol, mergeon):
    '''adjust column orders after merging two dfs
    df1 is the merged df (big), df2 is the added df (small)
    insert everything in df2 before the targetcol
    return a new df'''
    cols_to_move = [col for col in df2 if col != mergeon]
    all_cols = list(df1.columns)
    for col in cols_to_move:
        all_cols.remove(col)
    insert_pos = all_cols.index(targetcol)
    newcol_list = all_cols[:insert_pos] + cols_to_move + all_cols[insert_pos:]
    reordered = df1[newcol_list]
    return reordered

In [20]:
def run_and_merge(func, inputtable, comparetable, comparecols, inputcols, scorer, score_cutoff):
    result = func(inputtable, comparetable, right_cols=comparecols, left_cols=inputcols, scorer=scorer, score_cutoff=score_cutoff)
    if func == fuzzy_lookup_by_name:
        inputtable_cleaned = inputtable.drop(columns=["title","authorname"])
    else:
        inputtable_cleaned = inputtable.drop(columns=["title","authorid"])
    comparetable_cleaned = comparetable.drop(columns=["c_title_chn", "c_textid"])
    # join the comparetable and adjust the order
    mergestep = pd.merge(result, comparetable_cleaned, on="row_id", how="left")
    reordered = reorderdf(mergestep, comparetable_cleaned, "notes", "row_id")
    # join the original table
    mergefurther = pd.merge(reordered, inputtable_cleaned, on="internalindex", how="left")
    reordered_further = reorderdf(mergefurther, inputtable_cleaned, "best_match_title", "internalindex")
    merge = reordered_further.drop(columns=["row_id"])
    merge.to_excel("output.xlsx")

The values in the following four can be modified if necessary:

In [None]:
## compare against the table from the latest.db, run the py file first
comparetablename = "JOINED_BIOG_TEXT.xlsx"
comparetable = pd.read_excel(comparetablename, sheet_name=0) 
comparecols=["c_title_chn", "c_textid", "c_name_chn", "c_personid","row_id"] # change column headers if necessary

run_char = 1

inputtable = pd.read_excel("input.xlsx", sheet_name=0) # change file name if necessary
inputcols = ["internalindex","title","authorid"] # change column headers if necessary
# inputcols = ["internalindex","title","authorname"] # match by author name; filter by dynasties first
if run_char == 1:
    inputtable["title"] = inputtable["title"].astype(str).apply(convert)
    # inputtable["authorname"] = inputtable["authorname"].astype(str).apply(convert)

In [None]:
scorer = fuzz.WRatio # for higher accuracy
# scorer = compare_chars_scorer # for higher sensitivity
score_cutoff = 80 # adjust if necessary

func = fuzzy_lookup_simple_id
## default setting: match author by CBDB id

# func = fuzzy_lookup_by_id
# match author by CBDB id AND if no match by id, check against works with no known author (c_name_chn = 未詳)

# func = fuzzy_lookup_by_name
## match author by Chinese character AND if no match by author name, check against works with no known author (c_name_chn = 未詳)

In [23]:
run_and_merge(func, inputtable, comparetable, comparecols, inputcols, scorer, score_cutoff)