In [1]:
import pathlib
import numpy as np

import re

import typing
from typing import Any, Tuple

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

import tensorflow as tf
import tensorflow_text as tf_text

from tensorflow.keras.utils import get_file

2024-04-08 00:05:51.701522: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-08 00:05:51.701654: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-08 00:05:51.902192: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# **Dataset**

# 1. Load data

In [2]:
def load_data(path):
    """Read a tab-separated sentence-pair file into two aligned arrays.

    Every non-blank line must hold at least two tab-separated fields. The
    first field is returned as the target sentence and the second as the
    source sentence; any further fields on the line are ignored. Blank
    (whitespace-only) lines are skipped.

    Args:
        path: A `pathlib.Path`-like object providing `read_text`.

    Returns:
        A `(tgt, src)` tuple of NumPy arrays with one entry per kept line.

    Raises:
        ValueError: If a non-blank line has fewer than two fields.
    """
    text = path.read_text(encoding='utf-8')

    tgt_sentences = []
    src_sentences = []
    for line_no, line in enumerate(text.splitlines(), start=1):
        # Stray blank lines carry no sentence pair, so skip them instead of
        # failing on the unpacking below.
        if not line.strip():
            continue

        fields = line.split('\t')
        if len(fields) < 2:
            raise ValueError(
                f"line {line_no} of {path} has no tab-separated pair: {line!r}")

        tgt_sentences.append(fields[0])
        src_sentences.append(fields[1])

    return np.array(tgt_sentences), np.array(src_sentences)

In [3]:
# Tab-separated sentence-pair file inside the attached Kaggle input dataset.
path_txt = '/kaggle/input/translator-data/kor.txt'
path = pathlib.Path(path_txt)

# load_data returns its arrays in (target, source) order.
tgt_raw, src_raw = load_data(path=path)

# 2. Create a tf.data Dataset

In [4]:
BUFFER_SIZE = len(src_raw)
BATCH_SIZE = 64

# Seed the split so the same sentences land in train/validation on every
# Restart & Run All; otherwise everything derived from the training split
# changes from run to run.
SPLIT_SEED = 42
is_train = (
    np.random.default_rng(SPLIT_SEED).uniform(size=(len(tgt_raw),)) < 0.8
)

# Datasets of individual (source, target) string pairs. No shuffling or
# batching (BUFFER_SIZE / BATCH_SIZE) is applied at this stage.
train_raw = tf.data.Dataset.from_tensor_slices(
    (src_raw[is_train], tgt_raw[is_train]))
val_raw = tf.data.Dataset.from_tensor_slices(
    (src_raw[~is_train], tgt_raw[~is_train]))

In [5]:
# Show the first training pair; each element is a byte string, so decode
# it as UTF-8 before printing.
for src, tgt in train_raw.take(1):
    for label, example in (("Source: ", src), ("Target: ", tgt)):
        print(label, example.numpy().decode('utf-8'))

Source:  가.
Target:  Go.


In [6]:
# Project the paired training dataset onto one side each: the first element
# of every pair is the source sentence, the second the target sentence.
train_src = train_raw.map(lambda source, target: source)
train_tgt = train_raw.map(lambda source, target: target)

# 3. Generate the Vocabulary

In [7]:
from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert_vocab
import multiprocessing

In [8]:
# Special tokens placed in every generated vocabulary.
reserved_tokens = ["[PAD]", "[UNK]", "[START]", "[END]"]

# Keyword arguments forwarded to `text.BertTokenizer`.
bert_tokenizer_params = {"lower_case": True}

# Keyword arguments shared by both vocabulary-generation calls.
bert_vocab_args = {
    # Target vocabulary size.
    "vocab_size": 8000,
    # Tokens that must be included in the vocabulary.
    "reserved_tokens": reserved_tokens,
    "bert_tokenizer_params": bert_tokenizer_params,
    # Arguments for `wordpiece_vocab.wordpiece_tokenizer_learner_lib.learn`.
    "learn_params": {},
}

In [9]:
%%time
src_vocab = bert_vocab.bert_vocab_from_dataset(
    train_src.batch(1000).prefetch(2),
    **bert_vocab_args
)

CPU times: user 36 s, sys: 84.6 ms, total: 36.1 s
Wall time: 36 s


In [10]:
%%time
tgt_vocab = bert_vocab.bert_vocab_from_dataset(
    train_tgt.batch(1000).prefetch(2),
    **bert_vocab_args
)

CPU times: user 8.76 s, sys: 30.1 ms, total: 8.79 s
Wall time: 8.71 s


In [11]:
def write_vocab_file(filepath, vocab):
    """Write a vocabulary to `filepath`, one token per line.

    The file is always encoded as UTF-8 so that non-ASCII tokens are written
    identically regardless of the platform's default text encoding.

    Args:
        filepath: Destination path; an existing file is overwritten.
        vocab: Iterable of tokens, written in iteration order.
    """
    with open(filepath, 'w', encoding='utf-8') as f:
        for token in vocab:
            print(token, file=f)

In [12]:
# Persist both learned vocabularies as one-token-per-line text files.
for vocab_filename, vocab_tokens in (
        ('kor_vocab.txt', src_vocab),
        ('en_vocab.txt', tgt_vocab)):
    write_vocab_file(vocab_filename, vocab_tokens)

In [13]:
# Positions of the "[START]" and "[END]" markers within `reserved_tokens`,
# obtained as the argmax of an element-wise equality mask.
START = tf.argmax(tf.equal(tf.constant(reserved_tokens), "[START]"))
END = tf.argmax(tf.equal(tf.constant(reserved_tokens), "[END]"))

