In [None]:
# import requirements, configure, setup
import csv
import json
import os
import time

import requests
import pandas as pd
# use fully-qualified option names: the short pattern 'max_rows' also matches
# 'styler.render.max_rows' in pandas >= 1.4 and raises OptionError there
pd.set_option('display.max_colwidth', 100000)
pd.set_option('display.max_rows', 100000)

customer_taxonomy_path = "./near.csv"
output_path            = "./output/near_taxonomy_mapping.csv"
endpoint               = "https://ann-indexer.sharethis.com/api/v1" # this endpoint requires sharethis vpn
splitter               = "/"                                        # level separator used in the taxonomy CSV's first column
k_nearest              = 30                                         # input signal's k nearest neighbors returned

In [None]:
# steps wrapped up in functions

# load customer's taxonomy, normalization, generate category - lowest level node pair as indexer input
def load_data(path):
    # get the max number of levels of this taxonomy ready for downstream roll-up
    max_num_of_levels = 0
    cat_lowest = []
    with open(path, newline='') as f:
        reader = csv.reader(f)
        next(reader) # skip header
        for l in reader:
            # split the first column, choose the proper splitter, "/" as example here
            levels = [x.lower() for x in l[0].split(splitter) if x and x != ' ']
            if len(levels) > max_num_of_levels:
                max_num_of_levels = len(levels)
            pair = []
            # normalization
            pair.append("/"+"/".join([x.replace(" & ", "&").replace(", ", ",").replace(" ", "_").replace("/", "_") for x in levels]))
            pair.append(levels[-1])
            cat_lowest.append(pair)
    print(f"Number of categories in this taxonomy: {len(cat_lowest)}")
    print()
    for x in cat_lowest:
        print(x)
    return cat_lowest, max_num_of_levels

# send requests to ann-indexer and collect the concept/entity expansion of every category;
# failed categories are retried in further rounds until all are successfully processed
def get_kecc_expand(kecc, todo_list, mapping_table, endpoint=endpoint, k=k_nearest,
                    max_rounds=None, timeout=60, retry_delay=1.0):
    """Query the ANN indexer for each category and append its expansion.

    Each item of ``todo_list`` is a ``[category, lowest_level_name]`` pair. The
    lowest-level name is sent as the search sentence to
    ``{endpoint}/search/{kecc}/{k}``. For each successful response, a dict
    ``{"category": ..., f"{kecc}_expand": [...]}`` is appended to
    ``mapping_table``. The expansion holds the keys of the returned ``topns``
    plus the matched key itself (spaces replaced by "_"), de-duplicated.

    Categories whose request fails for any reason are queued for another round.
    Rounds repeat until nothing is left or ``max_rounds`` rounds have run.

    Args:
        kecc: indexer collection to search, e.g. "concepts" or "entities".
        todo_list: list of [category, lowest_level_name] pairs to process.
        mapping_table: list that results are appended to (mutated in place).
        endpoint: base URL of the ANN indexer API.
        k: number of nearest neighbours requested per category. Categories
            containing 'world_localities' always request 10.
        max_rounds: maximum number of passes. The default None keeps retrying
            until every category succeeds.
        timeout: per-request timeout in seconds, so a stalled call cannot hang the job.
        retry_delay: seconds to wait before each retry round.

    Returns:
        ``mapping_table``, with the new results appended.
    """
    pending = list(todo_list)
    rounds = 0
    while pending:
        if max_rounds is not None and rounds >= max_rounds:
            print(f"### GIVING UP after {rounds} rounds: {len(pending)} categories left unprocessed ###")
            for cat in pending:
                print(cat)
            break
        if rounds:
            time.sleep(retry_delay)  # give a flaky/overloaded indexer a moment before retrying
        rounds += 1
        print("==========================Start==========================")
        error_list = []
        cnt = len(pending)
        for cat in pending:
            body = {"sentences": [cat[1]], "require_embed": False}
            # per-category k; never overwrite the caller's k (it used to be reset to the global)
            k_cat = 10 if 'world_localities' in cat[0] else k
            try:
                r = requests.post(f'{endpoint}/search/{kecc}/{k_cat}', data=json.dumps(body), timeout=timeout)
                r.raise_for_status()
                mapping = json.loads(r.text)[0]["matches"][0]
                expand = [x["key"] for x in mapping["topns"]]
                expand.append(mapping["key"].replace(" ", "_"))
                mapping_table.append({"category": cat[0],
                                      f"{kecc}_expand": list(set(expand))})
                print(f"---{kecc.upper()} for category '{cat[0]}' processed---")
            except Exception as e:
                # any failure (network, HTTP status, unexpected payload) queues the category for the next round
                error_list.append(cat)
                print(f"### ERROR processing: '{cat[0]}', {len(error_list)} retries awaits ###")
                print(e)
            finally:
                cnt -= 1
                print(f"[{cnt}/{len(pending)} left in current queue, {len(error_list)} retries awaits]")
        pending = error_list
    return mapping_table

