In [1]:
#!pip install --quiet dspy-ai

In [2]:
import os
from pathlib import Path

print(os.getcwd())
# The notebook is launched from the project's `notebooks/` folder, while data paths
# such as ./data/... are relative to the project root. Step up one level instead of
# hard-coding a machine-specific absolute path. The check also makes the cell safe
# to re-run: once we are at the root, nothing happens.
if Path.cwd().name == "notebooks":
    os.chdir(Path.cwd().parent)
print(os.getcwd())

/Users/rudra/work/personal_projects/chat_reddit/csv_insights/notebooks
/Users/rudra/work/personal_projects/chat_reddit/csv_insights


In [3]:
import os

# Never commit a real key here. Export OPENAI_API_KEY in the shell (or a .env loader)
# before starting Jupyter; setdefault keeps an already-exported key instead of
# overwriting it with an empty string.
os.environ.setdefault("OPENAI_API_KEY", "")

### Setup

In [4]:
import dspy

# Single place to pick the model. Previously MODEL was set to "gpt-3.5-turbo" but the
# client below hard-coded "gpt-4-turbo-preview", so the constant had no effect.
# The value here keeps the model that was actually in use.
# MODEL = "gpt-3.5-turbo"  # cheaper alternative
MODEL = "gpt-4-turbo-preview"
lm = dspy.OpenAI(model=MODEL, max_tokens=500)
dspy.settings.configure(lm=lm)

### Dataset

In [5]:
# Unlabelled training questions; each example exposes `question` as its only input.
dataset = [
    dspy.Example(question=text).with_inputs('question')
    for text in (
        "Show me top 5 categories in the trends.",
        "Show me the top 10 posts with highest upvotes in a tabular format",
        "What has been the biggest complaint from the users?",
        "What percentage of the posts are critical?",
        "What has been the biggest talking point in the forum?",
    )
]

### Signature

In [6]:
# Task signature for the agent: one input field (`question`) and one output
# field (`answer`).
# NOTE(review): DSPy signatures conventionally use the class docstring as the task
# instruction, so treat the docstring below as prompt text — confirm before
# rewording it.
class BasicDataAnalysis(dspy.Signature):
    """Answer the questions with answers with the help of given tools."""
    question = dspy.InputField()
    answer = dspy.OutputField(desc="answer the given question based on the observations generated by the tools.")

### Reddit Data

In [7]:
from csvinsights.core.reddit_data import get_reddit_data

# Relative path: resolved against the working directory chosen by the os.chdir
# cell near the top of the notebook.
CSV_FILE_PATH = './data/reddit_data.csv'
# get_reddit_data is a project helper; the saved output shows it echoing the path
# and printing (1982, 8) — presumably the frame's shape.
df = get_reddit_data(CSV_FILE_PATH)

Reading csv file from ./data/reddit_data.csv
(1982, 8)


### Tools

In [8]:
import pandas as pd
import ast
import re
from contextlib import redirect_stdout
from io import StringIO

"""
############   Tools ##################
"""


def sanitize_input(query: str) -> str:
    """Strip Markdown code-fence noise from LLM-generated Python source.

    Removes leading whitespace/backticks, an optional leading ``python`` language
    tag (any letter case), and trailing whitespace/backticks.

    The language tag is only removed when it stands alone, i.e. is followed by
    whitespace or the end of the string. Code that merely *starts* with the
    letters "python" (for example ``python_var = 1``) is left untouched.

    Args:
        query: The raw code string produced by the model.

    Returns:
        str: The code with fence markers and the language tag removed.
    """

    # Leading whitespace/backticks, then the stand-alone language tag (if any),
    # then whatever whitespace separates the tag from the code.
    query = re.sub(r"^[\s`]*(?:(?i:python)(?=\s|$))?\s*", "", query)
    # Trailing whitespace/backticks (closing fence).
    query = re.sub(r"[\s`]+$", "", query)
    return query


