#Pipeline

##Mount Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
cd '/content/drive/My Drive/Code Documentation Project/Hallucination/OpenJ9/' #ElasticSearch

/content/drive/My Drive/Code Documentation Project/Hallucination/OpenJ9


##Load libraries


In [None]:
!pip install langchain
!pip install openai
!pip install langchain-experimental
!pip install langchainhub

In [None]:
from langchain import OpenAI, SQLDatabase
from langchain_experimental.sql import SQLDatabaseChain
from langchain import OpenAI, ConversationChain

import os
import sqlite3
from operator import itemgetter

from langchain.prompts import ChatPromptTemplate,FewShotChatMessagePromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda
from langchain import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.chains import SequentialChain, LLMChain
from langchain.output_parsers import PydanticOutputParser
from langchain.pydantic_v1 import BaseModel, Field

##ChatGPT



In [None]:
# Set the OpenAI API key as an environment variable
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# Initialize the OpenAI language model with a specified temperature
llm = OpenAI(temperature=0)

# Load two different SQLite databases from the provided URIs
db = SQLDatabase.from_uri("sqlite:///Benchmark/OpenJ9_benchmark_issues_unstructured.db")
db_cfg = SQLDatabase.from_uri("sqlite:///Benchmark/OpenJ9_benchmark_issues_structured.db")


In [None]:

def retriever(query, cfg=False):
    """
    Retrieves data based on the provided query and configuration.

    Parameters:
    query (str): The SQL query to be executed.
    cfg (bool): Flag to determine which database configuration to use.
                If True, use the structured database. If False, use the unstructured database.

    Returns:
    str: The result of the query execution.
    """
    if cfg:
        # If cfg is True, use the structured database
        print('Using structured database (cfg)')
        db_chain_cfg = SQLDatabaseChain.from_llm(llm, db_cfg, verbose=True, use_query_checker=True, top_k=40)
        return db_chain_cfg.run(query)
    else:
        # If cfg is False, use the unstructured database
        print('Using unstructured database (normal)')
        db_chain_normal = SQLDatabaseChain.from_llm(llm, db, verbose=True, use_query_checker=True, top_k=40)
        return db_chain_normal.run(query)

##Query Preprocessor

In [2]:
# Few-shot examples for Query Preprocessor
examples = [
    {
        "input": "Find unresolved issues with no activity in the last 6 months",
        "output": "Select issue numbers of open issues with last activity date older than 6 months ago."
    },
    {
        "input": "Suggest existing labels to tag issue 18608?",
        "output": "List all existing labels and find suitable one for issue 18608 based on its content"
    },
    {
        "input": "Is issue 18102 and 18669 similar?",
        "output": "Compare the exceptions, stack traces, and descriptions of issues 18102 and 18669 to determine similarity."
    },
    {
        "input": "Are there any issues similar to issue 18669?",
        "output": "Identify issues with exceptions similar to those in issue 18669."
    },
    {
        "input": "How many times did the internal grinder tests fail in issue 17852?",
        "output": "Extract the number of internal grinder test failures mentioned in issue 17852."
    }
]


In [None]:
# Define example messages for the prompt
example_prompt = ChatPromptTemplate.from_messages(
    [
        ("human", "{input}"),  # User's input message
        ("ai", "{output}"),   # AI's response message
    ]
)

# Create a few-shot prompt template using the defined examples
few_shot_prompt = FewShotChatMessagePromptTemplate(
    example_prompt=example_prompt,
    examples=examples,  # List of example input-output pairs
)

