In [1]:
import csv
import pdb
import json
import os
import re
import string
import unittest

# Preprocess Text

To simplify text analysis of the U.S. Civil War telegrams, the telegrams are preprocessed: volunteer-generated metadata tags (e.g., `<deletion>`, `<insertion>`, and `<unclear>`) and punctuation are removed, all characters are converted to lower case, and runs of repeated spaces are collapsed into a single space.

As these changes are applied, the changes are saved in a dictionary. The structure of this dictionary is:

```
telegram_changes = {
    removed_tags: [], # array of tuples (e.g., (starting index, ending index, text of the tag))
    removed_punctuation: [], # array of tuples (e.g., (index, punctuation character))
    lower_cased_characters: [], # array of tuples (e.g., (index, original upper-case character))
    spaces_removed: [] # array of tuples (e.g., (starting index, ending index, text of the matched spaces))
}
```
This script runs on one ledger at a time.


## Inputs:
- path to unprocessed text files
- path to folder that will contain the outputs of this script

## Outputs:
This script generates the following file structure:

- folder of each telegram
    - txt file containing the unprocessed telegram
    - txt file containing the processed telegram
    - json file containing the dictionary of changes


In [2]:
# Input and output folders are supplied through environment variables;
# each name is None when its variable is not set.
unprocessed_file_path = os.environ.get('UNPROCESSED_TELEGRAMS_FOLDER')
processed_file_path = os.environ.get('PROCESSED_TELEGRAMS_FOLDER')

In [3]:
def remove_annotation_tags(telegrams_lines, removed_punctuation):
    """Strip the volunteer annotation tags from a telegram.

    The opening and closing forms of ``<deletion>``, ``<insertion>`` and
    ``<unclear>`` are removed one at a time, always taking the left-most
    remaining tag, until none is left.

    Args:
        telegrams_lines: Telegram text as a single string.
        removed_punctuation: List that receives one
            ``(start index, end index, tag text)`` tuple per removed tag.
            (The parameter name is historical; it is the change log for
            this step.) The indexes refer to the text as it was at the
            moment that tag was removed, i.e. after all earlier tags had
            already been stripped. The list is modified in place.

    Returns:
        Tuple ``(text without tags, removed_punctuation)``.
    """
    tag_pattern = re.compile(r"</?(?:deletion|insertion|unclear)>")
    telegrams_lines_without_tags = telegrams_lines

    # Loop until the search itself finds nothing. Comparing the previous
    # match end against the (already shortened) text length can stop the loop
    # while tags are still present, e.g. for "<deletion><unclear>".
    while True:
        match = tag_pattern.search(telegrams_lines_without_tags)
        if match is None:
            break
        start_index = match.start()
        end_index = match.end()
        telegrams_lines_without_tags = (
            telegrams_lines_without_tags[:start_index]
            + telegrams_lines_without_tags[end_index:]
        )
        removed_punctuation.append((start_index, end_index, match.group(0)))

    return (telegrams_lines_without_tags, removed_punctuation)

In [4]:
# Go through the text character by character
def remove_punctuation(telegrams_lines, removed_punctuation):
    """Drop every character that appears in ``string.punctuation``.

    Args:
        telegrams_lines: Telegram text to filter.
        removed_punctuation: List that receives an ``(index, character)``
            tuple for each dropped character; the index is the character's
            position in ``telegrams_lines``. The list is modified in place.

    Returns:
        Tuple ``(text without punctuation, removed_punctuation)``.
    """
    kept_characters = []
    for position, symbol in enumerate(telegrams_lines):
        if symbol in string.punctuation:
            removed_punctuation.append((position, symbol))
        else:
            kept_characters.append(symbol)
    return ("".join(kept_characters), removed_punctuation)

In [5]:
def lowercase_characters(telegrams_lines, removed_punctuation):
    """Lower-case the ASCII upper-case letters of a telegram.

    Only characters found in ``string.ascii_uppercase`` are converted; every
    other character is copied through unchanged.

    Args:
        telegrams_lines: Telegram text to convert.
        removed_punctuation: List that receives an
            ``(index, original character)`` tuple for each converted letter.
            (The parameter name is historical; it is the change log for
            this step.) The list is modified in place.

    Returns:
        Tuple ``(lower-cased text, removed_punctuation)``.
    """
    pieces = []
    for position, symbol in enumerate(telegrams_lines):
        if symbol not in string.ascii_uppercase:
            pieces.append(symbol)
            continue
        # Record the original letter before storing its lower-case form.
        removed_punctuation.append((position, symbol))
        pieces.append(symbol.lower())
    return ("".join(pieces), removed_punctuation)

