In [None]:
# 预测下一个汉字。 
# 1. one word in, one word out 
# 2. multiple word in, one word out.
# 通过model的性能可以看出来，two word in 优于 one word in
# 光使用10000条公交站数据，one word能到60%，two word能到66%。使用普通数据10000条，one word 26%, two word 34%，
# 说明ngram确实是work的，另外，提高epoch和增加训练数据可以提升。

from numpy import array

from tensorflow.python.keras import backend as k
from keras.preprocessing.text import Tokenizer
from keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dropout, Dense, LSTM, Embedding

import itertools
import sys
sys.path.append("../common/")
import chinese_preprocess


def find_top_n_in_prob_array(prob_array, topn=5):
    """Return the indices of the ``topn`` largest entries of ``prob_array``.

    Args:
        prob_array: array of scores that supports unary minus and
            ``argsort`` (for example a 1-D numpy array).
        topn: how many indices to keep.

    Returns:
        The indices ordered from the highest score to the lowest.
    """
    # Negating the scores turns argsort's ascending order into descending.
    descending_order = (-prob_array).argsort()
    return descending_order[:topn]
    
def map_top_n_int_to_word(index_array, tokenizer, y_distribution):
    """Pair each vocabulary word whose index is in ``index_array`` with its score.

    Args:
        index_array: iterable of vocabulary indices to keep.
        tokenizer: object exposing a ``word_index`` mapping of word -> index.
        y_distribution: indexable scores, looked up by vocabulary index.

    Returns:
        A list of ``(word, score)`` tuples, in the iteration order of
        ``tokenizer.word_index`` (not in the order of ``index_array``).
    """
    wanted = set(index_array)
    return [
        (token, y_distribution[token_id])
        for token, token_id in tokenizer.word_index.items()
        if token_id in wanted
    ]

def find_topn_prob_word(y_distribution, model, tokenizer):
    """Return the most probable words for the first sample of a prediction.

    Args:
        y_distribution: indexable whose element 0 is the score array of one
            sample.
        model: not used by this function; kept so existing callers still work.
        tokenizer: object exposing a ``word_index`` mapping of word -> index.

    Returns:
        A list of ``(word, score)`` tuples sorted by score, highest first.
    """
    first_sample_scores = y_distribution[0]
    top_indices = find_top_n_in_prob_array(first_sample_scores)
    candidates = map_top_n_int_to_word(top_indices, tokenizer, first_sample_scores)
    return sorted(candidates, key=lambda pair: pair[1], reverse=True)

# Also prints the predicted probability of each candidate word.
def generate_one_word_with_prob(model, tokenizer, word, length=1):
    """Print the most probable next words, with their scores, after ``word``.

    Args:
        model: trained model exposing ``predict``.
        tokenizer: fitted tokenizer exposing ``texts_to_sequences`` and
            ``word_index``.
        word: space-separated context text; only its last ``length`` tokens
            are fed to the model.
        length: number of context tokens to keep.

    Returns:
        None. The input and the ranked ``(word, score)`` list are printed.
    """
    encoded = tokenizer.texts_to_sequences([word])[0]
    encoded = encoded[-length:]
    if not encoded:
        # Nothing in the input is known to the tokenizer, so there is no
        # context to predict from; report it instead of failing in predict().
        print("input {} has no token in the vocabulary".format(word))
        return
    # Build an explicit 2-D batch of shape (1, n_tokens). Depending on the
    # Keras version, a bare nested Python list can be read as a list of
    # separate model inputs rather than as one batch.
    batch = array([encoded])
    y_distribution = model.predict(batch, verbose=0)
    print("input {}".format(word))
    topn_word = find_topn_prob_word(y_distribution, model, tokenizer)
    print(topn_word)
 
def map_yaht_to_word(yhat, tokenizer):
    """Return the word whose vocabulary index equals ``yhat``.

    Args:
        yhat: predicted vocabulary index.
        tokenizer: object exposing a ``word_index`` mapping of word -> index.

    Returns:
        The first matching word, or an empty string when no word has that
        index.
    """
    matches = (
        token
        for token, token_id in tokenizer.word_index.items()
        if token_id == yhat
    )
    return next(matches, '')

