<a href="https://colab.research.google.com/github/john-breton/SYSC5500_DNN_Encryption/blob/main/SYSC5500_F_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# pip statements
# Installs the third-party packages imported by the next cell: `des` (DesKey),
# `rsa`, and `pycryptodome` (which provides the `Crypto` namespace).
!pip install des
!pip install rsa
!pip install pycryptodome
# matplotlib is pinned to 3.4.2; -I ignores any already-installed copy and
# -v makes pip print verbose output.
!pip install -Iv matplotlib==3.4.2

In [None]:
# import statements
import numpy as np
import pandas as pd
import os
import rsa
import string
import base64
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick


from des import DesKey
from difflib import SequenceMatcher
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.models import Sequential, load_model
from keras.layers import Dense, Bidirectional, GRU
from base64 import urlsafe_b64encode, urlsafe_b64decode
from keras.preprocessing.text import Tokenizer as tk
from base64 import urlsafe_b64decode as b64decode

In [None]:
""" 
Data Preparation
"""

# Placeholder locations -- replace both before running the notebook.
# PATH_TO_DATA_CSV is handed to pd.read_csv below. PATH_TO_DATA_MODELS is used
# by the training and evaluation cells as a folder:
# "<PATH_TO_DATA_MODELS>/<cipher>/<model name>.h5".
PATH_TO_DATA_CSV = "ReplaceMe.txt"
PATH_TO_DATA_MODELS = "ReplaceMe.txt"

# Constants
# Number of (plaintext, ciphertext) pairs built per cipher for training and
# for validation respectively.
NUMBER_OF_REVIEWS = 100000
VALIDATION_DATA_SIZE = 10000

# One key/cipher object plus one pair of (initially empty) datasets per
# algorithm. The lists are filled by the per-cipher cells further down.
KEY_XOR = b"7bJ1R"
TRAINING_DATA_XOR = []
VALIDATION_DATA_XOR = []

KEY_DES = DesKey(b"RgUkXp2s")
TRAINING_DATA_DES = []
VALIDATION_DATA_DES = []

KEY_3DES = DesKey(b"v8y/B?E(H+MbQeTh")
TRAINING_DATA_3DES = []
VALIDATION_DATA_3DES = []

AEScipher = AES.new(b'y$B?E(H+MbQeThWm', AES.MODE_ECB)
TRAINING_DATA_AES = []
VALIDATION_DATA_AES = []

# A fresh 2048-bit RSA key pair is generated every time this cell runs.
(KEY_PUB_RSA, KEY_PRIV_RSA) = rsa.newkeys(2048)
TRAINING_DATA_RSA = []
VALIDATION_DATA_RSA = []

# Tokenizer shenanigans
# The import cell binds the Tokenizer *class* to the name `tk`, while every
# later cell expects `tk` to be the fitted tokenizer *instance*. Building the
# instance from `tk` itself only works once per kernel: on a second run `tk`
# is already an instance and calling it raises a TypeError. Importing the
# class under its own name keeps this cell re-runnable.
from keras.preprocessing.text import Tokenizer
tk = Tokenizer(char_level=True, lower=False)
# Character-level vocabulary: every character of string.printable except its
# last two.
tk.fit_on_texts(string.printable[:-2])

# Load in the reviews into a workable array
df = pd.read_csv(PATH_TO_DATA_CSV)

# Only use the right number of reviews. Access this object in later code blocks
df_temp = df.Summary[:NUMBER_OF_REVIEWS + VALIDATION_DATA_SIZE]
# Round-trip through ASCII with errors ignored, which drops every non-ASCII
# character from the summaries.
df_trim = df_temp.str.encode('ascii', 'ignore').str.decode('ascii')

In [None]:
""" 
XOR Encryption
Key used: b"7bJ1R"
"""

# Dylan's XOR, takes two Byte objects
def XOR(m, key):
  """XOR each byte of `m` with `key`, cycling through the key as needed.

  Both arguments are bytes-like objects. The result is a new `bytes` object
  with the same length as `m`; applying the function again with the same key
  restores the original message.
  """
  key_length = len(key)
  return bytes(byte ^ key[position % key_length]
               for position, byte in enumerate(m))

