In [None]:
import openai
from openai import OpenAI
import dspy
import pandas as pd 
import json
import re
import ast

In [61]:
import json

def accuracy(predictions, actuals):
    # Define synonym mappings
    synonyms = {
        "top": "above",
        "top_right": "upper-right",
        "top_left": "upper-left",
        "down": "below",
        "down_right": "lower-right",
        "down_left": "lower-left", 
    }
    
    # Normalize predictions and actual answers using synonyms
    normalized_predictions = [synonyms.get(pred, pred) for pred in predictions]
    normalized_actuals = [synonyms.get(act, act) for act in actuals]
    
    # Calculate the number of correct predictions
    correct_predictions = sum(pred == act for pred, act in zip(normalized_predictions, normalized_actuals))
    
    # Calculate accuracy
    accuracy_value = correct_predictions / len(actuals) if actuals else 0
    return accuracy_value

def main(file_path):
    # Read the JSON file
    with open(file_path, 'r') as file:
        data = json.load(file)
    
    # Extract predictions and actual answers
    predictions = [item['asp_result'] for item in data]
    actuals = [item['actual_answer'] for item in data]
    
    # Calculate accuracy
    acc = accuracy(predictions, actuals)
    print(f"Accuracy: {acc:.4f}")

# Example usage
if __name__ == "__main__":
    file_path = 'gpt_qa1.json'  # Replace with your JSON file path
    main(file_path)


Accuracy: 0.8100


In [17]:
prompt_Facts= """ You are an expert semantic parser. Your task is to parse EVERY sentence in the context into only ONE fact. 
There are two types of facts: is("A", relation, "B"). query("X", "K"), which means "What is the relation X relative to K?"
Relation must be one of the following: left, right, top, down, top_left, top_right, down_left, down_right.
Always use one of the eight relations listed above. Do not use any other words to describe relations.
For clock-wise information: 12 = top; 1-2 = top_right; 3 = right; 4-5 = down_right; 6 = down; 7-8 = down_left; 9 = left; 10-11 = top_left
For cardinal directions: north = top; east = right; south = down; west = left
Example:
"Story":
"U and V are parallel, and U is to the left of V." --> is("U",left,"V").
"T and G are parallel, and T on the left of G." --> is("T",left,"G").
"V is positioned in the front right corner of Z."-->is("V",top_right,"Z").
"A is over there with C above."-->is("C",top,"A").
"If H is the center of a clock face, Z is located between 4 and 5."-->is("Z",down_right,"H").
"M is sitting at the 9:00 position of H."-->is("M",left,"H").
"M is on the left side and above T."-->is("M",top_left,"T").
"C is at a 45 degree angle to G, in the lower righthand corner."-->is("C",down_right,"G").
"question": "What is the relation of the agent G to the agent Z?".query("G","Z").

Before outputting, carefully review and refine the generated facts:
1.Completeness:
- Convert every sentence into one fact in the context.
- Ensure facts and query are consistent with the story and question
2. Syntax and Safety:
- Relation must be one of the following: left, right, top, down, top_left, top_right, down_left, down_right
- Convert all clock positions and cardinal directions to one of the eight relations
- Use the format is("Arg1",Relation,"Arg2"). for positional facts in the story
- Use the format query("Arg1","Arg2"). for question
- Always use double quotes ("") around arguments
- Do not use quotes around relations
- Use lowercase for all predicates and relations
- End each fact with a period

Output format:
is("A",left,"B").
is("C",top_right,"D").
query("E","F").
"""

In [4]:
prompt_Refine ="""
Step One: Analyze the generated ASP facts for the following and List any issues found.
1. Completeness: Are all elements from the story represented?
2. Syntax: Is each fact in the correct format (is("Arg1",Relation,"Arg2")., query("Arg1","Arg2").) ?
3. Safety: Are all arguments in quotes? Are relations unquoted and lowercase?
4. Consistency: Are all spatial relationships correctly converted to the eight standard relations?
5. Format: Is each fact on a separate line, ending with a period, without extra punctuation?
6. Query: Is the question correctly represented as a query fact?

Step Two: Based on the analysis, please refine the ASP facts
1. Confirm all issues from the analysis have been addressed
2. Verify that no new issues were introduced during refinement
3. Ensure the final output adheres to all rules for ASP fact generation
If any issues remain, repeat the refinement process. If no issues are found, present the final set of ASP facts.

Step Three: Present the final, refined set of ASP facts, with each fact on a new line and no additional commentary.
Expected Output format:
is("A",left,"B").
is("C",top_right,"D").
query("E","F").
"""

