In [17]:
import functools
import re
import sqlite3
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

import gensim
import Levenshtein
import MeCab
from scipy.stats.mstats import gmean

In [2]:
# Load the fastText model stored at ./data/ja_model.bin; later cells query it through `model.wv`.
# NOTE(review): the recorded output of this cell looks like the tail of a warning
# (presumably a deprecation notice for load_fasttext_format) — confirm against the gensim version in use.
model = gensim.models.FastText.load_fasttext_format("./data/ja_model.bin")

  """Entry point for launching an IPython kernel.


In [3]:
# Shared SQLite connection used by the query helpers in this notebook (they read the `word` and `sense` tables).
# presumably this file is the Japanese WordNet database — TODO confirm
conn = sqlite3.connect("./data/wnjpn.db")

In [22]:
# MeCab tagger configured with the "-Oyomi" output format; get_yomi() parses text with it.
yomi = MeCab.Tagger("-Oyomi")
# Runs of katakana. U+30FC (ー, the prolonged sound mark) is included in addition to
# ァ..ヴ so that readings containing long vowels (e.g. コーヒー) are not split and
# stripped of their "ー" characters when the matches are joined back together.
re_katakana = re.compile(r'[\u30A1-\u30F4\u30FC]+')
def get_yomi(text: str) -> str:
    """Return the katakana reading of ``text``.

    The text is parsed with the module-level ``yomi`` tagger; every run that
    matches the module-level ``re_katakana`` pattern is kept and the runs are
    concatenated, so anything the pattern does not match is dropped.
    """
    parsed = yomi.parse(text)
    return "".join(re_katakana.findall(parsed))

In [18]:
## returns a list of
## (wordid: int, lemma: str, score: float, occurrence: int)
def get_words_in_all_synsets(
    search: Union[str, Iterable[str]], threshold: int = 1
) -> List[Tuple[int, str, float, int]]:
    """Look up the words that share a synset with any of the searched lemmas.

    Args:
        search: A single lemma, or any iterable of lemmas (lists, tuples and
            generators are all accepted).
        threshold: Minimum number of joined rows a related word must have to
            be returned.

    Returns:
        ``(wordid, lemma, score, occurrence)`` tuples ordered by occurrence,
        descending. ``score`` is ``sum(1.0)`` over the joined rows, i.e. the
        occurrence count as a float. Only related words whose ``lang`` is
        ``'jpn'`` are returned.
    """
    # A lone string is one lemma, not an iterable of characters; any other
    # iterable is materialised once so generators work and can be counted.
    lemmas = [search] if isinstance(search, str) else list(search)
    placeholder = ",".join("?" for _ in lemmas)
    params = [*lemmas, threshold]
    # Only "?" placeholders are interpolated into the SQL text; the values
    # themselves are always bound as parameters.
    cur = conn.execute(f"""\
select
 sub.wordid,
 sub.lemma,
 sum(1.0) as score,
 count(sub.lemma) as occurrence