# roll up: attach every expansion term to each ancestor of its category as well
def roll_up(concept_mapping_table, entity_mapping_table, max_num_of_levels=None):
    """Combine concept and entity expansions and roll them up the taxonomy.

    Each mapping table is a list of dicts with a "category" key and a list-valued
    "concepts_expand" / "entities_expand" key. This is the shape produced by
    ``get_kecc_expand``. Every expansion term is attached to its own category
    "/l1/.../ln" and to each ancestor ("/l1", "/l1/l2", ...).

    Args:
        concept_mapping_table: rows with "category" and "concepts_expand".
        entity_mapping_table: rows with "category" and "entities_expand".
        max_num_of_levels: deepest level to roll up to. When None, it is derived
            from the categories themselves (number of "/" in the deepest
            category) instead of relying on a notebook global.

    Returns:
        DataFrame with columns ["category", "expand"]. Duplicate pairs are
        dropped and rows are sorted by category.
    """
    def _explode(mapping_table, column):
        # one row per (category, expansion term); the term column is renamed to the shared "expand"
        frame = pd.DataFrame(list(mapping_table), columns=["category", column])
        if not frame.empty:
            frame = frame.explode(column)
        return frame.rename(columns={column: "expand"})

    parts = [_explode(concept_mapping_table, "concepts_expand"),
             _explode(entity_mapping_table, "entities_expand")]
    non_empty = [part for part in parts if not part.empty]
    if not non_empty:
        print("before expand: 0")
        print("after expand: 0")
        return pd.DataFrame(columns=["category", "expand"])
    df_no_level = pd.concat(non_empty, ignore_index=True)

    if max_num_of_levels is None:
        # categories look like "/l1/l2/...", so depth == number of "/"
        max_num_of_levels = int(df_no_level["category"].str.count("/").max())

    split = df_no_level["category"].str.split("/")
    rolled = [
        # keep the leading "" plus the first `level` segments -> "/l1/.../l_level"
        df_no_level.assign(category=split.str[:level + 1].str.join("/"))
        for level in range(1, max(max_num_of_levels, 1) + 1)
    ]
    df = pd.concat(rolled, ignore_index=True)  # single concat instead of growing in a loop

    output = df[["category", "expand"]].drop_duplicates(subset=["category", "expand"])
    print(f"before expand: {len(df_no_level)}")
    print(f"after expand: {len(output)}")
    return output.sort_values(by=['category'])

In [None]:
# execution

# load data
print("JOB START!!!")
print()
# create the output directory up front: to_csv does not create it, and failing
# at the very end would throw away all the slow indexer requests
output_dir = os.path.dirname(output_path)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)
cat_lowest, max_num_of_levels = load_data(customer_taxonomy_path)
# get mapping table
print()
print("+++++++++++++++++++++requests to ann-indexer ++++++++++++++++++++++")
concept_mapping_table = get_kecc_expand("concepts", cat_lowest, [], endpoint, k_nearest)
entity_mapping_table = get_kecc_expand("entities", cat_lowest, [], endpoint, k_nearest)
print("++++++++++++++++++ ann-indexer requests finished ++++++++++++++++++++++")
print()
# do the roll up expansion
print("++++++++++++++++++++++++++rolling up++++++++++++++++++++++++++++++++")
output = roll_up(concept_mapping_table, entity_mapping_table)
# examine and write to disk
display(output)
print("++++++++++++++++++++++++writing to disk++++++++++++++++++++++++++++++++")
output.to_csv(output_path, encoding='utf-8', header=True, index=False)
print()
print("JOB FINISH!!!")