In [None]:
import os 
import numpy as np
import cv2
from glob import glob
from PIL import Image
from keras import backend as K
import imutils
import gc
from sklearn.utils import shuffle
from os.path import join
from keras.utils.np_utils import *
import keras
import random
import math
import pickle
minibatch_size = 8

In [None]:
# Word segmentation vocabulary: one entry per line of words.txt.
# The context manager closes the file as soon as the lines have been read.
with open('./split_data/words.txt', encoding='UTF-8-sig') as vocab_200:
    split_word = [line.strip('\n') for line in vocab_200]

In [None]:
# Build the vocabulary dictionary (tokenizer); it is cached on disk as a pickle.
from keras.preprocessing.text import Tokenizer

tok_path = join('split_data', 'vocabulary_tok.pickle')

if os.path.exists(tok_path):
    # loading: a pickled tokenizer already exists, reuse it
    with open(tok_path, 'rb') as handle:
        tok = pickle.load(handle)
        print('load tok')
else:
    # saving: fit a word-level tokenizer on the vocabulary and pickle it
    tok = Tokenizer(char_level=False)
    tok.fit_on_texts(split_word)
    with open(tok_path, 'wb') as handle:
        pickle.dump(tok, handle, protocol = pickle.HIGHEST_PROTOCOL)
        print('create tok')

In [None]:
print(len(tok.word_index))          # number of vocabulary entries    1~200

In [None]:
# for ii,iterm in enumerate(tok.word_index.items()):
#     print(iterm)

In [None]:
def labels_to_text(label):                       # label is list
    """Translate a class label back into its vocabulary word.

    Only ``label[0]`` is used. It is shifted by +1 before the tokenizer
    lookup, mirroring the -1 applied in ``text_to_labels``.
    """
    token_id = label[0] + 1
    return tok.sequences_to_texts([[token_id]])[0]

def text_to_labels(text):                        # text is string
    """Translate a text into an integer class label.

    The text is tokenized and the index of its first token, minus one,
    is returned.
    """
    first_sequence = tok.texts_to_sequences([text])[0]
    return first_sequence[0] - 1

In [None]:
image_train = np.load('./split_data/image_train.npy')
image_val = np.load('./split_data/image_val.npy')


def _read_lines(path):
    """Read a UTF-8 (BOM-tolerant) text file and return its lines with newline characters stripped."""
    # The context manager guarantees the handle is closed after reading.
    with open(path, encoding='UTF-8-sig') as handle:
        return [line.strip('\n') for line in handle]


# One label text per line, stored as numpy arrays so they can be indexed with index arrays.
txt_val = np.array(_read_lines('./split_data/txt_val.txt'))
txt_train = np.array(_read_lines('./split_data/txt_train.txt'))

print(image_train.shape, len(txt_train))
print(image_val.shape, len(txt_val))

In [None]:
# f_train = open('./split_data/words.txt', encoding='UTF-8-sig')
# txt_train = []
# for line in f_train:
#     line = line.strip('\n')
#     txt_train.append(line)

In [None]:
# train_label = []
# for i in txt_train:
#     a = text_to_labels(i)
#     train_label.append(a)
# len(train_label)

In [None]:
# train_labels = to_categorical(train_label, 200)

# # 由one-hot转换为普通np数组
# data = [np.argmax(one_hot)for one_hot in train_labels]
# data

In [None]:
# result = []
# for i in train_label:
#     a = labels_to_text([i])
#     result.append(a)
# len(result)

