In [1]:
import string
import os
import numpy as np
import tensorflow as tf

from tensorflow import keras
from datetime import datetime, timedelta
from typing import Callable

In [2]:
# Encoder vocabulary: the ten digits, the lowercase ASCII letters and the '-' separator.
POSSIBLE_INPUT_CHARS = string.digits + string.ascii_lowercase + '-'
# Decoder vocabulary: the ten digits and the '-' separator.
POSSIBLE_OUTPUT_CHARS = string.digits + '-'
# Naive UTC timestamp taken once when this cell runs; the generated dates count back from it.
CURRENT_DAY = datetime.utcnow()
# Number of (input, target) pairs to generate.
ROW_COUNT = 14000


def string_to_ids(s: str, chars: str) -> list[int]:
    """Encode a string as the positions of its characters inside ``chars``.

    The string is lowercased before lookup.

    Args:
        s: Text to encode.
        chars: Vocabulary; the id of a character is its index in this string.

    Returns:
        One integer id per character of ``s``, in order.

    Raises:
        ValueError: If a (lowercased) character of ``s`` is not in ``chars``.
    """
    return [chars.index(symbol) for symbol in s.lower()]


def shuffle(vals: tf.RaggedTensor, targets: tf.RaggedTensor) -> (tf.RaggedTensor, tf.RaggedTensor):
    """Reorder the rows of ``vals`` and ``targets`` with one shared random permutation.

    Args:
        vals: Input rows; its first dimension sets the number of rows shuffled.
        targets: Target rows, gathered with the same indices as ``vals``.

    Returns:
        ``(shuffled_vals, shuffled_targets)``; row ``i`` of both results comes
        from the same original row.
    """
    row_count = vals.shape[0]
    permutation = tf.random.shuffle(tf.range(row_count))
    # gather_nd takes one index vector per output row, hence the (rows, 1) shape.
    row_indices = tf.reshape(permutation, (row_count, 1))

    return tf.gather_nd(vals, row_indices), tf.gather_nd(targets, row_indices)


def pad_year(year: int) -> str:
    """Return ``year`` as text, left-filled with '0' up to four characters.

    Values whose text is already four or more characters long are returned
    unchanged.
    """
    return str(year).rjust(4, '0')


def get_date_pairs() -> tuple[tf.RaggedTensor, tf.RaggedTensor]:
    """Generate shuffled (input, target) id sequences for date translation.

    For each of ``ROW_COUNT`` rows, the month and day come from
    ``CURRENT_DAY`` minus ``counter`` days, and the year is drawn without
    replacement from the zero-padded numbers ``0 .. ROW_COUNT - 1``.
    The input text is ``<year>-<month name>-<day>`` encoded with
    ``POSSIBLE_INPUT_CHARS``; the target text is ``<year>-<mm>-<dd>`` encoded
    with ``POSSIBLE_OUTPUT_CHARS``.

    Returns:
        ``(inputs, targets)`` as ragged tensors, one row per pair, with both
        passed through ``shuffle`` so their rows stay aligned.
    """
    years_padded = np.array([pad_year(year) for year in range(ROW_COUNT)])

    # Randomise which synthetic year is attached to which calendar day.
    np.random.shuffle(years_padded)

    xs = []
    ys = []

    for counter, year in enumerate(years_padded):
        date = CURRENT_DAY - timedelta(days=counter)
        # Format each field on its own instead of slicing a '%Y-%m-%d' string,
        # which silently relied on '%Y' being exactly four characters wide.
        month_name = date.strftime('%B')
        month_number = date.strftime('%m')
        day = date.strftime('%d')
        source_text = f'{year}-{month_name}-{day}'
        target_text = f'{year}-{month_number}-{day}'

        xs.append(tf.constant(string_to_ids(source_text, POSSIBLE_INPUT_CHARS)))
        ys.append(tf.constant(string_to_ids(target_text, POSSIBLE_OUTPUT_CHARS)))

    # Rows have different lengths (month names, 4- vs 5-character years),
    # so they are stacked as ragged tensors.
    ragged_xs = tf.ragged.stack(xs, axis=0)
    ragged_ys = tf.ragged.stack(ys, axis=0)

    return shuffle(ragged_xs, ragged_ys)


# Densify the ragged dataset. Encoder ids are shifted up by one before
# densifying; the targets are densified without any shift.
X_ragged, y_ragged = get_date_pairs()
X = (X_ragged + 1).to_tensor()
y = y_ragged.to_tensor()

