## This project implements a text decoder based on the MCMC algorithm (Markov Chain Monte Carlo)

The goal is to decrypt data that has previously been encrypted using a substitution cipher. It is assumed that the encryption key is unknown and it is necessary to decrypt the data and read the code using the recovered decryption key.

At the beginning, all the necessary libraries and modules are imported:

In [1]:
import numpy as np
import scipy
import scipy.stats
import math

import matplotlib.pyplot as plt

%matplotlib inline

from tqdm.auto import tqdm
import string

import warnings
warnings.filterwarnings("ignore")

An excerpt from Alice's Adventures in Wonderland is used as a test. This data is encrypted with a random encryption key.

In [2]:
plain_text = """
Alice was beginning to get very tired of sitting by her sister on the bank, and of having nothing to do: once or twice she had peeped into the book her sister was reading, but it had no pictures or conversations in it, 'and what is the use of a book,' thought Alice 'without pictures or conversation?'
So she was considering in her own mind (as well as she could, for the hot day made her feel very sleepy and stupid), whether the pleasure of making a daisy-chain would be worth the trouble of getting up and picking the daisies, when suddenly a White Rabbit with pink eyes ran close by her.
There was nothing so very remarkable in that; nor did Alice think it so very much out of the way to hear the Rabbit say to itself, 'Oh dear! Oh dear! I shall be late!' (when she thought it over afterwards, it occurred to her that she ought to have wondered at this, but at the time it all seemed quite natural); but when the Rabbit actually took a watch out of its waistcoat-pocket, and looked at it, and then hurried on, Alice started to her feet, for it flashed across her mind that she had never before see a rabbit with either a waistcoat-pocket, or a watch to take out of it, and burning with curiosity, she ran across the field after it, and fortunately was just in time to see it pop down a large rabbit-hole under the hedge.
In another moment down went Alice after it, never once considering how in the world she was to get out again.
The rabbit-hole went straight on like a tunnel for some way, and then dipped suddenly down, so suddenly that Alice had not a moment to think about stopping herself before she found herself falling down a very deep well.
"""

26 letters of the English alphabet are used.

In [3]:
characters = string.ascii_lowercase
characters_dict = {char : c for c, char in enumerate(characters, start=1)}
m = len(characters) + 1

### Generate random encryption key

Here are functions that will be used to encrypt/decrypt text:

In [4]:
def encode_text(text_to_encode, characters_dict):
    """This function turns a text string into an integer sequence using given dictionary."""
    characters_set = set(characters_dict.keys())
    return np.r_[[characters_dict[char] if char in characters_set else 0 for char in text_to_encode.strip().lower()]]

def decode_text(text_to_decode, characters):
    """This function turns an integer sequence into a text string using given list of characters."""
    characters_array = np.array([" "] + list(characters))
    return "".join(characters_array[text_to_decode])

def apply_cipher(text_as_int_array, cipher):
    "This function applies substitution cipher to an integer sequence."
    return cipher[text_as_int_array]

In this step, the encryption and decryption keys are generated:

In [5]:
np.random.seed(1234)

encryption_indices = np.random.permutation(np.arange(m-1))
decryption_indices = np.argsort(encryption_indices)

characters_array = np.array(list(characters))
encryption_key = "".join(characters_array[encryption_indices])
decryption_key = "".join(characters_array[decryption_indices])

Checking the encryption/decryption function and encryption/decryption keys:

In [6]:
encryption_key_encoded = np.r_[0, encode_text(encryption_key, characters_dict)]
decryption_key_encoded = np.r_[0, encode_text(decryption_key, characters_dict)]

text = "The quick brown fox jumps over the lazy dog"

encoded_text = encode_text(text, characters_dict)
cipher_text = apply_cipher(encoded_text, encryption_key_encoded)
encoded_text = apply_cipher(cipher_text, decryption_key_encoded)
decoded_text = decode_text(encoded_text, characters_dict)
decoded_text

'the quick brown fox jumps over the lazy dog'

Encrypt cipher text

In [7]:
plain_text_encoded = encode_text(plain_text, characters_dict)
cipher_text = apply_cipher(plain_text_encoded, encryption_key_encoded)

### Collect frequences 

Collecting frequences of two character combinations (bigrams) over large text corpus and from encrypted text. They will be stored in a matrix and interpreted as a transition matrix: from the first character to the second.

In [8]:
def collect_transition_frequences(data, transition_matrix):
    """For a given integer sequence, which corresponds to some char sequence, 
       return transitions for adjacent values."""
    transitions = data.repeat(2)[1:-1].reshape(-1, 2)
    for i, j in transitions:
        transition_matrix[i, j] += 1
    
    return transition_matrix

