In [37]:
# Import our dependencies
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import tensorflow as tf
import keras_tuner as kt
import pathlib
import random

from credentials import SAMPLE_DB, FULL_DB
from librarian import SAMPLE_DATA_DIR

import encoders
import db_connect
import helpers

In [38]:
DATA_DIR = SAMPLE_DATA_DIR
CHUNK_SIZE = 128
PROCESSING_UNITS = 32
LAYER_UNITS = max(1, CHUNK_SIZE // 10)

INFER_TEXT = False
INFER_KEY = not INFER_TEXT

USE_CUSTOM_LOSS = True
USE_CUSTOM_OUTPUT_ACTIVATION = True

OUTPUT_MIN = 0
OUTPUT_MAX = len(encoders.CHARSET)-1
CUSTOM_LOSS_MODULO = OUTPUT_MAX+1

if INFER_TEXT:
    MAIN_ACCURACY_METRIC = "mae"
    LOSS_METRIC = "mean_squared_error"
    OUTPUT_SIZE = CHUNK_SIZE
    OPTIMIZER = "sgd"
else:
    MAIN_ACCURACY_METRIC = "mae"
    LOSS_METRIC = "mae"
    OUTPUT_SIZE = 1 # actually depends on cipher
    OPTIMIZER = "adamax"    

ENCRYPTED_FILE_LIMIT = 10 # -1 to disable limit

BASE_TRAIN_PCT = 0.75   # Start here. If it exceed the max count, reduce it. Note 0.75 is the default.
#MAX_TRAIN_COUNT = 100000 # -1 to disable
MAX_TRAIN_COUNT = 10000 # -1 to disable
SPLIT_SEED = 42

LOAD_BEST_MODEL = False # If False, a new model will be created from scratch
SAVE_BEST_MODEL = True
TRAIN_MODEL = True
EPOCHS = 5
BATCH_SIZE = 32 # Default is 32

db = db_connect.DB(SAMPLE_DB)

In [39]:
# Get database IDs for encoders and key types

encoder_ids= {}
key_type_ids = {}

with db.get_session() as session:
    for encoder in encoders.ALL_ENCODER_NAMES:
        id = db.get_encoder_id(session, encoder)
        encoder_ids[encoder] = id

    print(f"Encoder IDs: {encoder_ids}")

    for key_type in encoders.KEY_NAMES:
        id = db.get_key_type_id(session, key_type)
        key_type_ids[key_type] = id

    print(f"Key Type IDs: {key_type_ids}")

Encoder IDs: {'None': 1, 'Simplifier': 2, 'Caesar Cipher': 3, 'Substitution Cipher': 4, 'Enigma Machine': 5}
Key Type IDs: {'Character Offset': 1, 'Character Map': 2, 'Rotor Settings': 3}


In [40]:
# Map source ID to plaintext file (1) details, and source ID to corresponding ciphertext files (1+) details
sid_to_p = {}
sid_to_c = {}

cipher_id = encoder_ids[encoders.ENCODER_CAESAR]
with db.get_session() as session:
    # Get all files encrypted with the cipher we care about
    encrypted_files = db.get_files_by_source_and_encoder(session, -1, cipher_id)

    if len(encrypted_files) > ENCRYPTED_FILE_LIMIT and ENCRYPTED_FILE_LIMIT > 0:
        encrypted_files = random.sample(encrypted_files, ENCRYPTED_FILE_LIMIT)

    for c in encrypted_files:
        sid = c.source_id
    
        if sid not in sid_to_p:
            plaintext_ids = db.get_files_by_source_and_encoder(session, sid, encoder_ids[encoders.ENCODER_SIMPLIFIER])
            if len(plaintext_ids) != 1:
                raise Exception(f"Found {len(plaintext_ids)} plaintexts for source ID {sid}; should be exactly 1")
            sid_to_p[sid] = plaintext_ids[0]

        if sid not in sid_to_c:
            sid_to_c[sid] = []
        sid_to_c[sid].append(c)

len(sid_to_p), len(sid_to_c)

(6, 6)

In [41]:
# Build up the features (X, the cipher texts as offsets) and targets (y, either the plain texts as offsets OR the key).
# Note targets are not necessarily unique.
X = []
y = []

with db.get_session() as session:
    for sid in sid_to_p:
        if INFER_TEXT:
            plaintext = encoders.string_to_offsets(helpers.read_text_file(sid_to_p[sid].path))
            target_chunks = helpers.chunkify(plaintext, CHUNK_SIZE)    
    
        for c in sid_to_c[sid]:
            ciphertext = encoders.string_to_offsets(helpers.read_text_file(c.path))
            feature_chunks = helpers.chunkify(ciphertext, CHUNK_SIZE)

            if INFER_KEY:
                key_value = float(db.get_key_by_id(session, c.key_id).value)
    
            for i in range (len(feature_chunks)):
                X.append(feature_chunks[i])

                if INFER_TEXT:
                    y.append(target_chunks[i])                

                if INFER_KEY:
                    y.append(key_value)

X = np.array(X)
y = np.array(y)

X.shape, y.shape, X[0].shape, y[0].shape, X[0], y[0]

((24381, 128),
 (24381,),
 (128,),
 (),
 array([11, 11, 27, 29, 26, 15, 32, 14, 16, 15, 10, 13, 36, 10, 15, 16, 12,
        32, 29, 20, 15, 16, 29,  4, 10, 13, 29, 20, 12, 25, 10, 34, 20, 23,
        14, 26, 35, 10, 12, 25, 15, 10, 31, 19, 16, 10, 26, 25, 23, 20, 25,
        16, 11, 15, 20, 30, 31, 29, 20, 13, 32, 31, 16, 15, 10, 27, 29, 26,
        26, 17, 29, 16, 12, 15, 20, 25, 18, 10, 31, 16, 12, 24, 10, 12, 31,
        10, 19, 31, 31, 27,  2,  6,  6, 34, 34, 34,  5, 27, 18, 15, 27,  5,
        25, 16, 31, 10, 57, 31, 19, 20, 30, 11, 17, 20, 23, 16, 10, 34, 12,
        30, 10, 27, 29, 26, 15, 32, 14, 16]),
 12.0)

In [42]:
# Split the preprocessed data into a training and testing dataset
train_count = int(round(len(y) * BASE_TRAIN_PCT))
print(f"Train count would be {train_count}")
if train_count > MAX_TRAIN_COUNT and MAX_TRAIN_COUNT > -1:
    train_count = int(MAX_TRAIN_COUNT)
print(f"Train count is {train_count}")

X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=train_count, random_state=SPLIT_SEED)
print( len(X), len(y), len(X_train), len(X_test), len(y_train), len(y_test) )