# Row boundaries for a 70 % / 20 % / 10 % train / validation / test split.
total_rows = X.shape[0]
seventy_percent_count = int(total_rows * .7)
ninety_percent_count = int(total_rows * .9)
train_rows = slice(None, seventy_percent_count)
valid_rows = slice(seventy_percent_count, ninety_percent_count)
test_rows = slice(ninety_percent_count, None)

X_train, y_train = X[train_rows], y[train_rows]
X_valid, y_valid = X[valid_rows], y[valid_rows]
X_test, y_test = X[test_rows], y[test_rows]

# Width of the dense target tensor, i.e. the number of decoding steps.
max_output_length = y.shape[1]

In [3]:
# Start-of-sequence id prepended to decoder inputs. It lies outside the
# 0 .. len(POSSIBLE_OUTPUT_CHARS) - 1 range that string_to_ids yields for targets.
sos_id = len(POSSIBLE_OUTPUT_CHARS) + 1


def shifted_output_sequences(y: tf.Tensor) -> tf.Tensor:
    """Build teacher-forcing decoder inputs from the target sequences.

    Each row of ``y`` is shifted one step to the right: ``sos_id`` goes in the
    first column and the last column of ``y`` is dropped, so the result has
    the same shape as ``y``.
    """
    start_column = tf.fill(dims=(len(y), 1), value=sos_id)
    all_but_last = y[:, :-1]

    return tf.concat([start_column, all_but_last], axis=1)


# Decoder (teacher-forcing) inputs for the train, validation and test splits.
X_train_decoder, X_valid_decoder, X_test_decoder = map(
    shifted_output_sequences, (y_train, y_valid, y_test))

## Training part - LSTM variant

In [4]:
def scheduler(drop_after: int) -> Callable[[int, float], float]:
    """Create a learning-rate schedule that decays after a warm period.

    Args:
        drop_after: Number of initial epochs (0-based) during which the
            learning rate is left untouched.

    Returns:
        A function ``drop(epoch, learning_rate)``. It returns the rate
        unchanged while ``epoch < drop_after`` and multiplies it by
        ``exp(-0.2)`` on every later call.
    """
    # Computed once, as a plain Python float: the schedule must hand Keras a
    # float, not a tf.Tensor.
    decay_factor = float(np.exp(-0.2))

    def drop(epoch: int, learning_rate: float) -> float:
        """Return the learning rate to use for ``epoch``."""
        current_rate = float(learning_rate)

        if epoch < drop_after:
            return current_rate

        return current_rate * decay_factor

    return drop


def get_callbacks() -> tuple[
        keras.callbacks.EarlyStopping,
        keras.callbacks.ModelCheckpoint,
        keras.callbacks.TensorBoard,
        keras.callbacks.LearningRateScheduler]:
    """Build the four training callbacks shared by the models in this notebook.

    The run number is one more than the count of existing sub-directories of
    ``./tensor_logs`` whose name starts with the experiment name. It is used
    as a suffix for both the TensorBoard log directory and the checkpoint
    file in ``./saved_models``.

    Returns:
        ``(early_stopping, model_checkpoint, tensorboard, lr_scheduler)``.
    """
    the_name = 'encoder_decoder_w_teacher_forcing'
    patience = 5
    model_dir = os.path.join(os.curdir, 'saved_models')
    run_logdir_root = os.path.join(os.curdir, 'tensor_logs')

    # Create both directories up front so a fresh checkout does not fail in
    # os.listdir (or later when the checkpoint is written).
    os.makedirs(model_dir, exist_ok=True)
    os.makedirs(run_logdir_root, exist_ok=True)

    # Count only earlier runs of this experiment. The original compared each
    # name with itself (`name.startswith(name)`), which matched every directory.
    previous_runs = [
        name
        for name in os.listdir(run_logdir_root)
        if os.path.isdir(os.path.join(run_logdir_root, name)) and name.startswith(the_name)
    ]
    run_number = len(previous_runs) + 1
    run_logdir = os.path.join(run_logdir_root, f'{the_name}_{run_number}')

    early_stopping = keras.callbacks.EarlyStopping(monitor='val_loss', patience=patience, min_delta=1e-4)
    model_checkpoint = keras.callbacks.ModelCheckpoint(
        os.path.join(model_dir, f'{the_name}_{run_number}.h5'), save_best_only=True)
    tensorboard = keras.callbacks.TensorBoard(run_logdir, histogram_freq=1, profile_batch=10)
    # Keep the learning rate fixed for the first 10 epochs, then decay it.
    lr_scheduler = keras.callbacks.LearningRateScheduler(scheduler(10))

    return early_stopping, model_checkpoint, tensorboard, lr_scheduler