In [8]:
prompt_Rule = """
% assume the 2nd queried object is at location (0,0)
location(Q2, 0, 0) :- query(_, Q2).

% define the offsets of 8 spacial relations
offset(overlap,0,0; top,0,1; down,0,-1; left,-1,0; right,1,0; 
    top_left,-1,1; top_right,1,1; down_left,-1,-1; down_right,1,-1).

% derive the kind of spacial relation from synonyms and offset
is(A, R1, B) :- is(A, R2, B), synonyms(R1, R2).
is(A, R1, B) :- is(B, R2, A), offset(R2,X,Y), offset(R1,-X,-Y).

% derive the location of every object
% the search space of X or Y coordinate is within -100 and 100 (to avoid infinite loop in clingo when data has error)
nums(-100..100).
location(A, Xa, Ya) :-
    location(B, Xb, Yb), nums(Xa), nums(Ya),
    is(A, Kind, B), offset(Kind, Dx, Dy),
    Xa-Xb=Dx, Ya-Yb=Dy.
location(B, Xb, Yb) :-
    location(A, Xa, Ya), nums(Xb), nums(Yb),
    is(A, Kind, B), offset(Kind, Dx, Dy),
    Xa-Xb=Dx, Ya-Yb=Dy.
    
% extract answer relation R such that the offset (Ox,Oy) of R is in the same direction of (X,Y)
answer(R) :- query(Q1, _), location(Q1, X, Y), offset(R, Ox, Oy),
    Ox=-1: X<0; Ox=0: X=0; Ox=1: X>0;
    Oy=-1: Y<0; Oy=0: Y=0; Oy=1: Y>0.

#show answer/1.
"""


In [60]:
import dspy
import pandas as pd
import json
import re
import sys
from io import StringIO
from typing import List, Dict, Any
from dataclasses import asdict
import clingo

class Convert(dspy.Module):
    class Signature(dspy.Signature):
        prompt_1 = dspy.InputField(desc="Please convert each sentence in teh story and question as one atomic fact with is/3, query/2 as predicates")
        story = dspy.InputField(desc="Spatial description of scene, to be parsed into facts")
        question = dspy.InputField(desc="the question to be parsed into query")
        facts = dspy.OutputField(desc="the complete and refined atomic facts of Answering Set Programming")

    def __init__(self, state):
        super().__init__()
        self.state = state
        self.predictor = dspy.ChainOfThought(self.Signature)

    def forward(self, prompt_1,  story, question):
        result = self.predictor(prompt_1=prompt_1, story=story, question=question)
        facts = result.facts
        asp = facts + """
% assume the 2nd queried object is at location (0,0)
location(Q2, 0, 0) :- query(_, Q2).
% define the offsets of 8 spacial relations
offset(overlap,0,0; top,0,1; down,0,-1; left,-1,0; right,1,0; 
    top_left,-1,1; top_right,1,1; down_left,-1,-1; down_right,1,-1).
% derive the kind of spacial relation from  offset
is(A, R1, B) :- is(B, R2, A), offset(R2,X,Y), offset(R1,-X,-Y).
nums(-100..100).
location(A, Xa, Ya) :-
    location(B, Xb, Yb), nums(Xa), nums(Ya),
    is(A, Kind, B), offset(Kind, Dx, Dy),
    Xa-Xb=Dx, Ya-Yb=Dy.
location(B, Xb, Yb) :-
    location(A, Xa, Ya), nums(Xb), nums(Yb),
    is(A, Kind, B), offset(Kind, Dx, Dy),
    Xa-Xb=Dx, Ya-Yb=Dy.   
% extract answer relation R such that the offset (Ox,Oy) of R is in the same direction of (X,Y)
answer(R) :- query(Q1, _), location(Q1, X, Y), offset(R, Ox, Oy),
    Ox=-1: X<0; Ox=0: X=0; Ox=1: X>0;
    Oy=-1: Y<0; Oy=0: Y=0; Oy=1: Y>0.
#show answer/1."""
        self.state['convert'] = {
            'story': story,
            'question': question,
            'asp': asp,
        }
        return dspy.Prediction(asp=asp)

class RunClingo(dspy.Module):
    class Signature(dspy.Signature):
        asp = dspy.InputField(desc="Initial ASP code to be run and evaluated")
        result = dspy.OutputField(desc="Result from Clingo")
        error = dspy.OutputField(desc="Error output from Clingo")

    def __init__(self, state):
        super().__init__()
        self.state = state
        self.predictor = dspy.Predict(self.Signature)

    def forward(self, asp):
        result, error = self.run_clingo(asp)
        return dspy.Prediction(result=result, error=error)
    
    def run_clingo(self, asp):
        results = []
        error_output = StringIO()
        asp= asp.replace('```', '').replace('``', '').replace('`', '').replace('plaintext','')
    
        try:
            # Redirect stderr to capture errors
            original_stderr = sys.stderr
            sys.stderr = error_output

            # Initialize Clingo control
            ctl = clingo.Control()
            ctl.add("base", [], asp)
            ctl.ground([("base", [])])

            # Define on_model callback to collect query results
            def on_model(model):
                for atom in model.symbols(shown=True):
                    if atom.name == "answer" and atom.arguments:
                        results.append(str(atom.arguments[0]))

            # Solve and collect results
            solve_result = ctl.solve(on_model=on_model)

            # Process the results
            if solve_result.satisfiable:
                asp_output = ", ".join(results) if results else "satisfiable, but no query results"
            elif solve_result.unsatisfiable:
                asp_output = "unsatisfiable"
            else:
                asp_output = "unknown"

        except Exception as e:
            asp_output = f"Error: {str(e)}"
        finally:
            sys.stderr = original_stderr

        error_string = error_output.getvalue()
        error_output.close()

        return asp_output, error_string
 
