# Overview

**Data Preprocessing Pipeline**

- Split large files (to stay within spaCy's memory constraints)
- Sentence segmentation with LatinCy
- Cleaning the sentences with regex
- Filter short sentences and combine the files
- Transform into .jsonl

To keep a checkpoint of every stage and make the run reproducible, the working folder was duplicated and renamed after each step; each step then rewrites the files of its own copy in place.

# Code

## Pipeline

In [None]:
# Step 1: split every .txt file with more lines than the default max_lines
# (250) into numbered part files, in place. The function is defined further
# down in this notebook, so the definition cells must be executed first.
split_large_files("../../data/preprocessing/1-smaller-files/") 

In [None]:
# Step 2: rewrite each .txt file under this folder so that it holds one
# sentence per line.
sentence_segmentation("../../data/preprocessing/2-segmented-sentences/")

In [None]:
# Step 3: run clean() and normalize_capitalization() over every line of every
# file under this folder, in place.
clean_sentences("../../data/preprocessing/3-cleaned-sentences/")

In [None]:
# Step 4: for each author, keep the sentences with more than TOKEN_LIMIT
# tokens and write them to a single <author>.txt file in this folder.
filter_short_sentences("../../data/preprocessing/4-only-longer-sentences/")

In [None]:
# Step 5: convert the files of the step-4 folder to JSON Lines; the second
# argument is the intended output folder.
transform_to_jsonl("../../data/preprocessing/4-only-longer-sentences/", "../../data/corpus/")

## Imports and Initialisation

In [None]:
import os
import spacy
import re 
import json

# Load the Latin spaCy pipeline (LatinCy, see the overview) once; the same
# `nlp` object is used for sentence segmentation, capitalization
# normalization and token counting below.
nlp = spacy.load("la_core_web_lg")
# Raise the pipeline's maximum text length (spaCy `Language.max_length`,
# counted in characters) to 4,000,000.
nlp.max_length = 4000000

# filter_short_sentences() keeps a sentence only if it has MORE than this
# many spaCy tokens.
TOKEN_LIMIT = 10 

## Parts

### Splitting Files

In [None]:
def split_large_files(root_folder, max_lines=250):
    """Split oversized ``.txt`` files below *root_folder* into numbered parts.

    Every file whose name ends in ``.txt`` and that holds more than
    *max_lines* lines is cut into consecutive chunks of at most *max_lines*
    lines. The chunks are written next to the original as
    ``<name>_part1.txt``, ``<name>_part2.txt``, ... and the original file is
    deleted afterwards. Files at or below the limit are left untouched.

    Args:
        root_folder: Directory that is walked recursively.
        max_lines: Maximum number of lines a file (and each part) may have.

    Any exception raised while handling one file is printed and the walk
    continues with the next file.
    """
    for folder, _subfolders, names in os.walk(root_folder):
        for name in names:
            if not name.endswith('.txt'):
                continue

            file_path = os.path.join(folder, name)
            try:
                with open(file_path, 'r', encoding='utf-8') as source:
                    lines = source.readlines()

                total = len(lines)
                if total <= max_lines:
                    # Small enough already: keep the file as it is.
                    continue

                base_name, ext = os.path.splitext(name)
                for i in range(0, total, max_lines):
                    target_path = os.path.join(
                        folder, f"{base_name}_part{i // max_lines + 1}{ext}"
                    )
                    with open(target_path, 'w', encoding='utf-8') as target:
                        target.writelines(lines[i:i + max_lines])

                # Only drop the original once all parts have been written.
                os.remove(file_path)
            except Exception as e:
                print(f"Failed to process {file_path}: {e}")

### Sentence Segmentation

In [None]:
def sentence_segmentation(root_folder):
    """Rewrite every ``.txt`` file below *root_folder* as one sentence per line.

    The extension check is case-insensitive. Each matching file is read
    completely, passed to ``sentence_segmentation_in_text`` and overwritten
    with the returned sentences, each followed by a newline. The path of
    every processed file is printed.

    Args:
        root_folder: Directory that is walked recursively.
    """
    for subdir, _, files in os.walk(root_folder):
        txt_names = [name for name in files if name.lower().endswith('.txt')]
        for name in txt_names:
            file_path = os.path.join(subdir, name)

            with open(file_path, 'r', encoding='utf-8') as reader:
                content = reader.read()

            sentences = sentence_segmentation_in_text(content)

            with open(file_path, 'w', encoding='utf-8') as writer:
                writer.writelines(sentence + "\n" for sentence in sentences)
                print(file_path)

In [None]:
def sentence_segmentation_in_text(text):
    """Return the sentences the module-level ``nlp`` pipeline finds in *text*.

    Line breaks are replaced by single spaces before the text is parsed, so
    the original line structure does not influence the result. Each returned
    sentence is stripped of surrounding whitespace.

    Args:
        text: The raw text to segment.

    Returns:
        A list of sentence strings in document order.
    """
    flattened = ' '.join(text.split('\n'))
    return [span.text.strip() for span in nlp(flattened).sents]

### Cleaning 

In [None]:
def clean_sentences(folder):
    """Clean every line of every file below *folder*, in place.

    Each line is passed through ``clean`` and then through
    ``normalize_capitalization``. The results are written back to the same
    file, joined with newlines (no newline is added after the last line).

    Args:
        folder: Directory that is walked recursively; all files are
            processed, whatever their extension.

    Any exception raised while handling one file is printed and the walk
    continues with the next file.
    """
    for root, _dirs, files in os.walk(folder):
        for name in files:
            file_path = os.path.join(root, name)
            try:
                with open(file_path, "r", encoding="utf-8") as source:
                    raw_lines = source.readlines()

                # Regex cleanup first, capitalization second.
                cleaned = [
                    normalize_capitalization(clean(line)) for line in raw_lines
                ]

                with open(file_path, "w", encoding="utf-8") as target:
                    target.write("\n".join(cleaned))
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")

In [None]:
def _collapse_repeated(match):
    """Return the replacement for a run of one repeated '.', ',' or whitespace."""
    # A whitespace run between words must leave one space behind; deleting it
    # outright glues the neighbouring words together ("beta  gamma" ->
    # "betagamma"). At the very start of the text the run is still dropped so
    # that the '^'-anchored patterns applied afterwards see the first word.
    if match.group(1).isspace() and match.start() > 0:
        return ' '
    return ''


def clean(text):
    """Remove editorial noise from one sentence and normalize its spacing.

    The rules below are applied in order, all with ``re.IGNORECASE``. After
    that, every character that is not a word character, whitespace, '.' or
    ',' becomes a space, spaces before '.'/',' are removed, a space is
    inserted after '.'/',' when a word character follows, and whitespace is
    collapsed and stripped.

    Args:
        text: One sentence (a trailing newline is fine).

    Returns:
        The cleaned sentence; an empty string if nothing is left.
    """
    rules = [
        (r'\bpage\b\s*\d*', ''),             # 'page' optionally followed by digits
        (r'\b\w*\d+\w*\b', ''),              # any token that contains a digit
        (r'\[.*?\]', ''),                    # square brackets and their content
        (r'\bUERS\b[.,\s]*', ''),            # 'UERS' plus trailing punctuation/space
        (r'\bCAPUT\b\s*[IVXLCDM]+\.', ''),   # 'CAPUT' + Roman numeral + period
        (r'\bCAP\.\s*[IVXLCDM]+\.', ''),     # 'CAP.' + Roman numeral + period
        (r'\bGo back to text\b', ''),        # navigation phrase
        (r'\bFront Matter\b', ''),           # navigation phrase
        (r'^(\b\w+\b[.,\s]*){1,3}$', ''),    # text made of only one to three words
        (r'\.{2,}', ''),                     # two or more periods in a row
        # The same '.', ',' or whitespace character repeated: punctuation is
        # dropped, whitespace inside the text collapses to one space.
        (r'([.,\s])\1{1,}', _collapse_repeated),
        (r'[,.]{2,}', ''),                   # mixed runs of ',' and '.'
        (r'^\s*[.,]+', ''),                  # leading punctuation
        # NOTE(review): because every rule runs with re.IGNORECASE, this also
        # removes a leading ordinary word spelled only with these letters
        # (e.g. 'Dixi', 'Illi') - confirm that this is intended.
        (r'^\b[IVXLCDM]+\b\.?', ''),         # leading Roman numeral
        (r'^\bibid\b\.?', ''),               # leading 'ibid.'
        (r'\b[a-z]\.\b', ''),                # single letter + '.' directly before a word character
        (r'\b[b-df-hj-np-tv-z]\b', ''),      # standalone single consonant
    ]

    for pattern, replacement in rules:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)

    # Replace non-word characters (except spaces, dots, and commas) with a space
    text = re.sub(r'[^\w\s.,]', ' ', text)

    # Remove spaces before commas and periods
    text = re.sub(r'\s+([,.])', r'\1', text)

    # Ensure space after punctuation (if followed by a letter or number)
    text = re.sub(r'([,.])(\w)', r'\1 \2', text)

    # Normalize spaces (remove extra spaces)
    text = re.sub(r'\s+', ' ', text).strip()

    return text