In [5]:
# Hyper-parameters; the GRU cell further down reuses both names.
embedding_size = 32
latent_dim = 128

# --- Encoder: embed the input ids and keep only the final LSTM states. ---
encoder_input_lstm = keras.Input(shape=(None,))
# +1 because the encoder ids were shifted up by one (X + 1) before densifying.
encoder_embedding_lstm = keras.layers.Embedding(input_dim=len(POSSIBLE_INPUT_CHARS) + 1,
                                                output_dim=embedding_size)(encoder_input_lstm)
encoder_lstm = keras.layers.LSTM(latent_dim, return_state=True)
# The per-step output is discarded; the hidden and cell states summarise the input.
_, encoder_state_h_lstm, encoder_state_c_lstm = encoder_lstm(encoder_embedding_lstm)
encoder_states_lstm = [encoder_state_h_lstm, encoder_state_c_lstm]

# --- Decoder: teacher-forced LSTM initialised with the encoder states. ---
decoder_input_lstm = keras.Input(shape=(None,))
# +2 so the table also covers sos_id (= len(POSSIBLE_OUTPUT_CHARS) + 1).
decoder_embedding_lstm = keras.layers.Embedding(input_dim=len(POSSIBLE_OUTPUT_CHARS) + 2,
                                                output_dim=embedding_size)(decoder_input_lstm)
decoder_lstm = keras.layers.LSTM(latent_dim, return_sequences=True)
decoder_lstm_output = decoder_lstm(decoder_embedding_lstm, initial_state=encoder_states_lstm)
# One softmax distribution per output time step.
decoder_dense_lstm = keras.layers.Dense(len(POSSIBLE_OUTPUT_CHARS) + 1, activation='softmax')
decoder_outputs_lstm = decoder_dense_lstm(decoder_lstm_output)
model_lstm = keras.Model(inputs=[encoder_input_lstm, decoder_input_lstm], outputs=[decoder_outputs_lstm])
adam_opt = keras.optimizers.Adam(learning_rate=.01)