# Several words can be predicted in a row, although in practice the later
# predictions are not very reliable.
def generate_seq_with_one_word(model, tokenizer, seed_text, n_words):
    """Greedily extend ``seed_text`` with up to ``n_words`` predicted words.

    Each step feeds the most recent word to the model and appends the
    highest-scoring vocabulary word to the result.

    Args:
        model: trained one-word-in model exposing ``predict``.
        tokenizer: fitted tokenizer exposing ``texts_to_sequences`` and
            ``word_index``.
        seed_text: space-separated starting text; only its last token is used
            as the first context.
        n_words: maximum number of words to generate.

    Returns:
        The seed text followed by the generated words, separated by spaces.
        Generation stops early when the current text has no token known to
        the tokenizer or when the predicted index has no word.
    """
    # Reverse lookup built once instead of scanning word_index on every step.
    index_to_word = {index: word for word, index in tokenizer.word_index.items()}
    in_text, result = seed_text, seed_text
    for _ in range(n_words):
        # Encode the current text as integers.
        encoded = tokenizer.texts_to_sequences([in_text])[0]
        if not encoded:
            break
        # One sample with one time step. Using only the last token keeps the
        # prediction a single class even when the seed holds several words.
        batch = array([encoded[-1:]])
        # argmax over the class axis replaces Sequential.predict_classes,
        # which newer TensorFlow releases no longer provide.
        yhat = int(model.predict(batch, verbose=0).argmax(axis=-1)[0])
        out_word = index_to_word.get(yhat)
        if out_word is None:
            break
        # The predicted word becomes the next input.
        in_text, result = out_word, result + ' ' + out_word
    return result

# Several words can be predicted in a row.
def generate_seq_with_multiple_word(model, tokenizer, seed_text, n_words, length):
    """Greedily extend ``seed_text`` using a sliding window of ``length`` words.

    Args:
        model: trained model exposing ``predict`` that takes ``length``
            context tokens per sample.
        tokenizer: fitted tokenizer exposing ``texts_to_sequences`` and
            ``word_index``.
        seed_text: space-separated starting text; its last ``length`` tokens
            form the first context window.
        n_words: maximum number of words to generate.
        length: number of context tokens fed to the model at each step.

    Returns:
        The seed text followed by the generated words, separated by spaces,
        or None (after printing a message) when the seed encodes to fewer
        than ``length`` tokens. Generation stops early when the predicted
        index has no word in the vocabulary.
    """
    # Reverse lookup built once instead of scanning word_index on every step.
    index_to_word = {index: word for word, index in tokenizer.word_index.items()}
    result = seed_text
    window = tokenizer.texts_to_sequences([seed_text])[0][-length:]
    if len(window) < length:
        print("context length is smaller than required length")
        return None
    for _ in range(n_words):
        # Explicit (1, length) batch; argmax over the class axis replaces
        # Sequential.predict_classes, which newer TensorFlow releases no
        # longer provide.
        batch = array([window])
        yhat = int(model.predict(batch, verbose=0).argmax(axis=-1)[0])
        # Map the predicted class back to its word.
        out_word = index_to_word.get(yhat)
        if out_word is None:
            break
        result = result + ' ' + out_word
        # Slide the context: drop the oldest token and append the prediction,
        # so the window always keeps exactly `length` tokens.
        window = window[1:] + [yhat]
    return result

# Build the input of the one-word-in, one-word-out network.
def generate_word_pair(encoded):
    """Collect every adjacent ``(current, next)`` pair of each encoded row.

    Args:
        encoded: iterable of sliceable sequences (for example lists of ints).

    Returns:
        A list of 2-tuples, row by row and left to right. The total count is
        also printed.
    """
    sequences = []
    for row in encoded:
        # zip stops at the shorter argument, so a row with fewer than two
        # items contributes no pair.
        sequences.extend(zip(row, row[1:]))
    print('Total Sequences: %d' % len(sequences))
    return sequences

# Build the multi-word network input as sliding windows.
# Every window holds `length` context items followed by one target item, e.g.
# encoded = [[1, 2, 3], [4, 5, 6]], length = 1
# -> [1, 2] [2, 3] [4, 5] [5, 6]
def generate_multiple_word_seq(encoded, length=1):
    """Slice each encoded row into overlapping windows of ``length + 1`` items.

    Args:
        encoded: iterable of sliceable sequences (for example lists of ints).
        length: number of context items per window; the item right after
            them is included as the last element of the window.

    Returns:
        A list of the windows, row by row and left to right. The total count
        is also printed.
    """
    sequences = []
    for row in encoded:
        row_size = len(row)
        for start in range(row_size):
            stop = start + length
            if stop >= row_size:
                # The window would need an item past the end of the row.
                break
            sequences.append(row[start:stop + 1])
    print('Total Sequences: %d' % len(sequences))
    return sequences

# Build X = [word_int1, word_int2, ...] -> y = word_int training data.
def gen_X_y(encoded, length=1, num_classes=None):
    """Turn encoded rows into model inputs ``X`` and one-hot targets ``y``.

    Args:
        encoded: iterable of integer sequences.
        length: number of context items per training sample.
        num_classes: width of the one-hot targets. When omitted, the
            module-level ``vocab_size`` is used, which keeps existing calls
            working but requires that global to be defined first.

    Returns:
        ``(X, y)`` where ``X`` holds the first ``length`` items of every
        window and ``y`` is the one-hot encoding of each window's last item.

    Raises:
        ValueError: if no window could be built from ``encoded``.
    """
    if num_classes is None:
        # Backward-compatible fallback to the notebook-level global.
        num_classes = vocab_size
    word_seq = generate_multiple_word_seq(encoded, length)
    if not word_seq:
        # An empty list becomes a 1-D array and the 2-D slicing below would
        # fail with an unhelpful IndexError.
        raise ValueError(
            "no training sequences: every encoded row is shorter than length + 1"
        )
    sequences = array(word_seq)
    X, y = sequences[:, :-1], sequences[:, -1]
    # One-hot targets match the softmax output and categorical_crossentropy
    # loss used by build_and_train_model.
    y = to_categorical(y, num_classes=num_classes)
    return X, y

