In [1]:
from enigma.enigma import Enigma
from enigma.rotors.rotor_with_mapping_and_notches import RotorWithMappingAndNotches
from enigma.rotors.rotor_I import RotorI
from enigma.rotors.rotor_II import RotorII
from enigma.rotors.rotor_III import RotorIII
from enigma.rotors.rotor_IV import RotorIV
from enigma.rotors.rotor_V import RotorV
from enigma.reflectors.reflector_b import ReflectorB
from enigma.plugboard import Plugboard
from language_models.character_frequency_kld_language_model import CharacterFrequencyKLDLanguageModel
from language_models.character_frequency_ic_language_model import CharacterFrequencyICLanguageModel
from language_models.markov_chain_model import MarkovChainModel

import random
from collections import OrderedDict
import heapq
from itertools import permutations, combinations
from tqdm import tqdm
from tqdm.auto import trange
from multiprocessing import Pool
from functools import partial
import matplotlib.pyplot as plt
import re
from Levenshtein import distance
import time
import pickle

# Rotor classes that random configurations (get_random_config) and the
# brute-force search (permutations(POSSIBLE_ROTORS, 3)) draw from.
# NOTE(review): this is a set, so it is not a sequence and its iteration
# order should not be relied upon by code that samples or enumerates it.
POSSIBLE_ROTORS = {RotorI, RotorII, RotorIII, RotorIV, RotorV}
# Alphabet kept by transform_to_valid_chars and used to draw plugboard letters.
POSSIBLE_LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

def config_to_string(rotors_classes, offsets, ringstellungs, plugboard_tuples):
    """Serialize an Enigma configuration into a compact text key.

    Every rotor contributes ``<ClassName>(o:<offset>,rs:<ring setting>)|``;
    the plugboard pairs follow as consecutive letters, for example
    ``RotorI(o:1,rs:4)|RotorII(o:2,rs:5)|RotorIII(o:3,rs:6)|ABCD``.

    Args:
        rotors_classes: rotor classes (only ``__name__`` is used).
        offsets: one integer offset per rotor.
        ringstellungs: one integer ring setting per rotor.
        plugboard_tuples: iterable of ``(letter, letter)`` pairs.

    Returns:
        The serialized configuration string.
    """
    rotor_parts = [
        "%s(o:%d,rs:%d)|" % (rotor_cls.__name__, offset, ring)
        for rotor_cls, offset, ring in zip(rotors_classes, offsets, ringstellungs)
    ]
    plug_parts = [first + second for first, second in plugboard_tuples]
    return "".join(rotor_parts + plug_parts)

def get_random_plugboad(plugboard_size: int):
    """Draw ``plugboard_size`` random letter pairs with no letter used twice.

    ``2 * plugboard_size`` distinct letters are sampled from
    ``POSSIBLE_LETTERS``; consecutive sampled letters are paired up and the
    pairs are returned sorted by their first letter.

    Returns:
        A list of ``(letter, letter)`` tuples.
    """
    drawn = random.sample(list(POSSIBLE_LETTERS), plugboard_size*2)
    # Letters at positions (0, 1), (2, 3), ... each form one pair.
    pairs = list(zip(drawn[0::2], drawn[1::2]))
    pairs.sort(key=lambda pair: pair[0])
    return pairs

def get_random_config(plugboard_size: int):
    """Generate a random Enigma configuration.

    Args:
        plugboard_size: number of letter pairs to put on the plugboard.

    Returns:
        A tuple ``(rotors_to_use, offsets, ringstellungs, plugboard_tuples)``:
        three distinct rotor classes drawn from ``POSSIBLE_ROTORS``, three
        offsets and three ring settings in ``0..25``, and the plugboard pairs
        produced by ``get_random_plugboad``.
    """
    # random.sample() requires a sequence: sampling directly from a set was
    # deprecated in Python 3.9 and raises TypeError from 3.11 on. Sorting by
    # class name also gives a stable population, so seeded runs are repeatable.
    rotor_pool = sorted(POSSIBLE_ROTORS, key=lambda rotor_cls: rotor_cls.__name__)
    rotors_to_use = random.sample(rotor_pool, 3)
    offsets = [random.randint(0, 25) for _ in range(3)]
    ringstellungs = [random.randint(0, 25) for _ in range(3)]
    plugboard_tuples = get_random_plugboad(plugboard_size)
    return rotors_to_use, offsets, ringstellungs, plugboard_tuples