In [None]:
class TrainDataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that yields ``(batch_data, batch_label)`` training batches.

    Batches are cut from the module-level ``image_train`` / ``txt_train``
    arrays; the batch size is taken from ``minibatch_size`` at construction.
    """

    def __init__(self, shuffle = True):
        """
        Args:
            shuffle: if true, the sample order is reshuffled at the end of
                every epoch.
        """
        self.indexes = np.arange(len(txt_train))
        self.shuffle = shuffle
        self.batch_size = minibatch_size

    def __len__(self):
        """Return the number of batches per epoch (the last one may be shorter)."""
        return int(np.ceil(len(image_train) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(batch_data, batch_label)``.

        ``batch_data`` is the selected images as float32 divided by 255;
        ``batch_label`` is the one-hot encoding (200 classes) of the labels.
        """
        # self.batch_size must stay constant: it defines where every batch
        # starts, so shrinking it for a short final batch would shift the
        # slices of all batches requested afterwards.
        start = index * self.batch_size
        batch_indexes = self.indexes[start:start + self.batch_size]

        batch_data = image_train[batch_indexes]

        batch_label = np.array([text_to_labels(text) for text in txt_train[batch_indexes]])
        batch_label = to_categorical(batch_label, 200)

        # Convert pixel data to float so it can be normalized.
        batch_data = batch_data.astype('float32')
        batch_data /= 255

        return (batch_data, batch_label)

    def on_epoch_end(self):
        """Reshuffle the sample order after each epoch when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

In [None]:
class ValDataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that yields ``(batch_data, batch_label)`` validation batches.

    Batches are cut from the module-level ``image_val`` / ``txt_val``
    arrays; the batch size is taken from ``minibatch_size`` at construction.
    """

    def __init__(self, shuffle = True):
        """
        Args:
            shuffle: if true, the sample order is reshuffled at the end of
                every epoch.
        """
        self.indexes = np.arange(len(txt_val))
        self.shuffle = shuffle
        self.batch_size = minibatch_size

    def __len__(self):
        """Return the number of batches per epoch (the last one may be shorter)."""
        return int(np.ceil(len(image_val) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(batch_data, batch_label)``.

        ``batch_data`` is the selected images as float32 divided by 255;
        ``batch_label`` is the one-hot encoding (200 classes) of the labels.
        """
        # self.batch_size must stay constant: it defines where every batch
        # starts, so shrinking it for a short final batch would shift the
        # slices of all batches requested afterwards.
        start = index * self.batch_size
        batch_indexes = self.indexes[start:start + self.batch_size]

        batch_data = image_val[batch_indexes]

        batch_label = np.array([text_to_labels(text) for text in txt_val[batch_indexes]])
        batch_label = to_categorical(batch_label, 200)

        # Convert pixel data to float so it can be normalized.
        batch_data = batch_data.astype('float32')
        batch_data /= 255

        return (batch_data, batch_label)

    def on_epoch_end(self):
        """Reshuffle the sample order after each epoch when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

# model

In [None]:
from keras import models, layers
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Activation, Flatten, SpatialDropout3D
from keras.layers import Convolution3D, MaxPooling3D
from keras.layers.convolutional import Conv3D, ZeroPadding3D
from keras.layers.wrappers import Bidirectional, TimeDistributed
from keras.layers.normalization import BatchNormalization
from keras.layers.recurrent import GRU
from keras.layers.core import Lambda
from keras.layers import Input
from keras.optimizers import Adam
# Expose only the CUDA device with index 1 to this process.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
import warnings
# Silence all warnings for the rest of the notebook.
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')
from multiprocessing import set_start_method, get_start_method, Pool
# The start method may only be set once per process; guard the call so that
# re-running this cell does not raise RuntimeError.
if get_start_method(allow_none=True) is None:
    set_start_method('forkserver')
from Word_Error_Rate import WordErrorRate
from wer import *
from decoders import Decoder
# from error_rates import ErrorRates
# out_size = len(OneWord)+1               # add ctc blank char

In [None]:
class MODEL(object):
    """Video classifier: three 3D-convolution stages, two bidirectional GRUs, softmax output.

    The Keras model is built in the constructor and exposed as ``self.model``.
    """

    def __init__(self, img_c=3, img_w=100, img_h=50, frames_n=77, output_size=len(tok.word_index)):
        """
        Args:
            img_c: number of image channels.
            img_w: frame width.
            img_h: frame height.
            frames_n: number of frames per clip.
            output_size: number of output classes (units of the final Dense layer).
        """
        self.img_c = img_c
        self.img_w = img_w
        self.img_h = img_h
        self.frames_n = frames_n
        self.output_size = output_size
        self.build()

    def build(self):
        """Create all layers and assemble ``self.model``."""
        # Input layout is (frames, height, width, channels); it is derived from
        # the constructor arguments so that non-default sizes take effect.
        self.input_data = Input(name='the_input',
                                shape=(self.frames_n, self.img_h, self.img_w, self.img_c),
                                dtype='float32')

        # Stage 1: pad -> conv3d (spatial stride 2) -> batch norm -> relu -> dropout -> spatial max-pool
        self.zero1 = ZeroPadding3D(padding=(1, 2, 2), name='zero1')(self.input_data)
        self.conv1 = Conv3D(32, (3, 5, 5), strides=(1, 2, 2), kernel_initializer='he_normal', name='conv1')(self.zero1)
        self.batc1 = BatchNormalization(name='batc1')(self.conv1)
        self.actv1 = Activation('relu', name='actv1')(self.batc1)
        self.drop1 = SpatialDropout3D(0.5)(self.actv1)
        self.maxp1 = MaxPooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2), name='max1')(self.drop1)

        # Stage 2
        self.zero2 = ZeroPadding3D(padding=(1, 2, 2), name='zero2')(self.maxp1)
        self.conv2 = Conv3D(64, (3, 5, 5), strides=(1, 1, 1), kernel_initializer='he_normal', name='conv2')(self.zero2)
        self.batc2 = BatchNormalization(name='batc2')(self.conv2)
        self.actv2 = Activation('relu', name='actv2')(self.batc2)
        self.drop2 = SpatialDropout3D(0.5)(self.actv2)
        self.maxp2 = MaxPooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2), name='max2')(self.drop2)

        # Stage 3
        self.zero3 = ZeroPadding3D(padding=(1, 1, 1), name='zero3')(self.maxp2)
        self.conv3 = Conv3D(96, (3, 3, 3), strides=(1, 1, 1), kernel_initializer='he_normal', name='conv3')(self.zero3)
        self.batc3 = BatchNormalization(name='batc3')(self.conv3)
        self.actv3 = Activation('relu', name='actv3')(self.batc3)
        self.drop3 = SpatialDropout3D(0.5)(self.actv3)
        self.maxp3 = MaxPooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2), name='max3')(self.drop3)

        # Flatten the spatial feature maps of every time step into one vector per frame.
        self.resh1 = TimeDistributed(Flatten())(self.maxp3)

        self.gru_1 = Bidirectional(GRU(256, return_sequences=True, kernel_initializer='Orthogonal', name='gru1'), merge_mode='concat')(self.resh1)
        self.gru_2 = Bidirectional(GRU(256, return_sequences=True, kernel_initializer='Orthogonal', name='gru2'), merge_mode='concat')(self.gru_1)

        # Collapse the whole output sequence into a single vector per clip.
        self.resh2 = Flatten()(self.gru_2)

        # Project to one score per class, then normalize with softmax.
        self.dense1 = Dense(self.output_size, kernel_initializer='he_normal', name='dense1')(self.resh2)

        self.y_pred = Activation('softmax', name='softmax')(self.dense1)

        self.model = Model(inputs = self.input_data, outputs = self.y_pred)

    def summary(self):
        """Print the layer summary of the model built by ``build``."""
        self.model.summary()

