# In [4]:
import sentencepiece as spm
from gensim.models.word2vec import Word2Vec
from os import makedirs
import logging
import numpy as np
from scipy.stats import pearsonr
from sklearn.metrics.pairwise import cosine_similarity

# In [5]:
class SentencePieceConfig:
    def __init__(
        self,
        max_sentence_length: int = 5000,
        vocab_size: int = 20000,
        model_type: str = "unigram",
        model_tag: str = "lw"
    ):
        self.max_sentence_length = max_sentence_length
        self.vocab_size = vocab_size
        self.model_type = model_type
        self.model_tag = model_tag

    def get_model_name(
            self,
            model_name: str = None,
            model_dir: str = "models/level_wise/levelX"
    ):
        if model_name is not None:
            return model_name
        return f"{model_dir.strip('/')}/" \
            f"{self.model_type}_vs{self.vocab_size}_{self.model_tag}"

class Word2VecConfig:
    def __init__(
        self,
        vector_size: int = 100,
        window: int = 5,
        model_tag: str = "lw"
    ):
        self.vector_size = vector_size
        self.window = window
        self.model_tag = model_tag

    def get_model_path(
            self,
            model_name: str = None,
            model_dir: str = "models/level_wise/levelX"
    ):
        if model_name is not None:
            return model_name
        return f"{model_dir.strip('/')}/" \
            f"w2v_vs{self.vector_size}_w{self.window}_{self.model_tag}.model"


class Config:
    """Bundle of the per-level settings: SentencePiece, Word2Vec, clustering.

    Args:
        sp_config: SentencePiece settings; a new ``SentencePieceConfig()`` is
            created when omitted.
        w2v_config: Word2Vec settings; a new ``Word2VecConfig()`` is created
            when omitted.
        cluster_threshold: Stored as ``self.cluster_threshold``.
    """

    def __init__(
            self,
            sp_config: "SentencePieceConfig | None" = None,
            w2v_config: "Word2VecConfig | None" = None,
            cluster_threshold: float = 0.45
    ):
        # The defaults are built per call. Default-argument instances are
        # created once at definition time and would be shared (and mutated)
        # by every Config() that relies on them.
        self.sp_config = SentencePieceConfig() if sp_config is None else sp_config
        self.w2v_config = Word2VecConfig() if w2v_config is None else w2v_config
        self.cluster_threshold = cluster_threshold

