## RNN for decoding Enigma codes
The [Enigma machine](https://brilliant.org/wiki/enigma-machine/) was designed by Germans during WWII so they could communicate via encrypted messages. Alan Turing and others were able to design a decoder to crack the encryption and intercept messages. Here we'll try to create a seq2seq RNN with attention to decode Enigma messages. The motivation for this architecture was borrowed from machine translation, and the code is adapted from [this TensorFlow example](https://www.tensorflow.org/tutorials/text/nmt_with_attention).

In [1]:
import os
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pickle
import tensorflow as tf
from etl import EnigmaDataProcessor
from evaluate import score, predict

In [2]:
# Number of examples to generate. The second assignment overrides the first,
# so the effective value is 100.
# NOTE(review): 100 is smaller than _BATCH_SIZE. The dataset is later batched
# with drop_remainder=True and steps_per_epoch is _NUM_EXAMPLES // _BATCH_SIZE,
# so this setting appears to leave no full batch to train on -- confirm which
# value is intended.
_NUM_EXAMPLES = 1000000
_NUM_EXAMPLES = 100
_BATCH_SIZE = 128  # rows per training batch
_EPOCHS = 4  # iterations of the outer training loop
_MAX_LEN = 42  # passed to EnigmaDataProcessor as max_len; also caps the decoding steps in evaluate()

Here's an example of encoded and decoded texts, respectively. Each string is wrapped in a lowercase `s` (start) marker and a lowercase `e` (end) marker.

In [3]:
# Shared data helper: used below to generate sample text, build the training
# dataset, and expose the character tokenizer.
data_proc = EnigmaDataProcessor(max_len=_MAX_LEN)

In [4]:
# Display 5 sample pairs; the cell output is a tuple of two lists of strings.
data_proc.generate_text(5)

(['sPIMAEZLHSYEMDUPOYOJMFAJEMGGLe',
  'sNTBZCAUVWNTFNORLJKSRBISNWUe',
  'sVNQHVVYECNEHBQXZJKFISCPQVXXNVQGRVLJHe',
  'sLMCQTASHEOBBAOWAZGDHe',
  'sLMCOXZNPVGQBBMKDUHJIQOLMQBDWYe'],
 ['sOFFICEROOMTHATBANKHUSBANDPERe',
  'sABOVETHEMSELVESFROMSUCCESSe',
  'sCHILDINVESTMENTPROVEFIRSTENVIRONMENTe',
  'sYOUNOTDOCTORDEMOCRATe',
  'sYOUTHEYAIRPREPARETHEREDOOCCURe'])

## RNN with Attention

In [5]:
class Encoder(tf.keras.Model):
    """Embed a batch of token ids and run them through a single GRU.

    Args:
        vocab_size: Input dimension of the embedding table.
        embedding_dim: Width of each embedding vector.
        enc_units: Number of GRU units.
        batch_sz: Number of rows in the zero state built by
            ``initialize_hidden_state``.
    """

    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super().__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # The GRU returns both the per-step outputs and its final state.
        gru_options = dict(
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.gru = tf.keras.layers.GRU(self.enc_units, **gru_options)

    def call(self, x, hidden):
        """Return ``(sequence_output, final_state)`` for token ids ``x``.

        ``hidden`` is handed to the GRU as its initial state.
        """
        embedded = self.embedding(x)
        sequence_output, final_state = self.gru(embedded, initial_state=hidden)
        return sequence_output, final_state

    def initialize_hidden_state(self):
        """Return an all-zero tensor of shape ``(batch_sz, enc_units)``."""
        state_shape = (self.batch_sz, self.enc_units)
        return tf.zeros(state_shape)

In [6]:
class BahdanauAttention(tf.keras.layers.Layer):
    """Additive attention: score = V(tanh(W1(values) + W2(query))).

    Args:
        units: Output width of the two projection layers ``W1`` and ``W2``.
    """

    def __init__(self, units):
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query, values):
        """Return ``(context_vector, attention_weights)``.

        Args:
            query: Hidden state, shape (batch_size, hidden_size).
            values: Encoder outputs, shape (batch_size, max_length, hidden_size).
        """
        # Add a time axis, (batch_size, 1, hidden_size), so the projected
        # query broadcasts against the projected values when they are summed.
        query_with_time_axis = tf.expand_dims(query, 1)

        # Both projections have `units` features; V collapses them to a
        # single score per time step: (batch_size, max_length, 1).
        projected_values = self.W1(values)
        projected_query = self.W2(query_with_time_axis)
        score = self.V(tf.nn.tanh(projected_values + projected_query))

        # Normalise over the time axis: (batch_size, max_length, 1).
        attention_weights = tf.nn.softmax(score, axis=1)

        # Weighted sum of the values over time: (batch_size, hidden_size).
        context_vector = tf.reduce_sum(attention_weights * values, axis=1)

        return context_vector, attention_weights

In [7]:
class Decoder(tf.keras.Model):
    """One-step decoder: attention, embedding, GRU, then a vocabulary projection.

    Args:
        vocab_size: Embedding input dimension and width of the output logits.
        embedding_dim: Width of each embedding vector.
        dec_units: Number of GRU units; also the attention projection width.
        batch_sz: Stored on the instance as ``batch_sz``.
    """

    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super().__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        gru_options = dict(
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.gru = tf.keras.layers.GRU(self.dec_units, **gru_options)
        # No activation: the layer emits raw scores over the vocabulary.
        self.fc = tf.keras.layers.Dense(vocab_size)

        # Attention over the encoder outputs.
        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, hidden, enc_output):
        """Run one decoding step.

        Args:
            x: Current input token ids, one time step per row.
            hidden: State used as the attention query.
            enc_output: Encoder outputs,
                shape (batch_size, max_length, hidden_size).

        Returns:
            ``(logits, state, attention_weights)`` where ``logits`` has shape
            (batch_size, vocab) and ``state`` is the GRU's returned state.
        """
        context_vector, attention_weights = self.attention(hidden, enc_output)

        # (batch_size, 1, embedding_dim)
        embedded = self.embedding(x)

        # Prepend the context on the feature axis:
        # (batch_size, 1, embedding_dim + hidden_size)
        context_with_time_axis = tf.expand_dims(context_vector, 1)
        gru_input = tf.concat([context_with_time_axis, embedded], axis=-1)

        # NOTE(review): `hidden` is only used as the attention query; the GRU
        # is called without an initial state -- confirm this is intended.
        gru_output, state = self.gru(gru_input)

        # Drop the length-1 time axis: (batch_size * 1, hidden_size)
        flattened = tf.reshape(gru_output, (-1, gru_output.shape[2]))

        # (batch_size, vocab)
        logits = self.fc(flattened)

        return logits, state, attention_weights

## Training

In [8]:
def loss_function(real, pred):
    """Return the mean per-example loss with target id 0 zeroed out.

    Args:
        real: Target token ids for one time step.
        pred: Decoder predictions for the same time step.

    Returns:
        Scalar mean of the masked losses. Masked entries contribute zero to
        the sum but still count in the mean's denominator.
    """
    per_example_loss = loss_object(real, pred)

    # 1 where the target is a real token, 0 where it is id 0.
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example_loss.dtype)

    return tf.reduce_mean(per_example_loss * keep)

