# Context-sensitive Spelling Correction

The goal of the assignment is to implement context-sensitive spelling correction. The input of the code will be a set of text lines and the output will be the same lines with spelling mistakes fixed.

Submit the solution of the assignment to Moodle as a link to your GitHub repository containing this notebook.

Useful links:
- [Norvig's solution](https://norvig.com/spell-correct.html)
- [Norvig's dataset](https://norvig.com/big.txt)
- [Ngrams data](https://www.ngrams.info/download_coca.asp)

Grading:
- 60 points - Implement spelling correction
- 20 points - Justify your decisions
- 20 points - Evaluate on a test set


## Implement context-sensitive spelling correction

Your task is to implement a context-sensitive spelling corrector using an N-gram language model. The idea is to compute conditional probabilities of possible correction options. For example, the phrase "dking sport" should be fixed as "doing sport", not "dying sport", while "dking species" should be fixed as "dying species".
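As a rough illustration of this idea, the sketch below chooses between two candidates using bigram conditional probabilities. The counts and the helper names (`BIGRAM_COUNTS`, `conditional_prob`, `best_candidate`) are invented for the example and are not part of the assignment data.

```python
from collections import Counter

# Toy bigram counts, invented purely for illustration.
BIGRAM_COUNTS = Counter({
    ("doing", "sport"): 40,
    ("doing", "well"): 60,
    ("dying", "species"): 30,
    ("dying", "breed"): 70,
})


def conditional_prob(first, second, counts=BIGRAM_COUNTS):
    """Estimate P(second | first) from raw bigram counts."""
    total = sum(c for (w1, _), c in counts.items() if w1 == first)
    return counts[(first, second)] / total if total else 0.0


def best_candidate(candidates, next_word):
    """Pick the candidate that makes the following word most likely."""
    return max(candidates, key=lambda c: conditional_prob(c, next_word))


print(best_candidate(["doing", "dying"], "sport"))
print(best_candidate(["doing", "dying"], "species"))
```

With these toy counts the first call prefers "doing" and the second prefers "dying", because only one of the two candidates has been seen before each context word.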

The best way to start is to analyze [Norvig's solution](https://norvig.com/spell-correct.html) and [N-gram Language Models](https://web.stanford.edu/~jurafsky/slp3/3.pdf).

You may also want to implement:
- spell-checking for a concrete language - Russian, Tatar, etc. - any one you know, such that the solution accounts for language specifics,
- some recent (or not very recent) paper on this topic,
- solution which takes into account keyboard layout and associated misspellings,
- efficiency improvement to make the solution faster,
- any other idea of yours to improve Norvig's solution.

IMPORTANT:  
Your project should not be a mere code copy-paste from somewhere. You must provide:
- Your implementation
- Analysis of why the implemented approach is suggested
- Improvements of the original approach that you have chosen to implement

In [None]:
# Your code here

### Prepare environment

In [None]:
!pip install gdown -q
!pip install annoy -q
# Generate trie for bigrams
!pip install pygtrie
!pip install jsonlines -q

In [None]:
import pandas as pd
import numpy as np
from annoy import AnnoyIndex
import pygtrie
import random
import string
import datetime

import gdown
import urllib.request
import gzip
import shutil
import jsonlines
from tqdm import tqdm

import re
from collections import Counter
import time


from nltk.tokenize import sent_tokenize, word_tokenize
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
# Load bigrams and fivegrams data
ids = ["1LauUKpATAXc6rvY5xKhS7rFWoJRxYt93", "1N7uOUeP8rPnLfXSoJV4L4PN-R1uh-bd9"]
filenames = ["bigrams.txt", "fivegrams.txt"]
url = 'https://drive.google.com/uc?id='

for i in range(len(ids)):
    gdown.download(url+ids[i], filenames[i], quiet=True)

### Implementation

In [None]:
def preprocess(sent):
    # Replace punctuation with space
    sent = sent.translate(str.maketrans(string.punctuation, ' ' * len(string.punctuation), ''))
    # Get word tokens
    sent = word_tokenize(sent)
    words = []
    for w in sent:
        # Tokens made only of digits are not treated as words,
        # whereas a mixed token such as "te1xt" may be a misspelling
        if not w.isdigit():
            words.append(w)
    return words


In [None]:
class Dictionary:
    """
    I created a class called Dictionary to handle the data: the bigram and fivegram datasets, the list of unique words
    (used as the dictionary) and the characters of the language. Note that I considered only English words, so characters
    of any other language are treated as the unknown character. Nevertheless, this implementation could be extended
    by adding data for other languages (bigrams, unique words and characters).
    """

    def __init__(self, bigram_path="bigrams.txt", fivegram_path="fivegrams.txt"):
        """
        Prepare data and tries
        :param bigram_path: path to bigram dataset
        :param fivegram_path: path to fivegram dataset
        """
        bigram_data = self.load_bigrams(bigram_path)
        # Get unique words in bigrams dataset
        self.unique_words = np.unique([bigram_data["0"].astype(str).apply(str.lower).values,
                                       bigram_data["1"].astype(str).apply(str.lower).values])
        self.bigram_trie = self.build_bigram_trie(bigram_data)

        self.chars = "A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z".lower().split(
            ", ") + ["<unk>"]
        self.word_trie = self.build_word_trie(trie_depth=21, random_seed=33, filename='word_char_count.ann')

    def load_bigrams(self, path="bigrams.txt"):
        """
        Load bigrams.
        As a bigram dataset [2], I used the data provided with this assignment.
        This data contains most frequent n-grams from the one billion word
        Corpus of Contemporary American English (COCA).

        :param path: path to bigram dataset
        :return: pd dataframe [freq, 0, 1, prob]
        """
        bigram_data = pd.read_csv(path, sep="\t", encoding="ISO-8859-1", header=None, names=["freq", "0", "1"])
        bigram_data[["0", "1"]] = bigram_data[["0", "1"]].astype(str).apply(lambda x: [y.lower() for y in x])
        # Convert frequencies to conditional probabilities P(second | first)
        # by dividing by the total number of occurrences of the first word
        bigram_total = bigram_data.groupby("0")["freq"].sum()
        bigram_data['prob'] = bigram_data.apply(lambda row: row['freq'] / bigram_total[row['0']], axis=1)
        return bigram_data

    def load_fivegrams(self, path="fivegrams.txt"):
        """
        Load fivegrams.
        As a fivegram dataset [2], I used the data provided with this assignment.
        This data contains most frequent n-grams from the one billion word
        Corpus of Contemporary American English (COCA).
        :param path: path to fivegram dataset
        :return: pd dataframe [freq, 0, 1,2,3,4, prob]
        """
        fivegram_data = pd.read_csv(path, sep="\t", encoding="ISO-8859-1", header=None,
                                    names=["freq", "0", "1", "2", "3", "4"])
        fivegram_data[["0", "1", "2", "3", "4"]] = fivegram_data[["0", "1", "2", "3", "4"]].astype(str).apply(
            lambda x: [y.lower() for y in x])

        # Convert frequencies to probabilities by dividing by the total number of occurrences.
        # Condition on the first three words only (as in a four-gram model): with a four-word
        # context, too many probabilities would be equal to 1.
        fivegram_total = fivegram_data.groupby(["0", "1", "2"])["freq"].sum()
        fivegram_data['prob'] = fivegram_data.apply(
            lambda row: row['freq'] / fivegram_total[row['0'], row['1'], row['2']], axis=1)

        return fivegram_data

    def build_bigram_trie(self, bigram_data):
        """
        Build a trie for faster bigram search.
        The first version of my code processed words slowly.
        Trie structures can speed up searches over text strings, so I used one of them,
        CharTrie from pygtrie [4], to speed up the bigram search.
        The key is a string of the form (first_element + " " + second_element) and the value is the probability
        of that bigram. Note that this probability is the bigram count divided by the total count of bigrams
        that start with the same first word. According to the lectures [1], smoothing is a useful improvement,
        so 1/num_unique_words is added as a base probability at lookup time.

        :param bigram_data: bigrams dataset
        :return: pygtrie.CharTrie to find bigrams
        """
        bigram_trie = pygtrie.CharTrie()

        for ind, row in bigram_data.iterrows():
            sent = row['0'] + " " + row['1']
            # Keep the first value for a key and print any duplicated key
            if bigram_trie.has_key(sent):
                print(sent)
            else:
                bigram_trie[sent] = row['prob']
        return bigram_trie

    def word_vectorize(self, word):
        """
        Build a vector with the number of occurrences of each English letter in the word plus one extra
        component for unknown characters (counted here as all non-alphabetic symbols).
        :param word: word to be vectorized
        :return: vector shape (27,)
        """
        vec = [word.lower().count(c) for c in self.chars[:-1]]
        vec.append(sum([not c.isalpha() for c in word.lower()]))
        return vec

    def build_word_trie(self, trie_depth=21, random_seed=33, filename='word_char_count.ann'):
        """
        Another slow step was finding words similar to the misspelled one. Computing the edit distance is not fast
        and the number of unique words is large, so a more efficient search was needed. As a solution, I turned
        each word into a vector and stored the vectors in an Annoy index [5].
        Annoy is used with the Manhattan distance as an approximation to the edit distance.
        :param trie_depth: depth of the trie
        :param random_seed: random seed
        :param filename: filename where to store the result
        :return: Annoy index
        """
        trie = AnnoyIndex(len(self.chars), 'manhattan')
        trie.set_seed(random_seed)

        # Put data in the index
        for ind, word in enumerate(self.unique_words):
            trie.add_item(ind, self.word_vectorize(word))
        # Build trie
        trie.build(trie_depth)
        # Store result
        if filename is not None:
            trie.save(filename)
        return trie

    def load_word_trie(self, filename='word_char_count.ann'):
        """
        Load word trie explained above
        :param filename: from where to load the index
        :return: annoy trie (index)
        """
        trie = AnnoyIndex(len(self.chars), 'manhattan')
        trie.load(filename)
        return trie

    def get_nearest_words(self, word, k=150):
        """
        Find the most similar words from the dictionary using the Annoy index
        :param word: initial word
        :param k: max number of similar words to output
        :return: list of similar words
        """
        return [self.unique_words[i] for i in self.word_trie.get_nns_by_vector(self.word_vectorize(word), k)]


In [None]:
class EditDistance:
    """
    Damerau-Levenshtein variant with QWERTY-based weights.
    Based on existing research [3, 8], it is reasonable to consider only edit distances smaller than or equal to 2.
    This allows one more improvement: if the lengths of the misspelled word and a candidate differ by more than 2,
    the candidate is not analyzed further. The ranked candidates are then sorted by n-gram probability and the top one is
    returned as the correction.
    """
    def __init__(self):
        self.keyboard = ['QWERTYUIOP', 'ASDFGHJKL', 'ZXCVBNM']
        for i in range(len(self.keyboard)):
            self.keyboard[i] = list(self.keyboard[i].lower())

    # Calculate edit distance

    def _levenshtein_distance_matrix(self, string1, string2, is_damerau=False):
        """
        Calculate the Damerau-Levenshtein distance with QWERTY-based weights
        :param string1: first word
        :param string2: second word
        :param is_damerau: if True, also allow transposition of adjacent characters
        :return: distance matrix between words
        """
        n1 = len(string1)
        n2 = len(string2)
        d = np.zeros((n1 + 1, n2 + 1), dtype=float)
        for i in range(n1 + 1):
            d[i, 0] = i
        for j in range(n2 + 1):
            d[0, j] = j
        for i in range(n1):
            for j in range(n2):
                if string1[i] == string2[j]:
                    cost = 0
                else:
                    cost = self.replace_weight(string1[i], string2[j])
                    # cost = 0.5
                d[i + 1, j + 1] = min(d[i, j + 1] + 1,  # insert
                                      d[i + 1, j] + 1,  # delete
                                      d[i, j] + cost)  # replace
                if is_damerau:
                    if i > 0 and j > 0 and string1[i] == string2[j - 1] and string1[i - 1] == string2[j]:
                        d[i + 1, j + 1] = min(d[i + 1, j + 1], d[i - 1, j - 1] + cost)  # transpose
        return d

    def replace_weight(self, let1, let2) -> float:
        """
        The default weight for replacing one letter with another is 1. Letter pairs that share a vertical border
        (neighbors in the same row) get a weight of 0.5, and letter pairs that share at least part of a horizontal
        border (neighbors in adjacent rows) get a weight of 0.7.

        :param let1: first letter
        :param let2: second
        :return: distance
        """
        let1 = let1.lower()
        let2 = let2.lower()

        col1, row1 = 0, 0
        col2, row2 = 0, 0

        for ind in range(len(self.keyboard)):
            line = self.keyboard[ind]
            if let1 in line and let2 in line:
                if -1 <= line.index(let1) - line.index(let2) <= 1:
                    return 0.5
                else:
                    return 1
            elif let1 in line:
                col1 = ind
                row1 = line.index(let1) + ind / 2
            elif let2 in line:
                col2 = ind
                row2 = line.index(let2) + ind / 2
        if not -1 <= col1 - col2 <= 1:
            return 1
        if -1 <= row1 - row2 <= 1:
            return 0.7

        return 1

    def qwerty_edit_dist(self, s1, s2) -> float:
        # compute the Damerau-Levenshtein distance
        # between two given strings (s1 and s2)
        n1 = len(s1)
        n2 = len(s2)
        return self._levenshtein_distance_matrix(s1, s2, True)[n1, n2]


In [None]:
class ContextSpellChecker:
    """
    The main class (and algorithm workflow) is ContextSpellChecker. It takes a sequence of words and finds those that
    are misspelled (non-word errors, detected with the list of unique words). For each such word it finds candidates with
    word_trie, ranks them by edit distance and takes the most probable one given the surrounding context.
    """
    def __init__(self, dictionary, edit_dist, sensitivity=0.5):
        self.dictionary = dictionary
        self.edit_dist = edit_dist
        self.sensitivity = sensitivity

    def get_nearest_candidates(self, word, top_k=10):
        """
        Find the most similar words from the dictionary and sort by their edit distance
        :param word: initial word
        :param top_k: max num output candidates
        :return: list of candidates
        """
        cand_dist = {}
        # Apply annoy as approximation
        for candidate in self.dictionary.get_nearest_words(word, k=10 * top_k):
            if -2 <= len(word) - len(candidate) <= 2:
                dist = self.edit_dist.qwerty_edit_dist(word, candidate)
                if dist <= 2:
                    cand_dist[candidate] = dist
        if len(cand_dist) == 0:
            return []
        # Rank by edit distance
        mini = min(cand_dist.values())
        res = {}
        if mini < 2:
            for k in cand_dist.keys():
                if cand_dist[k] <= mini + self.sensitivity:
                    res[k] = cand_dist[k]
        res = sorted(res.items(), key=lambda x: x[1])

        if len(res) > top_k:
            res = res[:top_k]
        return res

    # N-grams
    def calculate_prob(self, ind_w, candidate, sent, ngram):
        """
        Calculate conditional probability
        :param ind_w: word index
        :param candidate: candidate to replace the word
        :param sent: initial sentence
        :param ngram: n from ngram used
        :return: conditional probability
        """
        # Start from a large constant (the same for every candidate)
        # to reduce the risk of floating-point underflow
        prob = 10 ** 5
        for i in range(max(0, ind_w - ngram + 1), min(len(sent) - ngram + 1, ind_w + ngram)):
            # New word sequence
            seq = sent[i:i + ngram]
            seq[ind_w - i] = candidate[0]
            seq = " ".join(seq)
            p = 0
            # Get probability
            if self.dictionary.bigram_trie.has_key(seq):
                p = self.dictionary.bigram_trie[seq]
            p += 1 / len(self.dictionary.unique_words)
            prob *= p
        return prob

    def spell_check(self, old_sent):
        """
        Correct a sequence of words:
        rank candidates by edit distance, then choose one by its context
        :param old_sent: list of words
        :return: new list of corrected words
        """
        sent = old_sent.copy()
        was_changed = False
        for ind_w in range(len(sent)):
            word = sent[ind_w]
            # Check non-word error
            if word not in self.dictionary.unique_words:
                candidates = self.get_nearest_candidates(word)

                # Score each candidate by its context probability
                probs = []
                for candidate in candidates:
                    p = self.calculate_prob(ind_w, candidate, sent, 2)
                    # print(candidate)
                    # print(p)
                    # print(calculate_prob(ind_w, candidate, sent, 5, fivegram_data, fivegram_total))
                    probs.append(p)
                if len(candidates) > 0:
                    sent[ind_w] = candidates[np.argmax(probs)][0]
                    was_changed = True
        return sent



In [None]:
def correct_sent(sent, spell_funct):
    """
    General function to correct sentences
    :param sent: sentence with errors
    :param spell_funct: function that spell-checks a list of words
    :return: corrected sentence
    """
    new_sent = preprocess(sent)
    renew_sent = [w.lower() for w in new_sent]
    renew_sent = spell_funct(renew_sent.copy())
    result = sent

    for ind_w in range(len(new_sent)):
        if renew_sent[ind_w].lower() != new_sent[ind_w].lower():
            if new_sent[ind_w][0].isupper():
                renew_sent[ind_w] = renew_sent[ind_w][0].upper() + renew_sent[ind_w][1:]
                # renew_sent[ind_w][0].upper()

            result = result.replace(new_sent[ind_w], renew_sent[ind_w])
    return result


In [None]:
# Prepare needed class objects
dictionary = Dictionary(bigram_path="bigrams.txt", fivegram_path="fivegrams.txt")
edit_dist = EditDistance()
my_spell_checker = ContextSpellChecker(dictionary, edit_dist)

Evaluate Norvig's implementation
Number of correct answers: 991 out of: 2000
Accuracy: 0.4955
Seconds for running 1 sentence: 0.009827772999999977

Evaluate my implementation
Number of correct answers: 1063 out of: 2000
Accuracy: 0.5315
Seconds for running 1 sentence: 0.062461154999999983


In [None]:
sent = "You know that all Italia men loves pizza!"
print(correct_sent(sent, my_spell_checker.spell_check))

You know that all Italian men loves pizza!


In [None]:
sent1 = "Differences exikt between us."
sent2 = "An exikt strategy for bussiness is ..."

print(correct_sent(sent1, my_spell_checker.spell_check))
print(correct_sent(sent2, my_spell_checker.spell_check))

Differences exist between us.
An exit strategy for business is ...


## Justify your decisions

Write down justifications for your implementation choices. For example, these choices could be:
- Which ngram dataset to use
- Which weights to assign for edit1, edit2 or absent words probabilities
- Beam search parameters
- etc.


Let's go through my implementation and look at the basic steps taken and the improvements applied. Note that the main ideas were taken

### 1. ContextSpellChecker
The main class (and algorithm workflow) is ContextSpellChecker. It takes a sequence of words and finds those that are misspelled (non-word errors, detected with the list of unique words). For each such word it finds candidates with word_trie, ranks them by edit distance and takes the most probable one given the surrounding context.

### 2. Dictionary
I created a class called Dictionary to handle the data: the bigram and fivegram datasets, the list of unique words (used as the dictionary) and the characters of the language. Note that I considered only English words, so characters of any other language are treated as the unknown character. Nevertheless, this implementation could be extended by adding data for other languages (bigrams, unique words and characters).

#### 2.1. Ngrams
As the bigram and fivegram datasets [2], I used the data provided with this assignment. This data contains the most frequent n-grams from the one-billion-word Corpus of Contemporary American English (COCA).
Note that the algorithm uses only bigrams, while the test set was generated from fivegrams. The COCA datasets are large enough for this implementation. Trigrams, or a combination of 2-, 3- and 4-grams, might improve the performance (for more details refer to [3]), but I did not find suitable data that was both large enough and of high quality.

I counted the unique words in the bigram and fivegram datasets (pooling the words from all columns into one list) and found that the bigram data contains many more unique words than the fivegram data. That is why I parsed the bigram dataset to obtain the list of unique words.
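The vocabulary extraction can be sketched without pandas as follows. The sample rows are made up and only assume the tab-separated `freq, first, second` layout used by `load_bigrams`; `read_vocabulary` is a hypothetical helper, not a function from the notebook.

```python
import csv
import io

# Made-up rows in the assumed "freq<TAB>first<TAB>second" layout.
SAMPLE_BIGRAMS = "120\tDoing\tsport\n80\tdoing\twell\n45\tdying\tspecies\n"


def read_vocabulary(handle):
    """Collect the lower-cased unique words from both bigram columns."""
    vocabulary = set()
    for _freq, first, second in csv.reader(handle, delimiter="\t"):
        vocabulary.update((first.lower(), second.lower()))
    return vocabulary


vocabulary = read_vocabulary(io.StringIO(SAMPLE_BIGRAMS))
print(sorted(vocabulary))
print("dking" in vocabulary)  # a non-word is simply absent from the set
```

Storing the words in a `set` also makes the non-word check a constant-time lookup on average, whereas a membership test on an array scans all entries.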

#### 2.2. Speed improvements

The first version of my code processed words slowly. Trie structures can speed up searches over text strings, so I used one of them, CharTrie from pygtrie [4], to speed up the bigram search. The key is a string of the form (first_element + " " + second_element) and the value is the probability of that bigram. Note that this probability is obtained by dividing the bigram count by the total count of bigrams that start with the same first word. According to the lectures [1], smoothing is a useful improvement, so I added 1/num\_unique\_words as a base probability. Moreover, to take into account the context on both sides of the misspelled word, the candidate probability is calculated as the product of the conditional probabilities over a "sliding window" of subsequences that contain the word (for more details please refer to [3]).
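The sliding-window scoring for bigrams can be sketched as below. This is a simplified stand-in for `calculate_prob` that uses a plain dict instead of the trie; the probabilities in `TOY_BIGRAMS` are invented, and `context_score` is a hypothetical name.

```python
def context_score(words, index, candidate, bigram_prob, vocab_size):
    """Product of smoothed bigram probabilities over windows containing `index`."""
    trial = list(words)
    trial[index] = candidate
    base = 1.0 / vocab_size  # additive base so unseen bigrams are not zero
    score = 1.0
    # For bigrams there are at most two windows: (previous, word) and (word, next).
    for start in (index - 1, index):
        if start < 0 or start + 2 > len(trial):
            continue  # window falls outside the sentence
        key = " ".join(trial[start:start + 2])
        score *= bigram_prob.get(key, 0.0) + base
    return score


# Invented probabilities, for illustration only.
TOY_BIGRAMS = {"is doing": 0.2, "doing sport": 0.4, "is dying": 0.1}
sentence = ["he", "is", "dking", "sport"]
scores = {c: context_score(sentence, 2, c, TOY_BIGRAMS, vocab_size=1000)
          for c in ("doing", "dying")}
best = max(scores, key=scores.get)
print(best, scores)
```

Because the unseen bigram "dying sport" contributes only the base term, "doing" wins even though both candidates are plausible after "is".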

Another slow step was finding words similar to the misspelled one. Computing the edit distance is not fast and the number of unique words is large, so a more efficient search was needed. As a solution, I turned each word into a vector and stored the vectors in an Annoy index [5]. Each vector contains the number of occurrences of each English letter in the word plus one component for unknown characters (all symbols that are not English characters). Annoy is used with the Manhattan distance as an approximation to the edit distance. The edit distance is then computed for the retrieved words for precise ranking.
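To show why letter-count vectors work as a coarse filter, here is a small sketch that replaces Annoy with an exhaustive search over a tiny word list. The word list and the function names are assumptions made for the example; only the vector layout (26 letter counts plus one unknown-character count) follows the description above.

```python
import string

ALPHABET = string.ascii_lowercase


def vectorize(word):
    """26 letter counts plus one count of non-alphabetic symbols."""
    lowered = word.lower()
    vector = [lowered.count(letter) for letter in ALPHABET]
    vector.append(sum(not ch.isalpha() for ch in lowered))
    return vector


def manhattan(u, v):
    """Sum of absolute coordinate differences."""
    return sum(abs(a - b) for a, b in zip(u, v))


def nearest(word, vocabulary, k):
    """Exhaustive nearest-neighbour search (Annoy does this approximately)."""
    target = vectorize(word)
    return sorted(vocabulary, key=lambda w: manhattan(target, vectorize(w)))[:k]


print(nearest("exikt", ["exist", "exit", "banana"], k=2))
print(manhattan(vectorize("listen"), vectorize("silent")))
```

The second print shows the limitation of the approximation: anagrams have identical vectors, which is why the retrieved words still need to be re-ranked by the real edit distance.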

### 3. EditDistance
I used a Damerau-Levenshtein variant with QWERTY-based weights. The default weight for replacing one letter with another is 1, while letter pairs that share a vertical border get a weight of 0.5 and letter pairs that share at least part of a horizontal border get a weight of 0.7. One could go further and use a more elaborate algorithm based on the distance between letters on the keyboard [6]. I chose the QWERTY keyboard because it is the most popular layout worldwide, especially in English-speaking countries [7].
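The weighting scheme can be sketched with explicit key coordinates. This is a compact restatement of the rules above and is not guaranteed to match the `EditDistance` class in every edge case: in particular, giving non-letter characters the full weight of 1 is an assumption of this sketch, as is the half-key shift per row.

```python
ROWS = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]

# Map each letter to (row, horizontal position); each lower row is assumed
# to be shifted right by half a key, roughly as on a physical keyboard.
POSITION = {
    letter: (row, col + row / 2)
    for row, keys in enumerate(ROWS)
    for col, letter in enumerate(keys)
}


def replace_weight(a, b):
    """Cost of replacing letter `a` with letter `b` on a QWERTY layout."""
    a, b = a.lower(), b.lower()
    if a == b:
        return 0.0
    if a not in POSITION or b not in POSITION:
        return 1.0  # assumption: non-letters get the full weight
    (row_a, x_a), (row_b, x_b) = POSITION[a], POSITION[b]
    if abs(x_a - x_b) > 1:
        return 1.0  # too far apart horizontally
    if row_a == row_b:
        return 0.5  # neighbors in the same row
    if abs(row_a - row_b) == 1:
        return 0.7  # touching keys in adjacent rows
    return 1.0


print(replace_weight("k", "j"), replace_weight("q", "a"), replace_weight("k", "s"))
```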

Based on existing research [3, 8], it is reasonable to consider only edit distances smaller than or equal to 2. This allows one more improvement: if the lengths of the misspelled word and a candidate differ by more than 2, the candidate is not analyzed further. The ranked candidates are then sorted by n-gram probability, and the top one is returned as the correction.
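The length pre-filter and the distance threshold can be sketched as follows. For brevity the sketch uses unit costs (a plain optimal-string-alignment distance) instead of the QWERTY weights, and the candidate list is made up; `osa_distance` and `filter_candidates` are hypothetical names.

```python
def osa_distance(a, b):
    """Damerau-Levenshtein (optimal string alignment) distance with unit costs."""
    d = [[0] * (len(b) + 1) for _ in range(len(a) + 1)]
    for i in range(len(a) + 1):
        d[i][0] = i
    for j in range(len(b) + 1):
        d[0][j] = j
    for i in range(1, len(a) + 1):
        for j in range(1, len(b) + 1):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            d[i][j] = min(d[i - 1][j] + 1,         # delete
                          d[i][j - 1] + 1,         # insert
                          d[i - 1][j - 1] + cost)  # replace
            if i > 1 and j > 1 and a[i - 1] == b[j - 2] and a[i - 2] == b[j - 1]:
                d[i][j] = min(d[i][j], d[i - 2][j - 2] + 1)  # transpose
    return d[len(a)][len(b)]


def filter_candidates(word, candidates, max_dist=2):
    """Skip candidates by length first, then keep those within max_dist."""
    kept = []
    for candidate in candidates:
        if abs(len(word) - len(candidate)) > max_dist:
            continue  # the distance is at least the length difference
        dist = osa_distance(word, candidate)
        if dist <= max_dist:
            kept.append((candidate, dist))
    return sorted(kept, key=lambda pair: pair[1])


print(filter_candidates("exikt", ["exist", "exit", "existence", "banana"]))
```

The length check is safe with unit costs because every insertion or deletion changes the length by one, so the distance can never be smaller than the length difference.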

### 4. DatasetGenerator

To test my implementation and Norvig's, I used 2 datasets. The first one is a preprocessed set of misspellings from GitHub commits [9], and the second one is synthesized by me. When preprocessing the GitHub data, I skipped mistakes that insert or delete a word and mistakes in punctuation, letter case and numbers, while the rest became part of github_data. To generate my dataset, I used the fivegrams provided with the assignment. The first words are taken from a random row of the fivegram dataset, and the following words are sampled (for as long as a matching combination of words exists) according to their probability of following the previous sequence. Then an error is made in a random word of the generated sequence. Note that for the insert and replace operations, the QWERTY edit weight is used as the probability distribution for choosing the character.
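The weight-based choice of the error character can be sketched like this. The sketch uses the standard `random` module rather than NumPy, and `toy_weight` is a hypothetical stand-in for the QWERTY replace weight; the `1.1 - weight` transformation follows `generate_error_word`, so that even a pair with weight 1 keeps a small non-zero probability.

```python
import random


def char_probabilities(original, options, weight_fn):
    """Turn replace weights into sampling probabilities (closer keys are likelier)."""
    raw = [1.1 - weight_fn(original, option) for option in options]
    total = sum(raw)
    return [value / total for value in raw]


def replace_char(word, index, options, weight_fn, rng=random):
    """Replace word[index] with a character sampled from `options`."""
    probs = char_probabilities(word[index], options, weight_fn)
    new_char = rng.choices(options, weights=probs, k=1)[0]
    return word[:index] + new_char + word[index + 1:]


def toy_weight(a, b):
    # Hypothetical stand-in for the QWERTY weight: "j" is next to "k".
    return 0.5 if {a, b} == {"j", "k"} else 1.0


probs = char_probabilities("k", ["j", "q"], toy_weight)
noisy = replace_char("spork", 4, ["j", "q"], toy_weight, random.Random(0))
print(probs, noisy)
```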

According to the results, the context-sensitive spell checker makes more accurate predictions (accuracy 0.5315 versus 0.4955 for Norvig's implementation) but takes more time per sentence (about 0.062 s versus 0.010 s). This happens because there may be many unknown words with a large number of candidates within a small edit distance, so a lot of computation is spent on calculating edit distances and looking up probabilities. Based on this, one may conclude that the results obtained are good for a context-sensitive spell checker.
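A comparison of this kind can be computed with a small helper such as the one below. It assumes that a prediction counts as correct only when the whole corrected sentence matches the target exactly; that is my guess at the metric, and the toy corrector and sentence pairs are invented for the example.

```python
import time


def evaluate(correct_fn, pairs):
    """Return (sentence-level accuracy, average seconds per sentence)."""
    start = time.perf_counter()
    hits = sum(correct_fn(src) == tgt for src, tgt in pairs)
    elapsed = time.perf_counter() - start
    return hits / len(pairs), elapsed / len(pairs)


# Toy corrector that knows a single fix, for illustration only.
toy_fixes = {"dking sport": "doing sport"}
toy_pairs = [("dking sport", "doing sport"), ("dking species", "dying species")]

accuracy, seconds_per_sentence = evaluate(lambda s: toy_fixes.get(s, s), toy_pairs)
print(accuracy, seconds_per_sentence)
```

In the notebook, `correct_fn` could be something like `lambda s: correct_sent(s, my_spell_checker.spell_check)`, with `pairs` built from the `src` and `tgt` columns of the generated dataset.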




## Used resources:


[1] - Lectures and labs materials

[2] - https://www.ngrams.info/download_coca.asp

[3] - https://arxiv.org/pdf/1910.11242.pdf

[4] - https://pygtrie.readthedocs.io/en/latest/#pygtrie.CharTrie

[5] - https://github.com/spotify/annoy

[6] - https://www.diva-portal.org/smash/get/diva2:1116701/FULLTEXT01.pdf

[7] - https://strawpoll.com/most-popular-keyboard-layout

[8] - https://www.irjet.net/archives/V8/i9/IRJET-V8I9316.pdf

[9] - https://github.com/mhagiwara/github-typo-corpus/tree/master

[10] - https://stackoverflow.com/questions/38511444/python-download-files-from-google-drive-using-url

[11] - https://norvig.com/spell-correct.html

[12] - https://www.nltk.org/api/nltk.tokenize.html

[13] - https://www.geeksforgeeks.org/python-remove-punctuation-from-string/

[14] - https://www.dcs.bbk.ac.uk/~roger/corpora.html





## Evaluate on a test set

Your task is to generate a test set and evaluate your work. You may vary the noise probability to generate different datasets with varying complexity. Compare your solution to Norvig's corrector, and report the accuracies.

In [None]:
# Load data for Norvig's implementation
urls = ["https://norvig.com/big.txt", "https://norvig.com/spell-testset1.txt", "https://norvig.com/spell-testset2.txt"]
filenames2 = ["big.txt", "spell-testset1.txt", "spell-testset2.txt"]
for i in range(len(filenames2)):
    urllib.request.urlretrieve(urls[i], filenames2[i])

In [None]:
# Load Github dataset
url = "https://github-typo-corpus.s3.amazonaws.com/data/github-typo-corpus.v1.0.0.jsonl.gz"

urllib.request.urlretrieve(url, "github-typo-corpus.v1.0.0.jsonl.gz")

with gzip.open('github-typo-corpus.v1.0.0.jsonl.gz', 'rb') as f_in:
    with open('github_corpus.jsonl', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

In [None]:
class NorvigSpellChecker:
    # Norvig's implementation
    """Spelling Corrector in Python 3; see http://norvig.com/spell-correct.html

    Copyright (c) 2007-2016 Peter Norvig
    MIT license: www.opensource.org/licenses/mit-license.php
    """

    ################ Spelling Corrector

    def __init__(self, words_filename='big.txt'):
        self.WORDS = Counter(self.words(open(words_filename).read()))

    def words(self, text):
        return re.findall(r'\w+', text.lower())

    def P(self, word):
        "Probability of `word`."
        N = sum(self.WORDS.values())
        return self.WORDS[word] / N

    def correction(self, word):
        "Most probable spelling correction for word."
        return max(self.candidates(word), key=self.P)

    def candidates(self, word):
        "Generate possible spelling corrections for word."
        return (self.known([word]) or self.known(self.edits1(word)) or self.known(self.edits2(word)) or [word])

    def known(self, words):
        "The subset of `words` that appear in the dictionary of WORDS."
        return set(w for w in words if w in self.WORDS)

    def edits1(self, word):
        "All edits that are one edit away from `word`."
        letters = 'abcdefghijklmnopqrstuvwxyz'
        splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
        deletes = [L + R[1:] for L, R in splits if R]
        transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
        replaces = [L + c + R[1:] for L, R in splits if R for c in letters]
        inserts = [L + c + R for L, R in splits for c in letters]
        return set(deletes + transposes + replaces + inserts)

    def edits2(self, word):
        "All edits that are two edits away from `word`."
        return (e2 for e1 in self.edits1(word) for e2 in self.edits1(e1))

    def spell_norvig(self, old_sent):
        sent = old_sent.copy()

        for ind_w in range(len(sent)):
            sent[ind_w] = self.correction(sent[ind_w])
        return sent


In [None]:
class DatasetGenerator:
    """
    To test my implementation and Norvig's, I used 2 datasets.
    The first one is a preprocessed set of misspellings from GitHub commits [9], and the second one is synthesized by me.
    When preprocessing the GitHub data, I skipped mistakes that insert or delete a word and mistakes in punctuation,
    letter case and numbers, while the rest became part of github_data. To generate my dataset, I used the fivegrams
    provided with the assignment. The first words are taken from a random row of the fivegram dataset, and the
    following words are sampled (for as long as a matching combination of words exists) according to their probability
    of following the previous sequence. Then an error is made in a random word of the generated sequence. Note that for
    the insert and replace operations, the QWERTY edit weight is used as the probability distribution for the choice.
    """
    def __init__(self, fivegram_data, replace_weight_funct):
        self.fivegram_data = fivegram_data
        self.replace_weight = replace_weight_funct
        self.chars = "A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z".lower().split(', ')

    def generate_sent_fivegram(self):
        """
        Generate sentence based on fivegrams probabilities
        :return:
        """
        n = np.random.randint(len(self.fivegram_data))
        words = list(self.fivegram_data.iloc[n][['0', '1', '2']].values)
        prob = 1
        while prob != 0:
            # Take probabilities
            prob_words_data = self.fivegram_data[np.all(self.fivegram_data[['0', '1', '2']] == words[-3:], axis=1)]
            if len(prob_words_data) == 0:
                prob = 0
            else:
                # Select next word
                words.append(np.random.choice(prob_words_data['3'].values,
                                              p=prob_words_data['prob'].values / prob_words_data['prob'].values.sum()))

        return " ".join(words)

    def generate_error_word(self, word, n_possible_chars=3):
        """
        Make an error in the word
        :param word: initial word
        :param n_possible_chars: how many candidate error chars to generate (only 1 will be chosen)
        :return: word with error
        """
        ind = np.random.randint(0, high=len(word))
        word = list(word)

        error_types = ['ins', 'del', 'swap', 'repl']
        error_type = np.random.choice(error_types)

        if error_type == 'swap':
            # Choose left or right swap
            if len(word) == 1:
                # Do not make
                lr = 0
            elif ind == 0:
                lr = 1
            elif ind == len(word) - 1:
                lr = -1
            else:
                # Random left or right
                lr = np.random.choice([-1, 1])

            word[ind], word[ind + lr] = word[ind + lr], word[ind]
        elif error_type == 'del':
            del word[ind]
        else:
            # Choose a char to generate error
            possible_chars = np.random.choice(self.chars, replace=False, size=n_possible_chars)
            # Calculate probability of each char
            # Based on qwerty distance
            prob_chars = 1.1 - np.array([self.replace_weight(word[ind], c) for c in possible_chars])
            prob_chars = prob_chars / prob_chars.sum()
            # Choose an error char
            error_char = np.random.choice(possible_chars, p=prob_chars)

            if error_type == 'ins':
                word.insert(ind + 1, error_char)
            elif error_type == 'repl':
                word[ind] = error_char

        return "".join(word)

    def generate_error_sent(self, sent, max_n_errors=2):
        """
        Introduce errors into the sentence
        :param sent: initial sentence
        :param max_n_errors: maximum number of errors to make
        :return: sent with misspells
        """
        words = preprocess(sent)
        n_errors = np.random.randint(1, high=min(max_n_errors, len(words) // 2) + 1)

        # Longer words are more likely to receive an error
        w_probs = np.array([len(w) for w in words])
        w_probs = w_probs / w_probs.sum()
        # replace=True means that the same word may receive more than one error
        for ind_w in np.random.choice(range(0, len(words)), replace=True, size=n_errors, p=w_probs):
            words[ind_w] = self.generate_error_word(words[ind_w])

        return " ".join(words)

    def generate_dataset(self, data_len=1000):
        """
        Generate a src-tgt dataset.
        To generate my dataset, I used the fivegrams provided with the assignment. The first words are taken from a
        random row of the fivegrams dataset, and each following word (for as long as a matching combination of words
        exists) is sampled with the probability of it following the previous sequence. Errors are then introduced
        into randomly chosen words of the generated sequence. Note that for the insert and replace operations, the
        QWERTY edit distance is used as the probability distribution for choosing the erroneous character.
        :param data_len: number of sentences to generate
        :return: pd df with columns src (with errors) and tgt (original)
        """
        dataset = pd.DataFrame(columns=['src', 'tgt'])
        for i in range(data_len):
            sent = self.generate_sent_fivegram()
            error_sent = self.generate_error_sent(sent)
            dataset.loc[i] = [error_sent, sent]
        return dataset

    def get_github_dataset(self, data_path='github_corpus.jsonl', data_len=1000):
        """
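        Load src-tgt pairs from the GitHub corpus file.
        I skipped edits that insert or delete a word or that only change punctuation, letter case or numbers;
        all other edits became part of github_data.
        :param data_path: path to the jsonlines corpus file
        :param data_len: number of pairs to collect (checked after each corpus entry)
        :return: pd df with columns src and tgt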
        """
        github_data = pd.DataFrame(columns=["src", "tgt"])
        n = 0
        with jsonlines.open(data_path) as reader:
            for obj in reader:
                for edit in obj['edits']:
                    # Skip edits whose source text contains any of these punctuation characters
                    if not any(p in edit['src']['text'] for p in "`'()/=:"):
                        src = [w.lower() for w in preprocess(edit['src']['text'])]
                        tgt = [w.lower() for w in preprocess(edit['tgt']['text'])]
                        if (len(src) == len(tgt)) and (src != tgt) and (len(src) >= 3):
                            github_data.loc[len(github_data.index)] = [edit['src']['text'], edit['tgt']['text']]
                            n += 1
                if n >= data_len:
                    break
        return github_data
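
The keyboard-aware character choice used in `generate_error_word` can be illustrated in isolation. The sketch below is not the notebook's `replace_weight`: it assumes a hypothetical `qwerty_distance` that returns a key distance normalized to [0, 1] on a simplified, unstaggered QWERTY grid, and reuses the `1.1 - weight` trick so that nearby keys are the most likely substitutes while distant keys keep a small nonzero probability. It uses only the standard library instead of NumPy.

```python
import math
import random

# Assumed simplified QWERTY layout: (row, column) per key, ignoring row stagger.
QWERTY_ROWS = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
KEY_POS = {ch: (r, c) for r, row in enumerate(QWERTY_ROWS) for c, ch in enumerate(row)}
# Largest distance between any two keys on this grid, used for normalization.
MAX_DIST = max(math.dist(a, b) for a in KEY_POS.values() for b in KEY_POS.values())


def qwerty_distance(a, b):
    """Hypothetical stand-in for replace_weight: key distance scaled to [0, 1]."""
    return math.dist(KEY_POS[a], KEY_POS[b]) / MAX_DIST


def char_probs(target, candidates):
    """Turn distances into sampling probabilities (closer key, higher probability)."""
    weights = [1.1 - qwerty_distance(target, c) for c in candidates]
    total = sum(weights)
    return [w / total for w in weights]


def sample_error_char(target, candidates, rng=random):
    """Draw one erroneous character for `target` from `candidates`."""
    return rng.choices(candidates, weights=char_probs(target, candidates), k=1)[0]


# "s" is adjacent to "d", "m" is further away, and "p" is the furthest of the three.
probs = char_probs("d", ["s", "p", "m"])
print(dict(zip("spm", (round(p, 3) for p in probs))))
print(sample_error_char("d", ["s", "p", "m"]))
```

The constant 1.1 (rather than 1.0) keeps the weight of the most distant key strictly positive, so every sampled candidate remains possible.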


In [None]:
def test_dataset(dataset, spell_funct):
    """
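    Evaluate a spell-checking function on a dataset
    :param dataset: pd df src-tgt (tgt must already be a list of tokens)
    :param spell_funct: function that takes a list of tokens and returns the corrected list
    :return: (sum of per-sentence accuracies, total time in seconds, number of sentences)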
    """
    sum_time = 0
    corrects = 0
    n = 0
    for ind, row in dataset.iterrows():
        # Start timer
        start_time = datetime.datetime.now()
        # Get the corrected sequence of tokens
        result = spell_funct([w.lower() for w in preprocess(row['src'])])
        dur = datetime.datetime.now() - start_time

        # Calculate statistics: add this sentence's share of correctly predicted tokens
        corrects += sum([result[i] == row['tgt'][i] for i in range(len(result))]) / len(result)
        sum_time += dur.total_seconds()
        n += 1
    # Print result (`corrects` is a sum of per-sentence accuracies, so it is reported against n sentences)
    print("Percentage of correct token predictions:", corrects, "out of:", n)
    print("Accuracy:", round(corrects / n, 4))
    print("Seconds for running 1 sentence:", sum_time / n)

    return corrects, sum_time, n
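
The score computed by `test_dataset` is a per-sentence token accuracy averaged over sentences, so short and long sentences count equally. A minimal, self-contained sketch of that metric follows; `sentence_accuracy`, `mean_accuracy` and the toy data are illustrative names and values, not part of the notebook. Unlike the loop above, the sketch divides by the longer of the two sequences, so a length mismatch counts as an error instead of raising an `IndexError`; that is a design choice of the sketch, not the notebook's behavior.

```python
def sentence_accuracy(predicted, target):
    """Fraction of positions where the predicted token equals the target token."""
    if not predicted and not target:
        return 1.0
    matches = sum(p == t for p, t in zip(predicted, target))
    return matches / max(len(predicted), len(target))


def mean_accuracy(pairs, spell_funct):
    """Average the per-sentence accuracy of `spell_funct` over (src, tgt) token lists."""
    scores = [sentence_accuracy(spell_funct(src), tgt) for src, tgt in pairs]
    return sum(scores) / len(scores)


# Toy data: the identity "corrector" leaves every misspelling in place.
toy_pairs = [
    (["dking", "sport"], ["doing", "sport"]),
    (["a", "dying", "species"], ["a", "dying", "species"]),
]
print(mean_accuracy(toy_pairs, lambda tokens: tokens))
```

Here the first sentence scores 1/2 and the second 3/3, so the identity corrector gets 0.75.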


In [None]:
# Read fivegrams from a file and process
fivegrams = dictionary.load_fivegrams(path="fivegrams.txt")

data_generator = DatasetGenerator(fivegrams, edit_dist.replace_weight)

# Generate a dataset from scratch (based on fivegrams)
# my_dataset = data_generator.generate_dataset(200)
# Or load already generated one
my_dataset = pd.read_csv("my_dataset.csv")
# Lowercase and tokenize the targets once up front so the evaluation loop can compare token lists directly
my_dataset = my_dataset.copy()
my_dataset['tgt'] = my_dataset['tgt'].apply(str.lower).apply(preprocess)

# Load and preprocess real world dataset
github_data = data_generator.get_github_dataset('github_corpus.jsonl', data_len=3000)
github_data = github_data.copy()
github_data['tgt'] = github_data['tgt'].apply(str.lower).apply(preprocess)

In [None]:
# Compare with Norvig implementation
norvig_spell_checker = NorvigSpellChecker()

In [None]:
# Synthesized dataset
print("Evaluate Norvig's implementation")
norvig_res = test_dataset(my_dataset, norvig_spell_checker.spell_norvig)
print()
print("Evaluate my implementation")
my_res = test_dataset(my_dataset, my_spell_checker.spell_check)

Evaluate Norvig's implementation
Percentage of correct token predictions: 1849.4152358490276 out of: 2000
Accuracy: 0.9247
Seconds for running 1 sentence: 0.013874333999999969

Evaluate my implementation
Percentage of correct token predictions: 1863.513500998629 out of: 2000
Accuracy: 0.9318
Seconds for running 1 sentence: 0.04509435900000003


In [None]:
print("Github dataset")
print("Evaluate Norvig's implementation")
norvig_res = test_dataset(github_data, norvig_spell_checker.spell_norvig)
print()
print("Evaluate my implementation")
my_res = test_dataset(github_data, my_spell_checker.spell_check)

Github dataset
Evaluate Norvig's implementation
Percentage of correct token predictions: 2439.935522286997 out of: 3000
Accuracy: 0.8133
Seconds for running 1 sentence: 0.18702886599999988

Evaluate my implementation
Percentage of correct token predictions: 2606.556782879971 out of: 3000
Accuracy: 0.8689
Seconds for running 1 sentence: 0.15353176233333303
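
One way to read the four result blocks together is to convert accuracy into relative error reduction and time into a ratio. The helper name below is illustrative; the inputs are the rounded accuracies and per-sentence times printed above, so the derived figures are approximate.

```python
def relative_error_reduction(baseline_acc, new_acc):
    """Share of the baseline's errors that the new system removes."""
    baseline_err = 1.0 - baseline_acc
    return (baseline_err - (1.0 - new_acc)) / baseline_err


# (Norvig accuracy, my accuracy, Norvig sec/sentence, my sec/sentence), as reported above.
results = {
    "synthesized": (0.9247, 0.9318, 0.013874, 0.045094),
    "github": (0.8133, 0.8689, 0.187029, 0.153532),
}

for name, (norvig_acc, my_acc, norvig_sec, my_sec) in results.items():
    reduction = relative_error_reduction(norvig_acc, my_acc)
    time_ratio = my_sec / norvig_sec
    print(f"{name}: error reduction {reduction:.1%}, time ratio {time_ratio:.2f}x")
```

On these figures the context-sensitive checker removes roughly 9% of Norvig's token errors on the synthesized set and roughly 30% on the GitHub set, while taking about 3.25 times as long per sentence on the former and about 0.82 times as long on the latter.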