# No need to rerun if data already exists
if len(TRAINING_DATA_XOR) != NUMBER_OF_REVIEWS or len(VALIDATION_DATA_XOR) != VALIDATION_DATA_SIZE:
  # Each dataset entry is a (plaintext str, ciphertext bytes) pair.
  # Slice assignment replaces the whole list in one step, so leftovers from an
  # interrupted earlier run are never appended to (that would duplicate pairs
  # and keep the guard above true forever).

  # Set up training data
  # Some of the summaries are just numbers, so we need to convert them to
  # strings first to ensure encryption functions properly
  TRAINING_DATA_XOR[:] = [
      (summary, XOR(summary.encode(), KEY_XOR))
      for summary in map(str, df_trim.iloc[:NUMBER_OF_REVIEWS])
  ]

  # Set up validation data: the rows right after the training rows
  VALIDATION_DATA_XOR[:] = [
      (summary, XOR(summary.encode(), KEY_XOR))
      for summary in map(str, df_trim.iloc[NUMBER_OF_REVIEWS:
                                           NUMBER_OF_REVIEWS + VALIDATION_DATA_SIZE])
  ]

# Sanity Check
# The two lengths should equal NUMBER_OF_REVIEWS and VALIDATION_DATA_SIZE.
print(len(TRAINING_DATA_XOR))
print(len(VALIDATION_DATA_XOR))

# First pair, followed by its ciphertext decrypted back to text.
print(TRAINING_DATA_XOR[0])
print(XOR(TRAINING_DATA_XOR[0][1], KEY_XOR).decode('utf-8'))

In [None]:
""" 
DES Encryption ECB
Key used: RgUkXp2s (64-bit)
"""

# No need to rerun if data already exists
if len(TRAINING_DATA_DES) != NUMBER_OF_REVIEWS or len(VALIDATION_DATA_DES) != VALIDATION_DATA_SIZE:
  # Each dataset entry is a (plaintext str, ciphertext bytes) pair.
  # Slice assignment replaces the whole list in one step, so leftovers from an
  # interrupted earlier run are never appended to (that would duplicate pairs
  # and keep the guard above true forever).

  # Set up training data
  # Some of the summaries are just numbers, so we need to convert them to
  # strings first to ensure encryption functions properly
  TRAINING_DATA_DES[:] = [
      (summary, KEY_DES.encrypt(summary.encode(), padding=True))
      for summary in map(str, df_trim.iloc[:NUMBER_OF_REVIEWS])
  ]

  # Set up validation data: the rows right after the training rows
  VALIDATION_DATA_DES[:] = [
      (summary, KEY_DES.encrypt(summary.encode(), padding=True))
      for summary in map(str, df_trim.iloc[NUMBER_OF_REVIEWS:
                                           NUMBER_OF_REVIEWS + VALIDATION_DATA_SIZE])
  ]

# Sanity Check
# The two lengths should equal NUMBER_OF_REVIEWS and VALIDATION_DATA_SIZE.
print(len(TRAINING_DATA_DES))
print(len(VALIDATION_DATA_DES))

# First pair, followed by its ciphertext decrypted back to text.
print(TRAINING_DATA_DES[0])
print(KEY_DES.decrypt(TRAINING_DATA_DES[0][1], padding=True).decode('utf-8'))

In [None]:
""" 
3DES Encryption ECB
Key used: v8y/B?E(H+MbQeTh (128-bit)
"""

# No need to rerun if data already exists
if len(TRAINING_DATA_3DES) != NUMBER_OF_REVIEWS or len(VALIDATION_DATA_3DES) != VALIDATION_DATA_SIZE:
  # Each dataset entry is a (plaintext str, ciphertext bytes) pair.
  # Slice assignment replaces the whole list in one step, so leftovers from an
  # interrupted earlier run are never appended to (that would duplicate pairs
  # and keep the guard above true forever).

  # Set up training data
  # Some of the summaries are just numbers, so we need to convert them to
  # strings first to ensure encryption functions properly
  TRAINING_DATA_3DES[:] = [
      (summary, KEY_3DES.encrypt(summary.encode(), padding=True))
      for summary in map(str, df_trim.iloc[:NUMBER_OF_REVIEWS])
  ]

  # Set up validation data: the rows right after the training rows
  VALIDATION_DATA_3DES[:] = [
      (summary, KEY_3DES.encrypt(summary.encode(), padding=True))
      for summary in map(str, df_trim.iloc[NUMBER_OF_REVIEWS:
                                           NUMBER_OF_REVIEWS + VALIDATION_DATA_SIZE])
  ]

