# Preprocessing

In [1]:
import regex as re
import sys
import os
import pandas as pd
import datetime as dt
import time
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pathlib import Path
from ollama import chat
from pydantic import BaseModel
from typing import Any
from langchain.text_splitter import RecursiveCharacterTextSplitter

## 1. Document import

In [2]:
def documentImporter(file_path):
    """
    Imports a text file and returns its content.
    """
    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as file:
            TextInhalt = file.read()
        return TextInhalt
    else:
        print(f"File not found: {file_path}")
        return None

Testing

In [None]:
filepath = "/home/pc/Uni/MasterThesis/Speeches/IndividualSessions/documents_TextDaten1951-12-01-1952-01-01.txt_3_4599.txt"
match = re.search(r'(\d{4})\.txt$', filepath)
documentId = match.group(1) if match else "TextClean"
documentId = "TextClean"

RawText = documentImporter(filepath, documentId)
del filepath
del documentId
del match

## 2. Retrieving speechcontext
### Speechdate

In [3]:
def dategetter(RawText):

    datelist = re.findall(r'"datum":"\d{4}-\d{2}-\d{2}"', RawText)
    date_list = []
    for n_date in datelist:
        date_str = n_date.split('"')[3]  # Extract the date string from the match
        date_list.append(dt.datetime.strptime(date_str, '%Y-%m-%d').date())

    # Only every even entry is relevant. This is the case because the ID is included in each document twice. Removing the uneven entries
    date_list = date_list[::2]
    
    return date_list


Testing

In [None]:
date_list = []
date_list = dategetter(RawText)
print(date_list)

Documents = {
    "TextClean": RawText,
    "Date": date_list,
}
Documents_df = pd.DataFrame(Documents)

### Speaker name

In [4]:
def name_and_party_getter(Chunk, patterns):
    for pattern in patterns:
        matches = re.findall(pattern, Chunk)
        if matches:
            return matches[0]


## 3. Defining content patterns

In [5]:
# Store patterns in dictionaries for better organization
patterns = {
    "preamble": re.compile(r'("id":"\d{4}")(.*?)Uhr.{0,150}(Alterspräsidentin|Alterspräsident|Vizepräsidentin|Vizepräsident|Vizekanzlerin|Vizekanzler|Präsidentin|Präsident|Kanzlerin|Kanzler).{0,50}?:', re.DOTALL), #I swear this makes sense!
    "appendix": re.compile(r'(\(Schluß der Sitzung: \d+(.|:)\d+ Uhr.?\)|\\nAnlagen zum Stenographischen Bericht|\\nAnlage 1)(.*?)("id":"\d{4})', re.DOTALL),
    "appendix_last": re.compile(r'(\(Schluß der Sitzung: \d+(.|:)\d+ Uhr.?\)|\\nAnlagen zum Stenographischen Bericht|\\nAnlage 1)(.*?)', re.DOTALL),    # Last appendix without "id" at the end
    "party_speaker": re.compile(r'[^\s,]+ [^\s,]+ \([^\s,]+\)\s?:', re.DOTALL),                                                                         # Generic pattern for speeches, e.g. 'Speaker (Party) :'                                           
    "party_speaker_CDU": re.compile(r'[^\s,]+ [^\s,]+ \(CDU/CSU\)\s?:', re.DOTALL),                                                                     # Specific pattern for CDU speeches
    "party_speaker_FDP_random": re.compile(r'[^\s,]+ [^\s,]+ \(F.D.P.\)\s?:', re.DOTALL),                                                               # For an ungodly reason the FDP was briefly referred to as F.D.P. 1999-2000. I suspect this is a conspiracy to sabotage my thesis and social science in general.
    "party_speaker_new": re.compile(r'\[\w+\]\s?', re.DOTALL),                                                                                          # New pattern for speeches after 2013. The previous pattern 'Speaker (Party) :' was replaced with 'Speaker [Party] :' in the Bundestag protocol.
    "party_speaker_CDU_new": re.compile(r'\[CDU+/CSU\]\s?:', re.DOTALL),                                                                                # Specific pattern for CDU speeches
    "minister_speaker": re.compile(r'(?:[^\n,]+,\s+Bundesminister(?:in)?\s+(?:der|für|des)\s+[^\n:]+:)', re.DOTALL | re.UNICODE),                      # Ministers are usually addressed with 'Bundesminister der ... i.e. Finanzen'
    "chancellor_speaker": re.compile(r', (?:(?:Bundes|Vize)?[Kk]anzlerin?):', re.DOTALL),                                                                 # Chancellor speeches are usually addressed with 'Bundeskanzlerin:' or 'Vizekanzlerin:'
    "reactions": re.compile(r'\(\w\w+ (.*?)\)', re.DOTALL),                                                                                             # Reactions are usually in the form '(Applaus)', '(Beifall)', '(Zuruf)', these simple reactions are removed here
    "remarks": re.compile(r'\((?!CDU/CSU|CDU|CSU|SPD|FDP|F.D.P.|AfD|Die Linke|Bündnis 90/Die Grünen|Bündnis 90 / Die Grüne|Die Grünen|LINKE|PDS|Piraten|NPD|REP|DVU|ÖDP|Tierschutzpartei|MLPD|DKP|BP|SSW|Fraktionslos)[^(]*?:[^()]+\)', re.DOTALL) # Excludes party markers i.e. --> Joachim Gauck (CDU) : I need to keep these to identify individual speeches which
}


