In [1]:
## Imports, utility, and cleaning functions

# required imports:
import codecs
from nltk.tokenize import sent_tokenize
import os
import pandas as pd
import re
import unicodedata

# project functions:
# from utility_code import *
import sys

import spacy
nlp = spacy.load("en_core_web_lg")
# Alternative: en_core_web_sm is faster but less accurate:
# nlp = spacy.load("en_core_web_sm")
# en_core_web_lg (loaded above) is slower but more accurate; it must be
# downloaded once before it can be loaded:
# !python -m spacy download en_core_web_lg

In [2]:
## From utility_code.py
## Progress bar to view the progress of lengthy processes
# As suggested by Rom Ruben (see: http://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console/27871113#comment50529068_27871113)
def progress(count, total, status=''):
    """Draw a one-line text progress bar on stdout.

    The line ends with a carriage return so the next call overwrites it.

    Args:
        count: Number of work items completed so far.
        total: Total number of work items. A total of 0 is rendered as a
            full bar (nothing is left to do) instead of dividing by zero.
        status: Optional text appended after the percentage.
    """
    bar_len = 60
    if total:
        filled_len = int(round(bar_len * count / float(total)))
        # Scale by exactly 100 so that count == total reads 100.0%.
        percents = round(100.0 * count / float(total), 1)
    else:
        filled_len = bar_len
        percents = 100.0
    bar = '#' * filled_len + '-' * (bar_len - filled_len)
    sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percents, '%', status))
    sys.stdout.flush()

# for full output:
def output_full(df, path_to_spreadsheets):
    """Prompt for a spreadsheet name and write ``df`` there as a CSV file.

    Args:
        df: Object exposing ``to_csv(path)``; the pipeline passes a pandas
            DataFrame.
        path_to_spreadsheets: Directory that receives the CSV. It may be
            given with or without a trailing path separator.
    """
    spreadsheet_name = input("[=] Please input desired spreadsheet name: ")
    # os.path.join inserts a separator only when one is missing, so a
    # directory such as "data/spreadsheets" no longer yields
    # "data/spreadsheetsname.csv".
    csv_path = os.path.join(path_to_spreadsheets, spreadsheet_name + '.csv')
    df.to_csv(csv_path)
    print(spreadsheet_name + ' was saved in '+str(path_to_spreadsheets) + f" as {spreadsheet_name}.csv")


In [3]:
## All cleaning functions and regex

### regex:
# Patterns looked up by name in preprocess_text().
# "addresses" and "dates" are empty placeholders, and "space" is not looked
# up anywhere in the code visible in this notebook.
regex_expressions = {
    # One or more capital-letter initials, each followed by a period.
    "initials": r"\b([A-Z][.](\s)?)+",
    # Title abbreviations. The leading \b keeps the pattern from firing on
    # ordinary words that merely end in one of them ("best.", "first.").
    "prefixes": r"\b(Mr|St|Mrs|Ms|Dr|Esq|Sec|Secretar)[.]",
    "addresses": "",
    "dates": "",
    # A "¬" immediately followed by a newline.
    "line_break": r"¬\n",
    # A single whitespace character (was "/s", which matched a literal "/s").
    "space": r"\s",
    "dashes": r"[-]+",
    "quote_marks": r"(“|”)",
    # Abbreviated month, optionally followed by a day and a further number.
    # The leading \b stops matches inside words such as "grammar.".
    "months_abrv": r"\b(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[.](\s*(\d{1,2})(,|\.)?)?(\s*\d+)?",
    # A number followed by "d." (old pence notation).
    "pennies": r"(\d+[.]?\s*)[d][.]",
    # One or two digits, a period, then a four-digit number.
    "months_and_years": r"\d{1,2}[.]\s*(\d{4})",
}