class PythonInterpreterTool:
    """Agent tool that runs Python source against a dataframe named ``df``.

    The tool keeps one persistent namespace, so variables and imports created by
    one call are visible to later calls.

    SECURITY NOTE: the code is executed with ``exec``/``eval`` and no sandboxing.
    Only use this with a model and data you trust.
    """

    name = "python interpreter"
    input_variable = "python_code"
    desc = """takes a python code as string as input and returns the output of the code. 
    It has a df which contains all the data to answer the question with columns {column_names}.
    """

    def __init__(self, df, k=3):
        """Create the tool.

        Args:
            df: Dataframe exposed to the executed code as ``df``; its column names
                are interpolated into ``desc``.
            k: Unused; kept so existing callers that pass it keep working.
        """
        # One dict serves as both globals and locals. With two separate dicts,
        # names such as `df` live only in the locals mapping, and lambdas or
        # nested functions (which resolve free names through globals) fail with
        # NameError.
        self.locals = {"df": df}
        self.globals = self.locals
        # str() guards against non-string column labels (e.g. integers).
        column_names = ", ".join(str(column) for column in df.columns.tolist())
        self.desc = self.desc.format(column_names=column_names)
        self.sanitize_input = True

    def __deepcopy__(self, memo):
        """Copy the tool without deep-copying the execution namespace.

        After code has run, the namespace holds ``__builtins__`` and any imported
        modules; ``copy.deepcopy`` cannot copy modules and raises ``TypeError``.
        The clone gets its own (shallow-copied) namespace and shares ``df`` and
        other objects by reference.
        """
        clone = self.__class__.__new__(self.__class__)
        memo[id(self)] = clone
        clone.__dict__.update(self.__dict__)
        clone.locals = dict(self.locals)
        clone.globals = clone.locals
        return clone

    def __call__(self, python_code: str, *args, **kwargs) -> str:
        """Execute ``python_code`` and return what it produced as text.

        All statements except the last are executed; if the last statement is an
        expression it is evaluated and its value is appended to whatever was
        printed. Each statement runs exactly once.

        Args:
            python_code: Source to run (cleaned by ``sanitize_input`` when
                ``self.sanitize_input`` is true).

        Returns:
            str: Captured stdout, followed by ``str(value)`` of the final
            expression when that value is not ``None``. On any exception,
            ``"<ExceptionType>: <message>"``.
        """
        try:
            if self.sanitize_input:
                python_code = sanitize_input(python_code)
            tree = ast.parse(python_code)
            if not tree.body:
                return ""

            *leading, last = tree.body
            io_buffer = StringIO()
            ret = None
            with redirect_stdout(io_buffer):
                if leading:
                    setup = ast.Module(leading, type_ignores=[])
                    exec(compile(setup, "<python_code>", "exec"), self.globals, self.locals)
                # Decide eval vs. exec from the syntax tree instead of trying
                # eval and re-running with exec on failure: the retry executed a
                # failing expression (and its side effects) twice.
                if isinstance(last, ast.Expr):
                    expression = ast.Expression(last.value)
                    ret = eval(compile(expression, "<python_code>", "eval"), self.globals, self.locals)
                else:
                    final = ast.Module([last], type_ignores=[])
                    exec(compile(final, "<python_code>", "exec"), self.globals, self.locals)

            printed = io_buffer.getvalue()
            if ret is None:
                return printed
            return printed + str(ret)
        except Exception as e:
            return "{}: {}".format(type(e).__name__, str(e))


In [9]:
#from csvinsights.core.dspy.tools import PythonInterpreterTool

# The agent gets a single tool: a Python interpreter bound to the Reddit dataframe.
tools = [PythonInterpreterTool(df=df)]

### ReAct module

In [10]:
import dsp
import dspy
from dspy.primitives.program import Module
from dspy.predict import Predict

# TODO: Simplify a lot.
# TODO: Divide Action and Action Input like langchain does for ReAct.