class Pipeline(dspy.Module):
    def __init__(self, state):
        super().__init__()
        self.state = state
        self.convert = Convert(state)
        self.clingo = RunClingo(state)  # Initialize RunClingo with state

    def forward(self, story, question, prompt_1):
        # Convert the natural language description into ASP facts and query
        for i in range(3):
            convert_result = self.convert.forward(prompt_1=prompt_1, story=story, question=question)
            asp = convert_result.asp
            clingo_result = self.clingo.forward(asp=asp)
        return dspy.Prediction(asp=asp, result=clingo_result.result, error=clingo_result.error)
        
def process_examples(examples: List[dspy.Example], pipeline: Pipeline) -> List[Dict[str, Any]]:
    results = []
    for example in examples:
        try:
            story = example.get('story')
            question = example.get('question')
            prompt_1 = example.get('prompt_1')
            prediction = pipeline(prompt_1=prompt_1, story=story, question=question)#prompt_2=prompt_2
            result = {
            "story": story,
            "question": question,
            "predicted_ASP": prediction.asp,
            "asp_result": prediction.result,
            "actual_answer": example.get('answer')}
            results.append(result)
     
        except Exception as e:
            print(f"Error processing example: {example.question}")
            print(f"Error: {str(e)}")
    return results  

def main(input_file: str, output_file: str):
    with open(input_file, 'r') as f:
         clean_data = json.load(f)

    examples =[dspy.Example(
        prompt_1= prompt_Facts,
        # prompt_2= prompt_Refine,
        story="\n".join(clean_data[key]["story"]),
        question=clean_data[key]["question"],
        answer=clean_data[key]["label"],
    ).with_inputs("prompt_1","story", "question","answer")
    for key in clean_data
]

    examples= examples[:100]
    state = {}
    pipeline = Pipeline(state)
    results = process_examples(examples, pipeline)

    with open(output_file, "w") as jsonfile:
        json.dump(results, jsonfile, indent=4)

if __name__ == "__main__":
    main('clean/qa1_test.json', "gpt_qa1.json")

In [None]:
import dspy
import pandas as pd
import json
import re
import sys
from io import StringIO
from typing import List, Dict, Any
from dataclasses import asdict
import json
import dspy

class Convert(dspy.Module):
    class Signature(dspy.Signature):
        prompt_1 = dspy.InputField(desc="The prompt for the task")
        story = dspy.InputField(desc="Spatial description of scene, to be parsed into facts")
        question = dspy.InputField(desc="the question to be parsed into query")
        facts = dspy.OutputField(desc="the complete atomic facts of Answering Set Programming")

    def __init__(self, state):
        super().__init__()
        self.state = state
        self.predictor = dspy.ChainOfThought(self.Signature)

    def forward(self, prompt_1, story, question):
        result = self.predictor(prompt_1=prompt_1, story=story, question=question)
        facts = result.facts
        return dspy.Prediction(facts=facts)

class ASP(dspy.Module):
    class Signature(dspy.Signature):
        facts = dspy.InputField(desc="Initial ASP facts")
        prompt_2 = dspy.InputField(desc="The prompt for facts refinement")
        story = dspy.InputField(desc="Spatial description of scene, to be parsed into facts")
        question = dspy.InputField(desc="the question to be parsed into query")
        revised_facts = dspy.OutputField(desc="The revised Answer Set Programming facts")

    def __init__(self, state):
        super().__init__()
        self.state = state
        self.predictor = dspy.ChainOfThought(self.Signature)

    def forward(self, facts, prompt_2, story, question):
        result = self.predictor(facts=facts, prompt_2=prompt_2, story=story, question=question)
        revised_facts = result.revised_facts
        asp = revised_facts + """
% assume the 2nd queried object is at location (0,0)
location(Q2, 0, 0) :- query(_, Q2).
% define the offsets of 8 spacial relations
offset(overlap,0,0; top,0,1; down,0,-1; left,-1,0; right,1,0; top_left,-1,1; top_right,1,1; down_left,-1,-1; down_right,1,-1).
% derive the kind of spacial relation from offset
is(A, R1, B) :- is(B, R2, A), offset(R2,X,Y), offset(R1,-X,-Y).
nums(-100..100).
location(A, Xa, Ya) :-
    location(B, Xb, Yb), nums(Xa), nums(Ya),
    is(A, Kind, B), offset(Kind, Dx, Dy),
    Xa-Xb=Dx, Ya-Yb=Dy.
location(B, Xb, Yb) :-
    location(A, Xa, Ya), nums(Xb), nums(Yb),
    is(A, Kind, B), offset(Kind, Dx, Dy),
    Xa-Xb=Dx, Ya-Yb=Dy.
% extract answer relation R such that the offset (Ox,Oy) of R is in the same direction of (X,Y)
answer(R) :- query(Q1, _), location(Q1, X, Y), offset(R, Ox, Oy),
    Ox=-1: X<0; Ox=0: X=0; Ox=1: X>0;
    Oy=-1: Y<0; Oy=0: Y=0; Oy=1: Y>0.
#show answer/1."""
        return dspy.Prediction(asp=asp)

