In [1]:
import re
from tqdm import tqdm
import pickle
from functools import reduce
import mafan
from mafan import text
import itertools
# Sentence-boundary sentinels. The surrounding spaces are deliberate: the
# sentinels are repeated and concatenated onto a sentence, and the result is
# then tokenised with str.split(), so each repetition must stay a separate
# whitespace-delimited token.
bos = " <bos> "
eos = " <eos> "

# Tokenizer Functions

## Sentence Tokenizer

In [None]:
def zng(paragraph):
    """Yield the sentences of *paragraph* one at a time.

    A sentence is a maximal run of characters that are not terminators
    (``!``, ``?``, ``。`` or ``.``) followed by at most one terminator, which
    stays attached to the sentence. Text after the last terminator is yielded
    as a final sentence; an empty paragraph yields nothing.
    """
    # Raw string: the previous non-raw literal relied on the invalid escape
    # sequences \. \! \? being passed through unchanged, which newer Python
    # versions warn about. The pattern text itself is unchanged.
    yield from re.findall(r'[^!?。\.\!\?]+[!?。\.\!\?]?', paragraph, flags=re.U)

## Simplified Chinese Tokenizer

Below is the code that builds the simplified-to-traditional mapping dictionary.

We have a large dictionary file, *conversions.txt*, that includes words, characters, common phrases, locations and idioms. Each entry contains the traditional Chinese word followed by its simplified Chinese equivalent.

In [None]:
# Build the simplified -> traditional lookup table from conversions.txt.
# Each usable line holds the traditional form first and the simplified form
# second, separated by whitespace. One simplified form can map to several
# traditional candidates, so every value is a list kept in file order.
s2t_dict = dict()

# The file is only read, so open it read-only; `with` closes the handle even
# if a line fails to parse.
with open("conversions.txt", "r", encoding="utf-8") as infile:
    for line in infile:
        arr = line.split()
        if len(arr) < 2:
            # Blank or incomplete line: there is no pair to record, so skip it
            # instead of failing with an IndexError.
            continue
        trad = arr[0]
        sim = arr[1]
        s2t_dict.setdefault(sim, []).append(trad)

# prepare() replaces every character mafan classifies as NEITHER with '-',
# so the placeholder needs an identity mapping of its own.
s2t_dict['-'] = ['-']

The tokenizer identifies dictionary words and phrases in the input sentence. We always prefer longer phrases because they carry more meaning and have fewer candidate translation mappings. Hence we use a Byte Pair Encoding (BPE)-style approach for identifying words, with the candidates constrained to the vocabulary defined in the dictionary. Since the longest phrase in the dictionary has 8 characters, we start with 8-character phrases and work backwards down to single characters.

In [None]:
def tokenizer(sentence, n = 8):
    '''
    This function tokenizes input sentences according to the dictionary,
    preferring the longest dictionary phrases.
    Input: a sentence or paragraph; n is the longest phrase length (in
           characters) to look for.
    Output: a list of tokens from the input, in the order they occur in the
            masked text returned by prepare(); the list of characters that
            prepare() masked out of the original text.
    '''
    masked, charList = prepare(sentence)
    token_list = []
    input_text = masked
    for k in range(n, 0, -1):
        # Snapshot every k-character window before anything is removed, so a
        # phrase seen in this pass is still recorded even if an earlier
        # removal in the same pass consumed some of its characters;
        # sequencer() resolves the resulting overlaps.
        candidates = [input_text[i:i + k] for i in range(len(input_text) - k + 1)]
        for candidate in candidates:
            if candidate in s2t_dict:
                token_list.append(candidate)
                # Dictionary phrases are literal text, not regex patterns, so
                # remove them with str.replace rather than re.sub.
                input_text = input_text.replace(candidate, '')
    final = sequencer(token_list, masked)
    return final, charList

In [None]:
def output_list(sentence_list, char_list):
    """Swap every placeholder entry for the character it stands for.

    Walks *sentence_list* in order. An entry that contains "-" is replaced by
    ``list(char_list[k])``, where k counts the placeholder entries seen so
    far; every other entry is passed through untouched.
    """
    restored = []
    next_char = 0  # index of the next unused item in char_list
    for entry in sentence_list:
        if "-" not in entry:
            restored.append(entry)
            continue
        restored.append(list(char_list[next_char]))
        next_char += 1
    return restored

In [None]:
def output(sentence, char_list):
    """Rebuild a string by filling each "-" with the next item of *char_list*.

    Characters other than "-" are copied as they are; the k-th "-" is
    replaced by ``char_list[k]``.
    """
    pieces = []
    cursor = 0  # position of the next replacement in char_list
    for symbol in sentence:
        if symbol != "-":
            pieces.append(symbol)  # ordinary character, keep it
            continue
        pieces.append(char_list[cursor])  # placeholder, restore the original
        cursor += 1
    return "".join(pieces)

