# Pre-processing of Data for Animacy Classifiers

## Files for Classifier A

First, 100 files are randomly chosen from the 25,000 original files and stored in a separate folder.

The original files in the WebAnno TSV format are processed and written with a new header. In addition, a new column is added to the TSV files: with the help of a noun chunker, the tokens of each noun chunk are labeled in the IOB format (B = beginning of a noun chunk, I = inside a noun chunk, i.e. not its first token). Both labels, B and I, are then written into the files for later annotation with INCEpTION.

In [None]:
import csv
import os
import random
import shutil

import numpy as np
import pandas as pd
import spacy

In [None]:
# read a reproducible random sample of TSV files (100 by default) and store them
def read_random_TSV_files(folder_path, save_folder_path, num_files=100):
    """
    Copies a reproducible random sample of files from one folder into another.

    Args:
        folder_path (str): Path to the folder containing the TSV files.
        save_folder_path (str): Path to the folder where the new files should be saved
            (created if it does not exist yet).
        num_files (int): Number of files to select. Defaults to 100.

    Raises:
        ValueError: If the folder contains fewer than ``num_files`` files
            (the selection is done without replacement).
    """

    # os.listdir returns the entries in arbitrary order; sorting them makes the
    # seeded selection below reproducible. Sub-directories are ignored because
    # they cannot be read like a file.
    files = sorted(
        name for name in os.listdir(folder_path)
        if os.path.isfile(os.path.join(folder_path, name))
    )
    rs = np.random.RandomState(seed=20)

    # randomly select the files with the RandomState object
    random_files = rs.choice(files, size=num_files, replace=False)

    os.makedirs(save_folder_path, exist_ok=True)

    for file_name in random_files:
        file_path = os.path.join(folder_path, file_name)
        save_file_path = os.path.join(save_folder_path, file_name)

        # explicit UTF-8, consistent with the functions that read these files later
        with open(file_path, "r", encoding="utf-8") as file:
            file_content = file.read()

        with open(save_file_path, "w", encoding="utf-8") as file:
            file.write(file_content)

In [None]:
# Creation of the folder containing the randomly selected speeches
# (the number of selected files is fixed inside read_random_TSV_files)

# source folder with the original files and destination folder for the random sample
folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/originalFiles/"
save_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/newRandomTSVFiles/"

read_random_TSV_files(folder_path, save_folder_path)

In [None]:
# load the German spaCy model 'de_core_news_lg';
# `nlp` is used as a module-level global by the functions defined below
nlp = spacy.load('de_core_news_lg')

# result of labelling a single line of a TSV file
class LineLabel:
    """
    Holds the labelling result for one line.

    Attributes set by the constructor:
        tag: the label written into the new column ('B', 'I' or '_' as produced by determineLabel)
        end: True if the line is the last token of the current noun chunk
        blank: True if the line is empty (contains only whitespace)
    """

    def __init__(self, tag, end, blank):
        self.tag, self.end, self.blank = tag, end, blank

# return the token number of a token line (the part after the '-' in the first column)
def findWordNumber(checkLine):
    """
    Extracts the token number from the first tab-separated column of a line.

    For a line such as "3-7<TAB>12-15<TAB>Haus" the first column is "3-7" and
    the returned value is "7". If the first column contains no '-', the whole
    first column is returned.

    Args:
        checkLine (str): One line of a TSV file.

    Returns:
        str: The text after the first '-' of the first column.
    """
    # Only look at the first column. The previous slice-based version used the
    # result of str.find('\t') directly as slice end, so a line without any tab
    # (find returns -1) silently lost its last character.
    first_column = checkLine.rstrip('\r\n').partition('\t')[0]
    _, hyphen, number = first_column.partition('-')

    return number if hyphen else first_column

