In [None]:
# automatically reload edited imported modules (e.g. the src.* helpers) before each cell runs
%load_ext autoreload
%autoreload 2
# render matplotlib figures (the histograms below) inline in the notebook
%matplotlib inline

# Train a clustering model
Once we have a good scoring function (using the ensemble model), we can use a standard clustering algorithm to
group names into clusters.
We've found that many smaller clusters give better F1 and F2 scores than fewer larger clusters, as long as you're willing to search multiple clusters scoring above a threshold.
In addition, FamilySearch wants to split records into different partitions (index shards) based upon the surname.
Partitions are higher-level groupings of clusters.

In [None]:
from bisect import bisect_left
from collections import namedtuple, defaultdict
import pickle
import random

import joblib
import numpy as np
import pandas as pd
import torch
import wandb

from src.data.normalize import normalize_freq_names
from src.data.utils import load_dataset
from src.data.filesystem import fopen
from src.models.cluster import (
    get_names_to_cluster,
    get_distances,
    generate_clusters_from_distances,
    write_clusters,
    read_clusters,
)
from src.models.swivel import SwivelModel
from src.models.utils import remove_padding

In [None]:
# configure
# "given" selects the given-name settings below; any other value selects the surname settings
given_surname = "given"
is_given = given_surname == "given"

# settings that differ between given names and surnames
vocab_size, n_to_cluster, cluster_threshold = (610000, 200000, 0.4) if is_given else (2100000, 500000, 0.6)
save_partitions = not is_given  # partitions (index shards) are only built when not clustering given names
max_partitions = 0 if is_given else 720  # target partition count for the merge phase
n_jobs = 64

# model / search settings
embed_dim = 100
encoder_layers = 2
num_matches = 1000
batch_size = 256
verbose = True

Config = namedtuple(
    "Config",
    "eval_path tree_freq_path hr_freq_path embed_dim swivel_vocab_path swivel_model_path "
    "tfidf_path ensemble_model_path cluster_partition_path cluster_path",
)

# every model artifact lives under the same S3 prefix for this name type
models_prefix = f"s3://nama-data/data/models/fs-{given_surname}"
config = Config(
    eval_path=f"s3://familysearch-names/processed/tree-hr-{given_surname}-train.csv.gz",
    tree_freq_path=f"s3://familysearch-names/processed/tree-preferred-{given_surname}-aggr.csv.gz",
    hr_freq_path=f"s3://familysearch-names-private/hr-preferred-{given_surname}-aggr.csv.gz",
    embed_dim=embed_dim,
    swivel_vocab_path=f"{models_prefix}-swivel-vocab-{vocab_size}-augmented.csv",
    swivel_model_path=f"{models_prefix}-swivel-model-{vocab_size}-{embed_dim}-augmented.pth",
    tfidf_path=f"{models_prefix}-tfidf.joblib",
    ensemble_model_path=f"{models_prefix}-ensemble-model-{vocab_size}-{embed_dim}-augmented-100.joblib",
    cluster_partition_path=f"{models_prefix}-cluster-partitions.csv",
    cluster_path=f"{models_prefix}-cluster-names.csv",
)

In [None]:
# start a wandb run for this notebook, recording the full Config as run config
wandb.init(
    project="nama", entity="nama", name="81_cluster",
    group=given_surname, notes="", config=config._asdict(),
)

### Load data

In [None]:
# use the first GPU when one is available, otherwise fall back to the CPU
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0") if use_cuda else torch.device("cpu")
print(device)

In [None]:
# load the evaluation split (is_eval=True); returns input names, weighted actual names and candidates
input_names_eval, weighted_actual_names_eval, candidate_names_eval = load_dataset(
    config.eval_path,
    is_eval=True,
)

In [None]:
# replace the candidates with the distinct names that appear among the weighted actual names
# (each entry of weighted_actual_names_eval is iterated as 3-tuples whose first item is the name)
# NOTE(review): the *_eval variables are not used by any later cell in this notebook
candidate_names_eval = np.array(list({
    name
    for weighted_actuals in weighted_actual_names_eval
    for name, _, _ in weighted_actuals
}))
print(len(candidate_names_eval))

