### Imports

In [19]:
import re
from ollama import chat
import json
from typing import List, Dict
from PyPDF2 import PdfReader
import re
import sys
from typing import Dict, List, Pattern, Tuple, Optional
import spacy
from spacy.language import Language
from collections import defaultdict
import pandas as pd
import openpyxl
from openpyxl.styles import Font, Border, Side, Alignment, PatternFill


### Load The PDF

In [None]:
# Load the PDF, split it into numbered statements and store them as JSON.
import os

# Alternative input: pdf_path = "DocComplaints.pdf"
pdf_path = "DocComplaints_Original.pdf"
# Strip only the final extension so names/paths that contain additional dots
# (e.g. "./data/report.v2.pdf") are not truncated at the first dot.
name_file = os.path.splitext(pdf_path)[0]
doc = PdfReader(pdf_path)

# Extract text page by page; pages without extractable text are skipped.
page_texts = []
for page in doc.pages:
    text = page.extract_text()
    if text:
        page_texts.append(text)

# Every page is prefixed with a newline (same layout as before), joined in one pass.
full_text = "".join("\n" + page_text for page_text in page_texts)

# Split in front of section numbers ('1.', '2.', ..., '15.') that start a line.
# The lookahead keeps the number at the beginning of each resulting piece.
documents = re.split(r"(?m)(?=^\d{1,2}\.\s)", full_text)

# Clean up: trim whitespace and drop pieces that are empty after trimming
documents = [piece.strip() for piece in documents if piece.strip()]

# Create JSON-style output: one record per statement
json_output = [{"source": pdf_path, "content": content} for content in documents]

document_json = f'{name_file}_split_documents.json'

# Save as JSON
with open(document_json, "w", encoding="utf-8") as f:
    json.dump(json_output, f, indent=4, ensure_ascii=False)

# NOTE: the message names a fixed file; the file actually written is `document_json`.
print(f"Saved {len(json_output)} documents into split_documents.json 🚀")

Saved 5 documents into split_documents.json 🚀


#### Functions

In [15]:
## ANONYMIZE

class GermanTextAnonymizer:
    """Anonymizer for German text that uses spaCy NER and regex patterns.

    Entity labels and regex patterns are registered together with the
    placeholder that should replace them. `anonymize` collects every matching
    character span and substitutes the placeholders in a single pass.
    """

    def __init__(self, nlp: "Language"):
        """
        Initialize the anonymizer with a spaCy language model.

        Args:
            nlp: Loaded spaCy language model; it is called with the text and
                its result must expose `.ents`
        """
        self.nlp = nlp
        # entity label -> placeholder
        self.entity_types: Dict[str, str] = {}
        # (name, compiled pattern, placeholder), in registration order
        self.regex_patterns: List[Tuple[str, Pattern, str]] = []

    def add_entity_type(self, entity_type: str, replacement: str) -> None:
        """
        Add an entity type to be anonymized.

        Args:
            entity_type: The spaCy entity label to anonymize
            replacement: Text to replace the entity with
        """
        self.entity_types[entity_type] = replacement

    def add_regex_pattern(self, name: str, pattern: str, replacement: str) -> None:
        """
        Add a regex pattern to be anonymized.

        Args:
            name: Name of the pattern for reference
            pattern: Regular expression pattern to match (compiled case-insensitively)
            replacement: Text to replace matches with
        """
        self.regex_patterns.append(
            (name, re.compile(pattern, re.IGNORECASE), replacement)
        )

    @staticmethod
    def _merge_overlapping(
        spans: List[Tuple[int, int, str]]
    ) -> List[Tuple[int, int, str]]:
        """Return spans sorted by start with overlapping spans fused into one.

        A fused span covers the union of its members, so no matched character
        survives, and carries the placeholder of the span that starts first
        (ties: the longest span, then the one collected first).
        """
        merged: List[Tuple[int, int, str]] = []
        for start, end, replacement in sorted(spans, key=lambda s: (s[0], -s[1])):
            if merged and start < merged[-1][1]:
                prev_start, prev_end, prev_replacement = merged[-1]
                merged[-1] = (prev_start, max(prev_end, end), prev_replacement)
            else:
                merged.append((start, end, replacement))
        return merged

    def anonymize(self, text: str) -> str:
        """
        Anonymize the text using NER and regex patterns.

        Spans found by NER and by the regex patterns may overlap or coincide
        (for example a postal code inside a full address). Applying such spans
        one after another uses offsets that are no longer valid and corrupts
        the output, so overlapping spans are merged first.

        Args:
            text: The text to anonymize

        Returns:
            Anonymized text with PII replaced
        """
        # Process text with spaCy
        doc = self.nlp(text)

        # Entity spans first, so they win ties against regex spans
        spans = [
            (ent.start_char, ent.end_char, self.entity_types[ent.label_])
            for ent in doc.ents
            if ent.label_ in self.entity_types
        ]

        # Add regex pattern spans
        for _name, pattern, replacement in self.regex_patterns:
            spans.extend(
                (match.start(), match.end(), replacement)
                for match in pattern.finditer(text)
            )

        # Rebuild the text left to right: untouched slice, placeholder, ...
        pieces = []
        cursor = 0
        for start, end, replacement in self._merge_overlapping(spans):
            pieces.append(text[cursor:start])
            pieces.append(replacement)
            cursor = end
        pieces.append(text[cursor:])

        return "".join(pieces)


