Creating dataset for test

In [None]:
# Load both sides of the parallel corpus, one list entry per line of each file.
with open("./chinese.zh", mode='r', encoding="utf-8") as zh_file:
    src_text = zh_file.read().splitlines()

with open("./english.en", mode='r', encoding="utf-8") as en_file:
    trg_text = en_file.read().splitlines()

In [None]:
from collections import defaultdict
# Count how often each individual character occurs across all source lines.
# Iterating a string yields single characters, so the keys are characters.
word_freq = defaultdict(int)
for char in (ch for sentence in src_text for ch in sentence):
    word_freq[char] += 1

In [None]:
# Show the number of source and target lines side by side.
tuple(map(len, (src_text, trg_text)))

In [None]:
from collections import defaultdict
# Build the lookup tables used for the text <-> strokes conversion:
#   dic   : Chinese character -> stroke code
#   st2zh : stroke code -> Chinese character (a later line with the same code
#           overwrites an earlier one)
with open("./vocab/zh2letter.txt", 'r', encoding="utf-8") as table_file:
    conversions = table_file.read().splitlines()

dic = defaultdict(str)
st2zh = defaultdict(str)
# Every line must consist of exactly two whitespace-separated fields.
for chinese_char, strokes in map(str.split, conversions):
    dic[chinese_char] = strokes
    st2zh[strokes] = chinese_char

Strokify

In [None]:
from functools import partial

def is_chinese(uchar):
    """Return True if ``uchar`` lies in the range U+4E00..U+9FA5 (inclusive).

    The check is a plain string comparison against the two range bounds, so it
    is meant to be called with a single character.
    """
    return u'\u4e00' <= uchar <= u'\u9fa5'

def zh2letter(dictionary, line):
    """Replace every Chinese character in ``line`` with its code from ``dictionary``.

    Args:
        dictionary: mapping (anything with ``.get``) from a single Chinese
            character to its replacement string.
        line: the text to convert.

    Returns:
        The converted text followed by a newline. Each replacement is padded
        with spaces, then all runs of whitespace are collapsed to a single
        space and leading/trailing whitespace is removed. Characters for which
        ``is_chinese`` is false are kept unchanged. Chinese characters missing
        from ``dictionary`` are replaced by nothing, i.e. dropped.
    """
    # Convert in a single left-to-right pass. Repeated str.replace calls over a
    # set of characters rescan the whole line once per distinct character, and
    # their result depends on set iteration order whenever a replacement string
    # itself contains a Chinese character.
    pieces = []
    for char in line:
        if is_chinese(char):
            pieces.append(' ' + dictionary.get(char, '') + ' ')
        else:
            pieces.append(char)
    return ' '.join(''.join(pieces).split()) + '\n'

In [None]:
# (character, count) pairs for Chinese characters only, most frequent first.
# sorted() is stable, so filtering before sorting yields the same order as
# sorting everything and filtering afterwards.
sorted_freq = sorted(
    ((char, count) for char, count in word_freq.items() if is_chinese(char)),
    key=lambda item: item[1],
    reverse=True,
)

In [None]:
# Dump the frequency table, one "<character> <count> " entry per line.
with open("./frequency_news.txt", 'w', encoding='utf-8') as out_file:
    out_file.writelines(f'{char} {count} \n' for char, count in sorted_freq)

In [None]:
# Read a second frequency table as raw lines for comparison with the news one.
with open("./frequency.txt", mode='r', encoding="utf-8") as freq_file:
    iwslt_words = freq_file.read().splitlines()

In [None]:
# Keep only the first whitespace-separated field of every frequency line.
iwslt_word = [entry.split()[0] for entry in iwslt_words]

In [None]:
# Characters of the news corpus in descending frequency order, counts dropped.
news_words = [char for char, _count in sorted_freq]

In [None]:
# Number of characters that occur in the news list but not in the IWSLT list.
len(set(news_words) - set(iwslt_word))

In [None]:
# Number of characters that occur in the IWSLT list but not in the news list.
len(set(iwslt_word) - set(news_words))

In [None]:
# Script variants handled by this notebook. TYPES supplies the source-language
# tag used in the output file names; NAMES supplies the output sub-directory.
TYPES = ["zh", "tz"]
NAMES = ["simp", "trad"]
# Index into TYPES / NAMES selecting the variant to build.
TYPE = 0 # 0 for simplified, 1 for traditional

In [None]:
import opencc
# OpenCC converter loaded with the 's2t.json' configuration.
# NOTE(review): `converter` is not referenced by any other cell visible in this
# notebook; presumably meant for building the traditional-script variant.
# Confirm it is still needed.
converter = opencc.OpenCC('s2t.json')

In [None]:
from tqdm import tqdm

# Source-language tag of the selected script variant and the fixed target tag;
# both are interpolated into the output file names.
src = TYPES[TYPE]
trg = "en"