class Pipeline(dspy.Module):
    def __init__(self, state):
        super().__init__()
        self.state = state
        self.convert = Convert(state)
        self.revise = ASP(state)

    def forward(self, story, question, prompt_1, prompt_2):
        try:
            convert_result = self.convert.forward(prompt_1=prompt_1, story=story, question=question)
            facts = convert_result.facts
            asp = self.revise.forward(facts=facts, prompt_2=prompt_2, story=story, question=question)
            return dspy.Prediction(asp=asp)
        except Exception as e:
            print(f"Error during processing: {e}")
            return None

def process_examples(examples, pipeline):
    results = []
    for example in examples:
        try:
            story = example.get('story')
            question = example.get('question')
            prompt_1 = example.get('prompt_1')
            prompt_2 = example.get('prompt_2')
            prediction = pipeline.forward(prompt_1=prompt_1, prompt_2=prompt_2, story=story, question=question)
            result = {
                "story": story,
                "question": question,
                "predicted_ASP": prediction.asp if prediction else None,
                "actual_answer": example.get('answer')
            }
            results.append(result)
        except Exception as e:
            print(f"Error processing example: {example.get('question')}")
            print(f"Error: {str(e)}")
    return results

def main(input_file: str, output_file: str):
    with open(input_file, 'r') as f:
        clean_data = json.load(f)

    examples = [dspy.Example(
        prompt_1=prompt_Facts,
        prompt_2=prompt_Refine,
        story="\n".join(clean_data[key]["story"]),
        question=clean_data[key]["question"],
        answer=clean_data[key]["label"]
    ).with_inputs("prompt_1", "prompt_2", "story", "question", "answer") for key in clean_data]
    
    examples = examples[:100]
    state = {}  # Initialize state if needed
    pipeline = Pipeline(state)
    results = process_examples(examples, pipeline)

    with open(output_file, "w") as jsonfile:
        json.dump(results, jsonfile, indent=4)

if __name__ == "__main__":
    main('clean/qa10_test.json', "deepseek_qa10.json")



In [49]:
import json
import re
import sys
from io import StringIO
import clingo

# def extract_asp_program(predicted_asp):
#     # Find the ASP program between "% Facts" and "#show query/1."
#     match = re.search(r'block(.*?)#show query/1\.', predicted_asp, re.DOTALL)
#     if match:
#         asp_program = match.group(1).strip()
#         # Add back the start and end markers to the extracted program
#         asp_program = "block" + asp_program + "\n#show query/1."
#         # asp_program = asp_program.replace('\n', ' ')
     
#         # # Remove extra whitespace
#         # asp_program = ' '.join(asp_program.split())
#         return asp_program
#     return None
 
def run_asp_and_get_answer(asp_program):
    results = []
    error_output = StringIO()
    asp_program= asp_program.replace('```', '').replace('``', '').replace('`', '').replace('plaintext','')
    try:
        # Redirect stderr to capture errors
        original_stderr = sys.stderr
        sys.stderr = error_output

        # Initialize Clingo control
        ctl = clingo.Control()
        ctl.add("base", [], asp_program)
        ctl.ground([("base", [])])

        # Define on_model callback to collect query results
        def on_model(model):
            for atom in model.symbols(shown=True):
                if atom.name == "answer" and atom.arguments:
                    results.append(str(atom.arguments[0]))

        # Solve and collect results
        solve_result = ctl.solve(on_model=on_model)

        # Process the results
        if solve_result.satisfiable:
            asp_output = ", ".join(results) if results else "satisfiable, but no query results"
        elif solve_result.unsatisfiable:
            asp_output = "unsatisfiable"
        else:
            asp_output = "unknown"

    except Exception as e:
        asp_output = f"Error: {str(e)}"
    finally:
        sys.stderr = original_stderr

    error_string = error_output.getvalue()
    error_output.close()

    return asp_output

def process_json_file(file_path):
    with open(file_path, 'r') as file:
        data = json.load(file)

    for item in data:
        predicted_asp = item.get('predicted_ASP', '')
        # asp_program = extract_asp_program(predicted_asp)
        asp_result = run_asp_and_get_answer(predicted_asp)
        
        # item['clean_asp'] = asp_program
        item['asp_result'] = asp_result

    with open(file_path, 'w') as file:
        json.dump(data, file, indent=2)

    return data

# Test the function
file_path = "deepseek_qa10.json"
processed_data = process_json_file(file_path)

# Print the first result to check
print(json.dumps(processed_data[0], indent=2))