# Sanity Check
# The two lengths should equal NUMBER_OF_REVIEWS and VALIDATION_DATA_SIZE.
print(len(TRAINING_DATA_3DES))
print(len(VALIDATION_DATA_3DES))

# First pair, followed by its ciphertext decrypted back to text.
print(TRAINING_DATA_3DES[0])
print(KEY_3DES.decrypt(TRAINING_DATA_3DES[0][1], padding=True).decode('utf-8'))

In [None]:
""" 
AES Encryption ECB
Key used: y$B?E(H+MbQeThWm (128-bit)
"""

# No need to rerun if data already exists
if len(TRAINING_DATA_AES) != NUMBER_OF_REVIEWS or len(VALIDATION_DATA_AES) != VALIDATION_DATA_SIZE:
  # Each dataset entry is a (plaintext str, ciphertext bytes) pair; the
  # plaintext bytes are padded to AES.block_size before encryption.
  # Slice assignment replaces the whole list in one step, so leftovers from an
  # interrupted earlier run are never appended to (that would duplicate pairs
  # and keep the guard above true forever).

  # Set up training data
  # Some of the summaries are just numbers, so we need to convert them to
  # strings first to ensure encryption functions properly
  TRAINING_DATA_AES[:] = [
      (summary, AEScipher.encrypt(pad(summary.encode(), AES.block_size)))
      for summary in map(str, df_trim.iloc[:NUMBER_OF_REVIEWS])
  ]

  # Set up validation data: the rows right after the training rows
  VALIDATION_DATA_AES[:] = [
      (summary, AEScipher.encrypt(pad(summary.encode(), AES.block_size)))
      for summary in map(str, df_trim.iloc[NUMBER_OF_REVIEWS:
                                           NUMBER_OF_REVIEWS + VALIDATION_DATA_SIZE])
  ]

# Sanity Check
# The two lengths should equal NUMBER_OF_REVIEWS and VALIDATION_DATA_SIZE.
print(len(TRAINING_DATA_AES))
print(len(VALIDATION_DATA_AES))

# First pair, followed by its ciphertext decrypted and unpadded back to text.
print(TRAINING_DATA_AES[0])
print(unpad(AEScipher.decrypt(TRAINING_DATA_AES[0][1]), AES.block_size).decode('utf-8'))

In [None]:
""" 
RSA Encryption
"""

# No need to rerun if data already exists
if len(TRAINING_DATA_RSA) != NUMBER_OF_REVIEWS or len(VALIDATION_DATA_RSA) != VALIDATION_DATA_SIZE:
  # Each dataset entry is a (plaintext str, ciphertext bytes) pair, encrypted
  # with the public half of the key pair.
  # Slice assignment replaces the whole list in one step, so leftovers from an
  # interrupted earlier run are never appended to (that would duplicate pairs
  # and keep the guard above true forever).
  # NOTE(review): rsa.encrypt is documented to raise OverflowError when the
  # message does not fit the key (presumably 245 bytes for a 2048-bit key) --
  # confirm no summary in the data set is longer than that.

  # Set up training data
  # Some of the summaries are just numbers, so we need to convert them to
  # strings first to ensure encryption functions properly
  TRAINING_DATA_RSA[:] = [
      (summary, rsa.encrypt(summary.encode(), KEY_PUB_RSA))
      for summary in map(str, df_trim.iloc[:NUMBER_OF_REVIEWS])
  ]

  # Set up validation data: the rows right after the training rows
  VALIDATION_DATA_RSA[:] = [
      (summary, rsa.encrypt(summary.encode(), KEY_PUB_RSA))
      for summary in map(str, df_trim.iloc[NUMBER_OF_REVIEWS:
                                           NUMBER_OF_REVIEWS + VALIDATION_DATA_SIZE])
  ]

# Sanity Check
# The two lengths should equal NUMBER_OF_REVIEWS and VALIDATION_DATA_SIZE.
print(len(TRAINING_DATA_RSA))
print(len(VALIDATION_DATA_RSA))

# First pair, followed by its ciphertext decrypted with the private key.
print(TRAINING_DATA_RSA[0])
print(rsa.decrypt(TRAINING_DATA_RSA[0][1], KEY_PRIV_RSA).decode('utf-8'))

