In [None]:
! pip install -r ../requirements.txt

import pathlib
import tensorflow as tf
import tensorflow_text as tftext
import numpy as np
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession

config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)

In [None]:
def load_data(path):
  text = path.read_text(encoding='utf-8')

  lines = text.splitlines()
  text = lines

  return text

train_targ = load_data(pathlib.Path("data/train.seg.zh.txt"))
train_inp = load_data(pathlib.Path("data/train.seg.en.txt"))
val_targ = load_data(pathlib.Path("data/dev.seg.zh.txt"))
val_inp = load_data(pathlib.Path("data/dev.seg.en.txt"))
test_targ = load_data(pathlib.Path("data/test.seg.zh.txt"))
test_inp = load_data(pathlib.Path("data/test.seg.en.txt"))

In [None]:
batch_size = 64

train_dataset = tf.data.Dataset.from_tensor_slices((
    train_inp, train_targ)
).shuffle(len(train_targ)).batch(batch_size)

# val_dataset = tf.data.Dataset.from_tensor_slices((
#     val_inp, val_targ)
# ).shuffle(len(val_targ)).batch(batch_size)

test_dataset = tf.data.Dataset.from_tensor_slices((
    test_inp, test_targ)
).shuffle(len(test_targ)).batch(batch_size)

In [48]:
def inp_preprocess(text):
  text = tftext.normalize_utf8(text, "NFKD")
  text = tf.strings.lower(text)
  text = tf.strings.strip(text)

  text = tf.strings.regex_replace(text, '[/(){}:;<>,|\'"]', '')
  text = tf.strings.join(['[START]', text, '[END]'], separator = ' ')

  return text

def targ_preprocess(text):
  text = tftext.normalize_utf8(text, "NFKD")
  text = tf.strings.lower(text)
  text = tf.strings.strip(text)

  text = tf.strings.regex_replace(text, '[/(){}:;<>,|\'"]', '')

  text = tf.strings.join(['[START]', text, '[END]'], separator = ' ')

  return text

In [49]:
max_vocab_size = 10000

input_processor = tf.keras.layers.experimental.preprocessing.TextVectorization(
    standardize = inp_preprocess, max_tokens = max_vocab_size)
output_processor = tf.keras.layers.experimental.preprocessing.TextVectorization(
    standardize = targ_preprocess, max_tokens = max_vocab_size)
input_processor.adapt(train_inp)
output_processor.adapt(train_targ)


In [50]:
print(train_inp[0])
print(targ_preprocess(train_inp[0]))
print(train_targ[:10])
print(targ_preprocess(train_targ[:10]))
print(output_processor.get_vocabulary()[:20])
print(input_processor.vocabulary_size())
print(output_processor.vocabulary_size())