def add_start_end(ragged):
    """Wrap every row of `ragged` with the START and END values.

    Args:
        ragged: A ragged tensor of token ids with one row per sequence.

    Returns:
        The input with one START column prepended and one END column
        appended along axis 1.
    """
    num_rows = ragged.bounding_shape()[0]
    column_shape = [num_rows, 1]
    start_column = tf.fill(column_shape, START)
    end_column = tf.fill(column_shape, END)
    return tf.concat([start_column, ragged, end_column], axis=1)

In [14]:
def cleanup_text(reserved_tokens, token_txt):
    """Remove reserved tokens from `token_txt` and join each row into a string.

    Args:
        reserved_tokens: Reserved token strings; all of them except "[UNK]"
            are removed from the text.
        token_txt: Ragged string tensor of tokens.

    Returns:
        A string tensor with the remaining tokens of each row joined by
        single spaces along the last axis.
    """
    # Regex alternation matching any reserved token other than "[UNK]".
    droppable = (re.escape(tok) for tok in reserved_tokens if tok != "[UNK]")
    reserved_pattern = "|".join(droppable)

    # Keep only the cells that are not exactly a reserved token.
    is_reserved = tf.strings.regex_full_match(token_txt, reserved_pattern)
    kept_tokens = tf.ragged.boolean_mask(token_txt, ~is_reserved)

    return tf.strings.reduce_join(kept_tokens, separator=' ', axis=-1)

In [15]:
class CustomTokenizer(tf.Module):
    """Exportable tokenizer module built around `tf_text.BertTokenizer`.

    On top of the wrapped tokenizer it adds the START/END markers when
    tokenizing, strips reserved tokens when detokenizing, and exposes the
    vocabulary through `lookup` and the `get_*` accessors. Concrete
    functions for every method are traced in `__init__`.

    Args:
        reserved_tokens: List of reserved token strings.
        vocab_path: Path to a vocabulary text file with one token per line.
    """

    def __init__(self, reserved_tokens, vocab_path):
        # Initialise the tf.Module base class before any attribute is set.
        super().__init__()

        self.tokenizer = tf_text.BertTokenizer(vocab_path, lower_case=True)
        self._reserved_tokens = reserved_tokens
        self._vocab_path = tf.saved_model.Asset(vocab_path)

        # Decode explicitly as UTF-8 rather than with the platform default,
        # so non-ASCII vocabulary entries are read the same way everywhere.
        vocab = pathlib.Path(vocab_path).read_text(encoding='utf-8').splitlines()
        self.vocab = tf.Variable(vocab)

        ## Create the signatures for export:

        # Include a tokenize signature for a batch of strings.
        self.tokenize.get_concrete_function(
            tf.TensorSpec(shape=[None], dtype=tf.string))

        # Include `detokenize` and `lookup` signatures for:
        #   * `Tensors` with shape [batch, tokens]
        #   * `RaggedTensors` with shape [batch, tokens]
        self.detokenize.get_concrete_function(
            tf.TensorSpec(shape=[None, None], dtype=tf.int64))
        self.detokenize.get_concrete_function(
            tf.RaggedTensorSpec(shape=[None, None], dtype=tf.int64))

        self.lookup.get_concrete_function(
            tf.TensorSpec(shape=[None, None], dtype=tf.int64))
        self.lookup.get_concrete_function(
            tf.RaggedTensorSpec(shape=[None, None], dtype=tf.int64))

        # These `get_*` methods take no arguments.
        self.get_vocab_size.get_concrete_function()
        self.get_vocab_path.get_concrete_function()
        self.get_reserved_tokens.get_concrete_function()

    @tf.function
    def tokenize(self, strings):
        """Tokenize a batch of strings and wrap each row with START/END."""
        enc = self.tokenizer.tokenize(strings)
        # Merge the `word` and `word-piece` axes.
        enc = enc.merge_dims(-2, -1)
        enc = add_start_end(enc)
        return enc

    @tf.function
    def detokenize(self, tokenized):
        """Convert token ids back to text with reserved tokens removed."""
        words = self.tokenizer.detokenize(tokenized)
        return cleanup_text(self._reserved_tokens, words)

    @tf.function
    def lookup(self, token_ids):
        """Return the vocabulary entries at the given token ids."""
        return tf.gather(self.vocab, token_ids)

    @tf.function
    def get_vocab_size(self):
        """Return the number of entries in the vocabulary."""
        return tf.shape(self.vocab)[0]

    @tf.function
    def get_vocab_path(self):
        """Return the vocabulary file asset."""
        return self._vocab_path

    @tf.function
    def get_reserved_tokens(self):
        """Return the reserved tokens as a string tensor."""
        return tf.constant(self._reserved_tokens)

In [16]:
# Build one tokenizer per language from the vocabulary files written earlier
# in this notebook. Those files are written with relative names, so they are
# read back through the same relative names; a hard-coded absolute directory
# would only match when the working directory happens to be that directory.
tokenizers = tf.Module()
tokenizers.kor = CustomTokenizer(reserved_tokens, 'kor_vocab.txt')
tokenizers.en = CustomTokenizer(reserved_tokens, 'en_vocab.txt')

In [17]:
# Export both tokenizers together as a single SavedModel directory.
model_name = 'kor_en_converter'
tf.saved_model.save(obj=tokenizers, export_dir=model_name)