In [3]:
import pickle
import numpy as np
import gensim
import wandb
from pathlib import Path
from gensim.models.word2vec import Word2Vec
from tokenizers import Tokenizer, decoders, pre_tokenizers

In [4]:
# Training text files (Chinese / Japanese) under the ASPEC-JC dataset folder.
# Each line of these files is tokenized as one sentence further down.
train_ch = Path("../dataset/ASPEC-JC/train/filtered_ch.txt")
train_jp = Path("../dataset/ASPEC-JC/train/filtered_jp.txt")

In [5]:
# Locations of the serialized tokenizers (JSON files).
# Kept under their own names so that tokenizer_ch / tokenizer_jp only ever
# refer to Tokenizer objects instead of being rebound from Path to Tokenizer.
tokenizer_ch_file = Path("../tokenizer/tokenizer_sentencepiece_ch.json")
tokenizer_jp_file = Path("../tokenizer/tokenizer_sentencepiece_jp.json")

# Tokenizer.from_file is handed a plain string path, hence str().
tokenizer_ch = Tokenizer.from_file(str(tokenizer_ch_file))
tokenizer_jp = Tokenizer.from_file(str(tokenizer_jp_file))

In [6]:
print(tokenizer_ch.get_vocab_size())
print(tokenizer_jp.get_vocab_size())

32000
32000


# Tokenize raw sentences

In [7]:
# Cache folder for the tokenized training sentences.
tokenized_path = Path("../tokenized_sentences/")
# Create the folder up front, like the word2vec output folders; without it the
# first cache write fails only after the whole corpus has been tokenized.
tokenized_path.mkdir(parents=True, exist_ok=True)

# NOTE: despite the .txt suffix, these files hold pickled Python lists.
tokenized_ch_path = tokenized_path / "ch.txt"
tokenized_jp_path = tokenized_path / "jp.txt"

In [8]:
# Load the cached tokenized Chinese sentences when present; otherwise tokenize
# the training file and write the cache.
if tokenized_ch_path.exists():
    # NOTE: pickle.load can execute arbitrary code; only load cache files that
    # this notebook produced itself.
    with open(tokenized_ch_path, "rb") as f:
        ch_texts = pickle.load(f)
else:
    # Explicit UTF-8: the platform default encoding is not guaranteed to be
    # able to decode CJK text.
    with open(train_ch, encoding="utf-8") as f:
        # Iterate the file lazily instead of materializing f.readlines().
        # [1:-1] drops the first and last token of each encoding
        # (presumably special start/end tokens added by the tokenizer - confirm).
        # NOTE(review): as before, each line is encoded with its trailing "\n".
        ch_texts = [tokenizer_ch.encode(line).tokens[1:-1] for line in f]

    with open(tokenized_ch_path, "wb") as f:
        pickle.dump(ch_texts, f)

In [9]:
# Load the cached tokenized Japanese sentences when present; otherwise tokenize
# the training file and write the cache.
if tokenized_jp_path.exists():
    # NOTE: pickle.load can execute arbitrary code; only load cache files that
    # this notebook produced itself.
    with open(tokenized_jp_path, "rb") as f:
        jp_texts = pickle.load(f)
else:
    # Explicit UTF-8: the platform default encoding is not guaranteed to be
    # able to decode CJK text.
    with open(train_jp, encoding="utf-8") as f:
        # Iterate the file lazily instead of materializing f.readlines().
        # [1:-1] drops the first and last token of each encoding
        # (presumably special start/end tokens added by the tokenizer - confirm).
        # NOTE(review): as before, each line is encoded with its trailing "\n".
        jp_texts = [tokenizer_jp.encode(line).tokens[1:-1] for line in f]

    with open(tokenized_jp_path, "wb") as f:
        pickle.dump(jp_texts, f)

# Helper Functions

In [10]:
def check_coverage(vocab, embedding, map_func=None):
    """
    Print how many vocabulary entries have a vector in ``embedding``.

    vocab = vocabulary from tokenizer, a mapping token -> index (only keys are used)
    embedding = word2vec embedding; ``embedding[word]`` must raise KeyError on a miss
    map_func = optional callable applied to every token before the lookup

    Returns nothing; the result is printed as "<pct> (<hits>/<total>) is covered.".
    """
    total = len(vocab)
    count = 0
    for word in vocab.keys():
        if map_func:
            word = map_func(word)
        try:
            found = embedding[word] is not None
        except KeyError:
            # Only a missing key counts as "not covered"; any other error is a
            # real problem and must not be silently swallowed.
            found = False
        if found:
            count += 1

    # An empty vocabulary reports 0% instead of dividing by zero.
    ratio = count / total if total else 0.0
    print(f"{ratio:.0%} ({count}/{total}) is covered.")

