In [2]:
import re
import json
import text_processing
import secrets 

from typing import List, Dict
import openai  # Assuming OpenAI API, but adaptable to other models

# Load text file and taxonomy JSON

taxonomy_file_path = 'privacy_ontology_simple.json'  # Replace with actual path
example_file_path = 'annotations/Actual_Budget/Accounts_&_Transactions.txt'
target_file_path = 'input/lh-ehr/Direct_Messaging_README.txt'

# Load previously processed example files
example_data = text_processing.process_input(example_file_path)

### Building prompts


In [3]:

import json
from typing import Dict, List, Union
import tiktoken  # Library for token counting with OpenAI models
import prompt_templates

# Load the privacy ontology
ontology_path = "privacy_ontology_simple.json"
privacy_ontology = prompt_templates.load_privacy_ontology(ontology_path)

# Load one processed example file for demonstration
example_file = text_processing.process_input(example_file_path)[0]


# Define target text to be annotated (for demonstration) & Load annotated version 
target_file_annotations = text_processing.process_input(target_file_path)
new_text_to_annotate = open(target_file_path).read()

# Generate the prompt
prompt_example = prompt_templates.create_annotation_prompt(example_file, new_text_to_annotate, privacy_ontology)

# Print the prompt to review it
print(prompt_example)
print(f"\nToken Count: {prompt_templates.count_tokens(prompt_example)} tokens")


You are a privacy expert annotator tasked with annotating text files with metadata about privacy behaviors and stories. For the given text, annotate the following:

1. Actions: Actions performed or expected in the text.
2. Data Types: Types of data referenced in the text. Data types may include specific subcategories.
3. Purposes: Intentions or purposes related to the actions and data types.
4. Stories: Concise stories that describe how actions, data types, and purposes interact in context.

After providing your annotations, explain your rationale for these annotations. Place <R> tag between your annotations and your rationale.

Use only the categories listed below when annotating:

Actions:
Collect, Use, Share

Data Types:
Contact Data:
  Phone Number:
  Email address:
  User ID:
  Job Title:
  Company:
  Address:
  Name:
  Date of Birth:
  Image:
  Government ID:
  Biographical Data:
    CV:
    Education:
    Employment:
Health Data:
  Physical activity:
Social Media:
Location:
  Ap

### Annotating with LLMs 

In [1]:
import os
import json
import prompt_templates
import text_processing
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

input_dir = 'input'

def find_matching_file(input_file_path, annotations_dir):
    """
    Find the matching annotation file for a given input file.
    
    Args:
        input_file_path (str): Path to the input file
        annotations_dir (str): Root directory of annotations
    
    Returns:
        str: Path to the matching annotation file, or None if not found
    """
    # Get the relative path from the input directory
    relative_path = os.path.relpath(input_file_path, input_dir)
    annotation_file_path = os.path.join(annotations_dir, relative_path)
    
    return annotation_file_path if os.path.exists(annotation_file_path) else None


def find_most_similar_file(input_file_path, annotations_dir, exclude_file):
    """
    Find the most similar annotation file to the input file, excluding a specified file.
    
    Args:
        input_file_path (str): Path to the input file
        annotations_dir (str): Directory containing the annotation files
        exclude_file (str): File to exclude from similarity search
    
    Returns:
        str: Path to the most similar annotation file
    """
    # Read the input file's content
    with open(input_file_path, 'r', encoding='utf-8') as f:
        input_text = f.read()
    
    # List to store (similarity_score, annotation_file_path)
    similarity_scores = []
    
    # Loop through the annotations directory to calculate similarity
    for root, _, files in os.walk(annotations_dir):
        for filename in files:
            if filename == exclude_file or filename.startswith('.'):
                continue
            
            annotation_file_path = os.path.join(root, filename)
            
            # Read the annotation file's content
            with open(annotation_file_path, 'r', encoding='utf-8') as f:
                annotation_text = f.read()
            
            # Compute similarity using TF-IDF and cosine similarity
            tfidf = TfidfVectorizer().fit_transform([input_text, annotation_text])
            similarity = cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]
            similarity_scores.append((similarity, annotation_file_path))
    
    # Sort by similarity score in descending order and return the most similar file
    similarity_scores.sort(reverse=True, key=lambda x: x[0])
    return similarity_scores[0][1] if similarity_scores else None


