# modules

## setup

### imports

In [None]:
import os
import pickle
import random
from functools import partial
from typing import TypeAlias

import hnswlib
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import spacy
from gensim.models import Word2Vec
from scipy.linalg import orthogonal_procrustes
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity

### config

In [None]:
# When True, the smoke tests in this notebook run (through load_if_test and the `if ENABLE_TEST:` cells).
ENABLE_TEST = True

# Input and cache locations. Paths are built by plain string concatenation elsewhere in this file,
# so each folder must keep its trailing "/".
MODELS_WORD2VEC_FOLDER = "/veld/input/models/"
TEXTS_FOLDER = "/veld/input/texts/"
PICKLE_FOLDER = "/veld/storage/cache/"

# Values passed to hnswlib's init_index as ef_construction and M.
INDEX_EF_CONSTRUCTION = 100
INDEX_M = 16

# spaCy pipeline used to look up the lemma of each vocabulary word.
nlp = spacy.load("de_core_news_sm")
# Fixed seed for Python's `random` module.
random.seed(42)

### load_cache_or_run

In [None]:
def load_cache_or_run(func):
    """Return the result of `func()`, cached on disk as a pickle named after the function.

    The cache file is `<PICKLE_FOLDER>/<func.__name__>.pkl`. When it exists, its content
    is unpickled and returned without calling `func`. Otherwise `func` is called with no
    arguments, its result is pickled to that file, and the result is returned.

    NOTE: `pickle.load` can execute arbitrary code while loading; the cache folder must
    only contain files written by this notebook.
    """
    pickle_file_path = os.path.join(PICKLE_FOLDER, func.__name__ + ".pkl")
    if os.path.exists(pickle_file_path):
        with open(pickle_file_path, "rb") as f:
            result = pickle.load(f)
        print("load_cache_or_run: loaded from cache at:", pickle_file_path)
        return result

    result = func()
    # Create the cache folder on first use, so that a possibly expensive result is not
    # lost to a FileNotFoundError when the folder does not exist yet.
    os.makedirs(PICKLE_FOLDER, exist_ok=True)
    with open(pickle_file_path, "wb") as f:
        pickle.dump(result, f)
    print("load_cache_or_run: persisted into cache at:", pickle_file_path)
    return result

### load_if_test

In [None]:
def load_if_test(func):
    """Run `func` through `load_cache_or_run` when ENABLE_TEST is set; otherwise return None."""
    if not ENABLE_TEST:
        return None
    return load_cache_or_run(func)

## data structures

### type aliases

In [None]:
# Scalar aliases: they only name the role a plain type plays in the signatures below.
Lemma: TypeAlias = str
Word: TypeAlias = str
Decade: TypeAlias = int  # decade identifier such as 175; also the stem of the model and text file names
LineNumber: TypeAlias = int  # 0-based index of a line within a decade text file
WordNumber: TypeAlias = int  # 0-based index of a word within a line split on single spaces
OccurrenceCount: TypeAlias = int
Diff: TypeAlias = float
Embedding: TypeAlias = np.ndarray
Model: TypeAlias = Word2Vec
DecadeList: TypeAlias = list[Decade]

# lemma data structures
LemmaDiffDict: TypeAlias = dict[Lemma, Diff]
LineNumberDict: TypeAlias = dict[LineNumber, list[WordNumber]]  # line -> word positions of one lemma on that line
LemmaOccurrencePositionDict: TypeAlias = dict[Lemma, LineNumberDict]
LemmaOccurrenceCountDict: TypeAlias = dict[Lemma, OccurrenceCount]
LemmaWordDict: TypeAlias = dict[Lemma, list[Word]]  # lemma -> vocabulary words that share it
WordLemmaDict: TypeAlias = dict[Word, Lemma]

# index data structure
IdToLemmaDict: TypeAlias = dict[int, Lemma]
LemmaToIdDict: TypeAlias = dict[Lemma, int]
# An index is the hnswlib structure together with both id<->lemma lookup tables;
# code in this file accesses the parts positionally (index[0], index[1], index[2]).
Index: TypeAlias = tuple[hnswlib.Index, LemmaToIdDict, IdToLemmaDict]

### create_decades_list

In [None]:
def create_decades_list(folder: str, decade_start: int = 155, decade_end: int = 191) -> DecadeList:
    """Return the sorted decades for which `folder` contains a `<decade>.bin` file.

    Args:
        folder: directory to scan; files not ending in ".bin" are ignored.
        decade_start: inclusive lower bound; a falsy value (None or 0) disables it.
        decade_end: inclusive upper bound; a falsy value (None or 0) disables it.

    Returns:
        Ascending list of decades within the bounds.

    Raises:
        ValueError: if the stem of a ".bin" file name is not an integer.
    """
    decades_found = sorted(
        int(model_file.split(".bin")[0])
        for model_file in os.listdir(folder)
        if model_file.endswith(".bin")
    )
    # Filter by value rather than by locating the exact bound in the list: a bound
    # for which no model file exists must still restrict the result.
    decade_list = [
        decade
        for decade in decades_found
        if (not decade_start or decade >= decade_start) and (not decade_end or decade <= decade_end)
    ]
    print("create_decades_list: decade_list:", decade_list)
    return decade_list


def create_decades_list_test():
    """Smoke test: list the decades of the configured model folder using the default bounds."""
    return create_decades_list(MODELS_WORD2VEC_FOLDER)


# load_if_test returns None when ENABLE_TEST is False, so decade_list is None in that case.
decade_list = load_if_test(create_decades_list_test)

### load_model

In [None]:
def load_model(decade: Decade) -> Model:
    """Load and return the Word2Vec model stored as `<decade>.bin` in MODELS_WORD2VEC_FOLDER."""
    path_of_model = f"{MODELS_WORD2VEC_FOLDER}{decade}.bin"
    loaded_model = Word2Vec.load(path_of_model)
    print("load_model_word2vec: model_path:", path_of_model)
    print("load_model_word2vec: model:", loaded_model)
    return loaded_model


def load_model_test():
    """Smoke test: load the models of the decades 174, 175 and 176."""
    model_174 = load_model(174)
    model_175 = load_model(175)
    model_176 = load_model(176)
    return model_174, model_175, model_176


# NOTE: load_if_test returns None when ENABLE_TEST is False; this unpacking then raises TypeError.
model_174, model_175, model_176 = load_if_test(load_model_test)

### create_lemma_word_dicts

In [None]:
def create_lemma_word_dicts(model: Model) -> tuple[LemmaWordDict, WordLemmaDict]:
    """Map every vocabulary word of `model` to its lemma, and every lemma to its words.

    The lemma of a word is the `lemma_` of the first token spaCy produces for it.

    Returns:
        (lemma_word_dict, word_lemma_dict): lemma -> list of words, and word -> lemma.
    """
    words_by_lemma = {}
    lemma_by_word = {}
    for vocab_word in model.wv.index_to_key:
        vocab_lemma = nlp(vocab_word)[0].lemma_
        words_by_lemma.setdefault(vocab_lemma, []).append(vocab_word)
        lemma_by_word[vocab_word] = vocab_lemma
    print("create_lemma_word_dicts: len(lemma_word_dict):", len(words_by_lemma))
    return words_by_lemma, lemma_by_word


def create_lemma_word_dicts_test():
    """Smoke test: build both lemma/word dictionaries for the models of 174, 175 and 176."""
    lemma_word_dict_174, word_lemma_dict_174 = create_lemma_word_dicts(model_174)
    lemma_word_dict_175, word_lemma_dict_175 = create_lemma_word_dicts(model_175)
    lemma_word_dict_176, word_lemma_dict_176 = create_lemma_word_dicts(model_176)
    return lemma_word_dict_174, word_lemma_dict_174, lemma_word_dict_175, word_lemma_dict_175, lemma_word_dict_176, word_lemma_dict_176


# The unpacking order must match the return order of the test function above.
lemma_word_dict_174, word_lemma_dict_174, lemma_word_dict_175, word_lemma_dict_175, lemma_word_dict_176, word_lemma_dict_176 = load_if_test(
    create_lemma_word_dicts_test
)

### create_occurrence_dicts