In [None]:
# ML Training
# For every cipher, train ten models on the first 10%, 20%, ..., 100% of the
# training pairs. The model input is the URL-safe base64 text of the
# ciphertext and the target is the URL-safe base64 text of the plaintext, both
# tokenized per character with `tk`.

# Every encoded string is right-padded with "0" to this length so that all
# samples share one shape (consistent throughout).
TRAIN_SEQUENCE_LENGTH = 350

# (model family name, training pairs) in the order the models are trained.
training_sets = [
    ("3DES", TRAINING_DATA_3DES),
    ("AES", TRAINING_DATA_AES),
    ("XOR", TRAINING_DATA_XOR),
    ("DES", TRAINING_DATA_DES),
    ("RSA", TRAINING_DATA_RSA),
]

for name, curr_train_data in training_sets:
  # Create the per-cipher output folder up front so that saving the .h5
  # checkpoint cannot fail on a missing directory.
  os.makedirs(f'{PATH_TO_DATA_MODELS}/{name}', exist_ok=True)

  # Encoding does not depend on the training fraction, so do it once per
  # cipher and slice the arrays for each model below.
  # Convert the data to an ASCII readable encoding and pad it
  temp_plain = [
      urlsafe_b64encode(plain.encode()).decode().ljust(TRAIN_SEQUENCE_LENGTH, "0")
      for plain, _ in curr_train_data[:NUMBER_OF_REVIEWS]
  ]
  temp_cipher = [
      urlsafe_b64encode(cipher).decode().ljust(TRAIN_SEQUENCE_LENGTH, "0")
      for _, cipher in curr_train_data[:NUMBER_OF_REVIEWS]
  ]

  # Create np arrays for both datasets, with a trailing feature axis of size 1
  all_plain = np.expand_dims(np.array(tk.texts_to_sequences(temp_plain)), -1)
  all_cipher = np.expand_dims(np.array(tk.texts_to_sequences(temp_cipher)), -1)

  for k in range(1, 11):
    # k = 10 yields "<name>_100percent", the name the evaluation cell loads.
    MODEL_NAME = f"{name}_{k}0percent"
    # Integer arithmetic avoids the rounding surprises of int(N * 0.k).
    INPUT_RANGE = NUMBER_OF_REVIEWS * k // 10

    # Specify what data to use here
    train_plain = all_plain[:INPUT_RANGE]
    train_cipher = all_cipher[:INPUT_RANGE]

    # Prepare the model
    model = Sequential()
    model.add(Bidirectional(GRU(128, activation='tanh', return_sequences=True), 
                            input_shape=(train_cipher.shape[1:])))
    model.add(Dense(1, activation='linear'))
    model.summary()

    # Train the model and save it so it can be loaded later for analysis
    model_path = f'{PATH_TO_DATA_MODELS}/{name}/{MODEL_NAME}.h5'
    calls = [
        EarlyStopping(monitor='val_loss', min_delta=1e-3, patience=3, verbose=1, 
                      mode='min'),
        # Checkpoint on val_loss, the same quantity EarlyStopping watches. The
        # model regresses token ids with an MSE loss, so the 'accuracy' metric
        # is not a meaningful criterion for picking the best epoch.
        ModelCheckpoint(model_path, monitor='val_loss', verbose=1, 
                        save_best_only=True, mode='min')
    ]

    model.compile(loss='mse', optimizer='adam', metrics=['accuracy'])
    model.fit(train_cipher, train_plain, validation_split=.1, epochs=10, callbacks=calls, batch_size=10)

In [None]:
# Data viz
# Score every saved model: run validation ciphertexts through it, turn the
# rounded outputs back into characters with `tk`, and compare the result with
# the base64 plaintext using difflib.SequenceMatcher. Each *_data list ends up
# with one average match percentage per model, 10% model first.

# The training cell builds every model with input_shape (350, 1), so all
# ciphers are padded to the same length here.
EVAL_SEQUENCE_LENGTH = 350
# How many validation pairs are scored per model.
EVAL_SAMPLES = min(100, VALIDATION_DATA_SIZE)

xor_data = []
des_data = []
three_des_data = []
aes_data = []
rsa_data = []

# (model family name, validation pairs, list that receives the scores)
evaluation_sets = [
    ("XOR", VALIDATION_DATA_XOR, xor_data),
    ("DES", VALIDATION_DATA_DES, des_data),
    ("3DES", VALIDATION_DATA_3DES, three_des_data),
    ("AES", VALIDATION_DATA_AES, aes_data),
    ("RSA", VALIDATION_DATA_RSA, rsa_data),
]