def create_prompt_templates_dict(input_dir='input', annotations_dir='annotations', ontology_path='privacy_ontology_simple.json'):
    """
    Create a dictionary of prompt templates matching input files with their corresponding annotation examples.
    
    Args:
        input_dir (str): Root directory containing input files to be annotated
        annotations_dir (str): Root directory containing annotated example files
        ontology_path (str): Path to the privacy ontology JSON file
    
    Returns:
        dict: Dictionary of prompt templates for each input file
    """
    # Load privacy ontology
    privacy_ontology = prompt_templates.load_privacy_ontology(ontology_path)
    
    # Dictionary to store prompt templates
    prompt_templates_dict = {}
    
    # Walk through all directories and files in the input directory
    for root, _, files in os.walk(input_dir):
        for filename in files:
            # Skip hidden files
            if filename.startswith('.'):
                continue
            
            # Full path to the input file
            input_file_path = os.path.join(root, filename)
            
            # Find corresponding annotation file (for exclusion from similarity search)
            annotation_file_path = find_matching_file(input_file_path, annotations_dir)
            
            if not annotation_file_path:
                print(f"No matching annotation found for {input_file_path}")
                continue
            
            try:
                # Read the input text to annotate
                with open(input_file_path, 'r', encoding='utf-8') as f:
                    new_text_to_annotate = f.read()
                
                # Find the most similar annotation file
                most_similar_annotation = find_most_similar_file(input_file_path, annotations_dir, os.path.basename(annotation_file_path))
                
                if not most_similar_annotation:
                    print(f"No similar annotation found for {input_file_path}")
                    continue
                
                # Process the example file from annotations
                example_file = text_processing.process_input(most_similar_annotation)[0]
                
                # Create prompt template
                prompt_template = prompt_templates.create_0_shot_annotation_prompt(
                    example_file, 
                    new_text_to_annotate, 
                    privacy_ontology
                )
                
                # Create a unique key based on relative path
                relative_key = os.path.relpath(input_file_path, input_dir)
                
                # Store in dictionary
                prompt_templates_dict[relative_key] = {
                    'input_file_path': input_file_path,
                    'annotation_file_path': most_similar_annotation,
                    'prompt_template': prompt_template,
                    'target_annotations': text_processing.process_file(annotation_file_path),
                    'token_count': prompt_templates.count_tokens(prompt_template)
                }
            
            except Exception as e:
                print(f"Error processing {input_file_path}: {e}")
    
    return prompt_templates_dict


# Create the prompt templates dictionary
prompt_templates_dict = create_prompt_templates_dict()

# Save to JSON for inspection
with open('prompt_templates_output.json', 'w', encoding='utf-8') as f:
    json.dump({k: {**v, 'prompt_template': v['prompt_template']} 
              for k, v in prompt_templates_dict.items()}, 
              f, indent=2)

print(f"Created prompt templates for {len(prompt_templates_dict)} files")

# Optional: You can save the full dictionary using pickle if needed
import pickle
with open('prompt_templates.pkl', 'wb') as f:
    pickle.dump(prompt_templates_dict, f)


Error processing input\Actual_Budget\Backup_&_Restore.txt: 'charmap' codec can't decode byte 0x9d in position 232: character maps to <undefined>
Created prompt templates for 24 files


In [3]:
import os
import csv
import json
from getpass import getpass
import model_routing
from typing import Dict, List

# Ensure OPENAI_API_KEY is set
os.environ['OPENAI_API_KEY'] = getpass('Enter your OPENAI API key: ')