In [11]:
def build_embedding_matrix(vocab, embedding, map_func=None, vector_size=None):
    """
    Build a dense matrix whose row ``i`` is the vector of the token with index ``i``.

    vocab = vocabulary from tokenizer, a mapping token -> row index
    embedding = word2vec embedding supporting ``word in embedding`` and ``embedding[word]``
    map_func = optional callable applied to every token before the lookup
    vector_size = number of columns; when None it is read from
                  ``embedding.vector_size`` and falls back to 300 if absent

    Returns a float array of shape (len(vocab), vector_size); rows of tokens
    that are not found in ``embedding`` stay all-zero.
    """
    if vector_size is None:
        # Follow the trained model's dimensionality rather than assuming 300.
        vector_size = getattr(embedding, "vector_size", 300)

    embed_matrix = np.zeros((len(vocab), vector_size))

    for word, i in vocab.items():
        if map_func:
            word = map_func(word)

        if word in embedding:
            embed_matrix[i] = embedding[word]

    return embed_matrix

In [12]:
def build_word2vec(
    tokenized_corpus,
    vector_size=300,
    max_vocab_size=32000,
    rule=None,
    workers=32,
    min_count=5,
    epochs=5,
):
    """
    Train a skip-gram Word2Vec model with negative sampling.

    tokenized_corpus = iterable of token lists, one list per sentence
    vector_size = dimensionality of the word vectors
    max_vocab_size = forwarded to Word2Vec unchanged
    rule = optional trim rule forwarded as ``trim_rule``
    workers = number of training threads (previously fixed at 32)
    min_count = minimum token frequency (previously fixed at 5)
    epochs = number of passes over the corpus (previously fixed at 5)

    Returns the trained ``Word2Vec`` model.
    """
    # NOTE(review): in gensim, max_vocab_size bounds memory while the vocabulary
    # is being built; max_final_vocab is the option that caps the final
    # vocabulary size - confirm which one is intended here.
    return Word2Vec(
        tokenized_corpus,
        vector_size=vector_size,
        max_vocab_size=max_vocab_size,
        sg=1,  # skip-gram
        hs=0,  # no hierarchical softmax ...
        negative=5,  # ... negative sampling with 5 noise words instead
        workers=workers,
        min_count=min_count,
        trim_rule=rule,
        epochs=epochs,
    )

# Train Semantic Embedding

In [13]:
import re

# Matches a single character from the kana / CJK ideograph ranges listed in the
# character class (hiragana, katakana, CJK ideographs, half-width katakana).
reg = re.compile(r'[\u3040-\u30ff\u3400-\u4dbf\u4e00-\u9fff\uf900-\ufaff\uff66-\uff9f]')


def semantic_rule(word, count, min_count):
    """
    Word2Vec trim rule for the semantic models.

    Tokens containing at least one character matched by ``reg`` get the default
    handling; every other token is discarded. ``count`` and ``min_count`` are
    part of the trim-rule signature but are not used here.
    """
    contains_cjk = reg.search(word) is not None
    return gensim.utils.RULE_DEFAULT if contains_cjk else gensim.utils.RULE_DISCARD

In [14]:
# Root folder for all word2vec artifacts; the semantic models get a subfolder.
word2vec_path = Path("../word2vec/")
semantic_path = word2vec_path / "semantic/"
# Create the output folder (and its parents); no error if it already exists.
semantic_path.mkdir(parents=True, exist_ok=True)

# Saved Word2Vec models (written with model.save, read with Word2Vec.load).
ch_semantic_path = semantic_path / "ch_word2vec"
jp_semantic_path = semantic_path / "jp_word2vec"

# Embedding matrices stored as .npy arrays.
ch_semantic_embedding_path = semantic_path / "ch_embedding.npy"
jp_semantic_embedding_path = semantic_path / "jp_embedding.npy"

## Chinese

In [15]:
# Train the Chinese semantic model only when no saved copy exists yet;
# otherwise reuse the saved one.
if not ch_semantic_path.exists():
    ch_model = build_word2vec(ch_texts, rule=semantic_rule)
    ch_model.save(str(ch_semantic_path))