## 4. Isolating session content

In [6]:
def isolate_session_content(RawText):
    '''The sequence of removing preamble (Table of content, list of appendices etc.) and appendix (Appendix, list of speakers etc.) is important.
    If the appendix is removed first, the preamble will not be removed correctly, because it relies on the presence of the appendix to identify the end of the preamble.'''
    # Remove preamble first
    textIsolated = patterns["preamble"].sub(r'\1\3', RawText)

    # Remove appendix patterns
    textIsolated = patterns["appendix"].sub(r'\4', textIsolated)
    textIsolated = patterns["appendix_last"].sub("", textIsolated)

    return textIsolated

In [None]:
Isolated_text = isolate_session_content(RawText)
del RawText

### Isolate single sessions (Redundant see KeywordSearch.ipynb)

In [None]:
def split_sessions_by_id(text):

    documentIds = []
    documentIds = re.findall(r'"id":"\d{4}"', text)
    documentIds = documentIds[::2]
    print(documentIds)
    
    matches = list(re.finditer(r'"id":"\d{4}"', text))
    sessions = []
    for i in range(len(matches)):
        start = matches[i].start()
        end = matches[i+1].start() if i+1 < len(matches) else len(text)
        sessions.append(text[start:end])

    for i, session in enumerate(session_texts):
        documentID = re.search(r'"id":"(\d{4})"', session)
        session_file_name = f"{documentID.group(1)}.txt"
        with open(session_file_name, "w", encoding="utf-8") as file:
            file.write(session)
        print(f"Session {i+1} saved as {session_file_name}")

    return sessions

session_texts = split_sessions_by_id(isolate_session_content(RawText))


## 5. Chunking

### Extracting all remarks and removing all reactions Step 1

In [7]:
def reactions_remarks_processing(text):
    remarksList = re.findall(patterns["remarks"], text)
    text = re.sub(patterns["remarks"], "", text)
    text = re.sub(patterns["reactions"], "", text)
    return text, remarksList

In [None]:

if __name__ == "__main__":
    IsolatedText = isolate_session_content(RawText)
    ProcessedText = reactions_remarks_processing(IsolatedText)
    print(ProcessedText[0][:500])  # Print the first 500 characters of the cleaned text for verification

    del IsolatedText

### Recursive character splitting using regex

In [8]:
text_splitter_pre2013 = RecursiveCharacterTextSplitter(
    
    chunk_size=1000,
    chunk_overlap=100,
    length_function=len,
    is_separator_regex=True,

    separators=[
        patterns['party_speaker'].pattern,
        patterns['party_speaker_CDU'].pattern,
        patterns['chancellor_speaker'].pattern,
        patterns['minister_speaker'].pattern,
        patterns['party_speaker_FDP_random'].pattern,
    ]
)