In [None]:
def create_occurrence_dicts(decade: Decade, word_lemma_dict: WordLemmaDict) -> tuple[LemmaOccurrencePositionDict, LemmaOccurrenceCountDict]:
    """Record where and how often each lemma occurs in the text file of `decade`.

    The file `<TEXTS_FOLDER><decade>.txt` is read line by line and every line is split on
    single spaces. Words missing from `word_lemma_dict` (or mapped to an empty lemma) are
    skipped. Line and word numbers are 0-based.

    Returns:
        (lemma_occurrence_position_dict, lemma_occurrence_count_dict):
        lemma -> {line number -> [word numbers]}, and lemma -> total occurrence count.
    """
    lemma_occurrence_position_dict = {}
    lemma_occurrence_count_dict = {}
    total_occurrence_count = 0
    with open(TEXTS_FOLDER + str(decade) + ".txt", "r") as f:
        for line_number, line in enumerate(f):
            for word_number, word in enumerate(line.rstrip("\n").split(" ")):
                lemma = word_lemma_dict.get(word)
                if not lemma:
                    continue
                line_number_dict: LineNumberDict = lemma_occurrence_position_dict.setdefault(lemma, {})
                line_number_dict.setdefault(line_number, []).append(word_number)
                lemma_occurrence_count_dict[lemma] = lemma_occurrence_count_dict.get(lemma, 0) + 1
                total_occurrence_count += 1

    lemma_count = len(lemma_occurrence_count_dict)
    if lemma_count:
        occurrence_avg = total_occurrence_count / lemma_count
        # The counts are in first-seen order, so they must be sorted before the middle
        # element can be reported as the median (the upper median for an even count).
        occurrence_median = sorted(lemma_occurrence_count_dict.values())[lemma_count // 2]
    else:
        # No known word in the text: there is nothing to average, and no median exists.
        occurrence_avg = None
        occurrence_median = None
    print("create_occurrence_dicts: lemma_count:", lemma_count)
    print("create_occurrence_dicts: total_occurrence_count:", total_occurrence_count)
    print("create_occurrence_dicts: occurrence_avg:", occurrence_avg)
    print("create_occurrence_dicts: occurrence_median:", occurrence_median)
    return lemma_occurrence_position_dict, lemma_occurrence_count_dict


def create_occurrence_dicts_test():
    """Smoke test: build position and count dictionaries for the decades 174, 175 and 176."""
    lemma_occurrence_position_dict_174, lemma_occurrence_count_dict_174 = create_occurrence_dicts(174, word_lemma_dict_174)
    lemma_occurrence_position_dict_175, lemma_occurrence_count_dict_175 = create_occurrence_dicts(175, word_lemma_dict_175)
    lemma_occurrence_position_dict_176, lemma_occurrence_count_dict_176 = create_occurrence_dicts(176, word_lemma_dict_176)
    # Results are interleaved per decade: (position, count) for 174, then 175, then 176.
    return (
        lemma_occurrence_position_dict_174,
        lemma_occurrence_count_dict_174,
        lemma_occurrence_position_dict_175,
        lemma_occurrence_count_dict_175,
        lemma_occurrence_position_dict_176,
        lemma_occurrence_count_dict_176,
    )


# The unpacking order must match the return order of the test function above.
(
    lemma_occurrence_position_dict_174,
    lemma_occurrence_count_dict_174,
    lemma_occurrence_position_dict_175,
    lemma_occurrence_count_dict_175,
    lemma_occurrence_position_dict_176,
    lemma_occurrence_count_dict_176,
) = load_if_test(create_occurrence_dicts_test)

### sort_lemma_dict_by_value_to_list

In [None]:
def sort_lemma_dict_by_value_to_list(lemma_dict: dict[Lemma, int | float], desc=True) -> list[Lemma]:
    """Return the keys of `lemma_dict` ordered by their values.

    Values are sorted descending when `desc` is truthy, ascending otherwise. Keys with
    equal values keep their relative order from `lemma_dict`.
    """
    direction = -1 if desc else 1
    ordered_lemmas = sorted(lemma_dict, key=lambda lemma: direction * lemma_dict[lemma])
    print("sort_lemma_occurrence_count_dict: len(lemma_list_sorted):", len(ordered_lemmas))
    return ordered_lemmas


def sort_lemma_dict_by_value_to_list_test():
    """Smoke test: order the lemmas of each decade by descending occurrence count."""
    lemma_list_sorted_174 = sort_lemma_dict_by_value_to_list(lemma_occurrence_count_dict_174)
    lemma_list_sorted_175 = sort_lemma_dict_by_value_to_list(lemma_occurrence_count_dict_175)
    lemma_list_sorted_176 = sort_lemma_dict_by_value_to_list(lemma_occurrence_count_dict_176)
    return lemma_list_sorted_174, lemma_list_sorted_175, lemma_list_sorted_176


# The unpacking order must match the return order of the test function above.
lemma_list_sorted_174, lemma_list_sorted_175, lemma_list_sorted_176 = load_if_test(sort_lemma_dict_by_value_to_list_test)

### sort_lemma_dict_by_value_to_dict

In [None]:
def sort_lemma_dict_by_value_to_dict(lemma_dict: dict[Lemma, int | float], desc=True) -> dict:
    """Return a new dict with the items of `lemma_dict` inserted in value order.

    The key order comes from `sort_lemma_dict_by_value_to_list`; `lemma_dict` itself is
    not modified.
    """
    ordered_lemmas = sort_lemma_dict_by_value_to_list(lemma_dict, desc)
    return {lemma: lemma_dict[lemma] for lemma in ordered_lemmas}


def sort_lemma_dict_by_value_to_dict_test():
    """Smoke test: replace each module-level occurrence count dict by its value-sorted copy."""
    # `global` is needed because the module-level names are both read and rebound here.
    global lemma_occurrence_count_dict_174
    global lemma_occurrence_count_dict_175
    global lemma_occurrence_count_dict_176
    lemma_occurrence_count_dict_174 = sort_lemma_dict_by_value_to_dict(lemma_occurrence_count_dict_174)
    lemma_occurrence_count_dict_175 = sort_lemma_dict_by_value_to_dict(lemma_occurrence_count_dict_175)
    lemma_occurrence_count_dict_176 = sort_lemma_dict_by_value_to_dict(lemma_occurrence_count_dict_176)
    return lemma_occurrence_count_dict_174, lemma_occurrence_count_dict_175, lemma_occurrence_count_dict_176


# Rebinds the same names again with whatever load_if_test returns (computed or loaded from cache).
lemma_occurrence_count_dict_174, lemma_occurrence_count_dict_175, lemma_occurrence_count_dict_176 = load_if_test(
    sort_lemma_dict_by_value_to_dict_test
)

### sort_by_occurrence

In [None]:
def sort_by_occurrence(lemma_list_sorted: list[Lemma], sortable_lemma_dict: dict) -> dict:
    """Return a new dict holding the entries of `sortable_lemma_dict` in the order of `lemma_list_sorted`.

    Only lemmas listed in `lemma_list_sorted` are carried over.

    Raises:
        KeyError: if a listed lemma is missing from `sortable_lemma_dict`.
    """
    reordered = {lemma: sortable_lemma_dict[lemma] for lemma in lemma_list_sorted}
    print("sort_by_occurrence: len(sortable_lemma_dict_new):", len(reordered))
    return reordered


def sort_by_occurrence_test():
    """Smoke test: reorder the position dicts and lemma/word dicts of each decade by the sorted lemma lists."""
    # `global` is needed because the module-level names are both read and rebound here.
    global lemma_occurrence_position_dict_174
    global lemma_occurrence_position_dict_175
    global lemma_occurrence_position_dict_176
    global lemma_word_dict_174
    global lemma_word_dict_175
    global lemma_word_dict_176
    lemma_occurrence_position_dict_174 = sort_by_occurrence(lemma_list_sorted_174, lemma_occurrence_position_dict_174)
    lemma_occurrence_position_dict_175 = sort_by_occurrence(lemma_list_sorted_175, lemma_occurrence_position_dict_175)
    lemma_occurrence_position_dict_176 = sort_by_occurrence(lemma_list_sorted_176, lemma_occurrence_position_dict_176)
    # Lemmas absent from the sorted lists are dropped from the lemma/word dicts by this step.
    lemma_word_dict_174 = sort_by_occurrence(lemma_list_sorted_174, lemma_word_dict_174)
    lemma_word_dict_175 = sort_by_occurrence(lemma_list_sorted_175, lemma_word_dict_175)
    lemma_word_dict_176 = sort_by_occurrence(lemma_list_sorted_176, lemma_word_dict_176)
    return (
        lemma_occurrence_position_dict_174,
        lemma_occurrence_position_dict_175,
        lemma_occurrence_position_dict_176,
        lemma_word_dict_174,
        lemma_word_dict_175,
        lemma_word_dict_176,
    )


# The unpacking order must match the return order of the test function above.
(
    lemma_occurrence_position_dict_174,
    lemma_occurrence_position_dict_175,
    lemma_occurrence_position_dict_176,
    lemma_word_dict_174,
    lemma_word_dict_175,
    lemma_word_dict_176,
) = load_if_test(sort_by_occurrence_test)

### get_occurrences

In [None]:
def get_occurrences(
    decade: Decade,
    line_number_dict: LineNumberDict,
    max_elem: int = None,
    highlight_lemma: bool = True,
    keep_lemma: bool = True,
) -> list[str]:
    """Return the lines of a decade's text file that are listed in `line_number_dict`.

    Each returned string is the line rebuilt word by word, every word preceded by one
    space (so the string starts with a space). Words at the recorded positions are

    - wrapped as " ### word ###" when `keep_lemma` and `highlight_lemma` are both truthy,
    - kept unchanged when `keep_lemma` is truthy and `highlight_lemma` is not,
    - left out when `keep_lemma` is falsy.

    Args:
        decade: selects the file `<TEXTS_FOLDER><decade>.txt`.
        line_number_dict: line number -> word positions to treat as occurrences.
        max_elem: stop after this many lines; a falsy value means no limit.
        highlight_lemma: see above.
        keep_lemma: see above.
    """
    collected = []
    with open(TEXTS_FOLDER + str(decade) + ".txt", "r") as text_file:
        for line_number, line in enumerate(text_file):
            positions = line_number_dict.get(line_number)
            if not positions:
                continue
            occurrence_positions = set(positions)
            pieces = []
            for word_number, word in enumerate(line.rstrip("\n").split(" ")):
                if word_number not in occurrence_positions:
                    pieces.append(" " + word)
                elif not keep_lemma:
                    # occurrence is dropped from the rebuilt line
                    continue
                elif highlight_lemma:
                    pieces.append(" ### " + word + " ###")
                else:
                    pieces.append(" " + word)
            collected.append("".join(pieces))
            if max_elem and len(collected) == max_elem:
                break
    return collected


if ENABLE_TEST:
    # First occurrence line of "gehen" in decade 175: highlighted, plain, and with the lemma removed.
    print(get_occurrences(175, lemma_occurrence_position_dict_175["gehen"], max_elem=1))
    print(get_occurrences(175, lemma_occurrence_position_dict_175["gehen"], max_elem=1, highlight_lemma=False))
    print(get_occurrences(175, lemma_occurrence_position_dict_175["gehen"], max_elem=1, keep_lemma=False))
    # Without a limit: number of lines that contain "gehen".
    print(len(get_occurrences(175, lemma_occurrence_position_dict_175["gehen"], max_elem=None, highlight_lemma=False)))

### create_index

In [None]:
def create_index(lemma_word_dict: LemmaWordDict, model: Model) -> Index:
    """Build an hnswlib cosine index holding one vector per lemma.

    The vector of a lemma is the mean of the model vectors of all its words, divided by
    its Euclidean norm. Lemma ids are the enumeration positions of `lemma_word_dict`.

    Returns:
        (hnsw_index, lemma_to_id_dict, id_to_lemma_dict)
    """
    lemma_by_id: IdToLemmaDict = {}
    id_by_lemma: LemmaToIdDict = {}
    unit_vectors = []
    for position, lemma in enumerate(lemma_word_dict):
        lemma_by_id[position] = lemma
        id_by_lemma[lemma] = position
        word_vectors = np.array([model.wv[word] for word in lemma_word_dict[lemma]])
        mean_vector = np.mean(word_vectors, axis=0)
        unit_vectors.append(mean_vector / np.linalg.norm(mean_vector))
    vector_matrix = np.array(unit_vectors)
    element_count = len(vector_matrix)
    vector_size = vector_matrix[0].shape[0]
    hnsw_index = hnswlib.Index(space="cosine", dim=vector_size)
    hnsw_index.init_index(max_elements=element_count, ef_construction=INDEX_EF_CONSTRUCTION, M=INDEX_M)
    hnsw_index.add_items(vector_matrix, ids=list(lemma_by_id.keys()))
    print("create_lemma_dict_and_index: hnsw_index.get_current_count:", hnsw_index.get_current_count())
    return (hnsw_index, id_by_lemma, lemma_by_id)


def create_index_test():
    """Smoke test: build the lemma index of each of the decades 174, 175 and 176."""
    index_174 = create_index(lemma_word_dict_174, model_174)
    index_175 = create_index(lemma_word_dict_175, model_175)
    index_176 = create_index(lemma_word_dict_176, model_176)
    return index_174, index_175, index_176


# load_if_test caches the result with pickle, so each index tuple (including the hnswlib object) is pickled.
index_174, index_175, index_176 = load_if_test(create_index_test)

## vector and index functions

### get_common_lemma

In [None]:
def get_common_lemma(*lemma_dict_list):
    """Return the set of keys present in every one of the given dicts.

    Raises:
        TypeError: when called without any dict.
    """
    key_sets = [set(lemma_dict.keys()) for lemma_dict in lemma_dict_list]
    return set.intersection(*key_sets)


if ENABLE_TEST:
    # Only "a" is a key of all three dicts, so this prints {'a'}.
    print(get_common_lemma({"a": 1, "b": 2}, {"a": 1, "c": 3}, {"a": 1, "b": 2, "c": 3}))

### calculate_cos_sim

In [None]:
def calculate_cos_sim(embedding_a: np.ndarray, embedding_b: np.ndarray) -> float:
    """Return the cosine similarity: the dot product divided by the product of the Euclidean norms."""
    norm_product = np.linalg.norm(embedding_a) * np.linalg.norm(embedding_b)
    return np.dot(embedding_a, embedding_b) / norm_product

### calculate_cos_distance

In [None]:
def calculate_cos_distance(embedding_a: np.ndarray, embedding_b: np.ndarray) -> float:
    """Return the cosine distance, i.e. one minus the cosine similarity of the two vectors."""
    similarity = calculate_cos_sim(embedding_a, embedding_b)
    return 1 - similarity

### query_embedding

In [None]:
def query_embedding(index: Index, lemma: Lemma) -> np.ndarray:
    """Return the vector stored in `index` for `lemma`.

    The return annotation used to be `float`; what is returned is the stored vector
    (callers in this file read its `.shape` and do vector arithmetic on it).

    Raises:
        KeyError: if `lemma` has no id in the index's lemma-to-id table.
    """
    hnsw_index, lemma_to_id_dict, _ = index
    lemma_id = lemma_to_id_dict[lemma]
    # get_items takes and returns a batch; a single id is requested, so take element 0.
    return hnsw_index.get_items([lemma_id])[0]


if ENABLE_TEST:
    # Fetch the stored vector of "gehen" from the 175 index and show its shape.
    embedding = query_embedding(index_175, "gehen")
    print(embedding.shape)

### query_related

In [None]:
def query_related(
    index: Index,
    lemma: Lemma,
    n: int = 10,
    return_as_dict: bool = True,
    keep_search: bool = False,
) -> dict[str, float] | list[str] | None:
    """Return the `n` nearest neighbours of `lemma` in `index`, closest first.

    Args:
        index: (hnswlib index, lemma -> id, id -> lemma).
        lemma: lemma whose stored vector is used as the query.
        n: number of neighbours to return.
        return_as_dict: if truthy, return {lemma: distance}; otherwise a list of lemmas.
        keep_search: if truthy, the queried lemma itself may be part of the `n` results;
            otherwise `n + 1` neighbours are fetched and the queried lemma is removed.

    Returns:
        The neighbours in the order returned by `knn_query`, or None (after printing
        "not found") when `lemma` is not in the index.
    """
    hnsw_index, lemma_to_id_dict, id_to_lemma_dict = index
    try:
        id_embedding = lemma_to_id_dict[lemma]
    except KeyError:
        # Only an unknown lemma is expected here; any other error should surface.
        print("not found")
        return None

    k = n if keep_search else n + 1
    embedding = hnsw_index.get_items([id_embedding])[0]
    ids, distances = hnsw_index.knn_query(embedding.reshape(1, -1), k=k)
    neighbours = list(zip(ids[0], distances[0]))
    if not keep_search:
        # Remove the queried lemma by id instead of assuming it is the first hit: the
        # search is approximate and another lemma may share the same vector. If the
        # query itself is not among the hits, the n closest are kept.
        neighbours = [(id_other, distance) for id_other, distance in neighbours if id_other != id_embedding][:n]

    if return_as_dict:
        return {id_to_lemma_dict[id_other]: distance for id_other, distance in neighbours}
    return [id_to_lemma_dict[id_other] for id_other, _ in neighbours]


if ENABLE_TEST:
    # Neighbours of "gehen" with distances, in two different decades.
    print(query_related(index_175, "gehen", n=10))
    print(query_related(index_176, "gehen", n=10))
    # List form, with the queried lemma allowed among the results.
    print(query_related(index_176, "gehen", n=10, return_as_dict=False, keep_search=True))

### calculate_average_sentence_embedding_from_sentence

In [None]:
def calculate_average_sentence_embedding_from_sentence(
    sentence: str,
    index: Index,
    word_lemma_dict: WordLemmaDict,
    show_exception: bool = False,
) -> np.ndarray:
    """Return the mean of the index vectors of the lemmas of all words in `sentence`.

    The sentence is split on single spaces. A word whose lookup fails (for example
    because it is missing from `word_lemma_dict`) is skipped; with `show_exception` the
    exception and the word are printed.
    """
    found_embeddings = []
    for token in sentence.split(" "):
        try:
            token_embedding = query_embedding(index, word_lemma_dict[token])
        except Exception as error:
            if show_exception:
                print(error, token)
        else:
            found_embeddings.append(token_embedding)
    return np.mean(np.array(found_embeddings), axis=0)


if ENABLE_TEST:
    # All three sentences are embedded in the space of decade 175, so each one must use the
    # word -> lemma table of that same decade (v1 previously used the table of 174).
    v1 = calculate_average_sentence_embedding_from_sentence(
        "der mensch ist in dem haus", index_175, word_lemma_dict_175, show_exception=True
    )
    v2 = calculate_average_sentence_embedding_from_sentence(
        "der mann ist in der hütte", index_175, word_lemma_dict_175, show_exception=True
    )
    v3 = calculate_average_sentence_embedding_from_sentence(
        "die ziege ist auf dem feld", index_175, word_lemma_dict_175, show_exception=True
    )
    print(calculate_cos_sim(v1, v2))
    print(calculate_cos_sim(v2, v3))

### calculate_average_sentence_embedding_from_lemma

In [None]:
def calculate_average_sentence_embedding_from_lemma(
    decade: Decade,
    line_number_dict: LineNumberDict,
    index: Index,
    word_lemma_dict: WordLemmaDict,
    max_sentences: int = 2,
) -> dict[str, np.ndarray]:
    """Return the average embedding of each sentence in which a lemma occurs.

    The sentences are the first `max_sentences` lines of the decade's text listed in
    `line_number_dict`, fetched with `get_occurrences` without highlighting.

    The return annotation used to be `np.ndarray`; what is returned is a dict mapping
    each sentence text to its average embedding. Identical sentence texts share one entry.
    """
    sentence_list = get_occurrences(decade, line_number_dict, max_elem=max_sentences, highlight_lemma=False)
    return {
        sentence: calculate_average_sentence_embedding_from_sentence(sentence, index, word_lemma_dict)
        for sentence in sentence_list
    }


if ENABLE_TEST:
    # Average embedding of the first line of decade 175 that contains "gehen".
    sentence_embedding_dict = calculate_average_sentence_embedding_from_lemma(
        175,
        lemma_occurrence_position_dict_175["gehen"],
        index_175,
        word_lemma_dict_175,
        max_sentences=1,
    )
    # The returned value is a dict: sentence text -> embedding.
    for sentence, embedding in sentence_embedding_dict.items():
        print(sentence)
        print(embedding.shape)

### create_tsne

In [None]:
def create_tsne(embeddings, perplexity=None):
    """Project `embeddings` to two dimensions with t-SNE (random_state fixed to 42).

    `perplexity` falls back to 5 when it is None.
    """
    effective_perplexity = 5 if perplexity is None else perplexity
    reducer = TSNE(n_components=2, perplexity=effective_perplexity, random_state=42)
    return reducer.fit_transform(np.array(embeddings))


if ENABLE_TEST:
    # Two input vectors only, hence the explicit perplexity of 1 instead of the default of 5.
    embeddings_reduced = create_tsne([query_embedding(index_175, "gehen"), query_embedding(index_175, "laufen")], perplexity=1)
    print(embeddings_reduced)

## difference analysis functions

### create_diff_of_lemma

In [None]:
def _add_missing_distances(index, embedding_lemma, distances_dict, candidate_lemmas):
    """Give every candidate that exists in `index` a distance in `distances_dict`; return those candidates.

    `embedding_lemma` must be the vector of the analysed lemma taken from the same
    `index`, so that both vectors of each computed distance live in one vector space.
    """
    present = set()
    for candidate in candidate_lemmas:
        if candidate not in distances_dict:
            try:
                embedding_candidate = query_embedding(index, candidate)
            except KeyError:
                # candidate does not exist in this index: it cannot be compared
                continue
            distances_dict[candidate] = calculate_cos_distance(embedding_lemma, embedding_candidate)
        present.add(candidate)
    return present


def create_diff_of_lemma(
    index_a: Index,
    index_b: Index,
    lemma_occurrence_count_dict_a: LemmaOccurrenceCountDict,
    lemma_occurrence_count_dict_b: LemmaOccurrenceCountDict,
    lemma: Lemma,
) -> float | None:
    """Measure how much the neighbourhood of `lemma` differs between two indexes.

    The 100 nearest neighbours of `lemma` are queried in both indexes. For every
    neighbour that exists in both indexes, the distance to `lemma` is taken in each
    index (computed directly when the neighbour is not among the other index's top 100).
    The result is the mean over these neighbours of

        abs(distance_a - distance_b) / sqrt(c) * c

    where c is the average of the neighbour's occurrence counts in both decades.

    Returns:
        The diff, or None when `lemma` is missing from either index.

    Raises:
        KeyError: if a shared neighbour is missing from one of the occurrence count dicts.
        ZeroDivisionError: if the two indexes share no neighbour of `lemma`.
    """
    distances_a_dict = query_related(index_a, lemma, n=100)
    distances_b_dict = query_related(index_b, lemma, n=100)
    if not distances_a_dict or not distances_b_dict:
        # query_related has already reported the missing lemma; do not look up its
        # embedding, which would raise KeyError.
        return None

    # key and dict synchronization
    embedding_index_a_lemma = query_embedding(index_a, lemma)
    embedding_index_b_lemma = query_embedding(index_b, lemma)
    # Snapshots: the dicts grow below, and only the original neighbours are candidates.
    neighbours_a = set(distances_a_dict)
    neighbours_b = set(distances_b_dict)
    # Distances inside index b must use the lemma's vector from index b (and likewise
    # for a). The two spaces are not aligned, so mixing them gives meaningless values.
    lemma_all = _add_missing_distances(index_b, embedding_index_b_lemma, distances_b_dict, neighbours_a)
    lemma_all |= _add_missing_distances(index_a, embedding_index_a_lemma, distances_a_dict, neighbours_b)

    # difference calculation
    diff = 0
    for lemma_related in lemma_all:
        distance_a = distances_a_dict[lemma_related]
        distance_b = distances_b_dict[lemma_related]
        occurrence_count_avg = (lemma_occurrence_count_dict_a[lemma_related] + lemma_occurrence_count_dict_b[lemma_related]) / 2
        diff += (abs(distance_a - distance_b) / np.sqrt(occurrence_count_avg)) * occurrence_count_avg
    diff /= len(lemma_all)

    return diff


if ENABLE_TEST:
    # Diff of one lemma between consecutive decades (174 -> 175 and 175 -> 176).
    print(create_diff_of_lemma(index_174, index_175, lemma_occurrence_count_dict_174, lemma_occurrence_count_dict_175, "gehen"))
    print(create_diff_of_lemma(index_175, index_176, lemma_occurrence_count_dict_175, lemma_occurrence_count_dict_176, "brauen"))

### create_diff_dict_from_index

In [None]:
def create_diff_dict_from_index(
    index_a: Index,
    index_b: Index,
    lemma_occurrence_count_dict_a: LemmaOccurrenceCountDict,
    lemma_occurrence_count_dict_b: LemmaOccurrenceCountDict,
    min_occurrence: int = None,
    max_occurrence: int = None,
) -> LemmaDiffDict:
    """Compute `create_diff_of_lemma` for every lemma contained in both indexes.

    A lemma is only evaluated when its occurrence counts in both decades lie within
    [min_occurrence, max_occurrence]; a bound of None is not applied.

    Returns:
        lemma -> diff, inserted from the largest diff to the smallest.
    """
    scored_lemmas = []
    for lemma in get_common_lemma(index_a[1], index_b[1]):
        count_a = lemma_occurrence_count_dict_a[lemma]
        count_b = lemma_occurrence_count_dict_b[lemma]
        too_rare = min_occurrence is not None and not (count_a >= min_occurrence and count_b >= min_occurrence)
        too_frequent = max_occurrence is not None and not (count_a <= max_occurrence and count_b <= max_occurrence)
        if too_rare or too_frequent:
            continue
        lemma_diff = create_diff_of_lemma(index_a, index_b, lemma_occurrence_count_dict_a, lemma_occurrence_count_dict_b, lemma)
        scored_lemmas.append((lemma, lemma_diff))
    lemma_diff_dict = dict(sorted(scored_lemmas, key=lambda pair: -pair[1]))
    print("create_diff_dict_from_index: len(lemma_diff_dict):", len(lemma_diff_dict))
    return lemma_diff_dict


def create_diff_dict_from_index_test():
    """Smoke test: diff dicts for 174 -> 175 and 175 -> 176, limited to lemmas occurring 100 to 1000 times."""
    lemma_diff_174_175_dict = create_diff_dict_from_index(
        index_174,
        index_175,
        lemma_occurrence_count_dict_174,
        lemma_occurrence_count_dict_175,
        min_occurrence=100,
        max_occurrence=1000,
    )
    lemma_diff_175_176_dict = create_diff_dict_from_index(
        index_175,
        index_176,
        lemma_occurrence_count_dict_175,
        lemma_occurrence_count_dict_176,
        min_occurrence=100,
        max_occurrence=1000,
    )
    return lemma_diff_174_175_dict, lemma_diff_175_176_dict


# The unpacking order must match the return order of the test function above.
lemma_diff_174_175_dict, lemma_diff_175_176_dict = load_if_test(create_diff_dict_from_index_test)

### create_procrustes_alignment

In [None]:
def create_procrustes_alignment(
    index_a: Index,
    index_b: Index,
    lemma_occurrence_count_dict_a: LemmaOccurrenceCountDict,
    lemma_occurrence_count_dict_b: LemmaOccurrenceCountDict,
):
    """Rotate all vectors of `index_b` into the space of `index_a` and return them as a new index.

    The rotation is obtained with `scipy.linalg.orthogonal_procrustes` from the lemmas
    present in both occurrence count dicts; each lemma's pair of vectors is scaled by the
    square root of its average occurrence count. Every vector of `index_b` is then
    multiplied by the rotation, divided by its Euclidean norm and stored under its
    original id.

    Returns:
        (new hnswlib index, index_b's lemma -> id dict, index_b's id -> lemma dict)
    """

    # anchor matrices: one weighted row per shared lemma, same row order in both
    shared_lemmas = get_common_lemma(lemma_occurrence_count_dict_a, lemma_occurrence_count_dict_b)
    anchor_rows_a = []
    anchor_rows_b = []
    print("create_procrustes_alignment: len(common_lemma):", len(shared_lemmas))
    for lemma in shared_lemmas:
        weight = np.sqrt((lemma_occurrence_count_dict_a[lemma] + lemma_occurrence_count_dict_b[lemma]) / 2)
        anchor_rows_a.append(query_embedding(index_a, lemma) * weight)
        anchor_rows_b.append(query_embedding(index_b, lemma) * weight)
    anchors_a = np.stack(anchor_rows_a)
    anchors_b = np.stack(anchor_rows_b)

    # rotation that maps the b anchors onto the a anchors, applied to every vector of b
    rotation, _ = orthogonal_procrustes(anchors_b, anchors_a)
    hnsw_b, lemma_to_id_b, id_to_lemma_b = index_b
    ids_b = list(id_to_lemma_b.keys())
    vectors_b = np.stack([hnsw_b.get_items([item_id])[0] for item_id in ids_b])
    rotated_b = vectors_b @ rotation
    rotated_b_unit = rotated_b / np.linalg.norm(rotated_b, axis=1, keepdims=True)
    print("create_procrustes_alignment: matrix_b_aligned.shape:", rotated_b.shape)

    # new index with the same dimension, capacity and ids as index_b
    aligned_hnsw = hnswlib.Index(space="cosine", dim=hnsw_b.dim)
    aligned_hnsw.init_index(max_elements=hnsw_b.get_max_elements(), ef_construction=INDEX_EF_CONSTRUCTION, M=INDEX_M)
    aligned_hnsw.add_items(rotated_b_unit, ids_b)

    return (aligned_hnsw, lemma_to_id_b, id_to_lemma_b)


def create_procrustes_alignment_test():
    """Smoke test: align 175 to 174, then align 176 to the already aligned 175."""
    index_aligned_175 = create_procrustes_alignment(
        index_174,
        index_175,
        lemma_occurrence_count_dict_174,
        lemma_occurrence_count_dict_175,
    )
    # Chained alignment: the reference here is the aligned 175 index, not the raw one.
    index_aligned_176 = create_procrustes_alignment(
        index_aligned_175,
        index_176,
        lemma_occurrence_count_dict_175,
        lemma_occurrence_count_dict_176,
    )
    return index_aligned_175, index_aligned_176


# The unpacking order must match the return order of the test function above.
index_aligned_175, index_aligned_176 = load_if_test(create_procrustes_alignment_test)
if ENABLE_TEST:
    # For a few lemmas, print the cosine similarity between consecutive decades,
    # first with the raw indexes and then with the aligned ones.
    for lemma in ["gehen", "und", "wohnen", "brauen", "Fürst"]:
        print("create_procrustes_alignment: lemma:", lemma)
        embedding_174 = query_embedding(index_174, lemma)
        embedding_175 = query_embedding(index_175, lemma)
        embedding_176 = query_embedding(index_176, lemma)
        embedding_aligned_175 = query_embedding(index_aligned_175, lemma)
        embedding_aligned_176 = query_embedding(index_aligned_176, lemma)
        cos_sim_174_175 = calculate_cos_sim(embedding_174, embedding_175)
        cos_sim_175_176 = calculate_cos_sim(embedding_175, embedding_176)
        # 174 is the reference space, so its raw embedding is compared with the aligned 175.
        cos_sim_aligned_174_175 = calculate_cos_sim(embedding_174, embedding_aligned_175)
        cos_sim_aligned_175_176 = calculate_cos_sim(embedding_aligned_175, embedding_aligned_176)
        print("create_procrustes_alignment:", "cos_sim_174_175:", cos_sim_174_175)
        print("create_procrustes_alignment:", "cos_sim_175_176:", cos_sim_175_176)
        print("create_procrustes_alignment:", "cos_sim_aligned_174_175:", cos_sim_aligned_174_175)
        print("create_procrustes_alignment:", "cos_sim_aligned_175_176:", cos_sim_aligned_175_176)

### calculate_trajectory_from_diff_per_lemma

In [None]:
def calculate_trajectory_from_diff_per_lemma(index_a: Index, index_b: Index, index_c: Index, lemma: Lemma):
    """Return the dot product of the two consecutive change vectors (a - b) and (b - c) of `lemma`.

    a, b and c are the vectors of `lemma` in `index_a`, `index_b` and `index_c`.
    """
    embedding_a, embedding_b, embedding_c = (
        query_embedding(single_index, lemma) for single_index in (index_a, index_b, index_c)
    )
    return np.dot(embedding_a - embedding_b, embedding_b - embedding_c)


if ENABLE_TEST:
    # Uses the aligned 175 and 176 indexes together with the raw 174 index.
    for lemma in ["gehen", "und", "wohnen", "brauen", "Fürst"]:
        ab_bc_trajectory = calculate_trajectory_from_diff_per_lemma(index_174, index_aligned_175, index_aligned_176, lemma)
        print("calculate_trajectory_from_diff_per_lemma:", lemma, ab_bc_trajectory)

### calculate_trajectory_from_diff_all

In [None]:
def calculate_trajectory_from_diff_all(index_a: Index, index_b: Index, index_c: Index):
    """Compute the trajectory value of every lemma contained in all three indexes.

    Returns:
        lemma -> value of `calculate_trajectory_from_diff_per_lemma`, inserted from the
        largest value to the smallest.
    """
    scored_lemmas = [
        (lemma, calculate_trajectory_from_diff_per_lemma(index_a, index_b, index_c, lemma))
        for lemma in get_common_lemma(index_a[1], index_b[1], index_c[1])
    ]
    return dict(sorted(scored_lemmas, key=lambda pair: -pair[1]))


if ENABLE_TEST:
    # Trajectory values of all lemmas shared by 174 and the aligned 175 and 176 indexes.
    lemma_trajectory_diff_dict_174_175_176 = calculate_trajectory_from_diff_all(index_174, index_aligned_175, index_aligned_176)
    print("len(lemma_trajectory_diff_dict_174_175_176)", len(lemma_trajectory_diff_dict_174_175_176))

In [None]:
def normalize_lemma_value_dict(lemma_value_dict):
    """Rescale the values of `lemma_value_dict` linearly so that they span [-1, 1].

    The smallest value maps to -1 and the largest to 1; key order is preserved and the
    input dict is not modified.

    Edge cases (both raised an exception before):
        - an empty dict returns an empty dict;
        - when all values are equal there is no range to scale, and every lemma maps to
          0.0, the midpoint of the target interval.
    """
    if not lemma_value_dict:
        lemma_value_dict_normalized = {}
    else:
        lowest = min(lemma_value_dict.values())
        spread = max(lemma_value_dict.values()) - lowest
        if spread == 0:
            lemma_value_dict_normalized = {lemma: 0.0 for lemma in lemma_value_dict}
        else:
            scale = 2 / spread
            # shift to [0, spread], scale to [0, 2], shift to [-1, 1]
            lemma_value_dict_normalized = {
                lemma: (value - lowest) * scale - 1
                for lemma, value in lemma_value_dict.items()
            }
    print("normalize_lemma_value_dict: len(lemma_value_dict_normalized):", len(lemma_value_dict_normalized))
    return lemma_value_dict_normalized


if ENABLE_TEST:
    # Bring the diff values and the trajectory values onto the same [-1, 1] scale.
    lemma_diff_175_176_dict_normalized = normalize_lemma_value_dict(lemma_diff_175_176_dict)
    lemma_trajectory_diff_dict_174_175_176_normalized = normalize_lemma_value_dict(lemma_trajectory_diff_dict_174_175_176)

## plotting

### plot_tsne_from_labels_embeddings

In [None]:
def plot_tsne_from_labels_embeddings(
    labels: list[str],
    embeddings: list[np.ndarray],
    title: str = None,
    height: int = None,
    width: int = None,
    rotation_degree: int = None,
    perplexity: int = None,
):
    """Show a labelled 2D scatter plot of the t-SNE projection of `embeddings`.

    Args:
        labels: text drawn below each point; same order as `embeddings`.
        embeddings: vectors to project with `create_tsne`.
        title: plot title.
        height: plot height; 800 when None.
        width: plot width; 800 when None.
        rotation_degree: if truthy, the projected points are rotated by the negated
            angle (in degrees) before plotting.
        perplexity: forwarded to `create_tsne`.
    """
    points_2d = create_tsne(embeddings, perplexity)

    if rotation_degree:
        theta = np.deg2rad(-rotation_degree)
        cos_theta, sin_theta = np.cos(theta), np.sin(theta)
        rotation = np.array([[cos_theta, -sin_theta], [sin_theta, cos_theta]])
        # points are row vectors, hence the multiplication with the transposed matrix
        points_2d = points_2d @ rotation.T

    fig = px.scatter(
        x=points_2d[:, 0],
        y=points_2d[:, 1],
        text=labels,
        height=800 if height is None else height,
        width=800 if width is None else width,
        title=title,
    )
    # t-SNE axes carry no interpretable unit, so titles and tick labels are hidden
    hidden_axis = dict(title=None, showticklabels=False)
    fig.update_layout(xaxis=hidden_axis, yaxis=dict(hidden_axis))
    fig.update_traces(
        marker=dict(size=10),
        textposition="bottom center",
        textfont=dict(size=12),
    )
    fig.show()


if ENABLE_TEST:
    # "gehen" and its neighbours in decade 175 (keep_search=True keeps "gehen" itself in the list).
    lemma_list = query_related(index_175, "gehen", keep_search=True, n=20, return_as_dict=False)
    embedding_list = []
    for lemma in lemma_list:
        embedding_list.append(query_embedding(index_175, lemma))
    plot_tsne_from_labels_embeddings(lemma_list, embedding_list)

### plot_tsne_from_lemma_and_related

In [None]:
def plot_tsne_from_lemma_and_related(
    index: Index,
    lemma: str = None,
    n: int = 100,
    title: str = None,
    height: int = None,
    width: int = None,
    rotation_degree: int = None,
):
    """Show a t-SNE scatter plot of `lemma` together with its nearest neighbours in `index`.

    Args:
        index: index that is used both for the neighbour search and for the vectors.
        lemma: lemma to look up; nothing is plotted when it is not in the index
            (`query_related` prints "not found" in that case).
        n: number of lemmas to plot, the queried lemma included.
        title, height, width, rotation_degree: forwarded to
            `plot_tsne_from_labels_embeddings`.
    """
    lemma_list = query_related(index, lemma, n=n, keep_search=True, return_as_dict=False)
    if lemma_list is None:
        return
    # The vectors must come from the index that was searched, not from a fixed global index.
    embedding_list = [query_embedding(index, related_lemma) for related_lemma in lemma_list]
    plot_tsne_from_labels_embeddings(
        lemma_list,
        embedding_list,
        title=title,
        height=height,
        width=width,
        rotation_degree=rotation_degree,
    )


if ENABLE_TEST:
    # Plot "gehen" with its neighbourhood in decade 175 (200 lemmas requested).
    plot_tsne_from_lemma_and_related(index_175, lemma="gehen", n=200)

### plot_lemma_with_value

In [None]:
def plot_lemma_with_value(lemma_value_dict: LemmaDiffDict, title: str = None):
    """Show a scatter plot with one point per lemma.

    Lemmas (the dict keys) go on the x axis in dict order, their values on the y axis.
    Axis titles are removed; `title` is used as the figure title.
    """
    x_lemmas = list(lemma_value_dict.keys())
    y_values = list(lemma_value_dict.values())
    figure = px.scatter(x=x_lemmas, y=y_values, title=title)
    figure.update_layout(xaxis_title=None, yaxis_title=None)
    figure.show()


if ENABLE_TEST:
    # Smoke test: plot three lemma -> value dicts (occurrence counts, diffs between 174 and
    # 175, and trajectory diffs over 174-175-176), one figure each.
    plot_lemma_with_value(lemma_occurrence_count_dict_174, title="lemma occurrence count")
    plot_lemma_with_value(lemma_diff_174_175_dict, title="lemma relative diffs")
    plot_lemma_with_value(lemma_trajectory_diff_dict_174_175_176, title="lemma trajectory of diffs 174-175-176")

### plot_trajectory

In [None]:
def plot_trajectory(lemma_decade_embdding_dict: dict[Lemma, dict[Decade, Embedding]], perplexity=None):
    """Plot the t-SNE-reduced embeddings of lemmas across decades as connected trajectories.

    All embeddings of all lemmas are reduced together by `create_tsne`. Each lemma then gets
    one line trace through its own points (in the iteration order of its decade dict), and a
    single marker+text trace labels every point as "<decade>:<lemma>".

    Args:
        lemma_decade_embdding_dict: for each lemma, a dict mapping decade to embedding.
        perplexity: passed to `create_tsne`. A value given by the caller is used as is.
            When left as None and there are between 2 and 5 embeddings in total, it is set
            to the number of embeddings minus one; otherwise None is passed through.
    """

    # prepare data: flatten all embeddings into one list and remember where each lemma's
    # group ends, so the reduced points can be split per lemma again afterwards
    global_labels_list = []
    global_embeddings_list = []
    group_end_position_list = []
    for lemma, decade_embedding_dict in lemma_decade_embdding_dict.items():
        for decade, embedding in decade_embedding_dict.items():
            global_labels_list.append(str(decade) + ":" + lemma)
            global_embeddings_list.append(embedding)
        group_end_position_list.append(len(global_embeddings_list))

    # Only pick a perplexity automatically when the caller did not supply one.
    # presumably needed because t-SNE requires perplexity < number of samples — confirm in create_tsne
    if perplexity is None and 1 < len(global_embeddings_list) < 6:
        perplexity = len(global_embeddings_list) - 1
    lemma_embeddings_reduced_array = create_tsne(global_embeddings_list, perplexity=perplexity)

    # create plot
    fig = go.Figure()
    group_start_position = 0
    for group_end_position in group_end_position_list:
        # one line per lemma, connecting its points in insertion order
        lemma_respective_embeddings = lemma_embeddings_reduced_array[group_start_position:group_end_position]
        fig.add_trace(
            go.Scatter(
                x=lemma_respective_embeddings[:, 0],
                y=lemma_respective_embeddings[:, 1],
                mode="lines",
            )
        )
        group_start_position = group_end_position
    # all points at once, with their "<decade>:<lemma>" labels
    fig.add_trace(
        go.Scatter(
            x=lemma_embeddings_reduced_array[:, 0],
            y=lemma_embeddings_reduced_array[:, 1],
            mode="markers+text",
            text=global_labels_list,
            textposition="top center",
        )
    )
    fig.update_layout(
        showlegend=False,
        width=800,
        height=800,
    )
    fig.show()


if ENABLE_TEST:
    # Smoke test: for three sample lemmas, print their entry in the 174-175-176 trajectory
    # diff dict and collect one embedding per decade. Decade 174 is read from index_174,
    # 175 and 176 from the aligned indices; the result is drawn with plot_trajectory.
    lemma_list = ["der", "gefunden", "Nachmittag"]
    lemma_decade_embdding_dict = {}
    print("trajectories:")
    for lemma in lemma_list:
        print(lemma, lemma_trajectory_diff_dict_174_175_176[lemma])
        lemma_decade_embdding_dict[lemma] = {
            174: query_embedding(index_174, lemma),
            175: query_embedding(index_aligned_175, lemma),
            176: query_embedding(index_aligned_176, lemma),
        }
    plot_trajectory(lemma_decade_embdding_dict)

# aggregated analysis

# OLD

## plot_lemma_with_neighbours

In [None]:
def plot_lemma_with_neighbours(decade_dict_value, lemma, height=None, width=None, rotation_degree=None):
    """Plot `lemma` and the keys returned by `query_related` for it as a t-SNE scatter.

    `decade_dict_value[1]` is what gets passed to `query_related`; `decade_dict_value[0]` is
    indexed by lemma, and element 1 of each entry is used as the vector to plot.
    The plot title is always None; height, width and rotation are forwarded.
    """
    neighbours = query_related(decade_dict_value[1], lemma)
    lemma_data = decade_dict_value[0]
    # the queried lemma comes first, followed by its neighbours
    labels = [lemma, *neighbours.keys()]
    values = [lemma_data[label][1] for label in labels]
    plot_tsne_from_labels_embeddings(labels, values, None, height, width, rotation_degree)


# Plot "gehen" with its neighbours for decades 174, 175 and 176, each with its own rotation.
# NOTE(review): `decade_dict` is not defined in the visible part of this notebook and this
# cell is not guarded by ENABLE_TEST — confirm it still runs.
lemma = "gehen"
plot_lemma_with_neighbours(decade_dict[174], lemma, height=500, width=500, rotation_degree=None)
plot_lemma_with_neighbours(decade_dict[175], lemma, height=500, width=500, rotation_degree=180)
plot_lemma_with_neighbours(decade_dict[176], lemma, height=500, width=500, rotation_degree=220)

In [None]:
def create_global_word_set(model_list):
    """Return the union of `wv.index_to_key` over all models in `model_list`."""
    return {word for model in model_list for word in model.wv.index_to_key}


word_set = create_global_word_set(list(model_dict.values()))
len(word_set)

In [None]:
def create_word_set_sample(word_set, model_list, limit, must_be_in_all_model):
    """Draw a random sample of at most `limit` words from `word_set`.

    Args:
        word_set: words to sample from (must be sortable, e.g. strings).
        model_list: models whose `wv` is checked for membership when filtering.
        limit: number of words after which sampling stops.
        must_be_in_all_model: if truthy, only words contained in the `wv` of every model
            in `model_list` are accepted.

    Returns:
        A set with the sampled words; smaller than `limit` if not enough words qualify.
    """
    # Sort before shuffling: the iteration order of a set of strings can differ between
    # interpreter runs (hash randomization), which would make the sample irreproducible
    # even with a fixed random seed.
    candidates = sorted(word_set)
    random.shuffle(candidates)

    word_set_sample = set()
    num_found = 0
    for word in candidates:
        if num_found == limit:
            break
        if must_be_in_all_model and not all(word in model.wv for model in model_list):
            continue
        word_set_sample.add(word)
        num_found += 1
    return word_set_sample


word_set_sample = create_word_set_sample(word_set, list(model_dict.values()), 100, False)
print(len(word_set_sample))

## plotting

## analysis functions

In [None]:
def calculate_cosine_similarity(vec1, vec2):
    """Return the dot product of the two vectors divided by the product of their norms."""
    dot_product = np.dot(vec1, vec2)
    norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    return dot_product / norm_product

In [None]:
def calculate_gini_coefficient(x):
    """Return the Gini coefficient of the values in `x`.

    Uses the rank-based formula on the ascendingly sorted values:
    ``2 * sum(rank_i * x_i) / (n * sum(x)) - (n + 1) / n`` with ranks starting at 1.

    Args:
        x: sequence of numbers; converted to a float64 array.

    Returns:
        The coefficient as a float. An empty input raises ZeroDivisionError (division by
        n == 0); values summing to zero give a non-finite result.
    """
    values = np.sort(np.array(x, dtype=np.float64))
    n = len(values)
    ranks = np.arange(1, n + 1)
    return (2 * np.sum(ranks * values)) / (n * np.sum(values)) - (n + 1) / n


calculate_gini_coefficient([1, 3, 2, 100, 2])

In [None]:
def get_diff_between_decades(w, model_prev, model_current, debug=False):
    """Measure how much the neighbourhood of word `w` differs between two models.

    The neighbours of `w` (as returned by `wv.most_similar(w)` with its default settings)
    are collected from both models. For every neighbour present in both models, the cosine
    similarity to `w` is computed within each model, and the absolute difference of the
    two similarities is recorded. The result is the mean of these differences.

    Args:
        w: word to compare.
        model_prev: earlier model, accessed via `.wv`.
        model_current: later model, accessed via `.wv`.
        debug: if truthy, print the similarities and difference for every neighbour.

    Returns:
        The mean absolute difference, or None if `w` is missing from either model or no
        neighbour exists in both models.
    """
    try:
        v_prev = model_prev.wv[w]
        v_current = model_current.wv[w]
        neighbours_prev = model_prev.wv.most_similar(w)
        neighbours_current = model_current.wv.most_similar(w)
    except KeyError:
        # w is not in the vocabulary of at least one of the two models
        return None

    word_set_both = set()
    for neighbours in (neighbours_prev, neighbours_current):
        for w_rel, _ in neighbours:
            word_set_both.add(w_rel)

    diff_list = []
    for w_rel in word_set_both:
        try:
            v_rel_prev = model_prev.wv[w_rel]
            v_rel_current = model_current.wv[w_rel]
        except KeyError:
            # neighbour exists in only one model, so it cannot be compared
            continue
        cos_sim_prev = calculate_cosine_similarity(v_prev, v_rel_prev)
        cos_sim_current = calculate_cosine_similarity(v_current, v_rel_current)
        diff_prev_current = abs(cos_sim_prev - cos_sim_current)
        diff_list.append(diff_prev_current)
        if debug:
            print(
                "w_rel:",
                w_rel,
                "cos_sim_prev:",
                cos_sim_prev,
                "cos_sim_current:",
                cos_sim_current,
                "diff_prev_current:",
                diff_prev_current,
            )

    if not diff_list:
        return None
    return sum(diff_list) / len(diff_list)


get_diff_between_decades("gesetze", model_dict[178], model_dict[179], True)

In [None]:
def create_relative_history(word_set, model_dict, print_progress=False):
    """Compute, for every word, its neighbourhood drift between consecutive models.

    For each word, the models in `model_dict` are walked in dict order and each model is
    compared with the one before it via `get_diff_between_decades`.
    # presumably model_dict is ordered chronologically by decade — verify against caller

    Args:
        word_set: words to process.
        model_dict: dict mapping decade (int, e.g. 178) to model.
        print_progress: if truthy, print the total, the segment length and a progress
            line roughly every 1% of the words.

    Returns:
        Dict mapping word to a dict with
        - "diff_decade_avg": mean of the diffs whose decade lies in 152..191,
        - "diff_decade_gini": Gini coefficient of those same diffs,
        - "diff_decade_dict": all diffs, keyed by ``str(decade) + "0s"``.
        Words without any diff in 152..191 are omitted.
    """
    len_total = len(word_set)
    if print_progress:
        # at least 1: for small word sets the rounded value is 0, which would make the
        # modulo below raise ZeroDivisionError
        len_segment = max(1, round(len_total / 100))
        print("len_total:", len_total)
        print("len_segment:", len_segment)
    word_diff_history = {}
    for i, w in enumerate(word_set):
        if print_progress and i % len_segment == 0:
            print("i:", i)
        model_prev = None
        diff_between_decade_dict = {}
        diff_between_decade_total = []
        for decade, model_current in model_dict.items():
            if model_prev is not None:
                diff_between_decade = get_diff_between_decades(w, model_prev, model_current)
                # None means "not comparable"; a diff of exactly 0.0 is a valid measurement
                if diff_between_decade is not None:
                    diff_between_decade_dict[str(decade) + "0s"] = diff_between_decade
                    # only decades within this window count towards average and Gini
                    if 152 <= decade <= 191:
                        diff_between_decade_total.append(diff_between_decade)
            model_prev = model_current

        if diff_between_decade_total:
            diff_between_decade_avg = sum(diff_between_decade_total) / len(diff_between_decade_total)
            diff_between_decade_gini = calculate_gini_coefficient(diff_between_decade_total)
            word_diff_history[w] = {
                "diff_decade_avg": diff_between_decade_avg,
                "diff_decade_gini": diff_between_decade_gini,
                "diff_decade_dict": diff_between_decade_dict,
            }
    return word_diff_history


# word_diff_history = create_relative_history(word_set, model_dict, True)
# word_diff_history

In [None]:
# with open("/veld/output/word_diff_history.pkl", "wb") as f:
#    pickle.dump(word_diff_history, f)

with open("/veld/output/word_diff_history.pkl", "rb") as f:
    word_diff_history = pickle.load(f)

In [None]:
word_diff_history

In [None]:
def sort_word_diff_history(word_diff_history, sort_key):
    """Return a new dict with the entries ordered by descending `entry[sort_key]`.

    Entries with equal values keep their original relative order.
    """

    def negated_value(item):
        _, stats = item
        return -stats[sort_key]

    ordered_items = sorted(word_diff_history.items(), key=negated_value)
    return dict(ordered_items)


word_diff_history_sorted_avg = sort_word_diff_history(word_diff_history, "diff_decade_avg")
word_diff_history_sorted_gini = sort_word_diff_history(word_diff_history, "diff_decade_gini")

In [None]:
for i, w_diff_history in enumerate(word_diff_history.items()):
    if i == 3:
        break
    print(w_diff_history)

print("------------")

for i, w_diff_history in enumerate(word_diff_history_sorted_avg.items()):
    if i == 3:
        break
    print(w_diff_history)

print("------------")

for i, w_diff_history in enumerate(word_diff_history_sorted_gini.items()):
    if i == 3:
        break
    print(w_diff_history)

In [None]:
def get_sample(
    word_diff_history,
    from_top,
    min_num_decades,
    max_sample,
    index_range=None,
    word_set=None,
):
    """Pick up to `max_sample` entries from `word_diff_history`, in order.

    Args:
        word_diff_history: dict mapping word to a dict that has a "diff_decade_dict" entry.
        from_top: if truthy, walk the dict front to back, otherwise back to front.
        min_num_decades: if truthy, only accept entries whose "diff_decade_dict" has at
            least this many items.
        max_sample: stop once this many entries have been collected.
        index_range: optional pair ``(start, end)`` of positions in the walked order
            (start inclusive, end exclusive). A falsy start means 0, a falsy end means the
            end of the history. The passed object is not modified.
        word_set: if truthy, only words contained in it are accepted.

    Returns:
        Dict with the selected entries, in the order they were encountered.
    """
    items = list(word_diff_history.items())
    if not from_top:
        items.reverse()

    # Resolve the bounds into locals instead of writing them back into the caller's list.
    if index_range:
        index_start = index_range[0] or 0
        index_end = index_range[1] or len(items)
    else:
        index_start, index_end = 0, len(items)

    word_diff_history_sample = {}
    num_found = 0
    for i, (w, w_diff) in enumerate(items):
        if i < index_start:
            continue
        if i >= index_end or num_found == max_sample:
            break
        if word_set and w not in word_set:
            continue
        if min_num_decades and len(w_diff["diff_decade_dict"]) < min_num_decades:
            continue
        word_diff_history_sample[w] = w_diff
        num_found += 1
    return word_diff_history_sample

In [None]:
get_sample(
    word_diff_history_sorted_avg,
    from_top=True,
    min_num_decades=30,
    max_sample=3,
    index_range=[None, None],
    word_set=None,
)

In [None]:
def plot_history(word_diff_history):
    """Draw one line per word showing its "diff_decade_dict" values over the decades.

    The x axis is fixed to the labels "1470s" through "1950s" in steps of ten years; values
    for other labels are dropped by the reindex, and gaps are bridged by the line.
    """
    decades = [f"{year}s" for year in range(1470, 1960, 10)]
    data = {word: stats["diff_decade_dict"] for word, stats in word_diff_history.items()}

    # rows = decades in fixed order, one column per word, plus a "Decade" column
    df = pd.DataFrame(data).reindex(decades).reset_index().rename(columns={"index": "Decade"})

    fig = go.Figure()
    for category in data:
        trace = go.Scatter(
            x=df["Decade"],
            y=df[category],
            mode="lines+markers",
            name=category,
            connectgaps=True,  # draw the line across decades without a value
        )
        fig.add_trace(trace)

    x_axis_order = dict(categoryorder="array", categoryarray=decades)
    fig.update_layout(
        title="Data points by decade",
        xaxis_title="Decades",
        yaxis_title="Value",
        xaxis=x_axis_order,
    )
    fig.show()

In [None]:
def show_plot_tsne(vector_dict, title=None):
    """Show a matplotlib scatter plot of the t-SNE projection of the WORD_LIST vectors.

    For every word in the module-level `WORD_LIST`, its vector is looked up in
    `vector_dict`; the vectors are reduced to two dimensions (perplexity 5, fixed random
    state) and drawn as labelled points.
    # NOTE(review): WORD_LIST is not defined in the visible part of this notebook — confirm

    Args:
        vector_dict: mapping from word to vector; must contain every word of WORD_LIST.
        title: optional plot title.
    """
    # matplotlib is not among the notebook's top-level imports; importing it here keeps the
    # function from depending on a later cell having been run first
    import matplotlib.pyplot as plt

    labels = list(WORD_LIST)
    values = np.array([vector_dict[w] for w in labels])

    tsne = TSNE(n_components=2, perplexity=5, random_state=42)
    reduced_vectors_tsne = tsne.fit_transform(values)

    plt.figure(figsize=(8, 6))
    plt.scatter(reduced_vectors_tsne[:, 0], reduced_vectors_tsne[:, 1], c="blue", alpha=0.7)

    # write each word next to its point
    for i, label in enumerate(labels):
        plt.text(
            reduced_vectors_tsne[i, 0],
            reduced_vectors_tsne[i, 1],
            label,
            fontsize=9,
            ha="right",
            color="black",
        )
    if title:
        plt.title(title)
    plt.show()

In [None]:
plot_history(
    get_sample(
        word_diff_history_sorted_avg,
        from_top=True,
        min_num_decades=30,
        max_sample=3,
        index_range=[9000, None],
        word_set=None,
    )
)

In [None]:
plot_history(
    get_sample(
        word_diff_history_sorted_gini,
        from_top=True,
        min_num_decades=30,
        max_sample=3,
        index_range=[10000, None],
        word_set=None,
    )
)

In [None]:
# NOTE(review): `plot`, `get_cos_sim_sample` and `word_relative_history_list` are not defined
# anywhere in the visible part of this notebook; presumably leftovers from an earlier version
# (the closest visible equivalents are `plot_history` and `get_sample`) — confirm before running.
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=30,
        max_sample=3,
        index_range=[1200, None],
        word_set=None,
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=30,
        num_sample=3,
        index_range=[2000, None],
        word_set=None,
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=30,
        num_sample=3,
        index_range=[10000, None],
        word_set=None,
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=False,
        min_num_decades=30,
        num_sample=3,
        index_range=[None, None],
        word_set=None,
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=False,
        min_num_decades=10,
        num_sample=3,
        index_range=[40000, None],
        word_set=None,
    )
)