from (
    select
        related_word.wordid,
        related_word.lemma
    from word base
    inner join sense attributed_sense
        on attributed_sense.wordid = base.wordid
    inner join sense all_sense
        on all_sense.synset = attributed_sense.synset
    inner join word related_word
        on related_word.wordid = all_sense.wordid
        and related_word.lang = 'jpn'
    where base.lemma in ({placeholder})
) sub
group by sub.lemma, sub.wordid
having count(sub.lemma) >= (?)
order by count(sub.lemma) desc
""", params)
    return [row[0:4] for row in cur.fetchall()]

# get_words_in_all_synsets("優しい", 1)
# get_words_in_all_synsets(["優しい", "好青年", "朗らか"], 2)

In [0]:
def create_values_query(
    neigbhor_list: Iterable[Tuple[str, float]]
) -> Tuple[str, Dict]:
    """Build a parameterised ``select ... union select ...`` row source.

    Args:
        neigbhor_list: ``(word, score)`` pairs; any iterable is accepted and
            is consumed exactly once.

    Returns:
        ``(sql, params)``. ``sql`` selects one ``:w<i>, :s<i>`` row per pair,
        joined with ``union select``; ``params`` maps ``w<i>`` to the word and
        ``s<i>`` to the score of the i-th pair. For an empty input the SQL is a
        select that yields zero rows (instead of the syntactically invalid
        ``select  ``) and ``params`` is empty.
    """
    pairs = list(neigbhor_list)
    if not pairs:
        # Two columns, no rows: keeps callers that embed this in a CTE valid.
        return ("select null, null where 0 ", {})
    params = {}
    for index, pair in enumerate(pairs):
        params[f"w{index}"] = pair[0]
        params[f"s{index}"] = pair[1]
    selects = " union select ".join(
        f":w{index}, :s{index}" for index in range(len(pairs))
    )
    return (f"select {selects} ", params)

## returns a list of
## (wordid: int, lemma: str, score: float, occurrence: int)
def get_synonym_score_for_neigbhor_list(
    neigbhor_list: Iterable[Tuple[str, float]],
    threshold: int = 1,
    includes_self: bool = False,
) -> List[Tuple[int, str, float, int]]:
    """Score the words that share a synset with the given neighbour words.

    Args:
        neigbhor_list: ``(word, score)`` pairs. Each word is matched against
            ``word.lemma``.
        threshold: Minimum number of joined rows a related word must have to
            be returned.
        includes_self: When false, a sense row belonging to the neighbour word
            itself is excluded from its own synset members.

    Returns:
        ``(wordid, lemma, score, occurrence)`` tuples ordered by occurrence,
        descending, where ``score`` is the sum of the neighbour scores of the
        rows that reached that word. Only related words whose ``lang`` is
        ``'jpn'`` are returned. An empty ``neigbhor_list`` returns ``[]``
        without touching the database.
    """
    neighbours = list(neigbhor_list)
    if not neighbours:
        # Nothing to join against; also avoids emitting an empty CTE body.
        return []
    (values_query, params) = create_values_query(neighbours)
    self_filter = "" if includes_self else "and rel_s.wordid != s.wordid"
    query = f"""\
with neigbhors(word, score) as (
{values_query}
),
sub(word, score, wordid, lemma) as (
select
  n.word,
  n.score,
  rel_w.wordid,
  rel_w.lemma
from neigbhors n
inner join word w
  on w.lemma = n.word
inner join sense s
  on s.wordid = w.wordid
inner join sense rel_s
  on rel_s.synset = s.synset
  {self_filter}
inner join word rel_w
  on rel_w.wordid = rel_s.wordid
  and rel_w.lang = 'jpn'
)
select
 sub.wordid,
 sub.lemma,
 sum(sub.score) as score,
 count(sub.lemma) as occurrence