In [9]:
@tf.function
def train_step(inp, targ, enc_hidden, char_tokenizer):
    """Run one teacher-forced optimisation step on a single batch.

    Args:
        inp: Batch of input token ids.
        targ: Batch of target token ids; column 0 is skipped because the
            decoder is seeded with the start token instead.
        enc_hidden: Initial hidden state for the encoder.
        char_tokenizer: Tokenizer whose ``word_index['s']`` gives the start
            token id.

    Returns:
        The summed per-step loss divided by the target sequence length.
    """
    loss = 0
    start_id = char_tokenizer.word_index['s']

    with tf.GradientTape() as tape:
        enc_output, enc_hidden = encoder(inp, enc_hidden)

        dec_hidden = enc_hidden

        # One start token per row. The row count comes from the batch itself
        # rather than a module-level constant, so the decoder input always
        # matches the targets it is compared against.
        dec_input = tf.fill([tf.shape(targ)[0], 1], start_id)

        # Teacher forcing - feeding the target as the next input
        for t in range(1, targ.shape[1]):
            # passing enc_output to the decoder
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)

            loss += loss_function(targ[:, t], predictions)

            # using teacher forcing
            dec_input = tf.expand_dims(targ[:, t], 1)

    # Reported value is normalised by sequence length; the gradients below
    # are taken from the un-normalised sum.
    batch_loss = loss / int(targ.shape[1])

    variables = encoder.trainable_variables + decoder.trainable_variables

    gradients = tape.gradient(loss, variables)

    optimizer.apply_gradients(zip(gradients, variables))

    return batch_loss