In [None]:
for w in word_relative_history_list:
    if w[0] in ["dar", "sollst"]:
        print(w)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=30,
        num_sample=3,
        index_range=[None, None],
        word_set=["gesetz", "himmel"],
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=None,
        num_sample=3,
        index_range=[None, None],
        word_set=["demokratie"],
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=None,
        num_sample=3,
        index_range=[None, None],
        word_set=["frau", "mann"],
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=None,
        num_sample=3,
        index_range=[None, None],
        word_set=["mutter", "vater"],
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=None,
        num_sample=3,
        index_range=[None, None],
        word_set=["könig", "kaiser"],
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=None,
        num_sample=4,
        index_range=[None, None],
        word_set=["wasser", "erde", "brot", "haus"],
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=None,
        num_sample=10,
        index_range=[None, None],
        word_set=["haus", "kaiser"],
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list,
        from_top=True,
        min_num_decades=None,
        num_sample=10,
        index_range=[None, None],
        word_set=["mensch", "gott", "welt"],
    )
)

In [None]:
plot(
    get_cos_sim_sample(
        word_relative_history_list_gini,
        from_top=True,
        min_num_decades=20,
        num_sample=5,
        index_range=[None, None],
        word_set=None,
    )
)

## various snippets

In [None]:
import hnswlib
import numpy as np