def load_spacy_model(model_name: str = "de_core_news_md") -> Optional[Language]:
    """
    Try to load a spaCy pipeline by name.

    A progress line is printed before loading. When spaCy raises OSError,
    an error and an install hint are printed instead of propagating it.

    Args:
        model_name: Name of the spaCy model to load

    Returns:
        The loaded model, or None when loading raised OSError
    """
    print(f"Loading {model_name} model...")
    try:
        model = spacy.load(model_name)
    except OSError:
        hints = (
            f"Error: {model_name} not found.",
            f"Please install it with: python -m spacy download {model_name}",
        )
        for hint in hints:
            print(hint)
        model = None
    return model

def configure_anonymizer(anonymizer: GermanTextAnonymizer) -> None:
    """
    Register the default entity labels and regex patterns on an anonymizer.

    Args:
        anonymizer: The anonymizer to configure (modified in place)
    """
    # NER label -> placeholder.
    # NOTE(review): which of these labels are ever emitted depends on the
    # loaded spaCy model — confirm against the model's label set.
    ner_placeholders = (
        ("PER", "[PERSON]"),
        ("ORG", "[ORGANISATION]"),
        ("LOC", "[ORT]"),
        ("GPE", "[ORT]"),
        ("MONEY", "[FINANZIELL]"),
        ("DATE", "[DATUM]"),
        ("CARDINAL", "[NUMMER]"),
    )
    for label, placeholder in ner_placeholders:
        anonymizer.add_entity_type(label, placeholder)

    # Regex-based PII: five-digit postal code, "street number, postal code city"
    # address, and age expressions.
    pii_patterns = (
        ("PLZ", r'\b\d{5}\b', "[POSTLEITZAHL]"),
        (
            "ADDRESS",
            r'(\b\w+(?:straße|str\.|weg|platz|allee|gasse)\s+\d+,?\s+\d{5}\s+\w+\b)',
            "[ADRESSE]",
        ),
        (
            "AGE",
            r'\b\d{1,3}[-\s]?(?:jährige[rn]?|Jahre[n]? alt|Jahre)\b',
            "[ALTER]",
        ),
    )
    for pattern_name, expression, placeholder in pii_patterns:
        anonymizer.add_regex_pattern(pattern_name, expression, placeholder)


### LLAMA 

# Base function
def chat_with_ollama(prompt: str, model: str = "llama3") -> str:
    """
    Send one user message to an Ollama model and return the reply text.

    Args:
        prompt: Content of the single user message
        model: Name of the Ollama model to query

    Returns:
        The `message.content` of the chat response
    """
    user_message = {'role': 'user', 'content': prompt}
    reply = chat(model=model, messages=[user_message])
    return reply.message.content

# Summarize function
def summarize_text(text: str, model: str = "llama3") -> str:
    """
    Ask the model for a German summary and topic list of one statement.

    The prompt requests a summary of at most 150 words plus the main themes,
    formatted as a `SUMMARY:` part followed by a `TOPICS:` part.

    Args:
        text: The statement to summarize
        model: Name of the Ollama model to query

    Returns:
        The raw model reply (not parsed here)
    """
    # The template's indentation and whitespace-only lines are sent to the
    # model as-is; they are part of the prompt text.
    request = f"""
        Here is a statement from a public participation process:
        
        {text}
        
        Please provide:
        1. A concise summary (max 150 words) that captures all unique concerns
        2. A list of the main themes/topics mentioned
        Note: The output text should be in german.
        
        Format your response as:
        SUMMARY: [Your summary here]
        TOPICS: [comma-separated list of themes]
        """
    return chat_with_ollama(prompt=request, model=model)