# Define the main chat prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", """You need to extract query information from a database of issues.
     Your task is to rephrase the question to make it more concise and direct, without altering its core intent or specificity.
     Questions are of four intents: Yes/No, Fact, Summarization, and List.
     1. Yes/No: Change 'is/are/have there issues' to check if there are any issues with the provided condition.
     2. List: Change 'Find issues' to 'List issue numbers' with the provided condition.
     3. Summarization: Summarize the contents from issue title, exceptions, body, and labels.
     4. Fact: Extract the fact.
     Retain keywords and intent as it is.
     Follow these examples:"""),  # System message providing instructions on how to process the input

    # Include few-shot examples for guidance
    few_shot_prompt,

    # New user question to be processed
    ("user", "{question}"),
])

# Create a pipeline for generating responses from the prompt using ChatOpenAI and parsing the output
question_gen = prompt | ChatOpenAI(temperature=0) | StrOutputParser()

# Documentation:
# - `ChatPromptTemplate.from_messages` is used to define the structure of the chat prompt, specifying how messages are formatted and handled.
# - `FewShotChatMessagePromptTemplate` is used to incorporate example input-output pairs to guide the model in generating appropriate responses.
# - The `system` message provides instructions for rephrasing questions to be more concise and direct while maintaining their core intent and specificity.
# - The `ChatOpenAI` model is used to generate responses based on the defined prompt, with a temperature of 0 for deterministic output.
# - `StrOutputParser` parses the output from the model to ensure it meets the required format.

  warn_deprecated(


##Chain of Verification - CoVe

In [None]:
# Initialize the OpenAI Chat model with deterministic output (temperature=0)
llm = ChatOpenAI(temperature=0)

# Define the input variables that will be used in the prompt template
input_variables = ["query"]

# Define the template for the base response prompt
base_response_template = """Question: {query} Answer:"""

# Create a PromptTemplate instance using the input variables and the template
base_response_prompt_template = PromptTemplate(
    input_variables=input_variables,  # Variables that will be replaced in the template
    template=base_response_template   # The template string with placeholders
)

# Create an LLMChain instance that combines the language model and the prompt template
base_response_chain = LLMChain(
    llm=llm,  # The language model to use for generating responses
    prompt=base_response_prompt_template,  # The prompt template to format the input
    output_key=base_response_output_key  # Key to store the output in the chain
)

# Documentation:
# - `ChatOpenAI(temperature=0)` initializes the OpenAI language model with a temperature of 0 for consistent, deterministic output.
# - `input_variables` specifies the variables that will be used in the prompt template. In this case, only "query".
# - `PromptTemplate` is used to create a prompt format where the variable `query` will be inserted into the `base_response_template`.
# - `base_response_chain` is an `LLMChain` that combines the language model and the prompt template to generate a response based on the input query.
# - `base_response_output_key` specifies the key under which the output from the language model will be stored.

  warn_deprecated(


In [None]:
def verify(query):
    """
    Verifies the response to a user query by generating and answering verification questions.

    Args:
    - query (str): The user's input query to be verified.

    Returns:
    - intermediate_result: The intermediate result from the verification process.
    - verify_results_str (str): String containing the verification questions and their answers.
    """

    # Define input variables for the prompt templates
    input_variables = ["query"]
    base_response_output_key = "base_response"

    # Define the template for generating the base response
    base_response_template = """Question: {query} Answer:"""

    # Create a PromptTemplate instance for the base response
    base_response_prompt_template = PromptTemplate(
        input_variables=input_variables,
        template=base_response_template
    )

    # Create an LLMChain for generating the base response
    base_response_chain = LLMChain(
        llm=llm,
        prompt=base_response_prompt_template,
        output_key=base_response_output_key
    )

    # Choose the database based on the `cfg` flag
    if(cfg):
        db = SQLDatabase.from_uri("sqlite:///Benchmark/OpenJ9_benchmark_issues_structured.db")
    else:
        db = SQLDatabase.from_uri("sqlite:///Benchmark/OpenJ9_benchmark_issues_unstructured.db")

    # Define the database chain for normal queries
    db_chain_normal = SQLDatabaseChain.from_llm(
        llm,
        db,
        verbose=True,
        use_query_checker=True,
        top_k=40,
        output_key=base_response_output_key
    )

    # Function to run queries using the normal database chain
    def normal_retriever(query):
        return db_chain_normal.run(query)

    # Define the template for generating verification questions
    plan_verifications_template = """
    Given the below Question and answer, generate a series of verification questions that test the factual claims in the original baseline response.
    For example, if part of a longform model response contains the statement “The 2 exception types found in the issue report are java.io.EOFException, AssertionError”, then one possible
    verification question to check those data could be “Can java.io.EOFException be found in existing issue reports? If there is an issue number and asking about this issue only, focus on it.”

    Question: {query}
    Answer: {base_response}

    <fact in query and passage>, <verification question, generated by combining the query and the fact>

    {format_instructions}
    """

    # Define the output model for verification questions
    class PlanVerificationsOutput(BaseModel):
        query: str = Field(description="The user's query", default="")
        base_response: str = Field(description="The response to the user's query", default="")
        facts_and_verification_questions: dict[str, str] = Field(
            description="Facts (as the dictionary keys) extracted from the response and verification questions related to the query (as the dictionary values)",
            default=""
        )

    try:
        # Create a PydanticOutputParser instance for parsing verification results
        plan_verifications_output_parser = PydanticOutputParser(
            pydantic_object=PlanVerificationsOutput
        )

        # Create a PromptTemplate for generating verification questions
        plan_verifications_prompt_template = PromptTemplate(
            input_variables=input_variables + [base_response_output_key],
            template=plan_verifications_template,
            partial_variables={
                "format_instructions": plan_verifications_output_parser.get_format_instructions()
            },
        )

        # Create an LLMChain for generating verification questions
        plan_verifications_chain = LLMChain(
            llm=llm,
            prompt=plan_verifications_prompt_template,
            output_key="output",
            output_parser=plan_verifications_output_parser,
        )

        # Create a SequentialChain to combine the database chain and the verification chain
        answer_and_plan_verification = SequentialChain(
            chains=[db_chain_normal, plan_verifications_chain],
            input_variables=["query"],
            output_variables=["output"],
            verbose=True
        )

        # Run the SequentialChain to get the intermediate result
        intermediate_result = answer_and_plan_verification.run(query)

        # Extract the claimed facts and verification questions from the result
        claimed_facts = list(intermediate_result.facts_and_verification_questions.keys())
        verification_questions = list(
            intermediate_result.facts_and_verification_questions.values()
        )

        # Initialize an empty string to collect verification results
        verify_results_str = ""
        verify_input_variables = ["question"]
        verify_output_key = "answer"
        verify_template = """{question}"""

        # Create a PromptTemplate for answering verification questions
        verify_prompt_template = PromptTemplate(
            input_variables=verify_input_variables,
            template=verify_template
        )

        # Create a SQLDatabaseChain for answering verification questions
        verify_chain = SQLDatabaseChain.from_llm(
            llm,
            db,
            verbose=True,
            use_query_checker=True,
            output_key=verify_output_key
        )

        # Answer each verification question and compile results
        for i in range(len(verification_questions)):
            claimed_fact = claimed_facts[i]
            question = verification_questions[i]
            answer = verify_chain.run(question)
            answer = answer.lstrip("\n")
            verify_results_str += f"Question: {question}\nAnswer: {answer}\n\n"

        return intermediate_result, verify_results_str

    except Exception as e:
        # Handle exceptions and return default values
        return None, ''

In [None]:
def Cove(query):
    """
    Revises the response to a query based on verification results to ensure consistency.

    Args:
    - query (str): The user's input query to be verified and answered.

    Returns:
    - final_response (str): The revised response that is consistent with the verified information.
    """
    try:
        # Get the intermediate result and verification results for the query
        intermediate_result, verify_results_str = verify(query)

        # Check if intermediate result is None, indicating an issue with verification
        if intermediate_result is None:
            return ''

        # Define input variables and template for generating the final response
        final_response_input_variables = ["query", "base_response", "verify_results"]
        final_response_template = """Given the ORIGINAL_QUESTION and the ORIGINAL_RESPONSE,
        revise the ORIGINAL_RESPONSE (if applicable) such that it is consistent with information in VERIFIED_SOURCE as answer for ORIGINAL_QUESTION.
        Only keep consistent information.

        <ORIGINAL_QUESTION>
        {query}

        <ORIGINAL_RESPONSE>
        {base_response}

        <VERIFIED_SOURCE>
        {verify_results}

        Final response:
        """

        # Create a PromptTemplate instance for generating the final response
        final_response_prompt_template = PromptTemplate(
            input_variables=final_response_input_variables,
            template=final_response_template,
        )

        # Create an LLMChain instance for generating the final response
        final_response_chain = LLMChain(llm=llm, prompt=final_response_prompt_template)

        # Generate the final response using the LLMChain
        final_response = final_response_chain.run(
            query=intermediate_result.query,
            base_response=intermediate_result.base_response,
            # Use verification results to revise the response
            verify_results=verify_results_str,
        )

        # Print the final response for debugging or logging purposes
        print(final_response)

        return final_response
    except Exception as e:
        # Return an empty string if an exception occurs
        return ''


##Metamorphic Testing - MT

In [None]:
from langchain.chat_models import ChatOpenAI

def mutate_query(original_query, n_mutations=3):
    """
    Generates multiple variations of a given query while retaining its semantic meaning.

    Args:
    - original_query (str): The original question that needs to be mutated.
    - n_mutations (int): The number of different ways to mutate the original query (default is 3).

    Returns:
    - List[str]: A list of mutated queries.
    """
    # Template for generating query mutations
    template = """
    Generate {n_mutations} different ways to ask the following question, keeping the semantic meaning the same:\n\n'{input}'\n\nMutations:
    """

    # Initialize the ChatOpenAI model with a deterministic temperature setting
    llm = ChatOpenAI(temperature=0)

    # Create a PromptTemplate instance with the mutation prompt template
    prompt_template = PromptTemplate.from_template(template=template.format(n_mutations=n_mutations))

    # Create an LLMChain instance using the model and prompt template
    chain = LLMChain(llm=llm, prompt=prompt_template)

    # Generate mutations using the chain
    response = chain.predict(input=original_query)
    print(response)  # For debugging or logging purposes

    # Extract and return the mutated queries
    # Assuming the response is a string, split it by newlines
    mutated_queries = response.split("\n")  # Adjust this based on the actual format of 'response'

    # Filter out empty lines or irrelevant parts
    mutated_queries = [query.strip() for query in mutated_queries if query.strip() and not query.strip().startswith("Mutations")]

    return mutated_queries


In [None]:
def MT_answer(original_query, init_response):
    """
    Processes an initial response to a query by generating variations of the query,
    retrieving answers for each variation, and then revising the initial response based on verified information.

    Args:
    - original_query (str): The original question for which responses are being verified.
    - init_response (str): The initial response to the original query that may need revision.

    Returns:
    - str: The final revised response that is consistent with verified information.
    """

    # Generate multiple mutations of the original query
    mutated_queries = mutate_query(original_query)
    print(mutated_queries)  # For debugging purposes

    # Initialize an empty string to collect verification results
    verify_results_str = ""

    # Retrieve answers for each mutated query and collect verification information
    for q in mutated_queries:
        a = retriever(q)  # Use the retriever function to get answers for the mutated queries
        verify_results_str += f"Question: {q}\nAnswer: {a}\n\n"

    # Define the original query and the initial response
    query = original_query
    base_response = init_response

    # Print the base response and verification results for debugging
    print('BASE--------')
    print(base_response)
    print('verify-----')
    print(verify_results_str)

    # Define the final response prompt template
    final_response_input_variables = ["query", "base_response", "verify_results"]
    final_response_template = """Given the ORIGINAL_QUESTION and the ORIGINAL_RESPONSE,
    revise the ORIGINAL_RESPONSE (if applicable) such that it is consistent with information in VERIFIED_SOURCE as answer for ORIGINAL_QUESTION.
    Only keep consistent information.

    <ORIGINAL_QUESTION>
    {query}

    <ORIGINAL_RESPONSE>
    {base_response}

    <VERIFIED_SOURCE>
    {verify_results}

    Final response:
    """

    # Create a PromptTemplate instance for generating the final response
    final_response_prompt_template = PromptTemplate(
        input_variables=final_response_input_variables,
        template=final_response_template,
    )

    # Create an LLMChain instance for generating the final response
    final_response_chain = LLMChain(llm=llm, prompt=final_response_prompt_template)

    # Generate the final response by running the LLMChain
    final_response = final_response_chain.run(
        query=query,
        base_response=base_response,
        verify_results=verify_results_str,
    )

    # Print the final response for debugging purposes
    print('final-----')
    print("Final Response:", final_response)

    return final_response


##Configuration

In [None]:
# Initialize the name variable as an empty string
name = ''

# Flags for different configurations
cfg = False
change_query = False
COVE = False
MT = False

# Append suffixes to name based on the configuration flags
if cfg:
    name += '_CFG'  # Append '_CFG' if cfg is True
if change_query:
    name += '_Query'  # Append '_Query' if change_query is True
if COVE:
    name += '_COVE'  # Append '_COVE' if COVE is True
if MT:
    name += '_MT'  # Append '_MT' if MT is True

# Special case: if all flags are True, set name to '_CIMBUR'
if cfg and change_query and COVE and MT:
    name = '_CIMBUR'

# Special case: if none of the flags are True, set name to '_LLM'
if not (cfg or change_query or COVE or MT):
    name = '_LLM'

# Print the final value of name
print(name)


_COVE


In [None]:
import time

def process_questions(questions):
    # Initialize a list to store answers
    answers = []

    # Iterate over each question in the provided list
    for i, q in enumerate(questions):
        try:
            # If change_query is enabled, generate a new query
            if change_query:
                print('Generating query')
                q = question_gen.invoke({"question": q})

            # If MT (Mutation Testing) is enabled
            if MT:
                if COVE:
                    print('Running MT and COVE')
                    # Use COVE to get initial response
                    init_response = Cove(q)
                else:
                    # Retrieve initial response without COVE
                    init_response = retriever(q)
                # Get final answer using MT approach
                answer = MT_answer(q, init_response)

            # If only COVE is enabled
            elif COVE:
                print('Running COVE')
                answer = Cove(q)

            # Default case: use retriever
            else:
                answer = retriever(q)

            # Append the answer to the list
            answers.append(answer)
            print(f'-----------Final-------------: {answer}\n')

        except Exception as e:
            # Handle any exceptions that occur
            print(f"An error occurred while processing question {i+1}: {e}")
            answers.append(None)  # Append None if an error occurs or question is skipped

    # Return the list of answers
    return answers


#Benchmark

## T5 - Issue Backlog

###y/n

In [None]:
import pandas as pd

# Read the CSV file into a DataFrame
df = pd.read_csv('Benchmark/new/O-T5.csv')

# Extract non-null questions for processing
single_q = df.loc[df['T5-YNQ'].notnull(), 'T5-YNQ']

# Process questions to get answers
single_a = process_questions(single_q)

# Add a new column for answers
df[f'T5-YNA{name}'] = ''
df.loc[:len(single_a) - 1, f'T5-YNA{name}'] = single_a

# Save the updated DataFrame to a CSV file
df.to_csv('Benchmark/new/O-T5.csv', index=False)

# Get actual and expected answers from the DataFrame
actual_answers = df.loc[df[f'T5-YNA{name}'].notnull(), f'T5-YNA{name}']
expected_answers = df.loc[df['T5-YNA'].notnull(), 'T5-YNA']

# Call the function to classify and evaluate answers
results, correct, correctness = classify_and_evaluate(actual_answers, expected_answers)
print("Correctness:", correctness)

# Add a new column for evaluation results
df[f'T5-YN{name}_R'] = ''
df[f'T5-YN{name}_R'][:len(results)] = results

# Save the updated DataFrame to a CSV file
df.to_csv('Benchmark/new/O-T5.csv', index=False)

###sum

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T5.csv')

single_q = df.loc[df['T5-SQ'].notnull(), 'T5-SQ']
single_a=process_questions(single_q)

df[f'T5-SA{name}'] = ''
df.loc[:len(single_a)-1, f'T5-SA{name}'] = single_a

df.to_csv('Benchmark/new/O-T5.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T5-SA'].notnull(), 'T5-SA']

# Call the function
results, correct, correctness = evaluate_summary(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T5-S{name}_R'] = ''
df[f'T5-S{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T5.csv', index=False)

###fact

In [None]:

import pandas as pd
df = pd.read_csv('Benchmark/new/O-T5.csv')

single_q = df.loc[df['T5-FQ'].notnull(), 'T5-FQ']
single_a=process_questions(single_q)

df[f'T5-FA{name}'] = ''
df.loc[:len(single_a)-1, f'T5-FA{name}'] = single_a

df.to_csv('Benchmark/new/O-T5.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T5-FA'].notnull(), 'T5-FA']
expected_answers = expected_answers.astype(str)
# Call the function
results, correct, correctness = evaluate_fact(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T5-FA{name}_R'] = ''
df[f'T5-FA{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T5.csv', index=False)

##T4 - Issue Labelling

###y/n

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T4.csv')

single_q = df.loc[df['T4-YNQ'].notnull(), 'T4-YNQ']
single_a=process_questions(single_q)

df[f'T4-YNA{name}'] = ''
df.loc[:len(single_a)-1, f'T4-YNA{name}'] = single_a

df.to_csv('Benchmark/new/O-T4.csv', index=False)

# Get actual and expected answers
actual_answers = df.loc[df[f'T4-YNA{name}'].notnull(), f'T4-YNA{name}']
expected_answers = df.loc[df['T4-YNA'].notnull(), 'T4-YNA']

# Call the function
results, correct, correctness = classify_and_evaluate(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T4-YN{name}_R'] = ''
df[f'T4-YN{name}_R'][:len(results)] = results


df.to_csv('Benchmark/new/O-T4.csv', index=False)

###sum

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T4.csv')

single_q = df.loc[df['T4-SQ'].notnull(), 'T4-SQ']
single_a=process_questions(single_q)

df[f'T4-SA{name}'] = ''
df.loc[:len(single_a)-1, f'T4-SA{name}'] = single_a

df.to_csv('Benchmark/new/O-T4.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T4-SA'].notnull(), 'T4-SA']

# Call the function
results, correct, correctness = evaluate_summary(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T4-S{name}_R'] = ''
df[f'T4-S{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T4.csv', index=False)
sum_correct=correct

###fact

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T4.csv')

single_q = df.loc[df['T4-FQ'].notnull(), 'T4-FQ']
single_a=process_questions(single_q)

df[f'T4-FA{name}'] = ''
df.loc[:len(single_a)-1, f'T4-FA{name}'] = single_a

df.to_csv('Benchmark/new/O-T4.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T4-FA'].notnull(), 'T4-FA']

# Call the function
results, correct, correctness = evaluate_fact(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T4-FA{name}_R'] = ''
df[f'T4-FA{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T4.csv', index=False)

##T3 - Issue Summary


###sum

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T3.csv')

single_q = df.loc[df['T3-SQ'].notnull(), 'T3-SQ']
single_a=process_questions(single_q)

df[f'T3-SA{name}'] = ''
df.loc[:len(single_a)-1, f'T3-SA{name}'] = single_a

df.to_csv('Benchmark/new/O-T3.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T3-SA'].notnull(), 'T3-SA']

# Call the function
results, correct, correctness = evaluate_summary(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T3-S{name}_R'] = ''
df[f'T3-S{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T3.csv', index=False)


###fact

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T3.csv')

single_q = df.loc[df['T3-FQ'].notnull(), 'T3-FQ']
single_a=process_questions(single_q)

df[f'T3-FA{name}'] = ''
df.loc[:len(single_a)-1, f'T3-FA{name}'] = single_a

df.to_csv('Benchmark/new/O-T3.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T3-FA'].notnull(), 'T3-FA']

# Call the function
results, correct, correctness = evaluate_fact(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T3-FA{name}_R'] = ''
df[f'T3-FA{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T3.csv', index=False)


##T2 - Issue Trend

###y/n

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T2.csv')

single_q = df.loc[df['T2-YNQ'].notnull(), 'T2-YNQ']
single_a=process_questions(single_q)

df[f'T2-YNA{name}'] = ''
df.loc[:len(single_a)-1, f'T2-YNA{name}'] = single_a

df.to_csv('Benchmark/new/O-T2.csv', index=False)

# Get actual and expected answers
actual_answers = df.loc[df[f'T2-YNA{name}'].notnull(), f'T2-YNA{name}']
expected_answers = df.loc[df['T2-YNA'].notnull(), 'T2-YNA']

# Call the function
results, correct, correctness = classify_and_evaluate(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T2-YN{name}_R'] = ''
df[f'T2-YN{name}_R'][:len(results)] = results


df.to_csv('Benchmark/new/O-T2.csv', index=False)


###sum

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T2.csv')

single_q = df.loc[df['T2-SQ'].notnull(), 'T2-SQ']
single_a=process_questions(single_q)

df[f'T2-SA{name}'] = ''
df.loc[:len(single_a)-1, f'T2-SA{name}'] = single_a

df.to_csv('Benchmark/new/O-T2.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T2-SA'].notnull(), 'T2-SA']

# Call the function
results, correct, correctness = evaluate_summary(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T2-S{name}_R'] = ''
df[f'T2-S{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T2.csv', index=False)

###fact

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T2.csv')

single_q = df.loc[df['T2-FQ'].notnull(), 'T2-FQ']
single_a=process_questions(single_q)

df[f'T2-FA{name}'] = ''
df.loc[:len(single_a)-1, f'T2-FA{name}'] = single_a

df.to_csv('Benchmark/new/O-T2.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T2-FA'].notnull(), 'T2-FA']

# Call the function
results, correct, correctness = evaluate_fact(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T2-FA{name}_R'] = ''
df[f'T2-FA{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T2.csv', index=False)


##T1 - Issue Analysis (Multiple)

###y/n

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T1M.csv')

single_q = df.loc[df['T1M-YNQ'].notnull(), 'T1M-YNQ']
single_a=process_questions(single_q)

df[f'T1M-YNA{name}'] = ''
df.loc[:len(single_a)-1, f'T1M-YNA{name}'] = single_a

df.to_csv('Benchmark/new/O-T1M.csv', index=False)

# Get actual and expected answers
actual_answers = df.loc[df[f'T1M-YNA{name}'].notnull(), f'T1M-YNA{name}']
expected_answers = df.loc[df['T1M-YNA'].notnull(), 'T1M-YNA']

# Call the function
results, correct, correctness = classify_and_evaluate(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T1M-YN{name}_R'] = ''
df[f'T1M-YN{name}_R'][:len(results)] = results


df.to_csv('Benchmark/new/O-T1M.csv', index=False)


###sum

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T1M.csv')

single_q = df.loc[df['T1M-SQ'].notnull(), 'T1M-SQ']
single_a=process_questions(single_q)

df[f'T1M-SA{name}'] = ''
df.loc[:len(single_a)-1, f'T1M-SA{name}'] = single_a

df.to_csv('Benchmark/new/O-T1M.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T1M-SA'].notnull(), 'T1M-SA']

# Call the function
results, correct, correctness = evaluate_summary(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T1M-S{name}_R'] = ''
df[f'T1M-S{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T1M.csv', index=False)
sum_correct=correct

###fact

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T1M.csv')

single_q = df.loc[df['T1M-FQ'].notnull(), 'T1M-FQ']
single_a=process_questions(single_q)

df[f'T1M-FA{name}'] = ''
df.loc[:len(single_a)-1, f'T1M-FA{name}'] = single_a

df.to_csv('Benchmark/new/O-T1M.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T1M-FA'].notnull(), 'T1M-FA']

# Call the function
results, correct, correctness = evaluate_fact(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T1M-FA{name}_R'] = ''
df[f'T1M-FA{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T1M.csv', index=False)


##T1 - Issue Analysis (Single)

###y/n

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T1S.csv')

single_q = df.loc[df['T1S-YNQ'].notnull(), 'T1S-YNQ']
single_a=process_questions(single_q)

df[f'T1S-YNA{name}'] = ''
df.loc[:len(single_a)-1, f'T1S-YNA{name}'] = single_a

df.to_csv('Benchmark/new/O-T1S.csv', index=False)

# Get actual and expected answers
actual_answers = df.loc[df[f'T1S-YNA{name}'].notnull(), f'T1S-YNA{name}']
expected_answers = df.loc[df['T1S-YNA'].notnull(), 'T1S-YNA']

# Call the function
results, correct, correctness = classify_and_evaluate(actual_answers, expected_answers)
print("Correctness:", correctness)

yn_correct=correct

df[f'T1S-YN{name}_R'] = ''
df[f'T1S-YN{name}_R'][:len(results)] = results


df.to_csv('Benchmark/new/O-T1S.csv', index=False)




[1m> Entering new SQLDatabaseChain chain...[0m
Does issue 18400 affect jit component ?
SQLQuery:[32;1m[1;3mSELECT "number", "title", "labels" 
FROM issues 
WHERE "number" = 18400;[0m
SQLResult: [33;1m[1;3m[(18400, 'Apache Lucene CI builds sometimes fail with OpenJ9 specific issues', 'comp:jit, userRaised, project:MH')][0m
Answer:[32;1m[1;3mYes, issue 18400 does affect the jit component.[0m
[1m> Finished chain.[0m
-----------Final-------------: Yes, issue 18400 does affect the jit component.



[1m> Entering new SQLDatabaseChain chain...[0m
is issue 19014 an user raiser issue?
SQLQuery:[32;1m[1;3mSELECT issue_creator, labels 
FROM issues 
WHERE number = 19014;[0m
SQLResult: [33;1m[1;3m[('TemporaryRepos', 'comp:jit, userRaised')][0m
Answer:[32;1m[1;3mYes, issue 19014 is a user-raised issue.[0m
[1m> Finished chain.[0m
-----------Final-------------: Yes, issue 19014 is a user-raised issue.



[1m> Entering new SQLDatabaseChain chain...[0m
Is the issue 18082 fo

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[f'T1S-YN{name}_R'][:len(results)] = results


###sum

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T1S.csv')

single_q = df.loc[df['T1S-SQ'].notnull(), 'T1S-SQ']
single_a=process_questions(single_q)

df[f'T1S-SA{name}'] = ''
df.loc[:len(single_a)-1, f'T1S-SA{name}'] = single_a

df.to_csv('Benchmark/new/O-T1S.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T1S-SA'].notnull(), 'T1S-SA']

# Call the function
results, correct, correctness = evaluate_summary(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T1S-S{name}_R'] = ''
df[f'T1S-S{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T1S.csv', index=False)


###fact

In [None]:
import pandas as pd
df = pd.read_csv('Benchmark/new/O-T1S-F.csv')

single_q = df.loc[df['T1S-FQ'].notnull(), 'T1S-FQ']
single_a=process_questions(single_q)

df[f'T1S-FA{name}'] = ''
df.loc[:len(single_a)-1, f'T1S-FA{name}'] = single_a

df.to_csv('Benchmark/new/O-T1S-F.csv', index=False)

# Get actual and expected answers
actual_answers = single_a
expected_answers = df.loc[df['T1S-FA'].notnull(), 'T1S-FA']

# Call the function
results, correct, correctness = evaluate_fact(actual_answers, expected_answers)
print("Correctness:", correctness)

df[f'T1S-FA{name}_R'] = ''
df[f'T1S-FA{name}_R'][:len(results)] = results

df.to_csv('Benchmark/new/O-T1S-F.csv', index=False)
fact_correct=correct


#Evaluation

###y/n

In [None]:
import pandas as pd
from transformers import pipeline

# Initialize the zero-shot classification pipeline
classifier = pipeline("zero-shot-classification")

In [None]:
from transformers import pipeline

# Initialize the zero-shot classification pipeline
classifier = pipeline("zero-shot-classification")

def classify_and_evaluate(actual_answers, expected_answers):
    """
    Classify actual answers using zero-shot classification and evaluate the results against expected answers.

    Parameters:
    - actual_answers (list of str): List of answers to be classified.
    - expected_answers (list of str): List of expected classifications.

    Returns:
    - results (list of int): List indicating correctness (1 for correct, 0 for incorrect).
    - correct (int): Number of correctly classified answers.
    - correctness (float): Proportion of correct classifications.
    """
    # Define classes for classification
    classes = ["Yes", "No"]

    correct = 0
    total = 0
    results = []

    for actual, expected in zip(actual_answers, expected_answers):
        print(actual)  # Print the actual answer for debugging
        print(expected)  # Print the expected answer for debugging

        total += 1
        # Classify the actual answer
        result = classifier(actual, candidate_labels=classes, hypothesis_template="This statement implies: {}.")

        # Check if the predicted label matches the expected label
        if result['labels'][0].lower() == expected.lower():
            correct += 1
            results.append(1)
            print(1)  # Debug output for correct classification
        else:
            results.append(0)
            print(0)  # Debug output for incorrect classification

    print('Correct ' + str(correct))  # Print the number of correct classifications
    correctness = correct / total  # Calculate the proportion of correct classifications
    return results, correct, correctness


###fact

In [None]:
pip install sentence-transformers

In [None]:
import re
from sentence_transformers import SentenceTransformer, util

# Initialize the model for semantic similarity
model = SentenceTransformer('all-MiniLM-L6-v2')

def extract_numbers(text):
    """
    Extracts all numeric values from the given text.

    Parameters:
    - text (str): The input text from which numbers are to be extracted.

    Returns:
    - list of str: A list of numeric strings found in the text.
    """
    return re.findall(r'\b\d+\b', text)

def clean_text(text):
    """
    Cleans the input text by removing non-alphanumeric characters and excess whitespace.

    Parameters:
    - text (str): The input text to be cleaned.

    Returns:
    - str: The cleaned text.
    """
    text = re.sub(r'[^\w\s:]', '', text)  # Remove punctuation except colons
    text = text.strip().replace('\n', '').replace('"', '').replace("'", '')
    return text

def containsOnlyNumbers(text):
    """
    Checks if the cleaned text contains only numbers.

    Parameters:
    - text (str): The input text to be checked.

    Returns:
    - bool: True if the text contains only numbers, False otherwise.
    """
    text = re.sub(r'[^\w\s]', '', text)  # Remove non-alphanumeric characters
    text = text.replace('\n', '').replace('and', '')

    all_numbers = ' '.join(extract_numbers(text))

    if text == all_numbers:
        return True

    return False

def evaluate_fact(actual_answers, expected_answers):
    """
    Evaluates the correctness of answers by comparing them with expected answers using semantic similarity.

    Parameters:
    - actual_answers (list of str): The list of actual answers to be evaluated.
    - expected_answers (list of str): The list of expected answers for comparison.

    Returns:
    - results (list of int): List indicating correctness (1 for correct, 0 for incorrect).
    - correct (int): Number of correct answers.
    - correctness (float): Proportion of correct answers.
    """
    results = []
    correct = 0

    for actual, expected in zip(actual_answers, expected_answers):

        if actual is None and expected is None:
            continue  # Skip if both are None

        if actual is None:
            actual = 'None'

        expected = expected.strip() if expected else ''
        actual = actual.strip() if actual else ''

        similarity = 0

        if expected.isdigit() or len(expected.split()) == 1:
            # Case: expected answer is a number or single word
            actual_s = set(clean_text(actual).split())
            expected_s = set(clean_text(expected).split())

            actual_s = set(word.lower() for word in actual_s)
            expected_s = set(word.lower() for word in expected_s)

            intersection = actual_s.intersection(expected_s)

            if intersection == set(expected.split()) or intersection:
                similarity = 1

        elif containsOnlyNumbers(expected):
            # Case: expected answer contains only numbers
            expected_num = sorted(extract_numbers(expected))
            expected_s = ' '.join(expected_num)
            min_limit = min(expected_num)
            max_limit = max(expected_num)

            actual_num = sorted(extract_numbers(actual))
            if actual_num:
                actual_num = [num for num in actual_num if num > min_limit or num <= max_limit]
                actual_s = ' '.join(actual_num)
                actual_embedding = model.encode(actual_s, convert_to_tensor=True)
                expected_embedding = model.encode(expected_s, convert_to_tensor=True)
                similarity = util.pytorch_cos_sim(actual_embedding, expected_embedding).item()

        else:
            # Case: general text comparison
            actual_embedding = model.encode(actual, convert_to_tensor=True)
            expected_embedding = model.encode(expected, convert_to_tensor=True)
            similarity = util.pytorch_cos_sim(actual_embedding, expected_embedding).item()

        threshold = 0.7
        is_correct = 1 if similarity >= threshold else 0
        if is_correct == 1:
            correct += 1
        results.append(is_correct)

    correctness = correct / len(results) if results else 0
    return results, correct, correctness


###sum

In [None]:
from sentence_transformers import SentenceTransformer, util
model = SentenceTransformer('all-MiniLM-L6-v2')

In [None]:
from sentence_transformers import SentenceTransformer, util

# Initialize the model
model = SentenceTransformer('all-MiniLM-L6-v2')

def evaluate_summary(actual_answers, expected_answers):
    """
    Evaluates the correctness of actual answers compared to expected answers based on semantic similarity.

    Args:
        actual_answers (list of str): List of actual answers.
        expected_answers (list of str): List of expected answers.

    Returns:
        results (list of int): List indicating correctness of each comparison (1 for correct, 0 for incorrect).
        correct (int): Total number of correct comparisons.
        correctness (float): Proportion of correct comparisons.
    """
    results = []
    correct = 0

    # Evaluate correctness through semantic similarity
    for actual, expected in zip(actual_answers, expected_answers):
        # Handle None values
        if actual is None:
            actual = 'None'

        # Encode answers to embeddings
        actual_embedding = model.encode(actual, convert_to_tensor=True)
        expected_embedding = model.encode(expected, convert_to_tensor=True)

        # Compute cosine similarity
        similarity = util.pytorch_cos_sim(actual_embedding, expected_embedding).item()

        # Threshold for correctness (can be adjusted)
        threshold = 0.7
        is_correct = 1 if similarity >= threshold else 0

        # Update count and results list
        if is_correct == 1:
            correct += 1
        results.append(is_correct)

        # Print debug information
        print(f'Actual: {actual}')
        print(f'Expected: {expected}')
        print(f'Similarity: {similarity:.4f}')
        print(f'Correct: {is_correct}')

    # Calculate and print overall correctness
    correctness = correct / len(results) if results else 0
    print(f'Correct: {correct}')
    print(f'Correctness: {correctness:.4f}')

    return results, correct, correctness


#Accuracy of CFG

In [None]:
import csv
import re
from sentence_transformers import SentenceTransformer, util

class StackTraceParser:
    def __init__(self):
        """
        Initializes the StackTraceParser with regex patterns for parsing stack traces.
        """
        self.exception_pattern = r'^(?:Caused by: )?(?:Exception in thread ".*" )?([\w.$]+)(?:: (.+))?$'
        self.code_details_pattern = r'^at ([\w./@$]+)\.([\w<>$]+)\(([\w.]+):?(\d+)?\)'

    def parse(self, stack_trace):
        """
        Parses a stack trace string and extracts elements such as exception type, exception message,
        class, method, file, and line number.

        Parameters:
            stack_trace (str): The stack trace string to be parsed.

        Returns:
            parsed_elements (list): A list of tuples representing parsed elements from the stack trace.
        """
        lines = stack_trace.strip().split('\n')
        parsed_elements = []

        for line in lines:
            line = line.strip()
            exception_match = re.match(self.exception_pattern, line)
            if exception_match:
                parsed_elements.append(('ExceptionType', exception_match.group(1)))
                if exception_match.group(2):
                    parsed_elements.append(('ExceptionMessage', exception_match.group(2)))
            else:
                code_match = re.match(self.code_details_pattern, line)
                if code_match:
                    class_method = code_match.group(1)
                    method = code_match.group(2)
                    file = code_match.group(3)
                    line_num = code_match.group(4)

                    parsed_elements.append(('ClassElem', class_method))
                    if method:
                        parsed_elements.append(('MethodElem', method))
                    if file:
                        parsed_elements.append(('FileElem', file))
                    if line_num:
                        parsed_elements.append(('LineElem', line_num))

        return parsed_elements


def calculate_accuracy(parsed_elements, expected_elements):
    """
    Calculates the accuracy of parsed elements by comparing them with expected elements.

    Parameters:
        parsed_elements (list): List of parsed elements from the stack trace.
        expected_elements (list): List of expected elements for comparison.

    Returns:
        accuracy (float): The accuracy of the parsed elements.
    """
    correct_elements = sum(1 for parsed in parsed_elements if parsed in expected_elements)
    total_elements = len(expected_elements)
    return correct_elements / total_elements if total_elements > 0 else 0


def calculate_precision_recall(parsed_elements, expected_elements):
    """
    Calculates the precision and recall of parsed elements.

    Parameters:
        parsed_elements (list): List of parsed elements from the stack trace.
        expected_elements (list): List of expected elements for comparison.

    Returns:
        precision (float): The precision of the parsed elements.
        recall (float): The recall of the parsed elements.
    """
    correct_elements = sum(1 for parsed in parsed_elements if parsed in expected_elements)
    precision = correct_elements / len(parsed_elements) if parsed_elements else 0
    recall = correct_elements / len(expected_elements) if expected_elements else 0
    return precision, recall


def calculate_f1_score(precision, recall):
    """
    Calculates the F1 score based on precision and recall.

    Parameters:
        precision (float): The precision value.
        recall (float): The recall value.

    Returns:
        f1_score (float): The F1 score.
    """
    return 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0


def process_csv(input_file, output_file):
    """
    Processes a CSV file containing stack traces and expected elements, calculates precision, recall,
    and F1 scores for each entry, and writes the results to an output CSV file.

    Parameters:
        input_file (str): The input CSV file containing stack traces and expected elements.
        output_file (str): The output CSV file to write the results.

    Returns:
        precisions (list): List of precision values for each stack trace.
        recalls (list): List of recall values for each stack trace.
        f1_scores (list): List of F1 scores for each stack trace.
    """
    parser = StackTraceParser()
    precisions = []
    recalls = []
    f1_scores = []

    with open(input_file, mode='r', newline='') as infile, open(output_file, mode='w', newline='') as outfile:
        reader = csv.reader(infile)
        writer = csv.writer(outfile)
        writer.writerow(['Stack Trace', 'Precision', 'Recall', 'F1 Score'])  # Write header
        next(reader)  # Skip header

        for row in reader:
            stack_trace = row[0].strip()
            expected_elements_str = row[1].strip()

            # Convert expected elements string to list of tuples
            expected_elements = eval(expected_elements_str)

            parsed_elements = parser.parse(stack_trace)
            precision, recall = calculate_precision_recall(parsed_elements, expected_elements)
            f1_score = calculate_f1_score(precision, recall)

            precisions.append(precision)
            recalls.append(recall)
            f1_scores.append(f1_score)

            print("Parsed Elements:")
            for elem in parsed_elements:
                print(f"  {elem[0]}: {elem[1]}")

            print(f"\nPrecision: {precision:.2%}")
            print(f"Recall: {recall:.2%}")
            print(f"F1 Score: {f1_score:.2%}\n")

            # Write precision, recall, and F1 score to file
            writer.writerow([stack_trace, f"{precision:.2%}", f"{recall:.2%}", f"{f1_score:.2%}"])

    return precisions, recalls, f1_scores


# File paths
#input_file =
output_file = 'metrics.csv'

# Process CSV and calculate metrics
precisions, recalls, f1_scores = process_csv(input_file, output_file)

# Calculate average metrics
average_precision = sum(precisions) / len(precisions) if precisions else 0
average_recall = sum(recalls) / len(recalls) if recalls else 0
average_f1_score = sum(f1_scores) / len(f1_scores) if f1_scores else 0

print(f"Average Precision: {average_precision:.2%}")
print(f"Average Recall: {average_recall:.2%}")
print(f"Average F1 Score: {average_f1_score:.2%}")
print(f"Individual metrics have been saved to {output_file}")