# Your data
texts = ["apple", "banana", "orange"]
vectors = np.random.rand(3, 128).astype("float32")
vectors /= np.linalg.norm(vectors, axis=1, keepdims=True)

# Map text labels to integer IDs
text_to_id = {text: i for i, text in enumerate(texts)}
id_to_text = {i: text for text, i in text_to_id.items()}

# Initialize the index
dim = 128
index = hnswlib.Index(space="cosine", dim=dim)
index.init_index(max_elements=10, ef_construction=100, M=16)
index.add_items(vectors, ids=list(text_to_id.values()))

# Query
query_vector = vectors[0].reshape(1, -1)
labels, distances = index.knn_query(query_vector, k=2)

# Convert back to text labels
results = [(id_to_text[label], dist) for label, dist in zip(labels[0], distances[0])]
print(results)

In [None]:
# TODO: probably not useful
def create_word_cos_sim_history_list(word_set, model_dict):
    """Track, per word, the cosine similarity of its vector between successive models.

    For each word the models are walked in dict order. Whenever the word has a vector in a
    model and also had one in an earlier model, the similarity between the current vector
    and the most recent earlier one is stored under ``str(decade) + "0s"``. Models that
    lack the word are skipped, so a comparison can span several decades.
    # NOTE(review): the vectors come from different models; whether they share one vector
    # space is not visible here — confirm before interpreting the similarities.

    Args:
        word_set: words to process.
        model_dict: dict mapping decade to model, accessed via `.wv[word]`.

    Returns:
        List of ``(word, total_diff, cos_sim_history_dict)`` tuples sorted by descending
        `total_diff`, where `total_diff` sums ``1 - cos_sim`` over all comparisons. Words
        without any comparison are left out.
    """
    word_cos_sim_history_list = []
    for w in word_set:
        cos_sim_history_dict = {}
        w_vec_prev = None
        total_diff = 0
        for decade, model_current in model_dict.items():
            try:
                w_vec_current = model_current.wv[w]
            except KeyError:
                # word not in this model's vocabulary; keep the last known vector
                continue
            if w_vec_prev is not None:
                cos_sim = calculate_cosine_similarity(w_vec_prev, w_vec_current)
                # turns a similarity in [-1, 1] into a distance-like value in [0, 2]
                total_diff += 2 - (cos_sim + 1)
                cos_sim_history_dict[str(decade) + "0s"] = cos_sim
            w_vec_prev = w_vec_current
        if cos_sim_history_dict:
            word_cos_sim_history_list.append((w, total_diff, cos_sim_history_dict))
    word_cos_sim_history_list.sort(key=lambda entry: -entry[1])
    return word_cos_sim_history_list


