# Homework: Decipherment

## Group: wisefish

#### Wenhao Zhang, wenhaoz
#### Graeme Milne, gmilne 
#### Mitchell McCormack, mmccorma 
#### Jonathan Lo, jcl60

# Setup

In [None]:
# IPython autoreload
%load_ext autoreload
%autoreload 2

# Imports and setup
from hw2_helpers import read_file, get_statistics
import pprint
"""
PrettyPrinter for output
"""
pp = pprint.PrettyPrinter(width=45, compact=True)

First we read in files that are necessary for decipherment.

`cipher` is the ciphertext that we would like to decode.  
`plaintext` is data about the English language. The data used here is a dataset containing all lowercase letters, with no whitespace, no numbers, and no symbols.

In [None]:
cipher = read_file("data/cipher.txt")
plaintxt = read_file("data/default.wiki.txt.bz2")
print(cipher)

The function `get_statistics` obtains some statistics that are needed in our decipherment process.

For the cipher, we need the length, frequency of characters, and vocab in the cipher text.
For the plaintext, we need the vocab.

In [None]:
cipher_desc = get_statistics(cipher, cipher=True)
pp.pprint(cipher_desc)

In [None]:
# plaintext description
plaintxt_desc = get_statistics(plaintxt, cipher=False)
pp.pprint(plaintxt_desc)

# Helper Functions

In [None]:
def extract_mappings(hypo, mappings):
    """
    Flattens and collects mappings from the partial hypothesis into a list
    """
    if hypo:
        if isinstance(hypo[len(hypo)-1], tuple):
            mappings.append(hypo[len(hypo)-1])
        if isinstance(hypo[0], tuple):
            return extract_mappings(hypo[0],mappings)
    else:
        return mappings
    
def histogramPrune(HT, beam_size):
    """
    Takes hypotheses from a beam search stage and keeps on the top `beam_size` hypotheses.
    """
    sorted_HT = sorted(HT, key=lambda ht: ht[1], reverse=True)  # Order by scores (index 1), descending.
    pruned_HT = sorted_HT[0:beam_size]  # Select the beam_size number of highest scored hypothesis.

    return pruned_HT

# Beam Search Function

Our implementation of the beam search is as described in the algorithm from the baseline paper.

It goes through each ciphertext character (`Vf`) and builds up the best scoring hypotheses (`HS`) for decipherment.

Our implementations of checking `EXT_LIMITS`, scoring, and determining the winning decipherment follow below.

In [None]:
def beam_search(lm, vf, ve, ext_order, ext_limits, beam_size, cipher):
    HS = list()
    HT = list()
    HS.append((tuple(),0))
    cardinality = 0
    while cardinality < len(vf):
        f = ext_order[cardinality]
        for phi in HS:
            for e in ve:
                phi_prime = tuple([phi, (e,f)])
                if checkExtLimits(phi_prime, ext_limits):
                    HT.append((phi_prime, score(lm, phi_prime, cipher)))
        HT = histogramPrune(HT, beam_size)
        cardinality += 1
        HS = HT
        HT = list()
    return winner(HS)

## Checking EXT_LIMITS

This is used in the beam search when determining whether a given temporary hypothesis should be added to the valid hypotheses in the current search stage.

The function `checkExtLimits` takes in a hypothesis (`phi_prime`) and `ext_limits`, which is the maximum number of ciphertext characters that can map to a given plaintext character.

This function iterates through the mappings in the hypothesis and checks how many times each plaintext character has a mapping.

If any character has more mappings than is allowed by EXT_LIMITS, the function will return `False`.
Otherwise it will return `True` to indicate that this hypothesis has no plaintext character with more than `ext_limits` mappings to it from cipher characters.

In [None]:
def checkExtLimits(phi_prime, ext_limits):
    counts = {}
    sequence = []
    extract_mappings(phi_prime, sequence)
    
    for mapping in sequence:
        if len(mapping) < 2 or mapping[0] is None:
            continue
        plaintext = mapping[0]
        if plaintext in counts:
            counts[plaintext] += 1
        else:
            counts[plaintext] = 1
        if counts[plaintext] > ext_limits:
            return False
        
    return True

# Ext Order Function

This function attempts to find the best ordering for ext_order. The function takes in the cipher vocabular the cipher text, n, the highest ngram number, and weights which is the weighting used in the calculation of the score. The function uses a beam search to search through all enumartions of the vocabulary ordering. The scoring is calculated as the sum 1 to n, the weight n multiplied by the number of continuous ngram for that particular symbol. We start by fixing the first symbol, and then we fix the second. We prune the tree to the 100 best and then continue until we have all symbols in ext order. 

