In [4]:
import sys
import os
import json
import logging
from typing import List, Tuple, Dict
from collections import defaultdict
import math
from dataclasses import asdict

# Root of the project checkout. The historical Windows path stays the default;
# set the SWEEP_ROOT_DIR environment variable to run the notebook elsewhere.
ROOT_DIR = os.environ.get("SWEEP_ROOT_DIR", r"C:\Projects\Research\SWEEP\SWEEP")

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Make the parent of the working directory importable. This has to happen
# BEFORE any `src.*` import, otherwise the imports below only succeed when a
# previous run of this cell already extended sys.path (hidden kernel state).
root_path = os.path.abspath(os.path.join(os.getcwd(), '..'))
if root_path not in sys.path:  # re-running the cell must not keep growing sys.path
    sys.path.append(root_path)

from src.common.prompts.value_adding_analysis.gpt_components import PromptComponents
from src.common.models.process_model import ProcessModel
from src.value_adding_analysis import ValueClassificationComponentsGPT
from src.value_adding_analysis.value_adding_analysis_metrics import compare_value_classifications, print_comparison_results

def get_sectors(train=True) -> List[str]:
    """List the sector directories of one data split.

    :param train: When true, look in ``data/train``; otherwise in ``data/test``.
        The path is relative to the current working directory.
    :return: Names of the sub-directories of the split directory, in
        ``os.listdir`` order. Plain files are ignored.
    :raises FileNotFoundError: If the split directory does not exist.
    """
    split_path = os.path.join("data", 'train' if train else 'test')
    # Enumerate the split directory itself. Listing "data" (as the previous
    # version did) yields the split names ("train", "test"), which were then
    # looked up *inside* the split directory and therefore never matched.
    return [
        sector
        for sector in os.listdir(split_path)
        if os.path.isdir(os.path.join(split_path, sector))
    ]

def get_activities(sector: str, train=True) -> List[str]:
    """List the activity directories that belong to one sector.

    :param sector: Name of the sector directory inside the chosen split.
    :param train: When true, look under ``data/train``; otherwise ``data/test``.
        The path is relative to the current working directory.
    :return: Names of the sub-directories of the sector directory, in
        ``os.listdir`` order. Plain files are ignored.
    """
    split_name = 'train' if train else 'test'
    sector_path = os.path.join("data", split_name, sector)

    activities = []
    for entry in os.listdir(sector_path):
        # Only directories count as activities.
        if os.path.isdir(os.path.join(sector_path, entry)):
            activities.append(entry)
    return activities

def get_file_paths(sector: str, activity: str, model_name: str, train=True) -> Tuple[str, str, str, str]:
    """Build the input and output paths used for one activity and model.

    :param sector: Sector directory name.
    :param activity: Activity directory name; also the prefix of the input files.
    :param model_name: Model name used as the prefix of the response file.
    :param train: When true, inputs come from ``data/train``; otherwise ``data/test``.
    :return: ``(test_path, activity_breakdown_path, ground_truth_path, response_path)``
        where ``test_path`` is the results directory for the activity and
        ``response_path`` is ``<test_path>/<model_name>_response.json``.
    """
    split = 'train' if train else 'test'
    dir_path = os.path.join(ROOT_DIR, "data", split, sector, activity)
    test_path = os.path.join(ROOT_DIR, "test", "results", sector, activity)

    # dir_path / test_path are already rooted at ROOT_DIR. Joining ROOT_DIR in
    # front of them a second time is a no-op only while ROOT_DIR is absolute
    # and would duplicate the root for a relative ROOT_DIR, so it is dropped.
    activity_breakdown_path = os.path.join(dir_path, f"{activity}_activity_breakdown.json")
    ground_truth_path = os.path.join(dir_path, f"{activity}_step_value_analysis.json")
    response_path = os.path.join(test_path, f"{model_name}_response.json")
    return test_path, activity_breakdown_path, ground_truth_path, response_path