def determineLabel(line, nounChunkList):
    """
    Decides which label a line gets, based on the first noun chunk in the list.

    Args:
        line (str): One line of the TSV file.
        nounChunkList (list): Noun chunks that have not been completed yet;
            only the first element is inspected and the list is not modified.

    Returns:
        LineLabel: tag 'B' for the first token of the chunk, 'I' for the
        following tokens up to the chunk end, '_' otherwise. `end` is True
        when the line is the last token of the chunk, `blank` is True for
        an empty line.
    """
    # empty lines carry no token
    if not line.strip():
        return LineLabel('_', False, True)

    # no noun chunk left to match against
    if not nounChunkList:
        return LineLabel('_', False, False)

    # token range of the next noun chunk; the chunk start is shifted by one
    # before it is compared with the token number of the line
    # (presumably because token numbers in the file start at 1 - TODO confirm)
    nextChunk = nounChunkList[0]
    firstToken = int(nextChunk.start) + 1
    lastToken = int(nextChunk.end)
    tokenNumber = int(findWordNumber(line))

    if tokenNumber == firstToken:
        # Label B (Begin)
        chunkTag = 'B'
    elif firstToken < tokenNumber <= lastToken:
        # Label I (Inside)
        chunkTag = 'I'
    else:
        chunkTag = '_'

    # the chunk is finished once its last token has been reached
    return LineLabel(chunkTag, tokenNumber == lastToken, False)

# reads one input file and writes a copy with an additional noun chunk column
def noun_chunk_file(file, output):
    """
    Adds a noun chunk label column to a TSV file.

    The first three lines of the input are skipped and replaced by a fixed
    header. Every '#Text=' line is run through the global spaCy model `nlp`
    to obtain its noun chunks and is copied to the output preceded by an empty
    line. Every other non-empty line is written with the tag returned by
    determineLabel appended as a new tab-separated column; empty lines are
    dropped.

    Args:
        file (str): Path of the input file.
        output (str): Path of the file to write.
    """
    with open(file, 'r', encoding='utf-8') as source:
        sourceLines = source.readlines()

    # header of the TSV files in the WebAnno format
    header = ['#FORMAT=WebAnno TSV 3.2\n', '#T_SP=webanno.custom.Animacy|Animated\n',
              '#T_SP=webanno.custom.CustomChunk|Chunk\n', '\n']

    with open(output, 'w', encoding='utf-8') as out:
        out.writelines(header)
        openChunks = []

        for position, currentLine in enumerate(sourceLines, start=1):
            # the first three lines are the old header
            if position < 4:
                continue

            # a text line starts a new sentence: collect its noun chunks
            if currentLine.startswith('#Text='):
                openChunks = list(nlp(currentLine[6:]).noun_chunks)
                out.write('\n' + currentLine)
                continue

            label = determineLabel(currentLine, openChunks)

            # empty lines are not copied to the output
            if label.blank:
                continue

            out.write(currentLine.rstrip() + '\t' + label.tag + '\n')

            # the first chunk is complete, continue with the next one
            if label.end:
                del openChunks[0]

                
# folder paths of the input & output files
folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/newRandomTSVFiles/"
# (a stray second leading slash was removed from this path)
output_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/labeledDataClassifierA/"

# noun_chunk_file opens its output for writing and does not create folders,
# so the output folder has to exist before the loop starts
os.makedirs(output_folder_path, exist_ok=True)

files = os.listdir(folder_path)

for file in files:
    # add the '.tsv' extension only if the file name does not already end with it
    output_name = "IOB_format_" + (file if file.endswith(".tsv") else file + ".tsv")
    noun_chunk_file(os.path.join(folder_path, file),
                    os.path.join(output_folder_path, output_name))
    

## Files for Classifier B

In [None]:
# folder containing the 100 files
folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/newRandomTSVFiles/"
# The test and train folders must differ from the source folder (and from each
# other): with three identical paths every file would be "moved" onto itself
# and shutil.move fails on the first file.
# NOTE(review): the two folder names below are placeholders - adjust as needed.
test_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/newRandomTSVFilesTest/"
train_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/newRandomTSVFilesTrain/"

