In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import random
import os
import io
import string
import re
from sklearn.model_selection import train_test_split

In [2]:
def preprocess(sentence):
    sent = sentence.lower()
    sent = sent.strip()
    sent = re.sub("'", " ", sent)
    sent = re.sub("\s+", " ", sent)
    sent = ''.join([char for char in sent if char not in exclude])
    sent = "<start> " + sent + " <end>"
    return sent
    
# load data
en_filename = "/content/sample_data/train.en.txt"
vi_filename = "/content/sample_data/train.vi.txt"
raw_en_lines = open(en_filename, encoding='utf-8').read().strip().split("\n")
raw_vi_lines = open(vi_filename, encoding='utf-8').read().strip().split("\n")
exclude = list(string.punctuation) + list(string.digits)

In [3]:
en_lines = []
vi_lines = []
min_len, max_len = 10, 14

for eline, vline in zip(raw_en_lines, raw_vi_lines):
    eline = preprocess(eline)
    vline = preprocess(vline)
    if min_len < len(eline.split(" ")) < max_len and min_len < len(vline.split(" ")) < max_len:
        en_lines.append(eline)
        vi_lines.append(vline)

In [4]:
class Language():
    def __init__(self, lines):
        self.lines = lines
        self.word2id = {}
        self.id2word = {}
        self.vocab = set()
        self.max_len = 0
        self.min_len = 0
        self.vocab_size = 0
        self.init_language_params()

    def init_language_params(self):
        self.word2id['<pad>'] = 0
        for line in self.lines:
            self.vocab.update(line.split(" "))
        for id, word in enumerate(self.vocab):
            self.word2id[word] = id + 1
        for word, id in self.word2id.items():
            self.id2word[id] = word
        
        self.max_len = max([len(line.split(" ")) for line in self.lines])
        self.min_len = min([len(line.split(" ")) for line in self.lines])
        self.vocab_size = len(self.vocab) + 1
            
    def sentence_to_vector(self, sent):
        return np.array([self.word2id[word] for word in sent.split(" ")])
            
    def vector_to_sentence(self, vector):
        return " ".join([self.id2word[id] for id in vector])

In [5]:
inp_lang = Language(en_lines)
tar_lang = Language(vi_lines)
inp_vector = [inp_lang.sentence_to_vector(line) for line in inp_lang.lines]
tar_vector = [tar_lang.sentence_to_vector(line) for line in tar_lang.lines]

# add padding vào câu để tất cả các câu có độ dài bằng nhau
inp_tensor = tf.keras.preprocessing.sequence.pad_sequences(inp_vector, inp_lang.max_len, padding='post')
tar_tensor = tf.keras.preprocessing.sequence.pad_sequences(tar_vector, tar_lang.max_len, padding='post')
x_train, x_val, y_train, y_val = train_test_split(inp_tensor, tar_tensor, test_size=0.1)

BATCH_SIZE = 32
BUFFER_SIZE = x_train.shape[0]
N_BATCH = BUFFER_SIZE//BATCH_SIZE
hidden_unit = 1024
embedding_size = 256

dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.batch(BATCH_SIZE)

