In [2]:
import re
import spacy
import nltk
from nltk.corpus import stopwords

# Load German stopwords and the German spaCy model
nltk.download('stopwords')
german_stopwords = set(stopwords.words('german'))
nlp = spacy.load("de_core_news_sm")


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/rodolfocacacho/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [3]:
def preprocess_text(text):
    """Remove line breaks and standalone special characters from text."""
    # Replace line breaks with a space
    text = text.replace('\n', ' ')
    
    # Remove standalone special characters surrounded by spaces
    text = re.sub(r'\s[-/]\s', ' ', text)  # Example for " - " and " / "
    
    # Remove extra spaces that may have been created
    text = re.sub(r'\s+', ' ', text).strip()
    
    return text

def extract_urls_and_entities(text):
    # Extract URLs
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    for url in urls:
        text = text.replace(url, '')  # Remove URLs from text

    # Extract standard entities with spaCy
    doc = nlp(text)
    entities = [ent.text for ent in doc.ents]
    for entity in entities:
        text = text.replace(entity, '')  # Remove entities from text

    return text, urls, entities

def isolate_number_unit_terms(text):
    """Find and remove terms with a number followed by a unit (e.g., '525 kWh/m2')."""
    number_unit_pattern = r'\b\d+(?:[\.,]\d+)?\s*(?:kwh/m2|°c|m2|g|kg|l|ml|w|kw|v|a)\b'
    number_unit_terms = re.findall(number_unit_pattern, text, re.IGNORECASE)
    for term in number_unit_terms:
        text = text.replace(term, '')  # Remove terms with numbers and units

    return text, number_unit_terms