{
  "story": "S is sitting at the upper left position to F.\nL is placed at the upper left of K.\nA is below M and to the left of M.\nX and O are next to each other with X on the top and O at the bottom.\nF is over there and L is on the left of it.\nX is on the right side and below H.\nH is to the bottom-left of A.\nS is placed at the lower left of O.\nK is to the top of R vertically.\nR and Z are side by side with R on the top and Z at the bottom.",
  "question": "What is the relation of the agent X to the agent Z?",
  "predicted_ASP": "is(\"S\",top_left,\"F\").\nis(\"L\",top_left,\"K\").\nis(\"A\",down_left,\"M\").\nis(\"X\",top,\"O\").\nis(\"O\",down,\"X\").\nis(\"L\",left,\"F\").\nis(\"X\",down_right,\"H\").\nis(\"H\",down_left,\"A\").\nis(\"S\",down_left,\"O\").\nis(\"K\",top,\"R\").\nis(\"R\",top,\"Z\").\nis(\"Z\",down,\"R\").\nquery(\"X\",\"Z\").\n% assume the 2nd queried object is at location (0,0)\nlocation(Q2, 0, 0) :- query(_, Q2).\n% define the offsets of 8 spacial relation

In [None]:
import json

def accuracy(predictions, actuals):
    # Define synonym mappings
    synonyms = {
        "overlap":"overlap",
        "top": "above",
        "top_right": "upper-right",
        "top_left": "upper-left",
        "down": "below",
        "down_right": "lower-right",
        "down_left": "lower-left", 
    }
    
    # Normalize predictions and actual answers using synonyms
    normalized_predictions = [synonyms.get(pred, pred) for pred in predictions]
    normalized_actuals = [synonyms.get(act, act) for act in actuals]
    
    # Calculate the number of correct predictions
    correct_predictions = sum(pred == act for pred, act in zip(normalized_predictions, normalized_actuals))
    
    # Calculate accuracy
    accuracy_value = correct_predictions / len(actuals) if actuals else 0
    return accuracy_value

def main(file_path):
    # Read the JSON file
    with open(file_path, 'r') as file:
        data = json.load(file)
    
    # Extract predictions and actual answers
    predictions = [item['asp_result'] for item in data]
    actuals = [item['actual_answer'] for item in data]
    
    # Calculate accuracy
    acc = accuracy(predictions, actuals)
    print(f"Accuracy: {acc:.4f}")

# Example usage
if __name__ == "__main__":
    file_path = 'deepseek_qa10.json'  # Replace with your JSON file path
    main(file_path)


In [None]:
import dspy
import pandas as pd
import json
import re
import sys
from io import StringIO
from typing import List, Dict, Any
from dataclasses import asdict
import clingo

class Convert(dspy.Module):
    class Signature(dspy.Signature):
        prompt_1 = dspy.InputField(desc="The prompt for the task")
        story = dspy.InputField(desc="Spatial description of scene, to be parsed into facts")
        question = dspy.InputField(desc="the question to be parsed into query")
        facts = dspy.OutputField(desc="the complete and refined atomic facts of Answering Set Programming")

    def __init__(self, state):
        super().__init__()
        self.state = state
        self.predictor = dspy.ChainOfThought(self.Signature)

    def forward(self, prompt_1,  story, question):
        result = self.predictor(prompt_1=prompt_1, story=story, question=question)
        facts = result.facts
        asp = facts + """
% assume the 2nd queried object is at location (0,0)
location(Q2, 0, 0) :- query(_, Q2).
% extract answer relation R such that the offset (Ox,Oy) of R is in the same direction of (X,Y)
answer(R) :- query(Q1, _), location(Q1, X, Y), offset(R, Ox, Oy),
    Ox=-1: X<0; Ox=0: X=0; Ox=1: X>0;
    Oy=-1: Y<0; Oy=0: Y=0; Oy=1: Y>0.
% define the offsets of 8 spacial relations
offset(overlap,0,0; top,0,1; down,0,-1; left,-1,0; right,1,0; 
    top_left,-1,1; top_right,1,1; down_left,-1,-1; down_right,1,-1).
% derive the kind of spacial relation from  offset
is(A, R1, B) :- is(B, R2, A), offset(R2,X,Y), offset(R1,-X,-Y).
nums(-100..100).
location(A, Xa, Ya) :-
    location(B, Xb, Yb), nums(Xa), nums(Ya),
    is(A, Kind, B), offset(Kind, Dx, Dy),
    Xa-Xb=Dx, Ya-Yb=Dy.
location(B, Xb, Yb) :-
    location(A, Xa, Ya), nums(Xb), nums(Yb),
    is(A, Kind, B), offset(Kind, Dx, Dy),
    Xa-Xb=Dx, Ya-Yb=Dy.   
#show answer/1."""
        self.state['convert'] = {
            'story': story,
            'question': question,
            'asp': asp,
        }
        return dspy.Prediction(asp=asp)

class RunClingo(dspy.Module):
    class Signature(dspy.Signature):
        asp = dspy.InputField(desc="Initial ASP code to be run and evaluated")
        result = dspy.OutputField(desc="Result from Clingo")
        error = dspy.OutputField(desc="Error output from Clingo")

    def __init__(self, state):
        super().__init__()
        self.state = state
        self.predictor = dspy.Predict(self.Signature)

    def forward(self, asp):
        result, error = self.run_clingo(asp)
        return dspy.Prediction(result=result, error=error)
    
    def run_clingo(self, asp):
        results = []
        error_output = StringIO()
    
        try:
            # Redirect stderr to capture errors
            original_stderr = sys.stderr
            sys.stderr = error_output

            # Initialize Clingo control
            ctl = clingo.Control()
            ctl.add("base", [], asp)
            ctl.ground([("base", [])])

            # Define on_model callback to collect query results
            def on_model(model):
                for atom in model.symbols(shown=True):
                    if atom.name == "answer" and atom.arguments:
                        results.append(str(atom.arguments[0]))

            # Solve and collect results
            solve_result = ctl.solve(on_model=on_model)

            # Process the results
            if solve_result.satisfiable:
                asp_output = ", ".join(results) if results else "satisfiable, but no query results"
            elif solve_result.unsatisfiable:
                asp_output = "unsatisfiable"
            else:
                asp_output = "unknown"

        except Exception as e:
            asp_output = f"Error: {str(e)}"
        finally:
            sys.stderr = original_stderr

        error_string = error_output.getvalue()
        error_output.close()

        return asp_output, error_string
 
class Pipeline(dspy.Module):
    def __init__(self, state):
        super().__init__()
        self.state = state
        self.convert = Convert(state)
        self.clingo = RunClingo(state)  # Initialize RunClingo with state

    def forward(self, story, question, prompt_1):
        # Convert the natural language description into ASP facts and query
        convert_result = self.convert.forward(prompt_1=prompt_1, story=story, question=question)
        asp = convert_result.asp
        clingo_result = self.clingo.forward(asp=asp)
        return dspy.Prediction(asp=asp, result=clingo_result.result, error=clingo_result.error)
    
def process_examples(examples: List[dspy.Example], pipeline: Pipeline) -> List[Dict[str, Any]]:
    results = []
    for example in examples:
        try:
            story = example.get('story')
            question = example.get('question')
            prompt_1 = example.get('prompt_1')
            prediction = pipeline(prompt_1=prompt_1, story=story, question=question)#prompt_2=prompt_2
            result = {
            "story": story,
            "question": question,
            "predicted_ASP": prediction.asp,
            "asp_result": prediction.result,
            "actual_answer": example.get('answer')}
            results.append(result)
     
        except Exception as e:
            print(f"Error processing example: {example.question}")
            print(f"Error: {str(e)}")
    return results  

def main(input_file: str, output_file: str):
    with open(input_file, 'r') as f:
         clean_data = json.load(f)

    examples =[dspy.Example(
        prompt_1= prompt_Facts,
        # prompt_2= prompt_Refine,
        story="\n".join(clean_data[key]["story"]),
        question=clean_data[key]["question"],
        answer=clean_data[key]["label"],
    ).with_inputs("prompt_1","story", "question","answer")
    for key in clean_data
]

    examples= examples[:100]
    state = {}
    pipeline = Pipeline(state)
    results = process_examples(examples, pipeline)

    with open(output_file, "w") as jsonfile:
        json.dump(results, jsonfile, indent=4)

if __name__ == "__main__":
    main('clean/qa4_test.json', "deepseek_qa4.json")

In [None]:
import json
import openai

# Function to evaluate answers using an LLM
def evaluate_answers_with_llm(actual_answer, asp_result):
    prompt = f"Are the following two answers equivalent in meaning? \n\nActual Answer: {actual_answer}\nASP Result: {asp_result}\n\nPlease respond with 'Yes' if they are equivalent, and 'No' if they are not."

    # Call the LLM 
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",  # or "gpt-4" if available
        messages=[
            {"role": "user", "content": prompt}
        ]
    )

    # Extract the response text
    evaluation = response['choices'][0]['message']['content'].strip()
    return evaluation.lower() == 'yes'  # Return True if equivalent

