In [4]:
# ディープラーニングを使ったかな漢字変換モデル
# @yoh_okunoさんのモデルをtf.keras＆jupyterで動くようにしたものです。
# tf.keras化の過程で若干オリジナルと異なる部分が出ているかもしれません。
# オリジナルのモデル:
# https://github.com/yohokuno/neural_ime

In [5]:
# prepare dataset

from utility import load_train_data
from collections import Counter
import os
import numpy as np


In [6]:
# Import TensorFlow and switch to eager execution.
# The decoding cells below call the model imperatively and print concrete
# tensor values, which relies on eager mode being enabled.
import tensorflow as tf
tf.enable_eager_execution()

In [7]:
# Import `random` and seed every random number generator in use
# (NumPy, Python's `random`, TensorFlow) for reproducibility.
import random as rn

np.random.seed(42)
rn.seed(12345)
tf.set_random_seed(1234)

In [8]:
# prepare model

def gru(units, backword_flg):
    """Create a GRU layer that returns both its output sequence and its final state.

    Args:
        units: Number of hidden units of the layer.
        backword_flg: Forwarded to ``go_backwards``; when true the layer
            processes the input sequence in reverse.

    Returns:
        A ``tf.keras.layers.GRU`` instance.
    """
    # Original author's note: CuDNNGRU is not compatible with GRU on
    # TensorFlow 1.12, so the plain GRU layer is used here.
    layer_options = dict(
        return_sequences=True,
        return_state=True,
        recurrent_activation='sigmoid',
        recurrent_initializer='glorot_uniform',
        go_backwards=backword_flg,
    )
    return tf.keras.layers.GRU(units, **layer_options)
    
    
class KanaKanjiModel(tf.keras.Model):
    """Embedding -> GRU -> dropout -> dense model emitting per-step logits over the vocabulary."""

    def __init__(self, vocab_size, embedding_dim, rnn_units, batch_size):
        """Create the layers.

        Args:
            vocab_size: Size of the embedding table and of the output layer.
            embedding_dim: Dimension of the embedding vectors.
            rnn_units: Number of units in each GRU layer.
            batch_size: Batch size used to shape the zero initial state.
        """
        super(KanaKanjiModel, self).__init__()
        self.batch_size = batch_size
        self.rnn_units = rnn_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # Forward-direction GRU; this is the only recurrent layer call() uses.
        self.gru_f = gru(self.rnn_units, False)
        # Backward-direction GRU; created here but not used by call().
        self.gru_b = gru(self.rnn_units, True)
        self.vocab_size = vocab_size
        # Output projection to one unnormalised score per vocabulary entry.
        self.fc = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x, hidden, training):
        """Run the forward GRU over a batch of word-id sequences.

        Args:
            x: Word ids, looked up in ``self.embedding``. Assumed to be shaped
                (batch, time) — TODO confirm against callers.
            hidden: Initial state handed to the forward GRU.
            training: Forwarded to the dropout layer.

        Returns:
            ``(output, state)``: the dense layer's output for every time step
            and the state returned by the forward GRU.
        """
        x = self.embedding(x)
        output, state = self.gru_f(x, initial_state = hidden)
        # A fresh Dropout layer is instantiated on every call.
        output = tf.keras.layers.Dropout(0.5)(output, training=training)
        output = self.fc(output)
        
        return output, state
    
    def initialize_hidden_state(self):
        """Return an all-zero initial state of shape (batch_size, rnn_units)."""
        return tf.zeros((self.batch_size, self.rnn_units))


In [9]:
# Rebuild the model.
# NOTE(review): these hyperparameters presumably have to match the ones used
# when the checkpoint restored below was trained — confirm.
hidden_size = 400
embedding_dim = hidden_size
vocabulary_size = 50000

# The last positional argument is batch_size (1 here).
model = KanaKanjiModel(vocabulary_size, embedding_dim, hidden_size, 1)

In [10]:
# Adam optimizer with default settings; it is passed to tf.train.Checkpoint in the loading cell.
optimizer3 = tf.train.AdamOptimizer()

In [11]:
# Load the trained weights into `model`.

checkpoint_dir = './ck_20190209'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(optimizer=optimizer3,
                                 model=model)

# tf.train.latest_checkpoint returns None when it finds no checkpoint, and
# restore(None) does not load any values. Fail loudly here instead of
# silently decoding with untrained weights.
latest_checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint_path is None:
    raise FileNotFoundError(
        "No checkpoint found in {!r}".format(checkpoint_dir))

status = checkpoint.restore(latest_checkpoint_path)
# To load one specific checkpoint instead:
# status = checkpoint.restore("./training_checkpoints_ver4/ckpt-3")
print("status: ", status)

status:  <tensorflow.python.training.checkpointable.util.CheckpointLoadStatus object at 0x134121f28>


In [12]:
# Display the path of the latest checkpoint found in checkpoint_dir.
tf.train.latest_checkpoint(checkpoint_dir)

'./ck_20190209/ckpt-5'