for curr_encryption, curr_dataset, dataset_to_use in evaluation_sets:
  # Prepare our data for input into our model. This is the same for all ten
  # models of a cipher, so it is done once per cipher.
  temp_curr = [
      urlsafe_b64encode(cipher).decode().ljust(EVAL_SEQUENCE_LENGTH, "0")
      for _, cipher in curr_dataset[:EVAL_SAMPLES]
  ]
  temp_curr_plain = [
      urlsafe_b64encode(plain.encode()).decode()
      for plain, _ in curr_dataset[:EVAL_SAMPLES]
  ]

  # Dimensionality for input into the model: (samples, 350, 1)
  curr_validate = np.expand_dims(np.array(tk.texts_to_sequences(temp_curr)), -1)

  for k in range(1, 11):
    model_path = f"{PATH_TO_DATA_MODELS}/{curr_encryption}/{curr_encryption}_{k}0percent.h5"
    sum_match = 0

    # Load the model for analysis
    model = load_model(model_path)

    # Run the data on the model for validation purposes. The samples are sent
    # as one batch, so each keeps its full sequence axis; passing a single
    # sample without the batch axis would not match the training shape.
    predictions = model.predict(curr_validate, verbose=0)

    for prediction, expected in zip(predictions, temp_curr_plain):
      # Round the outputs to token ids and map them back to characters.
      token_ids = [round(value) for value in prediction.reshape(-1).tolist()]
      text_prediction = tk.sequences_to_texts([token_ids])[0].replace(' ', '')
      # Only the part that lines up with the unpadded plaintext is compared.
      text_prediction = text_prediction[0: len(expected)]
      sum_match = sum_match + SequenceMatcher(None, text_prediction, expected).ratio() * 100

    # Average over the samples that were actually scored.
    dataset_to_use.append(sum_match / len(temp_curr_plain))



In [None]:
# Graph related code goes here, use the populated _data arrays, first element is 
# the 10% model for the algorithm, last element is the 100% model.

# Here is data from the runs:
# These hard-coded lists replace whatever the evaluation cell computed, so the
# charts can be drawn without re-running training and evaluation.
xor_data = [6.171513889686618, 5.707942091486323, 8.02255540054602, 6.075225538900027, 11.239096036201909, 8.64997115438628, 11.717850748208175, 9.473102388346087, 7.377665961725071, 8.586916880407852]
des_data = [6.435465331846153, 4.440833378187538, 5.850120180808735, 4.487782266286228, 5.5085734224640515, 5.138867910689613, 4.368136099365463, 5.978767654140392, 3.2188557758436547, 6.366954077475426]
three_des_data = [2.083333333333582, 2.083333333333582, 2.083333333333582, 2.083333333333582, 2.083333333333582, 2.083333333333582, 2.083333333333582, 3.125, 3.125, 3.125]
aes_data = [7.313708082025275, 3.627231015309424, 3.621513005282047, 7.2590521641276595, 4.138649461747406, 3.7138189574565876, 4.91187722533661, 2.8291946094197304, 5.812162051652086, 4.736706195182162]
rsa_data = [6.960937500000003, 5.360520833333331, 9.953229166666729, 7.561666666666659, 4.779479166666687, 4.899270833333377, 3.2859374999999984, 4.553229166666663, 3.9488541666667576, 4.271354166666659]

data = {'XOR': xor_data, 'DES': des_data, '3DES': three_des_data, 'AES': aes_data, "RSA": rsa_data}
labels = ['10%', '20%', '30%', '40%', '50%', '60%', '70%', '80%', '90%', '100%']

# One bar chart per algorithm.
for key, breakage in data.items():
  # A dedicated name keeps the global `df` (the reviews loaded in the data
  # preparation cell) from being overwritten by a ten-row plotting frame.
  # Using the labels as the index makes them the x tick labels.
  breakage_df = pd.DataFrame(breakage, index=labels)

  p = breakage_df.plot(kind='bar', figsize=(20, 8), rot=0, legend=False, xlabel ='Percentage Of Training Data', ylabel='Percentage Breakage', title="Percentage Breakage For " + key + " Algorithm")
  p.yaxis.set_major_formatter(mtick.PercentFormatter())