In [None]:
def prepare(sentence):
    """Mask the characters of *sentence* that mafan classifies as NEITHER.

    Returns a pair: the sentence with each such character replaced by "-",
    and the list of replaced characters in their original order.
    """
    masked = []   # characters of the tokenizer input
    removed = []  # originals of the masked characters

    for symbol in sentence:
        if text.identify(symbol) is mafan.NEITHER:
            removed.append(symbol)
            masked.append("-")
        else:
            masked.append(symbol)

    return "".join(masked), removed

In [None]:
def sequencer(tokens, example):
    """Place *tokens* back onto *example* and return them in reading order.

    Tokens are tried in the order given. Each non-overlapping occurrence of a
    token is accepted only if none of its characters has been claimed by an
    earlier acceptance; an accepted occurrence then claims its characters.
    Tokens are matched as literal text, not as regular expressions.

    Input: tokens - iterable of strings; example - the text to lay them on
    Output: list with one entry per accepted occurrence, sorted by the
            position at which the occurrence starts in example
    """
    free = [True] * len(example)  # free[i] is False once character i is claimed
    placed = []                   # (start, token) for every accepted occurrence
    for token in tokens:
        if not token:
            # An empty token would match at every position and claim nothing.
            continue
        # Escape the token so characters such as '.' or '(' are matched
        # literally instead of being interpreted as regex syntax.
        for match in re.finditer(re.escape(token), example):
            start, end = match.span()
            if all(free[start:end]):
                placed.append((start, token))
                free[start:end] = [False] * (end - start)
    placed.sort(key=lambda item: item[0])
    return [token for _, token in placed]

## Corpus Preparation

First, we need to prepare our corpus.
1. We will add paddings (sentinels) to our sentences.
2. Take one sentence at a time.
3. Change non-Chinese words to FW to avoid data explosion.
4. Slice the n-grams and add them to the dictionary.

In [None]:
def add_stuff(order):
    '''
    This function divides the corpus file into n-grams and adds their counts
    to the module-level `corpus` dictionary.
    Input: order of n-gram (like 2 for bi-gram)
    Output: none (counts are accumulated in `corpus`, keyed by tuples of words)
    '''
    start_padding = bos * order # add padding
    end_padding = eos * order

    # The corpus is only read, so open it read-only; `with` closes the handle
    # when the pass is finished or fails.
    # NOTE(review): no encoding is given, so the platform default is used,
    # whereas conversions.txt is opened as UTF-8 - confirm hk-zh.txt matches.
    with open("hk-zh.txt", "r") as infile:
        # total only sizes the progress bar (presumably the file's line count).
        for line in tqdm(infile, total=1314726):
            for sentence in zng(line.rstrip()): # tokenize sentence by sentence
                word_list = (start_padding + sentence + end_padding).split()
                # Collapse purely alphanumeric ASCII words into a single "FW"
                # token; everything else is kept as it is.
                word_list = [
                    "FW" if re.match('^[a-zA-Z0-9]+$', word) else word
                    for word in word_list
                ]
                # Windows start at index 1 and stop before len - order: with
                # `order` sentinels on each side this runs from the window
                # that ends on the first real word to the window that starts
                # on the last real word.
                for i in range(1, len(word_list) - order):
                    ngram = tuple(word_list[i:i + order])
                    corpus[ngram] = corpus.get(ngram, 0) + 1

Let's say you want to extract till trigrams.

We want to do 3 iterations: one for trigrams, one for bigrams and then one for unigrams. Each iteration takes about 2 minutes. This is the only time-consuming part of this code. Once you have prepared the dictionary, you don't need to do this again.

In [None]:
# Shared n-gram count table; add_stuff() fills it in place.
corpus = {}
# start_order = 2
# for i in range(start_order, 0, -1):
#     add_stuff(i)

Once you have made the dictionary, dump it into a pickle.

In [None]:
# import pickle
# with open('corpus.pkl', 'wb') as handle:
#     pickle.dump(corpus, handle)

Here's a way to load the pickle so you don't need to process the data every time.

In [None]:
# Replace `corpus` with the n-gram table stored in corpus.pkl.
# NOTE(review): pickle.load can run arbitrary code while unpickling, so
# corpus.pkl must come from a trusted source (for example the commented-out
# dump cell above).
with open('corpus.pkl', 'rb') as fp:
    corpus = pickle.load(fp)

# Making Candidate Lists

1. Tokenize the input.
2. Check the mappings of each input.
3. Add all possible mappings to candidate list.

In [None]:
def convert(sentence):
    '''
    Look up the candidate mappings for every token of a sentence.
    Input: Simplified chinese sentence
    Output: List of lists, one per token, in sentence order. Each inner list
            holds the s2t_dict candidates for that token; placeholder entries
            are swapped back to the original character by output_list().
    '''
    tokens, masked_chars = tokenizer(sentence)
    mappings = [s2t_dict[token] for token in tokens]
    return output_list(mappings, masked_chars)

# Maximum log-likelihood calculations

In [None]:
num_tokens = 4526000 # total number of tokens in corpus