# Classification function
def classify_text(text: str, model: str = "llama3") -> str:
    """
    Ask the model to assign a text to one of the fixed categories.

    Args:
        text: The text to classify
        model: Name of the Ollama model to query

    Returns:
        The raw model reply; the prompt asks for the form `CATEGORY: [category]`
    """
    category_names = (
        "Siedlung",
        "Natur und Artenschutz",
        "Wasser",
        "Wald",
        "Bodenschutz",
        "Infrastruktur",
        "Flächenqualität",
        "Landschaft",
        "Andere",
    )
    # One "- name" bullet per category, in the order listed above
    bullet_list = "\n".join(f"- {name}" for name in category_names)

    instruction = f"""You are a classification assistant.

Read the following text and assign it to one of the following categories based on its content:

CATEGORIES:
{bullet_list}

TEXT:
{text}

Format your response as:
CATEGORY: [category]"""

    return chat_with_ollama(prompt=instruction, model=model)

# Key points summarize function
def summarize_keypoints(text: str, model: str = "llama3") -> str:
    """
    Ask the model to condense the key points collected for each cluster.

    The prompt asks for the five most repeated key points per cluster,
    returned as JSON-like `"CLUSTER": [list of keypoints]` entries.

    Args:
        text: Serialized mapping of cluster name to its key points
        model: Name of the Ollama model to query

    Returns:
        The raw model reply (not parsed here)
    """
    # The template text below is sent to the model verbatim.
    request = f"""
You are a summurization assistant.

The following text is a dictionry with the main clusters and the found keypoints for each cluster. 
Your task is to summarize the keypoints for each cluster, the output should be the most 5 repeated keypoints for each cluster.
Note: 
1- the keypoints could be repeated in different words, but the meaning should be the same.
2- return only the output without any additional text.
TEXT:
{text}

Format your response as a json file:
"CLUSTER": [list of keypoints],...
"""

    return chat_with_ollama(prompt=request, model=model)


def read_json_list(filepath: str) -> List[Dict]:
    """
    Load a UTF-8 JSON file whose top-level value is a list.

    Args:
        filepath: Path of the JSON file to read

    Returns:
        The parsed list

    Raises:
        ValueError: If the top-level JSON value is not a list
    """
    with open(filepath, 'r', encoding='utf-8') as source:
        payload = json.load(source)
    if isinstance(payload, list):
        return payload
    raise ValueError("JSON root element must be a list")
    
def write_json_list(data: List[Dict], filepath: str) -> None:
    """
    Serialize `data` as UTF-8 JSON (2-space indent, non-ASCII kept as-is).

    Args:
        data: The value to serialize
        filepath: Destination path; an existing file is overwritten
    """
    with open(filepath, mode='w', encoding='utf-8') as sink:
        json.dump(data, sink, indent=2, ensure_ascii=False)

def extract_json_from_text(text):
    """
    Extract the first non-empty JSON object embedded in free text.

    The text is scanned for '{' and a JSON object is decoded from that
    position, so surrounding content such as an introduction, a Markdown code
    fence or a closing remark does not prevent extraction.

    Args:
        text: Text that is expected to contain a JSON object

    Returns:
        The decoded object as a dict

    Raises:
        ValueError: If no non-empty JSON object can be decoded from the text
    """
    decoder = json.JSONDecoder()
    position = text.find("{")
    while position != -1:
        try:
            # raw_decode parses one JSON value starting at `position` and
            # ignores whatever follows it.
            candidate, _end = decoder.raw_decode(text, position)
        except json.JSONDecodeError:
            candidate = None
        # Empty objects (e.g. a stray "{}" in running text) are skipped.
        if candidate:
            return candidate
        position = text.find("{", position + 1)
    raise ValueError("No valid JSON object found in the text.")




### Main Pipeline

In [16]:
# Read the JSON file produced by the PDF-splitting cell
document_list = read_json_list(document_json)

# Load German spaCy model; stop when it is not installed
nlp = load_spacy_model()
if nlp is None:
    sys.exit(1)

# Initialize and configure anonymizer
anonymizer = GermanTextAnonymizer(nlp)
configure_anonymizer(anonymizer)