else:
    ch_model = Word2Vec.load(str(ch_semantic_path))

In [16]:
print(ch_model.wv.vectors.shape)

(27382, 300)


In [17]:
check_coverage(tokenizer_ch.get_vocab(), ch_model.wv)

86% (27382/32000) is covered.


In [18]:
# Build the Chinese semantic embedding matrix once and cache it as .npy;
# later runs read the cached array.
if not ch_semantic_embedding_path.exists():
    ch_semantic_embedding = build_embedding_matrix(tokenizer_ch.get_vocab(), ch_model.wv)
    np.save(ch_semantic_embedding_path, ch_semantic_embedding)
else:
    ch_semantic_embedding = np.load(ch_semantic_embedding_path)

ch_semantic_embedding.shape

(32000, 300)

## Japanese

In [19]:
# Train the Japanese semantic model only when no saved copy exists yet;
# otherwise reuse the saved one.
if not jp_semantic_path.exists():
    jp_model = build_word2vec(jp_texts, rule=semantic_rule)
    jp_model.save(str(jp_semantic_path))
else:
    jp_model = Word2Vec.load(str(jp_semantic_path))

In [20]:
print(jp_model.wv.vectors.shape)

(28313, 300)


In [21]:
check_coverage(tokenizer_jp.get_vocab(), jp_model.wv)

88% (28313/32000) is covered.


In [22]:
# Build the Japanese semantic embedding matrix once and cache it as .npy;
# later runs read the cached array.
if not jp_semantic_embedding_path.exists():
    jp_semantic_embedding = build_embedding_matrix(tokenizer_jp.get_vocab(), jp_model.wv)
    np.save(jp_semantic_embedding_path, jp_semantic_embedding)
else:
    jp_semantic_embedding = np.load(jp_semantic_embedding_path)

jp_semantic_embedding.shape

(32000, 300)

# Tokenize Phonetic Sentences

In [23]:
# Cache files for the phonetic versions of the tokenized sentences
# (written and read with pickle below, despite the .txt suffix).
tokenized_ch_phonetic_path = tokenized_path / "chp.txt"
tokenized_jp_phonetic_path = tokenized_path / "jpp.txt"

## Chinese

In [24]:
from dragonmapper import hanzi

def to_zhuyin(word):
    """
    Convert ``word`` to zhuyin with ``hanzi.to_zhuyin``.

    Best effort: if the conversion raises, the input is returned unchanged.
    Only ``Exception`` is caught, so KeyboardInterrupt / SystemExit still
    propagate (the former ``return`` inside ``finally`` swallowed them too).
    """
    try:
        return hanzi.to_zhuyin(word)
    except Exception:
        # Tokens the converter cannot handle are passed through as they are.
        return word

def generate_ch_phonetic_sentences(ch_texts):
    """
    Map every token of every sentence through ``to_zhuyin``.

    ch_texts = list of sentences, each a list of string tokens
    Returns a list of the same shape holding the converted tokens.

    Each distinct token is converted only once: the corpus repeats the same
    tokens many times, so the result is memoized per token.
    """
    converted = {}  # token -> zhuyin, filled on first use

    def convert(token):
        try:
            return converted[token]
        except KeyError:
            result = converted[token] = to_zhuyin(token)
            return result

    return [[convert(s) for s in line] for line in ch_texts]

In [25]:
# Generate the Chinese phonetic sentences only when no cache file exists yet;
# otherwise read them back from the pickle cache.
if not tokenized_ch_phonetic_path.exists():
    chp_texts = generate_ch_phonetic_sentences(ch_texts)
    with open(tokenized_ch_phonetic_path, 'wb') as cache_file:
        pickle.dump(chp_texts, cache_file)
else:
    with open(tokenized_ch_phonetic_path, 'rb') as cache_file:
        chp_texts = pickle.load(cache_file)

## Japanese

In [26]:
import pykakasi

kks = pykakasi.kakasi()

def to_hira(kanji):
    """
    Return the hiragana form of ``kanji``.

    The text is passed to ``kks.convert`` and the "hira" field of every
    returned item is concatenated in order.
    """
    readings = []
    for segment in kks.convert(kanji):
        readings.append(segment["hira"])
    return "".join(readings)

