### Loading modules

In [1]:
import re
import os
import numpy as np
from numpy.random import rand, randint, choice, shuffle
from tqdm import tqdm_notebook

### Loading parameters

In [2]:
# parameters
FILEPATH = "./data/news.2011.en.shuffled"
CLEAN_FILEPATH = "./data/news.2011.en.cleaned"
N_LINES = 2466169
CHARS = list("abcdefghijklmnopqrstuvwxyz .-()")
MAX_INPUT_LEN = 40
MIN_INPUT_LEN = 3
AMOUNT_OF_NOISE = 0.3 / MAX_INPUT_LEN

# regex cleanup
RE_DASH_FILTER = re.compile(r'[\-\˗\֊\‐\‑\‒\–\—\⁻\₋\−\﹣\－]', re.UNICODE)
NORMALIZE_WHITESPACE_REGEX = re.compile(r'[^\S\n]+', re.UNICODE)
RE_APOSTROPHE_FILTER = re.compile(r'&#39;|[ʼ՚＇‘’‛❛❜ߴߵ`‵´ˊˋ{}{}{}{}{}{}{}{}{}]'.
                                  format ( 
                                      chr(768), chr(769), chr(832), 
                                      chr(833), chr(2387), chr(5151), 
                                      chr(5152), chr(65344), chr(8242)
                                  ), re.UNICODE)
RE_LEFT_PARENTH_FILTER = re.compile(r'[\(\[\{\⁽\₍\❨\❪\﹙\（]', re.UNICODE)
RE_RIGHT_PARENTH_FILTER = re.compile(r'[\)\]\}\⁾\₎\❩\❫\﹚\）]', re.UNICODE)
RE_BASIC_CLEANER = re.compile(r'[^\w\s\-\)\(]', re.UNICODE)

### Loading and cleaning data

In [3]:
def clean_text(txt):
    txt = NORMALIZE_WHITESPACE_REGEX.sub(' ', txt)
    txt = RE_DASH_FILTER.sub('-', txt)
    txt = RE_APOSTROPHE_FILTER.sub(' ', txt)
    txt = RE_LEFT_PARENTH_FILTER.sub('(', txt)
    txt = RE_RIGHT_PARENTH_FILTER.sub(')', txt)
    txt = RE_BASIC_CLEANER.sub('', txt)
    txt = "".join(char for char in txt if char in CHARS)
    return txt

In [4]:
RE_BASIC_CLEANER.sub('', 'lfjj"ljsfdfd')

'lfjjljsfdfd'

In [5]:
%%time

i = 0
length = 0
lines = list()

if os.path.isfile(CLEAN_FILEPATH):
    print("loading pre-cleaned file")
    with open(CLEAN_FILEPATH, 'r', encoding='utf-8') as f:
        for s in f:
            lines.append(s.rstrip('\n'))
            i += 1
            new_len = len(s)
            if new_len > length:
                length = new_len
else:
    print("loading and cleaning text")
    with open(FILEPATH, 'r', encoding='utf-8') as f:
        for s in tqdm_notebook(f, total=N_LINES):
            line = clean_text(s.lower()).strip('\n')
            if line != '':
                lines.append(line)
                i += 1
                new_len = len(s)
                if new_len > length:
                    length = new_len

    with open(CLEAN_FILEPATH, 'w', encoding='utf-8') as f:
        for line in lines:
            f.write(line + '\n')

n_lines = i
print("total: %d lines" % n_lines)
print("max length: %d char" % length)

loading pre-cleaned file
total: 2454669 lines
max length: 9818 char
Wall time: 2.9 s


### Generating mispelled sentence

In [6]:
def add_noise_to_string(a_string, amount_of_noise):
    
    length = len(a_string)
    threshold = amount_of_noise * length
    
    if rand() < threshold:
        # replace a character with a random character
        rdm_char_pos = randint(length)
        a_string = a_string[:rdm_char_pos] + choice(CHARS[:-1]) + a_string[rdm_char_pos + 1:]
        
    if rand() < threshold:
        # delete a character
        rdm_char_pos = randint(length)
        a_string = a_string[:rdm_char_pos] + a_string[rdm_char_pos + 1:]
        
    if length < MAX_INPUT_LEN and rand() < threshold:
        # add a random character
        rdm_char_pos = randint(length)
        a_string = a_string[:rdm_char_pos] + choice(CHARS[:-1]) + a_string[rdm_char_pos:]
        
    if rand() < threshold:
        # transpose 2 characters
        rdm_char_pos = randint(length - 2)
        a_string = (a_string[:rdm_char_pos] +
                    a_string[rdm_char_pos + 1] +
                    a_string[rdm_char_pos] +
                    a_string[rdm_char_pos + 2:])
        
    if rand() < threshold:
        # delete space
        spaces_pos = [pos for pos, char in enumerate(a_string) if char == " "]
        if len(spaces_pos) != 0:
            rdm_space_pos = choice(spaces_pos)
            a_string = a_string[:rdm_space_pos] + a_string[rdm_space_pos + 1:]
        
    return a_string


def generate_examples(corpus, inverted=True):
    
    sources, targets = list(), list()
    
    while corpus:
        line = corpus.pop()
        
        while len(line) > MIN_INPUT_LEN:
            if len(line) <= MAX_INPUT_LEN:
                target = line
                line = ""
            else:
                space_pos = line.rfind(" ", MIN_INPUT_LEN, MAX_INPUT_LEN - 1)
                if space_pos > -1:
                    target = line[:space_pos]
                    line = line[space_pos + 1:]
                else:
                    space_pos = line.rfind(" ")
                    if space_pos == -1:
                        break
                    else:
                        line = line[space_pos + 1:]
                        continue
            targets.append(target)
    
    for target_idx, target in enumerate(targets):
        source = add_noise_to_string(target, AMOUNT_OF_NOISE)
        source += "." * (MAX_INPUT_LEN - len(source))
        target += "." * (MAX_INPUT_LEN - len(target))
        targets[target_idx] = target
        source = source[::-1] if inverted else source
        sources.append(source)
        
    return sources, targets

In [7]:
%%time
corpus = lines.copy()
sources, targets = generate_examples(corpus, inverted=False)

Wall time: 1min 41s


In [10]:
rd_idx = randint(0, len(sources))
print(sources[rd_idx])
print(targets[rd_idx])

bputi s reallyjust a modern-day way to..
but is really just a modern-day way to..


### Encoding character into vectors

In [9]:
class CharacterTable(object):
    """
    Given a set of characters:
    + Encode them to a one hot integer representation
    + Decode the one hot integer representation to their character output
    + Decode a vector of probabilities to their character output
    """

    def __init__(self, chars):
        self.chars = sorted(set(chars))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        self.size = len(self.chars)

    def encode(self, C, maxlen):
        """Encode as one-hot"""
        X = np_zeros((maxlen, len(self.chars)), dtype=np.bool)  # pylint:disable=no-member
        for i, c in enumerate(C):
            X[i, self.char_indices[c]] = 1
        return X

    def decode(self, X, calc_argmax=True):
        """Decode from one-hot"""
        if calc_argmax:
            X = X.argmax(axis=-1)
        return ''.join(self.indices_char[x] for x in X)