def transform_to_valid_chars(text: str):
    """Upper-case ``text`` and keep only characters found in ``POSSIBLE_LETTERS``.

    Returns:
        The filtered, upper-cased string (possibly empty).
    """
    return "".join(filter(lambda ch: ch in POSSIBLE_LETTERS, text.upper()))

def string_to_config(string: str):
    """Parse a key produced by ``config_to_string`` back into its components.

    Args:
        string: text of the form
            ``<Rotor>(o:<n>,rs:<n>)|<Rotor>(o:<n>,rs:<n>)|<Rotor>(o:<n>,rs:<n>)|<plug letters>``.

    Returns:
        A tuple ``(rotors_classes, offsets, ringstellungs, plugboard_tuples)``.
        Rotor names are resolved against the direct subclasses of
        ``RotorWithMappingAndNotches``; the trailing letters are split into
        consecutive ``(letter, letter)`` pairs (a dangling odd letter is dropped).

    Raises:
        IndexError: if ``string`` does not contain the expected pattern.
        KeyError: if a rotor name is not a known subclass name.
    """
    # Raw string: "\(" and "\d" are invalid escape sequences in a normal
    # literal and trigger Deprecation/SyntaxWarning on recent Pythons.
    pattern = r"(.+)\(o:(\d+),rs:(\d+)\)\|(.+)\(o:(\d+),rs:(\d+)\)\|(.+)\(o:(\d+),rs:(\d+)\)\|(.*)"
    match = re.findall(pattern, string, re.IGNORECASE)[0]
    available_rotors = {rot.__name__: rot for rot in RotorWithMappingAndNotches.__subclasses__()}
    # Groups come in triples (name, offset, ring setting) for each rotor.
    rotors_classes = [available_rotors[match[i]] for i in (0, 3, 6)]
    offsets = [int(match[i]) for i in (1, 4, 7)]
    ringstellungs = [int(match[i]) for i in (2, 5, 8)]
    plug_letters = match[9]
    plugboard_tuples = [(plug_letters[i], plug_letters[i + 1])
                        for i in range(0, len(plug_letters) - 1, 2)]
    return rotors_classes, offsets, ringstellungs, plugboard_tuples

In [2]:
# Training corpus: reduced to the letters in POSSIBLE_LETTERS (upper-cased);
# the language models in the next cell are built from it.
with open('books/Alices_Adventures_in_Wonderland.txt', 'r') as book:
    book_train = transform_to_valid_chars(book.read())
# Held-out corpus: the experiment cell samples its plaintext excerpts from it.
with open('books/Pride_and_Prejudice.txt', 'r') as book:
    book_test = transform_to_valid_chars(book.read())

In [3]:
# Language models built from the training corpus. The experiment cell calls
# `.fitness(text)` on each of them to score candidate decryptions.
# NOTE(review): judging by the class names these are a KL-divergence character
# frequency model, an index-of-coincidence model and a Markov chain model —
# confirm against the language_models package.
kld_char_freq = CharacterFrequencyKLDLanguageModel(book_train)
ic_char_freq = CharacterFrequencyICLanguageModel(book_train)
markov_chain1 = MarkovChainModel(book_train)

In [7]:
# --- Experiment configuration ------------------------------------------------
TEST_TEXT_LENGHT = 1000              # characters of plaintext sampled per run
PLUGBOARDS_TO_EXPLORE = [1,3,5,7,9]  # plugboard sizes (letter pairs) to test
SAMPLE_PER_PLUGBOARD = 10            # random configurations per plugboard size
TOP_TO_CHOICE_ROTORS = 6             # candidates kept per model after the rotor/offset search
TOP_TO_CHOICE_RS = 10                # candidates kept per model after the ring-setting search
POOL_WORKERS = 5                     # worker processes per multiprocessing pool