Train count would be 18286
Train count is 10000
24381 24381 10000 14381 10000 14381


In [43]:
# Create a StandardScaler instances
scaler = StandardScaler()

# Fit the StandardScaler
X_scaler = scaler.fit(X_train)

# Scale the data
X_train_scaled = X_scaler.transform(X_train)
X_test_scaled = X_scaler.transform(X_test)
X_train_scaled.shape, X_test_scaled.shape

((10000, 128), (14381, 128))

In [47]:
# Custom output activation...
def modulo_output(x):
    #return tf.math.round(x)
    return tf.math.mod(x, OUTPUT_MAX)



    # Compute the raw difference
    #diff = tf.abs(y_true - y_pred)
    # Apply modulo operation to handle wrap-around cases
    #mod_diff = tf.math.mod(diff, CUSTOM_LOSS_MODULO)
    # Ensure the distance is within the range [0, CUSTOM_LOSS_MODULO/2]
    #loss = tf.minimum(mod_diff, CUSTOM_LOSS_MODULO - mod_diff) 
    #return tf.reduce_mean(loss)


if LOAD_BEST_MODEL:
    print(f"Loading model from {BEST_PATH}")
    nn = tf.keras.models.load_model(BEST_PATH)
else:
    print("Building new model")
    in_shape = (CHUNK_SIZE,)
    nn = tf.keras.models.Sequential()

    try_RNN = True
    if try_RNN:
        nn.add(tf.keras.layers.Embedding(input_dim=CHUNK_SIZE, output_dim=PROCESSING_UNITS, name="Embedding_Input"))
        nn.add(tf.keras.layers.LSTM(PROCESSING_UNITS, name="LSTM"))
        #nn.add(tf.keras.layers.Dense(units=PROCESSING_UNITS, activation="sigmoid", name="Sigmoid"))
        #nn.add(tf.keras.layers.Rescaling(scale=OUTPUT_MAX, offset=0, name="Rescaler")) # Input is 0-1
        
        if USE_CUSTOM_OUTPUT_ACTIVATION:
            nn.add(tf.keras.layers.Dense(OUTPUT_SIZE, activation=modulo_output, name="Output_Limiter"))
        else:
            nn.add(tf.keras.layers.Dense(OUTPUT_SIZE, name="Linear_Output"))

    if not try_RNN:
        # Input layer
        nn.add(tf.keras.Input(shape=in_shape))
        
        # Hidden layers
        activations = ["tanh", "relu", "elu", "exponential", "gelu", "mish", "relu6", "tanh", "selu"]
        unit_counts = [LAYER_UNITS]
        for u in unit_counts:
            for a in activations:
                nn.add(tf.keras.layers.Dense(units=u, activation=a))
        
        # Output layer
        nn.add(tf.keras.layers.Dense(units=OUTPUT_SIZE, activation=modulo_output))
    