#     def predict(self, input_batch):
#         return self.test_function([input_batch, 0])[0]  # the first 0 indicates test

#     @property
#     def test_function(self):
#         # captures output of softmax so we can decode the output during visualization
#         return K.function([self.input_data, K.learning_phase()], [self.y_pred])

In [None]:
# from keras.layers.wrappers import Bidirectional, TimeDistributed
# help(TimeDistributed)

In [None]:
# Build the network with as many output classes as the tokenizer has words,
# then print its layer summary.
# output_size = 200
model = MODEL(img_c=3,img_w=100,img_h=50,frames_n=77,output_size=len(tok.word_index))
model.summary()

In [None]:
# Batch generators for the training and validation splits.
train_generator = TrainDataGenerator()
val_generator = ValDataGenerator()

In [None]:
# Compile for single-label classification: categorical cross-entropy loss,
# the 'adam' optimizer selected by name, and accuracy as the reported metric.
# adam = Adam(lr=0.0001, beta_1=0.9, beta_2=0.999, epsilon=1e-08)
model.model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# MODEL.model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer = adam)

In [None]:
# Restore previously saved weights into the model before training continues.
model.model.load_weights('./weight/lipnet_50_vocab.h5')

In [None]:
# decoder = Decoder(greedy=True,beam_width=100,postprocessors=[labels_to_text])
# error_rates = ErrorRates(lipnet, val_generator, decoder, 256)
# WordError_Rate = WordErrorRate(lipnet, val_generator, decoder, minibatch_size)