In [6]:
class Encode(tf.keras.Model):
    def __init__(self, embedding_size, vocab_size, hidden_units):
        super(Encode, self).__init__()
        self.Embedding = tf.keras.layers.Embedding(vocab_size,embedding_size)
        self.GRU = tf.keras.layers.GRU(
            hidden_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform')
        self.hidden_units = hidden_units
        
    def call(self, x, hidden_state):
        x = self.Embedding(x)
        outputs, last_state = self.GRU(x, hidden_state)
        return outputs, last_state
    
    def init_hidden_state(self, batch_size):
        return tf.zeros([batch_size, self.hidden_units])

In [7]:
class Attention(tf.keras.Model):
    def __init__(self, hidden_units):
        super(Attention, self).__init__()
        self.W_out_encode = tf.keras.layers.Dense(hidden_unit)
        self.W_state = tf.keras.layers.Dense(hidden_unit)
        self.V = tf.keras.layers.Dense(1)
        
    def call(self, encode_outs, pre_state):
        pre_state = tf.expand_dims(pre_state, axis=1)
        pre_state = self.W_state(pre_state)
        encode_outs = self.W_out_encode(encode_outs)
        score = self.V(
            tf.nn.tanh(
                pre_state + encode_outs)
        )
        score = tf.nn.softmax(score, axis=1)
        context_vector = score*encode_outs
        context_vector = tf.reduce_sum(context_vector, axis=1)
        return context_vector, score

In [8]:
class Decode(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_units):
        super(Decode, self).__init__()
        self.hidden_units = hidden_units
        self.Embedding = tf.keras.layers.Embedding(vocab_size,embedding_size)
        self.Attention = Attention(hidden_units)
        self.GRU = tf.keras.layers.GRU(
            hidden_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform'
        )
        self.Fc = tf.keras.layers.Dense(vocab_size)
            
    def call(self, x, encode_outs, pre_state):
        x = tf.expand_dims(x, axis=1)
        x = self.Embedding(x)
        context_vector, attention_weight = self.Attention(encode_outs, pre_state)
        context_vector = tf.expand_dims(context_vector, axis=1)
        gru_inp = tf.concat([x, context_vector], axis=-1)
        out_gru, state = self.GRU(gru_inp)
        out_gru = tf.reshape(out_gru, (-1, out_gru.shape[2]))
        return self.Fc(out_gru), state

In [9]:
def loss_function(real, pred):
    mask = 1 - np.equal(real, 0)
    loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred) * mask
    return tf.reduce_mean(loss_)

In [10]:
EPOCHS = 20
optimizer = tf.optimizers.Adam()
encoder = Encode(embedding_size, vocab_size=inp_lang.vocab_size, hidden_units=hidden_unit)
decoder = Decode(vocab_size=tar_lang.vocab_size, embedding_size=embedding_size, hidden_units=hidden_unit)
    
for epoch in range(EPOCHS):
    total_loss = 0
    for batch_id, (x, y) in enumerate(dataset.take(N_BATCH)):
        loss = 0
        with tf.GradientTape() as tape:
            first_state = encoder.init_hidden_state(batch_size=BATCH_SIZE)
            encode_outs, last_state = encoder(x, first_state)
            decode_state = last_state
            decode_input = [tar_lang.word2id["<start>"]]*BATCH_SIZE
            
            for i in range(1, y.shape[1]):
                decode_out, decode_state = decoder(
                        decode_input, encode_outs, decode_state
                )
                loss += loss_function(y[:, i], decode_out)
                decode_input = y[:, i]
                
            train_vars = encoder.trainable_variables + decoder.trainable_variables
            grads = tape.gradient(loss, train_vars)
            optimizer.apply_gradients(zip(grads, train_vars))
        total_loss += loss
    print(total_loss.numpy())

10596.915
9053.447
8089.248
7246.162
6426.9414
5715.084
5050.125
4423.8413
3888.817
3470.4146
3057.658
2658.97
2341.5095
2009.6713
1725.3738
1520.3059
1283.3983
1113.8059
1111.0929
991.66974


In [42]:
def translate(inputs):
    result = ''
    hidden = encoder.init_hidden_state(batch_size=1)
    enc_out, enc_hidden = encoder(inputs, hidden)
    dec_hidden = enc_hidden  
    dec_input = [tar_lang.word2id['<start>']]

    for t in range(tar_lang.max_len):
        predictions, dec_hidden = decoder(dec_input, enc_out, dec_hidden)
        predicted_id = tf.argmax(predictions[0]).numpy()
        result += tar_lang.id2word[predicted_id] + ' '
        dec_input = [predicted_id]
    return result
  
for inp, tar in dataset.take(N_BATCH):
    print(translate(inp[1:2,:]))
    break

bạn có thấy là bà ở đâu  <end> ở roma không  