def input_corpus_of_txts(path):
    """Read every regular file directly inside ``path`` as UTF-8 text.

    Args:
        path: Directory holding the corpus, with or without a trailing
            path separator.

    Returns:
        A list of ``(filename, text)`` tuples ordered by filename. Bytes
        that are not valid UTF-8 are dropped (``errors="ignore"``).
    """
    list_of_filenames_and_dirty_texts = []
    # os.listdir returns entries in arbitrary order; sorting makes the
    # resulting row order reproducible between runs.
    for filename in sorted(os.listdir(path)):
        file_path = os.path.join(path, filename)
        # Sub-directories cannot be opened as text files, so skip them
        # instead of crashing.
        if not os.path.isfile(file_path):
            continue
        with codecs.open(file_path, 'r', encoding='utf-8', errors="ignore") as raw_text:
            dirty_text = raw_text.read()
        list_of_filenames_and_dirty_texts.append((filename, dirty_text))
    return list_of_filenames_and_dirty_texts


# strip all accented characters:
def strip_accents(text):
    """Return ``text`` reduced to plain ASCII.

    The text is NFD-normalised so accented letters split into a base letter
    plus combining marks; every non-ASCII code point is then discarded.
    """
    decomposed = unicodedata.normalize('NFD', text)
    ascii_only = decomposed.encode('ascii', 'ignore')
    return str(ascii_only.decode("utf-8"))

def process_periods(text):
    """Replace every period in ``text`` with the ``<prd>`` placeholder.

    Takes a plain string rather than a match object because it is only
    called from the other processing helpers.
    """
    return re.sub(r"[.]", "<prd>", text)

def process_periods_to_commas(matchobj):
    """``re.sub`` callback: return the matched text with periods turned into commas."""
    matched = matchobj.group(0)
    return re.sub(r"[.]", ",", matched)

# processing functions for regex calls in preprocess_text() function
# process initials for regex, and return a format that we can identify
def process_initials(matchobj):
    """``re.sub`` callback that packs a run of initials into one token.

    Periods in the match become ``<prd>``, all whitespace inside the match
    is removed, and a single trailing space is appended.
    """
    marked = process_periods(matchobj.group(0))
    compact = re.sub(r"\s*", "", marked)
    return compact + " "

def process_months_abrv(matchobj):
    """``re.sub`` callback: replace the periods in a matched month abbreviation with ``<prd>``.

    Wrapping the result in ``<date>`` markers is currently disabled.
    """
    return process_periods(matchobj.group(0))

def process_pennies(matchobj):
    """``re.sub`` callback: spell out the pence abbreviation in the match.

    Each ``d`` (with its optional period) becomes the word ``pennies``; any
    period left over is then replaced with ``<prd>``.
    """
    spelled_out = re.sub(r"d[.]?", "pennies", matchobj.group(0))
    return process_periods(spelled_out)

# combine it together
def preprocess_text(text):
    """Normalise raw newspaper text before sentence tokenisation.

    Periods that do not end a sentence (initials, titles, month
    abbreviations, pence amounts) are replaced with the ``<prd>``
    placeholder or removed, so that they are not literal periods when the
    text is later split into sentences.

    Args:
        text: Raw text of one document.

    Returns:
        The normalised text, reduced to ASCII.
    """
    # Remove the "¬" + newline line breaks created by the newspaper processor.
    text = re.sub(regex_expressions["line_break"], "", text)
    # Mark initials.
    text = re.sub(regex_expressions["initials"], process_initials, text)
    # Titles: keep the abbreviation, mask its period.
    text = re.sub(regex_expressions["prefixes"], r"\1<prd>", text, flags=re.IGNORECASE)
    # Month abbreviations.
    text = re.sub(regex_expressions["months_abrv"], process_months_abrv, text, flags=re.IGNORECASE)
    # "<month number>. <year>" -> comma instead of period.
    text = re.sub(regex_expressions["months_and_years"], process_periods_to_commas, text)
    # "No." / "Nos." -> "number". The \b keeps this from rewriting words
    # that merely end in "no." (e.g. "piano." used to become "pianumber").
    text = re.sub(r"\b(No|Nos)[.]", "number", text, flags=re.IGNORECASE)
    # Runs of dashes become a single space.
    text = re.sub(regex_expressions["dashes"], " ", text)
    # Curly quotes become a straight double quote.
    text = re.sub(regex_expressions["quote_marks"], '"', text)
    # Pence amounts ("XX d.") are spelled out.
    text = re.sub(regex_expressions["pennies"], process_pennies, text)
    # Finally drop accents and any remaining non-ASCII characters.
    text = strip_accents(text)
    return text

