In [19]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

In [20]:
default_reber_grammar = [
    [("B", 1)],
    [("T", 2), ("P", 3)],
    [("S", 2), ("X", 4)],
    [("T", 3), ("V", 5)],
    [("X", 3), ("S", 6)],
    [("P", 4), ("V", 6)],
    [("E", None)]
]

embedded_reber_grammar = [
    [("B", 1)],
    [("T", 2), ("P", 3)],
    [(default_reber_grammar, 4)],
    [(default_reber_grammar, 5)],
    [("T", 6)],
    [("P", 6)],
    [("E", None)]
]

def generate_string(grammar):
    state = 0
    output = []
    while state is not None:
        index = np.random.randint(len(grammar[state]))
        production, state = grammar[state][index]
        if isinstance(production, list):
            production = generate_string(grammar=production)
        output.append(production)
    return "".join(output)

In [21]:
for _ in range(5):
    print(generate_string(default_reber_grammar), end=" ")

BTXXTVVE BTXSE BPVVE BPVPSE BPTTVVE 

In [22]:
for _ in range(5):
    print(generate_string(embedded_reber_grammar), end=" ")

BPBTSXXVVEPE BPBPTTTVPSEPE BPBPVPXTTTVPSEPE BPBTSXSEPE BTBPVVETE 

In [23]:
POSSIBLE_CHARS = "BEPSTVX"

def generate_corrupted_string(grammar, chars=POSSIBLE_CHARS):
    good_string = generate_string(grammar)
    index = np.random.randint(len(good_string))
    good_char = good_string[index]
    bad_char = np.random.choice(sorted(set(chars) - set(good_char)))
    return good_string[:index] + bad_char + good_string[index + 1:]

In [24]:
for _ in range(5):
    print(generate_corrupted_string(embedded_reber_grammar), end=" ")

BTBPXXVVETE BTBTTXTVVETE BTBXSXSETE BPBPTTTPTVVEPE VPBPTVVEPE 

In [25]:
def string_to_ids(s, chars=POSSIBLE_CHARS):
    return [chars.index(c) for c in s]

In [26]:
string_to_ids("BTTTXXVVETE")

[0, 4, 4, 4, 6, 6, 5, 5, 1, 4, 1]

