In [1]:
import copy
import spacy
import json

In [2]:
class Tokens(object):
    """Container for the per-token records produced by a tokenizer.

    Every entry of ``data`` is an indexable record; the class constants below
    give the position of each field inside a record.
    """
    TEXT = 0      # token text
    TEXT_WS = 1   # token text including the whitespace that follows it
    SPAN = 2      # [start, end) character offsets of the token
    POS = 3       # part-of-speech tag
    LEMMA = 4     # lemmatized text
    NER = 5       # named-entity tag

    def __init__(self, data, annotators, opts=None):
        self.data = data
        self.annotators = annotators
        self.opts = opts or {}

    def __len__(self):
        """Number of token records held."""
        return len(self.data)

    def _field(self, index):
        """Collect the field stored at position `index` of every record."""
        return [record[index] for record in self.data]

    def _optional_field(self, annotator, index):
        """Like `_field`, but None when `annotator` was not requested."""
        if annotator in self.annotators:
            return self._field(index)
        return None

    def slice(self, i=None, j=None):
        """Shallow copy of this object restricted to the tokens in [i, j)."""
        view = copy.copy(self)
        view.data = self.data[i:j]
        return view

    def untokenize(self):
        """Rebuild the text from the whitespace-carrying token strings.

        Leading and trailing whitespace of the result is stripped.
        """
        pieces = (record[self.TEXT_WS] for record in self.data)
        return ''.join(pieces).strip()

    def words(self, uncased=False):
        """List the text of every token.

        Args:
            uncased: when true, each token text is lower-cased
        """
        texts = self._field(self.TEXT)
        if not uncased:
            return texts
        return [text.lower() for text in texts]

    def offsets(self):
        """List the [start, end) character offsets of every token."""
        return self._field(self.SPAN)

    def pos(self):
        """List the part-of-speech tag of every token.
        None is returned when 'pos' is not among the annotators.
        """
        return self._optional_field('pos', self.POS)

    def lemmas(self):
        """List the lemma of every token.
        None is returned when 'lemma' is not among the annotators.
        """
        return self._optional_field('lemma', self.LEMMA)

    def entities(self):
        """List the named-entity tag of every token.
        None is returned when 'ner' is not among the annotators.
        """
        return self._optional_field('ner', self.NER)

    def ngrams(self, n=1, uncased=False, filter_fn=None, as_strings=True):
        """Enumerate every ngram of length 1 through n, ordered by start token.

        Args:
            n: longest ngram length to produce
            uncased: lower-case the token text first
            filter_fn: optional callable that receives the ngram as a list of
              words; the ngram is dropped when the callable returns a truthy
              value
            as_strings: when true each ngram is a space-joined string,
              otherwise it is a (start, end) pair of token indices with an
              exclusive end
        """
        words = self.words(uncased)
        total = len(words)

        spans = []
        for begin in range(total):
            for last in range(begin, min(begin + n, total)):
                stop = last + 1
                if filter_fn and filter_fn(words[begin:stop]):
                    continue
                spans.append((begin, stop))

        if not as_strings:
            return spans
        return [' '.join(words[begin:stop]) for begin, stop in spans]

    def entity_groups(self):
        """Merge runs of adjacent tokens that carry the same entity tag.

        Returns a list of (text, tag) pairs, or None when there are no entity
        annotations. Tokens tagged with the 'non_ent' option value (default
        'O') are not part of any group.
        """
        tags = self.entities()
        if not tags:
            return None
        outside = self.opts.get('non_ent', 'O')
        total = len(tags)

        groups = []
        cursor = 0
        while cursor < total:
            tag = tags[cursor]
            if tag == outside:
                cursor += 1
                continue
            # Advance to the first token whose tag differs from this run's.
            first = cursor
            while cursor < total and tags[cursor] == tag:
                cursor += 1
            groups.append((self.slice(first, cursor).untokenize(), tag))
        return groups