# zh2letter with the conversion table pre-bound, so it takes one line at a time.
func = partial(zh2letter, dic)
# Lazy, single-use iterator over the converted source lines.
# NOTE(review): the name `iter` shadows the builtin of the same name.
iter = map(func, src_text)

In [None]:
import os

# Write the converted source side and the unchanged English side of the whole
# test set as two line-aligned files.
path = f"./data/NIST/{NAMES[TYPE]}/test/news"
split="test"
# open(..., 'w') does not create missing parent directories.
os.makedirs(path, exist_ok=True)

with open(f"{path}/{split}.{src}-{trg}.{src}", 'w', encoding="utf-8") as f:
    # Build the converted stream here rather than draining a map object made in
    # another cell: a map can be consumed only once, so re-running this cell
    # would otherwise silently write an empty file. func already terminates
    # every line with a newline.
    for k in tqdm(map(func, src_text), total=len(src_text)): f.write(k)

with open(f"{path}/{split}.{src}-{trg}.{trg}", 'w', encoding="utf-8") as f:
    for k in tqdm(trg_text): f.write(f"{k}\n")

Split by length

In [None]:
import numpy as np
from collections import defaultdict

# Inclusive upper bounds, in characters, of the "short" and "medium" buckets;
# anything longer goes to "long".
# NOTE(review): presumably chosen from the percentiles printed below; confirm.
SHORT_MAX_LEN = 18
MEDIUM_MAX_LEN = 33

# Source and target are paired purely by line position, so a length mismatch
# means the pairs would be misaligned or incomplete.
if len(src_text) != len(trg_text):
    raise ValueError(
        f"source/target line counts differ: {len(src_text)} vs {len(trg_text)}"
    )

# Print the 33rd and 66th percentile of the source sentence lengths.
lens = [len(zh) for zh in src_text]
for p in [33, 66]:
    print(np.percentile(lens, p))

# Split by length
sentence_by_length = defaultdict(list)
for zh, en in zip(src_text, trg_text):
    pair = {"zh": zh, "en": en}
    if len(zh) <= SHORT_MAX_LEN:
        sentence_by_length["short"].append(pair)
    elif len(zh) <= MEDIUM_MAX_LEN:
        sentence_by_length["medium"].append(pair)
    else:
        sentence_by_length["long"].append(pair)

# Report the bucket sizes. The loop variable is deliberately not called `type`,
# which would shadow the builtin for the rest of the session.
for bucket, pairs in sentence_by_length.items():
    print(bucket, len(pairs))

In [None]:
split="test"

# Per-bucket source / target sentence lists.
# These get their own names on purpose: rebinding src_text / trg_text to dicts
# here would replace the flat sentence lists that the later "Strokes" cells
# iterate over and index by position, breaking a top-to-bottom run.
src_text_by_length = defaultdict(list)
trg_text_by_length = defaultdict(list)
for bucket, pairs in sentence_by_length.items():
    src_text_by_length[bucket] = [pair["zh"] for pair in pairs]
    trg_text_by_length[bucket] = [pair["en"] for pair in pairs]

In [None]:
import os
# Output directory for the length-bucketed, down-sampled test files.
path = "./data/NIST/simp/test/news_sent_sampled"
# exist_ok=True creates the directory when missing and is a no-op when it is
# already there, without a separate exists() check.
os.makedirs(path, exist_ok=True)

Since there are too many sentences and evaluating all of them takes too long, we only sample a subset, aiming for an expected size of 5000 (note: the sampling cell below draws 1000 sentence pairs per length bucket).

In [None]:
import random

# Number of sentence pairs drawn from each length bucket, and the seed that
# makes the drawn test set reproducible across kernel restarts.
SAMPLES_PER_BUCKET = 1000
SAMPLE_SEED = 42
# A private generator, so seeding does not disturb the global random state.
rng = random.Random(SAMPLE_SEED)

sampled_src_text = defaultdict(list)
sampled_trg_text = defaultdict(list)
for bucket, pairs in sentence_by_length.items():
    # random.sample raises ValueError when asked for more items than exist, so
    # a small bucket is taken whole instead.
    k = min(SAMPLES_PER_BUCKET, len(pairs))
    # Sampling without replacement; each drawn pair keeps source and target together.
    for pair in rng.sample(pairs, k):
        sampled_src_text[bucket].append(pair["zh"])
        sampled_trg_text[bucket].append(pair["en"])

In [None]:
# Write one source/target file pair per length bucket.
for bucket in sentence_by_length.keys():
    bucket_src = sampled_src_text[bucket]
    with open(f"{path}/{split}-{bucket}.{src}-{trg}.{src}", 'w', encoding="utf-8") as f:
        # The converted stream is consumed in place instead of being stored
        # under the name `iter`, which would shadow the builtin. func already
        # terminates every line with a newline.
        for k in tqdm(map(func, bucket_src), total=len(bucket_src)): f.write(k)

    with open(f"{path}/{split}-{bucket}.{src}-{trg}.{trg}", 'w', encoding="utf-8") as f:
        for k in tqdm(sampled_trg_text[bucket]): f.write(f"{k}\n")