# percentage of files to use for testing
test_percent = 20

# only regular files take part in the split
files = [name for name in os.listdir(folder_path)
         if os.path.isfile(os.path.join(folder_path, name))]

# shuffle the list of files randomly
random.shuffle(files)

# number of files to use for testing; integer arithmetic avoids float rounding
num_test_files = len(files) * test_percent // 100

os.makedirs(test_path, exist_ok=True)
os.makedirs(train_path, exist_ok=True)

# The files are copied instead of moved: the source folder is read again by a
# later cell, so it must keep its content.
for file in files[:num_test_files]:
    file_path = os.path.join(folder_path, file)
    shutil.copy(file_path, test_path)

for file in files[num_test_files:]:
    file_path = os.path.join(folder_path, file)
    shutil.copy(file_path, train_path)

In [None]:
def read_random_TSV_files(folder_path, save_folder_path, num_files=80):
    """
    Copies a reproducible random sample of files from one folder into another.

    Args:
        folder_path (str): Path to the folder containing the TSV files.
        save_folder_path (str): Path to the folder where the new files should be saved
            (created if it does not exist yet).
        num_files (int): Number of files to select. Defaults to 80.

    Raises:
        ValueError: If the folder contains fewer than ``num_files`` files
            (the selection is done without replacement).
    """

    # os.listdir returns the entries in arbitrary order; sorting them makes the
    # seeded selection below reproducible. Sub-directories are ignored because
    # they cannot be read like a file.
    files = sorted(
        name for name in os.listdir(folder_path)
        if os.path.isfile(os.path.join(folder_path, name))
    )

    rs = np.random.RandomState(seed=9)

    # randomly select the files with the RandomState object
    random_files = rs.choice(files, size=num_files, replace=False)

    os.makedirs(save_folder_path, exist_ok=True)

    # loop over the selected files
    for file_name in random_files:
        file_path = os.path.join(folder_path, file_name)
        save_file_path = os.path.join(save_folder_path, file_name)

        # explicit UTF-8, consistent with the functions that read these files later
        with open(file_path, "r", encoding="utf-8") as file:
            file_content = file.read()

        with open(save_file_path, "w", encoding="utf-8") as file:
            file.write(file_content)
            
            
# Creation of the folder containing the randomly selected speeches for Classifier B
# (the number of selected files is fixed inside read_random_TSV_files)
# source: folder with the random sample; destination: folder for Classifier B
folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/newRandomTSVFiles/"
save_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/newRandomTSVFilesForClassifierB"

read_random_TSV_files(folder_path, save_folder_path)

In [None]:
# transformation from TSV WebAnno Files into CoNLL 2000 format for all files for Classifier B


# result of labelling a single line of a TSV file
class LineLabel:
    """
    Holds the labelling result for one line.

    Attributes set by the constructor:
        tag: the label written into the new column ('B', 'I' or '_' as produced by determineLabel)
        end: True if the line is the last token of the current noun chunk
        blank: True if the line is empty (contains only whitespace)
    """

    def __init__(self, tag, end, blank):
        self.tag, self.end, self.blank = tag, end, blank