# word_cos_sim_history_list = create_word_cos_sim_history_list(word_set, model_dict)
# print(len(word_cos_sim_history_list))

In [None]:
import pandas as pd
import plotly.graph_objects as go

# Define the full range of decades
decades = [f"{1900+10*i}s" for i in range(10)]  # 1900s to 1990s (range(10) stops before 2000)

# Sample data
data = {
    "a": {"1920s": 0.6, "1930s": 0.5, "1980s": 0.1},
    "b": {"1930s": 0.2, "1980s": 0.4},
}

# Convert to DataFrame with explicit ordering
df = pd.DataFrame(data).reindex(decades).reset_index()
df = df.rename(columns={"index": "Decade"})

# Create the plot
fig = go.Figure()

for category in data.keys():
    fig.add_trace(
        go.Scatter(
            x=df["Decade"],
            y=df[category],
            mode="lines+markers",
            name=category,
            connectgaps=True,  # Ensures lines are drawn across missing data
        )
    )

# Format the layout
fig.update_layout(
    title="Data points by decade",
    xaxis_title="Decades",
    yaxis_title="Value",
    xaxis=dict(categoryorder="array", categoryarray=decades),
)

fig.show()

In [None]:
import matplotlib.pyplot as plt

# Sample data
data = {
    "a": {"1920s": 1, "1930s": 0.5, "1980s": 0.1},
    "b": {"1930s": 0.2, "1940s": 0, "1950s": 0.7, "1980s": 0.4},
}