def create_prompt_components_variations() -> List[Dict[str, PromptComponents]]:
    """Build the named prompt-component variations that are evaluated.

    Every variation is described by a label and a selection dictionary; the
    selection is handed to ``PromptComponents.from_dict`` unchanged.

    :return: One dictionary per variation with the keys ``"name"`` (the label)
        and ``"components"`` (the result of ``PromptComponents.from_dict``),
        in the order listed below.
    """
    # (label, selection) pairs; the order here is the order of the result.
    selections = (
        ("Neutral_Analyst_Basic", {
            "role_description": "neutral_analyst",
            "task_description": "standard",
            "classification_types": "basic",
            "function_definition": "basic",
            "parsing_instructions": "sequential",
            "output_format": "basic",
        }),
        ("Lean_Expert_Detailed", {
            "role_description": "lean_expert",
            "task_description": "efficiency_focused",
            "classification_types": "detailed",
            "function_definition": "detailed",
            "parsing_instructions": "holistic",
            "output_format": "structured",
            "additional_guidelines": "lean_principles",
        }),
        ("Process_Engineer_Technical", {
            "role_description": "process_engineer",
            "task_description": "standard",
            "classification_types": "detailed",
            "function_definition": "basic",
            "parsing_instructions": "sequential",
            "output_format": "basic",
            "example_output": "complex_process",
        }),
        ("Customer_Advocate_ValueFocused", {
            "role_description": "customer_advocate",
            "task_description": "efficiency_focused",
            "classification_types": "basic",
            "function_definition": "detailed",
            "parsing_instructions": "holistic",
            "output_format": "structured",
            "additional_guidelines": "context_aware",
        }),
        ("Business_Consultant_Strategic", {
            "role_description": "business_consultant",
            "task_description": "standard",
            "classification_types": "detailed",
            "function_definition": "detailed",
            "parsing_instructions": "holistic",
            "output_format": "structured",
            "example_output": "complex_process",
            "additional_guidelines": "context_aware",
        }),
        ("Neutral_Analyst_Comprehensive", {
            "role_description": "neutral_analyst",
            "task_description": "efficiency_focused",
            "classification_types": "detailed",
            "function_definition": "detailed",
            "parsing_instructions": "holistic",
            "output_format": "structured",
            "example_output": "complex_process",
            "additional_guidelines": "lean_principles",
        }),
        ("Lean_Expert_Minimal", {
            "role_description": "lean_expert",
            "task_description": "standard",
            "classification_types": "basic",
            "function_definition": "basic",
            "parsing_instructions": "sequential",
            "output_format": "basic",
        }),
        ("Process_Engineer_Detailed", {
            "role_description": "process_engineer",
            "task_description": "efficiency_focused",
            "classification_types": "detailed",
            "function_definition": "detailed",
            "parsing_instructions": "holistic",
            "output_format": "structured",
            "example_output": "complex_process",
            "additional_guidelines": "context_aware",
        }),
    )

    return [
        {"name": label, "components": PromptComponents.from_dict(selection)}
        for label, selection in selections
    ]

def get_models() -> Dict[str, ValueClassificationComponentsGPT]:
    """Instantiate one classifier per prompt-component variation.

    :return: Mapping from ``"GPT-3.5-Value-Classification-<variation name>"``
        to a ``ValueClassificationComponentsGPT`` built from that variation's
        components, in the order the variations are defined.
    """
    return {
        f"GPT-3.5-Value-Classification-{spec['name']}":
            ValueClassificationComponentsGPT(prompt_components=spec['components'])
        for spec in create_prompt_components_variations()
    }

def process_activity(sector: str, activity: str) -> None:
    """Classify one activity with every model variation and store the results.

    For each model returned by ``get_models`` the activity breakdown is
    classified, compared with the ground truth, and written to
    ``<model_name>_response.json`` in the activity's results directory.

    Failures are logged (with traceback) and never propagate: a failure while
    loading the inputs skips the whole activity, a failure inside one model
    only skips that model so the remaining variations are still evaluated.

    :param sector: Sector directory name.
    :param activity: Activity directory name.
    """
    logging.info(f"Processing sector: {sector}, activity: {activity}")

    try:
        # Load activity breakdown and ground truth
        _, activity_breakdown_path, ground_truth_path, _ = get_file_paths(sector, activity, "dummy")
        activity_breakdown = ProcessModel.from_json(activity_breakdown_path)
        ground_truth = ProcessModel.from_json(ground_truth_path)

        activity_breakdown_dict = activity_breakdown.get_activity_breakdown_dict()

        models = get_models()
    except Exception:
        # logging.exception keeps the traceback, which str(e) alone loses.
        logging.exception(f"Error processing {sector}/{activity}")
        return

    for model_name, model in models.items():
        logging.info(f"Processing with model: {model_name}")

        try:
            test_path, _, _, response_path = get_file_paths(sector, activity, model_name)

            # Ensure test directory exists
            os.makedirs(test_path, exist_ok=True)

            # Get model response
            response = model.value_classification_step_level(activity_breakdown_dict)

            # Process model response on a freshly loaded breakdown so that
            # classifications from a previous model cannot leak into this one.
            model_activity_breakdown = ProcessModel.from_json(activity_breakdown_path)
            model_activity_breakdown.classify_substeps(response)

            # Compare results
            comparison_metrics = compare_value_classifications(model_activity_breakdown, ground_truth)

            response_dict = {
                "model": {
                    "name": model_name,
                    "components": model.prompt_components["_raw_input"]
                },
                "response": model_activity_breakdown.to_dict(),
                "metrics": comparison_metrics.to_dict()
            }

            # Serialise first: if the payload is not JSON-serialisable the
            # file is never opened, so no truncated result file is left behind.
            payload = json.dumps(response_dict, indent=4)

            # Save results
            with open(response_path, 'w') as f:
                f.write(payload)

            logging.info(f"Successfully processed and saved results for {sector}/{activity} with {model_name}")

        except Exception:
            # Isolate the failure to this model; the other variations still run.
            logging.exception(f"Error processing {sector}/{activity} with {model_name}")
        