# read one input file and write a copy with an additional noun chunk column
def noun_chunk_file(file, output):
    """
    Adds a noun chunk label column to a TSV file.

    The first three lines of the input are skipped and replaced by a fixed
    header. Every '#Text=' line is run through the global spaCy model `nlp`
    to obtain its noun chunks and is copied to the output preceded by an empty
    line. Every other non-empty line is written with the tag returned by
    determineLabel appended as a new tab-separated column; empty lines are
    dropped.

    Args:
        file (str): Path of the input file.
        output (str): Path of the file to write.
    """
    with open(file, 'r', encoding='utf-8') as source:
        sourceLines = source.readlines()

    # header of the TSV files in the WebAnno format
    header = ['#FORMAT=WebAnno TSV 3.2\n', '#T_SP=webanno.custom.Animacy|Animated\n',
              '#T_SP=webanno.custom.CustomChunk|Chunk\n', '\n']

    with open(output, 'w', encoding='utf-8') as out:
        out.writelines(header)
        openChunks = []

        for position, currentLine in enumerate(sourceLines, start=1):
            # the first three lines are the old header
            if position < 4:
                continue

            # a text line starts a new sentence: collect its noun chunks
            # and copy the text line into the output file
            if currentLine.startswith('#Text='):
                openChunks = list(nlp(currentLine[6:]).noun_chunks)
                out.write('\n' + currentLine)
                continue

            label = determineLabel(currentLine, openChunks)

            # empty lines are not copied to the output
            if label.blank:
                continue

            out.write(currentLine.rstrip() + '\t' + label.tag + '\n')

            # the first chunk is complete, continue with the next one
            if label.end:
                del openChunks[0]
        
def findWordNumber(checkLine):
    """
    Extracts the token number from the first tab-separated column of a line.

    For a line such as "3-7<TAB>12-15<TAB>Haus" the first column is "3-7" and
    the returned value is "7". If the first column contains no '-', the whole
    first column is returned.

    Args:
        checkLine (str): One line of a TSV file.

    Returns:
        str: The text after the first '-' of the first column.
    """
    # Only look at the first column. The previous slice-based version used the
    # result of str.find('\t') directly as slice end, so a line without any tab
    # (find returns -1) silently lost its last character.
    first_column = checkLine.rstrip('\r\n').partition('\t')[0]
    _, hyphen, number = first_column.partition('-')

    return number if hyphen else first_column

def determineLabel(line, nounChunkList):
    """
    Decides which label a line gets, based on the first noun chunk in the list.

    Args:
        line (str): One line of the TSV file.
        nounChunkList (list): Noun chunks that have not been completed yet;
            only the first element is inspected and the list is not modified.

    Returns:
        LineLabel: tag 'B' for the first token of the chunk, 'I' for the
        following tokens up to the chunk end, '_' otherwise. `end` is True
        when the line is the last token of the chunk, `blank` is True for
        an empty line.
    """
    # empty lines carry no token
    if not line.strip():
        return LineLabel('_', False, True)

    # no noun chunk left to match against
    if not nounChunkList:
        return LineLabel('_', False, False)

    # token range of the next noun chunk; the chunk start is shifted by one
    # before it is compared with the token number of the line
    # (presumably because token numbers in the file start at 1 - TODO confirm)
    nextChunk = nounChunkList[0]
    firstToken = int(nextChunk.start) + 1
    lastToken = int(nextChunk.end)
    tokenNumber = int(findWordNumber(line))

    if tokenNumber == firstToken:
        # Label B (Begin)
        chunkTag = 'B'
    elif firstToken < tokenNumber <= lastToken:
        # Label I (Inside)
        chunkTag = 'I'
    else:
        chunkTag = '_'

    # the chunk is finished once its last token has been reached
    return LineLabel(chunkTag, tokenNumber == lastToken, False)

def pos_and_chunk_tags(text):
    """
    Runs the global spaCy model `nlp` on a text and tags every token.

    Args:
        text (str): The text to analyse.

    Returns:
        tuple: (pos_tags, chunk_tags) where pos_tags is a list of
        (token text, coarse POS tag) pairs and chunk_tags is a list with one
        entry per token: 'B-NP' for the first token of a noun chunk, 'I-NP'
        for its remaining tokens and 'O' for tokens outside of noun chunks.
    """
    parsed = nlp(text)
    pos_tags = [(tok.text, tok.pos_) for tok in parsed]

    # every token starts as "outside"; noun chunk tokens are overwritten below
    chunk_tags = ['O'] * len(parsed)
    for noun_chunk in parsed.noun_chunks:
        token_positions = range(noun_chunk.start, noun_chunk.end)
        for offset, position in enumerate(token_positions):
            chunk_tags[position] = 'B-NP' if offset == 0 else 'I-NP'

    return pos_tags, chunk_tags
    