from sub
group by sub.lemma, sub.wordid
having count(sub.lemma) >= :threshold
order by count(sub.lemma) desc\
"""
    cur = conn.execute(query, {**params, "threshold": threshold})
    return [row[0:4] for row in cur.fetchall()]

# get_synonym_score_for_neigbhor_list([("優しい",  0.1), ("朗らか", 0.01)])

In [19]:
## returns a list of
## (wordid: int, lemma: str, score: float, occurrence: int)
def list_synonyms_from_similar_words(
    search: Union[str, Iterable[str]],
    topn: int = 20,
    threshold: int = 1,
    includes_self: bool = False,
) -> List[Tuple[int, str, float, int]]:
    """Collect synonyms of the embedding-space neighbours of ``search``.

    Args:
        search: Word or words passed as ``positive`` to
            ``model.wv.most_similar``.
        topn: Number of neighbours requested from the model.
        threshold: Forwarded to ``get_synonym_score_for_neigbhor_list``.
        includes_self: Forwarded to ``get_synonym_score_for_neigbhor_list``.

    Returns:
        Whatever ``get_synonym_score_for_neigbhor_list`` returns for the
        neighbour list: ``(wordid, lemma, score, occurrence)`` tuples.
    """
    neigbhor_list = model.wv.most_similar(
        positive=search,
        topn=topn,
    )
    return get_synonym_score_for_neigbhor_list(
        neigbhor_list=neigbhor_list,
        threshold=threshold,
        includes_self=includes_self,
    )

# list_synonyms_from_similar_words(search=["優しい", "かわいい"])[0:30]

In [0]:
def sort_by_score(
    synonym_list: List[str],
    top_n: int = -1,
    order: str = "asc",
) -> List[Dict]:
    """Return the entries of ``synonym_list`` ordered by their ``"s"`` value.

    Each entry is indexed with ``["s"]`` (so, despite the ``List[str]``
    annotation, entries are mappings). Ordering is descending only when
    ``order`` equals ``"desc"``; any other value sorts ascending. When
    ``top_n`` is a positive ``int`` only the first ``top_n`` entries are
    returned; otherwise the whole sorted list is returned. The input is not
    modified.
    """
    descending = (order == "desc")
    ranked = list(synonym_list)
    ranked.sort(key=lambda entry: entry["s"], reverse=descending)
    wants_prefix = top_n > 0 and type(top_n) == int
    return ranked[:top_n] if wants_prefix else ranked

def _accumulate_synonym_rows(merged: Dict, rows: Iterable[Tuple], occurrence_key: str) -> None:
    """Fold ``(wordid, lemma, score, occurrence)`` rows into ``merged`` in place, keyed by wordid."""
    for row in rows:
        wordid, lemma, score, occurrence = row[0], row[1], row[2], row[3]
        if wordid not in merged:
            entry = {
                "i": wordid,  # wordid
                "w": lemma,  # word (lemma)
                "s": score,  # score
                "occ": 0,  # occurrence in search itself
                "sim_occ": 0,  # occurrence in similar words
            }
            entry[occurrence_key] = occurrence
            merged[wordid] = entry
        else:
            merged[wordid]["s"] += score
            merged[wordid][occurrence_key] += occurrence


def get_mixed_synonyms(
    search: Union[str, Iterable[str]],
    search_threshold: int = 1,
    use_similar: bool = True,
    similar_topn: int = 20,
    similar_boost: float = 0.2,
    similar_threshold: int = 1,
    similar_includes_self: bool = False,
    add_yomi: bool = True,
) -> List[Dict]:
    """Merge direct synonyms of ``search`` with synonyms of its similar words.

    Args:
        search: Word or words to look up.
        search_threshold: ``threshold`` for ``get_words_in_all_synsets``.
        use_similar: When false, only the direct synonyms are returned and the
            similar-word lookup is skipped.
        similar_topn: ``topn`` for ``list_synonyms_from_similar_words``.
        similar_boost: Factor applied to every score coming from the
            similar-word lookup before it is added.
        similar_threshold: ``threshold`` for the similar-word lookup.
        similar_includes_self: ``includes_self`` for the similar-word lookup.
        add_yomi: Currently unused; kept for interface compatibility.

    Returns:
        A list with one dict per wordid, in first-seen order (direct synonyms
        first), with keys ``i`` (wordid), ``w`` (lemma), ``s`` (combined
        score), ``occ`` (occurrences from the direct lookup) and ``sim_occ``
        (occurrences from the similar-word lookup).
    """
    # `search` is handed to two lookups; materialise one-shot iterables so the
    # second lookup does not receive an exhausted generator.
    if not isinstance(search, str):
        search = list(search)

    synonyms_from_search = get_words_in_all_synsets(
        search=search,
        threshold=search_threshold,
    )
    synonyms_from_similar_words = list_synonyms_from_similar_words(
        search=search,
        topn=similar_topn,
        threshold=similar_threshold,
        includes_self=similar_includes_self,
    ) if use_similar else []

    merged = {}
    _accumulate_synonym_rows(merged, synonyms_from_search, "occ")
    if use_similar:
        boosted = (
            (row[0], row[1], similar_boost * row[2], row[3])
            for row in synonyms_from_similar_words
        )
        _accumulate_synonym_rows(merged, boosted, "sim_occ")
    # Return a real list (as annotated) rather than a dict view.
    return list(merged.values())

# sort_by_score(get_mixed_synonyms("綺麗な", use_similar=True), 10)

In [20]:
def get_top_n_percentile(
    synonym_list: List[Dict],
    percentile: float = 0.95,
) -> List[Dict]:
    """Keep the highest-scoring entries that together cover ``percentile`` of the total score.

    Entries are ranked by ``"s"`` descending (via ``sort_by_score``) and taken
    from the top until their cumulative score reaches
    ``percentile * total_score``. The entry that crosses the threshold is
    included, so a non-empty input always yields at least one entry.

    Args:
        synonym_list: Entries with a numeric ``"s"`` score.
        percentile: Fraction of the total score to cover.

    Returns:
        The leading slice of the ranked list; ``[]`` for empty input; the
        whole ranked list if the threshold is never reached.
    """
    ranked = sort_by_score(synonym_list, order="desc")
    if not ranked:
        return []
    target = percentile * sum(entry["s"] for entry in ranked)
    covered = 0.0
    for index, entry in enumerate(ranked):
        covered += entry["s"]
        if covered >= target:
            # +1: include the entry that brought the running total up to the target.
            return ranked[:index + 1]
    return ranked

# get_top_n_percentile(get_mixed_synonyms("綺麗な", use_similar=True), percentile=0.95)


In [0]:
def aggregate_yomi(
    yomi_list: List[str],
    prefix_weight: float = 0.10,
    threshold: float = 0.90
):
    """Greedily attach each reading to an earlier, sufficiently similar reading.

    Readings are scanned in order. For each reading, every later reading that
    has no parent yet and whose ``Levenshtein.jaro_winkler`` value against it
    is at least ``threshold`` becomes its child. A child's root is its
    parent's root, or the parent itself when the parent has no parent.

    Returns:
        ``(aggregated, sim_links)`` where ``aggregated`` maps each reading to
        its root reading (``None`` for readings that never got a parent) and
        ``sim_links`` has one item per reading: ``(index, reading,
        parent_index, children)`` when the reading has no parent or gained
        children, otherwise ``None``. ``children`` is a list of
        ``(index, reading, similarity)``.
    """
    count = len(yomi_list)
    parent_of = [-1] * count
    root_of = [None] * count
    links = []
    for head_idx in range(count):
        head = yomi_list[head_idx]
        # The head's own parent cannot change below: only later indices are assigned.
        head_is_root = parent_of[head_idx] < 0
        inherited_root = head if head_is_root else root_of[head_idx]
        children = []
        for tail_idx in range(head_idx + 1, count):
            if parent_of[tail_idx] >= 0:
                continue
            tail = yomi_list[tail_idx]
            similarity = Levenshtein.jaro_winkler(head, tail, prefix_weight)
            if similarity < threshold:
                continue
            parent_of[tail_idx] = head_idx
            root_of[tail_idx] = inherited_root
            children.append((tail_idx, tail, similarity))
        if head_is_root or children:
            links.append((head_idx, head, parent_of[head_idx], children))
        else:
            links.append(None)
    # display(links)
    aggregated = dict(zip(yomi_list, root_of))
    return (aggregated, links)
# aggregate_yomi(["キレイナ", "キレイダ", "キレイ", "キタナイ", "キレイダヨ", "キレイダヨネ", "レイダヨ", "ソケナイ", "キレタ", "ソッケナイ"])

In [29]:
def aggregated_by_yomi(
    synonym_list: List[Dict],
    jaro_winkler_prefix_weight: float = 0.05,
    jaro_winkler_threshold: float = 0.90
) -> Dict[str, List[Dict]]:
    """Group synonym entries by the reading of their ``"w"`` value.

    Args:
        synonym_list: Entries with a ``"w"`` key (the word to read).
        jaro_winkler_prefix_weight: Currently unused (reserved for the TODO below).
        jaro_winkler_threshold: Currently unused (reserved for the TODO below).

    Returns:
        A dict mapping each reading (from ``get_yomi``) to the list of entries
        with that reading, in input order. Every entry is a copy of the input
        dict with an added ``"y"`` key holding the reading; inputs are not
        modified.
    """
    yomi_lookup = {}
    for entry in synonym_list:
        reading = get_yomi(entry["w"])
        yomi_lookup.setdefault(reading, []).append({**entry, "y": reading})
    # TODO: merge near-identical readings with aggregate_yomi(
    #     [*yomi_lookup.keys()],
    #     prefix_weight=jaro_winkler_prefix_weight,
    #     threshold=jaro_winkler_threshold,
    # ) and emit one record per group with a representative wordid ("i"),
    # lemma ("w"), aggregated score ("s") and its children ("c").
    # The earlier placeholder loop for this was removed: it read an undefined
    # name and its result was never returned.
    return yomi_lookup

# Demo: build the mixed synonyms of "素敵な", keep the leading part selected by
# get_top_n_percentile(percentile=0.95), and display them grouped by reading.
tops = get_top_n_percentile(get_mixed_synonyms("素敵な", use_similar=True), percentile=0.95)
display(aggregated_by_yomi(tops))

{'ステキ': [{'i': 197548,
   'w': 'すてき',
   's': 0.31971263885498047,
   'occ': 0,
   'sim_occ': 4,
   'y': 'ステキ'},
  {'i': 201451,
   'w': '素的',
   's': 0.15985631942749023,
   'occ': 0,
   'sim_occ': 2,
   'y': 'ステキ'}],
 'モトテキ': [{'i': 169842,
   'w': '素適',
   's': 0.31971263885498047,
   'occ': 0,
   'sim_occ': 4,
   'y': 'モトテキ'}],
 'スバラシイ': [{'i': 171094,
   'w': '素晴らしい',
   's': 0.23978447914123535,
   'occ': 0,
   'sim_occ': 3,
   'y': 'スバラシイ'},
  {'i': 157495,
   'w': 'すばらしい',
   's': 0.07992815971374512,
   'occ': 0,
   'sim_occ': 1,
   'y': 'スバラシイ'}],
 'ミゴト': [{'i': 212929,
   'w': '見事',
   's': 0.23978447914123535,
   'occ': 0,
   'sim_occ': 3,
   'y': 'ミゴト'}],
 'キラビヤカ': [{'i': 220221,
   'w': 'きらびやか',
   's': 0.15985631942749023,
   'occ': 0,
   'sim_occ': 2,
   'y': 'キラビヤカ'}],
 'キレイ': [{'i': 203577,
   'w': 'きれい',
   's': 0.15985631942749023,
   'occ': 0,
   'sim_occ': 2,
   'y': 'キレイ'},
  {'i': 166868,
   'w': '奇麗',
   's': 0.15985631942749023,
   'occ': 0,
   'sim_occ': 2,
 

In [0]:

# Exploratory calls.
# NOTE(review): `list_synoyms_from_wordnet_dict` is not defined in the visible part of this
# notebook, and `list_neigbhors_from_synonyms` is defined only in a later cell, so this cell
# presumably depends on out-of-order execution — confirm it survives Restart & Run All.
display(get_words_in_all_synsets("明るい"))
synonyms = list_synoyms_from_wordnet_dict("やる気")
display(synonyms)
list_neigbhors_from_synonyms(synonyms, topn=20)

In [0]:
def list_neigbhors_from_synonyms(
    synonyms: Iterable[Tuple[str, int]],
    topn: int = 100
):
    """Look up neighbours for a search list weighted by synonym occurrence.

    Each ``(word, occurrence)`` pair contributes ``word`` repeated
    ``occurrence`` times, in input order. The flattened list is handed to
    ``list_neigbhors_for_search_list`` together with ``topn`` and its result
    is returned unchanged.
    """
    repeated_words = [
        word
        for (word, occurrence) in synonyms
        for _ in range(occurrence)
    ]
    return list_neigbhors_for_search_list(repeated_words, topn=topn)


In [0]:
def append_neigbhorness_lift_for_each_search(
    search_list: Iterable[str],
    word: str,
    neigbhorness: float
) -> List[Tuple[str, float, Tuple[float, float]]]:
    """Pair ``word`` with its similarity and lift against every search term.

    For each term in ``search_list`` the similarity is
    ``model.wv.similarity(word, term)`` and the lift is
    ``neigbhorness / similarity`` when the similarity is positive, else ``0``.

    Returns:
        A single tuple ``(word, neigbhorness, pairs)`` where ``pairs`` is the
        list of ``(similarity, lift)`` in ``search_list`` order.
    """
    pairs = []
    for term in search_list:
        sim = model.wv.similarity(word, term)
        if sim > 0:
            pairs.append((sim, neigbhorness / sim))
        else:
            pairs.append((sim, 0))
    return (word, neigbhorness, pairs)

In [0]:
# NOTE(review): unfinished stub — the signature below has no body, which matches the
# "SyntaxError: unexpected EOF while parsing" recorded as this cell's output.
# NOTE(review): `Callable[float, Iterable[...]]` is not the documented
# `Callable[[arg_types], return_type]` form — confirm the intended score_func signature.
def filter_neigbours_by_score_func(
    neigbhorness_lift: Iterable[Tuple[str, float, Tuple[float, float]]],
    score_func: Callable[float, Iterable[Tuple[float, float]]]):


SyntaxError: unexpected EOF while parsing (<ipython-input-14-bced9ec0ec98>, line 1)

In [0]:
# Words related to all search terms jointly: take the top-100 neighbours of the
# combined query, record each neighbour's similarity to every individual term,
# and keep only neighbours whose joint score is at least every per-term similarity.
searches = ["明るい", "性格"]
combi = [
    (candidate, joint_score, [model.wv.similarity(candidate, term) for term in searches])
    for candidate, joint_score in model.wv.most_similar(positive=searches, topn=100)
]
# Each kept row is (word, joint score, smallest joint/per-term ratio, per-term similarities).
coocs = [
    (
        candidate,
        joint_score,
        min(joint_score / per_term for per_term in per_term_scores),
        per_term_scores,
    )
    for candidate, joint_score, per_term_scores in combi
    if not any(per_term > joint_score for per_term in per_term_scores)
]
display("related")
# Highest ratio first; show at most 50 rows.
display(sorted(coocs, key=lambda row: -row[2])[0:50])