def save_model_configurations(models: Dict[str, ValueClassificationComponentsGPT], llm_version: str):
    """
    Write one JSON configuration file per model into ``test/models``.

    The directory is relative to the current working directory and is created
    when missing. Each file is named ``<model_name>.json`` and records the
    model name, the model's ``prompt_components`` attribute and the LLM version.

    :param models: Dictionary of model names and their corresponding ValueClassificationComponentsGPT instances
    :param llm_version: Version of the LLM being used (e.g., "gpt-4")
    """
    output_dir = os.path.join("test", "models")
    os.makedirs(output_dir, exist_ok=True)

    for model_name in models:
        file_path = os.path.join(output_dir, f"{model_name}.json")

        config = dict(
            model_name=model_name,
            prompt_components=models[model_name].prompt_components,
            llm_version=llm_version,
        )

        with open(file_path, 'w') as f:
            json.dump(config, f, indent=2)

        print(f"Saved configuration for {model_name} to {file_path}")

In [5]:
# Ad-hoc smoke test: build a single classifier directly, without going through
# get_models(). The component selection is the same as the
# "Process_Engineer_Technical" variation in create_prompt_components_variations().
# NOTE(review): this binds the global name `model`; confirm nothing later in the
# notebook relies on it before deleting the cell.
model = ValueClassificationComponentsGPT(PromptComponents.from_dict({
                "role_description": "process_engineer",
                "task_description": "standard",
                "classification_types": "detailed",
                "function_definition": "basic",
                "parsing_instructions": "sequential",
                "output_format": "basic",
                "example_output": "complex_process"
            }))

In [6]:
model

<src.value_classification.value_classification_components_gpt.ValueClassificationComponentsGPT at 0x227692dc760>