# Define all decades (ensuring consistent x-axis)
decades = [f"{1900 + 10*i}s" for i in range(10)]

# Plot each ID
plt.figure(figsize=(10, 5))
for identifier, values in data.items():
    y_values = [values.get(decade, None) for decade in decades]  # Use None for missing data
    plt.plot(decades, y_values, marker="o", label=identifier)

# Customize plot
plt.xlabel("Decades")
plt.ylabel("Value")
plt.title("Data points by decade")
plt.legend()
plt.grid(True)

plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Sample data
data = {
    "a": {"1920s": 1, "1930s": 0.5, "1980s": 0.1},
    "b": {"1930s": 0.2, "1950s": 0.8, "1980s": 0.4},
}

# Define all decades (ensuring consistent x-axis)
decades = [f"{1900 + 10*i}s" for i in range(10)]

# Plot each ID
plt.figure(figsize=(10, 5))
for identifier, values in data.items():
    y_values = [values.get(decade, np.nan) for decade in decades]  # Use np.nan to avoid connecting missing points
    plt.plot(decades, y_values, marker="o", label=identifier)

# Customize plot
plt.xlabel("Decades")
plt.ylabel("Value")
plt.title("Data points by decade")
plt.legend()
plt.grid(True)

plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Sample data
data = {
    "a": {"1920s": 1, "1930s": 0.5, "1980s": 0.1},
    "b": {"1930s": 0.2, "1950s": 0.8, "1980s": 0.4},
}