def calculate_accuracy_with_llm(json_file):
    """Calculate the accuracy rate from the JSON file using an LLM."""
    with open(json_file, 'r') as f:
        data = json.load(f)

    correct_count = 0
    total_count = len(data)

    for record in data:
        actual_answer = record.get("actual_answer")
        asp_result = record.get("asp_result")

        # Evaluate using LLM
        if evaluate_answers_with_llm(actual_answer, asp_result):
            correct_count += 1

    # Calculate accuracy
    accuracy_rate = (correct_count / total_count) * 100 if total_count > 0 else 0
    return accuracy_rate

# Example usage
json_file = 'your_data.json'  # Replace with your actual JSON file path
accuracy = calculate_accuracy_with_llm(json_file)
print(f"Accuracy Rate: {accuracy:.2f}%")

In [39]:
import json
import sys
from io import StringIO
import clingo
import re

def extract_asp_program(predicted_asp):
    # Find the ASP program between the newline characters after "Here is the complete ASP program:"
    match = re.search(r'Here is the complete ASP program:\s*?\n?\n(.*)', predicted_asp, re.DOTALL)
    if match:
        asp_program = match.group(1).strip()
        
        # Remove newline characters
        asp_program = asp_program.replace('\n', ' ')
     
        # Remove extra whitespace
        asp_program = ' '.join(asp_program.split())
        
        return asp_program
    return ""

