# In [None]:
import pandas as pd
import re
import os
import math

class Evaluation:
    """Scores model responses against ground-truth answers for one task and prompt.

    The predictions DataFrame must contain an 'Answer' column (ground truth)
    and a model-response column (auto-detected, see `_get_response_column`).
    `task_id` selects the answer type: "task3" is an option task, "task5" is
    a relation task, and anything else is treated as numeric. `prompt`
    selects the answer marker: "bnap" expects the Bengali marker, every other
    prompt expects "Answer:" / "answer is:".
    """

    # Sentinel returned by the extract_* helpers when no answer could be found.
    WRONG_FORMAT = "00000"

    # Column names tried, in order, when locating the model response column.
    _RESPONSE_COLUMN_CANDIDATES = (
        'Model Response', 'Model_Response', 'model_response',
        'Response', 'response', 'Output', 'output', 'Prediction',
    )

    # NOTE(review): the numeric patterns require a whitespace character right
    # after the answer marker ("Answer:5" is not recognised) - confirm that
    # this strictness is intended before relaxing it.
    _BN_NUMERIC_RE = re.compile(r'উত্তর:\s.*?([+-]?\d*\.?\d+)', re.DOTALL)
    _EN_NUMERIC_RE = re.compile(r"(?:Answer:|answer is:)\s.*?([+-]?\d*\.?\d+)", re.DOTALL)
    _BN_ANSWER_RE = re.compile(r"উত্তর:\s*(.*)", re.DOTALL)
    _EN_ANSWER_RE = re.compile(r"(?:Answer:|answer is:)\s*(.*)", re.DOTALL)

    # Bengali digit -> ASCII digit translation and the characters that are
    # kept when normalising a numeric answer.
    _BENGALI_TO_ASCII_DIGITS = str.maketrans('০১২৩৪৫৬৭৮৯', '0123456789')
    _NUMERIC_CHARS = frozenset('০১২৩৪৫৬৭৮৯0123456789./-')

    def __init__(self, predictions_df, task_id, prompt):
        """
        Initializes the Evaluation object.

        Args:
            predictions_df: DataFrame holding ground truth and model responses.
            task_id: Task identifier such as "task1" ... "task6".
            prompt: Prompt identifier; "bnap" switches to Bengali answer markers.

        Raises:
            ValueError: If no model response column can be determined.
        """
        self.predictions = predictions_df
        self.task_id = task_id
        self.prompt = prompt

        # Check the column names and set the response column name
        self.response_column = self._get_response_column()

    def _get_response_column(self):
        """
        Determines the correct column name for model responses.

        Returns the first known candidate name present in the DataFrame. If
        none is present, falls back to the second column (assuming the first
        is 'Answer') and prints which column was chosen.

        Raises:
            ValueError: If no candidate matches and there are fewer than two columns.
        """
        columns = self.predictions.columns

        for name in self._RESPONSE_COLUMN_CANDIDATES:
            if name in columns:
                return name

        # If none of the expected names are found, use the second column
        # (assuming first is 'Answer' and second is the response)
        if len(columns) >= 2:
            print(f"Response column not found. Using column: {columns[1]}")
            return columns[1]

        raise ValueError("Could not determine model response column. Available columns: " + str(columns.tolist()))

    def extract_numeric_answer(self, text):
        """
        Extracts the first number that follows the answer marker in `text`.

        Returns:
            str: The matched number as text, or "00000" when the marker or a
            number after it is missing.
        """
        pattern = self._BN_NUMERIC_RE if self.prompt == "bnap" else self._EN_NUMERIC_RE
        match = pattern.search(str(text))
        return match.group(1) if match else self.WRONG_FORMAT

    def extract_option_answer(self, text):
        """
        Extracts an option answer from the input text.

        For the 'bnap' prompt, it maps "উত্তর ১" to "Option 1" and "উত্তর ২" to
        "Option 2". For other prompts it returns the stripped text following
        the answer marker. Non-string input (e.g. NaN from an empty CSV cell)
        is converted with str() so it is reported as a wrong format instead
        of raising.

        Returns:
            str: The extracted option, or "00000" when nothing usable is found.
        """
        text = str(text)
        if self.prompt == "bnap":
            match = self._BN_ANSWER_RE.search(text)
            if not match:
                return self.WRONG_FORMAT
            answer = match.group(1).strip()
            if "উত্তর ১" in answer:
                return "Option 1"
            if "উত্তর ২" in answer:
                return "Option 2"
            return self.WRONG_FORMAT

        match = self._EN_ANSWER_RE.search(text)
        return match.group(1).strip() if match else self.WRONG_FORMAT

    def extract_relation_answer(self, text):
        """
        Extracts a relation answer from the input text.

        For the 'bnap' prompt the Bengali keyword found after the marker is
        mapped to "neutral", "contradiction", or "Entailment". For other
        prompts the raw text following the answer marker is returned.

        Returns:
            str: The relation label / raw answer, or "00000" when nothing
            usable is found.
        """
        text = str(text)
        if self.prompt == "bnap":
            match = self._BN_ANSWER_RE.search(text)
            if not match:
                return self.WRONG_FORMAT
            answer = match.group(1)
            if "নিরপেক্ষ" in answer:
                return "neutral"
            if "বিরোধ" in answer:
                return "contradiction"
            if "সমর্থন" in answer:
                return "Entailment"
            return self.WRONG_FORMAT

        match = self._EN_ANSWER_RE.search(text)
        return match.group(1) if match else self.WRONG_FORMAT

    def is_direct_answer_match(self, correct_ans, model_output):
        """
        Checks if the correct answer and model output match.

        For 'bnap' prompt, the match is direct (case-sensitive),
        otherwise it performs a case-insensitive substring check.
        Both values are converted with str() first so a non-string ground
        truth does not raise.
        """
        correct_ans = str(correct_ans)
        model_output = str(model_output)
        if self.prompt == "bnap":
            return correct_ans == model_output
        return correct_ans.lower() in model_output.lower()

    def convert_bengali_digits_to_english(self, text):
        """
        Converts Bengali numeric characters in the input text to their English equivalents.

        Only digits (Bengali or ASCII), '.', '/' and '-' are kept; every other
        character is dropped. Non-string input is converted with str() first.

        Returns:
            str: The converted numeric text in English, or "00000" if no valid numeric characters are found.
        """
        kept = ''.join(char for char in str(text) if char in self._NUMERIC_CHARS)
        english_text = kept.translate(self._BENGALI_TO_ASCII_DIGITS)
        return english_text if english_text else self.WRONG_FORMAT

    @staticmethod
    def _parse_number(text):
        """Parses 'x' or a simple fraction 'x/y' into a float; returns None if malformed."""
        numerator, slash, denominator = text.partition("/")
        try:
            value = float(numerator)
            if slash:
                value /= float(denominator)
        except (ValueError, ZeroDivisionError):
            return None
        # Extremely long digit strings overflow to inf, which int() cannot handle.
        return value if math.isfinite(value) else None

    def is_numeric_answer_match(self, ground_truth, model_ans):
        """
        Compares a numeric ground truth answer with the model answer after converting Bengali digits.

        Both values are normalised to ASCII, parsed as a plain number or a
        simple fraction ("a/b"), truncated to two decimal places and compared.
        Parsing is done without eval(); a value that cannot be parsed makes
        the comparison return False instead of raising.
        """
        ground_truth_val = self._parse_number(self.convert_bengali_digits_to_english(ground_truth))
        model_ans_val = self._parse_number(self.convert_bengali_digits_to_english(model_ans))
        if ground_truth_val is None or model_ans_val is None:
            return False
        return int(ground_truth_val * 100) / 100 == int(model_ans_val * 100) / 100

    def find_wrong_output_format(self, answer):
        """Returns True when `answer` is the "00000" wrong-format sentinel."""
        return answer == self.WRONG_FORMAT

    def evaluate_errors(self):
        """
        Evaluates the model's accuracy on the predictions DataFrame by comparing the 'Answer' and
        model response columns.

        Each response is extracted once. A response in the wrong format is
        counted as a wrong prediction and is never treated as a match, so the
        wrong-calculation count cannot become negative.

        Returns:
            tuple: (wrong_prediction_percentage, wrong_format_percentage,
            wrong_calculation_percentage). The first two are relative to all
            rows; the third is relative to the rows with a correctly
            formatted answer and is 0.0 when there are none. All three are
            0.0 for an empty DataFrame.
        """
        total_data = len(self.predictions)

        # Use the correct response column name
        response_column = self.response_column
        print(f"Using response column: {response_column}")

        if total_data == 0:
            # Nothing to score; avoid dividing by zero below.
            return 0.0, 0.0, 0.0

        if self.task_id == "task3":
            extract, is_match = self.extract_option_answer, self.is_direct_answer_match
        elif self.task_id == "task5":
            extract, is_match = self.extract_relation_answer, self.is_direct_answer_match
        else:
            extract, is_match = self.extract_numeric_answer, self.is_numeric_answer_match

        exact_matches = 0
        wrong_format = 0
        for truth, response in zip(self.predictions['Answer'], self.predictions[response_column]):
            answer = extract(response)
            if self.find_wrong_output_format(answer):
                wrong_format += 1
            elif is_match(truth, answer):
                exact_matches += 1

        wrong_predictions = total_data - exact_matches
        correct_format = total_data - wrong_format
        # Well-formatted answers that are nevertheless wrong.
        wrong_calculation = wrong_predictions - wrong_format

        wrong_prediction_percentage = (wrong_predictions / total_data) * 100
        wrong_format_percentage = (wrong_format / total_data) * 100
        if correct_format:
            wrong_calculation_percentage = (wrong_calculation / correct_format) * 100
        else:
            wrong_calculation_percentage = 0.0

        return wrong_prediction_percentage, wrong_format_percentage, wrong_calculation_percentage