# Define all decades (ensuring consistent x-axis)
decades = [f"{1900 + 10*i}s" for i in range(10)]

# Convert to a DataFrame for interpolation
df = pd.DataFrame({key: {d: data[key].get(d, np.nan) for d in decades} for key in data})

# Interpolate missing values (linear interpolation)
df.interpolate(method="linear", inplace=True)

# Plot each ID
plt.figure(figsize=(10, 5))
for identifier in df.columns:
    plt.plot(decades, df[identifier], marker="o", label=identifier)

# Customize plot
plt.xlabel("Decades")
plt.ylabel("Value")
plt.title("Data points by decade (with interpolation)")
plt.legend()
plt.grid(True)

plt.show()

In [None]:
!pip install plotly

In [None]:
import pandas as pd
import plotly.express as px

# Sample data
data = {
    "Decade": ["1920s", "1930s", "1980s", "1930s", "1950s", "1980s"],
    "Value": [1, 0.5, 0.1, 0.2, 0.8, None],
    "Category": ["a", "a", "a", "b", "b", "b"],
}

# Convert to DataFrame
df = pd.DataFrame(data)

# Plot using Plotly Express
fig = px.line(
    df,
    x="Decade",
    y="Value",
    color="Category",
    markers=True,
    title="Data points by decade",
)