In [None]:
# --------------------------------------------------------------------------------------------------------------

In [None]:
# Train from the Sequence generators. `max_queue_size` is the Keras 2 name of
# the generator queue limit (the Keras 1 spelling was `max_q_size`).
model.model.fit_generator(generator = train_generator,
                           validation_data = val_generator,
                           epochs = 100,
#                            callbacks = [WordError_Rate],
                           verbose = 1,
                           max_queue_size = 5,
                           shuffle = True
#                            workers = 2,
#                            pickle_safe=True,
#                            use_multiprocessing = True
                          )

In [None]:
# --------------------------------------------------------------------------------------------------------------

In [None]:
# Save the model structure and weights.
model.model.save('./weight/lipnet_150_vocab.h5')

# predict

In [None]:
# Restore trained weights for prediction.
# NOTE(review): `lipnet` is not defined anywhere in the visible notebook (the
# network built above is bound to `model`) — confirm which object is meant.
# lipnet.model.load_weights('./weight/368-overlap-6.h5')
lipnet.model.load_weights('./weight/lipnet_1000_vocab.h5')

In [None]:
initial = 0             # 0, 120, 240
pred_batch = 150
def predicts(data, input_length):
    """Cut one prediction window out of ``data`` and ``input_length``.

    The window covers ``[initial, initial + pred_batch)``, both read from the
    module-level settings. Returns ``(batch_data, batch_input_length)`` where
    ``batch_data`` is a float32 copy of the selected samples divided by 255
    and ``batch_input_length`` is the matching slice of ``input_length``.
    """
    window = slice(initial, initial + pred_batch)

    # Convert pixel data to float so it can be normalized.
    scaled = data[window].astype('float32') / 255

    return (scaled, input_length[window])

In [None]:
# Select and normalize the validation window used for prediction.
# NOTE(review): `val_input_length` is not defined anywhere in the visible
# notebook — confirm where it is supposed to come from.
pred_data, pred_input_length = predicts(image_val, val_input_length)

In [None]:
# Run the network on the selected window and show the output shape.
# NOTE(review): `lipnet` is not defined in the visible notebook, and the MODEL
# class above has its `predict` method commented out — confirm the intended call.
y_pred = lipnet.predict(pred_data)
print(y_pred.shape)
# print(pred_input_length)

In [None]:
# ctc decode
# NOTE(review): the model compiled above ends in Flatten -> Dense -> softmax and
# is trained with categorical cross-entropy; confirm that CTC decoding still
# applies to its output.
r = K.ctc_decode(y_pred, pred_input_length, greedy = True, beam_width=100, top_paths=1)
r1 = K.get_value(r[0][0])
# print(r1)