# In [6]:
class Cluster:
    """Groups the vocabulary of a Word2Vec model into single-character clusters.

    ``cluster_to_words`` maps a cluster key to the list of words added to it;
    ``word_to_cluster`` maps a word to the key of the cluster it was assigned
    to last.

    Args:
        threshold: Similarity a neighbour must exceed to join a cluster.
        model: A Word2Vec model, or the path of a saved one. Clusters are
            built only when both ``model`` and ``threshold`` are given.
        map_file: Mapping file to load when no model/threshold pair is given.
        log_dir: Prefix for the log files "clusters_log.txt" and "log.txt".
            It is concatenated as-is, so a directory needs a trailing "/".
    """

    def __init__(
            self,
            threshold: "float | None" = None,
            model: "str | Word2Vec | None" = None,
            map_file: "str | None" = None,
            log_dir: "str | None" = None
    ):
        self.cluster_to_words = dict()
        self.word_to_cluster = dict()

        # The logger must exist before any clustering happens, because
        # create_clusters_from_model() writes to it.
        self.log_dir = log_dir
        self.logger = logging.getLogger("Cluster")
        self.logger.setLevel("DEBUG")
        if log_dir is not None:
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            for file_name in ("clusters_log.txt", "log.txt"):
                handler = logging.FileHandler(self.log_dir + file_name)
                handler.setLevel("DEBUG")
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)

        if model is not None and threshold is not None:
            if isinstance(model, str):
                # A path was passed: load the saved model first.
                model = Word2Vec.load(model)
            self.create_clusters_from_model(model, threshold)
        elif map_file is not None:
            self.load_mapping(map_file)

    def create_clusters_from_model(
            self,
            model: "Word2Vec",
            threshold: float
    ):
        """Cluster every word of ``model`` greedily, in vocabulary order.

        Each word that has no cluster yet opens a new one and pulls in those
        of its 200 most similar words whose score exceeds ``threshold``.
        Cluster keys are single characters: code points after U+0020 for
        which ``str.isalpha()`` is true, taken in increasing order.

        Raises:
            ValueError: From ``chr()`` if the code point range is exhausted.
        """
        cluster_idx = 0  # Offset of the current key from U+0020

        for word in model.wv.key_to_index:
            if word in self.word_to_cluster:
                continue  # already clustered

            # Advance to the next alphabetic character for the new cluster.
            cluster_idx += 1
            while not chr(0x0020 + cluster_idx).isalpha():
                cluster_idx += 1
            cluster_key = chr(0x0020 + cluster_idx)

            self.cluster_to_words[cluster_key] = [word]
            self.word_to_cluster[word] = cluster_key

            # NOTE(review): a neighbour that already belongs to an earlier
            # cluster is re-assigned here but stays in the earlier cluster's
            # word list - confirm this is intended.
            for similar_word, score in model.wv.most_similar(word, topn=200):
                if score > threshold:
                    self.cluster_to_words[cluster_key].append(similar_word)
                    self.word_to_cluster[similar_word] = cluster_key

        self.logger.info(f"Created {len(self.cluster_to_words)} clusters.")

    def load_mapping(
            self,
            map_file: str
    ) -> None:
        """Load clusters from a file written by :meth:`save_mapping`.

        Each non-empty line is ``<cluster>\\t<word>,<word>,...``.
        """
        with open(map_file, "r", encoding="utf-8") as map_fp:
            for line in map_fp:
                # Drop the line terminator so it does not end up in the last word.
                line = line.rstrip("\n")
                if not line:
                    continue
                cluster, words = line.split("\t", 1)
                words = words.split(",")
                self.cluster_to_words[cluster] = words
                for word in words:
                    self.word_to_cluster[word] = cluster

    def save_mapping(
            self,
            map_file: "str | None" = None,
            map_dir: str = "data/level_wise/levelX"
    ) -> None:
        """Write one ``<cluster>\\t<word>,<word>,...`` line per cluster.

        Args:
            map_file: Target file. When omitted, ``<map_dir>/clusters.txt`` is
                used and ``map_dir`` is created if it does not exist.
            map_dir: Directory used only when ``map_file`` is ``None``.
        """
        if map_file is None:
            # rstrip keeps the leading slash of absolute paths intact.
            map_dir = map_dir.rstrip("/")
            makedirs(map_dir, exist_ok=True)
            map_file = map_dir + "/clusters.txt"
        with open(map_file, "w", encoding="utf-8") as map_fp:
            for cluster, words in self.cluster_to_words.items():
                map_fp.write(cluster + "\t" + ",".join(words) + "\n")