fig.show()

In [None]:
calculate_cosine_similarity(model_dict[180].wv["mann"], model_dict[180].wv["mann"])

In [None]:
model_dict[180].wv.most_similar("mann", topn=10)

In [None]:
model_dict[181].wv.most_similar("mann", topn=10)

In [None]:
# Extract vocabulary and corresponding vectors
words = list(model.wv.index_to_key)[:100]  # List of words in vocabulary
vectors = np.array([model.wv[word] for word in words])  # Word vectors

# Compute cosine similarity matrix
similarity_matrix = cosine_similarity(vectors)

# Convert to a dictionary of word pairs (optional)
similarity_dict = {(words[i], words[j]): similarity_matrix[i, j] for i in range(len(words)) for j in range(len(words)) if i != j}

# Example: Print top 10 most similar word pairs
sorted_pairs = sorted(similarity_dict.items(), key=lambda x: x[1], reverse=True)
for pair, sim in sorted_pairs[:10]:
    print(f"{pair}: {sim}")

In [None]:
similarity_matrix = cosine_similarity(vectors)

In [None]:
similarity_matrix

In [None]:
# Convert to a dictionary of word pairs (optional)
similarity_dict = {(words[i], words[j]): similarity_matrix[i, j] for i in range(len(words)) for j in range(len(words)) if i != j}

# Example: Print top 10 most similar word pairs
sorted_pairs = sorted(similarity_dict.items(), key=lambda x: x[1], reverse=True)
for pair, sim in sorted_pairs:
    print(f"{pair}: {sim}")