The following is a valid call:
tf.Tensor(b'[START] the following is a valid call [END]', shape=(), dtype=string)
['以下 均 为 合法 的 调用 :', '3.7   新版 功能 .', '后者 相应 增加 了 一个 别名 :   " Screen . onkeyrelease ( ) "', '" %% "   字面 的   " \' % \' "   字符', '子 进程 协议', '请 注意   cursor   的   arraysize   属性 会 影响 此 操作 的 执行 效率', 'state   是 一个 表示 编码器 状态 的 元组', '" tarfile . open ( ) "   函数 实际上 是 这个 类 方法 的 快捷方式', '任何 时候 将   " NULL "   指针 “ 泄露 ” 给   Python   用户 都 会 是 个 严重 的 错误', '可以 将 所有 数值 设置 为   " CHAR _ MAX "   ， 以 指示 此 语言 环境 中 未指定 任何 值']
tf.Tensor(
[b'[START] \xe4\xbb\xa5\xe4\xb8\x8b \xe5\x9d\x87 \xe4\xb8\xba \xe5\x90\x88\xe6\xb3\x95 \xe7\x9a\x84 \xe8\xb0\x83\xe7\x94\xa8  [END]'
 b'[START] 3.7   \xe6\x96\xb0\xe7\x89\x88 \xe5\x8a\x9f\xe8\x83\xbd . [END]'
 b'[START] \xe5\x90\x8e\xe8\x80\x85 \xe7\x9b\xb8\xe5\xba\x94 \xe5\xa2\x9e\xe5\x8a\xa0 \xe4\xba\x86 \xe4\xb8\x80\xe4\xb8\xaa \xe5\x88\xab\xe5\x90\x8d     screen . onkeyrelease    [END]'
 b'[START]  %%    \xe5\xad\x97\xe9\x9d\xa2 \xe7\x9a\x84     %     \xe5\xad

In [None]:
embedding_dim = 256
units = 1024

In [None]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self, input_size, embedding_dim, units):
    super(Encoder, self).__init__()

    self.units = units
    self.input_size = input_size
    self.embedding = tf.keras.layers.Embedding(self.input_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(self.units, 
                                   return_sequences = True, 
                                   return_state = True, 
                                   recurrent_initializer = "glorot_uniform")

  def call(self, token, state = None):
    vector = self.embedding(token)

    out, state = self.gru(vector, initial_state = state)

    return out, state

In [None]:
class Attention(tf.keras.layers.Layer):
  def __init__(self, units):
    super().__init__()

    self.w1 = tf.keras.layers.Dense(units, use_bias = False)
    self.w2 = tf.keras.layers.Dense(units, use_bias = False)

    self.attention = tf.keras.layers.AdditiveAttention()

  def call(self, q, v, mask):
    query = self.w1(q)
    key = self.w2(v)

    query_mask = tf.ones(tf.shape(query)[:-1], dtype = bool)
    value_mask = mask

    context_vector, weights = self.attention(inputs = [query, v, key], 
                                             mask = [query_mask, mask], 
                                             return_attention_scores = True)

    return context_vector, weights

In [None]:
import typing

class DecoderInput(typing.NamedTuple):
  new_tokens: typing.Any
  enc_output: typing.Any
  mask: typing.Any

class DecoderOutput(typing.NamedTuple):
  logits: typing.Any
  attention_weights: typing.Any

In [None]:
class Decoder(tf.keras.layers.Layer):
  def __init__(self, output_size, embedding_dim, units):
      super(Decoder, self).__init__()

      self.output_size = output_size
      self.embedding_dim = embedding_dim
      self.units = units

      self.embedding = tf.keras.layers.Embedding(
          self.output_size, 
          embedding_dim)
      self.gru = tf.keras.layers.GRU(self.units, 
                                     return_sequences = True, 
                                     return_state = True, 
                                     recurrent_initializer = 'glorot_uniform')
      self.attention= Attention(self.units)

      self.w = tf.keras.layers.Dense(self.units, 
                                     activation = "tanh", 
                                     use_bias = False)
      self.fc = tf.keras.layers.Dense(self.output_size)

  def call(self, 
           input: DecoderInput, 
           state = None) -> typing.Tuple[DecoderOutput, tf.Tensor]:
      vec = self.embedding(input.new_tokens)
      out, state = self.gru(vec, initial_state = state)

      context, weights = self.attention(out, 
                                        input.enc_output, 
                                        mask = input.mask)

      attention_vector = self.w(tf.concat([context, out], axis = -1))

      logits = self.fc(attention_vector)

      return DecoderOutput(logits, weights), state

In [None]:
class MaskedLoss(tf.keras.losses.Loss):
  def __init__(self):
    self.name = "masked_loss"
    self.loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits = True, 
        reduction = "none")

  def __call__(self, y_true, y_pred):
    loss = self.loss(y_true, y_pred)

    mask = tf.cast(y_true != 0, tf.float32)
    
    loss *= mask

    return tf.reduce_sum(loss)

In [None]:
class Translator(tf.keras.Model):
  def __init__(self, embedding_dim, units, input_processor, output_processor):
    super().__init__()

    self.encoder = Encoder(input_processor.vocabulary_size(), 
                           embedding_dim, 
                           units)
    self.decoder = Decoder(output_processor.vocabulary_size(), 
                           embedding_dim, 
                           units)

    self.input_processor = input_processor
    self.output_processor = output_processor

  def train_step(self, input):
    return self._tf_train_step(input)

  def _preprocess(self, input, targ):
    input_token = self.input_processor(input)
    targ_token = self.output_processor(targ)

    input_mask = input_token != 0
    targ_mask = targ_token != 0

    return input_token, input_mask, targ_token, targ_mask

  def _train_step(self, inp):
    input, targ = inp

    input_token, input_mask, targ_token, targ_mask = self._preprocess(input, 
                                                                      targ)

    max_length = tf.shape(targ_token)[1]

    with tf.GradientTape() as tape:
      enc_out, enc_state = self.encoder(input_token)

      dec_state = enc_state
      loss = tf.constant(0.0)

      for i in tf.range(max_length - 1):
        new_token = targ_token[:, i : i + 2]
        step_loss, dec_state = self._loop_state(new_token, 
                                                input_mask, 
                                                enc_out, 
                                                dec_state)

        loss = loss + step_loss

      avg_loss = loss / tf.reduce_sum(tf.cast(targ_mask, tf.float32))

    var = self.trainable_variables
    grad = tape.gradient(avg_loss, var)

    self.optimizer.apply_gradients(zip(grad, var))

    return {"batch_loss": avg_loss}

  @tf.function(input_signature=[[tf.TensorSpec(dtype=tf.string, shape=[None, ]),
                               tf.TensorSpec(dtype=tf.string, shape=[None, ])]])
  def _tf_train_step(self, input):
    return self._train_step(input)

  def _loop_state(self, new_token, input_mask, enc_out, dec_state):
    input_token, targ_token = new_token[:, 0:1], new_token[:, 1:2]

    dec_input = DecoderInput(input_token, enc_out, input_mask)

    dec_out, dec_state = self.decoder(dec_input, dec_state)

    step_loss = self.loss(targ_token, dec_out.logits)

    return step_loss, dec_state

In [None]:
translator = Translator(embedding_dim, units, input_processor, output_processor)

translator.compile(optimizer = "adam", loss = MaskedLoss())

early_stop = tf.keras.callbacks.EarlyStopping(monitor = "batch_loss", 
                                              patience = 5, 
                                              restore_best_weights = True)

class BatchLog(tf.keras.callbacks.Callback):
  def __init__(self, key):
    self.key = key
    self.logs = []

  def on_train_batch_end(self, n, logs):
    self.logs.append(logs[self.key])

bl = BatchLog("batch_loss")

translator.fit(train_dataset, epochs = 30, callbacks = [bl, early_stop])

In [29]:
class Translate(tf.Module):
  def __init__(self, encoder, decoder, input_processor, output_processor):
    self.encoder = encoder
    self.decoder = decoder
    self.input_processor = input_processor
    self.output_processor = output_processor

    self.output_string_from_index = tf.keras.layers.experimental.preprocessing.StringLookup(
        vocabulary = output_processor.get_vocabulary(), mask_token = '', invert = True)
    index_from_string = tf.keras.layers.StringLookup(vocabulary = output_processor.get_vocabulary(), mask_token = '')
    token_mask_id = index_from_string(["", "[UNK]", "[START]"]).numpy()
    self.token_mask = np.zeros([index_from_string.vocabulary_size()], dtype = np.bool)
    self.token_mask[np.array(token_mask_id)] = True

    self.start_token = index_from_string(tf.constant("[START]"))
    self.end_token = index_from_string(tf.constant("[END]"))

  def token_to_string(self, result_token):
    result_text_token = self.output_string_from_index(result_token)
    result_text = tf.strings.strip(tf.strings.reduce_join(result_text_token, axis = 1, separator = ' '))

    return result_text

  def sample(self, logits, temp):
    token_mask = self.token_mask[tf.newaxis, tf.newaxis, :]
    logits = tf.where(self.token_mask, -np.inf, logits)

    if temp == 0.0:
      new_token = tf.argmax(logits, axis = -1)
    else: 
      logits = tf.squeeze(logits, axis = 1)
      new_token = tf.random.categorical(logits / temp, num_samples = 1)

    return new_token

  def translate(self, input_text, *, max_l = 50, return_attention = True, temp = 1.0):
    batch_size = tf.shape(input_text)[0]

    input_token = self.input_processor(input_text)
    enc_out, enc_state = self.encoder(input_token)

    dec_state = enc_state
    new_token = tf.fill([batch_size, 1], self.start_token)

    result_token = []
    attention = []

    done = tf.zeros([batch_size, 1], dtype = tf.bool)

    for _ in range(max_l):
      dec_in = DecoderInput(new_token, enc_out, (input_token != 0))

      dec_result, dec_state = self.decoder(dec_in, state = dec_state)

      attention.append(dec_result.attention_weights)

      new_token = self.sample(dec_result.logits, temp)

      done |= (new_token == self.end_token)

      new_token = tf.where(done, tf.constant(0, dtype = tf.int64), new_token)

      result_token.append(new_token)

    result_token = tf.concat(result_token, axis = -1)
    result_text = self.token_to_string(result_token)

    if return_attention:
      attention_stack = tf.concat(attention, axis = -1)
      return {"text": result_text, "attention:": attention_stack}
    else:
      return {"text": result_text}

In [30]:
translate = Translate(translator.encoder, translator.decoder, input_processor, output_processor)

for i in range(10):
  sample_input = tf.constant([test_inp[np.random.randint(0, tf.shape(test_inp)[0])]])

  print(sample_input[0].numpy().decode())

  result = translate.translate(sample_input)

  print(tf.strings.regex_replace(result["text"][0], ' ', '').numpy().decode())

AttributeError: module 'tensorflow.keras.layers' has no attribute 'StringLookup'