class WordToUtteranceMapping:
    """Maps a word to the list of utterance strings recorded for it.

    ``utterances`` is a dict ``word -> [utterance, ...]`` in file order.
    """

    def __init__(
            self,
            map_file: "str | None" = None
    ):
        self.utterances = {}
        if map_file is not None:
            self.load_mapping(map_file)

    def load_mapping(
            self,
            map_file: str
    ):
        """
        Loads mapping from a file with word and utterance separated by a tab.

        Blank lines are skipped. Entries are added to whatever mapping is
        already loaded.

        Raises:
            ValueError: If a non-blank line does not have exactly two
                tab-separated fields.
        """
        with open(map_file, "r", encoding="utf-8") as utterance_file:
            for line in utterance_file:
                line = line.strip()
                if not line:
                    continue
                key, seq = line.split("\t")
                self.utterances.setdefault(key, []).append(seq)

    def get_vectors_from_word(
            self,
            word,
            sp_model: "spm.SentencePieceProcessor",
            w2v_model: "Word2Vec"
    ):
        """Return an array holding one vector per utterance of ``word``.

        Raises:
            KeyError: If ``word`` has no utterances in the mapping.
        """
        return np.array(
            [
                self.get_vector_from_utterance(utterance, sp_model, w2v_model)
                for utterance in self.utterances[word]
            ]
        )

    def get_vector_from_utterance(
            self,
            utterance,
            sp_model: "spm.SentencePieceProcessor",
            w2v_model: "Word2Vec"
    ):
        """Return the embedding of ``utterance`` reshaped to one row.

        If the utterance itself is in the Word2Vec vocabulary its vector is
        used. Otherwise it is split with ``sp_model`` and the vectors of the
        resulting units are averaged.

        Raises:
            KeyError: If a unit is missing from the Word2Vec vocabulary.
            ValueError: If the utterance yields no units at all.
        """
        if utterance in w2v_model.wv.key_to_index:
            return w2v_model.wv[utterance].reshape(1, -1)

        # NOTE(review): pieces are compared against the ASCII underscore;
        # SentencePiece's default word-boundary marker is U+2581 - confirm
        # which one the pieces actually contain.
        units = [
            piece.replace("_", "")
            for piece in sp_model.EncodeAsPieces(utterance)
            if piece != "_"
        ]
        if not units:
            # Averaging nothing would give NaN and poison later statistics.
            raise ValueError(f"Utterance {utterance!r} produced no units.")

        vectors = np.array([w2v_model.wv[unit] for unit in units])
        return vectors.mean(axis=0).reshape(1, -1)

# In [7]:
class TestBench:
    """Base class for evaluation benches.

    Both ``load_scores`` and ``score`` are placeholders here that do nothing
    and return ``None``.
    """

    def __init__(
            self,
            scores_file: str = None
    ) -> None:
        # A falsy value (None or "") means there is nothing to load.
        if not scores_file:
            return
        self.load_scores(scores_file)

    def load_scores(
            self,
            scores_file: str
    ):
        """Placeholder: does nothing with ``scores_file``."""
        return None

    def score(
            self,
            sp_model: spm.SentencePieceProcessor,
            w2v_model: Word2Vec,
            utterances: WordToUtteranceMapping
    ) -> dict:
        """Placeholder: performs no evaluation and returns ``None``."""
        return None


