# Homework: Decipherment

In [1]:
from collections import defaultdict, Counter
import collections
import pprint
import math
import bz2
import copy
from random import shuffle
pp = pprint.PrettyPrinter(width=45, compact=True)

First let us read in the cipher text from the `data` directory:

In [2]:
def read_file(filename):
    # bz2-compressed files are decompressed on the fly; both branches read UTF-8 text
    if filename.endswith(".bz2"):
        with bz2.open(filename, 'rt', encoding="utf-8") as f:
            content = f.read()
    else:
        with open(filename, 'r', encoding="utf-8") as f:
            content = f.read()
    return content

cipher = read_file("data/cipher.txt")
#print(cipher)

## Default Solution

For the default solution we need to compute statistics like length, number of symbols/letters, 
unique occurrences, frequencies and relative frequencies of a given file. This is done in the function `get_statistics` below.

While using `get_statistics`, make sure that `cipher=True` is set when the input is a ciphertext.

In [3]:
def get_statistics(content, cipher=True):
    stats = {}
    content = list(content)
    split_content = [x for x in content if x != '\n' and x!=' ']
    length = len(split_content)
    symbols = set(split_content)
    uniq_sym = len(list(symbols))
    freq = collections.Counter(split_content)
    rel_freq = {}
    for sym, frequency in freq.items():
        rel_freq[sym] = (frequency/length)*100
        
    if cipher:
        stats = {'content':split_content, 'length':length, 'vocab':list(symbols), 'vocab_length':uniq_sym, 'frequencies':freq, 'relative_freq':rel_freq}
    else:
        stats = {'length':length, 'vocab':list(symbols), 'vocab_length':uniq_sym, 'frequencies':freq, 'relative_freq':rel_freq}
    return stats

In [4]:
#cipher_desc = get_statistics(cipher, cipher=True)
#pp.pprint(cipher_desc)

The default solution matches the frequency of symbols in the cipher text with the frequency of letters in the plaintext language (in this case, English). Note that this is just some text in English used to compute letter frequencies. We do not have access to the real plaintext in this homework. 

To compute plaintext frequencies, we use an English dataset that has no punctuation or spaces and in which all characters are lowercase.

In [5]:
# plaintext description
#plaintxt = read_file("data/default.wiki.txt.bz2")
#plaintxt_desc = get_statistics(plaintxt, cipher=False)
#pp.pprint(plaintxt_desc)

We have all the tools we need to describe the default solution to this homework.

We use a simple frequency matching heuristic to map cipher symbols to English letters.

Let $f(\cdot)$ denote the relative frequency of a symbol. For each cipher symbol $c$ and each English letter $e$ we compute the score:

$$h_{c,e} = \left| \log\left(\frac{f(c)}{f(e)}\right) \right|$$

For each cipher text symbol $c$ we then compute the most likely plain text symbol $e$ by sorting based on the above score.
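
As a rough illustration of the heuristic above, the following sketch scores one toy cipher symbol against three English letters. The frequencies and the helper name `fmh_score` are made up for this example and are not taken from the homework data.

```python
import math

def fmh_score(cipher_freq, english_freq):
    """Absolute log-ratio of two relative frequencies (0 means a perfect match)."""
    return abs(math.log(cipher_freq / english_freq))

# Hypothetical relative frequencies (in percent), for illustration only.
cipher_rel_freq = {"∆": 1.0}
english_rel_freq = {"e": 12.0, "v": 1.0, "z": 0.1}

scores = {
    letter: fmh_score(cipher_rel_freq["∆"], freq)
    for letter, freq in english_rel_freq.items()
}
ranked = sorted(scores.items(), key=lambda kv: kv[1])  # smallest score first
best_letter = ranked[0][0]
print(ranked)
print("best match:", best_letter)
```

Because the score is a log-ratio, a letter that is ten times too rare is penalized about as much as one that is ten times too frequent.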

In [6]:
# """
# default : frequency matching heuristic

# Notice how the candidate mappings, a.k.a hypotheses, are first scored with a measure of quality and, 
# then, the best scoring hypothesis is chosen as the winner. 

# The plaintext letters from the winner are then mapped to the respective ciphertext symbols.
# """

# def find_mappings(ciphertext, plaintext):
#     mappings = defaultdict(dict)
#     hypotheses = defaultdict(dict)
#     # calculate alignment scores
#     for symbol in ciphertext['vocab']:
#         for letter in plaintext['vocab']:
#             hypotheses[symbol][letter] = abs(math.log((ciphertext['relative_freq'][symbol]/plaintext['relative_freq'][letter])))
    
#     # find winner
#     for sym in hypotheses.keys():
#         #mappings[sym] = min(lemma_alignment[sym], key=lemma_alignment[sym].get)
#         winner = sorted(hypotheses[sym].items(), key=lambda kv: kv[1])
#         mappings[sym] = winner[1][0]
    
#     return mappings

Using this scoring function we map the cipher symbol `∆` to `v` in English.

In [7]:
# mapping = find_mappings(cipher_desc, plaintxt_desc)
# print("∆ maps to {}\n".format(mapping['∆']))
# print(mapping)

∆ maps to v

defaultdict(<class 'dict'>, {'Z': 'g', 'P': 'm', 'H': 'g', 'À': 'g', '√': 'b', 'ƒ': 'b', '“': 'b', '∫': 'm', '–': 'b', 'V': 'g', '^': 'b', 'O': 'b', 'E': 'g', 'Ω': 'v', '∑': 'u', '+': 'g', 'I': 'm', 'µ': 'g', '•': 'b', 'º': 'd', 'Ã': 'y', '£': 'g', '\\': 'y', 'R': 'u', 'y': 'u', '∆': 'v', 'T': 'b', 'N': 'b', '§': 'k', 'S': 'b', 'F': 'b', 'Q': 'y', 'π': 'b', '¢': 'k', '∏': 'g', 'D': 'b', 'K': 'y', 'B': 'u', 'u': 'u', '∞': 'g', 'X': 'g', '—': 'l', '≈': 'u', '‘': 'b', 'L': 'g', 'æ': 'g', 'G': 'b', 'j': 'x', 'Ç': 'y', '/': 'b', 'M': 'g', 'J': 'b', 'W': 'g', 'A': 'g'})


The default solution to this decipherment problem is to take each cipher symbol and map it to the most likely English letter as provided by the `find_mappings` function above.

In [8]:
# english_text = []
# for symbol in cipher_desc['content']:
#     english_text.append(mapping[symbol])
# decipherment = ('').join(english_text)
# print(decipherment)

dmmbgbuumgbubbgbugggububgdgmbyyluugbubumgvlbbbyubggbkbduumbugumvuylggbgggbbbybbgggugubglbbdymgglgggkbkubbmugybglbubybuugbbmuubglgggubuugbgylbmgglyggggbduumbugxbgybkuguggbgbbbggmggbggybuggmdbugbubybubbgbygmggguubmggbggygbbbmybggdgggybgggkmkubggduuggyggbbbmbbbbyvuugvbkbmmmgbggbbgbdmmgvgmuugbuglglgbugbgbdgdumbbguubggbulgbblgggubuyggbugdmugbggybugdkggbbyvgyblgubuugugmbugybmbbgbbbblggbmgbumygggggbdgmglggggbumg


Notice that the default solution provides a very bad decipherment. Your job is to make it better!

## The Baseline - Beam Search

In [6]:
from ngram import LM
from nlm import *

lm = LM("data/6-gram-wiki-char.lm.bz2", n=6, verbose=False)
model = load_model("data/mlstm_ns.pt", cuda=True)



## Score Function
Ref to <i>Decipherment of Substitution Ciphers with Neural Language Models</i> (Nishant Kambhatla et al.),   
Section 3.3, Frequency Matching Heuristic:   
$$ \mathrm{SCORE}(\phi^\prime) = \mathrm{SCORE}(\phi) + \mathrm{NEW}(\phi^\prime) - \mathrm{FMH}(\phi^\prime) $$
$$ \text{where}\quad \mathrm{FMH}(\phi^\prime) = \left| \log \frac{\nu(f)}{\nu(e)} \right|, \quad f \in V_f,\ e \in V_e $$
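
To make the update rule concrete, here is a small sketch of a single scoring step. The function name `extend_score`, the frequencies, and the log-probability standing in for NEW are hypothetical values chosen for illustration; in this notebook the NEW term comes from the language model.

```python
import math

def extend_score(old_score, new_ngram_logprob, cipher_freq, english_freq):
    """SCORE(phi') = SCORE(phi) + NEW(phi') - FMH(phi') for one added mapping f -> e."""
    fmh_penalty = abs(math.log(cipher_freq / english_freq))
    return old_score + new_ngram_logprob - fmh_penalty

# Hypothetical numbers: the partial hypothesis scored -10.0, the n-grams fixed by
# the new mapping contribute a log-probability of -2.0, and the cipher symbol is
# half as frequent as the English letter it is mapped to.
new_score = extend_score(-10.0, -2.0, cipher_freq=4.0, english_freq=8.0)
print(round(new_score, 4))
```

The penalty is zero when the two frequencies agree, so the heuristic only lowers the score of mappings whose frequencies disagree.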

In [9]:
""" Frequency Matching Heuristic
    new_map should contain only 1 mapping (by paper)
"""
def fmh(new_map):
    # Accumulate in `total` rather than `sum` to avoid shadowing the built-in
    total = 0.0
    for f, e in new_map.items():
        total += abs(math.log(cipher_desc['relative_freq'][f] / plaintxt_desc['relative_freq'][e]))
    return total

In [10]:
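def replace_dict(string, *list_of_dict):
    # Merge the dictionaries so that earlier ones take priority, then substitute
    # each character exactly once. Chained str.replace calls would re-substitute
    # characters that are both a replacement value and a cipher symbol (e.g. 'u', 'y').
    merged = {}
    for d in reversed(list_of_dict):
        merged.update(d)
    return ''.join(merged.get(ch, ch) for ch in string)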

In [39]:
def score(old_score, phi_p, new_map, use_nlm=False):
    content = ''.join(cipher_desc['content'])
    nlm_map = {}
    if not use_nlm:
        # Mask for the n-gram LM: 'o' marks symbols fixed by phi_p, '_' marks open ones
        mask = ''.join('o' if ch in phi_p else '_' for ch in content)
    else:
        seq = ''
        mask = ''
        for char in content:
            if char in phi_p:
                seq += phi_p[char]
                mask += 'o'
            elif len(seq) > 10:
                # Global Rest Cost Estimation: with enough deciphered context,
                # fill the open symbol with a character proposed by the neural LM
                sample_chars = [i for i in next_chars(seq, True, model) if i[0] != ' ']
                shuffle(sample_chars)
                sample_char = sample_chars[0][0]
                nlm_map.update({char: sample_char})
                seq += sample_char
                mask += 'o'
            else:
                # Too little context: reset the running sequence and leave the symbol open
                seq = ''
                mask += '_'
    new_score = lm.score_bitstring(replace_dict(content, phi_p, nlm_map), mask)
    return old_score + new_score - fmh(new_map)

In [12]:
cipher

'º∫P/Z/uB∫ÀOR•–X•B\nWV+≈GyF∞ºHPπKÇ—y≈\nMJy^uIÀΩ—T‘NQyDµ£\nS¢/º∑BPORAu∫∆RÃ—E\nÀ^LMZJƒ“\\–FHVW≈æy\nπ+—GDºKI£∞—Xæµ§S¢\nRN‘IyEÃOæ—GBTQS∑B\nLƒ/P∑BπX—EHMu^RRÀ\n√ZK—–I£W—ÇæµLM“º∑\nBPDR+j•∞\\N¢≈EuHÀF\nZ√–OVWIµ+‘L£Ã^R∞H\nIºDR∏Ty“\\ƒ≈/πXJQA\nPµMæRu‘∫L£NVEKH•G\n“IÇJÀµºæLMÃNA£Z¢P\n§u–ÀAº∑BVW\\+VT‘OP\n^•S“Ã∆u≈∞ΩD§G∫∫IM\nNÀ£S√E/º∫∫Z∆AP∑BV\n–≈X—W—∏F∑æ√+πºAºB\n∫OTµRu√+∏ƒy—∏^S—W\nVZ≈GyKE∏TyAº∫∑L‘∏\nHÇFBXº§XADƒ\\ΩLÇ•—\n∏≈ƒ∑∑∞≈µPORXQF∫G√\nZπJT‘—∏æJI+“BPQW∞\nVEX“ºWI∞—EHM£•uIÀ'

Ref to <i>Beam Search for Solving Substitution Ciphers</i>, Section 6.2 (Zodiac-408 Cipher), p. 1574:   
the paper uses extension limits with $n_{max} = 8$ and histogram pruning with beam sizes of 10k up to 10M.
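
The two pruning ideas mentioned above can be sketched in isolation as follows. This is a minimal sketch with made-up hypotheses and scores; the helper names `within_extension_limit` and `histogram_prune` are hypothetical and do not come from the paper or from this notebook.

```python
def within_extension_limit(phi, e, n_max):
    """True if plaintext letter e may take one more cipher symbol under n_max."""
    return sum(1 for v in phi.values() if v == e) < n_max

def histogram_prune(hypotheses, beam_size):
    """Keep only the beam_size highest-scoring (mapping, score) pairs."""
    return sorted(hypotheses, key=lambda h: h[1], reverse=True)[:beam_size]

# Toy hypotheses: partial mappings from cipher symbols to letters, with made-up scores.
hyps = [({"A": "e"}, -3.2), ({"A": "t"}, -1.5), ({"A": "a"}, -2.0)]
kept = histogram_prune(hyps, beam_size=2)
print(kept)

# The letter "e" already has two cipher symbols, so with n_max=2 it cannot take a third.
phi = {"A": "e", "B": "e"}
print(within_extension_limit(phi, "e", n_max=2))
print(within_extension_limit(phi, "t", n_max=2))
```

The extension limit bounds how many cipher symbols may share one plaintext letter, which matters for a homophonic cipher, while histogram pruning bounds the number of hypotheses kept at each step.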

In [13]:
def beam_search(ext_order, ext_limits=8, beam_size=1000):
    # Hs holds the current beam of (partial mapping, score) pairs;
    # Ht collects the extended hypotheses for the next cardinality.
    Hs, Ht = [({}, 0)], []
    cardinality = 0
    while cardinality < len(ext_order):
        f = ext_order[cardinality]  # next cipher symbol to fix
        for phi, old_score in Hs:
            for e in sorted(plaintxt_desc['vocab']):
                new_map = {f: e}
                phi_p = dict(phi)  # shallow copy suffices: keys and values are strings
                phi_p.update(new_map)
                # extension limit: at most ext_limits cipher symbols per letter
                counts = sum(1 for v in phi_p.values() if v == e)
                if counts <= ext_limits:
                    score_t = score(old_score, phi_p, new_map)
                    Ht.append((phi_p, score_t))
        # histogram pruning: keep the beam_size best hypotheses
        Hs = sorted(Ht, key=lambda x: x[1], reverse=True)[:beam_size]
        Ht = []
        cardinality += 1
        print(cardinality)
    # Hs is already sorted by score, so the first entry is the best hypothesis
    return Hs[:1]

In [14]:
# Load the statistics used by fmh, score and beam_search
cipher = read_file("data/cipher.txt")
cipher_desc = get_statistics(cipher, cipher=True)
plaintxt = read_file("data/default.wiki.txt.bz2")
plaintxt_desc = get_statistics(plaintxt, cipher=False)
# Extension order: most frequent cipher symbols first
ext_order = [text for text, _ in sorted(cipher_desc['frequencies'].items(), key=lambda x: x[1], reverse=True)]

In [42]:
mapping = beam_search(ext_order, 4, 200)

1
2
3
4
5
6


KeyboardInterrupt: 

## Grading

Ignore the following cells. They are for grading against the reference decipherment. Based on the clues provided in the decipherment homework description, you can easily find a reasonable reference text online for this cipher text.

In [11]:
"""
ATTENTION!
For grading purposes only. Don't bundle with the assignment. 
Make sure '_ref.txt' is removed from the 'data' directory before publishing.
"""

def read_gold(gold_file):
    # The with-block closes the file, so no explicit close() is needed
    with open(gold_file) as f:
        gold = f.read()
    gold = list(gold.strip())
    return gold

def symbol_error_rate(dec, _gold):
    gold = read_gold(_gold)
    correct = 0
    if len(gold) == len(dec):
        for (d,g) in zip(dec, gold):
            if d==g:
                correct += 1
    wrong = len(gold)-correct
    error = wrong/len(gold)
    
    return error
    
# gold decipherment
gold_file = "data/_ref_Zodiac_408.txt"
ser = symbol_error_rate(decipherment, gold_file)
print('Error: ', ser*100, 'Accuracy: ', (1-ser)*100)

Error:  100.0 Accuracy:  0.0