Strokes

In [None]:
from tqdm import tqdm
from functools import partial


TYPES = ["zh", "tz"]
NAMES = ["simp", "trad"]
TYPE = 0 # 0 for simplified, 1 for traditional

src = TYPES[TYPE]
trg = "en"

func = partial(zh2letter, dic)

# This section converts src_text line by line and later indexes src_text /
# trg_text by position, so both must be flat lists of sentences. Fail loudly if
# another cell has rebound them to something else (e.g. a dict, whose iteration
# would yield keys instead of sentences).
if not (isinstance(src_text, list) and isinstance(trg_text, list)):
    raise TypeError(
        "src_text and trg_text must be the flat lists of sentences loaded from "
        "the corpus files; re-run the loading cell before this section"
    )

# Converted form of every source sentence, in corpus order. The stream is
# consumed in place instead of being stored under the builtin name `iter`.
strokes = list(tqdm(map(func, src_text), total=len(src_text)))

In [None]:
import numpy as np

# Mean token length of every converted sentence, where tokens are the pieces
# obtained by splitting on a single space.
# NOTE(review): each converted line ends with '\n' and split(" ") leaves it
# attached to the last token, so that token is counted one character longer.
# Confirm this is intended before changing it, since the bucket thresholds
# used later were presumably derived from these values.
avg_token_len = [
    np.average([len(token) for token in line.split(" ")])
    for line in strokes
]
# Print the 33rd and 66th percentile of those means.
for p in [33, 66]:
    print(np.percentile(avg_token_len, p))

In [None]:
# Inclusive upper bounds on the mean token length for the "short" and "medium"
# buckets; larger values fall into "long".
# NOTE(review): presumably pasted from the 33rd / 66th percentiles printed by
# the previous cell; confirm.
SHORT_MAX_AVG = 6.785714285714286
MEDIUM_MAX_AVG = 7.2407407407407405

src_avg_strokes = defaultdict(list)
trg_avg_strokes = defaultdict(list)
for idx, avg_len in enumerate(avg_token_len):
    # Pick the bucket first, then file the sentence pair under it.
    if avg_len <= SHORT_MAX_AVG:
        bucket = "short"
    elif avg_len <= MEDIUM_MAX_AVG:
        bucket = "medium"
    else:
        bucket = "long"
    src_avg_strokes[bucket].append(src_text[idx])
    trg_avg_strokes[bucket].append(trg_text[idx])

In [None]:
import random

# Number of sentence pairs drawn from each bucket, and the seed that makes the
# drawn test set reproducible across kernel restarts.
SAMPLES_PER_BUCKET = 1000
SAMPLE_SEED = 42
# A private generator, so seeding does not disturb the global random state.
rng = random.Random(SAMPLE_SEED)

sampled_src_text = defaultdict(list)
sampled_trg_text = defaultdict(list)
for bucket, src_sents in src_avg_strokes.items():
    trg_sents = trg_avg_strokes[bucket]
    # random.sample raises ValueError when asked for more items than exist, so
    # a small bucket is taken whole instead.
    k = min(SAMPLES_PER_BUCKET, len(src_sents))
    # Source and target live in separate parallel lists, so positions are
    # sampled and the same position is read from both.
    for idx in rng.sample(range(len(src_sents)), k):
        sampled_src_text[bucket].append(src_sents[idx])
        sampled_trg_text[bucket].append(trg_sents[idx])

In [None]:
# Print how many source sentences were sampled for each bucket.
for bucket_size in map(len, sampled_src_text.values()):
    print(bucket_size)

In [None]:
import os

# Output directory for the stroke-length-bucketed test files. The leading "./"
# matches every other output path in this notebook; ".data/..." would point at
# a different, hidden directory.
path = "./data/NIST/simp/test/news_stroke"
split="test"
# open(..., 'w') does not create missing parent directories.
os.makedirs(path, exist_ok=True)

# Write one source/target file pair per bucket.
for bucket in sampled_src_text.keys():
    bucket_src = sampled_src_text[bucket]
    with open(f"{path}/{split}-{bucket}.{src}-{trg}.{src}", 'w', encoding="utf-8") as f:
        # The converted stream is consumed in place instead of being stored
        # under the name `iter`, which would shadow the builtin. func already
        # terminates every line with a newline.
        for k in tqdm(map(func, bucket_src), total=len(bucket_src)): f.write(k)

    with open(f"{path}/{split}-{bucket}.{src}-{trg}.{trg}", 'w', encoding="utf-8") as f:
        for k in tqdm(sampled_trg_text[bucket]): f.write(f"{k}\n")