class LSTestBench(TestBench):
    """Correlates model similarities with gold similarity/relatedness scores.

    ``sim_pairs`` and ``rel_pairs`` hold ``(word1, word2, gold_score)`` tuples
    read from the scores file.
    """

    def __init__(self, scores_file: str = None) -> None:
        # Defined up front so score() also works when no file is given.
        self.sim_pairs = []
        self.rel_pairs = []
        super().__init__(scores_file)

    def load_scores(
            self,
            scores_file: str
    ):
        """Read ``word1,word2,sim,rel`` rows, skipping the header line.

        A pair is added to ``sim_pairs`` / ``rel_pairs`` only when the
        corresponding column is non-empty. Blank lines are skipped.
        """
        self.sim_pairs = []
        self.rel_pairs = []

        with open(scores_file, "r", encoding="utf-8") as pairs_file:
            next(pairs_file, None)  # header
            for line in pairs_file:
                line = line.strip()
                if not line:
                    continue
                w1, w2, sim, rel = line.split(",")
                if sim:
                    self.sim_pairs.append((w1, w2, float(sim)))
                if rel:
                    self.rel_pairs.append((w1, w2, float(rel)))

    @staticmethod
    def _correlation(predicted: list, gold: list) -> float:
        """Pearson correlation times 100, or NaN for fewer than two samples."""
        if len(predicted) < 2:
            # pearsonr raises for inputs shorter than two elements.
            return float("nan")
        return pearsonr(predicted, gold)[0] * 100

    def single_test(
            self,
            pairs: list,
            sp_model: spm.SentencePieceProcessor,
            w2v_model: Word2Vec,
            utterances: WordToUtteranceMapping
    ):
        """Score ``pairs`` and correlate the results with their gold values.

        For every pair the cosine similarity of each utterance of the first
        word with each utterance of the second word is computed. The "min",
        "avg" and "max" variants reduce these to one value per pair. "all"
        keeps every individual similarity, paired with the gold value of its
        word pair (assumed meaning of "all" - TODO confirm).

        Pairs whose first word starts with "ls_" count as "librispeech", all
        others as "synthetic".

        Returns:
            ``{'score': {dataset: {variant: correlation * 100}},
            'errors': int, 'trials': int}``. A correlation is NaN when fewer
            than two samples are available.
        """
        methods = ["min", "max", "avg", "all"]
        test_sets = ["librispeech", "synthetic"]
        scores = {
            test_set: {method: [] for method in methods}
            for test_set in test_sets
        }
        # One gold list per variant, since "all" has several entries per pair.
        gold_standard = {
            test_set: {method: [] for method in methods}
            for test_set in test_sets
        }
        trials = 0
        errors = 0

        for pair in pairs:
            trials += 1
            try:
                w1, w2, rel = pair

                test_set = "librispeech" if w1.startswith("ls_") else "synthetic"
                # str methods return new strings; the result must be kept.
                w1 = w1.removeprefix("ls_").removeprefix("sy_")
                w2 = w2.removeprefix("ls_").removeprefix("sy_")

                w1_vectors = utterances.get_vectors_from_word(w1, sp_model, w2v_model)
                w2_vectors = utterances.get_vectors_from_word(w2, sp_model, w2v_model)

                similarities = [
                    float(cosine_similarity(i, j)[0, 0])
                    for i in w1_vectors
                    for j in w2_vectors
                ]
                per_pair = {
                    "min": np.min(similarities),
                    "avg": np.mean(similarities),
                    "max": np.max(similarities),
                }
            except Exception:
                # Deliberate best effort: a pair that cannot be scored (for
                # example an unknown word) is counted and skipped.
                errors += 1
                logging.getLogger("LSTestBench").debug(
                    "Could not score pair %r", pair, exc_info=True
                )
                continue

            # Everything is appended only after the pair succeeded, so the
            # score and gold lists always stay the same length.
            for method, value in per_pair.items():
                scores[test_set][method].append(value)
                gold_standard[test_set][method].append(rel)
            scores[test_set]["all"].extend(similarities)
            gold_standard[test_set]["all"].extend([rel] * len(similarities))

        return {
            'score' : {
                dataset : {
                    variant : self._correlation(
                        scores[dataset][variant],
                        gold_standard[dataset][variant]
                    )
                    for variant in scores[dataset]
                }
                for dataset in scores
            },
            'errors' : errors,
            'trials' : trials
        }

    def score(
            self,
            sp_model: spm.SentencePieceProcessor,
            w2v_model: Word2Vec,
            utterances: WordToUtteranceMapping
    ) -> dict:
        """Run :meth:`single_test` on the similarity and relatedness pairs.

        Returns:
            ``{'sim': <single_test result>, 'rel': <single_test result>}``.
        """
        tests = {'sim' : self.sim_pairs, 'rel' : self.rel_pairs}

        return {
            test : self.single_test(pairs, sp_model, w2v_model, utterances)
            for test, pairs in tests.items()
        }
        

