<a href="https://colab.research.google.com/github/mstekel/akk_word2vec/blob/main/akk_word2vec.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import glob

# Show every ORACC XML file that matches the dataset pattern, one per line.
for xml_path in glob.iglob('drive/MyDrive/datasets/oracc/*.xml'):
    print(xml_path)

In [None]:
# NOTE(review): `!` runs the command in a separate subshell, so this export
# does not reach the running kernel. PYTHONHASHSEED is also only read when the
# interpreter starts, so it has to be set before the kernel is launched to
# have any effect on hash randomization.
!export PYTHONHASHSEED=777

from gensim.test.utils import datapath
from gensim import utils, models
import xml.etree.ElementTree as ET
import glob
from datetime import datetime

# Context window size; it is part of the model file name and is passed as the
# `window` argument when a new model is trained.
window = 2
# Path of the saved Word2Vec model for the chosen window size.
model_path = f'drive/MyDrive/models/word2vec/oracc_lemmas_{window}.model'
# True: load the model stored at model_path.
# False: train a new model from the corpus and save it to model_path.
load_model_from_drive = True

class OraccCorpus:
    """Re-iterable view over the sentences of a set of TEI/XML files.

    Each iteration step yields a ``(sentence_element, lemmas)`` tuple:

    * ``sentence_element`` is the ``tei:s`` element as parsed by ElementTree;
    * ``lemmas`` is a list with one string per ``tei:w`` descendant, in
      document order. The string is the word's ``lemma`` attribute with the
      trailing ``[...]`` annotation removed, or ``''`` when the word has no
      ``lemma`` attribute.

    Args:
        pattern: Glob pattern selecting the XML files to read. Defaults to
            the ORACC dataset directory used throughout this notebook.
    """

    TEI_NAMESPACES = {'tei': 'http://www.tei-c.org/ns/1.0'}

    def __init__(self, pattern='drive/MyDrive/datasets/oracc/*.xml'):
        self.pattern = pattern

    @staticmethod
    def _bare_lemma(lemma):
        """Return ``lemma`` without its trailing ``[...]`` annotation.

        A lemma that contains no ``[`` is returned unchanged; a plain
        ``rpartition`` would put the whole string in the tail and leave an
        empty head.
        """
        head, bracket, _ = lemma.rpartition('[')
        return head if bracket else lemma

    def __iter__(self):
        # Sort the matches so the sentence order does not depend on the
        # arbitrary order in which the file system lists the files.
        for file_path in sorted(glob.glob(self.pattern)):
            tree = ET.parse(file_path)
            for s in tree.findall('.//tei:s', self.TEI_NAMESPACES):
                lemmatized_sentence = []
                for w in s.findall('.//tei:w', self.TEI_NAMESPACES):
                    lemma = w.get('lemma')
                    if lemma is not None:
                        lemmatized_sentence.append(self._bare_lemma(lemma))
                    else:
                        # Keep a placeholder so positions still line up with
                        # the words of the sentence.
                        lemmatized_sentence.append('')
                yield s, lemmatized_sentence

sentences = OraccCorpus()

if load_model_from_drive:
    # Reuse the model that was trained and saved earlier.
    model = models.Word2Vec.load(model_path)
else:
    print(f"Model build started: {datetime.now().time()}")
    # Print the first ten corpus items as a quick sanity check.
    for i, s in zip(range(10), sentences):
        print(i, s)

    # The corpus yields (sentence_element, lemmas) pairs, while Word2Vec
    # expects every sentence to be a list of string tokens. Keep only the
    # lemma lists, materialised so that training can make several passes.
    lemma_sentences = [lemmas for _, lemmas in sentences]

    # NOTE(review): `size` is the gensim 3.x name of this parameter; gensim 4
    # renamed it to `vector_size` -- confirm against the installed version.
    model = models.Word2Vec(sentences=lemma_sentences, min_count=1, workers=1, size=300, window=window)
    model.save(model_path)
    print(f"Model build finished: {datetime.now().time()}")


In [None]:
from html import escape

# Lemma under investigation.
term = 'galû'