In [402]:
def ext_order_generate(cipher_vocab, cipher_text, n, weights):
 
    sequences = []
    sequences.append(("",0))

    candidates = []

    unigram = True

    cardinality = 0
    while cardinality < len(cipher_vocab):
        for seq in sequences:
            for c in cipher_vocab:
                if not in_part_ord(seq, c):
                    new_candidate = (seq, c)
                    bit_string = generate_bit_string(new_candidate, cipher_text)
                    score = 0
                    for i in range(0,n):
                        ngram_span = get_ngram_span(bit_string, i+1)
                        score += weights[i]*len(ngram_span)
                    candidates.append((new_candidate, score))
            
        candidates = histogramPrune(candidates, 100)
        sequences = candidates
        candidates = []
        unigram = False
        cardinality+=1
    return ord_winner(sequences)


#Helper Functions for Ext Order

This function checks to see if a symbol is in the current partial ordering.

In [None]:
def in_part_ord(part_ord, symbol):
    if isinstance(part_ord[1], str):
        return in_part_ord(part_ord[0], symbol) or part_ord[1] == symbol
    if part_ord[0]:
        return in_part_ord(part_ord[0][0], symbol) or part_ord[0][1] == symbol
    else:
        return False;

This function finds the number of continuous ngrams. IT takes in n and the bit string. Which is of the form ".o.....on..oooon."

In [None]:
def get_ngram_span(bit_string, n):
    return {i.span()[0]: i.span()[1] for i in re.finditer("o{" + str(n-1) + "," + str(n-1) + "}n|no{" + str(n-1) + "," + str(n-1) + "}", bit_string)}

This function takes in a ext order and flattens it.

In [None]:
def flatten_part_ord(part_ord, flattened):

    if part_ord:
        if isinstance(part_ord[len(part_ord)-1], str):
            flattened.append(part_ord[len(part_ord)-1])
        if isinstance(part_ord[0], tuple):
            return flatten_part_ord(part_ord[0],flattened)
    else:
        return flattened

Finds the winner ordering

In [401]:
def ord_winner(HS):
    sequence = []
    top = sorted(HS, key=lambda x: x[1], reverse=True)
    if len(top) == 0:
        return []
    flatten_part_ord(top[0], sequence)
    return sequence

This function generates the bit string. It generates a string of the form ".....oooo.ooon..." o if for a symbol that was already in the partial ordering. n is for the new symbol.

In [403]:
def generate_bit_string(part_ord, cipher_text):

    bit_string = ""
    flattened = []

    flatten_part_ord(part_ord[0], flattened)
    new_symbol = part_ord[1]

    cipher_characters = []
    [cipher_characters.append(c) for c in cipher_text]

    for character in cipher_characters:
        if character == part_ord[1]:
            bit_string = bit_string + "n"
        elif character in flattened:
            bit_string = bit_string + "o"
        else:
            bit_string = bit_string + "."
    return bit_string

In [404]:
vf = cipher_desc['vocab']
cipher_text = cipher_desc['content']
n=6
weights = [1.0,1.0,1.0,1.0,2.0,3.0]
ext = ext_order_generate(vf, cipher_text, n, weights)

In [405]:
print(ext)
ext.reverse()
print(ext)

['j', 'Ω', '∆', '\\', '§', '¢', 'D', 'Q', 'N', 'Ã', 'S', 'K', '“', 'Ç', 'T', '£', '‘', 'M', 'J', 'L', '•', '^', 'ƒ', 'π', 'y', 'æ', 'H', 'F', 'G', '∏', 'Z', '+', '√', 'E', '∞', '—', 'X', '≈', 'µ', 'I', '–', 'W', 'V', 'À', '/', 'u', 'R', 'O', 'P', 'B', '∑', '∫', 'º', 'A']
['A', 'º', '∫', '∑', 'B', 'P', 'O', 'R', 'u', '/', 'À', 'V', 'W', '–', 'I', 'µ', '≈', 'X', '—', '∞', 'E', '√', '+', 'Z', '∏', 'G', 'F', 'H', 'æ', 'y', 'π', 'ƒ', '^', '•', 'L', 'J', 'M', '‘', '£', 'T', 'Ç', '“', 'K', 'S', 'Ã', 'N', 'Q', 'D', '¢', '§', '\\', '∆', 'Ω', 'j']


## Scoring Function