def generate_jp_phonetic_sentences(jp_texts):
    """
    Map every token of every sentence through ``to_hira``.

    jp_texts = list of sentences, each a list of string tokens
    Returns a list of the same shape holding the converted tokens.

    Each distinct token is converted only once: the corpus repeats the same
    tokens many times, so the result is memoized per token.
    """
    converted = {}  # token -> hiragana, filled on first use

    def convert(token):
        try:
            return converted[token]
        except KeyError:
            result = converted[token] = to_hira(token)
            return result

    return [[convert(s) for s in line] for line in jp_texts]

In [27]:
# Generate the Japanese phonetic sentences only when no cache file exists yet;
# otherwise read them back from the pickle cache.
if not tokenized_jp_phonetic_path.exists():
    jpp_texts = generate_jp_phonetic_sentences(jp_texts)
    with open(tokenized_jp_phonetic_path, 'wb') as cache_file:
        pickle.dump(jpp_texts, cache_file)
else:
    with open(tokenized_jp_phonetic_path, 'rb') as cache_file:
        jpp_texts = pickle.load(cache_file)

# Train Phonetic Embedding

In [28]:
# Subfolder of the word2vec root for the phonetic models.
phonetic_path = word2vec_path / "phonetic/"
# Create the output folder (and its parents); no error if it already exists.
phonetic_path.mkdir(parents=True, exist_ok=True)

# Saved Word2Vec models (written with model.save, read with Word2Vec.load).
ch_phonetic_path = phonetic_path / "chp_word2vec"
jp_phonetic_path = phonetic_path / "jpp_word2vec"

# Embedding matrices stored as .npy arrays.
ch_phonetic_embedding_path = phonetic_path / "chp_embedding.npy"
jp_phonetic_embedding_path = phonetic_path / "jpp_embedding.npy"

In [29]:
# https://stackoverflow.com/questions/16027450/is-there-a-way-to-know-whether-a-unicode-string-contains-any-chinese-japanese-ch
import unicodedata

def has_zhuyin(s):
    """
    Return True if any character of ``s`` has "BOPOMOFO" in its Unicode name,
    otherwise False (also for an empty string).
    """
    # unicodedata.name(c, "") yields "" for characters without a name instead
    # of raising, so one unnamed character (e.g. "\n") no longer ends the scan
    # before the remaining characters have been checked.
    return any("BOPOMOFO" in unicodedata.name(c, "") for c in s)

def has_hira(s):
    """
    Return True if any character of ``s`` has "HIRAGANA" in its Unicode name,
    otherwise False (also for an empty string).
    """
    # unicodedata.name(c, "") yields "" for characters without a name instead
    # of raising, so one unnamed character (e.g. "\t") no longer ends the scan
    # before the remaining characters have been checked.
    return any("HIRAGANA" in unicodedata.name(c, "") for c in s)

def phonetic_rule(word, count, min_count):
    """
    Word2Vec trim rule for the phonetic models.

    Tokens for which ``has_zhuyin`` or ``has_hira`` is truthy get the default
    handling; every other token is discarded. ``count`` and ``min_count`` are
    part of the trim-rule signature but are not used here.
    """
    if not (has_zhuyin(word) or has_hira(word)):
        return gensim.utils.RULE_DISCARD
    return gensim.utils.RULE_DEFAULT

## Chinese

In [30]:
# Train the Chinese phonetic model only when no saved copy exists yet;
# otherwise reuse the saved one.
if not ch_phonetic_path.exists():
    chp_model = build_word2vec(chp_texts, rule=phonetic_rule)
    chp_model.save(str(ch_phonetic_path))
else:
    chp_model = Word2Vec.load(str(ch_phonetic_path))

In [31]:
print(chp_model.wv.vectors.shape)

(25321, 300)


In [32]:
check_coverage(tokenizer_ch.get_vocab(), chp_model.wv, map_func=to_zhuyin)

94% (30009/32000) is covered.


In [33]:
# Build the Chinese phonetic embedding matrix once and cache it as .npy.
# Vocabulary tokens are mapped through to_zhuyin before the lookup.
if not ch_phonetic_embedding_path.exists():
    ch_phonetic_embedding = build_embedding_matrix(
        tokenizer_ch.get_vocab(),
        chp_model.wv,
        map_func=to_zhuyin,
    )
    np.save(ch_phonetic_embedding_path, ch_phonetic_embedding)
