# Apply POS Tagging and NER to Politics Test Sets

In [1]:
import spacy
import csv
import pandas as pd
import os
import re

from transformers import AutoTokenizer
from spacy.symbols import ORTH
from spacy.tokenizer import Tokenizer
from spacy.util import compile_prefix_regex, compile_infix_regex, compile_suffix_regex

In [2]:
# Load the spaCy pipeline "en_core_web_sm"; the POS-tagging and NER helpers
# defined further down call this shared `spacy_model` object.
spacy_model = spacy.load("en_core_web_sm")

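# custom punctuation infix handling --> these characters split a word when they occur inside it: . , - [ ] ? : ; ~ plus quotes and backticks (round parentheses are not part of the infix pattern)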
def custom_tokenizer(nlp):
    """Build a spaCy ``Tokenizer`` for ``nlp`` that uses a custom infix pattern.

    Prefix and suffix handling are compiled from ``nlp.Defaults.prefixes`` and
    ``nlp.Defaults.suffixes``; only the infix pattern is replaced. No ``rules``
    argument is passed to the ``Tokenizer`` and ``token_match`` is ``None``.

    Args:
        nlp: A loaded spaCy language pipeline (provides ``vocab`` and ``Defaults``).

    Returns:
        A new ``Tokenizer`` bound to ``nlp.vocab``.
    """
    # Any single character from this class is treated as an infix match.
    infix_pattern = re.compile(r'''[..\,\-\[\]\?\:\;\...\‘\’\`\“\”\"\'~]''')

    language_defaults = nlp.Defaults
    tokenizer_options = {
        "prefix_search": compile_prefix_regex(language_defaults.prefixes).search,
        "suffix_search": compile_suffix_regex(language_defaults.suffixes).search,
        "infix_finditer": infix_pattern.finditer,
        "token_match": None,
    }
    return Tokenizer(nlp.vocab, **tokenizer_options)

# Swap the pipeline's tokenizer for the custom one built from the same pipeline.
spacy_model.tokenizer = custom_tokenizer(spacy_model)

# Register each placeholder as a tokenizer special case that maps to a single
# token carrying exactly the placeholder text.
for placeholder in ("[URL]", "[EMOJI]"):
    spacy_model.tokenizer.add_special_case(placeholder, [{ORTH: placeholder}])

In [3]:
%%time

# spacy POS tags
def text_to_pos_list(text):
    """Run ``spacy_model`` on ``text`` and return ``[token text, POS tag]`` pairs.

    Args:
        text: The raw text to analyse.

    Returns:
        A list with one ``[token.text, token.pos_]`` list per spaCy token, in
        document order.
    """
    pos_pairs = []
    for tok in spacy_model(text):
        pos_pairs.append([tok.text, tok.pos_])
    return pos_pairs


# spacy NER tags
def text_to_ner_list(text):
    """Run ``spacy_model`` on ``text`` and return per-token entity information.

    Args:
        text: The raw text to analyse.

    Returns:
        A list with one ``[token.text, entity type, token.ent_iob_]`` list per
        spaCy token. Tokens whose ``ent_type_`` is the empty string get the
        entity type ``"NO_ENT"``.
    """
    return [
        [
            tok.text,
            tok.ent_type_ if str(tok.ent_type_) != "" else "NO_ENT",
            tok.ent_iob_,
        ]
        for tok in spacy_model(text)
    ]


def load_df_from_txt(path, tokenizer=None):
    """Read a one-text-per-line file and annotate every text.

    Empty and whitespace-only lines are skipped. For each remaining line the
    returned frame holds the text itself, its spaCy POS list, its spaCy NER
    list and its BERT token strings (including the special tokens the
    tokenizer adds), truncated to 128 input ids.

    Args:
        path: Path of the text file to read. It is decoded as UTF-8.
        tokenizer: Optional, already-loaded Hugging Face tokenizer. When it is
            omitted, the tokenizer is loaded from
            '../../0_models/bert-rand-1m-3ep-rand'. Passing one lets a caller
            that processes many files load the tokenizer only once.

    Returns:
        A ``pandas.DataFrame`` with the columns ``text``, ``text_pos``,
        ``text_ner`` and ``text_tokens``.
    """
    # Explicit encoding: without it, decoding depends on the platform's locale.
    with open(path, "r", encoding="utf-8") as file:
        texts = [line.rstrip("\n") for line in file if line and not line.isspace()]
    df = pd.DataFrame({"text": texts})

    # write spacy POS tags and NER to new columns
    print("  writing POS tags")
    df["text_pos"] = df.text.apply(text_to_pos_list)
    print("  writing NER tags")
    df["text_ner"] = df.text.apply(text_to_ner_list)

    # write BERT tokenized version of input text to new column
    if tokenizer is None:
        tokenizer = AutoTokenizer.from_pretrained('../../0_models/bert-rand-1m-3ep-rand', use_fast=True)
    print("  writing tokenized texts")

    def to_bert_tokens(text):
        # Encode to ids first so truncation and special tokens are applied,
        # then map the ids back to their token strings.
        encoded = tokenizer(text, truncation=True, max_length=128)
        return tokenizer.convert_ids_to_tokens(encoded["input_ids"])

    df["text_tokens"] = df.text.apply(to_bert_tokens)

    return df

# Annotated DataFrames, keyed by the source file name without its ".txt" extension.
test_set_dict = {}

directory = '../../0_data/clean/unlabelled_reddit/politics_test'
for filename in sorted(os.listdir(directory)):
    if filename.endswith("_5k.txt"):
        print(f"reading {filename}")
        # os.path.splitext removes exactly the extension. str.rstrip(".txt")
        # would instead strip any trailing run of the characters ".", "t", "x".
        test_set_name = os.path.splitext(filename)[0]
        test_set_dict[test_set_name] = load_df_from_txt(os.path.join(directory, filename))

reading test_2017_03_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_04_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_05_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_06_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_07_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_08_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_09_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_10_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_11_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2017_12_5k.txt
  writing POS tags
  writing NER tags
  writing tokenized texts
reading test_2018_01_5k.txt
  writing POS tags
  writing NER tags
  writing toke

In [4]:
def map_pos_to_token(row):
    """Assign a spaCy POS tag to every BERT token of one DataFrame row.

    The BERT tokens in ``row.text_tokens`` are walked left to right while a
    counter tracks the spaCy token in ``row.text_pos`` they are aligned with:

    * ``[CLS]`` and ``[SEP]`` are tagged ``"SPECIAL"``.
    * Continuation pieces (tokens starting with ``"##"``) reuse the spaCy token
      of the preceding piece.
    * Any other token tentatively advances to the next spaCy token. The advance
      is kept only if one lower-cased token text contains the other; otherwise
      it is undone and the BERT token gets the tag of the spaCy token that was
      aligned before.

    A BERT token seen before any spaCy token has been aligned gets the tag of
    the first spaCy token. Once the last spaCy token is reached, the counter
    stays there instead of stepping backwards.

    Args:
        row: Object with ``text_tokens`` (list of BERT token strings) and
            ``text_pos`` (list of ``[token text, POS tag]`` lists).

    Returns:
        A list of ``[BERT token, POS tag]`` lists, one per BERT token.

    Raises:
        IndexError: If ``row.text_pos`` is empty while ``row.text_tokens``
            contains a non-special token.
    """
    spacy_pos = row.text_pos
    last_index = len(spacy_pos) - 1
    token_pos_list = []

    token_counter = -1  # index of the aligned spaCy token; -1 means "none yet"

    for token in row.text_tokens:
        if token in ("[CLS]", "[SEP]"):  # catch special tokens
            token_pos_list.append([token, "SPECIAL"])
            continue

        if not token.startswith("##"):
            # Only advance while there is a next spaCy token (BERT may produce
            # more pieces than spaCy, e.g. for ellipses).
            advanced = token_counter < last_index
            if advanced:
                token_counter += 1

            bert_text = token.lower()
            spacy_text = spacy_pos[token_counter][0].lower()
            partial_match = bert_text in spacy_text or spacy_text in bert_text

            # Undo the advance on a mismatch, but never step back when no
            # advance happened in the first place.
            if advanced and not partial_match:
                token_counter -= 1

        # max(..., 0) stops a counter of -1 from wrapping around to the last
        # spaCy token via negative indexing.
        token_pos_list.append([token, spacy_pos[max(token_counter, 0)][1]])

    return token_pos_list

In [5]:
def map_ent_to_token(row):
    """Assign a spaCy entity type to every BERT token of one DataFrame row.

    The BERT tokens in ``row.text_tokens`` are walked left to right while a
    counter tracks the spaCy token in ``row.text_ner`` they are aligned with:

    * ``[CLS]`` and ``[SEP]`` are tagged ``"SPECIAL"``.
    * Continuation pieces (tokens starting with ``"##"``) reuse the spaCy token
      of the preceding piece.
    * Any other token tentatively advances to the next spaCy token. The advance
      is kept only if one lower-cased token text contains the other; otherwise
      it is undone and the BERT token gets the entity type of the spaCy token
      that was aligned before.

    A BERT token seen before any spaCy token has been aligned gets the entity
    type of the first spaCy token. Once the last spaCy token is reached, the
    counter stays there instead of stepping backwards.

    Args:
        row: Object with ``text_tokens`` (list of BERT token strings) and
            ``text_ner`` (list of lists whose first two items are the token
            text and the entity type).

    Returns:
        A list of ``[BERT token, entity type]`` lists, one per BERT token.

    Raises:
        IndexError: If ``row.text_ner`` is empty while ``row.text_tokens``
            contains a non-special token.
    """
    spacy_ner = row.text_ner
    last_index = len(spacy_ner) - 1
    token_ent_list = []

    token_counter = -1  # index of the aligned spaCy token; -1 means "none yet"

    for token in row.text_tokens:
        if token in ("[CLS]", "[SEP]"):  # catch special tokens
            token_ent_list.append([token, "SPECIAL"])
            continue

        if not token.startswith("##"):
            # Only advance while there is a next spaCy token (BERT may produce
            # more pieces than spaCy, e.g. for ellipses).
            advanced = token_counter < last_index
            if advanced:
                token_counter += 1

            bert_text = token.lower()
            spacy_text = spacy_ner[token_counter][0].lower()
            partial_match = bert_text in spacy_text or spacy_text in bert_text

            # Undo the advance on a mismatch, but never step back when no
            # advance happened in the first place.
            if advanced and not partial_match:
                token_counter -= 1

        # max(..., 0) stops a counter of -1 from wrapping around to the last
        # spaCy token via negative indexing.
        token_ent_list.append([token, spacy_ner[max(token_counter, 0)][1]])

    return token_ent_list

In [6]:
%%time

# assign spacy POS tags to BERT subword tokens
for test_set, test_df in test_set_dict.items():
    print(test_set)
    # Row-wise apply: each row's BERT tokens are paired with spaCy POS tags and
    # the result is stored in a new "tokens_pos" column of the same DataFrame.
    test_df["tokens_pos"] = test_df.apply(map_pos_to_token, axis=1)

test_2017_03_5k
test_2017_04_5k
test_2017_05_5k
test_2017_06_5k
test_2017_07_5k
test_2017_08_5k
test_2017_09_5k
test_2017_10_5k
test_2017_11_5k
test_2017_12_5k
test_2018_01_5k
test_2018_02_5k
test_2018_03_5k
test_2018_04_5k
test_2018_05_5k
test_2018_06_5k
test_2018_07_5k
test_2018_08_5k
test_2018_09_5k
test_2018_10_5k
test_2018_11_5k
test_2018_12_5k
test_2019_01_5k
test_2019_02_5k
test_2019_03_5k
test_2019_04_5k
test_2019_05_5k
test_2019_06_5k
test_2019_07_5k
test_2019_08_5k
test_2019_09_5k
test_2019_10_5k
test_2019_11_5k
test_2019_12_5k
test_2020_01_5k
test_2020_02_5k
CPU times: user 2min 18s, sys: 4.42 s, total: 2min 22s
Wall time: 2min 24s


In [7]:
%%time

# assign spacy ENT tags to BERT subword tokens
for test_set, test_df in test_set_dict.items():
    print(test_set)
    # Row-wise apply: each row's BERT tokens are paired with spaCy entity types
    # and the result is stored in a new "tokens_ner" column of the same DataFrame.
    test_df["tokens_ner"] = test_df.apply(map_ent_to_token, axis=1)

test_2017_03_5k
test_2017_04_5k
test_2017_05_5k
test_2017_06_5k
test_2017_07_5k
test_2017_08_5k
test_2017_09_5k
test_2017_10_5k
test_2017_11_5k
test_2017_12_5k
test_2018_01_5k
test_2018_02_5k
test_2018_03_5k
test_2018_04_5k
test_2018_05_5k
test_2018_06_5k
test_2018_07_5k
test_2018_08_5k
test_2018_09_5k
test_2018_10_5k
test_2018_11_5k
test_2018_12_5k
test_2019_01_5k
test_2019_02_5k
test_2019_03_5k
test_2019_04_5k
test_2019_05_5k
test_2019_06_5k
test_2019_07_5k
test_2019_08_5k
test_2019_09_5k
test_2019_10_5k
test_2019_11_5k
test_2019_12_5k
test_2020_01_5k
test_2020_02_5k
CPU times: user 2min 17s, sys: 5.39 s, total: 2min 22s
Wall time: 2min 24s


In [8]:
# save every annotated test set to its own CSV file (row index is not written)
for test_set, test_df in test_set_dict.items():
    test_df.to_csv(f"../../0_data/clean/unlabelled_reddit/error_analysis/pol_{test_set}.csv", index=False)