def run_asp_and_get_answer(asp_program, choices):
    results = []
    error_output = StringIO()
    
    try:
        # Redirect stderr to capture errors
        original_stderr = sys.stderr
        sys.stderr = error_output

        # Initialize Clingo control
        ctl = clingo.Control()
        ctl.add("base", [], asp_program)
        ctl.ground([("base", [])])

        # Define on_model callback to collect query results
        def on_model(model):
            for atom in model.symbols(shown=True):
                if atom.name == "query" and atom.arguments:
                    results.append(str(atom.arguments[0]))

        # Solve and collect results
        solve_result = ctl.solve(on_model=on_model)

        # Process the results
        if solve_result.satisfiable:
            asp_output = ", ".join(results) if results else "satisfiable, but no query results"
        elif solve_result.unsatisfiable:
            asp_output = "unsatisfiable"
        else:
            asp_output = "unknown"

    except Exception as e:
        asp_output = f"Error: {str(e)}"
    finally:
        sys.stderr = original_stderr

    error_string = error_output.getvalue()
    error_output.close()

    # Print debug information
    print(f"ASP Program:\n{asp_program}\n")
    print(f"ASP Output:\n{asp_output}\n")
    print(f"Error (if any):\n{error_string}\n")

    return asp_output

def process_json_file(file_path):
    with open(file_path, 'r') as file:
        data = json.load(file)

    results = []
    for item in data:
        predicted_asp = item.get('predicted ASP', '')
        asp_program = extract_asp_program(predicted_asp)
        choices = item.get('candidate_answers', '').split(', ')
        
        # Run ASP program and get result
        asp_result = run_asp_and_get_answer(asp_program, choices)
        
        result = {
            'context': item.get('context', ''),
            'question': item.get('question', ''),
            'candidate_answers': choices,
            'asp_result': asp_result,
            'actual_answer': item.get('actual_answer', ''),
            'clean_asp': asp_program  # Add this line to include the cleaned ASP program in the results
        }
        results.append(result)

    return results

# Main execution
if __name__ == "__main__":
    json_file_path = 'llama3_ASP_FR.json'  # Replace with your JSON file path
    results = process_json_file(json_file_path)

    # Print results
    for i, result in enumerate(results, 1):
        print(f"\nResult {i}:")
        print(f"Context: {result['context'][:100]}...")  # Truncate for brevity
        print(f"Question: {result['question']}")
        print(f"Candidate Answers: {', '.join(result['candidate_answers'])}")
        print(f"Clean ASP Program: {result['clean_asp']}")
        print(f"ASP Result: {result['asp_result']}")
        print(f"Actual Answer: {result['actual_answer']}")
        print("-" * 50)

ASP Program:


ASP Output:
satisfiable, but no query results

Error (if any):