class ReAct(Module):
    """Notebook-local ReAct agent that interleaves Thought / Action / Observation.

    One ``Predict`` is built per hop (1..max_iters). The template for hop ``i``
    contains the input fields plus ``Thought_j``/``Action_j`` for ``j <= i`` and
    ``Observation_j`` for ``j < i``.

    Args:
        signature: Task signature; it is passed through ``dspy.Predict`` to get
            the signature object. Its last field is treated as the single output.
        max_iters: Maximum number of hops.
        num_results: ``k`` for the default ``dspy.Retrieve`` tool, used only when
            ``tools`` is empty or ``None``.
        tools: Tool objects exposing ``name``, ``input_variable`` and ``desc``,
            and callable with the action input string.
    """

    def __init__(self, signature, max_iters=5, num_results=3, tools=None):
        super().__init__()
        self.signature = signature = dspy.Predict(signature).signature
        self.max_iters = max_iters

        # Tools are looked up by the name the model writes in its Action line.
        self.tools = tools or [dspy.Retrieve(k=num_results)]
        self.tools = {tool.name: tool for tool in self.tools}

        self.input_fields = {
            k: v
            for k, v in self.signature.kwargs.items()
            if isinstance(v, dspy.InputField)
        }
        self.output_fields = {
            k: v
            for k, v in self.signature.kwargs.items()
            if isinstance(v, dspy.OutputField)
        }

        inputs, outputs = signature.fields[:-1], signature.fields[-1:]

        inputs_ = ", ".join([f"`{field.input_variable}`" for field in inputs])
        outputs_ = ", ".join([f"`{field.output_variable}`" for field in outputs])

        assert len(outputs) == 1, "ReAct only supports one output field."

        instr = []
        instr.append(
            f"You will be given {inputs_} and you will respond with {outputs_}.\n"
        )
        instr.append(
            "To do this, you will interleave Thought, Action, and Observation steps.\n"
        )
        instr.append(
            "Thought can reason about the current situation, and Action can be the following types:\n"
        )

        # "Finish" is a pseudo-tool: it is listed in the instructions but never
        # called; act() returns its argument as the final answer.
        self.tools["Finish"] = dspy.Example(
            name="Finish",
            input_variable=outputs_.strip("`"),
            desc=f"returns the final {outputs_} and finishes the task",
        )

        for idx, tool in enumerate(self.tools.values(), start=1):
            instr.append(
                f"({idx}) {tool.name}[{tool.input_variable}], which {tool.desc}"
            )

        instr = "\n".join(instr)
        self.react = [
            Predict(dsp.Template(instr, **self._generate_signature(i)))
            for i in range(1, max_iters + 1)
        ]

    def _generate_signature(self, iters):
        """Build the template fields for the predictor used at hop ``iters``.

        Returns:
            dict: input fields followed by Thought/Action fields for every step
            up to ``iters`` and Observation fields for every step before it.
        """
        signature_dict = dict(self.input_fields)

        # Same for every step, so build it once.
        tool_list = " or ".join(
            [
                f"{tool.name}[{tool.input_variable}]"
                for tool in self.tools.values()
                if tool.name != "Finish"
            ]
        )

        for j in range(1, iters + 1):
            signature_dict[f"Thought_{j}"] = dspy.OutputField(
                prefix=f"Thought {j}:",
                desc="next steps to take based on last observation",
            )

            signature_dict[f"Action_{j}"] = dspy.OutputField(
                prefix=f"Action {j}:",
                desc=f"always either {tool_list} or, when done, Finish[answer]",
            )

            # The last step has no observation yet: it is what gets generated.
            if j < iters:
                signature_dict[f"Observation_{j}"] = dspy.OutputField(
                    prefix=f"Observation {j}:",
                    desc="observations based on action",
                    format=dsp.passages2text,
                )

        return signature_dict

    def act(self, output, hop):
        """Parse and run the action produced at ``hop``.

        Only the first line of the action text is considered, in the form
        ``name[input]``. NOTE(review): multi-line tool input is therefore cut to
        its first line — confirm this is acceptable for the Python tool.

        Returns:
            The final answer string when the action is ``Finish[...]`` (possibly
            empty), otherwise ``None`` after storing ``Observation_{hop+1}`` on
            ``output``.
        """
        try:
            action = output[f"Action_{hop+1}"]
            action_name, action_val = action.strip().split("\n")[0].split("[", 1)
            # Tolerate "name [input]" — stray spaces would otherwise miss the lookup.
            action_name = action_name.strip()
            action_val = action_val.rsplit("]", 1)[0]

            if action_name == "Finish":
                return action_val

            output[f"Observation_{hop+1}"] = self.tools[action_name](action_val)

        except Exception:
            # Covers malformed actions, unknown tool names and tool failures;
            # the text is fed back to the model as the observation.
            output[f"Observation_{hop+1}"] = (
                "Failed to parse action. Bad formatting or incorrect action name."
            )
        return None

    def forward(self, **kwargs):
        """Run the Thought/Action/Observation loop and return the prediction.

        Returns:
            dspy.Prediction: the single output field set to the ``Finish`` value,
            or ``""`` if the model never finished within ``max_iters`` hops.
        """
        args = {key: kwargs[key] for key in self.input_fields.keys() if key in kwargs}

        # Pre-bind so that max_iters <= 0 does not raise on the return below.
        action_val = None
        for hop in range(self.max_iters):
            output = self.react[hop](**args)

            action_val = self.act(output, hop)
            # `is not None` rather than truthiness: Finish[] with an empty answer
            # must still end the loop (no observation exists for that hop).
            if action_val is not None:
                break
            args.update(output)

        # assumes only 1 output field for now - TODO: handling for multiple output fields
        return dspy.Prediction(**{list(self.output_fields.keys())[0]: action_val or ""})


In [11]:
#from csvinsights.core.dspy.react import ReAct

# Uses the ReAct class defined in this notebook rather than the stock dspy.ReAct:
# react_module = dspy.ReAct(BasicDataAnalysis, tools=tools)
react_module = ReAct(BasicDataAnalysis, tools=tools)

### Execute the pipeline in a zero-shot setting

In [12]:
# Zero-shot run: the uncompiled agent answers one ad-hoc question.
question = "show me the post with highest upvote"