# --- Helpers and pool workers ------------------------------------------------
# The workers read the module-level `cyphertext` (and `pb_size_chosen`) that the
# main loop sets before each Pool is created.
# NOTE(review): this relies on worker processes inheriting the notebook's
# globals (fork start method) — confirm on platforms that default to spawn.

def _score_with_all_models(decrypted):
    """Return the fitness of ``decrypted`` under each of the three language models."""
    return {'KLD': kld_char_freq.fitness(decrypted),
            'IC': ic_char_freq.fitness(decrypted),
            'Markov': markov_chain1.fitness(decrypted)}


def _rank_by_metric(score_table, metric):
    """List ``(config, score)`` pairs of ``score_table`` for ``metric``, highest score first."""
    return sorted([(c, s[metric]) for c, s in score_table.items()],
                  key=lambda x: x[1], reverse=True)


def compute_scores(rot1, rot2, rot3, offsets):
    """Score one rotor-order/offset candidate using an empty plugboard.

    ``offsets`` is a single integer from ``range(26**3)``; it is decoded into
    three base-26 digits, the least significant one being the first rotor's
    offset. Ring settings are left at the rotor classes' defaults.

    Returns:
        ``(config_key, scores)`` where ``config_key`` comes from
        ``config_to_string`` and ``scores`` maps 'KLD'/'IC'/'Markov' to fitness.
    """
    off1 = offsets % 26
    off2 = (offsets//26) % 26
    off3 = (offsets//(26**2)) % 26
    enigma = Enigma(reflector=ReflectorB(),
                    plugboard=Plugboard(),
                    rotors=[rot1(offset=off1), rot2(offset=off2), rot3(offset=off3)])
    decrypted = enigma.decrypt(cyphertext)
    config = config_to_string([rot1, rot2, rot3], [off1,off2,off3], [0,0,0], [])
    return config, _score_with_all_models(decrypted)


def compute_rs_score(rot1, rot2, rot3, offsets, ringstellungs):
    """Score one ring-setting candidate for fixed rotors and offsets.

    ``offsets`` is the ``[off1, off2, off3]`` list of an already selected
    candidate; ``ringstellungs`` is a single integer from ``range(26**3)``
    decoded into three base-26 digits (least significant digit first).

    Returns:
        ``(config_key, scores)`` in the same shape as ``compute_scores``.
    """
    off1, off2, off3 = offsets
    rs1 = ringstellungs % 26
    rs2 = (ringstellungs//26) % 26
    rs3 = (ringstellungs//(26**2)) % 26
    enigma = Enigma(reflector=ReflectorB(),
                    plugboard=Plugboard(),
                    rotors=[rot1(offset=off1,ring_setting=rs1),
                            rot2(offset=off2,ring_setting=rs2),
                            rot3(offset=off3,ring_setting=rs3)])
    decrypted = enigma.decrypt(cyphertext)
    config = config_to_string([rot1, rot2, rot3], [off1,off2,off3], [rs1,rs2,rs3], [])
    return config, _score_with_all_models(decrypted)


def find_best_plugboards(lang_model, choice):
    """Greedily build a plugboard of ``pb_size_chosen`` pairs for one candidate.

    In each round every pair of still-unused letters is tried on top of the
    pairs chosen so far, and the pair giving the highest
    ``lang_model.fitness`` of the decrypted text is kept.

    Args:
        lang_model: object exposing ``fitness(text)``.
        choice: ``(rotors_classes, offsets, ringstellungs, plugboard_tuples)``
            as returned by ``string_to_config``; the plugboard part is ignored.

    Returns:
        The list of selected ``(letter, letter)`` pairs.
    """
    rotors_classes, offsets, ringstellungs, _ = choice
    rot1, rot2, rot3 = rotors_classes
    off1, off2, off3 = offsets
    rs1, rs2, rs3 = ringstellungs
    actual_pb = []
    for _ in range(pb_size_chosen):
        # Letters already wired cannot be used again.
        actual_letters = POSSIBLE_LETTERS
        for l1, l2 in actual_pb:
            actual_letters = actual_letters.replace(l1,"").replace(l2,"")
        # -inf so that any real score beats the placeholder pair.
        best_tuple = (('A','A'), float('-inf'))

        for l1, l2 in combinations(list(actual_letters), 2):
            enigma = Enigma(reflector=ReflectorB(),
                            plugboard=Plugboard(actual_pb+[(l1,l2)]),
                            rotors=[rot1(offset=off1,ring_setting=rs1),
                                    rot2(offset=off2,ring_setting=rs2),
                                    rot3(offset=off3,ring_setting=rs3)])
            decrypted = enigma.decrypt(cyphertext)
            score = lang_model.fitness(decrypted)
            if score > best_tuple[1]:
                best_tuple = ((l1, l2), score)
        actual_pb.append(best_tuple[0])

    return actual_pb


# --- Main experiment loop ----------------------------------------------------
for _ in range(SAMPLE_PER_PLUGBOARD):
    for pb_size_chosen in PLUGBOARDS_TO_EXPLORE:
        # Ground truth: random machine configuration and random plaintext excerpt.
        rotors, offsets, ringstellungs, plugboard_tuples = get_random_config(pb_size_chosen)
        random_config_key = config_to_string(rotors,offsets,ringstellungs,plugboard_tuples)
        random_test_pos = random.randint(0,len(book_test)- TEST_TEXT_LENGHT)
        random_text_test = book_test[random_test_pos:random_test_pos+TEST_TEXT_LENGHT]
        rotors_to_use = [rot_c(offset=off, ring_setting=rs)
                         for rot_c, off, rs in zip(rotors, offsets, ringstellungs)]
        random_engima = Enigma(reflector=ReflectorB(),
                               plugboard=Plugboard(plugboard_tuples), rotors=rotors_to_use)
        cyphertext = random_engima.encrypt(random_text_test)

        # Stage 1: every rotor order x every offset triple, empty plugboard.
        scores = {}
        for rot1, rot2, rot3 in permutations(POSSIBLE_ROTORS, 3):
            with Pool(POOL_WORKERS) as p:
                partial_compute = partial(compute_scores, rot1, rot2, rot3)
                r = list(tqdm(p.imap(partial_compute, range(26**3)), total=26**3))
            scores.update(r)

        top_ic = _rank_by_metric(scores, 'IC')
        top_kld = _rank_by_metric(scores, 'KLD')
        top_markov = _rank_by_metric(scores, 'Markov')

        best_choices = ([t[0] for t in top_ic[:TOP_TO_CHOICE_ROTORS]]+
                        [t[0] for t in top_kld[:TOP_TO_CHOICE_ROTORS]]+
                        [t[0] for t in top_markov[:TOP_TO_CHOICE_ROTORS]])
        # A candidate picked by several models would be searched again with
        # identical results; keep the first occurrence only.
        best_choices = list(OrderedDict.fromkeys(best_choices))

        # (rotors_classes, offsets, ringstellungs, plugboard_tuples)
        best_choices = [string_to_config(c) for c in best_choices]

        # Stage 2: every ring-setting triple for each surviving candidate.
        ringstellung_scores = {}
        for choice in best_choices:
            rot1, rot2, rot3 = choice[0]
            with Pool(POOL_WORKERS) as p:
                partial_compute = partial(compute_rs_score, rot1, rot2, rot3, choice[1])
                r = list(tqdm(p.imap(partial_compute, range(26**3)), total=26**3))
            ringstellung_scores.update(r)

        # All three rankings must come from the ring-setting results; ranking
        # `scores` here would discard stage 2 for the KLD and Markov models.
        top_ic_rs = _rank_by_metric(ringstellung_scores, 'IC')
        top_kld_rs = _rank_by_metric(ringstellung_scores, 'KLD')
        top_markov_rs = _rank_by_metric(ringstellung_scores, 'Markov')

        best_choices = ([t[0] for t in top_ic_rs[:TOP_TO_CHOICE_RS]]+
                        [t[0] for t in top_kld_rs[:TOP_TO_CHOICE_RS]]+
                        [t[0] for t in top_markov_rs[:TOP_TO_CHOICE_RS]])
        best_choices = list(OrderedDict.fromkeys(best_choices))

        # (rotors_classes, offsets, ringstellungs, plugboard_tuples)
        best_choices = [string_to_config(c) for c in best_choices]

        # Stage 3: greedy plugboard search, once per language model.
        with Pool(POOL_WORKERS) as p:
            partial_finder = partial(find_best_plugboards, ic_char_freq)
            pbs_ic = list(tqdm(p.imap(partial_finder, best_choices), total=len(best_choices)))
        with Pool(POOL_WORKERS) as p:
            partial_finder = partial(find_best_plugboards, kld_char_freq)
            pbs_kld = list(tqdm(p.imap(partial_finder, best_choices), total=len(best_choices)))
        with Pool(POOL_WORKERS) as p:
            partial_finder = partial(find_best_plugboards, markov_chain1)
            pbs_markov = list(tqdm(p.imap(partial_finder, best_choices), total=len(best_choices)))

        # Decrypt with every (candidate, plugboard) combination and score it.
        final_scores = {}
        for i in range(len(best_choices)):
            rotors_classes, offsets, ringstellungs, _ = best_choices[i]
            rot1, rot2, rot3 = rotors_classes
            off1, off2, off3 = offsets
            rs1, rs2, rs3 = ringstellungs
            for pb in (pbs_ic[i], pbs_kld[i], pbs_markov[i]):
                enigma = Enigma(reflector=ReflectorB(),
                                plugboard=Plugboard(pb),
                                rotors=[rot1(offset=off1,ring_setting=rs1),
                                        rot2(offset=off2,ring_setting=rs2),
                                        rot3(offset=off3,ring_setting=rs3)])
                decrypted = enigma.decrypt(cyphertext)
                config = config_to_string([rot1, rot2, rot3], [off1,off2,off3], [rs1,rs2,rs3], pb)
                final_scores[config] = (_score_with_all_models(decrypted), decrypted)

        # Evaluation only: the known plaintext is used to pick the candidate
        # with the smallest Levenshtein distance (first one wins on ties).
        final_best = min([(k,v[0],v[1],distance(random_text_test, v[1])) for k,v in final_scores.items()],
                         key=lambda x: x[3])
        # The target directory must already exist; open() does not create it.
        with open('serialized_runs2/%d_%d.pickle'%(pb_size_chosen, int(time.time())), 'wb') as run_dump:
            pickle.dump((random_config_key, random_text_test, cyphertext, scores, ringstellung_scores, final_scores, final_best), run_dump)

100%|██████████| 17576/17576 [00:41<00:00, 427.72it/s]
100%|██████████| 17576/17576 [00:41<00:00, 427.82it/s]
100%|██████████| 17576/17576 [00:41<00:00, 428.67it/s]
100%|██████████| 17576/17576 [00:41<00:00, 428.28it/s]
100%|██████████| 17576/17576 [00:40<00:00, 428.80it/s]
100%|██████████| 17576/17576 [00:40<00:00, 430.07it/s]
100%|██████████| 17576/17576 [00:40<00:00, 429.31it/s]
100%|██████████| 17576/17576 [00:40<00:00, 437.64it/s]
100%|██████████| 17576/17576 [00:40<00:00, 435.96it/s]
100%|██████████| 17576/17576 [00:40<00:00, 438.17it/s]
100%|██████████| 17576/17576 [00:40<00:00, 438.68it/s]
100%|██████████| 17576/17576 [00:40<00:00, 429.79it/s]
100%|██████████| 17576/17576 [00:41<00:00, 422.09it/s]
100%|██████████| 17576/17576 [00:40<00:00, 436.33it/s]
100%|██████████| 17576/17576 [00:41<00:00, 428.10it/s]
100%|██████████| 17576/17576 [00:40<00:00, 430.68it/s]
100%|██████████| 17576/17576 [00:40<00:00, 433.82it/s]
100%|██████████| 17576/17576 [00:40<00:00, 432.96it/s]
100%|█████