os.environ['GROQ_API_KEY'] ="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"

def save_results_to_csv(models: List[str], 
                         prompt_templates_dict: Dict, 
                         model_responses: Dict, 
                         output_file: str):
    """
    Save the model outputs, prompts, and file annotations to a CSV file.
    """
    # Open the output CSV file
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        # Prepare the CSV header
        fieldnames = ['File', 'Prompt', 'Model', 'Target File Path', 'Target Annotations', 'Model Response 1', 'Model Response 2']
        
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        # Create a list of file keys to maintain order
        file_keys = list(prompt_templates_dict.keys())

        # Iterate through the prompt templates and models to pair responses
        for file_key in file_keys:
            template_info = prompt_templates_dict[file_key]
            
            for model in models:
                # Get responses for this model
                model_specific_responses = model_responses.get(model, {})
                
                # Get responses for this specific file/prompt
                prompt_responses = model_specific_responses.get(file_key, [])
                
                # Ensure we have at least two responses (or 'No Response')
                response1 = prompt_responses[0] if prompt_responses and len(prompt_responses) > 0 else 'No Response'
                response2 = prompt_responses[1] if prompt_responses and len(prompt_responses) > 1 else 'No Response'

                # Create a row for each model and file
                row = {
                    'File': file_key,
                    'Prompt': template_info['prompt_template'],
                    'Model': model,
                    'Target File Path': template_info['input_file_path'],
                    'Target Annotations': json.dumps(template_info['target_annotations']),
                    'Model Response 1': json.dumps(response1, ensure_ascii=False),
                    'Model Response 2': json.dumps(response2, ensure_ascii=False)
                }

                # Write the row to the CSV file
                writer.writerow(row)


def run_multi_file_annotations(prompt_templates_dict, output_csv, models=None):
    """
    Run annotation process for multiple files and save results to a CSV file.
    """

    try:
        # Create a list of file keys to maintain order
        file_keys = list(prompt_templates_dict.keys())

        # Prepare prompts that maintains the file key order
        test_prompts = [
            prompt_templates_dict[file_key]['prompt_template'] 
            for file_key in file_keys
        ]

        # Get model responses 
        raw_model_responses = model_routing.run_multi_model_prompts(
            models=models, 
            prompts=test_prompts, 
            num_runs=2  # Number of runs per model
        )

        # Restructure responses to match file keys
        model_responses = {}
        for model in models:
            model_responses[model] = {
                file_key: model_runs 
                for file_key, model_runs in zip(file_keys, raw_model_responses.get(model, []))
            }

        # Save results to CSV 
        save_results_to_csv(
            models=models,
            prompt_templates_dict=prompt_templates_dict,
            model_responses=model_responses,
            output_file=output_csv
        )

        print(f"Annotation results saved to {output_csv}")
        print(f"Number of files processed: {len(prompt_templates_dict)}")

    except Exception as e:
        print(f"An error occurred: {e}")
        import traceback
        traceback.print_exc()

# Example usage remains the same
output_csv = 'LLMAnnotation_gpt4o_0_shot.csv'
models = [
    'openai:gpt-4o-2024-11-20'
    # 'groq:llama-3.3-70b-versatile'
    ]
run_multi_file_annotations(prompt_templates_dict, output_csv, models=models)

Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run
Model: openai:gpt-4o-2024-11-20 - Completed a run


### Get model F1 scores 


In [46]:
import json
import csv
import re
from typing import Dict, List, Set
from sklearn.metrics import f1_score