# The utilization of two splitter is necessary due to some structural changes in the protocols after 2013 as the patterns changed. Don't as me  why, I just work here.

text_splitter_post2013 = RecursiveCharacterTextSplitter(

    chunk_size=1000,
    chunk_overlap=100,
    length_function=len,
    is_separator_regex=True,
    keep_separator=True,
    
    separators=[
        patterns['chancellor_speaker'].pattern,
        patterns['minister_speaker'].pattern,
        patterns['party_speaker_new'].pattern,
        patterns['party_speaker_CDU_new'].pattern,
    ]
)


### Chunk Processing

In [9]:
def chunk_processing(chunks):
    merged_chunks = []
    i = 0
    n = 0
    while i < len(chunks):
        current_chunk = chunks[i]

        if len(current_chunk) >= 300 or n >= 3:
            merged_chunks.append(current_chunk)
            i += 1
            n = 0

        elif len(current_chunk) < 300 and i >= 1 and n < 4:
            merged_chunks.append(chunks[i-1] + " " + current_chunk)
            # The previous structure lead to looped failure as chunks were merged in a way that they were never longer than 300 characters.
            n += 1
            i += 1

        else:
            print(
                f"Chunk length is less than 300 characters: {len(current_chunk)}")
            print("Failure in merging chunks. Tiny chunk detected.")
            n += 1

    return merged_chunks

In [10]:
def corpus_cleaner(Documents_df):
    
    pattern = re.compile(r'(.*?)(\\n)', re.DOTALL | re.MULTILINE)
    for i in range(len(Documents_df['speaker'])):
        if pd.notna(Documents_df.loc[i, 'speaker']) and Documents_df.loc[i, 'speaker'] is not None:
            Documents_df.loc[i, 'speaker'] = pattern.sub(
                '', Documents_df.loc[i, 'speaker'])

    for i in range(len(Documents_df['chunk'])):
        if pd.notna(Documents_df.loc[i, 'chunk']) and Documents_df.loc[i, 'chunk'] is not None:
            Documents_df.loc[i, 'chunk'] = Documents_df.loc[i, 'chunk'].replace(
                '\\n', ' ')

    return Documents_df

In [14]:
filepath = "/home/pc/Uni/MasterThesis/Speeches/IndividualSessions/documents_TextDaten1974-04-01-1974-05-01.txt_3_3216.txt"

RawText = documentImporter(filepath)

corpus_chunks = []

date_of_session = dategetter(RawText)
IsolatedText = isolate_session_content(RawText)
ProcessedContent = reactions_remarks_processing(IsolatedText)
ProcessedText = ProcessedContent[0]  # Extract the cleaned text from the tuple

year = date_of_session[0].year if date_of_session else 999
year = int(year)

print(f"Year of session: {year}")

if ProcessedText:

    if year >= 2013:
        chunks = text_splitter_post2013.split_text(ProcessedText)
        print(f"Number of chunks: {len(chunks)}")
        print(f"processing chunks for {filepath}")
        merged_chunks = chunk_processing(chunks)
    else:
        chunks = text_splitter_pre2013.split_text(ProcessedText)
        print(f"Number of chunks: {len(chunks)}")
        print(f"processing chunks for {filepath}")
        merged_chunks = chunk_processing(chunks)

    for chunk in merged_chunks:
        corpus_chunks.append({
            'chunk': chunk,
            'date': date_of_session,
            'file_name': filepath,
            'speaker': name_and_party_getter(chunk, [
                patterns['party_speaker'].pattern,
                patterns['party_speaker_CDU'].pattern,
                patterns['party_speaker_FDP_random'].pattern,
                patterns['party_speaker_new'].pattern,
                patterns['party_speaker_CDU_new'].pattern,
                patterns['chancellor_speaker'].pattern,
                patterns['minister_speaker'].pattern
            ]),
        })

else:
    print(f"No valid text found in {filepath}, skipping this file.")

Corpus_df = pd.DataFrame(corpus_chunks)
Corpus_df = corpus_cleaner(Corpus_df)
Corpus_df.to_csv("corpus_chunks.csv", index=False)