def clean_tokenized_sent(sent):
    """Tidy one tokenised sentence.

    Whitespace runs (including newlines and carriage returns) collapse to
    single spaces, words written entirely in upper case are lower-cased,
    and every ``<prd>`` placeholder is turned back into a period.

    Args:
        sent: One sentence as a string.

    Returns:
        The cleaned sentence.
    """
    # Raw string: '\s' in a normal string literal is an invalid escape
    # sequence. \s+ also covers \n and \r, so no separate passes are needed.
    collapsed = re.sub(r'\s+', ' ', sent)
    # A word still carrying a "<prd>" placeholder contains lower-case
    # letters, so isupper() is False and it is left untouched here.
    words = [word.lower() if word.isupper() else word for word in collapsed.split()]
    # Put the periods back.
    return " ".join(words).replace("<prd>", ".")

### Not needed if done in df
def clean_tokenized_list(sent_list):
    """Apply ``clean_tokenized_sent`` to each sentence and return the results as a new list."""
    return [clean_tokenized_sent(sentence) for sentence in sent_list]

def process_dirty_texts_to_df(list_of_filenames_and_dirty_texts):
    """Turn raw documents into a one-row-per-sentence DataFrame.

    Each document is preprocessed, split into sentences with
    ``sent_tokenize`` and cleaned sentence by sentence.

    Args:
        list_of_filenames_and_dirty_texts: Iterable of ``(filename, text)``
            pairs.

    Returns:
        A DataFrame with the columns ``file_names``, ``sentences`` and
        ``relative_sentence_index`` (the sentence's 0-based position inside
        its own document).
    """
    file_names, sentences, sentence_positions = [], [], []
    for filename, dirty_text in list_of_filenames_and_dirty_texts:
        raw_sentences = sent_tokenize(preprocess_text(dirty_text))
        for position, sentence in enumerate(clean_tokenized_list(raw_sentences)):
            file_names.append(filename)
            sentences.append(sentence)
            sentence_positions.append(position)

    df = pd.DataFrame({
        'file_names': file_names,
        'sentences': sentences,
        'relative_sentence_index': sentence_positions,
    })
    print("\n[-] Text preprocessing completed.")
    return df

In [4]:


def get_pos_counts_from_tagged_sentence(analyzed_sent):
    """Count how often each part-of-speech tag occurs in an analysed sentence.

    Args:
        analyzed_sent: Object returned by ``nlp(...)`` (presumably a spaCy
            ``Doc``); it must provide ``count_by`` and ``vocab``.

    Returns:
        A list of ``(tag_text, count)`` tuples, in the order produced by
        ``count_by``.
    """
    counts_by_pos_id = analyzed_sent.count_by(spacy.attrs.IDS['POS'])
    # count_by keys are looked up in the vocab to recover the tag text.
    return [
        (analyzed_sent.vocab[pos_id].text, occurrences)
        for pos_id, occurrences in counts_by_pos_id.items()
    ]

def pos_tag_sentence(sent):
    """POS-tag one sentence with the module-level ``nlp`` pipeline.

    The pipeline is called with ``disable=['ner']``.

    Args:
        sent: Sentence text.

    Returns:
        A tuple ``(tagged_sentence, pos_counts)``: ``tagged_sentence`` is a
        list of ``(token, token.pos_)`` pairs holding the token objects
        themselves, and ``pos_counts`` is the result of
        ``get_pos_counts_from_tagged_sentence``.
    """
    analyzed_sent = nlp(sent, disable = ['ner'])
    tagged_sentence = [(token, token.pos_) for token in analyzed_sent]
    return (tagged_sentence, get_pos_counts_from_tagged_sentence(analyzed_sent))

def pos_tag_list_of_sentences(list_of_cleaned_sentences):
    """Run ``pos_tag_sentence`` on every sentence.

    Returns:
        A list with one ``pos_tag_sentence`` result tuple per input
        sentence, in input order.
    """
    return [pos_tag_sentence(sentence) for sentence in list_of_cleaned_sentences]

def strip_double_proper_nouns():
    """Placeholder; not implemented yet."""
    # TODO: implement. The header previously read "df" instead of "def",
    # which is a SyntaxError and stopped the whole cell from running.
    pass