In [6]:
def remove_extra_spaces(telegrams_lines, removed_punctuation):
    """Collapse runs of spaces so that no two spaces are adjacent.

    Each pass replaces the left-most run of an even number of spaces with a
    single space; passes repeat until no double space remains. A run of three
    spaces therefore takes two passes and produces two log entries.

    Args:
        telegrams_lines: Telegram text to normalize.
        removed_punctuation: List that receives one
            ``(start index, end index, matched spaces)`` tuple per
            replacement. (The parameter name is historical; it is the change
            log for this step.) The indexes refer to the text as it was when
            that replacement was made. The list is modified in place.

    Returns:
        Tuple ``(text with single spaces, removed_punctuation)``.
    """
    extra_space_pattern = re.compile(r"(?:  )+")
    remove_extra_space_telegram = telegrams_lines

    # Loop until the search itself finds nothing. Comparing the previous
    # match end against the (already shortened) text length can stop the loop
    # while a double space is still present, e.g. for "a   ".
    while True:
        match = extra_space_pattern.search(remove_extra_space_telegram)
        if match is None:
            break
        start_index = match.start()
        end_index = match.end()
        remove_extra_space_telegram = (
            remove_extra_space_telegram[:start_index]
            + " "
            + remove_extra_space_telegram[end_index:]
        )
        removed_punctuation.append((start_index, end_index, match.group(0)))

    return (remove_extra_space_telegram, removed_punctuation)

In [7]:
def dehydrate(telegrams_lines):
    """Normalize a telegram and record every change made along the way.

    The steps run in a fixed order: annotation tags are removed, then
    punctuation, then upper-case letters are lower-cased, and finally extra
    spaces are collapsed. Each step gets a fresh list for its change log.

    Args:
        telegrams_lines: Raw telegram text.

    Returns:
        Tuple ``(normalized text, telegram_changes)`` where
        ``telegram_changes`` maps ``"removed_tags"``,
        ``"removed_punctuation"``, ``"lower_cased_characters"`` and
        ``"spaces_removed"`` to the log produced by the matching step.
    """
    untagged_text, tag_log = remove_annotation_tags(telegrams_lines, [])
    unpunctuated_text, punctuation_log = remove_punctuation(untagged_text, [])
    lowered_text, lowercase_log = lowercase_characters(unpunctuated_text, [])
    normalized_telegram, space_log = remove_extra_spaces(lowered_text, [])

    telegram_changes = {
        "removed_tags": tag_log,
        "removed_punctuation": punctuation_log,
        "lower_cased_characters": lowercase_log,
        "spaces_removed": space_log,
    }
    return (normalized_telegram, telegram_changes)

In [8]:
def rehydrate(telegram_lowercase_charaters, telegram_changes):
    """Rebuild the original telegram from normalized text and its change log.

    The recorded changes are undone in the opposite order to the one in which
    ``telegram_changes`` lists them: spaces, then letter case, then
    punctuation, then annotation tags.

    ``telegram_changes`` is only read. The space and tag logs are walked
    backwards with ``reversed()`` rather than reversed in place, so calling
    this function does not alter the caller's dictionary and calling it twice
    gives the same result.

    Args:
        telegram_lowercase_charaters: Normalized telegram text.
        telegram_changes: Dictionary with the keys ``"spaces_removed"``,
            ``"lower_cased_characters"``, ``"removed_punctuation"`` and
            ``"removed_tags"``, each holding a sequence of change records.

    Returns:
        The telegram text with all recorded changes undone.
    """
    rehydrated_telegrams = telegram_lowercase_charaters

    # Space and tag records carry indexes relative to the text at the time of
    # each individual edit, so they must be undone last-to-first.
    for change in reversed(telegram_changes["spaces_removed"]):
        index_of_change = change[0]
        before_change = rehydrated_telegrams[0:index_of_change]
        after_change = rehydrated_telegrams[index_of_change:]
        # change[2][1:] because one space was left in the text; re-inserting
        # the full match would add one space too many.
        rehydrated_telegrams = "".join([before_change, change[2][1:], after_change])

    # Case changes keep the text length, so the order does not matter here.
    for change in telegram_changes["lower_cased_characters"]:
        index_of_change = change[0]
        changed_character = change[1]
        before_change = rehydrated_telegrams[0:index_of_change]
        after_change = rehydrated_telegrams[index_of_change + 1:]
        rehydrated_telegrams = "".join([before_change, changed_character.upper(), after_change])

    # Punctuation indexes refer to positions in the text that still contained
    # the punctuation, so re-inserting in the recorded order restores them.
    for change in telegram_changes["removed_punctuation"]:
        index_of_change = change[0]
        before_change = rehydrated_telegrams[0:index_of_change]
        after_change = rehydrated_telegrams[index_of_change:]
        rehydrated_telegrams = "".join([before_change, change[1], after_change])

    for change in reversed(telegram_changes["removed_tags"]):
        index_of_change = change[0]
        before_change = rehydrated_telegrams[0:index_of_change]
        after_change = rehydrated_telegrams[index_of_change:]
        rehydrated_telegrams = "".join([before_change, change[2], after_change])

    return rehydrated_telegrams