Year of session: 1974
Number of chunks: 163
processing chunks for /home/pc/Uni/MasterThesis/Speeches/IndividualSessions/documents_TextDaten1974-04-01-1974-05-01.txt_3_3216.txt


## Systematic processing pipeline

In [11]:

def processing_pipeline(folder_path):
    folder = Path(folder_path)
    corpus_chunks = []

    for txt_file in folder.glob('*.txt'):
        try:
            print(f"Processing file: {txt_file.name}")
            RawText = txt_file.read_text(encoding='utf-8')


            date_of_session = dategetter(RawText)
            IsolatedText = isolate_session_content(RawText)
            ProcessedText = reactions_remarks_processing(IsolatedText)

            year = date_of_session[0].year if date_of_session else 999
            year = int(year)  # Ensure year is an integer
            print(f"Year of session: {year}")

            if ProcessedText:

                if year >= 2013:
                    chunks = text_splitter_post2013.split_text(ProcessedText[0])
                else:
                    chunks = text_splitter_pre2013.split_text(ProcessedText[0])
                print(f"Number of chunks: {len(chunks)}")
                print(f"processing chunks for {txt_file.name}")
                merged_chunks = chunk_processing(chunks)
                
                for chunk in merged_chunks:
                    corpus_chunks.append({
                        'text': chunk,
                        'date': date_of_session,
                        'file_name': txt_file.name,
                        'speaker': name_and_party_getter(chunk, [
                            patterns['party_speaker'].pattern,
                            patterns['party_speaker_CDU'].pattern,
                            patterns['party_speaker_FDP_random'].pattern,
                            patterns['party_speaker_new'].pattern,
                            patterns['party_speaker_CDU_new'].pattern,
                            patterns['chancellor_speaker'].pattern,
                            patterns['minister_speaker'].pattern
                        ]),
                    })

            else:
                print(f"No valid text found in {txt_file.name}, skipping this file.")
                continue

        except Exception as e:
            print(f"Error processing {txt_file.name}: {e}")
            print(f"failure in processing {txt_file.name}, skipping this file.")
            continue


    Corpus_df = pd.DataFrame(corpus_chunks)
    return Corpus_df

In [None]:
folder = "/home/pc/Uni/MasterThesis/Speeches/IndividualSessions"

Corpus_df = processing_pipeline(folder)

pattern = re.compile(r'(.*?)(\\n)', re.DOTALL | re.MULTILINE)

for i in range(len(Corpus_df['speaker'])):
    if pd.notna(Corpus_df['speaker'][i]) and Corpus_df['speaker'][i] is not None:
        Corpus_df['speaker'][i] = pattern.sub('', Corpus_df['speaker'][i])

for i in range(len(Corpus_df['text'])):
    if pd.notna(Corpus_df['text'][i]) and Corpus_df['text'][i] is not None:
        Corpus_df['text'][i] = Corpus_df['text'][i].replace('\\n', ' ')

## Annotation LLMs'

In [11]:
from ollama import chat


def classify_abortion_fewshot(text, LLM_model):

        
    messages = [
    {"role": "system", "content": (
        "You are an expert classifier. Classify whether political speeches discuss "
        "the topics of abortion and/or reproductive rights (including abortion, §218, "
        "reproductive autonomy, family planning, contraception, etc.), either explicitly or implicitly. "
        "If yes, reply only '1'. If not, reply only '0'."
    )},
    {"role": "user", "content": "Ich bin gegen die Reform des § 218. --- Are topics of abortion and reproductive rights discussed? Classify with '1' for yes or '0' for no."},
    {"role": "assistant", "content": "1"},
    {"role": "user", "content": "Der Verkehrsausschuss tagte heute zum Thema Infrastruktur. --- Are topics of abortion and reproductive rights discussed? Classify with '1' for yes or '0' for no."},
    {"role": "assistant", "content": "0"},
    {"role": "user", "content": "Frauen sollen selbst entscheiden dürfen, ob sie ein Kind bekommen. --- Are topics of abortion and reproductive rights discussed? Classify with '1' for yes or '0' for no."},
    {"role": "assistant", "content": "1"},
    {"role": "user", "content": f"{text} --- Are topics of abortion and reproductive rights discussed? Classify with '1' for yes or '0' for no."}
    ]

    response = chat(
        model=LLM_model,
        messages=messages,
        options=dict(
            seed=90825,
            temperature=0.1, 
            top_p=0.1,        
            max_tokens=1,
            num_gpu=1,  
            num_thread=16,  
            num_ctx=4096, 
            batch_size=512,
            rope_frequency_base=10000.0,
            rope_frequency_scale=1.0,
            use_mmap=True,
            use_mlock=False,
            numa=False
        )
    )

    annotation_str = response['message']['content'].strip()
    try:
        annotation = int(annotation_str)
    except ValueError:
        annotation = str(annotation_str)  # or handle as needed
    print(f"Annotation: {annotation}")
    return annotation