In [None]:
# normalized name -> frequency from the tree-preferred aggregates (padded names)
tree_name_freq = normalize_freq_names(
    pd.read_csv(config.tree_freq_path, na_filter=False),
    is_surname=given_surname != "given",
    add_padding=True,
)

In [None]:
# normalized name -> frequency from the hr-preferred aggregates (padded names)
hr_name_freq = normalize_freq_names(
    pd.read_csv(config.hr_freq_path, na_filter=False),
    is_surname=given_surname != "given",
    add_padding=True,
)

In [None]:
# partitions are sized by hr frequencies, while the names to cluster are chosen
# from tree frequencies so we get consistent cluster names
partition_name_freq, cluster_name_freq = hr_name_freq, tree_name_freq

In [None]:
# load the swivel vocabulary: maps each name to the value in the vocab file's "index" column.
# Use a context manager so the (possibly S3) handle is closed, as the pickle cells below do.
with fopen(config.swivel_vocab_path, "rb") as vocab_file:
    vocab_df = pd.read_csv(vocab_file)
swivel_vocab = dict(zip(vocab_df["name"], vocab_df["index"]))

In [None]:
# build the swivel model sized to the vocab, load its trained weights onto `device`, and switch to eval mode.
# The state-dict handle is closed explicitly; `device` is already a torch.device, so it is passed as-is.
swivel_model = SwivelModel(len(swivel_vocab), config.embed_dim)
with fopen(config.swivel_model_path, "rb") as model_file:
    swivel_model.load_state_dict(torch.load(model_file, map_location=device))
swivel_model.to(device)
swivel_model.eval()

In [None]:
# load the fitted tfidf vectorizer; close the handle once joblib has read it
with fopen(config.tfidf_path, mode="rb") as tfidf_file:
    tfidf_vectorizer = joblib.load(tfidf_file)

In [None]:
# load the ensemble scoring model; close the handle once joblib has read it
with fopen(config.ensemble_model_path, mode="rb") as ensemble_file:
    ensemble_model = joblib.load(ensemble_file)

### Get names to cluster

In [None]:
# select up to n_to_cluster names from the tree name frequencies (cluster_name_freq) to cluster
# TODO pass in swivel_vocab and ensure that only names in swivel_vocab were selected to cluster
# (names outside swivel_vocab are currently only dropped later, when the cluster names are written)
names_to_cluster = get_names_to_cluster(cluster_name_freq, n_to_cluster)

### Compute cluster hierarchy

In [None]:
%%time
distances = get_distances(cluster_name_freq, 
                          names_to_cluster,
                          swivel_model=swivel_model,
                          swivel_vocab=swivel_vocab,
                          tfidf_vectorizer=tfidf_vectorizer,
                          ensemble_model=ensemble_model,
                          num_matches=num_matches,
                          verbose=verbose,
                          n_jobs=n_jobs,
                         )

In [None]:
%%time
model, name_cluster = generate_clusters_from_distances(
                            cluster_algo="agglomerative",
                            cluster_linkage="average",
                            cluster_threshold=-10.0,  # initially put everything into a single cluster
                            distances=distances,
                            names_to_cluster=names_to_cluster,
                            verbose=verbose,
                            n_jobs=n_jobs)

In [None]:
# save names to cluster, so the hierarchy's leaf ids can be mapped back to names after a reload
names_to_cluster_path = f"s3://nama-data/data/models/fs-{given_surname}-cluster-model-names.pickle"
with fopen(names_to_cluster_path, "wb") as names_file:
    pickle.dump(names_to_cluster, names_file)

In [None]:
# save model
# Write through a context manager: joblib.dump does not close a file object it is given, and an
# unclosed (possibly S3) write handle may never be flushed/finalized before the load cell reads it.
with fopen(f"s3://nama-data/data/models/fs-{given_surname}-cluster-model.joblib", mode="wb") as model_file:
    joblib.dump(model, model_file)

#### Load names to cluster and model

In [None]:
# load names to cluster (our own artifact; pickle.load must never be pointed at untrusted files)
names_to_cluster_path = f"s3://nama-data/data/models/fs-{given_surname}-cluster-model-names.pickle"
with fopen(names_to_cluster_path, "rb") as names_file:
    names_to_cluster = pickle.load(names_file)