In [None]:
def normalize_capitalization(text):
    """Tone down tokens that are written entirely in upper case.

    *text* is parsed with the module-level ``nlp`` pipeline. A token whose
    text is all upper case is capitalized (first letter only) when the
    pipeline assigned it an entity type, and lower-cased otherwise. All
    other tokens are kept as they are. Each token's trailing whitespace is
    preserved, so spacing is unchanged.

    Args:
        text: The sentence to normalize.

    Returns:
        The sentence with the adjusted capitalization.
    """
    def recase(token):
        word = token.text
        if not word.isupper():
            return word
        # Named entities keep an initial capital; everything else is lowered.
        return word.capitalize() if token.ent_type_ else word.lower()

    return "".join(recase(token) + token.whitespace_ for token in nlp(text))

### Filtering Short Sentences

In [None]:
def filter_short_sentences(folder):
    """Run ``filter_short_sentences_per_author`` for every author in the corpus.

    The authors are processed in the fixed order listed below, each with the
    module-level ``TOKEN_LIMIT``.

    Args:
        folder: Directory passed through to the per-author filter.
    """
    authors = (
        "Brenz",
        "Bucer",
        "Bugenhagen",
        "Bullinger",
        "Erasmus",
        "Melanchthon",
        "Oekolampad",
        "Theophylact",
        "Tuitiensis",
        "Wild",
        "Zwingli",
    )
    for author in authors:
        filter_short_sentences_per_author(folder, TOKEN_LIMIT, author)