def prob(word_list):
    '''
    Computes the log likelihood probability of a word sequence from the
    counts stored in `corpus`.
    Input: A sequence of words in form of list
    Output: Log probability (float). A sequence found in `corpus` scores
            log(count / denominator); an unseen sequence of length n > 1
            backs off to the sum over all of its (n-1)-grams; an unseen
            single word scores as "FW" when it is purely alphanumeric and
            "FW" has a count, and as log(1 / num_tokens) otherwise.
    '''
    # Imported locally so the function works regardless of which other cells
    # have already been run.
    from math import log

    ngram = tuple(word_list) # corpus keys are tuples
    if ngram in corpus:
        numerator = corpus[ngram] # frequency of the whole sequence
        denominator = num_tokens
        prefix = ngram[:-1] # the sequence without its last word
        # NOTE(review): because of `> 1`, a one-word prefix is never used, so
        # bigrams are normalised by num_tokens rather than by the count of
        # their first word - confirm this is intended.
        if len(prefix) > 1 and prefix in corpus:
            denominator = corpus[prefix]
        return log(numerator / denominator)

    k = len(ngram) - 1 # backoff, reduce n-gram length
    if k > 0:
        # score every (n-1)-gram of the sequence and add the log probs
        return sum(prob(ngram[i:i + k]) for i in range(len(ngram) - k + 1))

    # Unseen single word. Only defer to "FW" when it actually has a count;
    # otherwise the lookup of "FW" would land here again and recurse forever.
    if re.match('^[a-zA-Z0-9]+$', ngram[0]) and ("FW",) in corpus:
        return prob(["FW"]) # we encountered a non-chinese word
    return log(1 / num_tokens) # return a small probability

# Backoff Language Model

In [None]:
from math import log


def backoff(sentence, order):
    '''
    Scores a text with the backoff language model.
    Input: Sentence and order of the n-gram
    Output: Sum of prob() over the n-grams of every sentence in the input
    '''
    total = 0
    for piece in zng(sentence): # one sentence at a time
        # pad with `order` sentinels on each side, then split on whitespace
        padded = (bos * order + piece + eos * order).split()
        # purely alphanumeric ASCII words become "FW"; all others are kept
        normalised = [
            "FW" if re.match('^[a-zA-Z0-9]+$', word) else word
            for word in padded
        ]
        stop = len(normalised) - order
        # windows of `order` words starting at index 1, as in the corpus pass
        total += sum(prob(normalised[i:i + order]) for i in range(1, stop))
    return total

# Translator

In [None]:
def translate(sentence):
    '''
    Translate a given sentence to traditional
    Input: Simplified Sentence
    Output: Traditional Sentence, with the spaces of the input put back by
            add_back_spaces()
    '''
    candidates = convert(sentence) # get the candidate lists
    final_sent = ""
    for words in candidates:
        if len(words) > 1:
            # Several candidates for one token: keep the one that gives the
            # sentence built so far the highest backoff score. max() returns
            # the first candidate on ties and always returns one of them,
            # whereas a fixed score floor would drop the token entirely once
            # every candidate scored below it.
            likely = max(
                words,
                key=lambda candidate: backoff(final_sent + " " + candidate, 2),
            )
            final_sent = final_sent + " " + likely
        else:
            final_sent = final_sent + " " + words[0]
    # Tokens were joined with spaces for scoring only; strip them all and
    # restore the spaces of the original input afterwards.
    final_sent = final_sent.replace(" ", "")
    final_sent = add_back_spaces(sentence, final_sent)
    return final_sent

In [None]:
def add_back_spaces(original, current):
    """Re-insert the spaces of *original* into *current*.

    *current* is expected to hold one character for every non-space character
    of *original*, in the same order. The result follows *original*
    position by position: a space is emitted wherever *original* has one
    (including at the very start), otherwise the next character of *current*.

    If *current* runs out early the remaining positions contribute only
    their spaces; characters of *current* left over at the end are appended.
    """
    remaining = iter(current)
    rebuilt = []
    for char in original:
        if char == " ":
            rebuilt.append(" ")
        else:
            # "" keeps a too-short `current` from raising mid-way
            rebuilt.append(next(remaining, ""))
    rebuilt.extend(remaining)  # keep anything `original` did not account for
    return "".join(rebuilt)

In [None]:
# Demo: run the full pipeline on a simplified Chinese paragraph that also
# contains Latin text, digits, a hyphen and spaces; the result is kept in `a`.
sentence = "早在23岁，伍兹就参与了世界上首个核反应堆Chicago Pile-1的建设，她是导师费米领导的项目团队中最年轻的一员。此外，伍兹在建立和使用实验所需的盖革计数器上起到关键作用。反应堆成功运转并达到自持状态时，她也是唯一在场的女性。曼哈顿计划中，她与费米合作；同时，她曾与第一任丈夫约翰·马歇尔（John Marshall）一同解决了汉福德区钚生产厂氙中毒的问题，并负责监督钚生产反应炉的建造和运行。"
a = translate(sentence)