def build_and_train_model(vocab_size, length, X, y,
                          embedding_dim=10, lstm_units=50, epochs=20):
    """Build, compile and fit an Embedding -> LSTM -> softmax model.

    Args:
        vocab_size: size of the embedding vocabulary and of the output layer.
        length: number of input tokens per sample.
        X: training inputs passed to ``fit``.
        y: training targets passed to ``fit``.
        embedding_dim: output size of the embedding layer.
        lstm_units: number of units of the LSTM layer.
        epochs: number of training epochs.

    Returns:
        The fitted model.
    """
    model = Sequential()
    model.add(Embedding(vocab_size, embedding_dim, input_length=length))
    model.add(LSTM(lstm_units))
    model.add(Dense(vocab_size, activation='softmax'))
    # summary() prints the table itself and returns None, so wrapping it in
    # print() only added a stray "None" line to the output.
    model.summary()
    # Compile the network.
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # Fit the network.
    model.fit(X, y, epochs=epochs, verbose=1)
    return model

def divide_chunks(l, n):
    """Yield consecutive slices of ``l`` holding at most ``n`` items each.

    Args:
        l: sliceable sequence with a length.
        n: chunk size, used as the step of the start offsets.

    Yields:
        ``l[start:start + n]`` for start = 0, n, 2n, ... below ``len(l)``.
    """
    chunk_starts = range(0, len(l), n)
    yield from (l[start:start + n] for start in chunk_starts)
    

In [None]:
# The preprocessing is the same.
# NOTE(review): chinese_preprocess is a project helper imported from
# ../common/ and is not visible here; judging by its name, the call below
# loads and segments the Chinese text - confirm against that module.
chinese_poi_name = chinese_preprocess.load_chinese_and_seg("./input_data/test_10000")
# Peek at the first two loaded entries.
print(chinese_poi_name[0:2])
# tokenizer, encoded and vocab_size are module-level names that the later
# cells (and gen_X_y, through the vocab_size global) rely on.
tokenizer, encoded, vocab_size = chinese_preprocess.encode_chinese_to_int(chinese_poi_name)
# Peek at the first two encoded entries.
print(encoded[0:2])
print("vocab_size : {}".format(vocab_size))
# Show one entry next to its integer encoding.
print(chinese_poi_name[0])
print(tokenizer.texts_to_sequences([chinese_poi_name[0]]))
# Uncomment to inspect the whole vocabulary:
# tokenizer.word_index

In [None]:
# Train the one-word-in, one-word-out model.
length_one = 1
X1, y1 = gen_X_y(encoded, length_one)
model1 = build_and_train_model(vocab_size, length_one, X1, y1)

# Test inputs for the one-word-in model.
test_word1 = ["公", "机", "阳"]

# Ranked candidates with their scores for every test word.
for word in test_word1:
    generate_one_word_with_prob(model1, tokenizer, word)

# Check the result: one generated word per test word.
for word in test_word1:
    print(word)
    print(generate_seq_with_one_word(model1, tokenizer, word, 1))

In [None]:
# Train the two-words-in, one-word-out model on the same encoded data.
length_two = 2
X2, y2 = gen_X_y(encoded, length_two)
model2 = build_and_train_model(vocab_size, length_two, X2, y2)

In [None]:
# Show how a two-word context is encoded.
print(tokenizer.texts_to_sequences(["公 安"]))

# Test inputs for the multiple-word-in model.
test_word2 = ["公 安", "机 关", "阳 光"]

# Ranked candidates with their scores for every two-word context.
for word in test_word2:
    generate_one_word_with_prob(model2, tokenizer, word, 2)

# Check the result: one generated word per two-word context.
for word in test_word2:
    print(word)
    print(generate_seq_with_multiple_word(model2, tokenizer, word, 1, 2))

In [None]:
# Show the layer table of the one-word-in model trained in an earlier cell.
model1.summary()

In [None]:
def model_details(model):
    """Print the model summary and return the model configuration.

    Args:
        model: object exposing ``summary()`` and ``get_config()``.

    Returns:
        Whatever ``model.get_config()`` returns.
    """
    model.summary()
    # The configuration used to be computed and then discarded; returning it
    # lets the caller (or the notebook cell output) actually show it.
    return model.get_config()
    
def model_layer_information(model, layer_index):
    """Print the input shape, then the output shape, of one layer.

    Args:
        model: object exposing an indexable ``layers`` collection.
        layer_index: position of the layer inside ``model.layers``.
    """
    layer = model.layers[layer_index]
    print(layer.input_shape)
    print(layer.output_shape)