file_statements = f"{name_file}_statements.json"

# Step 1: anonymize every statement before any text is sent to the LLM
for document in document_list:
    document['text_anonym'] = anonymizer.anonymize(document['content'])

# Step 2: summarize and classify each anonymized statement
for n, document in enumerate(document_list):
    text_summary = summarize_text(document['text_anonym'])

    # Model replies are free text: `partition` tolerates a missing or repeated
    # "TOPICS:" marker, where unpacking the result of `split` would raise and
    # abort the whole run.
    summary_part, _, themes_part = text_summary.partition("TOPICS:")
    summary = summary_part.replace("SUMMARY:", "").strip()
    topics = [topic.strip() for topic in themes_part.split(",") if topic.strip()]

    # Expected reply: "CATEGORY: <name>[, <name>...]". Everything after the
    # first colon is used; without a colon the whole reply is used.
    category_reply = classify_text(document['text_anonym'])
    _, colon, after_colon = category_reply.partition(":")
    raw_categories = after_colon if colon else category_reply
    categories = [name.strip() for name in raw_categories.split(",") if name.strip()]

    document['summary'] = summary
    document['key_points'] = topics
    document['cluster'] = categories

    print(f'Done {n+1}/{len(document_list)}\n')


write_json_list(document_list, file_statements)



Loading de_core_news_md model...
Done 1/5

Done 2/5

Done 3/5

Done 4/5

Done 5/5



### Main Keypoints


In [17]:
# Read the per-statement results written by the main pipeline
with open(file_statements, "r", encoding="utf-8") as f:
    data = json.load(f)

keypoints_name_file = f"{name_file}_keypoints.json"

# Gather every statement's key points under each cluster it was assigned to
cluster_keywords = defaultdict(list)
for doc in data:
    keywords = doc.get("key_points", [])
    for cluster in doc.get("cluster", []):
        cluster_keywords[cluster] += keywords

# Serialize the mapping so it can be embedded in the prompt
keypoints = json.dumps(cluster_keywords, ensure_ascii=False, indent=2)

# Let the model condense the key points, then parse the JSON object it returns
summarized_keypoints = summarize_keypoints(keypoints)
result = extract_json_from_text(summarized_keypoints)

write_json_list(result, keypoints_name_file)


In [20]:
# Build a per-cluster overview of all statements as a Markdown table and an
# Excel sheet (one column per cluster, one cell per statement).

# Load the JSON file
with open(file_statements, 'r', encoding='utf-8') as file:
    data = json.load(file)


def _cluster_names_of(doc):
    """Return the cluster names of a statement as a list."""
    if 'cluster' not in doc:
        return ['Uncategorized']
    names = doc['cluster']
    if not isinstance(names, list):
        return [names]
    # An empty list would silently drop the statement from the overview,
    # so it is filed under 'Uncategorized' as well.
    return names or ['Uncategorized']


def _render_document(doc):
    """Return the text block (ID, summary, key points) shown for a statement."""
    parts = []

    # Add document ID if available (either key spelling)
    if 'document_id' in doc:
        parts.append(f"**Doc ID:** {doc['document_id']}\n\n")
    elif 'Document ID' in doc:
        parts.append(f"**Doc ID:** {doc['Document ID']}\n\n")

    # Add summary if available and non-empty (either key spelling)
    summary_text = doc.get('summary') or doc.get('Summary')
    if summary_text:
        parts.append(f"**Summary:** {summary_text}\n\n")

    # Add key points if available: bullet list for lists, inline otherwise
    points = doc.get('key_points')
    if points:
        if isinstance(points, list):
            parts.append("**Key Points:**\n")
            parts.extend(f"- {point}\n" for point in points)
        else:
            parts.append(f"**Key Points:** {points}\n\n")

    return "".join(parts)


# Cluster name -> rendered statements assigned to it
clusters = defaultdict(list)

for doc in data:
    doc_content = _render_document(doc)
    if doc_content:
        for cluster_name in _cluster_names_of(doc):
            clusters[cluster_name].append(doc_content)

# Column order shared by the Markdown table and the Excel sheet
cluster_order = sorted(clusters.keys())

# Find the maximum number of items in any cluster for table sizing
# (default=0 keeps this working when there are no clusters at all)
max_items = max((len(items) for items in clusters.values()), default=0)