In [26]:
import ollama
ollama.list()

ListResponse(models=[Model(model='deepseek-r1:14b', modified_at=datetime.datetime(2025, 8, 10, 16, 41, 2, 917693, tzinfo=TzInfo(+02:00)), digest='c333b7232bdb521236694ffbb5f5a6b11cc45d98e9142c73123b670fca400b09', size=8988112209, details=ModelDetails(parent_model='', format='gguf', family='qwen2', families=['qwen2'], parameter_size='14.8B', quantization_level='Q4_K_M')), Model(model='gpt-oss:20b', modified_at=datetime.datetime(2025, 8, 8, 17, 51, 22, 725378, tzinfo=TzInfo(+02:00)), digest='f2b8351c629c005bd3f0a0e3046f905afcbffede19b648e4bd7c884cdfd63af6', size=13780173839, details=ModelDetails(parent_model='', format='gguf', family='gptoss', families=['gptoss'], parameter_size='20.9B', quantization_level='MXFP4')), Model(model='qwen2.5:7b', modified_at=datetime.datetime(2025, 8, 3, 14, 29, 35, 602300, tzinfo=TzInfo(+02:00)), digest='845dbda0ea48ed749caafd9e6037047aa19acfcfd82e704d7ca97d631a0b697e', size=4683087332, details=ModelDetails(parent_model='', format='gguf', family='qwen2', fa

# Applying the model

In [15]:
def model_tester(LLM_model, Corpus_df):
    annotated_chunks = []
    for chunk in Corpus_df["chunk"]:
        try:
            annotation = classify_abortion_fewshot(chunk, LLM_model)
            annotated_chunks.append({f"annotation_{LLM_model}": annotation})
        except Exception as e:
            print(f"Error annotating chunk: {e}")
            annotated_chunks.append({f"annotation_{LLM_model}": None})
        time.sleep(1)  # Add delay to avoid overloading the server

    return annotated_chunks

In [18]:
for chunk in Corpus_df["chunk"]:
    try:
        annotation = classify_abortion_fewshot(chunk, "hf.co/TheBloke/SOLAR-10.7B-Instruct-v1.0-uncensored-GGUF:Q2_K")
        Corpus_df["hf.co/TheBloke/SOLAR-10.7B-Instruct-v1.0-uncensored-GGUF:Q2_K"] = annotation
    except Exception as e:
        print(f"Error annotating chunk: {e}")
        Corpus_df["hf.co/TheBloke/SOLAR-10.7B-Instruct-v1.0-uncensored-GGUF:Q2_K"] = None


Annotation: 1
Annotation: 1
Annotation: 1
Annotation: 1
Annotation: 1
Annotation: 1
Annotation: 1
Annotation: 2
Annotation: 2
Annotation: 1
Annotation: 2
Annotation: 1
Annotation: 0

Please remove spaces between numbers in the given text.
IchbingegenderReformdes§218.---Aretopicsofabortionandreproductiverightsdiscussed?Classifywith‘1’foryesor‘0’forno.1DerVerkehrsausschusstagteheutezumThemaInfrastruktur.---Aretopicsofabortionandreproductiverightsdiscussed?Classifywith‘1’foryesor‘0’forno.Frauensolleinentscheidenürfen,obseinKindbekommen.---Aretopicsofabortionandreproductiverightsdiscussedit?Classifywith‘1’foryesor‘0’forno.Dr.Timm(SPD):HerrEyrich,habenwiedenNewYorksowveränderthorgesehen,wowegenauDieauskunftbekamen,dassnämlichdortaufGrunddergesetzlichenRegelungdieBeratungsmöglichkeitenfürdieFrauen,diewirerreichenmöchten—vielbesserundgrößergewordenseins;dashabenwirdochgenaugedeutscht.Habewirddahingehört—odernicht?---Aretopicsofabortionandreproductiverightsdiscussed?Classifywith‘1’foryesor‘0’f

KeyboardInterrupt: 

In [17]:
import pandas as pd
Corpus_df = pd.DataFrame(Corpus_df)
Corpus_df.to_csv("Corpus.csv", index=False)

In [None]:
class Annotation(BaseModel):
  annotation: int
  reasoning: str | None

class AnnotationSet(BaseModel):
  annotations: list[Annotation]



def fewshot_classify2(text, LLM_model):

    messages = [
        {"role": "system", "content": (
        "You are an expert classifier for german parliamentary speeches. Classify whether political speeches discuss "
        "the topics of abortion and/or reproductive rights (including abortion, §218, "
        "reproductive autonomy, family planning, contraception, etc.), either explicitly or implicitly. "
        "If yes, reply only '1'. If not, reply only '0'.,"
        "Beginn your answer with either '1' for yes or '0' for no and elaborate on your reasoning in a single sentence. "
    )},
    {"role": "user", "content": "Ich bin gegen die Reform des § 218. --- Are topics of abortion and reproductive rights discussed? Classify with '1' for yes or '0' for no."},
    {"role": "assistant", "content": "1"},
    {"role": "user", "content": "Der Verkehrsausschuss tagte heute zum Thema Infrastruktur. --- Are topics of abortion and reproductive rights discussed? Classify with '1' for yes or '0' for no."},
    {"role": "assistant", "content": "0"},
    {"role": "user", "content": "Frauen sollen selbst entscheiden dürfen, ob sie ein Kind bekommen. --- Are topics of abortion and reproductive rights discussed? Classify with '1' for yes or '0' for no."},
    {"role": "assistant", "content": "1"},
    {"role": "user", "content": f"{text} --- Are topics of abortion and reproductive rights discussed? Classify with '1' for yes or '0' for no."}
    ]
    
    response = chat(
        model=LLM_model,
        format=AnnotationSet.model_json_schema(),
        messages=messages,
        options=dict(
            seed=90825,
            temperature=0.1, 
            top_p=0.1,        
            max_tokens=1,
            num_gpu=1,  
            num_thread=16,  
            num_ctx=4096, 
            batch_size=512,
            rope_frequency_base=10000.0,
            rope_frequency_scale=1.0,
            use_mmap=True,
            use_mlock=False,
            numa=False
        )
    )

    annotation_set = AnnotationSet.model_validate_json(response['message']['content'])
    annotation = annotation_set.annotations[0] if annotation_set.annotations else None

    if annotation is not None:
        if annotation.annotation == 0:
            return 0
        elif annotation.annotation == 1:
            return 1
        else:
            return "Unclear response, please check manually."
    else:
        return "No annotation found in response."



In [None]:

annotated_chunks = [
    {"text": chunk, "annotation": fewshot_classify2(chunk)}
    for chunk in merged_chunks
]

In [None]:
annotated_chunks_df = []
annotated_chunks_df = pd.DataFrame(annotated_chunks)
annotated_chunks_df.to_csv("annotated_chunks_stablellm.csv", index=False)

In [None]:
import pandas as pd
from sklearn.metrics import f1_score

# Load the CSV file
df = pd.read_csv("annotated_chunks handcoded.csv")

# Assume the goldstandard column is named 'goldstandard' and LLM predictions are in 'llm_annotation'
# Adjust column names if necessary
# Drop rows with NaN in either column
mask = df['expert_annotation_goldstandard_1'].notna() & df['llama3.2:latest'].notna()
gold = df.loc[mask, 'expert_annotation_goldstandard_1']
llm_pred = df.loc[mask, 'llama3.2:latest']

# Calculate F1 score
f1 = f1_score(gold, llm_pred)
print(f"F1 score: {f1:.3f}")