In [None]:
# load the saved cluster model
model_path = f"s3://nama-data/data/models/fs-{given_surname}-cluster-model.joblib"
with fopen(model_path, "rb") as model_file:
    model = joblib.load(model_file)

#### Split surnames into partitions

In [None]:
# number of leaves (names) and number of merge (non-leaf) nodes in the hierarchy
for node_count in (len(names_to_cluster), len(model.children_)):
    print(node_count)

In [None]:
# model.children_ lists every non-leaf node of the cluster hierarchy with its immediate children;
# leaves are ids 0..leaf_node_count-1 and non-leaf k has id leaf_node_count + k
leaf_node_count = len(names_to_cluster)
non_leaf_node_count = len(model.children_)
total_node_count = leaf_node_count + non_leaf_node_count

# total name frequency under every node: a leaf takes its partition frequency (1 if the name is
# missing), a non-leaf sums its children (relies on each child id being smaller than its parent's)
cluster_freq = np.zeros(total_node_count)
missing_names = 0
for leaf_id, leaf_name in enumerate(names_to_cluster):
    if leaf_name in partition_name_freq:
        cluster_freq[leaf_id] = partition_name_freq[leaf_name]
    else:
        missing_names += 1
        cluster_freq[leaf_id] = 1

for merge_ix, merged_children in enumerate(model.children_):
    cluster_freq[leaf_node_count + merge_ix] = sum(cluster_freq[child] for child in merged_children)

print("missing names", missing_names)

In [None]:
# bisect only accepts key= from python 3.10 on; this adapter gives bisect a keyed view on python 3.9
class KeyWrapper:
    """Read-only sequence view over `iterable` that returns key(element) on indexing."""

    def __init__(self, iterable, key):
        self.it, self.key = iterable, key

    def __len__(self):
        return len(self.it)

    def __getitem__(self, i):
        return self.key(self.it[i])


def insert_sorted(items, item):
    """Insert `item` into `items` (kept sorted by element[0]), ahead of any elements with an equal key."""
    position = bisect_left(KeyWrapper(items, key=lambda entry: entry[0]), item[0])
    items.insert(position, item)

In [None]:
# starting with the partition at the root of the cluster hierarchy (the last node id),
# split the largest partition until you have max_partitions
initial_node = total_node_count - 1
print("total = ", cluster_freq[initial_node])
# max_partitions is 0 when partitions are not built (given names); avoid a divide-by-zero warning / "inf"
if max_partitions > 0:
    print("average=", cluster_freq[initial_node] / max_partitions)

In [None]:
# generate partitions whose total name frequency is within min..max size:
# first split oversized partitions, then merge small ones until there are at most max_partitions.
# NOTE(review): these sizes are hand-tuned constants; consider moving them to the config cell.
max_partition_size = 13137845
min_partition_size = 10000000
total_partitions = 1
# each entry is (freq per partition, [hierarchy node ids], number of partitions), kept sorted by freq
partitions = []
insert_sorted(partitions, (cluster_freq[initial_node], [initial_node], 1))

# split phase: repeatedly split the largest partition until every partition is <= max_partition_size
while save_partitions and partitions[-1][0] > max_partition_size:
    # split the largest partition
    freq, nodes, n_partitions = partitions.pop()
    total_partitions -= n_partitions
    node = nodes[0]
    # if this is a merged partition, then split it back into its component nodes
    if len(nodes) > 1:
        for node in nodes:
            insert_sorted(partitions, (cluster_freq[node], [node], 1))
            total_partitions += 1
    # a single name (leaf) that is too big: spread it over one more partition (multi-partition leaf)
    elif node < leaf_node_count:
        n_partitions += 1
        total_partitions += n_partitions
        insert_sorted(partitions, (cluster_freq[node] / n_partitions, nodes, n_partitions))
    # otherwise replace this non-leaf node with its children
    else:
        for child in model.children_[node - leaf_node_count]:
            total_partitions += 1
            insert_sorted(partitions, (cluster_freq[child], [child], 1))