In [13]:
# this is based on decode.py of the original scripts

import collections
import heapq
import operator

# NOTE(review): no visible cell reads this module-level list — confirm whether it is still needed.
hiragana_added = []

def load_dictionary(model_directory):
    """Load ``vocabulary.txt`` and index its entries by source string.

    Each line has the form ``target/source`` and is split on the first ``/``.
    The zero-based line number is used as the word id, so no line may be
    skipped or reordered.

    Args:
        model_directory: Directory containing ``vocabulary.txt``.

    Returns:
        A ``collections.defaultdict(list)`` mapping each ``source`` string to
        a list of ``(target, word_id)`` tuples in file order.

    Raises:
        ValueError: If a line contains no ``/`` separator.
    """
    vocabulary_path = os.path.join(model_directory, 'vocabulary.txt')
    dictionary = collections.defaultdict(list)

    # The context manager closes the file handle deterministically.
    # The encoding is given explicitly so decoding does not depend on the
    # platform locale (assumes vocabulary.txt is UTF-8 — confirm).
    with open(vocabulary_path, encoding='utf-8') as vocabulary_file:
        for word_id, line in enumerate(vocabulary_file):
            target, source = line.rstrip('\n').split('/', 1)
            dictionary[source].append((target, word_id))

    return dictionary

def create_lattice(input_, dictionary):
    """Build the word lattice for ``input_``.

    ``lattice[end][start]`` lists the ``(target, word_id)`` candidates whose
    source string is ``input_[start:end]``. The lattice has
    ``len(input_) + 2`` rows of ``len(input_) + 1`` cells each; the last cell
    of the last row holds the single ``_EOS`` node.

    Args:
        input_: Source string to convert.
        dictionary: Mapping from source string to a list of
            ``(target, word_id)`` pairs; must contain ``'_UNK'`` and ``'_EOS'``.

    Returns:
        The lattice as a nested list.
    """
    length = len(input_)
    lattice = [[[] for _col in range(length + 1)] for _row in range(length + 2)]
    _, unk_id = dictionary['_UNK'][0]

    for end in range(1, length + 1):
        row = lattice[end]
        for start in range(end):
            reading = input_[start:end]
            if reading in dictionary:
                row[start].extend(
                    (surface, word_id) for surface, word_id in dictionary[reading])
            elif len(reading) == 1:
                # A single character missing from the dictionary becomes an
                # _UNK node that carries the character itself as its target.
                row[start].append((reading, unk_id))

    _, eos_id = dictionary['_EOS'][0]
    lattice[-1][-1].append(('', eos_id))
    return lattice


def initialize_queues(lattice, rnn_predictor, dictionary):
    """Create the per-row hypothesis queues and seed them with the ``_BOS`` hypothesis.

    A stored hypothesis is a tuple ``(cost, string, state, prediction)``:
    ``cost`` is the total negative log probability so far, ``string`` the
    converted text so far, ``state`` the recurrent state returned by the
    model, and ``prediction`` the negative log-softmax of the model output
    for the next word.

    Args:
        lattice: Lattice whose length determines the number of queues.
        rnn_predictor: Model providing ``initialize_hidden_state()`` and
            callable as ``rnn_predictor(word_ids, hidden, training)``,
            returning ``(logits, state)``.
        dictionary: Mapping that contains a ``'_BOS'`` entry.

    Returns:
        A list with one queue (list) per lattice row; queue 0 holds the
        initial hypothesis and the others are empty.
    """
    initial_state = rnn_predictor.initialize_hidden_state()
    _, bos_id = dictionary['_BOS'][0]

    # Feed the single _BOS id as a batch of one sequence of length one.
    bos_input = tf.expand_dims([bos_id], 0)
    bos_logits, bos_state = rnn_predictor(bos_input, initial_state, False)

    # Drop the two leading singleton axes, then turn logits into costs
    # (negative log probabilities).
    bos_predictions = -1 * tf.nn.log_softmax(tf.squeeze(bos_logits, [0, 1]), axis=0)

    # The state is stored exactly as the model returned it; the previous
    # expand_dims(..., 0) followed by [0] was a round-trip with no effect.
    queues = [[] for _ in range(len(lattice))]
    queues[0].append((0.0, '', bos_state, bos_predictions))
    return queues