In [None]:
def filter_short_sentences_per_author(folder, token_limit, author):
    """Collect one author's long sentences into ``<folder>/<author>.txt``.

    Every file below *folder* whose name starts with *author* and ends in
    ``.txt`` is read line by line. A non-empty (stripped) line is kept when
    the module-level ``nlp`` pipeline splits it into more than *token_limit*
    tokens. The kept sentences are written, one per line, to
    ``<author>.txt`` directly inside *folder*, and a completion message is
    printed.

    Args:
        folder: Directory that is walked recursively and that receives the
            combined output file.
        token_limit: A sentence needs strictly more tokens than this.
        author: File-name prefix that selects the input files; also the
            name of the output file.
    """
    kept = []

    for root, _dirs, files in os.walk(folder):
        for name in files:
            if not (name.endswith('.txt') and name.startswith(author)):
                continue
            with open(os.path.join(root, name), 'r', encoding='utf-8') as source:
                # One line is one sentence; blank lines never reach the tokenizer.
                stripped_lines = (line.strip() for line in source)
                kept.extend(
                    sentence
                    for sentence in stripped_lines
                    if sentence and len(nlp(sentence)) > token_limit
                )

    with open(os.path.join(folder, f"{author}.txt"), 'w', encoding='utf-8') as f:
        f.writelines(sentence.strip() + "\n" for sentence in kept)
        print("Finished " + author)

### Transforming into jsonl

In [None]:
def transform_to_jsonl(input_folder, output_folder):
    """Call ``txt_to_jsonl`` for every file below *input_folder*.

    Args:
        input_folder: Directory that is walked recursively; all files are
            passed on, whatever their extension.
        output_folder: Forwarded unchanged to ``txt_to_jsonl``.

    The path of each file is printed once it has been converted.
    """
    for root, _dirs, files in os.walk(input_folder):
        paths = [os.path.join(root, name) for name in files]
        for file_path in paths:
            txt_to_jsonl(file_path, output_folder)
            print("Finished " + file_path)

In [None]:
def txt_to_jsonl(file_path, output_folder):
    """Convert a one-sentence-per-line text file into a JSON Lines file.

    The output is written to ``<output_folder>/<base name>.jsonl``, where
    the base name is the input file's name without directory and extension.
    Each non-empty (stripped) input line becomes one JSON object of the form
    ``{"sentence": "<line>"}``; non-ASCII characters are written as-is.

    Args:
        file_path: Path of the UTF-8 text file to convert.
        output_folder: Directory that receives the ``.jsonl`` file; it is
            created if it does not exist yet.
    """
    # Use only the file's base name: joining the full input path onto
    # output_folder would place the result relative to (or, for an absolute
    # input path, entirely outside of) the requested output folder.
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)
    jsonl_file = os.path.join(output_folder, base_name + ".jsonl")

    with open(file_path, 'r', encoding='utf-8') as infile, open(jsonl_file, 'w', encoding='utf-8') as outfile:
        for line in infile:
            sentence = line.strip()
            if sentence:  # Skip empty lines
                json_obj = {"sentence": sentence}
                outfile.write(json.dumps(json_obj, ensure_ascii=False) + '\n')