Written by Dylan Leddy

The Preprocessor handles all data augmentation prior to training

In [None]:
!pip install nltk
!pip install sacremoses
!pip install pyspellchecker
!pip install contractions

Collecting sacremoses
  Downloading sacremoses-0.1.1-py3-none-any.whl (897 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m897.5/897.5 kB[0m [31m9.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: sacremoses
Successfully installed sacremoses-0.1.1
Collecting pyspellchecker
  Downloading pyspellchecker-0.8.1-py3-none-any.whl (6.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.8/6.8 MB[0m [31m27.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyspellchecker
Successfully installed pyspellchecker-0.8.1
Collecting contractions
  Downloading contractions-0.1.73-py2.py3-none-any.whl (8.7 kB)
Collecting textsearch>=0.0.21 (from contractions)
  Downloading textsearch-0.0.24-py2.py3-none-any.whl (7.6 kB)
Collecting anyascii (from textsearch>=0.0.21->contractions)
  Downloading anyascii-0.3.2-py3-none-any.whl (289 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m289.9/289.9 kB[0m [31m6.2 MB/s[0m

In [None]:
from spellchecker import SpellChecker
from sacremoses import MosesTruecaser, MosesTokenizer, MosesPunctNormalizer
import contractions
from enum import Enum
import re
import statistics as stats
import math
import nltk
import os
nltk.download('punkt')

ModuleNotFoundError: No module named 'spellchecker'

We implement a **strategy pattern** below, providing the client with control over what transformations are applied.

Note that the transformations are **sorted** - the order of operations matters for yielding quality text.

In [None]:
# Condenses punctuation to a set of common characters
class NormalizePunct:
  """Strategy that runs each line through sacremoses' MosesPunctNormalizer."""

  def __init__(self):
    # One normalizer instance is built here and reused for every line.
    self.normalizer = MosesPunctNormalizer()
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 1

  def execute(self, text: str):
    """Return `text` as produced by the normalizer's `normalize` method."""
    normalized = self.normalizer.normalize(text)
    return normalized


# Attempts to separate incorrectly joined words and removes excess white space.
class NormalizeSpaces:
  """Strategy that splits lower/upper-case word joins and collapses whitespace."""

  # A lowercase ASCII letter immediately followed by an uppercase one.
  _JOINED_WORDS = re.compile(r'([a-z])([A-Z])')
  # Any run of whitespace (spaces, tabs, newlines, ...).
  _WHITESPACE_RUN = re.compile(r'\s+')

  def __init__(self):
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 10

  def execute(self, text: str):
    """Insert a space at each lower->upper boundary, then squeeze whitespace.

    Leading/trailing whitespace is reduced to a single space, not stripped.
    """
    separated = self._JOINED_WORDS.sub(r'\1 \2', text)
    return self._WHITESPACE_RUN.sub(' ', separated)


# Removes all punctuation not enclosed by 2 alphanumerics
class RemovePunct:
  """Strategy that strips punctuation unless it sits between two word characters.

  Intra-word punctuation such as the apostrophe in "don't", the hyphen in
  "well-known" or the dot in "3.14" is kept; everything else is removed.
  "Word character" means regex `\\w` (letters, digits and underscore), and
  "punctuation" means any character that is neither `\\w` nor whitespace.
  """

  # Matches a punctuation character that is NOT preceded by a word character,
  # or one that is NOT followed by a word character. The look-ahead must come
  # after the consumed character: placed before it, it would inspect the
  # punctuation character itself and always succeed, deleting everything.
  _LOOSE_PUNCT = re.compile(r'(?<!\w)[^\s\w]|[^\s\w](?!\w)')

  def __init__(self):
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 9

  def execute(self, text: str):
    """Return `text` with all non-enclosed punctuation removed."""
    return self._LOOSE_PUNCT.sub('', text)


# E.g. expanding "I'm" to "I am"
class ExpandContractions:
  """Strategy that delegates to `contractions.fix` to expand contractions."""

  def __init__(self):
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 4

  def execute(self, text: str):
    """Return whatever `contractions.fix` produces for `text`."""
    expanded = contractions.fix(text)
    return expanded


# Careful, this may output bad corrections - only use for English
class Spellcheck:
  """Strategy that spell-corrects lowercase-initial words via pyspellchecker.

  Words are obtained by splitting on single spaces. Empty tokens (produced by
  consecutive spaces) are dropped, so the output is single-space separated.
  """

  def __init__(self):
    self.spell = SpellChecker()
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 11

  def execute(self, text: str):
    """Return `text` with each eligible word replaced by its correction.

    A word is passed through untouched when it starts with an uppercase
    letter or contains a digit. Digit-bearing tokens are kept in the output
    rather than deleted, so numbers survive this step. If the checker returns
    no (or an empty) correction, the original word is kept.
    """
    out = []
    for word in text.split(' '):
      if not word:
        continue  # artefact of consecutive spaces

      if word[0].isupper() or any(ch.isdigit() for ch in word):
        out.append(word)
        continue

      corrected = self.spell.correction(word)
      out.append(corrected if corrected else word)

    return " ".join(out)


# Attempts to retain the capitalization of proper nouns while lowercasing other words
class Truecase:
  """Strategy that applies a caller-supplied truecaser to a line.

  Unlike the other strategies, `execute` takes the truecaser as its first
  argument; Context.process passes in its own trained model.
  """

  def __init__(self):
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 5

  def execute(self, truecaser: object, text: str):
    """Join the tokens returned by `truecaser.truecase(text)` with spaces."""
    cased_tokens = truecaser.truecase(text)
    return " ".join(cased_tokens)


class PurgeNonEnglish:
  """Strategy that deletes characters outside two fixed code-point ranges.

  Characters in U+0000-U+05C0 and U+2100-U+214F are kept; any run of
  characters outside those ranges is removed.
  """

  # Runs of one or more characters that fall outside the retained ranges.
  _OUT_OF_RANGE = re.compile("[^\u0000-\u05C0\u2100-\u214F]+")

  def __init__(self):
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 2

  def execute(self, text: str):
    """Return `text` with every out-of-range run replaced by nothing."""
    return self._OUT_OF_RANGE.sub('', text)


class ReplaceSeq:
  """Strategy that substitutes every occurrence of one substring with another."""

  def __init__(self, old: str, new: str):
    # `old` is the substring to look for, `new` is what replaces it.
    self.old, self.new = old, new
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 3

  def execute(self, text: str):
    """Return `text` with all occurrences of `self.old` replaced by `self.new`."""
    needle, replacement = self.old, self.new
    return text.replace(needle, replacement)


# Intra-field deduplication of sentences
class Dedupe:
  """Strategy that drops repeated sentences within a single text field."""

  def __init__(self):
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 6

  def execute(self, text: str):
    """Sentence-split `text` with nltk and keep each sentence's first occurrence.

    The surviving sentences are re-joined with single spaces in their
    original order.
    """
    sentences = nltk.sent_tokenize(text)
    # dict keys are unique and keep insertion order, giving an ordered dedupe.
    first_occurrences = dict.fromkeys(sentences)
    return " ".join(first_occurrences)


# Removes everything enclosed by parentheses (including the parentheses themselves)
class RemoveParenthetical:
  """Strategy that deletes parenthesised spans, parentheses included."""

  # "(" followed by any number of non-")" characters and a closing ")".
  _PAREN_SPAN = re.compile(r'\([^)]*\)')

  def __init__(self):
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 7

  def execute(self, text: str):
    """Return `text` with every "(...)" span removed.

    Surrounding whitespace is left as-is, and an unmatched "(" is untouched.
    """
    return self._PAREN_SPAN.sub('', text)


# Transforms one-to-many translations to one-to-one
# These mappings are identified by a ";" separator, which is unique to our dataset
# This may be expanded to include logical operators (src -> word1 or word2)
class HandleOneToMany:
  """Strategy that reduces a ";"-separated list of translations to one option.

  Unlike most strategies, `execute` needs the text of the other side of the
  pair, so Context.process calls it with two arguments.
  """

  def __init__(self):
    # Ordering key read by Context.set_strategy when sorting the pipeline.
    self.sortKey = 8

  def execute(self, target: str, source: str):
    """Pick the shortest alternative when a single source word has several.

    Args:
      target: The text being transformed; alternatives are separated by ";".
      source: The text on the other side of the pair.

    Returns:
      `target` unchanged when it has no ";" or when `source` is not exactly
      one word. Otherwise the shortest non-empty alternative, stripped of
      surrounding whitespace and lowercased. An empty string is returned if
      every alternative is blank.
    """
    if ";" not in target:
      return target

    # Ignore empty tokens so stray spaces around a single word do not make it
    # look like a multi-word source.
    source_words = [w for w in source.split(' ') if w]
    if len(source_words) != 1:
      return target

    # Strip each alternative and drop blank ones; otherwise a trailing ";"
    # would make the empty string the "shortest" choice, and padding spaces
    # would distort the length comparison.
    options = [opt.strip() for opt in target.split(';')]
    options = [opt for opt in options if opt]
    if not options:
      return ""

    return min(options, key=len).lower()

In [None]:
# The context for one side of our parallel corpus
class Context:
  """One language side of the parallel corpus plus its transform pipeline."""

  # Directory prefix under which trained truecase models are stored/loaded.
  TRUECASER_SAVEPATH = '/content/drive/MyDrive/MTApplication/models/'

  def __init__(self, language: str, corpus: list[dict]):
    """Extract this language's lines from `corpus`.

    Args:
      language: Key used to pull this side's text out of every corpus item.
      corpus: Sequence of dicts, each containing a `language` entry.
    """
    self.text = [item[language] for item in corpus]
    self.language = language
    # Defaults so process() is a safe no-op before set_strategy() is called.
    self.strategies = []
    self.truecaser = None

  def __train_truecasers(self, text: list[str], save_to: str):
    """Load the truecase model at `save_to`, training it on `text` if missing."""
    model_path = save_to + '.truecasemodel'

    # Train only when no saved model exists; otherwise reuse the saved one.
    if not os.path.exists(model_path):
      tokenizer = MosesTokenizer()
      truecaser = MosesTruecaser()
      tokenized = [tokenizer.tokenize(line) for line in text]
      truecaser.train(tokenized, save_to=model_path)

    self.truecaser = MosesTruecaser(model_path)

  def get_text(self):
    """Return the full list of lines for this side."""
    return self.text

  def get_line(self, i: int):
    """Return the raw (unprocessed) line at index `i`."""
    return self.text[i]

  # We provide a list of transforms
  def set_strategy(self, strategies: list[object]):
    """Install the transforms, ordered by their `sortKey` attribute.

    If any transform is a Truecase, the truecaser model for this language is
    loaded (or trained on this side's text) once, up front.
    """
    self.strategies = sorted(strategies, key=lambda x: x.sortKey)
    if any(isinstance(strategy, Truecase) for strategy in self.strategies):
      self.__train_truecasers(
        self.text,
        self.TRUECASER_SAVEPATH + self.language
      )

  # Carry out transformations on each line
  def process(self, i: int, other: str):
    """Run line `i` through every installed transform, in order.

    Args:
      i: Index of the line to process.
      other: Text of the opposite side, forwarded to HandleOneToMany.

    Returns:
      The transformed line; the raw line if no transforms are installed.
    """
    txt = self.text[i]
    for strategy in self.strategies:
      if isinstance(strategy, HandleOneToMany):
        # Needs the other side to decide whether the source is one word.
        txt = strategy.execute(txt, other)
      elif isinstance(strategy, Truecase):
        txt = strategy.execute(self.truecaser, txt)
      else:
        txt = strategy.execute(txt)
    return txt

In [None]:
# Pass in src/tgt contexts and process
class Preprocessor:
  """Processes and filters aligned source/target lines from two contexts."""

  def __init__(self, src_context: "Context", tgt_context: "Context"):
    """Store both contexts and pre-compute the length-difference bounds.

    Only the first min(len(src), len(tgt)) lines are considered.
    """
    self.src_context = src_context
    self.tgt_context = tgt_context
    self.lines = min(len(src_context.get_text()), len(tgt_context.get_text()))
    self.__calc_fertility_heuristic()

  # Very important! Discards incomplete translations or pairs w/bad size ratios
  def __filter(self, source: str, target: str):
    """Return True when the (processed) pair should be discarded."""
    source_split = [w for w in source.split(' ') if w != ""]
    target_split = [w for w in target.split(' ') if w != ""]

    # Either side has no words at all.
    if not source_split or not target_split:
      return True

    # Placeholder rows marking a missing record.
    if "no record" in source.lower():
      return True

    if "no chinese record" in target.lower():
      return True

    if len(source_split) > 150 or len(target_split) > 150: # Num word threshold 150
      return True

    if len(max(source_split, key=len)) > 20 or len(max(target_split, key=len)) > 20: # Max word length <= 20
      return True

    # Fertility (char ratios): same cube-root transform as the heuristic.
    # A difference of exactly 0 is never rejected by the lower bound.
    diff = math.pow(abs(len(source) - len(target)), 1/3)
    low, high = self.f_heuristic
    if (diff < low and diff != 0) or diff > high:
      return True

    return False

  # We calculate the average absolute difference in string lengths
  # Apply a pow transformation to make data normal
  # Discard pairs w/differences outside 2nd stdev
  def __calc_fertility_heuristic(self):
    """Set `self.f_heuristic` to (mean - 2*stdev, mean + 2*stdev).

    The statistic is the cube root of the absolute character-length
    difference of each raw pair. With fewer than two pairs a standard
    deviation is undefined, so the bounds are left open, which disables the
    fertility check.
    """
    diffs = []
    for line in range(self.lines):
      src = self.src_context.get_line(line)
      tgt = self.tgt_context.get_line(line)
      diffs.append(math.pow(abs(len(src) - len(tgt)), 1/3))

    if len(diffs) < 2:
      self.f_heuristic = (-math.inf, math.inf)
      return

    # Compute each statistic once; `statistics` is pure Python and slow.
    center = stats.mean(diffs)
    spread = stats.stdev(diffs) * 2
    self.f_heuristic = (center - spread, center + spread)

  def get_total_lines(self):
    """Return the number of aligned lines available for processing."""
    return self.lines

  # May process and save in batches if need be
  def partition(self):
    """Split [0, lines) into contiguous (start, end) ranges.

    The step is a quarter of the corpus (at least 1), so small corpora yield
    one range per line; the final range always ends at `lines`. Returns an
    empty list when there are no lines.
    """
    size = self.lines
    if size <= 0:
      return []

    # max(1, ...) guards against a zero step when size < 4.
    step = max(1, size // 4)
    return [(start, min(start + step, size)) for start in range(0, size, step)]

  def process(self, partition: tuple[int, int]) -> list[tuple[str, str]]:
    """Process lines in [partition[0], partition[1]) and drop filtered pairs.

    Returns:
      Unique (source, target) pairs in first-seen order. Progress is printed
      to stdout on a single, carriage-return-updated line.
    """
    # dict keys give an order-preserving dedupe, so output is deterministic.
    processed = {}
    src_ctx, tgt_ctx = self.src_context, self.tgt_context

    for i in range(partition[0], partition[1]):
      print(f"\r{i+1}/{self.lines}", end='')

      src_txt, tgt_txt = src_ctx.get_line(i), tgt_ctx.get_line(i)

      # Each side receives the raw text of the other side.
      src_txt_processed = src_ctx.process(i, tgt_txt)
      tgt_txt_processed = tgt_ctx.process(i, src_txt)

      if self.__filter(src_txt_processed, tgt_txt_processed):
        continue
      processed[(src_txt_processed, tgt_txt_processed)] = None

    return list(processed)