In [2]:
def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or 0 when the denominator is not positive."""
    return numerator / denominator if denominator > 0 else 0


def _precision_recall_f1(tp, fp, fn):
    """Return ``(precision, recall, f1)`` for one class; undefined ratios become 0."""
    precision = _safe_ratio(tp, tp + fp)
    recall = _safe_ratio(tp, tp + fn)
    f1_score = _safe_ratio(2 * (precision * recall), precision + recall)
    return precision, recall, f1_score


def _digest_process_metrics(metrics: Dict) -> Tuple:
    """Validate one response's metrics and reduce them to plain values.

    Nothing is accumulated here, so a malformed response raises before any
    overall total has been touched.

    :return: ``(total_substeps, correct_classifications, accuracy, cells, process_f1)``
        where ``cells`` is a list of ``(true_class, pred_class, count)`` and
        ``process_f1`` is the macro F1 over the classes present in this response.
    """
    total = metrics['total_substeps']
    correct = metrics['correct_classifications']
    accuracy = metrics['accuracy']
    for value in (total, correct, accuracy):
        if not isinstance(value, (int, float)):
            raise TypeError(f"expected a number, got {value!r}")

    cells = [
        (true_class, pred_class, count)
        for true_class, pred_dict in metrics['confusion_matrix'].items()
        for pred_class, count in pred_dict.items()
    ]

    # Macro F1 over the classes that occur in THIS process (as truth or as
    # prediction), so the score does not depend on which processes happened
    # to be aggregated earlier.
    classes = sorted({t for t, _, _ in cells} | {p for _, p, _ in cells})
    f1_scores = []
    for class_name in classes:
        tp = sum(c for t, p, c in cells if t == class_name and p == class_name)
        fp = sum(c for t, p, c in cells if p == class_name and t != class_name)
        fn = sum(c for t, p, c in cells if t == class_name and p != class_name)
        f1_scores.append(_precision_recall_f1(tp, fp, fn)[2])
    process_f1 = sum(f1_scores) / len(f1_scores) if f1_scores else 0

    return total, correct, accuracy, cells, process_f1


def aggregate_results(model_name: str, results_path=None) -> Dict:
    """Aggregate the per-activity metrics of one model into overall statistics.

    Walks ``<results_path>/<sector>/<activity>/<model_name>_response.json`` and
    combines the ``metrics`` section of every readable response. Unreadable or
    malformed responses are logged as warnings and skipped as a whole. When no
    response is usable, every ratio in the result is 0 instead of raising
    ``ZeroDivisionError``.

    :param model_name: Model name used as the prefix of the response files.
    :param results_path: Directory containing the sector folders. Defaults to
        ``<ROOT_DIR>/test/results``.
    :return: Dictionary with the summed counts, the confusion matrix, per-class
        precision/recall/F1, macro and support-weighted averages, process-level
        means and population standard deviations, misclassification shares
        (fractions of all misclassifications) and Cohen's kappa.
    """
    if results_path is None:
        results_path = os.path.join(ROOT_DIR, "test", "results")

    overall_results = {
        "total_substeps": 0,
        "correct_classifications": 0,
        "confusion_matrix": defaultdict(lambda: defaultdict(int)),
        "class_metrics": defaultdict(lambda: {"tp": 0, "fp": 0, "fn": 0}),
        "process_level_metrics": {
            "total_processes": 0,
            "accuracies": [],
            "substep_counts": [],
            "f1_scores": [],
        },
        "misclassification_rates": defaultdict(int),
    }
    confusion = overall_results['confusion_matrix']
    class_metrics = overall_results['class_metrics']
    process_metrics = overall_results['process_level_metrics']

    # sorted() makes the accumulation order (and thus float rounding) stable.
    for sector in sorted(os.listdir(results_path)):
        sector_path = os.path.join(results_path, sector)
        if not os.path.isdir(sector_path):
            continue
        for activity in sorted(os.listdir(sector_path)):
            activity_path = os.path.join(sector_path, activity)
            if not os.path.isdir(activity_path):
                continue
            response_file = os.path.join(activity_path, f"{model_name}_response.json")
            if not os.path.exists(response_file):
                continue

            try:
                # Reading is inside the try so a corrupt file is skipped too.
                with open(response_file, 'r') as f:
                    response_data = json.load(f)
                total, correct, accuracy, cells, process_f1 = _digest_process_metrics(response_data['metrics'])
            except Exception as e:
                logging.warning(f"Failed response for {sector}/{activity} with {model_name}: {str(e)}")
                continue

            # Everything below works on validated values and cannot fail
            # half-way, so the totals stay consistent with each other.
            overall_results['total_substeps'] += total
            overall_results['correct_classifications'] += correct

            # Process-level metrics
            process_metrics['total_processes'] += 1
            process_metrics['accuracies'].append(accuracy)
            process_metrics['substep_counts'].append(total)
            process_metrics['f1_scores'].append(process_f1)

            for true_class, pred_class, count in cells:
                confusion[true_class][pred_class] += count
                if true_class == pred_class:
                    class_metrics[true_class]['tp'] += count
                else:
                    class_metrics[true_class]['fn'] += count
                    class_metrics[pred_class]['fp'] += count
                    overall_results['misclassification_rates'][f"{true_class}_as_{pred_class}"] += count

    total_substeps = overall_results['total_substeps']

    # Calculate overall accuracy
    overall_results['accuracy'] = _safe_ratio(overall_results['correct_classifications'], total_substeps)

    # Calculate precision, recall, and F1 score for each class
    for counts in class_metrics.values():
        precision, recall, f1_score = _precision_recall_f1(counts['tp'], counts['fp'], counts['fn'])
        counts.update({
            'precision': precision,
            'recall': recall,
            'f1_score': f1_score
        })

    # Calculate macro average scores
    class_count = len(class_metrics)
    for score in ('f1_score', 'precision', 'recall'):
        overall_results[f'macro_avg_{score}'] = _safe_ratio(
            sum(counts[score] for counts in class_metrics.values()), class_count)

    # Calculate weighted average scores (weight = class support, tp + fn)
    total_samples = sum(counts['tp'] + counts['fn'] for counts in class_metrics.values())
    for score in ('f1_score', 'precision', 'recall'):
        overall_results[f'weighted_avg_{score}'] = (
            sum(counts[score] * (counts['tp'] + counts['fn']) / total_samples
                for counts in class_metrics.values())
            if total_samples > 0 else 0
        )

    # Calculate process-level averages and population standard deviations
    process_count = process_metrics['total_processes']
    for label, values in (('accuracy', process_metrics['accuracies']),
                          ('substeps', process_metrics['substep_counts']),
                          ('f1_score', process_metrics['f1_scores'])):
        mean = _safe_ratio(sum(values), process_count)
        process_metrics[f'avg_{label}'] = mean
        process_metrics[f'std_dev_{label}'] = math.sqrt(
            _safe_ratio(sum((x - mean) ** 2 for x in values), process_count))

    # Calculate misclassification rates
    total_misclassifications = sum(overall_results['misclassification_rates'].values())
    for misclass_type, count in overall_results['misclassification_rates'].items():
        overall_results['misclassification_rates'][misclass_type] = count / total_misclassifications if total_misclassifications > 0 else 0

    # Calculate Cohen's Kappa. Row/column totals are read with .get so that a
    # class that was only ever predicted does not get an empty row inserted
    # into the (defaultdict) confusion matrix as a side effect.
    observed_accuracy = overall_results['accuracy']
    row_totals = {cls: sum(row.values()) for cls, row in confusion.items()}
    col_totals = defaultdict(int)
    for row in confusion.values():
        for pred_class, count in row.items():
            col_totals[pred_class] += count
    expected_accuracy = sum(
        _safe_ratio(row_totals.get(cls, 0), total_substeps) * _safe_ratio(col_totals.get(cls, 0), total_substeps)
        for cls in class_metrics.keys()
    )
    overall_results['cohens_kappa'] = (observed_accuracy - expected_accuracy) / (1 - expected_accuracy) if expected_accuracy != 1 else 0

    return overall_results

def save_overall_results(model_name: str, overall_results: Dict) -> None:
    """Write a model's aggregated results to ``<ROOT_DIR>/test/results/overall``.

    The directory is created when missing and the file is named
    ``<model_name>_overall_results.json``.

    :param model_name: Model name used as the file name prefix.
    :param overall_results: JSON-serialisable results dictionary to store.
    """
    target_dir = os.path.join(ROOT_DIR, "test", "results", "overall")
    os.makedirs(target_dir, exist_ok=True)

    overall_results_path = os.path.join(target_dir, f"{model_name}_overall_results.json")
    with open(overall_results_path, 'w') as handle:
        json.dump(overall_results, handle, indent=4)

    logging.info(f"Overall results for {model_name} saved to {overall_results_path}")




In [1]:
def main(resume_from="advanced_b2b_sales"):
    """Run every model on the training activities, then aggregate per model.

    :param resume_from: Name of the activity at which processing starts;
        activities listed before it are skipped (useful to resume an
        interrupted run). Pass ``None`` to process every activity. The default
        keeps the resume point that was previously hard-coded.
    """
    started = resume_from is None
    sectors = get_sectors()
    for sector in sectors:
        for activity in get_activities(sector):
            if not started:
                if activity != resume_from:
                    continue
                started = True

            process_activity(sector, activity)

    if not started:
        # Without this, a misspelt or removed resume point silently turns the
        # whole run into a no-op.
        logging.warning(f"Resume activity '{resume_from}' was not found; no activity was processed")

    # After processing all activities, aggregate and save overall results for each model
    models = get_models()
    # NOTE(review): the model names built by get_models() say "GPT-3.5" while
    # the version recorded here is "gpt-4" - confirm which one is actually used.
    llm_version = "gpt-4"  # or whatever version you're using
    save_model_configurations(models, llm_version)
    for model_name in models.keys():
        print(f"Aggregating results for {model_name}")
        overall_results = aggregate_results(model_name)
        save_overall_results(model_name, overall_results)

if __name__ == "__main__":
    main()

NameError: name 'get_sectors' is not defined