# Targets are integer ids, hence the sparse variant of the cross-entropy loss.
model_lstm.compile(optimizer=adam_opt, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train with early stopping, checkpointing, TensorBoard logging and LR decay.
early_stopping, model_checkpoint, tensorboard, lr_scheduler = get_callbacks()
history_lstm = model_lstm.fit(
    [X_train, X_train_decoder],
    y_train,
    epochs=40,
    validation_data=([X_valid, X_valid_decoder], y_valid),
    callbacks=[early_stopping, model_checkpoint, tensorboard, lr_scheduler])

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40


## Training part - GRU variant

In [30]:
# GRU counterpart of the LSTM model. `embedding_size` and `latent_dim` are
# defined in the LSTM cell, so that cell has to run first.

# --- Encoder: embed the input ids and keep only the final GRU state. ---
encoder_input_gru = keras.Input(shape=(None,))
# +1 because the encoder ids were shifted up by one (X + 1) before densifying.
encoder_embedding_gru = keras.layers.Embedding(input_dim=len(POSSIBLE_INPUT_CHARS) + 1,
                                               output_dim=embedding_size)(encoder_input_gru)
encoder_gru = keras.layers.GRU(latent_dim, return_state=True)
# A GRU exposes a single state, unlike the LSTM's hidden/cell pair.
_, encoder_state_h_gru = encoder_gru(encoder_embedding_gru)

# --- Decoder: teacher-forced GRU initialised with the encoder state. ---
decoder_input_gru = keras.Input(shape=(None,))
# +2 so the table also covers sos_id (= len(POSSIBLE_OUTPUT_CHARS) + 1).
decoder_embedding_gru = keras.layers.Embedding(input_dim=len(POSSIBLE_OUTPUT_CHARS) + 2,
                                               output_dim=embedding_size)(decoder_input_gru)
decoder_gru = keras.layers.GRU(latent_dim, return_sequences=True)
decoder_gru_output = decoder_gru(decoder_embedding_gru, initial_state=encoder_state_h_gru)
# One softmax distribution per output time step.
decoder_dense_gru = keras.layers.Dense(len(POSSIBLE_OUTPUT_CHARS) + 1, activation='softmax')
decoder_outputs_gru = decoder_dense_gru(decoder_gru_output)
model_gru = keras.Model(inputs=[encoder_input_gru, decoder_input_gru], outputs=[decoder_outputs_gru])
# Rebinds `adam_opt` to a fresh optimizer instance for this model.
adam_opt = keras.optimizers.Adam(learning_rate=.01)

# Targets are integer ids, hence the sparse variant of the cross-entropy loss.
model_gru.compile(optimizer=adam_opt, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Fresh callbacks; this rebinds the callback names from the LSTM cell.
early_stopping, model_checkpoint, tensorboard, lr_scheduler = get_callbacks()
history = model_gru.fit(
    [X_train, X_train_decoder],
    y_train,
    epochs=40,
    validation_data=([X_valid, X_valid_decoder], y_valid),
    callbacks=[early_stopping, model_checkpoint, tensorboard, lr_scheduler])

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40


## Inference part with LSTM

In [39]:
def prepare_date_strs(date_strs: list[str], chars=POSSIBLE_INPUT_CHARS) -> tf.Tensor:
    """Encode date strings as a dense id tensor, the same way ``X`` was built.

    Args:
        date_strs: Texts to encode.
        chars: Vocabulary passed to ``string_to_ids``.

    Returns:
        A dense tensor with one row per string; every id is shifted up by one
        before the ragged rows are converted with ``to_tensor()``.
    """
    encoded = tf.ragged.constant(
        [string_to_ids(date_str, chars) for date_str in date_strs],
        ragged_rank=1)
    shifted = encoded + 1

    return shifted.to_tensor()


def prepare_date_strs_padded(date_strs: list[str]) -> tf.Tensor:
    """Encode date strings and right-pad them to the width of ``X_train``.

    Batches that are already at least as wide as ``X_train`` are returned
    as encoded, without truncation.
    """
    ids = prepare_date_strs(date_strs)
    missing_columns = X_train.shape[1] - ids.shape[1]

    if missing_columns <= 0:
        return ids

    # Pad only on the right-hand side of the sequence axis.
    return tf.pad(ids, [[0, 0], [0, missing_columns]])


def ids_to_date_strs(ids: list[int], chars: str = POSSIBLE_OUTPUT_CHARS) -> list[str]:
    """Decode sequences of ids into strings.

    Each element of ``ids`` is iterated as one sequence of indices into
    ``"?" + chars``, so index 0 decodes to '?' and index ``k`` to
    ``chars[k - 1]``.

    Returns:
        One decoded string per sequence.
    """
    lookup = "?" + chars
    decoded = []

    for sequence in ids:
        decoded.append("".join(lookup[index] for index in sequence))

    return decoded


def predict_date_strs(date_strs: list[str], model=None) -> list[str]:
    """Translate date strings by greedy, step-by-step decoding.

    Args:
        date_strs: Inputs such as ``'1999-May-01'``.
        model: Encoder-decoder model taking ``[encoder_ids, decoder_ids]``.
            Defaults to ``model_lstm``; pass ``model_gru`` to decode with the
            GRU variant.

    Returns:
        One decoded string per input.

    NOTE(review): the final slice drops the start token and the last
    predicted position, so an output that needs all ``max_output_length``
    characters loses its final character.
    """
    if model is None:
        model = model_lstm

    X_new = prepare_date_strs_padded(date_strs)
    # Every row starts from the start-of-sequence token.
    y_pred = tf.fill(dims=(len(X_new), 1), value=sos_id)

    for index in range(max_output_length):
        # The decoder input always has max_output_length columns; positions
        # not predicted yet are filled with zeros.
        pad_size = max_output_length - y_pred.shape[1]
        X_decoder = tf.pad(y_pred, [[0, 0], [0, pad_size]])
        # Keep only the distribution for the position being decoded now.
        y_probas_next = model.predict([X_new, X_decoder])[:, index:index+1]
        y_pred_next = tf.argmax(y_probas_next, axis=-1, output_type=tf.int32)
        y_pred = tf.concat([y_pred, y_pred_next], axis=1)

    # +1 compensates for the '?' that ids_to_date_strs prepends to its lookup.
    return ids_to_date_strs(y_pred[:, 1:-1] + 1)


# Smoke test: the last two inputs carry misspelled month names ('Demember', 'Sepxxmber').
predict_date_strs(['1999-May-01', '8123-June-30', '1213-July-16', '6990-Demember-12', '5432-Sepxxmber-09'])



['1999-05-01', '8123-06-30', '1213-07-16', '6990-12-12', '5432-09-09']

The model may not be able to handle years written with 1–3 digits or with 5 or more digits, but it can still translate to the correct date even when the month name is misspelled (sometimes :) )!