The `score` function takes the partial hypothesis phi_prime and converts it into a partial decipherment and corresponding bit string using the helper functions `extract_mappings` and `match_symbols`. Then passes the partial decipherment and bitstring to the ngram language model function `score_bitstring` to return a log prob of phi_prime

The `exract_mappings` function recusivily traverses phi_prime to exctract the already established mappings and the appended hypothesis and returns it as a list of tuples

The `match_symbols` function takes phi_prime and uses `extract_mappings` to get the list of tuples representing the mappings. Then using the cipher text the function creates a partial decipherment with `_` representing unkown symbols and a corresponding bitstring. The function `returns` a tuple -> `(partial decipherment, bitstring) `

In [None]:
def match_symbols(phi_prime, cipher):
    """
    Matches cipher symbols to partial hypothesis mappings and generates a partially deciphered string as well as
    a bitstring indicating the locations of the deciphered symbols in the string. 

    eg.
        sequence = (('a',"A"), ('e', "E"))
        cipher = "GRAEME"

        return "__a_e_e" , "..o.o.o" 
    """
    bitString = ""
    mappings = {} 
    sequence = []
    extract_mappings(phi_prime, sequence)
    for item in sequence: 
        if item[1] not in mappings.keys():
            mappings[item[1]] = item[0]

    cipher_characters = []
    [cipher_characters.append(c) for c in cipher]
    plaintext = ""
    for character in cipher_characters:
        if character in mappings.keys():
            bitString = bitString + "o"
            plaintext = plaintext + mappings[character]
        else:
            bitString = bitString + "."
            plaintext = plaintext +"_"

    return (plaintext, bitString)

def score(lm, phi_prime, cipher_txt):
    """
    Uses the ngram model score_bitsting function to return a log prob score of the phi_prime hypothesis and cipher_txt
    """
    hypothesis_string, bitString = match_symbols(phi_prime, cipher_txt)
    return lm.score_bitstring(hypothesis_string, bitString)

## Decipherment Function

The `decipher` function converts the cipher text to a plain text string using the sequence of mappings. `sequence` is a list of tuples which are the mappings found during the beam search.

Because the mappings are a list of plaintext => ciphertext tuples, we need to reverse this into dict of ciphertext => plaintext for easy lookup in decipherment.

In [None]:
def decipher(sequence, cipher):
    plaintext = ""
    bitString =""
    mappings = {} 
    for item in sequence: 
        if item[1] not in mappings.keys():
            mappings[item[1]] = item[0]

    cipher_characters = []
    [cipher_characters.append(c) for c in cipher]
    plaintext = ""
    for character in cipher_characters:
        if character in mappings.keys():
            bitString = bitString + "o"
            plaintext = plaintext + mappings[character]
        else:
            bitString = bitString + "."
            plaintext = plaintext +"_"

    return (plaintext, bitString)

## Winner Function

The `winner` function takes all the hypotheses generated from the beam search (`HS`) and returns the best scored hypothesis as a list of mappings.

We pick the winner by sorting the hypotheses by their score determined from our `score` function, taking its mapping data, and converting it to a list.

If for whatever reason there are no hypotheses, this returns an empty list.

In [None]:
def winner(HS):
    sequence = []
    top = sorted(HS, key=lambda x: x[1], reverse=True)
    if len(top) == 0:
        return []
    extract_mappings(top[0], sequence)
    return sequence

# Implementation Efforts and Experiments

Implementing the basic solution took very little time. Most of our time was spent adjusting parameters such as
- beam size for kept hypotheses at each beam search stage
- `n` for the ngram model
- Ext Limit number for mappings

We also experimented with the second papers ext_order to improve mapping as an experiment. The results were not as good as those in the paper. We probably need to improve our scoring function to see an improvement in accuracy.

Some things we could attempt to implement to improve our decipherment methods:
- Implement the second research paper to improve runtime and scoring.
- Attempt to use NLM or other model to score hypotheses

# Solution: Beam search for decipherment of Zodiac 408 cipher

First we initialize the language model that we pass into the beam search for use in the scoring.

We are using the ngram model with `n = 6`.

In [None]:
from ngram import LM
lm = LM("data/6-gram-wiki-char.lm.bz2", n=6, verbose=False)

Now we run our beam search on the provided data:
- `Vf` is all of the ciphertext characters from the cipher file.
- `Ve` is all of the plaintext chracters from the English language dataset.
- `EXT_ORDER`is the sorted list of ciphertext characters by frequency.
- `EXT_LIMITS` is the maximum number of mappings that we want to allow for ciphertext chracters to a single plaintext character.
- `beam_size` is the number of hypotheses we want to keep at each stage of the beam search.