# Calling the module triggers LM requests and tool execution.
pred = react_module(question=question)

print(f"Question: {question}")
print(f"Predicted answer: {pred.answer}")

Question: show me the post with highest upvote
Predicted answer: The post with the highest upvote is titled "Pure Utopia" with a score of 421. It is found in the subreddit r/GeForceNOW under the category 'hot'. The post ID is 19eeu4p, and it was authored by Xandermacer. The post can be viewed at https://i.redd.it/gj01kdhxgdec1.jpeg.


In [13]:
# Print the most recent LM interactions (n=5) to inspect the prompts the agent sent.
lm.inspect_history(n=5)





You will be given `question` and you will respond with `answer`.

To do this, you will interleave Thought, Action, and Observation steps.

Thought can reason about the current situation, and Action can be the following types:

(1) python interpreter[python_code], which takes a python code as string as input and returns the output of the code. 
    It has a df which contains all the data to answer the question with columns page_content, post_subreddit, post_category, post_title, post_score, post_id, post_url, post_author.
    
(2) Finish[answer], which returns the final `answer` and finishes the task

---

Follow the following format.

Question: ${question}
Thought 1: next steps to take based on last observation
Action 1: always either python interpreter[python_code] or, when done, Finish[answer]

---

Question: show me the post with highest upvote
Thought 1:[32m To find the post with the highest upvote, I need to identify the post with the maximum value in the `post_score` column 

### Define evaluation metric

In [14]:
# Signature for the LLM judge used by validate_answer: given a text and a question
# about it, produce a "Yes or No" verdict.
# NOTE(review): there is no docstring; DSPy typically uses a signature's docstring
# as instructions, so adding one may change the prompt — confirm before adding.
class Assess(dspy.Signature):
    assessed_text = dspy.InputField()  # the agent's answer being judged
    assessment_question = dspy.InputField()  # the yes/no question posed to the judge
    assessment_answer = dspy.OutputField(desc= "Yes or No")

In [15]:
# Separate LM client used only as the judge inside validate_answer (via dspy.context).
gpt4T = dspy.OpenAI(model="gpt-4-1106-preview", max_tokens=1000, model_type="chat")

def validate_answer(question, pred, trace=None):
    """LLM-as-judge metric: does the predicted answer address the question?

    Args:
        question: The training example handed to the metric (an object with a
            ``question`` attribute). A plain string is also accepted.
        pred: Prediction exposing ``answer``.
        trace: When not ``None`` the result is returned as a bool instead of an
            int score.

    Returns:
        ``1``/``0`` when ``trace`` is ``None``; otherwise ``True``/``False``.
    """
    # The metric receives the whole example, not the question text. Formatting
    # the example directly would put its repr into the judge prompt, so pull out
    # the text and fall back to the value itself for plain strings.
    question_text = getattr(question, "question", question)
    answer = pred.answer

    assessment_question = f"The text should answer `{question_text}`. Does the assessed text answer the question?"

    with dspy.context(lm=gpt4T):
        assessment = dspy.Predict(Assess)(assessed_text=answer, assessment_question=assessment_question)

    # Judges often reply "Yes." or " Yes"; an exact == 'yes' comparison would
    # score those as failures.
    verdict = (assessment.assessment_answer or "").strip().lower()
    is_correct = verdict.startswith("yes")

    if trace is not None:
        return is_correct
    return 1 if is_correct else 0

### Optimize the pipeline

In [16]:
from dspy.teleprompt import BootstrapFewShot

def get_compiled_model():
    """Compile the notebook's ReAct module with BootstrapFewShot.

    Uses the module-level ``react_module`` as the student, ``dataset`` as the
    training set and ``validate_answer`` as the metric.

    Returns:
        The compiled module returned by the teleprompter.
    """
    return BootstrapFewShot(metric=validate_answer).compile(
        react_module, trainset=dataset
    )

In [17]:
# Runs the BootstrapFewShot compilation defined in get_compiled_model().
# NOTE(review): in the saved run this cell failed with
# "TypeError: cannot pickle '_contextvars.Context' object" (see the output below),
# so `compiled_model` was not created in that session.
compiled_model = get_compiled_model()

TypeError: cannot pickle '_contextvars.Context' object

In [None]:
### Execute the pipeline
# Asks the compiled program a question similar to the zero-shot run.
# Requires `compiled_model` from the compilation cell; this cell has no
# execution count in the saved notebook.

question = "Show me the post with highest upvotes"

pred = compiled_model(question=question)

print(f"Question: {question}")
print(f"Predicted answer: {pred.answer}")

In [None]:
# Print the most recent LM interactions (n=5) after the compiled run, for comparison.
lm.inspect_history(n=5)