def collect_empirical_frequences(filename, characters_dict, m):
    """Collect frequences over large text corpus, return transition matrix."""
    transition_matrix = np.zeros((m, m))
    with open(filename) as f:
        for line in f:
            line_encoded = encode_text(line, characters_dict)
            if line_encoded.size > 1:
                transition_matrix = collect_transition_frequences(line_encoded, transition_matrix)
                
    return transition_matrix

def collect_observed_frequences(cipher_text, characters_dict, m):
    """Collect frequences over encrypted text, return nonzero indices of 
       transition matrix for both dimentions and values for those indices.
       `values = transition_matrix[indices_1, indices_2]`"""
    transition_matrix = np.zeros((m, m))
    transition_matrix = collect_transition_frequences(cipher_text, transition_matrix)
    
    return transition_matrix

In [9]:
empirical_frequences = collect_empirical_frequences('war_and_peace.txt', characters_dict, m)
observed_frequences = collect_observed_frequences(cipher_text, characters_dict, m)

### General algorithm

Chain will include states that are permutations of the substitution cipher. Algorithm has following steps:

1. Picking up a random current state. 
2. Creating a proposal for a new state by swapping two or more random letters in the current state.
3. Using a Scoring Function which calculates the score of the current state $Score_{old}$ and the proposed state $Score_{new}$.
4. If the score of the proposed state is more than current state, Moving to Proposed State.
5. Else flipping a coin which has a probability of Heads $\frac{Score_{new}}{Score_{old}}$  . If it comes heads move to proposed State.
6. Repeating from Step 2.

The goal is to reach a steady state where the chain has the stationary distribution of the needed states. This state of chain could be used as a solution.

Below is the implementation of steps 2 and 3.

### Preparing sampling function.

To generate a new proposed cipher, several positions are chosen at random and the values in these positions are swapped. It corresponds to change in seveal mappings of encrypted characters in decrypted ones. Example with 2 swaps:

was|now
-|-
A -> B | A -> B
B -> C | B -> C
C -> D | C -> A
D -> A | D -> D

In [10]:
def generate_cipher(cipher, m, size=2):
    """Swap two or more random positions in cipher.
        
        cipher, np.array - current mapping from value(int) in encrypted text (index of array cell) into value(int) in decrypted text(value of array cell).
        m, int - capacity of used alphabet,
        size, int - number of positions to change.
    """
    
    # Your code here   
    new_cipher = np.copy(cipher)
    
    for i in range(size):
        pos1 = np.random.randint(m)
        pos2 = np.random.randint(m)
        new_cipher[pos1], new_cipher[pos2] = new_cipher[pos2], new_cipher[pos1]
    
    return new_cipher


### Preparing scoring function.

For each state, a score function (decryption key) is used that assigns a positive score to each decryption key. This score intuitively should be larger if the encrypted text looks more like actual english, when decrypted using this decryption key. Below, large text will be checked and frequencies calculated: how many times one character occurs after another in a large text such as "War and Peace".

For each pair of characters $\beta_1$ and $\beta_2$ (e.g. $\beta_1$ = A and $\beta_2$ = B), let's say 𝑅(𝛽1,𝛽2), it will be written how many times that particular pair (e.g. "AB") appears consecutively in the reference text.

Similarly, for a considered decryption key $x$, let's say $F_x(\beta_1,\beta_2)$ it will be written the number of times that
pair appears when the cipher text is decrypted using the decryption key $x$.

A specific decryption key $x$ will be evaluated using:

$$Score(x) = \prod R(\beta_1,\beta_2)^{F_x(\beta_1,\beta_2)}$$
 
To simplify the calculations, I will use $log(Score(x))$

The next step is to implement the scoring function. As input it takes 
- `cipher`: mapping between encrypted characters and decrypted characters,
- `observed_frequences`: transition matrix for cipher text, matrix representation of $F_x(\beta_1,\beta_2)$,
- `empirical_frequences`: transition matrix for large text, matrix representation of $R(\beta_1,\beta_2)$.

Scoring function returns $log(Score(x))$.