# In [8]:
class LevelwiseModels:
    """Trains a SentencePiece -> Word2Vec -> Cluster pipeline level by level.

    Args:
        model_dir: Base directory for the per-level model folders.
        data_dir: Base directory for the per-level data folders.
        log_dir: Prefix for "log.txt". It is concatenated as-is, so a
            directory needs a trailing "/". ``None`` disables file logging.
    """

    def __init__(
            self,
            model_dir: str = "models/level_wise/",
            data_dir: str = "data/level_wise/",
            log_dir: str = "logs/level_wise/"
    ):
        self.n_levels = 0

        self.model_dir = model_dir
        self.data_dir = data_dir
        self.log_dir = log_dir

        self.logger = logging.getLogger("LevelwiseModels")
        self.logger.setLevel("DEBUG")
        if log_dir is not None:
            log_fh = logging.FileHandler(self.log_dir +"log.txt")
            log_fh.setLevel("DEBUG")
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            log_fh.setFormatter(formatter)
            self.logger.addHandler(log_fh)

        # One entry per trained level, appended by train().
        self.sp_models = []
        self.w2v_models = []
        self.clusters = []
        self.mappings = []

    def _create_level_folders(
            self,
            level: int
    ):
        """Create (if needed) and return ``(data_dir, model_dir)`` of a level."""
        data_dir = f"{self.data_dir.rstrip('/')}/level{level}"
        model_dir = f"{self.model_dir.rstrip('/')}/level{level}"
        # exist_ok: re-running a level must not fail on existing folders.
        makedirs(data_dir, exist_ok=True)
        makedirs(model_dir, exist_ok=True)
        return data_dir, model_dir

    @staticmethod
    def _to_units(sp_model: "spm.SentencePieceProcessor", text: str) -> list:
        """Encode ``text`` into pieces, dropping "_" pieces and "_" characters."""
        return [
            piece.replace("_", "")
            for piece in sp_model.EncodeAsPieces(text)
            if piece != "_"
        ]

    def train(
            self,
            input_file: str,
            n_levels: int = 1,
            configs: "list[Config] | None" = None
    ) -> None:
        """
        Main training function

        Input
        -----
            input_file : str
                Filepath of the input. The file must contain a sentence
                per line, with maximum line length of 5000.
            n_levels : int
                Number of levels to train. Level 1 reads ``input_file``;
                every later level reads the corpus written by the level
                before it.
            configs : list
                Arguments for SentencePiece and Word2Vec at each level.
                ``configs[0]`` is used for level 1, and so on; levels
                without an entry use ``Config()``.
        """
        configs = configs or []

        for level in range(1, n_levels + 1):
            self.logger.info(f"STARTING LEVEL {level}")

            # Create folder for level files
            data_dir, model_dir = self._create_level_folders(level)
            self.logger.info("Created level folders.")

            # Load run config (levels are 1-based, the list is 0-based)
            config = configs[level - 1] if level <= len(configs) else Config()
            sp_model_name = config.sp_config.get_model_name(model_dir=model_dir)

            # Train SentencePiece
            spm.SentencePieceTrainer.train(
                f"--input={input_file} "
                f"--model_type={config.sp_config.model_type} "
                f"--model_prefix={sp_model_name} "
                f"--vocab_size={config.sp_config.vocab_size} "
                f"--max_sentence_length={config.sp_config.max_sentence_length} "
                f"--train_extremely_large_corpus"
            )
            self.logger.info("Created SentencePiece model - "
                             f"{sp_model_name}.")

            # Load SentencePiece model
            sp_model = spm.SentencePieceProcessor()
            sp_model.load(f"{sp_model_name}.model")

            # Convert input file to sentences
            with open(input_file, "r", encoding="utf-8") as corpus_file:
                sentences = [
                    self._to_units(sp_model, sentence)
                    for sentence in corpus_file
                ]
            sample = sentences[0][:10] if sentences else []
            self.logger.info(f"Converted input to sentences (Sample = {sample})")

            # Train Word2Vec
            w2v_model = Word2Vec(
                sentences,
                window=config.w2v_config.window,
                vector_size=config.w2v_config.vector_size,
                min_count=0,
                workers=4,
                epochs=7
            )
            w2v_model.save(config.w2v_config.get_model_path(model_dir=model_dir))

            # Perform clustering
            cluster = Cluster(model=w2v_model, threshold=config.cluster_threshold)

            # Save next level corpus: every piece is replaced by its cluster key
            next_corpus = f"{data_dir}/corpus.txt"
            with open(next_corpus, "w", encoding="utf-8") as corpus_fp:
                for line in sentences:
                    corpus_fp.write(
                        "".join(
                            cluster.word_to_cluster[piece] for piece in line
                        ) + "\n"
                    )
            self.logger.info(f"Created level {level} corpus.")

            # Save all files. The explicit file name puts the mapping in this
            # level's folder instead of the placeholder "levelX" default.
            cluster.save_mapping(map_file=f"{data_dir}/clusters.txt")

            # Keep the trained artefacts of this level
            self.sp_models.append(sp_model)
            self.w2v_models.append(w2v_model)
            self.clusters.append(cluster)
            self.n_levels = level

            # The next level trains on the corpus produced by this one
            input_file = next_corpus