ASP Program:
% Facts block(a). block(b). block(c). object(small_blue_circle, small, blue, circle, a). object(large_yellow_square, large, yellow, square, b). object(small_yellow_square, small, yellow, square, b). object(black_triangle, unknown, black, triangle, b). object(black_square, unknown, black, square, c). object(yellow_triangle, unknown, yellow, triangle, c). is(b, right, a). is(large_yellow_square, left, small_yellow_square). is(large_yellow_square, near, small_yellow_square). is(black_triangle, above, large_yellow_square). is(black_triangle, above, small_yellow_square). is(black_square, right, yellow_triangle). is(black_square, far, yellow_triangle). is(c, right, b). % Query query(Location) :- object(small_blue_circle, _, _, _, a), object(yellow_triangle, _, _, _, TriangleBlock), object(BlackShape, _, black, _, _), is(yellow_triangle, right, BlackShape), is(Location, _, small_blue_circle, TriangleB

In [None]:
import pandas as pd
import dspy
from dspy.teleprompt import BootstrapFewShot
from dspy.evaluate import Evaluate

class Convert(dspy.Module):
    class Signature(dspy.Signature):
       
        context = dspy.InputField(desc="Spatial description of scene and question, parsed it to facts")
        question = dspy.InputField(desc="The question to be parsed into query based on the context")
        #rules = dspy.InputField(desc="The rules are used to help you answer the question. Choose some rules based on the query and generate the complete ASP program.")
        choices = dspy.InputField(desc="The choices for the question")
        answer = dspy.OutputField(desc="One word,choose the best answer from the choices. Do not include ASP program. ")

    def __init__(self):
        super().__init__()
        self.predictor = dspy.ChainOfThought(self.Signature)

    def forward(self, context, question, choices):
        result = self.predictor(context=context, question=question, choices=choices) #rules=rules,
        answer = result.answer 
        return dspy.Prediction(answer=answer)

# Step 3: Prepare the dataset
df = pd.read_csv('quantifer_answer.csv')
clean_data = df.to_dict(orient='records')

valset = [
    dspy.Example(
       
        context=r["story"],
        question="".join(r["question"]),
        choices="".join(r["choices"]),
        answer="".join(r["answer"])
    ).with_inputs("context", "question","choices") #"rules", "choices"
    for r in clean_data 
]

train = valset[0:5]
val = valset[5:]

# Step 4: Define the metric for evaluation
metric_EM = dspy.evaluate.answer_exact_match

# Step 5: Use a teleprompter for optimization
teleprompter = BootstrapFewShot(metric=metric_EM, max_bootstrapped_demos=2)
cot_compiled = teleprompter.compile(Convert(), trainset=train)

# # Step 6: Prepare the evaluation set
# devset = val

# Step 7: Create an evaluator
evaluator = Evaluate(
    devset=valset,
    metric=metric_EM,
    num_threads=32,
    display_progress=True,
    display_table=10,
    output_field="answer"  # Specify that the evaluation should be based on the 'answer' output
)
evaluation= evaluator(Convert())
print("raw:",evaluation)
## the accurary with five shot examples for deepseek [5:]is 83.3       for ollama 3: raw 83.05, optimized: 88.14
## the accurary with no prompting and no examplars for is 0.78

#colbertv2_wiki17_abstracts = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')


# Step 9: Evaluate the optimized pipeline
evaluation_results = evaluator(cot_compiled)
print("optimized:", evaluation_results)

In [None]:
turbo.inspect_history(n=10)

In [None]:
import dspy
import pandas as pd
from dspy.teleprompt import BootstrapFewShot
import pandas as pd
import dspy

class Convert(dspy.Module):
    class Signature(dspy.Signature):
        
        prompt = dspy.InputField(desc="The prompt for the task")
        context = dspy.InputField(desc="Spatial description of scene and question, parsed it to facts")
        question = dspy.InputField(desc="the question to be parsed into query based on the context")
        rules= dspy.InputField(desc="Choose some rules based on the query and facts, make sure the rules are adequete and relevant to solve the query. After the reasoning process, include the rule and generate the complete ASP program.")
        choices = dspy.InputField(desc="The choices for the question")
        asp = dspy.OutputField(desc="Limit your output to 3000 tokens. Only output the facts and one query. No comment and reasoning included")
        
    def __init__(self,state):
        super().__init__()
        self.state = state
        self.predictor = dspy.ChainOfThought(self.Signature)
    def forward(self, prompt, rules, context, question, choices):
        result = self.predictor(prompt=prompt, rules=rules, context=context, question=question, choices=choices)
        asp = result.asp
        self.state.memory['convert'] = {
            'context': context,
            'question': question,
            'choices': choices,
            'prompt': prompt, }
        
        # Save the ASP to a file
        with open("test_V1.asp", "w") as file:
            file.write(asp)
        # Return both outputs
        return dspy.Prediction(asp=asp)

class ReviseASP(dspy.Module):
    class Signature(dspy.Signature):
        """Check and Revise the initial ASP (facts, query rules) Please check 1. whether the facts are complete and correct. 2. whether the query is correct and complete. 
        3. whether the rules are correct and complete. If not, please revise them. If yes, please output the complete ASP code with facts, query, and rules.
        Please  limit your output to 3000 tokens. NO Comments and explanation. The ASP should be ready for any solver without any syntatic and logical errors. No punctuation error.Donot forget include #show query/1. in each ASP, we will use that to print the answer to the query. """
        asp= dspy.InputField(desc="Initial ASP facts, query and rules ")
        rules= dspy.InputField(desc="Choose some rules based on the query and facts, make sure the rules are adequete and relevant to solve the query. After the reasoning process, include the rule and generate the complete ASP program.")
        revised_asp_code = dspy.OutputField(desc="Revised ASP code, including facts, query, and rules. Limit your output to 3000 tokens. NO Comments and explanation")

    def __init__(self, state):
        super().__init__()
        self.state = state
        self.predictor = dspy.ChainOfThought(self.Signature)

    def forward(self,  asp, prompt, rules, context, question,choices ):
        # Retrieve facts, context_question, and prompt_1 from shared state
        convert_data = self.state.memory.get('convert', {})
        context = convert_data.get('context', '')
        question= convert_data.get('question', '')
        choices= convert_data.get('choices', '')
        prompt = convert_data.get('prompt', '')
        result = self.predictor(asp=asp, rules=rules, prompt=prompt,context=context, question=question, choices=choices)
        revised_asp_code = result.revised_asp_code
        return dspy.Prediction(revised_asp_code=revised_asp_code)
class CombinedModule(dspy.Module):
    def __init__(self, state):
        super().__init__()
        self.state = state
        self.convert = Convert(state)
        self.revise_asp = ReviseASP(state)
    def forward(self, context,question, choices,prompt, rules):
        # First, convert the natural language description into ASP facts and query
        convert_result = self.convert(prompt=prompt, rules=rules, context=context, question=question, choices=choices)
        asp = convert_result.asp
        # Then, revise the initial ASP facts and query
        revise_result = self.revise_asp(asp=asp, rules=rules, prompt=prompt,context=context, question=question, choices=choices)
        revised_asp_code = revise_result.revised_asp_code
        with open("test_V2.asp", "w") as file:
            file.write(revised_asp_code)
        return dspy.Prediction(revised_asp_code=revised_asp_code)


# Step 3: Prepare the dataset
# Step 3: Prepare the dataset
df = pd.read_csv('quantifer_answer.csv')
clean_data = df.to_dict(orient='records')

valset = [
    dspy.Example(
       
        context=r["story"],
        question="".join(r["question"]),
        choices="".join(r["choices"]),
        answer="".join(r["answer"])
    ).with_inputs("context", "question","choices") #"rules", "choices"
    for r in clean_data 
]

example = valset[20]
test = CombinedModule() 

result = test(prompt=example.prompt,rules= example.rules, context=example.context, question=example.question,choices= example.choices)