def clean_text(text):
    # Remove special characters and extra whitespace
    text = re.sub(r'[^a-zA-Z0-9äöüÄÖÜß\s\-]', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

def find_chapters_and_sections(text):
    """
    Identifies chapter and section numbers in a given text, such as '3.03', '6.1.4.3', etc.
    
    Parameters:
    - text: str, input text to search for sections.
    
    Returns:
    - List of chapter/section numbers found in the text.
    """
    # Regular expression pattern to capture chapter/section numbers
    section_pattern = r'\b\d+(?:\.\d+)+\b'
    
    # Find all matches of the pattern in the text
    sections = re.findall(section_pattern, text)
    for term in sections:
        text = text.replace(term,'')
    
    return text,sections

def find_norms(text):
    """
    Identifies norms like DIN, ISO, EN, etc., with their associated names, numbers, and variants.
    
    Parameters:
    - text: str, input text to search for norms.
    
    Returns:
    - List of norms found in the text.
    """
    # Regular expression pattern to capture norms
    norm_pattern = r'\b(?:DIN|ISO|EN)(?:\s+[A-Z]{1,3})?\s+\d{3,5}(?:[-‑]\d+)?\b'
    
    # Find all matches of the pattern in the text
    norms = re.findall(norm_pattern, text, re.IGNORECASE)
    for term in norms:
        text = text.replace(term,'')
    
    return text,norms

def find_short_terms(text):
    """
    Identifies short terms like 'WG' or 'NWG' that are 2-3 characters long and uppercase.
    
    Parameters:
    - text: str, input text to search for short terms.
    
    Returns:
    - Tuple: modified text (with short terms removed), list of short terms found.
    """
    short_term_pattern = r'\b[A-Z]{2,3}\b'
    short_terms = re.findall(short_term_pattern, text)
    
    for term in short_terms:
        text = text.replace(term, '')
        
    return text, short_terms

def clean_chunk(text):
    # Step 1: Preprocess text to remove line breaks and standalone special characters
    text = preprocess_text(text)

    # Step 2: Extract URLs, entities, and number-unit terms, removing them from text
    text,sections = find_chapters_and_sections(text)
    text,norms = find_norms(text)
    text,short_terms = find_short_terms(text)
    text, number_unit_terms = isolate_number_unit_terms(text)
    text, urls, entities = extract_urls_and_entities(text)
    # Step 3: Clean text to remove unwanted characters and extra spaces
    text = clean_text(text)

    # Step 4: Tokenize, remove stopwords, and perform lemmatization using spaCy
    doc = nlp(text)
    
    # Step 5: Filter out stopwords and short words (words with length <= 2)
    cleaned_tokens = [
        token.lemma_.lower() for token in doc 
        if token.text.lower() not in german_stopwords and len(token.text) > 2
    ]

    # Step 6: Extend the tokens with extracted URLs, entities, and number-unit terms
    cleaned_tokens.extend(short_terms)
    cleaned_tokens.extend(sections)
    cleaned_tokens.extend(norms)
    cleaned_tokens.extend(urls)
    cleaned_tokens.extend(entities)
    cleaned_tokens.extend(number_unit_terms)
    cleaned_tokens = [token.lower() for token in cleaned_tokens]

    # Step 7: Return the final list of tokens
    return cleaned_tokens

In [5]:
text = """Erfolgt eine Maßnahme im Rahmen der Umsetzung eines im Förderprogramm „Bundesförderung für Energieberatung für Wohngebäude“ geförderten iSFP, und wurde dies im Rahmen des Antrags nach Nummer 9.2 vom Antragsteller gekennzeichnet, prüft der Energieeffizienz-Experte im Rahmen der Prüfung des Antrags auch, ob die beantragte Maßnahme dem iSFP entspricht und sie daher als iSFP-Maßnahme gewertet werden kann; unwesentliche inhaltliche Abweichungen, eine Übererfüllung der iSFP-Vorgaben oder Änderungen der zeitlichen Reihenfolge sind dabei unschädlich.
Abweichungen von der im Zuwendungsbescheid bzw. in der Zusage bewilligten Maßnahme sind dem BAFA bzw. der KfW unverzüglich anzuzeigen. Liegt eine wesentliche inhaltliche Abweichung im Sinne einer Untererfüllung der iSFP-Vorgaben vor, kann die Maßnahme nicht als iSFP-Maßnahme gewertet werden.
9.4.1 Zuschussförderung
Eine Zuschussförderung wird nur befristet zugesagt. Die Dauer der Befristung beträgt 24 Monate ab Zugang der Zusage des Zuwendungsbescheids (Bewilligungszeitraum). Die Befristung kann auf begründeten Antrag um maximal 24 Monate verlängert werden, wenn die Umsetzung der Maßnahme innerhalb der ursprünglichen Frist vom Antragsteller aus Gründen nicht umgesetzt werden konnte, die der Antragsteller nicht zu vertreten hat.
Die maximale Bewilligungsfrist für Einzelmaßnahmen beträgt damit 48 Monate.
9.4.2 Kreditförderung"""


In [6]:
clean_chunk(text)

['erfolgen',
 'maßnahme',
 'rahmen',
 'umsetzung',
 'förderprogramm',
 'bundesförderung',
 'energieberatung',
 'wohngebäude',
 'geförderen',
 'werden',
 'rahmen',
 'nummer',
 'teller',
 'kennzeichnen',
 'prüfen',
 'energieeffizienz-experte',
 'rahmen',
 'prüfung',
 'beantragt',
 'maßnahme',
 'entsprechen',
 'daher',
 '-maßnahme',
 'werten',
 'unwesentlich',
 'inhaltlich',
 'abweichung',
 'übererfüllung',
 'vorgab',
 'änderung',
 'zeitlich',
 'reihenfolge',
 'dabei',
 'unschädlich',
 'abweichung',
 'zuwendungsbescheid',
 'bzw',
 'zusage',
 'bewilligt',
 'maßnahme',
 'bzw',
 'unverzüglich',
 'anzuzeig',
 'leegten',
 'wesentlich',
 'inhaltlich',
 'abweichung',
 'sinn',
 'untererfüllung',
 'vorgaben',
 'maßnahme',
 '-maßnahme',
 'werten',
 'zuschussförderung',
 'zuschussförderung',
 'befristet',
 'zusagen',
 'dauer',
 'befristung',
 'betragen',
 'monat',
 'zugang',
 'zusage',
 'zuwendungsbescheid',
 'bewilligungszeitraum',
 'befristung',
 'begründet',
 'antrag',
 'maximal',
 'monat',
 'ver

# Clean text pre embedding with LLM/ChatGPT

In [1]:
import pandas as pd
import os
import unicodedata
from chunking_embeding_docs import load_documents_pages,config as db_config,db_name,merge_pages
from database_manager import MySQLDB
from dotenv import load_dotenv
from pydantic import BaseModel
import json
import openai
import base64
from textwrap import dedent


original_dir = os.getcwd()
dir_main = '/Users/rodolfocacacho/Documents/Documents/MAI/Master Thesis/Code/rag_clean_v2'
os.chdir(dir_main)
load_dotenv()

API_KEY_CGPT = os.getenv('API_KEY_CGPT')
openai.api_key = API_KEY_CGPT


metadata_csv = 'data/documents/metadata/Files_date_version.csv'
df_codes = pd.read_csv(metadata_csv)
df_codes['type_key'] = df_codes['type_key'].astype('int16')
df_codes['file_key'] = df_codes['file_key'].astype('int16')
# Convert document_date to datetime if it's not already
df_codes['date_c'] = pd.to_datetime(df_codes['date'], format="%d/%m/%Y", errors='coerce', dayfirst=True)

# Sort by date to ensure the most recent appears last within each type_key
df_codes = df_codes.sort_values(['type_key', 'date_c'], ascending=[True, False])

# Identify the most recent document within each type_key
df_codes['most_recent'] = df_codes.groupby('type_key')['date_c'].transform('max') == df_codes['date_c']

path_store ='/Users/rodolfocacacho/Documents/Documents/MAI/Master Thesis/Code/rag_clean_v2/data/storage/embeddings'

sql_embed_table = 'embedding_table_pinecone_sparse_new'
new_docs = []

sql_con = MySQLDB(config=db_config,database_name=db_name)
docs = load_documents_pages(sql_con=sql_con)        

complete_docs = []
for j in docs:
    pages_list = []
    for i in j:
        i['content'] = unicodedata.normalize('NFC',i['content'])
        page = i['content']
        pages_list.append(page)

    merged_doc = merge_pages(pages_list)
    complete_docs.append(merged_doc)

  from tqdm.autonotebook import tqdm, trange
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/rodolfocacacho/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [45]:

class Placeholder(BaseModel):
    placeholder_type: str
    text: str

class Clean_text(BaseModel):
    clean_text: str
    placeholders: list[Placeholder]

def encode_image(image_path):
  with open(image_path, "rb") as image_file:
    return base64.b64encode(image_file.read()).decode('utf-8')

def call_gpt_api_with_single_prompt(instructions, prompt, model="gpt-4o-2024-08-06", max_tokens=2500, response_format=None, img_path=None,detail='high'):
    """
    Sends a single message to GPT API with optional image input and retrieves the response.
    
    Parameters:
    - instructions: System instructions to set the context (e.g., "You are an AI assistant that analyzes tables").
    - prompt: User's message or query (e.g., "Please analyze the table in the image and provide a summary").
    - model: The GPT model to be used (default is "gpt-4o-2024-08-06").
    - max_tokens: Maximum number of tokens for the response (default is 2500).
    - response_format: Format of the response (e.g., "Rag_reponse"). Defaults to standard completion if not provided.
    - img_path: Optional path to an image file. If provided, the image will be included in the request.
    
    Returns:
    - The GPT answer object.
    """

    content = []
    dict_images = []
    # Create the messages list to send to GPT
    messages = [
        {"role": "system", "content": instructions}
    ]

    # If an image path is provided, encode and append it as a separate message
    if img_path:
        base64_image = encode_image(img_path)
        prompt_text = {'type':'text','text':dedent(prompt)}
        dic_images = {'type':'image_url','image_url':{'url': f"data:image/png;base64,{base64_image}",'detail':detail}}
        dict_images.append(dic_images)
        content.append(prompt_text)
        content.extend(dict_images)
        chat = {"role": "user", "content":content}

    else:
        chat = {"role": "user", "content":dedent(prompt)}
    
    messages.append(chat)
    
    try:
        if response_format == None:
            # Call GPT API using OpenAI's beta chat completions with parse
            response = openai.beta.chat.completions.parse(
                model=model,
                messages=messages,
                max_tokens=max_tokens)
        else:
            # Call GPT API using OpenAI's beta chat completions with parse
            response = openai.beta.chat.completions.parse(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            response_format=response_format)

        # Extract and return the response content
        answer = response.choices[0].message.content
        return answer

    except Exception as e:
        print(f"Error during GPT API call: {e}")
        return None

instructions_cleaning_chunk = """This is a system designed to clean and structure German-language texts related to regulatory guidelines, green financial aid for buildings, and similar topics. The objective is to improve semantic chunking and embedding accuracy by removing noise and irrelevant text while maintaining placeholders for essential information. This cleaning protocol ensures that only meaningful content is embedded, while retaining critical placeholders for terms, dates, and references commonly found in regulatory documents.

Instructions

	1.	General Cleaning and Noise Removal:
	•	Remove redundant punctuation (e.g., multiple periods ..., excessive spaces) and any symbols that don’t add meaning.
	•	Preserve hyphens within compound words relevant to the document’s context, such as Energie-Effizienz.
	•	Discard formal or repetitive phrases that don’t contribute to semantic meaning, like introductory notes (e.g., “Wichtiger Hinweis”, “Bitte beachten Sie”).
    •   Don't change the text, don't complete it and don't rephrase it, just clean what could add noise to the embeddings by removing it. You are given chunks that may not be complete.
    •   While cleaning, retain line breaks where they logically divide sections or distinct topics within the text. If line breaks appear to separate unrelated content, such as a new topic or regulation clause, keep them intact. Only remove line breaks that do not contribute to the clarity or logical flow, such as multiple consecutive breaks or breaks within sentences.
    •   For the Table of contents/Inhalt or similar, keep the line breaks. 
	2.	Placeholder Masking (in German):
	•	For specific content types, replace with German-labeled placeholders. Follow this format:
	•	[Datum] – For dates (e.g., 01.01.2021, 20.06.2023, not Datum/Daten).
	•	[Wert] – For numeric values and measurements (e.g., 15 kWh, 50%, €1000).
	•	[Version] – For version details (e.g., Version 2.0, v3.1, ignore when not accompanied by a number).
	•	[Gesetz] – For legal references, such as § 35a, Artikel 27.
	•	[Email] – For email addresses (e.g., kontakt@unternehmen.de).
	•	[URL] – For URLs (e.g., http://www.beispiel.de).
	•	[Abschnitt] – For section or chapter headings (e.g., Abschnitt 2.3, Kapitel 4.5).
	•	[Telefon] – For phone numbers (e.g., 0800 123456).
    •   [Jahr] - For years (e.g, 2020, 2 019, etc.)
	•	Additional Flexibility: The model should recognize other relevant parts and add placeholders for these as needed to enhance retrieval accuracy. Special cases could be equations, insitutions, etc.
	3.	Output Requirements:
	•	Placeholder List: Provide a List showing each placeholder and its corresponding original text, for instance, [Datum]: ["01.01.2021"].
	•	Return Clean Text: Supply the final text with placeholders in place, optimized for semantic embedding.
    •   Make sure to match the number of placeholders in the text as in the list!!!"""

from collections import Counter

def verify_chunk(text,placeholders):
    placeholder_counts = Counter(item['placeholder_type'] for item in placeholders)
    for placeholder_type, count in placeholder_counts.items():
        count_in_text = text.count(f"[{placeholder_type}]")
        
        # Check for mismatch between counts
        if count != count_in_text:
            return False
    
    # If no mismatches are found, return True
    return True       


In [49]:
doc_name = 'BEG Infoblatt förderfähigen Kosten_9 (01.01.2024).pdf'
done_doc = False
chunks_processed = []
for i,doc in enumerate(docs):
    metadata = doc[0]['metadata']
    metadata = json.loads(metadata)
    pdf_name = metadata["source"]
    matched_row = df_codes[df_codes['file'] == pdf_name]
    if not matched_row.empty:
        # Extract type_key and file_key from the matched row (assuming there's only one match)
        recent = matched_row["most_recent"].iloc[0]
    else:
        # If no match found, set type_key and file_key to 0
        recent = False  
    print(f'{i}-{pdf_name} recent:{recent}')

    if recent and pdf_name == doc_name:
        for num_page,page in enumerate(doc):
            retries = 0
            check = False
            done_doc = True
            page_content = page['content']
            prompt_chunk = f'Process the following chunk:\n{page_content}'

            while not check:
                if retries > 0:
                    prompt_chunk_s = f'Make sure to match the number of placeholders in the text and the list returned. This is a retry. {prompt_chunk}'
                    print(f'Retrying {retries}')
                else:
                    prompt_chunk_s = prompt_chunk
                
                answer = call_gpt_api_with_single_prompt(instructions_cleaning_chunk,
                                                        prompt= prompt_chunk_s,
                                                        max_tokens=4000,
                                                        response_format=Clean_text)
                answer = json.loads(answer)
                check = verify_chunk(answer['clean_text'],answer['placeholders'])

            if check:
                print(f'Page {num_page} done')
                answer['correct'] = True
                answer['id'] = doc['id']
                chunks_processed.append(answer)
            else:
                retries+=1
                if retries > 2:
                    wrong_answer = {'clean_text':page_content,
                                    'placeholders': [],
                                    'correct': False,
                                    'id':doc['id']}
                    chunks_processed.append(wrong_answer)
                    check = True
            if num_page > 4:
                break
    if done_doc:
        break





0-Richtlinie BEG EM (2020-12-17).pdf recent:False
1-Richtlinie BEG EM (2023-12-21).pdf recent:True
2-Richtlinie BEG EM (2022-07-21)_Änderung.pdf recent:False
3-Richtlinie BEG EM (2021-05-20).pdf recent:False
4-Richtlinie BEG EM (2021-09-16).pdf recent:False
5-Richtlinie BEG EM (2022-12-09).pdf recent:False
6-Richtlinie BEG EM (2022-09-15)_Änderung.pdf recent:False
7-Allgemeines Merkblatt zur Antragstellung - Zuschuss_1.9 (2024-01-01).pdf recent:False
8-Allgemeines Merkblatt zur Antragstellung - Zuschuss_1.10 (2024-04-01).pdf recent:True
9-Allgemeines Merkblatt zur Antragstellung - Zuschuss_1.6 (2023-01-01).pdf recent:False
10-Allgemeines Merkblatt zur Antragstellung - Zuschuss_1.8 (2023-08-31).pdf recent:False
11-Allgemeines Merkblatt zur Antragstellung - Zuschuss_1.7 (2023-01-01).pdf recent:False
12-BEG Infoblatt förderfähigen Kosten_8 (20.06.2023).pdf recent:False
13-BEG Infoblatt förderfähigen Kosten_3 (01.02.2022).pdf recent:False
14-BEG Infoblatt förderfähigen Kosten_2 (21

In [50]:
for answer in chunks_processed:
    clean_text = answer['clean_text']
    print(f'{clean_text}\n')

    for ph in answer['placeholders']:
        print(ph)
    print('\n')
        

Bundesförderung für effiziente Gebäude: Infoblatt zu den förderfähigen Maßnahmen und Leistungen - Sanieren Dieses Infoblatt zu den förderfähigen Maßnahmen und Leistungen – Sanieren ist zur Ermittlung der förderfähigen Kosten bei der Antragstellung sowie im Rahmen des Verwendungsnachweises anzuwenden. In den Kredit- oder Zuschussvarianten der BEG bei der [Institution] sind diese Kosten von der Energieeffizienz-Expertin bzw. dem -Experten oder vom Fachunternehmen in der „Bestätigung zum Antrag“ für die Antragsstellung sowie in der „Bestätigung nach Durchführung“ im Rahmen des Verwendungsnachweises anzugeben. Der Zeitpunkt des Inkrafttretens sowie die [Version] einer Fassung sind jeweils in folgender Tabelle vermerkt:

{'placeholder_type': 'Institution', 'text': 'KfW'}
{'placeholder_type': 'Version', 'text': 'Versionsnummer'}


Die Tabelle zeigt Änderungen und Notizen zu verschiedenen Versionen von Richtlinien, die zu bestimmten Daten in Kraft treten. Hier sind die Details:

1. **[Version

In [16]:
def load_documents_pages_clean(sql_con,table,table_clean_documents):

    records = sql_con.get_all_records_as_dict(table)
    clean_records = sql_con.get_all_records_as_dict(table_clean_documents)
    processed_list = {entry['id']: entry for entry in clean_records}

    docs = []
    sub_docs = []
    act_doc = None
    for n,i in enumerate(records):
        id = i['id']
        pdf_name = i['pdf_name']
        if id in processed_list:
            vals = processed_list.get(id)
            i.update(vals)
        
        if act_doc != pdf_name and n > 0:
            act_doc = pdf_name
            docs.append(sub_docs)
            sub_docs = []
            sub_docs.append(i)
        elif n == len(records)-1:
            sub_docs.append(i)
            docs.append(sub_docs)
        else:
            act_doc = pdf_name
            sub_docs.append(i)

    return docs

In [17]:
import pandas as pd
import os
from chunking_embeding_docs import load_documents_pages,config as db_config,db_name,merge_pages,process_metadata_csv
from database_manager import MySQLDB



original_dir = os.getcwd()
dir_main = '/Users/rodolfocacacho/Documents/Documents/MAI/Master Thesis/Code/rag_clean_v2'
os.chdir(dir_main)

metadata_csv = 'data/documents/metadata/Files_date_version.csv'
df_codes = process_metadata_csv(metadata_csv)

sql_embed_table = 'embedding_table_pinecone_sparse_new'
new_docs = []

table_pages_clean = 'table_documents_clean'
table_documents_name = 'table_documents'

sql_con = MySQLDB(config=db_config,database_name=db_name)

docs = load_documents_pages_clean(sql_con=sql_con,
                                  table=table_documents_name,
                                  table_clean_documents=table_pages_clean)



In [1]:
import re

# Original text string
text = """4.5 Brennstoffaustragung, -förderung und -zufuhr (Biomasseanlagen)..................................................20
... (rest of the text)"""

# Function to merge entries with hyphens based on the prefix
def merge_entries(text):
    # Regular expression to find patterns with a prefix and hyphenated words
    pattern = re.compile(r'(\b\w+)([a-zA-Z]*)\s*-\w+')
    
    # Find all matches with a prefix followed by hyphenated terms
    matches = pattern.findall(text)
    
    # Replace hyphenated terms with full words
    for match in matches:
        prefix = match[0] + match[1]
        text = re.sub(r'(?<!\w)-(\w+)', prefix + r'\1', text)
    
    return text

# Apply the function
merged_text = merge_entries(text)
print(merged_text)

4.5 Brennstoffaustragung, undförderung und undzufuhr (Biomasseanlagen)..................................................20
... (rest of the text)