# Create a DataFrame for the markdown table
df = pd.DataFrame(index=range(max_items), columns=cluster_order)

# Fill the DataFrame with the content
for cluster in clusters:
    for i, content in enumerate(clusters[cluster]):
        df.loc[i, cluster] = content

# Generate the markdown table
markdown_table = df.fillna("").to_markdown()

# Write to a file
with open('cluster_summary_table.md', 'w', encoding='utf-8') as f:
    f.write(markdown_table)

print("Markdown table created and saved as 'cluster_summary_table.md'")
# Also create a nicely formatted Excel file

# Create a workbook and worksheet
workbook = openpyxl.Workbook()
worksheet = workbook.active
worksheet.title = "Cluster Summary"

# Add the header (row 1) and the content rows (from row 2) column by column
for col_num, cluster_name in enumerate(cluster_order, 1):
    cell = worksheet.cell(row=1, column=col_num)
    cell.value = cluster_name
    cell.font = Font(bold=True)
    cell.fill = PatternFill(start_color='DDEBF7', end_color='DDEBF7', fill_type='solid')
    cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)

    for row_num, content in enumerate(clusters[cluster_name], 2):
        cell = worksheet.cell(row=row_num, column=col_num)
        cell.value = content
        cell.alignment = Alignment(wrap_text=True, vertical='top')

# Auto-adjust column widths to the longest cell text, capped at 100 characters
for column in worksheet.columns:
    column_letter = openpyxl.utils.get_column_letter(column[0].column)
    max_length = max(
        (len(str(cell.value)) for cell in column if cell.value),
        default=0,
    )
    adjusted_width = min(max_length, 100) + 2
    worksheet.column_dimensions[column_letter].width = adjusted_width

# Set a border style for all cells with content
thin_border = Side(border_style="thin", color="000000")
border = Border(left=thin_border, right=thin_border, top=thin_border, bottom=thin_border)

for row in worksheet.iter_rows(min_row=1, max_row=worksheet.max_row, min_col=1, max_col=worksheet.max_column):
    for cell in row:
        if cell.value:
            cell.border = border

# Save the workbook
excel_summary_file = f"{name_file}_cluster_summary.xlsx"

workbook.save(excel_summary_file)
# NOTE: the message names a fixed file; the file actually written is `excel_summary_file`.
print("Excel file created and saved as 'cluster_summary.xlsx'")


import json
import openpyxl
from openpyxl.styles import Font, Alignment, PatternFill

# Load the summarized key points per cluster from the JSON file
with open(keypoints_name_file, 'r', encoding='utf-8') as f:
    keypoints_data = json.load(f)

# Open the existing Excel file
workbook = openpyxl.load_workbook(excel_summary_file)
worksheet = workbook.active

# Map each header (row 1) that has key points to its column index
column_indices = {}
for col in range(1, worksheet.max_column + 1):
    header = worksheet.cell(row=1, column=col).value
    if header in keypoints_data:
        column_indices[header] = col

# Insert a new row after the header
worksheet.insert_rows(2)

thin_border = Side(border_style="thin", color="000000")
border_bold = Border(left=thin_border, right=thin_border, top=thin_border, bottom=thin_border)

# Add keypoints to each column
for cluster, col_idx in column_indices.items():
    points = keypoints_data[cluster]
    # The values come from a model reply: a bare value (e.g. one string) is
    # wrapped so `join` does not iterate over its characters, and every item
    # is converted to text so non-string entries cannot raise.
    if not isinstance(points, (list, tuple)):
        points = [points]

    # Join the keypoints, one per line
    keypoints_text = ", \n".join(str(point) for point in points)

    # Add to the worksheet
    cell = worksheet.cell(row=2, column=col_idx)
    cell.value = f"Key Topics: \n{keypoints_text}"
    cell.font = Font(italic=True)
    cell.alignment = Alignment(wrap_text=True)
    cell.fill = PatternFill(start_color="F2F2F2", end_color="F2F2F2", fill_type="solid")
    cell.border = border_bold

# Adjust row height
worksheet.row_dimensions[2].height = 120

# Save the updated workbook
cluster_summary_file = f"{name_file}_cluster_summary_updated.xlsx"
workbook.save(cluster_summary_file)
print("Excel file updated with key topics for each cluster")



Markdown table created and saved as 'cluster_summary_table.md'
Excel file created and saved as 'cluster_summary.xlsx'
Excel file updated with key topics for each cluster