# Main code for processing and evaluation
def main():
    """Evaluate every (prompt, model, task) response file and print error tables.

    For each combination the matching CSV under "../Model Responses/<model>/"
    is loaded and scored with `Evaluation`. The wrong-format (WF%) and
    wrong-calculation (WC%) percentages are rounded to two decimals and
    collected; a file that cannot be processed is reported on stdout and
    keeps its initial value of 0. One table per prompt is then printed.
    """
    task_numbers = [1, 2, 3, 4, 5, 6]
    prompts = ["bnap", "xlp", "xcot"]
    models = [
        "Mathstral_7B",
        "Llama_3.3_70B",
        "DeepSeek_R1_Distill_Llama_70B",
        "Gpt_4o",
        "Gemini_2.0_flash",
    ]

    # Number of sampled rows per task; it is part of each CSV file name.
    task_sizes = {
        "task1": 150,
        "task2": 250,
        "task3": 150,
        "task4": 150,
        "task5": 150,
        "task6": 150,
    }

    # (result key, table label) for the two reported metrics, in display order.
    metric_labels = [
        ("wrong_format_percentage", "WF%"),
        ("wrong_calculation_percentage", "WC%"),
    ]

    # results_by_task[prompt][model][task key] -> both metrics, starting at 0.
    results_by_task = {}
    for prompt in prompts:
        results_by_task[prompt] = {}
        for model in models:
            results_by_task[prompt][model] = {
                f"task{number}": {"wrong_format_percentage": 0, "wrong_calculation_percentage": 0}
                for number in task_numbers
            }

    # Score every response file, prompt-major, then model, then task.
    runs = [
        (prompt, model, f"task{number}")
        for prompt in prompts
        for model in models
        for number in task_numbers
    ]
    for prompt, model, task_key in runs:
        sample_size = task_sizes.get(task_key)
        csv_path = os.path.join(
            "..",
            "Model Responses",
            model,
            f"{model}_{prompt}_{task_key}_random_{sample_size}_responses.csv",
        )

        try:
            print(f"Processing {csv_path}")
            frame = pd.read_csv(csv_path)

            # The overall wrong-prediction percentage is not reported here.
            _, wf_pct, wc_pct = Evaluation(frame, task_key, prompt).evaluate_errors()

            cell = results_by_task[prompt][model][task_key]
            cell["wrong_format_percentage"] = round(wf_pct, 2)
            cell["wrong_calculation_percentage"] = round(wc_pct, 2)
        except Exception as exc:
            # Best effort: report the failure and keep the initial 0 values.
            print(f"Error processing {csv_path}: {exc}")

    task_headers = [f"Task {number}" for number in task_numbers]

    # One table per prompt: two rows per model (WF% first, then WC%); the
    # model name is shown only on the first of the two rows.
    for prompt in prompts:
        print(f"\n\n--------------------------------------------{prompt.upper()} Results (Run 2)--------------------------------------------")

        per_model = results_by_task[prompt]
        table_rows = []
        for model in models:
            for name_cell, (metric, label) in zip((model, ""), metric_labels):
                values = [per_model[model][f"task{number}"][metric] for number in task_numbers]
                table_rows.append([name_cell, label] + values)

        df_prompt = pd.DataFrame(table_rows, columns=["Model", "Metric"] + task_headers)
        print(df_prompt.to_string(index=False))

        # Per-prompt CSV export is currently disabled:
        # df_prompt.to_csv(f"{prompt}_error_percentages_by_task.csv", index=False)

    # Rows and header for an optional consolidated table over all prompts.
    # They are prepared here, but the export itself is currently disabled.
    rows_all = [
        [prompt, model, label]
        + [results_by_task[prompt][model][f"task{number}"][metric] for number in task_numbers]
        for prompt in prompts
        for model in models
        for metric, label in metric_labels
    ]
    columns_all = ["Prompt", "Model", "Metric"] + task_headers
    # df_all = pd.DataFrame(rows_all, columns=columns_all)
    # df_all.to_csv("all_error_percentages_by_task.csv", index=False)

    # print("\nDetailed error percentages by task saved to CSV files.")

# Run the full evaluation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()