def pos_tag_texts_from_df(df, sentences_column='sentences'):
    """POS-tag every sentence in ``df`` and store the results in two new columns.

    Args:
        df: DataFrame with one sentence per row. It is modified in place.
        sentences_column: Name of the column that holds the sentence text.

    Returns:
        The same ``df`` with the added columns ``tagged_sentences`` and
        ``pos_counts``, each holding the corresponding part of the
        ``pos_tag_sentence`` result for that row.
    """
    total = len(df)  # computed once instead of on every iteration
    tagged_sentences = []
    pos_counts = []
    # Progress is driven by the row position rather than the index label,
    # so it is correct for any index and reaches 100% on the last row.
    for position, sentence in enumerate(df[sentences_column], start=1):
        tagged_sentence, sentence_pos_counts = pos_tag_sentence(sentence)
        tagged_sentences.append(tagged_sentence)
        pos_counts.append(sentence_pos_counts)
        progress(position, total)
    # dtype=object keeps each cell as the Python list produced above.
    df['tagged_sentences'] = pd.Series(tagged_sentences, index=df.index, dtype=object)
    df['pos_counts'] = pd.Series(pos_counts, index=df.index, dtype=object)
    return df


In [5]:

# Default locations, relative to the notebook's working directory.
# Input corpus of newspaper text files
# (alternative checkout: "../../../GitHub/ciphersofthetimes/data/corpora/newspapers_test/"):
path_to_newspapers = "../../data/corpora/newspapers_test/"
# Output folder for the generated CSV
# (alternative checkout: "../../../GitHub/ciphersofthetimes/data/spreadsheets/"):
path_to_spreadsheets = "../../data/spreadsheets/"
# Name of the DataFrame column that holds the sentence text:
sentences_column = "sentences"


# def main():
#     print("[+] Starting newspaper corpus processor...")
#     print(f"[+] Using default values:")
#     print(f"[+] Path to newspapers is: {path_to_newspapers}")
#     print(f"[+] Path to spreadsheets is: {path_to_spreadsheets}")
#     print(f"[+] Sentences column name is: {sentences_column}")

#     # path_to_newspapers = input("[+] Please input path to newspaper corpus: /Users/leehusigler/Documents/GitHub/ciphersofthetimes/data/corpora/newspapers_test")
#     print(f"[+] Importing corpus of dirty texts from {path_to_newspapers}")
#     dirty_texts = input_corpus_of_txts(path=path_to_newspapers)
#     print("[+] Processing dirty texts")
#     df = process_dirty_texts_to_df(dirty_texts)
#     print("[+] Dataframe created.")
#     print("[+] Beginning POS tagging ...")
#     df = pos_tag_texts_from_df(df, 'sentences')
#     print("[+] Completed POS tagging.")
#     print("[+] Dataframe head looks like this: ")
#     print(df.head())
#     print("[+] Saving dataframe...")
#     output_full(df=df, path_to_spreadsheets=path_to_spreadsheets)
#     print("[+] Program completed. Exiting...")
    # exit()

In [None]:
# Run the whole pipeline with the defaults configured above:
# load the corpus -> clean and split into sentences -> POS-tag -> save as CSV.
print("[+] Starting newspaper corpus processor...")
print("[+] Using default values:")
print(f"[+] Path to newspapers is: {path_to_newspapers}")
print(f"[+] Path to spreadsheets is: {path_to_spreadsheets}")
print(f"[+] Sentences column name is: {sentences_column}")

print(f"[+] Importing corpus of dirty texts from {path_to_newspapers}")
dirty_texts = input_corpus_of_txts(path=path_to_newspapers)
print("[+] Processing dirty texts")
df = process_dirty_texts_to_df(dirty_texts)
print("[+] Dataframe created.")
print("[+] Beginning POS tagging ...")
# Use the configured column name that was announced above instead of a
# second hard-coded copy. process_dirty_texts_to_df names its text column
# "sentences", so sentences_column must keep that value.
df = pos_tag_texts_from_df(df, sentences_column)
print("[+] Completed POS tagging.")
print("[+] Dataframe head looks like this: ")
print(df.head())
print("[+] Saving dataframe...")
output_full(df=df, path_to_spreadsheets=path_to_spreadsheets)
print("[+] Program completed. Exiting...")