In [10]:
# Build the training data; it is shuffled and batched in a later cell.
# NOTE(review): presumably a tf.data.Dataset of (input, target) pairs -- confirm in etl.
dataset = data_proc.generate_examples(_NUM_EXAMPLES)

In [11]:
# Number of whole batches per epoch (integer division, so it is 0 whenever
# _NUM_EXAMPLES < _BATCH_SIZE).
steps_per_epoch = _NUM_EXAMPLES//_BATCH_SIZE
embedding_dim = 256  # embedding width passed to both Encoder and Decoder
units = 1024  # GRU units passed to both Encoder and Decoder
# One more than the number of tokenizer entries; presumably this reserves id 0
# for padding, which loss_function masks out -- confirm against the tokenizer.
vocab_size = len(data_proc.char_tokenizer.word_index) + 1

In [12]:
# Shuffle with a buffer of _NUM_EXAMPLES, then batch. drop_remainder=True
# discards a trailing partial batch, so every batch has exactly _BATCH_SIZE
# rows -- the same row count the encoder's zero hidden state is built with.
dataset = dataset.shuffle(_NUM_EXAMPLES).batch(_BATCH_SIZE, drop_remainder=True)

In [13]:
# Pull a single batch to inspect shapes and to feed the smoke tests below.
example_input_batch, example_target_batch = next(iter(dataset))
example_input_batch.shape, example_target_batch.shape

(TensorShape([10, 44]), TensorShape([10, 44]))

In [14]:
# Instantiate the encoder used for training and evaluation.
encoder = Encoder(vocab_size, embedding_dim, units, _BATCH_SIZE)

# sample input: run the example batch through the encoder to check shapes
sample_hidden = encoder.initialize_hidden_state()
sample_output, sample_hidden = encoder(example_input_batch, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print ('Encoder Hidden state shape: (batch size, units) {}'.format(sample_hidden.shape))

Encoder output shape: (batch size, sequence length, units) (10, 44, 1024)
Encoder Hidden state shape: (batch size, units) (10, 1024)


In [15]:
# Shape check only: a standalone attention layer with 10 projection units.
# The Decoder builds its own attention layer and does not use this one.
attention_layer = BahdanauAttention(10)
attention_result, attention_weights = attention_layer(sample_hidden, sample_output)

print("Attention result shape: (batch size, units) {}".format(attention_result.shape))
print("Attention weights shape: (batch_size, sequence_length, 1) {}".format(attention_weights.shape))

Attention result shape: (batch size, units) (10, 1024)
Attention weights shape: (batch_size, sequence_length, 1) (10, 44, 1)


In [16]:
# Instantiate the decoder used for training and evaluation.
decoder = Decoder(vocab_size, embedding_dim, units, _BATCH_SIZE)

# Shape check: a random (_BATCH_SIZE, 1) tensor stands in for one column of
# token ids; the encoder outputs from the sample batch serve as context.
sample_decoder_output, _, _ = decoder(tf.random.uniform((_BATCH_SIZE, 1)),
                                      sample_hidden, sample_output)

print ('Decoder output shape: (batch_size, vocab size) {}'.format(sample_decoder_output.shape))

Decoder output shape: (batch_size, vocab size) (10, 29)


In [17]:
# Adam with its default settings.
optimizer = tf.keras.optimizers.Adam()
# from_logits=True: the Decoder's final Dense layer has no activation.
# reduction='none': keep per-example losses so loss_function can mask them
# before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

In [18]:
# Checkpoints are written under model_rnn/ with the file prefix "ckpt" and
# track the optimizer together with both models.
checkpoint_dir = "model_rnn"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                 encoder=encoder,
                                 decoder=decoder)