def search(lattice, queues, rnn_predictor, beam_size, viterbi_size):
    """Breadth-first lattice search with viterbi-like and beam pruning.

    For every lattice row, each word ending there extends the hypotheses
    stored at the word's start row. The extensions of one word are pruned to
    ``viterbi_size``, the extensions of the whole row to ``beam_size``, and
    only the survivors are run through the model to obtain the next state and
    next-word costs. A size of 0 or less disables the corresponding pruning.

    Args:
        lattice: ``lattice[end][start]`` holds ``(target, word_id)`` pairs.
        queues: Per-row hypothesis lists; mutated in place. Stored hypotheses
            are ``(cost, string, state, prediction)`` tuples.
        rnn_predictor: Callable as ``rnn_predictor(word_ids, [state], training)``
            returning ``(logits, state)``.
        beam_size: Maximum number of hypotheses kept per row.
        viterbi_size: Maximum number of hypotheses kept per word.

    Returns:
        The same ``queues`` object, filled in.
    """
    by_cost = operator.itemgetter(0)

    for end, row in enumerate(lattice):
        frontier = []

        # Extend earlier hypotheses with each word ending at this row; the
        # model is not called yet.
        for start, words in enumerate(row):
            for target, word_id in words:
                # Costs are negative log probabilities, so smaller is better.
                extended = [
                    (prev_cost + prev_prediction[word_id],
                     prev_string + target,
                     word_id,
                     prev_state)
                    for prev_cost, prev_string, prev_state, prev_prediction in queues[start]
                ]

                # Keep only the best hypotheses for this word.
                if viterbi_size > 0:
                    extended = heapq.nsmallest(viterbi_size, extended, key=by_cost)

                frontier.extend(extended)

        # Keep only the best hypotheses for this row.
        if beam_size > 0:
            frontier = heapq.nsmallest(beam_size, frontier, key=by_cost)

        # Run the model for each survivor to get next-word costs and state.
        for cost, string, word_id, prev_state in frontier:
            step_input = tf.expand_dims([word_id], 0)
            logits, next_state = rnn_predictor(step_input, [prev_state], False)

            # Drop the two leading singleton axes and convert to costs.
            next_costs = tf.squeeze(tf.squeeze(logits, 0), 0)
            next_costs = -1 * tf.nn.log_softmax(next_costs, axis=0)

            next_state = tf.expand_dims(next_state, 0)
            queues[end].append((cost, string, next_state[0], next_costs))

    return queues

def decode(source, dictionary, rnn_predictor, beam_size, viterbi_size):
    """Convert ``source`` by building its lattice and searching it.

    Args:
        source: Source string to convert.
        dictionary: Dictionary passed to ``create_lattice`` and ``initialize_queues``.
        rnn_predictor: Model passed to ``initialize_queues`` and ``search``.
        beam_size: Beam size passed to ``search``.
        viterbi_size: Per-word pruning size passed to ``search``.

    Returns:
        ``(top_result, candidates, lattice, queues)`` where ``candidates`` is a
        list of ``(string, cost)`` pairs taken from the last queue in its
        stored order and ``top_result`` is the string of the first candidate.
    """
    lattice = create_lattice(source, dictionary)
    queues = search(
        lattice,
        initialize_queues(lattice, rnn_predictor, dictionary),
        rnn_predictor,
        beam_size,
        viterbi_size,
    )

    candidates = [(string, cost) for cost, string, _state, _prediction in queues[-1]]
    return candidates[0][0], candidates, lattice, queues


In [18]:
def convert_kana_to_kanji(line, rnn_predictor,
                          model_directory="models",
                          beam_size=5,
                          viterbi_size=50000,
                          print_nbest=True,
                          print_lattice=False,
                          print_queue=False):
    """Decode one line of input and print the conversion result.

    The vocabulary is reloaded from ``model_directory`` on every call.

    Args:
        line: Input string; a trailing newline is stripped.
        rnn_predictor: Model passed to ``decode``.
        model_directory: Directory containing ``vocabulary.txt``.
        beam_size: Beam size passed to ``decode``.
        viterbi_size: Per-word pruning size passed to ``decode``.
        print_nbest: When true, print every candidate with its cost;
            otherwise print only the top result.
        print_lattice: When true, also print the lattice (debug).
        print_queue: When true, also print every queue (debug).

    Returns:
        None. The results are printed.
    """
    dictionary = load_dictionary(model_directory)

    line = line.rstrip('\n')

    # Decoding may take on the order of ten seconds per line.
    result, candidates, lattice, queues = decode(
        line, dictionary, rnn_predictor, beam_size, viterbi_size)

    # Print the decoded result(s).
    if print_nbest:
        for string, cost in candidates:
            print(string, cost)
    else:
        print(result)

    # Print the lattice for debugging.
    if print_lattice:
        for i, row in enumerate(lattice):
            for j, words in enumerate(row):
                print('i = {}, j = {}'.format(i, j))
                for target, word_id in words:
                    print(target, word_id)

    # Print the queues for debugging.
    if print_queue:
        for i, queue in enumerate(queues):
            print('queue', i)
            for cost, string, _state, _prediction in queue:
                print(string, cost)

In [19]:
# Smoke test: convert a hiragana string and print the n-best candidates with their costs.
convert_kana_to_kanji("ぱりのれきし", model)

パリの歴史 tf.Tensor(24.277733, shape=(), dtype=float32)
パリの歴誌 tf.Tensor(37.341175, shape=(), dtype=float32)
パリの歴し tf.Tensor(38.27497, shape=(), dtype=float32)
パリの歴史 tf.Tensor(38.383877, shape=(), dtype=float32)
パリノ歴史 tf.Tensor(38.963135, shape=(), dtype=float32)