In [9]:
def create_telegram_folder(path):
    """Create the output folder for one telegram.

    Missing parent folders are created as well, so the first run against an
    output root that does not exist yet does not fail. If the folder itself
    already exists, a message is printed and the folder is left as it is.

    Args:
        path: Folder to create.
    """
    try:
        os.makedirs(path)
    except FileExistsError:
        print("ERROR: File already exists")

In [10]:
def create_text_file(dehydrated_telegram, path, filename):
    """Write telegram text to ``<path>/<filename>.txt``.

    An existing file is overwritten. If the file cannot be opened or written,
    a message is printed instead of raising.

    Args:
        dehydrated_telegram: Text to write.
        path: Folder that will contain the file.
        filename: File name without the ``.txt`` extension.
    """
    destination = "".join([path, "/", filename, ".txt"])
    try:
        with open(destination, "w") as handle:
            handle.write(dehydrated_telegram)
    except OSError:  # IOError is an alias of OSError in Python 3
        print("IOError: Error writing text file.")

In [11]:
def create_json_file(telegram_changes, path, filename):
    """Write the change-tracking dictionary to ``<path>/<filename>.json``.

    The dictionary is serialized before the file is opened, so a
    serialization failure does not leave an empty or truncated JSON file
    behind. Failures are reported with a printed message instead of raising.

    Args:
        telegram_changes: JSON-serializable dictionary of recorded changes.
        path: Folder that will contain the file.
        filename: File name without the ``.json`` extension.
    """
    json_file_path = path + "/" + filename + ".json"

    try:
        serialized_changes = json.dumps(telegram_changes, indent=4)
    except TypeError:
        print("TypeError writing JSON file.")
        return
    except OverflowError:
        print("OverflowError writing JSON file.")
        return
    except ValueError:
        print("ValueError writing JSON file.")
        return

    try:
        with open(json_file_path, "w") as jsonfile:
            jsonfile.write(serialized_changes)
    except IOError:
        print("IOError: Error writing JSON file.")

In [None]:
# Process every telegram in the input folder: normalize it, then write the
# original text, the normalized text and the change log into a folder named
# after the telegram inside the output folder.
if not unprocessed_file_path or not os.path.exists(unprocessed_file_path):
    # An unset environment variable yields None, which os.path.exists()
    # rejects with TypeError; report it the same way as a missing folder.
    print("path does not exist")
elif not processed_file_path:
    print("output path is not set")
else:
    # Sorted so the files are processed (and logged) in a repeatable order.
    for file in sorted(os.listdir(unprocessed_file_path)):
        if file.startswith("."):
            continue

        pathname = os.path.join(unprocessed_file_path, file)
        if not os.path.isfile(pathname):
            # Sub-folders cannot be read as telegram text files.
            continue

        print("--------------------------------------")
        print("File Name: ", file)

        with open(pathname) as telegram:
            telegrams_lines = telegram.read()

        # tuple of (normalized telegram text, telegram_changes)
        dehydrated_telegram = dehydrate(telegrams_lines)

        # Round-trip check. rehydrate() gets its own copies of the change
        # lists so that nothing it does to them can alter what is written to
        # the JSON file below.
        changes_for_check = {
            key: list(value) for key, value in dehydrated_telegram[1].items()
        }
        rehydrated_telegram = rehydrate(dehydrated_telegram[0], changes_for_check)
        if rehydrated_telegram != telegrams_lines:
            print("WARNING: rehydrated telegram does not match the original text")

        print("")
        print("telegrams_lines")
        print(telegrams_lines)
        print("")
        print("dehydrated_telegram")
        print(dehydrated_telegram[0])

        # splitext handles any extension length (or none), unlike file[:-4].
        filename = os.path.splitext(file)[0]
        processed_filename = "preprocessed_" + filename

        # Create folder for telegram
        processed_folder_name = os.path.join(processed_file_path, filename)
        create_telegram_folder(processed_folder_name)

        # Write a json file that contains change tracking object
        create_json_file(dehydrated_telegram[1], processed_folder_name, filename)

        # Write a text file with the processed telegram text
        create_text_file(dehydrated_telegram[0], processed_folder_name, processed_filename)
        create_text_file(telegrams_lines, processed_folder_name, filename)

        print("--------------------------------------")
        print("")
        print("")
        