We also pass in the ngram model instantiated above.

The result of running this is the highest scored decipherment hypothesis.

In [None]:
vf = cipher_desc['vocab']
ve = plaintxt_desc['vocab']
ext_order = [i[0] for i in cipher_desc['frequencies'].most_common(len(cipher_desc['frequencies'])) ]
ext_limits = 7
beam_size = 5000
cipher_text = cipher_desc['content']

win = beam_search(lm, vf, ve, ext_order, ext_limits, beam_size, cipher_text)
pp.pprint(win)

With our obtained decipherment, we now pass that into the `decipher` function and output our decipherment of the Zodiac 408 cipher.

In [None]:
decipherment, decrypted_bitstring = decipher(win, cipher_desc['content'])
pp.pprint(decipherment)

# Testing the baseline implementation 

In order to verify that our baseline implementation of the beam search was correct we used a simple caesar cipher as a test. With a 101 character long 1:1 caesar cipher text we are able to acheive 100% accuracy with a beam size of 30. See the details of our testing below.

In [None]:
caesar_cipher = "aopzpzhalzaavzllpmdlhjabhssfptwsltlualkaolihzlspuljvyyljasfhukohclhufjohujlhajvtwslapunaopzhzzpnutlua"
caesar_cipher_plaintext = "thisisatesttoseeifweactuallyimplementedthebaselinecorrectlyandhaveanychanceatcompletingthisassignment"
caesar_cipher_desc = get_statistics(caesar_cipher, cipher = True)

test_ve = caesar_cipher_desc['vocab']
test_vf = plaintxt_desc['vocab']
test_ex_order = [i[0] for i in caesar_cipher_desc['frequencies'].most_common(len(caesar_cipher_desc['frequencies'])) ]
test_ext_limits = 1 
test_beam_size = 30

theWinner = beam_search(lm, test_ve, test_vf, test_ex_order, test_ext_limits, test_beam_size, caesar_cipher) 
pp.pprint(theWinner)

In [None]:
test_decipherment, test_decrypted_bitstring = decipher(theWinner, caesar_cipher_desc['content'])
print("-- Decipherment Result -- ")
print(test_decipherment)

In [None]:
def test_symbol_error_rate(dec, plaintxt):
    correct = 0
    if len(plaintxt) == len(dec):
        for (d,g) in zip(dec, plaintxt):
            if d==g:
                correct += 1
    wrong = len(plaintxt)-correct
    error = wrong/len(plaintxt)
    
    return error

test_ser = test_symbol_error_rate(test_decipherment, caesar_cipher_plaintext)
print("decipherment : " + test_decipherment)
print("original text: " + caesar_cipher_plaintext)
print("---- Results ----")
print('Error: ', test_ser * 100, 'Accuracy: ', (1 - test_ser) * 100)

## Grading

Ignore the following cells. They are for grading against the reference decipherment. Based on the clues provided in the decipherment homework description, you can easily find a reasonable reference text online for this cipher text.

In [None]:
"""
ATTENTION!
For grading purposes only. Don't bundle with the assignment. 
Make sure '_ref.txt' is removed from the 'data' directory before publishing.
"""

def read_gold(gold_file):
    with open(gold_file) as f:
        gold = f.read()
    f.close()
    gold = list(gold.strip())
    return gold

def symbol_error_rate(dec, _gold):
    gold = read_gold(_gold)
    correct = 0
    if len(gold) == len(dec):
        for (d,g) in zip(dec, gold):
            if d==g:
                correct += 1
    wrong = len(gold)-correct
    error = wrong/len(gold)
    
    return error
    
# gold decipherment
gold_file = "data/_ref.txt"
ser = symbol_error_rate(decipherment, gold_file)
print('Error: ', ser*100, 'Accuracy: ', (1-ser)*100)

## Current Best Decipherment

In [2]:
print(decipherment)

esatttressretorterateartheeaahaereaircresmeaniirystontesearerrsuejessctatiirnotearesrateayehetherssponeinersjrseaeaiosetitasearesearceesotheoetreasstareseayeththninesresttoorarestnttjceheeeyesarrnietariirasasernsttiashetareaissestajirttnaprosresearntaanractorjurehmypasseaistoostessturaseaoererestssotaereesraserotsirescoeratearhssarresstnseatereprryinmtateseisshesareritsaotaianessietreairhasrrereheseattres