class AnnotationEvaluator:
    def __init__(self, ontology_path: str):
        """
        Initialize the evaluator with the privacy ontology.
        """
        with open(ontology_path, 'r') as f:
            self.ontology = json.load(f)
    
    def _normalize_label(self, label: str) -> List[str]:
        """
        Normalize label by removing extra whitespace, converting to lowercase,
        and splitting comma-separated values.
        """
        # Split by comma, strip whitespace, convert to lowercase
        return [
            l.strip().lower() 
            for l in label.split(',') 
            if l.strip()
        ]
    
    def _preprocess_text(self, text: str) -> str:
        """
        Preprocess text by removing content within or after <R> tags.
        
        Args:
            text (str): Input text to preprocess
        
        Returns:
            str: Preprocessed text
        """
        # Find all <R> tags
        r_tags = list(re.finditer(r'<R>(.*?)</R>', text, re.DOTALL))
        
        if not r_tags:
            # If no </R> tags, remove everything after a single <R> tag
            single_r_match = re.search(r'<R>', text)
            if single_r_match:
                return text[:single_r_match.start()]
            return text
        
        # Take the content before the first <R> tag
        preprocessed = text[:r_tags[0].start()]
        
        # Return preprocessed text
        return preprocessed.strip()
    
    def _match_labels(self, text: str, category_labels: List[str]) -> Set[str]:
        """
        Find matching labels in the text.
        
        Args:
            text (str): Text to search
            category_labels (List[str]): Labels to match
        
        Returns:
            Set[str]: Matched labels
        """
        # Preprocess text to remove R-tag content
        preprocessed_text = self._preprocess_text(text)
        
        # Normalize text
        normalized_text = preprocessed_text.lower()
        
        # Match labels
        matched_labels = set()
        
        for label in category_labels:
            # Various matching strategies
            # 1. Whole word match
            word_pattern = r'\b' + re.escape(label) + r'\b'
            # 2. Partial match
            partial_pattern = re.escape(label)
            
            if (re.search(word_pattern, normalized_text) or 
                re.search(partial_pattern, normalized_text)):
                matched_labels.add(label)
        
        return matched_labels
    
    def calculate_comprehensive_f1_scores(self, ground_truth: Dict, text: str) -> Dict[str, float]:
        """
        Calculate F1 scores for all categories.
        
        Args:
            ground_truth (Dict): Ground truth annotations
            text (str): Text to evaluate
        
        Returns:
            Dict[str, float]: F1 scores
        """
        f1_scores = {}
        categories = ['actions', 'data_types', 'purposes']
        
        for category in categories:
            # Extract labels from ground truth
            gt_labels = self._normalize_label(', '.join(ground_truth.get(category, [])))
            
            # Find matching labels in text
            pred_labels = self._match_labels(text, gt_labels)
            
            # Calculate F1 score
            y_true = [1 if label in gt_labels else 0 for label in gt_labels]
            y_pred = [1 if label in pred_labels else 0 for label in gt_labels]
            
            try:
                # Macro average F1 score
                f1 = f1_score(y_true, y_pred, average='binary')
            except Exception as e:
                print(f"F1 Score calculation error for {category}: {e}")
                f1 = 0
            
            f1_scores[category] = f1
        
        # Calculate overall F1 score
        f1_scores['Overall'] = sum(f1_scores.values()) / len(f1_scores)
        
        # Diagnostic print
        print(f"\nCategory Diagnostics:")
        for category in categories:
            print(f"{category.capitalize()}:")
            print(f"  Ground Truth Labels: {gt_labels}")
            print(f"  Matched Labels: {pred_labels}")
            print(f"  F1 Score: {f1_scores[category]:.4f}")
        
        # Print preprocessed text for verification
        print("\nPreprocessed Text:")
        print(self._preprocess_text(text))
        
        return f1_scores