print("num partitions after split", total_partitions)

# merge phase: merge partitions smaller than min_partition_size until there are at most max_partitions
merged = True
i = j = 0
while save_partitions and total_partitions > max_partitions:
    if merged:
        # the list changed: restart the scan from the largest partition
        i = len(partitions) - 1
        merged = False
    else:
        # the previous candidate had no merge partner: continue with the next smaller partition
        i -= 1
    # find the largest small partition that isn't a multi-partition leaf
    while i >= 0:
        freq_i, nodes_i, n_partitions_i = partitions[i]
        if n_partitions_i == 1 and freq_i < min_partition_size:
            break
        i -= 1
    if i < 0:
        # couldn't find a partition
        print("unable to merge")
        break
    # merge into the largest partition such that the combination is <= max_partition_size
    j = len(partitions) - 1
    while j > i:
        freq_j, nodes_j, n_partitions_j = partitions[j]
        if n_partitions_j == 1 and freq_i + freq_j <= max_partition_size:
            break
        j -= 1
    if j == i:
        # couldn't find a partition to merge into
        continue
    # merge partitions; delete j first (j > i) so index i stays valid
    merged = True
    del partitions[j]
    del partitions[i]
    insert_sorted(partitions, (freq_i + freq_j, nodes_i + nodes_j, 1))
    total_partitions -= 1
print("total_partitions=", total_partitions, len(partitions))

In [None]:
# histogram of partition sizes; a multi-partition leaf contributes one entry per partition
partition_sizes = [
    freq
    for freq, _, n_partitions in partitions
    for _ in range(n_partitions)
]
n_small_partitions = sum(1 for size in partition_sizes if size < min_partition_size)
partition_sizes_df = pd.DataFrame(partition_sizes)
partition_sizes_df.hist(bins=40)
print("small partitions", n_small_partitions)
print("all partitions", len(partition_sizes))

In [None]:
# histogram of machine sizes, assuming partitions are placed round-robin onto machines
n_machines = 40
machine_sizes = [0] * n_machines
flat_partition_sizes = [freq for freq, _, n_partitions in partitions for _ in range(n_partitions)]
for slot, size in enumerate(flat_partition_sizes):
    machine_sizes[slot % n_machines] += size
machine_sizes_df = pd.DataFrame(machine_sizes)
machine_sizes_df.hist(bins=n_machines)
print(machine_sizes)

#### Split partition(s) into clusters

In [None]:
def _node_distance(node):
    """Merge distance of a hierarchy node: 0.0 for a leaf (single name), else its model.distances_ entry."""
    return 0.0 if node < leaf_node_count else model.distances_[node - leaf_node_count]


# clusters is kept sorted by distance: entries are (distance, node id)
clusters = []
# start with the partition nodes
for _, nodes, _ in partitions:
    for node in nodes:
        insert_sorted(clusters, (_node_distance(node), node))

# then split nodes, largest distance first, until every node's distance is within the threshold
# (cluster threshold is measured in terms of (1 - distance))
max_cluster_distance = 1 - cluster_threshold
while clusters:
    distance, cluster = clusters.pop()
    if distance <= max_cluster_distance:
        insert_sorted(clusters, (distance, cluster))
        break
    if cluster < leaf_node_count:
        # a single name cannot be split; without this check model.children_ would be indexed
        # with a negative offset and an unrelated node would be split silently
        raise ValueError(
            f"cannot split leaf node {cluster} (distance {distance} > {max_cluster_distance}); "
            f"check cluster_threshold={cluster_threshold}"
        )
    for child in model.children_[cluster - leaf_node_count]:
        insert_sorted(clusters, (_node_distance(child), child))

#### Save partitions and clusters

In [None]:
def get_most_frequent_name(names):
    """Return the name in `names` with the highest frequency in partition_name_freq.

    Names missing from partition_name_freq count as 0; on a tie the earliest name wins.
    Returns None when `names` is empty.
    NOTE(review): this uses the partition (hr) frequencies, while the cell that sets up the
    frequency aliases says clusters use tree frequencies for consistent names -- confirm intended.
    """
    best_name, best_freq = None, -1
    for candidate in names:
        candidate_freq = partition_name_freq.get(candidate, 0)
        if candidate_freq > best_freq:
            best_name, best_freq = candidate, candidate_freq
    return best_name