In [27]:
def generate_dataset(size):
    good_strings = [string_to_ids(generate_string(embedded_reber_grammar)) for _ in range(size // 2)]
    bad_strings = [string_to_ids(generate_corrupted_string(embedded_reber_grammar)) for _ in range(size - size // 2)]
    all_strings = good_strings + bad_strings
    X = tf.ragged.constant(all_strings, ragged_rank=1)
    y = np.array([[1.] for _ in range(len(good_strings))] + [[0.] for _ in range(len(bad_strings))])
    return X, y

In [28]:
X_train, y_train = generate_dataset(10000)
X_valid, y_valid = generate_dataset(2000)

In [29]:
X_train[0]

<tf.Tensor: shape=(9,), dtype=int32, numpy=array([0, 4, 0, 4, 6, 3, 1, 4, 1])>

In [30]:
y_train[0]

array([1.])

In [31]:
embedding_size = 5

model = keras.models.Sequential([
    keras.layers.InputLayer(input_shape=[None], dtype=tf.int32, ragged=True),
    keras.layers.Embedding(input_dim=len(POSSIBLE_CHARS), output_dim=embedding_size),
    keras.layers.GRU(30),
    keras.layers.Dense(1, activation="sigmoid")
])
optimizer = keras.optimizers.SGD(learning_rate=0.02, momentum = 0.95, nesterov=True)
model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])

model.fit(X_train, y_train, epochs=20, validation_data=(X_valid, y_valid))

Epoch 1/20


  "shape. This may consume a large amount of memory." % value)


Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x2af1d991288>

In [42]:
test_strings = ["BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE", "BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE", "BB"]
X_test = tf.ragged.constant([string_to_ids(s) for s in test_strings], ragged_rank=1)

y_proba = model.predict(X_test)
print()
print("레버 문자열일 추정 확률:")
for index, string in enumerate(test_strings):
    print("{}: {:.2f}%".format(string, 100 * y_proba[index][0]))



레버 문자열일 추정 확률:
BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE: 4.67%
BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE: 99.87%
BB: 0.44%


In [43]:
from datetime import date

# strftime()의 %B 포맷은 로케일에 의존하기 때문에 사용할 수 있습니다.
MONTHS = ["January", "February", "March", "April", "May", "June",
          "July", "August", "September", "October", "November", "December"]

def random_dates(n_dates):
    min_date = date(1000, 1, 1).toordinal()
    max_date = date(9999, 12, 31).toordinal()

    ordinals = np.random.randint(max_date - min_date, size=n_dates) + min_date
    dates = [date.fromordinal(ordinal) for ordinal in ordinals]

    x = [MONTHS[dt.month - 1] + " " + dt.strftime("%d, %Y") for dt in dates]
    y = [dt.isoformat() for dt in dates]
    return x, y

In [44]:
n_dates = 3
x_example, y_example = random_dates(n_dates)
print("{:25s}{:25s}".format("Input", "Target"))
print("-" * 50)
for idx in range(n_dates):
    print("{:25s}{:25s}".format(x_example[idx], y_example[idx]))

Input                    Target                   
--------------------------------------------------
July 21, 7829            7829-07-21               
November 09, 7880        7880-11-09               
May 29, 2675             2675-05-29               


In [46]:
INPUT_CHARS = "".join(sorted(set("".join(MONTHS) + "0123456789, ")))
OUTPUT_CHARS = "0123456789-"

In [47]:
def date_str_to_ids(date_str, chars=INPUT_CHARS):
    return [chars.index(c) for c in date_str]

In [48]:
def prepare_date_strs(date_strs, chars=INPUT_CHARS):
    X_ids = [date_str_to_ids(dt, chars) for dt in date_strs]
    X = tf.ragged.constant(X_ids, ragged_rank=1)
    return (X + 1).to_tensor()

def create_dataset(n_dates):
    x, y = random_dates(n_dates)
    return prepare_date_strs(x, INPUT_CHARS), prepare_date_strs(y, OUTPUT_CHARS)

In [51]:
X_train, y_train = create_dataset(10000)
X_valid, y_valid = create_dataset(2000)
X_test, y_test = create_dataset(2000)

In [52]:
X_train[0], y_train[0]

(<tf.Tensor: shape=(18,), dtype=int32, numpy=
 array([13, 36, 25, 36, 34, 35,  1,  5, 10,  2,  1,  7,  5,  8,  4,  0,  0,
         0])>,
 <tf.Tensor: shape=(10,), dtype=int32, numpy=array([ 5,  3,  6,  2, 11,  1,  9, 11,  3,  8])>)

In [53]:
embedding_size = 32
max_output_length = y_train.shape[1]

encoder = keras.models.Sequential([
    keras.layers.Embedding(input_dim=len(INPUT_CHARS) + 1, output_dim=embedding_size, input_shape=[None]),
    keras.layers.LSTM(128)
])

decoder = keras.models.Sequential([
    keras.layers.LSTM(128, return_sequences=True),
    keras.layers.Dense(len(OUTPUT_CHARS) + 1, activation="softmax")
])

model = keras.models.Sequential([
    encoder,
    keras.layers.RepeatVector(max_output_length),
    decoder
])

model.compile(loss="sparse_categorical_crossentropy", optimizer="Nadam", metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [55]:
def ids_to_date_strs(ids, chars=OUTPUT_CHARS):
    return ["".join([("?" + chars)[index] for index in sequence]) for sequence in ids]

In [60]:
X_new = prepare_date_strs(["September 27, 9999", "July 14, 1789", "May 01, 1111", "May 02, 2020", "July 14, 1789"])
ids = np.argmax(model.predict(X_new), axis=-1)
for date_str in ids_to_date_strs(ids):
    print(date_str)

9999-09-27
1789-07-14
1111-05-01
2000-05-02
1789-07-14


In [61]:
max_input_length = X_train.shape[1]

def prepare_date_strs_padded(date_strs):
    X = prepare_date_strs(date_strs)
    if X.shape[1] < max_input_length:
        X = tf.pad(X, [[0, 0], [0, max_input_length - X.shape[1]]])
    return X

def convert_date_strs(date_strs):
    X = prepare_date_strs_padded(date_strs)
    ids = np.argmax(model.predict(X), axis=-1)
    return ids_to_date_strs(ids)

In [62]:
convert_date_strs(["September 27, 9999", "July 14, 1789", "May 01, 1111", "May 02, 2020", "July 14, 1789"])

['9999-09-27', '1789-07-14', '1111-05-01', '2000-05-02', '1789-07-14']

In [63]:
sos_id = len(OUTPUT_CHARS) + 1

def shifted_output_sequences(Y):
    sos_tokens = tf.fill(dims=(len(Y), 1), value=sos_id)
    return tf.concat([sos_tokens, Y[:, :-1]], axis=1)

X_train_decoder = shifted_output_sequences(y_train)
X_valid_decoder = shifted_output_sequences(y_valid)
X_test_decoder = shifted_output_sequences(y_test)

In [69]:
encoder_embedding_size = 32
decoder_embedding_size = 32

encoder_input = keras.layers.Input(shape=[None], dtype=tf.int32)
encoder_embedding = keras.layers.Embedding(input_dim=len(INPUT_CHARS) + 1, output_dim=encoder_embedding_size)(encoder_input)
_, encoder_state_h, encoder_state_c = keras.layers.LSTM(128, return_state=True)(encoder_embedding)
encoder_state = [encoder_state_h, encoder_state_c]

decoder_input = keras.layers.Input(shape=[None], dtype=tf.int32)
decoder_embedding = keras.layers.Embedding(input_dim=len(OUTPUT_CHARS) + 2, output_dim=decoder_embedding_size)(decoder_input)
decoder_lstm_output = keras.layers.LSTM(128, return_sequences=True)(decoder_embedding, initial_state=encoder_state)
decoder_output = keras.layers.Dense(len(OUTPUT_CHARS) + 1, activation="softmax")(decoder_lstm_output)

model = keras.models.Model(inputs=[encoder_input, decoder_input], outputs=[decoder_output])

model.compile(loss="sparse_categorical_crossentropy", optimizer="Nadam", metrics=["accuracy"])

model.fit([X_train, X_train_decoder], y_train, epochs=10, validation_data=([X_valid, X_valid_decoder], y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x2b1224ee848>

In [70]:
sos_id = len(OUTPUT_CHARS) + 1

def predict_date_strs(date_strs):
    X = prepare_date_strs_padded(date_strs)
    Y_pred = tf.fill(dims=(len(X), 1), value=sos_id)
    for index in range(max_output_length):
        pad_size = max_output_length - Y_pred.shape[1]
        X_decoder = tf.pad(Y_pred, [[0, 0], [0, pad_size]])
        Y_probas_next = model.predict([X, X_decoder])[:, index:index+1]
        Y_pred_next = tf.argmax(Y_probas_next, axis=-1, output_type=tf.int32)
        Y_pred = tf.concat([Y_pred, Y_pred_next], axis=1)
    return ids_to_date_strs(Y_pred[:, 1:])

In [71]:
predict_date_strs(["July 14, 1789", "May 01, 2020"])

['1789-07-14', '2020-05-01']

In [72]:
import tensorflow_addons as tfa

encoder_embedding_size = 32
decoder_embedding_size = 32
units = 128

encoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
decoder_inputs = keras.layers.Input(shape=[None], dtype=np.int32)
sequence_lengths = keras.layers.Input(shape=[], dtype=np.int32)

encoder_embeddings = keras.layers.Embedding(len(INPUT_CHARS) + 1, encoder_embedding_size)(encoder_inputs)

decoder_embedding_layer = keras.layers.Embedding(len(OUTPUT_CHARS) + 2, decoder_embedding_size)
decoder_embeddings = decoder_embedding_layer(decoder_inputs)

encoder = keras.layers.LSTM(units, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_embeddings)
encoder_state = [state_h, state_c]

sampler = tfa.seq2seq.sampler.TrainingSampler()

decoder_cell = keras.layers.LSTMCell(units)
output_layer = keras.layers.Dense(len(OUTPUT_CHARS) + 1)

decoder = tfa.seq2seq.basic_decoder.BasicDecoder(decoder_cell, sampler, output_layer=output_layer)
final_outputs, final_state, final_sequence_lengths = decoder(decoder_embeddings, initial_state=encoder_state)
Y_proba = keras.layers.Activation("softmax")(final_outputs.rnn_output)

model = keras.models.Model(inputs=[encoder_inputs, decoder_inputs], outputs=[Y_proba])

model.compile(loss="sparse_categorical_crossentropy", optimizer="Nadam",metrics=["accuracy"])

model.fit([X_train, X_train_decoder], y_train, epochs=10, validation_data=([X_valid, X_valid_decoder], y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x2b11743ba48>

In [73]:
predict_date_strs(["July 14, 1789", "May 01, 2020"])

['1789-07-14', '2020-05-01']

In [74]:
inference_sampler = tfa.seq2seq.sampler.GreedyEmbeddingSampler(embedding_fn=decoder_embedding_layer)
inference_decoder = tfa.seq2seq.basic_decoder.BasicDecoder(decoder_cell, inference_sampler, output_layer=output_layer, maximum_iterations=max_output_length)
batch_size = tf.shape(encoder_inputs)[:1]
start_tokens = tf.fill(dims=batch_size, value=sos_id)
final_outputs, final_state, final_sequence_lengths = inference_decoder(start_tokens, initial_state=encoder_state, start_tokens=start_tokens, end_token=0)

inference_model = keras.models.Model(inputs=[encoder_inputs], outputs=[final_outputs.sample_id])