In [3]:
class Tokenizer(object):
    """Tokenize and annotate text with the spaCy 'en_core_web_sm' pipeline."""

    def __init__(self):
        self.nlp = spacy.load('en_core_web_sm')

    def tokenize(self, text):
        """Run the pipeline on `text` and wrap the result in a `Tokens` object.

        Newlines are replaced by spaces before the pipeline runs. That
        replacement keeps the string length unchanged, so the character
        offsets reported for the cleaned text are also used to slice the
        original `text` when recording each token's trailing whitespace.
        """
        doc = self.nlp(text.replace('\n', ' '))
        count = len(doc)

        records = []
        for position, token in enumerate(doc):
            begin = token.idx
            finish = begin + len(token.text)
            # The whitespace-carrying slice runs up to the next token's start;
            # the last token has nothing after it to attach.
            if position + 1 < count:
                ws_end = doc[position + 1].idx
            else:
                ws_end = finish

            records.append((
                token.text,
                text[begin: ws_end],
                (begin, finish),
                token.tag_,
                token.lemma_,
                token.ent_type_,
            ))

        return Tokens(data=records, annotators=('lemma', 'pos', 'ner'), opts={'non_ent': ''})

In [4]:
class Preprocessor():
    """Turn a SQuAD-style JSON file into one tokenized JSON example per line.

    Calling an instance loads the input file, optionally previews the first
    raw example, writes the processed examples to the output file and
    optionally previews the first processed example.
    """

    def __init__(self):
        self.tok = Tokenizer()

    def _load_dataset(self, path):
        """Load json file and store fields separately.

        Args:
            path: JSON file whose top-level 'data' entry is a list of
              articles, each with 'paragraphs' holding a 'context' string and
              a 'qas' list of {'id', 'question'[, 'answers']} entries.

        Returns:
            Dict of parallel lists 'qids', 'questions', 'answers' and
            'qid2cid' (one entry per question) plus 'contexts' (one entry per
            paragraph). 'qid2cid' maps each question to its index in
            'contexts'.
        """
        # Decode explicitly as UTF-8 instead of relying on the platform
        # default encoding, which may not handle non-ASCII text.
        with open(path, encoding='utf-8') as f:
            data = json.load(f)['data']
        output = {'qids': [], 'questions': [], 'answers': [],
                  'contexts': [], 'qid2cid': []}
        for article in data:
            for paragraph in article['paragraphs']:
                output['contexts'].append(paragraph['context'])
                cid = len(output['contexts']) - 1
                for qa in paragraph['qas']:
                    output['qids'].append(qa['id'])
                    output['questions'].append(qa['question'])
                    output['qid2cid'].append(cid)
                    # Always append so 'answers' stays index-aligned with
                    # 'qids' even when a question carries no 'answers' key.
                    output['answers'].append(qa.get('answers', []))
        return output

    def _tokenize(self, text):
        """Tokenize `text` and return its annotations as a plain dict of lists."""
        tokens = self.tok.tokenize(text)
        return {
            'words': tokens.words(),
            'offsets': tokens.offsets(),
            'pos': tokens.pos(),
            'lemma': tokens.lemmas(),
            'ner': tokens.entities(),
        }

    def _find_answer(self, offsets, begin_offset, end_offset):
        """Match token offsets with the char begin/end offsets of the answer.

        Returns:
            (start_token, end_token) where the start token begins at
            `begin_offset` and the end token ends at `end_offset` (so the end
            index is inclusive), or None when either boundary does not fall
            on a token boundary.
        """
        start = [i for i, tok in enumerate(offsets) if tok[0] == begin_offset]
        end = [i for i, tok in enumerate(offsets) if tok[1] == end_offset]
        assert len(start) <= 1
        assert len(end) <= 1
        if len(start) == 1 and len(end) == 1:
            return start[0], end[0]
        return None

    def _process_dataset(self, data):
        """Yield one processed example per question in `data`.

        All questions and contexts are tokenized first; each yielded dict
        combines the question annotations with those of its context. Answers
        whose character span does not line up with token boundaries are
        dropped.
        """
        q_tokens = [self._tokenize(question) for question in data['questions']]
        c_tokens = [self._tokenize(context) for context in data['contexts']]
        answers = data['answers']

        for idx, qid in enumerate(data['qids']):
            question = q_tokens[idx]
            context = c_tokens[data['qid2cid'][idx]]
            offsets = context['offsets']

            # An 'answers' list shorter than 'qids' (e.g. empty for unlabeled
            # data) is treated as "no answers" for the missing entries.
            gold = answers[idx] if idx < len(answers) else []
            ans_tokens = []
            for ans in gold:
                begin = ans['answer_start']
                found = self._find_answer(offsets, begin,
                                          begin + len(ans['text']))
                if found:
                    ans_tokens.append(found)

            yield {
                'id': qid,
                'question': question['words'],
                'document': context['words'],
                'offsets': offsets,
                'answers': ans_tokens,
                'qlemma': question['lemma'],
                'lemma': context['lemma'],
                'pos': context['pos'],
                'ner': context['ner'],
            }

    def __call__(self, in_file, out_file, show_inputs=False, show_outputs=False, save=True):
        """Process `in_file` and write the result to `out_file`.

        Args:
            in_file: path of the SQuAD-style JSON file to read.
            out_file: path of the JSON-lines file to write and/or preview.
            show_inputs: print the first entry of every loaded field.
            show_outputs: print the first example stored in `out_file`.
            save: when False nothing is processed or written; `show_outputs`
              then previews whatever `out_file` already contains and raises
              FileNotFoundError if it does not exist.
        """
        dataset = self._load_dataset(in_file)
        if show_inputs:
            print("Input (only first example):")
            for key, values in dataset.items():
                # A field can be empty (e.g. a file without questions).
                first = values[0] if values else '<empty>'
                print(f"{key} : {str(first)[:100]}")

        if save:
            with open(out_file, 'w', encoding='utf-8') as f:
                for ex in self._process_dataset(dataset):
                    f.write(json.dumps(ex) + '\n')

        if show_outputs:
            print("\nOutput (only first example):")
            with open(out_file, 'r', encoding='utf-8') as f:
                first_line = f.readline()
            # An output file without examples has nothing to preview.
            if first_line.strip():
                for key, value in json.loads(first_line).items():
                    print(f"{key} : {str(value)[:100]}")