def name_finder(node_id):
    """Return the name for a leaf node id, or None for a non-leaf (merge) node."""
    if node_id >= leaf_node_count:
        return None
    return names_to_cluster[node_id]

def gather_children(node_id, fn, result):
    """Append fn(node) to `result` for nodes in the subtree rooted at `node_id`, depth-first.

    For each visited node, if fn(node) is truthy it is appended and that node is not descended
    into; otherwise, if it is a non-leaf node (id >= leaf_node_count), its children from
    model.children_ are visited left to right -- the same order as a recursive pre-order walk.

    Uses an explicit stack rather than recursion: partition subtrees of a hierarchy over hundreds
    of thousands of names can be deeper than Python's default recursion limit.
    Returns None; `result` is extended in place.
    """
    stack = [node_id]
    while stack:
        node = stack.pop()
        item = fn(node)
        if item:
            result.append(item)
        elif node >= leaf_node_count:
            # push children reversed so the first child is popped (visited) first
            stack.extend(model.children_[node - leaf_node_count][::-1])

In [None]:
# walk the cluster hierarchy to get the names in each cluster
cluster2names = {}
n_names_oov = 0
for _, cluster in clusters:
    names = []
    gather_children(cluster, name_finder, names)
    n_names = len(names)
    # remove names not in vocab
    names = [name for name in names if name in swivel_vocab]
    n_names_oov += n_names - len(names)
    if len(names) == 0:
        print("WARN: empty cluster", cluster)
        continue
    # the name of the cluster is the most-frequent name
    freq_name = remove_padding(get_most_frequent_name(names))
    if freq_name in cluster2names:
        # the earlier cluster's names would be silently overwritten below
        print("WARN: duplicate cluster name", freq_name, cluster)
    cluster2names[freq_name] = names
print("removed names out of vocab:", n_names_oov)

# invert cluster2names: name -> cluster name
name_cluster = {name: cluster for cluster, names in cluster2names.items() for name in names}

# write the name -> cluster mapping to a csv file
write_clusters(config.cluster_path, name_cluster)

In [None]:
if save_partitions:
    # walk the cluster hierarchy to get the (in-vocab) names, and from them the clusters, in each partition
    partition_clusters = []
    for _, nodes, n_partitions in partitions:
        names = []
        for node in nodes:
            gather_children(node, name_finder, names)
        # remove names not in vocab
        names = [name for name in names if name in swivel_vocab]
        if len(names) == 0:
            print("WARN empty partition", nodes)
            continue
        # unique cluster names in first-seen order; list(set(...)) made the CSV row order
        # depend on the string hash seed, so it changed from run to run
        partition_clusters.append((list(dict.fromkeys(name_cluster[name] for name in names)), n_partitions))

    # flatten partition_clusters into (cluster, start_partition, n_partitions) rows;
    # a multi-partition leaf occupies n_partitions consecutive partition numbers
    partition_number = 0
    cluster_partition_cluster = []
    cluster_partition_partition = []
    cluster_partition_count = []
    for clusts, n_partitions in partition_clusters:
        for cluster in clusts:
            cluster_partition_cluster.append(cluster)
            cluster_partition_partition.append(partition_number)
            cluster_partition_count.append(n_partitions)
        partition_number += n_partitions
    cluster_partition_df = pd.DataFrame({
        "cluster": cluster_partition_cluster,
        "start_partition": cluster_partition_partition,
        "n_partitions": cluster_partition_count,
    })

    # write the dataframe to a csv file
    cluster_partition_df.to_csv(config.cluster_partition_path, index=False)

In [None]:
if save_partitions:
    # a bare expression inside an `if` is never rendered by Jupyter, so call display() explicitly
    # (available as a builtin in IPython kernels); scope the row limit to this display only
    with pd.option_context("display.max_rows", 500):
        display(cluster_partition_df.head(500))

In [None]:
# mark the wandb run started by wandb.init as finished
wandb.finish()