In [None]:
class TestStringMethods(unittest.TestCase):
    """Unit tests for the telegram normalization steps and their round trip."""

    def test_remove_annotation_tags(self):
        """Opening and closing annotation tags are stripped; the text stays."""
        example_string = "<unclear>Ft Monroe</unclear> 4th 120 PM  Recd Jul 4 ' 62 \n<insertion>Norfolk</insertion> <deletion>July</deletion> Fourth twelve thirty PM"
        string_to_match = "Ft Monroe 4th 120 PM  Recd Jul 4 ' 62 \nNorfolk July Fourth twelve thirty PM"
        removed_tags_example_string = remove_annotation_tags(example_string, [])

        self.assertEqual(removed_tags_example_string[0], string_to_match)

    def test_remove_punctuation(self):
        """Punctuation is dropped without touching the surrounding spaces."""
        example_string = "Ft Monroe 4th 120 PM  Recd Jul 4 ' 62 \nNorfolk July Fourth twelve thirty PM"
        string_to_match = "Ft Monroe 4th 120 PM  Recd Jul 4  62 \nNorfolk July Fourth twelve thirty PM"
        removed_punctuation_example_string = remove_punctuation(example_string, [])

        self.assertEqual(removed_punctuation_example_string[0], string_to_match)

    def test_lowercase_characters(self):
        """Upper-case letters are lower-cased; everything else is unchanged."""
        example_string = "Ft Monroe 4th 120 PM  Recd Jul 4 ' 62 \nNorfolk July Fourth twelve thirty PM"
        string_to_match = "ft monroe 4th 120 pm  recd jul 4 ' 62 \nnorfolk july fourth twelve thirty pm"
        lower_case_example_string = lowercase_characters(example_string, [])

        self.assertEqual(lower_case_example_string[0], string_to_match)

    def test_dehydrated_telegram(self):
        """dehydrate applies all four steps, including collapsing spaces."""
        example_string = "<unclear>Ft Monroe</unclear> 4th 120 PM  Recd Jul 4 ' 62 \n<insertion>Norfolk</insertion> <deletion>July</deletion> Fourth twelve thirty PM"
        # Both double spaces (the original one after "PM" and the one left
        # behind by the removed apostrophe) are collapsed by the last step.
        string_to_match = "ft monroe 4th 120 pm recd jul 4 62 \nnorfolk july fourth twelve thirty pm"
        dehydrated_telegram_example = dehydrate(example_string)
        self.assertEqual(dehydrated_telegram_example[0], string_to_match)

    # Previously also named test_dehydrated_telegram, which replaced the test
    # above in the class body so that it never ran.
    def test_rehydrated_telegram(self):
        """rehydrate restores the exact original text from dehydrate's output."""
        example_string = "<unclear>Ft Monroe</unclear> 4th 120 PM  Recd Jul 4 ' 62 \n<insertion>Norfolk</insertion> <deletion>July</deletion> Fourth twelve thirty PM"
        dehydrated_telegram_example = dehydrate(example_string)
        rehydtrated_telegram_example = rehydrate(dehydrated_telegram_example[0], dehydrated_telegram_example[1])

        self.assertEqual(rehydtrated_telegram_example, example_string)

    def test_remove_extra_spaces__two_spaces(self):
        """A run of two spaces becomes one space."""
        example_string = "Ft Monroe 4th 120 PM  Recd Jul 4 ' 62 \nNorfolk July Fourth twelve thirty PM"
        string_to_match = "Ft Monroe 4th 120 PM Recd Jul 4 ' 62 \nNorfolk July Fourth twelve thirty PM"
        example_string_removed_extra_spaces = remove_extra_spaces(example_string,[])

        self.assertEqual(example_string_removed_extra_spaces[0], string_to_match)

    def test_remove_extra_spaces__three_spaces(self):
        """A run of three spaces becomes one space."""
        example_string = "Ft Monroe 4th 120 PM   Recd Jul 4 ' 62 \nNorfolk July Fourth twelve thirty PM"
        string_to_match = "Ft Monroe 4th 120 PM Recd Jul 4 ' 62 \nNorfolk July Fourth twelve thirty PM"
        example_string_removed_extra_spaces = remove_extra_spaces(example_string,[])

        self.assertEqual(example_string_removed_extra_spaces[0], string_to_match)

    def test_remove_extra_spaces__multiple_spaces(self):
        """Several separate runs of spaces are each collapsed."""
        example_string = "Ft Monroe 4th 120 PM   Recd Jul 4 ' 62 \nNorfolk July  Fourth twelve thirty PM"
        string_to_match = "Ft Monroe 4th 120 PM Recd Jul 4 ' 62 \nNorfolk July Fourth twelve thirty PM"
        example_string_removed_extra_spaces = remove_extra_spaces(example_string,[])

        self.assertEqual(example_string_removed_extra_spaces[0], string_to_match)
    
# Run the tests in-process. The argv override supplies a placeholder program
# name so unittest does not try to interpret the host process's own
# command-line arguments, and exit=False keeps unittest.main from calling
# sys.exit() once the run has finished.
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)