### Preprocessing SQuAD train dataset for DrQA

In [5]:
# Train split: raw SQuAD JSON in, one processed JSON example per line out.
in_file = "data/datasets/SQuAD-v1.1-train.json"
out_file = "data/datasets/SQuAD-v1.1-train-processed-spacy.txt"

preprocessor = Preprocessor()

# Preview the first raw and the first processed example alongside the run.
preprocessor(
    in_file=in_file,
    out_file=out_file,
    show_inputs=True,
    show_outputs=True,
)

Input (only first example):
qids : 5733be284776f41900661182
questions : To whom did the Virgin Mary allegedly appear in 1858 in Lourdes France?
answers : [{'answer_start': 515, 'text': 'Saint Bernadette Soubirous'}]
contexts : Architecturally, the school has a Catholic character. Atop the Main Building's gold dome is a golden
qid2cid : 0

Output (only first example):
id : 5733be284776f41900661182
question : ['To', 'whom', 'did', 'the', 'Virgin', 'Mary', 'allegedly', 'appear', 'in', '1858', 'in', 'Lourdes',
document : ['Architecturally', ',', 'the', 'school', 'has', 'a', 'Catholic', 'character', '.', 'Atop', 'the', '
offsets : [[0, 15], [15, 16], [17, 20], [21, 27], [28, 31], [32, 33], [34, 42], [43, 52], [52, 53], [54, 58], 
answers : [[102, 104]]
qlemma : ['to', 'whom', 'do', 'the', 'Virgin', 'Mary', 'allegedly', 'appear', 'in', '1858', 'in', 'Lourdes', 
lemma : ['architecturally', ',', 'the', 'school', 'have', 'a', 'catholic', 'character', '.', 'atop', 'the', 
pos : ['RB', ',', 'DT',

### Preprocessing SQuAD dev dataset for DrQA

In [6]:
# Dev split: same pipeline as the train split, without the previews.
in_file = "data/datasets/SQuAD-v1.1-dev.json"
out_file = "data/datasets/SQuAD-v1.1-dev-processed-spacy.txt"

preprocessor = Preprocessor()

preprocessor(in_file=in_file, out_file=out_file)