# tag one collected sentence and write it in CoNLL 2000 format
def _write_conll2000_sentence(outfile, sentence):
    """
    Writes the tokens of one sentence as "word POS chunk" lines followed by an empty line.

    Args:
        outfile: Open text file to write to.
        sentence (list): The words of the sentence; they are joined with
            spaces and analysed again by pos_and_chunk_tags.
    """
    pos_tags, chunk_tags = pos_and_chunk_tags(" ".join(sentence))

    for (word, pos_tag), chunk_tag in zip(pos_tags, chunk_tags):
        outfile.write(f"{word} {pos_tag} {chunk_tag}\n")

    # an empty line separates sentences
    outfile.write("\n")


# transform webanno to conll2000
def webanno_tsv_to_conll2000_with_spacy(input_file, output_file):
    """
    Converts a WebAnno TSV file into a CoNLL 2000 style text file.

    Every '#Text=' line starts a new sentence. The third column of each
    following row with at least three columns is collected as a word of the
    sentence. When the next '#Text=' line (or the end of the file) is
    reached, the collected sentence is tagged and written.

    Args:
        input_file (str): Path of the TSV file to read.
        output_file (str): Path of the text file to write.
    """
    with open(input_file, 'r', encoding='utf-8', newline='') as tsvfile, \
            open(output_file, 'w', encoding='utf-8') as outfile:
        # QUOTE_NONE: the columns are plain tab-separated text. With the csv
        # default, a token that is a double quote would open a quoted field
        # and swallow the following lines.
        reader = csv.reader(tsvfile, delimiter='\t', quoting=csv.QUOTE_NONE)
        sentence_started = False
        sentence = []

        for row in reader:
            if len(row) == 1 and row[0].startswith('#Text='):
                if sentence_started:
                    # the previous sentence is complete: write it and start over
                    _write_conll2000_sentence(outfile, sentence)
                    sentence = []

                sentence_started = True

            elif len(row) >= 3 and sentence_started:
                # the third column holds the word
                sentence.append(row[2])

        # process and write the last sentence
        if sentence:
            _write_conll2000_sentence(outfile, sentence)

# folder paths of the input and output files
# (the closing quote of the first path was missing, which is a syntax error)
input_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/originalFiles/"
output_conll2000_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/CoNLL Format/All Files/"
output_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/IOB Format/"

# both functions called below open their output for writing without creating
# folders, so the output folders have to exist before the loop starts
os.makedirs(output_conll2000_folder_path, exist_ok=True)
os.makedirs(output_folder_path, exist_ok=True)

input_files = os.listdir(input_folder_path)

for input_file in input_files:
    # only process .tsv files (the check used ".csv" before, so no file was processed)
    if input_file.endswith(".tsv"):
        # output file paths; [:-4] removes the ".tsv" extension
        output_conll2000_file = output_conll2000_folder_path + "CoNNL2000_" + input_file[:-4] + ".txt"
        output_tsv_file = output_folder_path + "IOB_format_" + input_file
        # apply the noun_chunk_file function to the input TSV file
        noun_chunk_file(input_folder_path + input_file, output_tsv_file)
        print(output_tsv_file)
        # convert the updated TSV file to the CoNLL 2000 format
        webanno_tsv_to_conll2000_with_spacy(output_tsv_file, output_conll2000_file)
        # report the written file (the function object itself was printed before)
        print(output_conll2000_file)


In [None]:
# creation of 20:80 validation and training split and store in folders