In [11]:
def score_cipher(cipher, observed_frequences, empirical_frequences):
    
    # Your code here
    """
    Calculate the log score of a given cipher mapping based on transition probabilities.
    
    Args:
        cipher (numpy array): mapping between encrypted characters and decrypted characters.
        observed_frequencies (numpy array): transition matrix for cipher text, matrix representation of 𝐹𝑥(𝛽1,𝛽2).
        empirical_frequencies (numpy array): transition matrix for large text, matrix representation of 𝑅(𝛽1,𝛽2).
    
    Returns:
        score (float): log score of the cipher mapping based on transition probabilities.
    """
    
    # Calculate the total number of characters in the reference text
    total_chars = empirical_frequences.sum()
    
    # Initialize the log score to 0
    log_score = 0
    
    # Iterate over all pairs of characters
    for i in range(len(cipher)):
        for j in range(len(cipher)):
            # Get the corresponding decrypted characters
            char1 = cipher[i]
            char2 = cipher[j]
            
            # Get the observed and empirical frequencies for this pair of characters
            observed_freq = observed_frequences[i,j]
            empirical_freq = empirical_frequences[char1,char2]
            
            # Handle zero values in the transition matrices
            if observed_freq == 0:
                observed_freq = 1 / total_chars
            if empirical_freq == 0:
                empirical_freq = 1 / total_chars
            
            # Calculate the log score for this pair of characters
            log_score += math.log(empirical_freq) * observed_freq
    
    return log_score


### Decryption

Now everything is ready to decrypt the ciphertext

In [12]:
def decrypting(observed_frequences, empirical_frequences, n_iters, m, step_size, seed, print_it=1000):
    """This function finds most suited decrypting cipher(1D np.array).
        observed_frequences, 2D np.array - transition matrix with frequences for cipher text,
        empirical_frequences, 2D np.array - transition matrix with frequences for large text,
        n_iters, int - number of MCMC iterations,
        step_size, int - number of changes in cipher per one iteration,
        seed, int - seed for random generator,
        print_it, int - print decrypted text every `print_it` iterations.
    """

    np.random.seed(seed)

    # 1. Start by picking up a random current state. 
    cipher_old = np.arange(m)
    score_cipher_old = score_cipher(cipher_old, observed_frequences, empirical_frequences)
    best_state, score = cipher_old, score_cipher_old

    for i in tqdm(range(1, n_iters+1)):

        # 2. Create a proposal for a new state by swapping two or more random letters in the current state.
        cipher_new = generate_cipher(cipher_old, m, size=step_size)

        # 3. Use a Scoring Function which calculates the score of the current state $Score_{old}$ and the proposed State $Score_{new}$.
        score_cipher_new = score_cipher(cipher_new, observed_frequences, empirical_frequences)
        acceptance_probability = np.min((1, np.exp(score_cipher_new - score_cipher_old)))

        # 4. If the score of the proposed state is more than current state, Move to Proposed State.
        # 5. Else flip a coin which has a probability of Heads $Score_{new}/Score_{old}$. If it comes heads move to proposed State.
        if score_cipher_old > score:
            best_state, score = cipher_old, score_cipher_old
        if acceptance_probability > np.random.uniform(0,1):
            cipher_old, score_cipher_old = cipher_new, score_cipher_new
        if i % print_it == 0:
            print(f"iter {i}: {decode_text(apply_cipher(cipher_text[0:99], cipher_old), characters)}")

    return best_state

In [13]:
decrypt_cipher = decrypting(observed_frequences, empirical_frequences, 10000, m, 1, 850, 1000)

print(
    f"\nDecoded Text: {decode_text(apply_cipher(cipher_text, decrypt_cipher), characters)}\n\n"
    f"MCMC KEY  : {''.join(characters_array[decrypt_cipher[1:]-1])}\n"
    f"ACTual KEY: {decryption_key}"
)

  0%|          | 0/10000 [00:00<?, ?it/s]

iter 1000: idace wis begannang to get verm tarel of sattang bm her saster on the bink  inl of hivang nothang t
iter 2000: alice was peginning to get very tired of sitting py her sister on the pank  and of having nothing t
iter 3000: alice was peginning to get very tired of sitting py her sister on the pank  and of having nothing t
iter 4000: alice was peginning to get very tired of sitting py her sister on the pank  and of having nothing t
iter 5000: alice was peginning to get very tired of sitting py her sister on the pank  and of having nothing t
iter 6000: alice was peginning to get very tired of sitting py her sister on the pank  and of having nothing t
iter 7000: alice was peginning to get very tired of sitting py her sister on the pank  and of having nothing t
iter 8000: alice was beginning to get very tired of sitting by her sister on the bank  and of having nothing t
iter 9000: alice was beginning to get very tired of sitting by her sister on the bank  and of having nothing t
i

It can be seen that the result of the algorithm is a fully decrypted text (MCMC KEY completely coincided with ACTual KEY)