else:
    ch_phonetic_embedding = np.load(ch_phonetic_embedding_path)

ch_phonetic_embedding.shape

(32000, 300)

## Japanese

In [34]:
# Train the Japanese phonetic model only when no saved copy exists yet;
# otherwise reuse the saved one.
if not jp_phonetic_path.exists():
    jpp_model = build_word2vec(jpp_texts, rule=phonetic_rule)
    jpp_model.save(str(jp_phonetic_path))
else:
    jpp_model = Word2Vec.load(str(jp_phonetic_path))

In [35]:
print(jpp_model.wv.vectors.shape)

(24371, 300)


In [36]:
check_coverage(tokenizer_jp.get_vocab(), jpp_model.wv, map_func=to_hira)

91% (29080/32000) is covered.


In [37]:
# Build the Japanese phonetic embedding matrix once and cache it as .npy.
# Vocabulary tokens are mapped through to_hira before the lookup.
if not jp_phonetic_embedding_path.exists():
    jp_phonetic_embedding = build_embedding_matrix(
        tokenizer_jp.get_vocab(),
        jpp_model.wv,
        map_func=to_hira,
    )
    np.save(jp_phonetic_embedding_path, jp_phonetic_embedding)
else:
    jp_phonetic_embedding = np.load(jp_phonetic_embedding_path)

jp_phonetic_embedding.shape

(32000, 300)

# Concatenate Embedding

In [38]:
print(ch_semantic_embedding.shape)
print(jp_semantic_embedding.shape)

print(ch_phonetic_embedding.shape)
print(jp_phonetic_embedding.shape)

(32000, 300)
(32000, 300)
(32000, 300)
(32000, 300)


In [39]:
# Put the semantic and phonetic vectors of each token side by side
# (column-wise join of the two 2-D matrices).
ch_embedding = np.hstack((ch_semantic_embedding, ch_phonetic_embedding))
ch_embedding.shape

(32000, 600)

In [40]:
# Put the semantic and phonetic vectors of each token side by side
# (column-wise join of the two 2-D matrices).
jp_embedding = np.hstack((jp_semantic_embedding, jp_phonetic_embedding))
jp_embedding.shape

(32000, 600)

In [41]:
# Store the concatenated (semantic + phonetic) matrices in the word2vec root folder.
np.save(word2vec_path / "ch-embedding-concat.npy", ch_embedding)
np.save(word2vec_path / "jp-embedding-concat.npy", jp_embedding)

# Meta Embedding

In [54]:
# Mixing weight for the meta embedding computed below: the phonetic matrix is
# weighted by beta and the semantic matrix by (1 - beta).
beta = 0.3

In [55]:
print(ch_semantic_embedding.shape)
print(jp_semantic_embedding.shape)

print(ch_phonetic_embedding.shape)
print(jp_phonetic_embedding.shape)

(32000, 300)
(32000, 300)
(32000, 300)
(32000, 300)


In [56]:
# Weighted blend of the two Chinese matrices. The weights sum to 2 and the mean
# over the two scaled matrices halves them, i.e. (1 - beta) * semantic + beta * phonetic.
semantic_weight = 2 - (beta * 2)
phonetic_weight = beta * 2
ch_meta_embedding = np.mean(
    [semantic_weight * ch_semantic_embedding, phonetic_weight * ch_phonetic_embedding],
    axis=0,
)

In [57]:
ch_meta_embedding.shape

(32000, 300)

In [58]:
# Weighted blend of the two Japanese matrices. The weights sum to 2 and the mean
# over the two scaled matrices halves them, i.e. (1 - beta) * semantic + beta * phonetic.
semantic_weight = 2 - (beta * 2)
phonetic_weight = beta * 2
jp_meta_embedding = np.mean(
    [semantic_weight * jp_semantic_embedding, phonetic_weight * jp_phonetic_embedding],
    axis=0,
)

In [59]:
jp_meta_embedding.shape

(32000, 300)

In [60]:
# Store the meta embeddings in the word2vec root folder. "{beta=}" renders as
# "beta=<value>", so the weight used becomes part of the file name
# (with beta = 0.3: "ch-embedding-meta-beta=0.3.npy").
np.save(word2vec_path / f"ch-embedding-meta-{beta=}.npy", ch_meta_embedding)
np.save(word2vec_path / f"jp-embedding-meta-{beta=}.npy", jp_meta_embedding)