def split_data_into_train_and_validation(input_folder, train_folder, validation_folder, validation_size=20):
    """
    Copies the files of a folder into a validation folder and a training folder.

    `validation_size` randomly chosen files are copied into the validation
    folder, all remaining files into the training folder. The input folder is
    left unchanged.

    Args:
        input_folder (str): Folder containing the files to split.
        train_folder (str): Destination folder for the training files
            (created if it does not exist yet).
        validation_folder (str): Destination folder for the validation files
            (created if it does not exist yet).
        validation_size (int): Number of files (not a percentage) used for validation.

    Raises:
        ValueError: If the input folder contains fewer than `validation_size`
            files (raised by random.sample).
    """
    # sub-directories cannot be copied with shutil.copy, so only files take part
    all_files = [name for name in os.listdir(input_folder)
                 if os.path.isfile(os.path.join(input_folder, name))]

    # a set makes the membership test below independent of the sample size
    validation_files = set(random.sample(all_files, validation_size))

    # make the function usable without prior folder setup
    os.makedirs(train_folder, exist_ok=True)
    os.makedirs(validation_folder, exist_ok=True)

    for file in all_files:
        target_folder = validation_folder if file in validation_files else train_folder
        shutil.copy(os.path.join(input_folder, file), os.path.join(target_folder, file))

# input: folder with all CoNLL 2000 files; outputs: folders for the training and validation files
input_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/CoNLL Format/All Files/"
train_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/CoNLL Format/Training/"
validation_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/CoNLL Format/Validation/"

# create both destination folders if they do not exist yet
os.makedirs(train_folder_path, exist_ok=True)
os.makedirs(validation_folder_path, exist_ok=True)

# copy the files into the two folders (default validation size of the function is used)
split_data_into_train_and_validation(input_folder_path, train_folder_path, validation_folder_path)

In [None]:
# transform conll2000 files to dataframe

def conll2000_to_dataframe(file_path):
    """
    Reads a CoNLL 2000 style file into a DataFrame.

    Every line that consists of exactly three whitespace-separated fields
    becomes one row; all other lines (including the empty lines between
    sentences) are ignored.

    Args:
        file_path (str): Path of the file to read.

    Returns:
        pandas.DataFrame: One row per token with the columns 'Word', 'POS' and 'Chunk'.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        raw_lines = handle.read().split('\n')

    # keep only well-formed "word POS chunk" lines
    split_lines = (raw_line.split() for raw_line in raw_lines)
    rows = [fields for fields in split_lines if len(fields) == 3]

    return pd.DataFrame(rows, columns=['Word', 'POS', 'Chunk'])

def read_files_to_dataframe(folder_path, file_names):
    """
    Reads several CoNLL 2000 style files and combines them into one DataFrame.

    Args:
        folder_path (str): Folder that contains the files.
        file_names (iterable of str): Names of the files inside `folder_path`.

    Returns:
        pandas.DataFrame: The rows of all files in the given order with a new
        continuous index. If `file_names` is empty, an empty DataFrame with
        the columns 'Word', 'POS' and 'Chunk' is returned.
    """
    dfs = [conll2000_to_dataframe(os.path.join(folder_path, file_name))
           for file_name in file_names]

    # pd.concat raises a ValueError for an empty list; return an empty frame
    # with the expected columns instead
    if not dfs:
        return pd.DataFrame(columns=['Word', 'POS', 'Chunk'])

    return pd.concat(dfs, ignore_index=True)

# folder paths of the input files
# (the variables named "test" point to the validation folder)
train_input_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/CoNLL Format/Training/"
test_input_folder_path = "/Users/lisa/Desktop/Code/Preperation_Of_Classifiers/CoNLL Format/Validation/"

# get all CoNLL 2000 files (.txt) in the input folders
train_files = [f for f in os.listdir(train_input_folder_path) if f.endswith('.txt')]
test_files = [f for f in os.listdir(test_input_folder_path) if f.endswith('.txt')]

# read the training and validation data into DataFrames
classifier_B_train_df = read_files_to_dataframe(train_input_folder_path, train_files)
classifier_B_validation_df = read_files_to_dataframe(test_input_folder_path, test_files)

# show the first rows of both DataFrames as a quick check
print("Train DataFrame:")
print(classifier_B_train_df.head())
print("\nTest DataFrame:")
print(classifier_B_validation_df.head())