def process_annotation_csv(csv_path: str, ontology_path: str) -> List[Dict]:
    """
    Process the annotation CSV and calculate F1 scores.
    """
    # Initialize the evaluator
    evaluator = AnnotationEvaluator(ontology_path)
    
    # Results storage
    results = []
    
    # Read the CSV file
    with open(csv_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        
        for row in reader:
            try:
                # Parse target annotations
                ground_truth = json.loads(row['Target Annotations'])
                
                # Process both model responses
                for response_col in ['Model Response 1', 'Model Response 2']:
                    # Extract full text
                    full_text = row[response_col]
                    
                    # Calculate F1 scores
                    f1_scores = evaluator.calculate_comprehensive_f1_scores(
                        ground_truth['metadata'], 
                        full_text
                    )
                    
                    # Prepare result dictionary
                    result = {
                        'File': ground_truth.get('file_name', 'Unknown'),
                        'Model': row['Model'],
                        'Response': response_col,
                        'Metrics': f1_scores
                    }
                    
                    results.append(result)
            
            except Exception as e:
                print(f"Error processing row: {e}")
                continue
    
    return results

def main():
    # Paths to your files
    csv_path = 'LLMAnnotation_groqGemma.csv'
    ontology_path = 'privacy_ontology_simple.json'
    
    # Process the CSV and calculate F1 scores
    results = process_annotation_csv(csv_path, ontology_path)
    
    # Print results
    print("\nAnnotation F1 Scores:")
    for result in results:
        print(f"File: {result['File']}")
        print(f"Model: {result['Model']}")
        print(f"Response: {result['Response']}")
        for category, score in result['Metrics'].items():
            print(f"{category} F1 Score: {score:.4f}")
        print("-" * 40)
    
    # Save results to a CSV
    if results:
        output_keys = ['File', 'Model', 'Response', 'Actions F1', 'Data Types F1', 'Purposes F1', 'Overall F1']
        with open('annotation_f1_scores.csv', 'w', newline='', encoding='utf-8') as outfile:
            writer = csv.DictWriter(outfile, fieldnames=output_keys)
            writer.writeheader()
            
            for result in results:
                # Prepare row for CSV
                csv_row = {
                    'File': result['File'],
                    'Model': result['Model'],
                    'Response': result['Response'],
                    'Actions F1': result['Metrics'].get('actions', 0),
                    'Data Types F1': result['Metrics'].get('data_types', 0),
                    'Purposes F1': result['Metrics'].get('purposes', 0),
                    'Overall F1': result['Metrics'].get('Overall', 0)
                }
                writer.writerow(csv_row)
        
        print("Results saved to annotation_f1_scores.csv")

if __name__ == "__main__":
    main()


Category Diagnostics:
Actions:
  Ground Truth Labels: ['personal analytics', 'functionality', 'account management']
  Matched Labels: {'functionality', 'account management'}
  F1 Score: 1.0000
Data_types:
  Ground Truth Labels: ['personal analytics', 'functionality', 'account management']
  Matched Labels: {'functionality', 'account management'}
  F1 Score: 0.4000
Purposes:
  Ground Truth Labels: ['personal analytics', 'functionality', 'account management']
  Matched Labels: {'functionality', 'account management'}
  F1 Score: 0.8000

Preprocessed Text:
"## Annotations:\n\n**Actions:** Collect, Use \n**Data Types:**  Account Information:\n    Account Balance,\n    User id\n**Purposes:** Account management, Functionality\n\n**Stories:** 1. We collect Account Balance, User id for account management. We use Account Balance, User id for functionality. \n\n

Category Diagnostics:
Actions:
  Ground Truth Labels: ['personal analytics', 'functionality', 'account management']
  Matched Labels: 

### Judge prompting llm outputs

In [None]:
import prompt_templates

# Generate prompts for a CSV file
def generate_prompts_from_csv(csv_file_path: str, output_file_path: str):
    """Reads a CSV file and generates judge prompts for each row, saving them to an output file."""
    import csv
    
    with open(csv_file_path, mode='r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        prompts = []

        for row in reader:
            original_prompt = row.get('original_prompt', '')
            response_1 = row.get('response_1', '')
            response_2 = row.get('response_2', '')

            prompt = prompt_templates.create_judge_prompt(original_prompt, response_1, response_2)
            prompts.append(prompt)

    # Save prompts to output file
    with open(output_file_path, mode='w', encoding='utf-8') as outputfile:
        outputfile.write("\n\n---\n\n".join(prompts))