In [None]:
def Decoder(decoded, **kwargs):
    """Run every decoded item through a chain of post-processing callables.

    Keyword Args:
        postprocessors: callables applied in order to each item; when it is
            omitted, the items are returned unchanged.

    Returns:
        A list with one processed result per item of ``decoded``.
    """
    pipeline = kwargs.get('postprocessors', [])

    def _apply_all(item):
        # Feed the result of each step into the next one.
        for step in pipeline:
            item = step(item)
        return item

    return [_apply_all(item) for item in decoded]
# Turn every decoded label sequence into its text form.
result = Decoder(r1, postprocessors=[labels_to_text])

In [None]:
# Print one line per predicted text.
for i in result:
    print('Predict label:', i)

In [None]:
# Print the reference texts of the same validation window.
for i in txt_val[initial:initial + pred_batch]:
    print('True label:', i)

# evaluate

In [None]:
import difflib
def GetEditDistance(str1, str2):
    """Return an edit cost between two sequences based on difflib opcodes.

    Every non-matching opcode contributes the length of the longer of its two
    spans: a replace costs ``max(len_a, len_b)``, an insert costs the number of
    inserted items and a delete the number of removed items.
    """
    matcher = difflib.SequenceMatcher(None, str1, str2)
    total_cost = 0
    for op, a_start, a_end, b_start, b_end in matcher.get_opcodes():
        if op == 'equal':
            continue
        # For 'insert' the span in str1 is empty and for 'delete' the span in
        # str2 is empty, so max() yields the right cost for all three tags.
        total_cost += max(a_end - a_start, b_end - b_start)
    return total_cost

In [None]:
def calculate_cer(predict, label, label_length):
    """Return the character error rate, in percent, over a batch.

    Args:
        predict: predicted texts.
        label: reference texts, aligned with ``predict``.
        label_length: length of each reference, aligned with ``predict``.

    Returns:
        ``100 * errors / total reference length``. Returns ``0.0`` when the
        total reference length is zero (for example, an empty batch) instead
        of dividing by zero.
    """
    cha_num = 0
    cha_error_num = 0

    for i in range(len(predict)):
        cha_edit_distance = GetEditDistance(str(predict[i]), str(label[i]))
        cha_num += label_length[i]
        # Cap the errors of one sample at its reference length.
        cha_error_num += min(cha_edit_distance, label_length[i])

    if cha_num == 0:
        return 0.0
    return (cha_error_num / cha_num) * 100

In [None]:
def calculate_wer(predict, label, label_length):
    """Return the word error rate, in percent, over a batch.

    Args:
        predict: predicted texts.
        label: reference texts, aligned with ``predict``.
        label_length: number of words in each reference, aligned with ``predict``.

    Returns:
        ``100 * errors / total reference word count``. Returns ``0.0`` when the
        total word count is zero (for example, an empty batch) instead of
        dividing by zero.
    """
    words_num = 0
    word_error_num = 0

    for i in range(len(predict)):
        word_edit_distance = chinese_wer_sentence(str(predict[i]), str(label[i]))
        words_num += label_length[i]
        # Cap the errors of one sample at its reference word count.
        word_error_num += min(word_edit_distance, label_length[i])

    if words_num == 0:
        return 0.0
    return (word_error_num / words_num) * 100

In [None]:
# Reference lengths for the evaluated validation window: characters per text
# and words per text (word segmentation via jieba.lcut).
Letter_length = [len(sentence) for sentence in txt_val[initial:initial + pred_batch]]
Word_length = [len(jieba.lcut(sentence)) for sentence in txt_val[initial:initial + pred_batch]]

In [None]:
# Compute and print word and character error rates (percent) for the evaluated window.
wer = calculate_wer(result, txt_val[initial:initial + pred_batch], Word_length)
cer = calculate_cer(result, txt_val[initial:initial + pred_batch], Letter_length)
print("wer: " + str(wer))
print("cer: " + str(cer))