In [30]:
# Main training loop: _EPOCHS passes over at most steps_per_epoch batches.
for epoch in range(_EPOCHS):
    start = time.time()

    # The same zero state is handed to the encoder for every batch of the epoch.
    enc_hidden = encoder.initialize_hidden_state()
    total_loss = 0
    # Count the batches actually consumed so the epoch average stays correct
    # (and defined) even when the dataset yields fewer than steps_per_epoch
    # batches, including none at all.
    batches_seen = 0

    for batch, (inp, targ) in enumerate(dataset.take(steps_per_epoch)):
        batch_loss = train_step(inp, targ, enc_hidden, data_proc.char_tokenizer)
        total_loss += batch_loss
        batches_seen += 1

        if batch % 100 == 0:
            print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1,
                                                       batch,
                                                       batch_loss.numpy()))
    # saving (checkpoint) the model every 2 epochs
    if (epoch + 1) % 2 == 0:
        checkpoint.save(file_prefix=checkpoint_prefix)

    if batches_seen:
        print('Epoch {} Loss {:.4f}'.format(epoch + 1,
                                          total_loss / batches_seen))
    else:
        # Nothing was trained; report that instead of dividing by zero.
        print('Epoch {} Loss n/a (dataset produced no batches)'.format(epoch + 1))
    print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

Epoch 1 Batch 0 Loss 2.2267
Epoch 1 Loss 1.9271
Time taken for 1 epoch 102.9673719406128 sec

Epoch 2 Batch 0 Loss 1.8603
Epoch 2 Loss 1.7751
Time taken for 1 epoch 33.44476938247681 sec

Epoch 3 Batch 0 Loss 1.7875
Epoch 3 Loss 1.7096
Time taken for 1 epoch 32.94222593307495 sec

Epoch 4 Batch 0 Loss 1.6145
Epoch 4 Loss 1.6617
Time taken for 1 epoch 33.193976402282715 sec



## Evaluation

In [39]:
def evaluate(sentence):
    """Greedily decode one input string with the trained encoder and decoder.

    Decoding starts from the 's' token and stops at the 'e' token, at the
    padding id 0, or after ``_MAX_LEN`` steps, whichever comes first.

    Args:
        sentence: The string to decode; it is vectorized and padded by
            ``data_proc`` before being fed to the encoder.

    Returns:
        A ``(decoded_text, sentence)`` tuple, where ``decoded_text`` excludes
        the start and end markers.
    """
    inputs = tf.convert_to_tensor(data_proc._vectorize_and_pad([sentence]))
    tokenizer = data_proc.char_tokenizer

    # Single-example batch, so the initial encoder state has one row.
    hidden = [tf.zeros((1, units))]
    enc_out, enc_hidden = encoder(inputs, hidden)

    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([tokenizer.word_index['s']], 0)

    decoded_chars = []
    for _step in range(_MAX_LEN):
        # Carry the decoder state forward, as train_step does; otherwise the
        # attention query would stay fixed at the encoder state on every step.
        predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_out)
        predicted_id = tf.argmax(predictions[0]).numpy()

        # Id 0 is the masked padding id in loss_function and is assumed to
        # have no index_word entry; treat it as the end of the sequence
        # instead of failing on the lookup.
        if predicted_id == 0:
            break

        predicted_char = tokenizer.index_word[predicted_id]
        if predicted_char == "e":
            break

        decoded_chars.append(predicted_char)
        # the predicted ID is fed back into the model
        dec_input = tf.expand_dims([predicted_id], 0)

    return ''.join(decoded_chars), sentence

In [42]:
# Generate 10 fresh pairs for scoring; the two lists are unpacked as
# cipher texts and plain texts.
cipher_list, plain_list = data_proc.generate_text(10)

In [43]:
# NOTE(review): the predictions are scored against cipher_list, and plain_list
# from the previous cell is never used. If score() expects the reference
# plaintext as its second argument, this should probably be plain_list --
# confirm against evaluate.score.
score(predict(cipher_list), cipher_list)

0.0