# Lemmas whose nearest neighbours are collected below. Only the bare term is
# active; the commented entries are alternative candidates (full annotated
# forms and related lemmas) kept for reference.
term_lemmas = [
    # 'šuglû[deported//deported]AJ',
    # 'galû[be(come) deported//be(come) deported]V',
    # 'galû[be(come) deported//emigrate]V',
    # 'galû[be(come) deported//smuggle]V',
    # 'galû[be(come) deported//exile]V',
    # 'galû[be(come) deported//send into exile]V',
    # 'galû[be(come) deported//deport]V',
    # 'galû[be deported//take/send into exile]V',
    # 'šagalûtu[deportation//deportation]N'
    term,
    # 'šuglû',
    # 'šagalûtu'
]

# Concatenate the ten most similar vocabulary entries of every target lemma.
term_span = []
for query_lemma in term_lemmas:
    neighbours = model.wv.most_similar(positive=[query_lemma], topn=10)
    term_span.extend(neighbours)
term_span

In [None]:
# Keep every corpus item whose lemma list (second tuple element) contains at
# least one of the target lemmas.
wanted_lemmas = set(term_lemmas)
term_sentences = [
    pair
    for pair in sentences
    if not wanted_lemmas.isdisjoint(pair[1])
]
print(f'{len(term_sentences)} sentences:')
term_sentences

In [None]:
import numpy as np


# One entry per matching sentence: the element-wise sum of the vectors of all
# its lemmas.
values = [
    np.sum(np.array([model.wv[lemma] for lemma in lemmas]), axis=0)
    for _, lemmas in term_sentences
]
values

In [None]:
import numpy as np

# Vector of the target term.
tv = model.wv[term]

# One single-element row per matching sentence: the mean dot product between
# the term vector and the vector of each lemma in the sentence.
values = [
    [np.mean(np.array([np.dot(model.wv[lemma], tv) for lemma in lemmas]))]
    for _, lemmas in term_sentences
]
values

In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
from sklearn.metrics import silhouette_score
from scipy.signal import find_peaks
from termcolor import colored
import json

def find_optimal_k(values, max_k=10, random_state=None):
    """Choose a cluster count for ``values`` from KMeans silhouette scores.

    KMeans is fitted for every k from 2 up to ``min(len(values) - 1, max_k)``
    inclusive and the silhouette score of each fit is recorded.

    Args:
        values: Samples to cluster, in the form accepted by ``KMeans.fit``.
        max_k: Largest cluster count to try (inclusive).
        random_state: Passed through to ``KMeans``; set it to an int to make
            repeated calls produce the same fits.

    Returns:
        The k at the first local maximum of the score sequence. When the
        sequence has no local maximum, the k with the highest score if that
        score is positive, otherwise 1.
    """
    # The silhouette score is only defined for 2 <= k <= n_samples - 1, so
    # both limits are valid candidates and the range must include them.
    largest_k = min(len(values) - 1, max_k)

    # k=1 has no silhouette score; a 0 placeholder lets k=2 count as a peak.
    sil = [(1, 0)]
    for k in range(2, largest_k + 1):
        kmeans = KMeans(n_clusters=k, random_state=random_state).fit(values)
        sil.append((k, silhouette_score(values, kmeans.labels_, metric='euclidean')))
    k_values, sil_values = zip(*sil)

    peak_positions, _ = find_peaks(sil_values)
    if len(peak_positions) > 0:
        return k_values[peak_positions[0]]

    # find_peaks never reports the first or last sample, so fall back to the
    # overall maximum when no interior peak exists.
    max_sil_value = max(sil_values)
    if max_sil_value > 0:
        return k_values[sil_values.index(max_sil_value)]
    return 1

opt_k = find_optimal_k(values)


def decode_sentence(s):
    """Join text fragments into one string and turn newlines into spaces."""
    return "".join(s).replace("\n", " ")


clustering = KMeans(n_clusters=opt_k).fit(values)
# For every cluster centre, the index of the closest entry in `values`.
cluster_bests, _ = pairwise_distances_argmin_min(clustering.cluster_centers_, values)


def describe_cluster(label):
    """Build the report entry for one cluster label."""
    return {
        'cluster': str(label),
        # Text of the sentence nearest to the cluster centre.
        'best': decode_sentence(term_sentences[cluster_bests[label]][0].itertext()),
        # Text of every sentence assigned to this cluster.
        'all': [
            f'{decode_sentence(pair[0].itertext())}'
            for position, pair in enumerate(term_sentences)
            if clustering.labels_[position] == label
        ],
    }


result = {
    'clusters': [describe_cluster(label) for label in set(clustering.labels_)]
}
print(colored(json.dumps(result, indent=2, ensure_ascii=False), 'red'))