# Check the structure of the model
nn.summary()

Building new model


In [None]:
%%time

# Custom loss function was adapted from code generated by Copilot
def modulo_distance_loss(y_true, y_pred):
    """ Custom loss function to compute the modulo distance. 
    Args: 
        y_true: True values (ground truth). 
        y_pred: Predicted values. 
        modulo: The modulo value to apply -- hard coded.
    Returns: 
        The computed loss. 
    """ 
    # Compute the raw difference
    diff = tf.abs(y_true - y_pred)
    # Apply modulo operation to handle wrap-around cases
    mod_diff = tf.math.mod(diff, CUSTOM_LOSS_MODULO)
    # Ensure the distance is within the range [0, CUSTOM_LOSS_MODULO/2]
    loss = tf.minimum(mod_diff, CUSTOM_LOSS_MODULO - mod_diff) 
    return tf.reduce_mean(loss)

# Set up training checkpoint to save after each epoch, if it is a new best model:
BEST_PATH = './best.keras'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=BEST_PATH,
    monitor="loss",
    mode="min",
    save_best_only=True,
    save_weights_only=False,
    verbose=1)

if TRAIN_MODEL:
    print(f"Training model")

    if USE_CUSTOM_LOSS:
        loss = modulo_distance_loss
    else:
        loss = LOSS_METRIC
    
    if SAVE_BEST_MODEL:
        callbacks = [model_checkpoint_callback]
    else:
        callbacks = None
    
    # Compile the Sequential model together and customize metrics
    nn.compile(loss=loss, optimizer=OPTIMIZER, metrics=[MAIN_ACCURACY_METRIC])
    
    # Fit the model to the training data
    fit_model = nn.fit(X_train_scaled, y_train, epochs=EPOCHS, callbacks=callbacks, batch_size=BATCH_SIZE)

# Evaluate the model using the test data
model_loss, model_accuracy = nn.evaluate(X_test_scaled,y_test,verbose=2)
print(f"Loss: {model_loss}, Accuracy: {model_accuracy}")

Training model
Epoch 1/5


In [46]:
def decode_chunks_with_model(chunks: list[list], model, scaler, input_already_scaled = True) -> list[list]:
    if input_already_scaled:
        return model.predict(chunks)
    else:
        return model.predict(scaler.transform(chunks))

def decode_text_with_model(ciphertext: str, model, scaler) -> str:
    offset_chunks = helpers.chunkify(encoders.string_to_offsets(ciphertext), CHUNK_SIZE)
    decoded_chunks = decode_chunks_with_model(offset_chunks, model, scaler, input_already_scaled = False)
    rounded = np.rint(decoded_chunks.flatten()).astype(int)
    return encoders.offsets_to_string(rounded)

def infer_key_with_model(ciphertext: str, model, scaler) -> int:
    chunks = helpers.string_to_bytes(ciphertext, CHUNK_SIZE)
    keys = model.predict(scaler.transform(chunks))
    key = int(round(np.median(keys)))
    return key

if INFER_TEXT:    
    CHUNKS_TO_CHECK = 2
else:
    CHUNKS_TO_CHECK = 20

cipher_file_db = sid_to_c[list(sid_to_c.keys())[0]][0]
ciphertext_path = cipher_file_db.path
ciphertext = helpers.read_text_file(ciphertext_path)
ciphertext = ciphertext[0:CHUNK_SIZE * CHUNKS_TO_CHECK]
    
if INFER_TEXT:    
    print("Decoded   : ", decode_text_with_model(ciphertext, nn, X_scaler))
if INFER_KEY:
    with db.get_session() as session:
        correct_key = int(db.get_key_by_id(session, cipher_file_db.key_id).value)
    print("Correct Key: ", correct_key)
    
    inferred_key = infer_key_with_model(ciphertext, nn, X_scaler)
    print("Inferred Key: ", inferred_key)

chunks = helpers.string_to_bytes(ciphertext, CHUNK_SIZE)
print(nn.predict(scaler.transform(chunks)))

nn.summary()

Correct Key:  12
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 196ms/step
Inferred Key:  36
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[[38.35936 ]
 [37.190166]
 [36.671593]
 [37.151764]
 [35.965767]
 [36.73343 ]
 [35.906082]
 [37.65701 ]
 [36.46104 ]
 [33.913437]
 [34.696045]
 [37.351574]
 [39.77619 ]
 [35.884018]
 [34.882294]
 [30.680628]
 [34.591324]
 [36.59607 ]
 [30.97778 ]
 [34.52593 ]]
