In [1]:
from app.core.db import Scoped_Session
import dspy
from dotenv import load_dotenv
import os
import openai
import warnings
from sqlalchemy.exc import SAWarning

# Load environment variables from a .env file; OPENAI_API_KEY is read from the
# environment a few lines below.
load_dotenv()

# Suppress SQLAlchemy SAWarning warnings.
warnings.filterwarnings("ignore", category=SAWarning)

# Database session handed to the graph store and the vector store in later cells.
session = Scoped_Session()

# Language model passed as `dspy_lm` to the knowledge-graph components; it is
# also configured as the LM in dspy.settings.
turbo = dspy.OpenAI(model='gpt-4o', api_key=os.getenv("OPENAI_API_KEY"), max_tokens=4096)
dspy.settings.configure(lm=turbo)

# NOTE(review): plan_model is not referenced in the cells shown here; the
# planning request passes its own hard-coded model name.
plan_model = "gpt-4o"
# Model name used by llm_generate() when producing answers.
generate_model = "gpt-4o-mini"

# The question this notebook plans for and answers.
user_query = "Could you summarize the performance improvement of TiDB from version 6.5 to the newest version?"

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from app.rag.knowledge_graph.graph_store import TiDBGraphStore
from app.rag.knowledge_graph import KnowledgeGraphIndex
from llama_index.embeddings.openai import OpenAIEmbedding, OpenAIEmbeddingModelType

# Embedding model used by the knowledge-graph store here and by the vector
# index in the next cell.
_embed_model = OpenAIEmbedding(
    model=OpenAIEmbeddingModelType.TEXT_EMBED_3_SMALL
)

# Knowledge-graph store on top of the shared database session.
graph_store = TiDBGraphStore(
    dspy_lm=turbo,
    session=session,
    embed_model=_embed_model,
)
# Annotated assignment: the previous chained form
# `graph_index = KnowledgeGraphIndex = ...` rebound the imported class name
# KnowledgeGraphIndex to the index instance.
graph_index: KnowledgeGraphIndex = KnowledgeGraphIndex.from_existing(
    dspy_lm=turbo,
    kg_store=graph_store,
)

def retrieve_knowledge_graph(query):
    """Query the knowledge-graph index for ``query``.

    Forwards to ``graph_index.retrieve_with_weight`` with an empty list as the
    second positional argument and ``depth=1``, and returns that call's result
    unchanged.
    """
    retrieval = graph_index.retrieve_with_weight(query, [], depth=1)
    return retrieval

In [3]:
from app.rag.vector_store.tidb_vector_store import TiDBVectorStore
from llama_index.core import VectorStoreIndex

# Vector store built on the same database session as the graph store.
vector_store = TiDBVectorStore(session=session)
# Index over that store, using the embedding model created in the previous cell.
vector_index = VectorStoreIndex.from_vector_store(
    vector_store,
    embed_model=_embed_model
)

def retrieve_knowledge_embedded_chunks(query, top_k=5):
    """Return the text of the chunks retrieved from ``vector_index`` for ``query``.

    Args:
        query: Text to search the vector index with.
        top_k: Value passed to the retriever as ``similarity_top_k``. It used
            to be ignored in favour of a hard-coded 5; the default keeps that
            behaviour for callers that do not pass it.

    Returns:
        A list with the ``text`` attribute of every retrieved node.
    """
    retriever = vector_index.as_retriever(similarity_top_k=top_k)
    return [node.text for node in retriever.retrieve(query)]

In [4]:
def llm_generate(prompt):
    """Send ``prompt`` as a single user message and return the reply text.

    A new ``openai.OpenAI()`` client is created on every call and the request
    uses the module-level ``generate_model``. The return value is the
    ``content`` of the first choice's message.
    """
    client = openai.OpenAI()
    user_message = {"role": "user", "content": prompt}
    response = client.chat.completions.create(
        model=generate_model,
        messages=[user_message],
    )
    first_choice = response.choices[0]
    return first_choice.message.content

In [5]:
def QA(query):
    """Answer ``query`` from retrieved graph knowledge and text chunks.

    Retrieves knowledge-graph data and embedded chunks for the query, places
    both in a single prompt together with the question, and returns the text
    produced by ``llm_generate`` for that prompt.
    """
    graph_data = retrieve_knowledge_graph(query)
    chunks_data = retrieve_knowledge_embedded_chunks(query)

    # Context section first, then the question itself.
    context_part = f"The context:\n Related Graph Knowledge: {graph_data}\n\n Related chunk data {chunks_data}\n\n"
    question_part = f"please answer question: {query}\nThe answer is:"

    return llm_generate(context_part + question_part)

In [6]:
from dotenv import load_dotenv
import openai
from pydantic import BaseModel
from typing import List, Dict, Optional

# load_dotenv() was already called in the first cell; this call repeats it.
load_dotenv()

# System prompt for the planning request: it asks the model to break the user
# query into Step objects plus DependencyEdge links between them.
# The example plan numbers its steps 1, 2, 3 so that every step referenced by
# the example dependency graph exists (the last step used to be id=4).
# NOTE(review): the example plan uses the `llm_generate` tool, but only the QA
# tool is described under "Available APIs/Tools" — confirm whether the other
# tools handled by PlanExecutor should be listed there as well.
system_instruction = """
You are an intelligent assistant designed to analyze user queries by performing the following tasks:

1. **Analyze the Question**:
    - Break down the main question into a dependency graph that outlines the key components and their relationships.

2. **Break Down into Subquestions**:
    - Decompose the main question into smaller, specific, and manageable subquestions that are conducive to information retrieval.
    - Ensure that each subquestion is concrete and directly related to fetching necessary information.
    - Identify dependencies between subquestions where the answer to one subquestion is required to formulate or answer another.

3. **Generate an Action Plan**:
    - For each subquestion, create a corresponding action step to answer it.
    - Specify the appropriate tool to be used, any necessary arguments, and output tags for each step.
    - Ensure that dependent steps correctly reference the outputs of their prerequisite steps using `{output_tag}` placeholders.

**Available APIs/Tools**:

1. QA Tool:
  - Function: QA(query)
  - Description: Provides direct and accurate answers to user queries by leveraging retrieved knowledge and utilizing advanced natural language processing techniques. The QA Tool interprets the intent behind the query and synthesizes information from various sources to formulate coherent and contextually relevant responses.
  - Use Cases:
	- Direct Information Retrieval: Answering specific factual questions based on the retrieved data.
	- Contextual Assistance: Providing answers that consider the context of prior interactions or retrieved information to ensure relevance and coherence.

**Instructions**:

- **Subquestions Specificity**:
  - Each subquestion should be specific and aimed at retrieving precise information.
  - Avoid vague or overly broad subquestions that may hinder effective information retrieval.

- **Handling Dependencies**:
  - Identify and outline dependencies between subquestions.
  - Ensure that subquestions requiring information from previous steps reference them appropriately using `{output_tag}` placeholders in their `arguments`.

- **Utilizing Tools Effectively**:
  - Select the most appropriate tool for each subquestion based on its nature.
  - Ensure that `arguments` for each tool are correctly populated, incorporating any necessary data from dependent steps.

- **Output Structure**:
  - Use `output_tags` to uniquely identify the output of each step.
  - Reference these tags in subsequent steps to maintain the flow of information and dependencies.

**Example**:

*User Query*: "Could you summary the performance improvement of TiDB in the newest version."

*Generated Action Plan*:

```python
[
    Step(
        id=1,
        subquestion='What is the latest version of TiDB?',
        tool_used='QA',
        arguments={'query': 'Latest version of TiDB'},
        output_tags='latest_version'
    ),
    Step(
        id=2,
        subquestion='What are the performance improvements in the newest TiDB version?',
        tool_used='QA',
        arguments={'query': 'TiDB {latest_version} performance improvements'},
        output_tags='tidb_newest_performance'
    ),
    Step(
        id=3,
        subquestion='Summary the performance improvements of TiDB in the newest version.',
        tool_used='llm_generate',
        arguments={'prompt': 'Summary the performance improvements of TiDB in the newest version based on {tidb_newest_performance}.'},
        output_tags='performance_comparison'
    )
]
```

Dependency Graph:

```python
[
    DependencyEdge(from_step=1, to_step=2),
    DependencyEdge(from_step=2, to_step=3),
]
```
"""

class Step(BaseModel):
    # One action step of a generated plan (an element of QuestionAnalysis.steps).
    # Step identifier, numbered as in the example plan of the system prompt.
    id: int
    # The subquestion this step is meant to answer.
    subquestion: str
    # Name of the tool to run; PlanExecutor.execute_step_handler dispatches on it.
    tool_used: Optional[str] = None
    # Arguments for the tool; values may contain `{output_tag}` placeholders
    # that PlanExecutor.execute_plan fills in from earlier step results.
    arguments: Optional[Dict[str, str]] = None
    # Key under which PlanExecutor stores this step's result in state['variables'].
    output_tags: Optional[str] = None

class DependencyEdge(BaseModel):
    # One edge of the plan's dependency graph, written in the system prompt's
    # example as DependencyEdge(from_step=1, to_step=2).
    # presumably both fields hold Step.id values, with to_step depending on
    # from_step — confirm against the prompt's intent.
    from_step: int
    to_step: int

class QuestionAnalysis(BaseModel):
    # Structured result of the planning request; passed as `response_format`
    # and later handed to PlanExecutor.execute_plan.
    # Steps of the plan; PlanExecutor runs them in list order.
    steps: List[Step]
    # Dependency edges between steps (not read by PlanExecutor.execute_plan).
    dependency_graph: List[DependencyEdge]

# Conversation for the planning request: the planning instructions as the
# system message and the user's question as the user message.
messages = [
    {"role": "system", "content": system_instruction},
    {"role": "user", "content": user_query},
]

client = openai.OpenAI()

# Request a completion whose response format is the QuestionAnalysis model.
# NOTE(review): the model name is hard-coded here; `plan_model` from the first
# cell is not used — confirm which one is intended.
completion = client.beta.chat.completions.parse(
    model="gpt-4o-2024-08-06",
    messages=messages,
    response_format=QuestionAnalysis,
)

# Message of the first choice; later cells read `message.parsed`.
message = completion.choices[0].message


In [7]:
# Display the steps of the parsed plan.
message.parsed.steps

[Step(id=1, subquestion='What is the newest version of TiDB?', tool_used='QA', arguments={'query': 'Newest version of TiDB'}, output_tags='newest_version'),
 Step(id=2, subquestion='What were the performance improvements made from TiDB version 6.5 to the newest version?', tool_used='QA', arguments={'query': 'Performance improvements in TiDB from version 6.5 to {newest_version}'}, output_tags='performance_improvements'),
 Step(id=3, subquestion='Summarize the performance improvements from TiDB version 6.5 to the newest version based on the retrieved information.', tool_used='QA', arguments={'query': 'Summarize these performance improvements in TiDB between version 6.5 and {newest_version}, based on: {performance_improvements}'}, output_tags='performance_summary')]

In [8]:
# Display the dependency edges of the parsed plan.
message.parsed.dependency_graph

[DependencyEdge(from_step=1, to_step=2),
 DependencyEdge(from_step=2, to_step=3)]

In [12]:
import re
import copy


# Step Execution
class PlanExecutor:
    """Executes the steps of a ``QuestionAnalysis`` plan in order.

    The result of every step is stored in ``self.state['variables']`` under
    the step's ``output_tags`` value, so that later steps can pull it in
    through ``{output_tag}`` placeholders in their arguments.
    """

    # Finds ``{tag}`` placeholders inside a step argument value.
    _PLACEHOLDER = re.compile(r'\{(.*?)\}')

    def __init__(self):
        # 'variables' maps output tags to step results; execute_plan adds
        # 'goal_completed' once every step has succeeded.
        self.state = {
            'variables': {}
        }

    def log_error(self, message: str):
        """Print ``message`` with an ``ERROR:`` prefix."""
        print(f"ERROR: {message}")

    def log_info(self, message: str):
        """Print ``message`` with an ``INFO:`` prefix."""
        print(f"INFO: {message}")

    def log_warning(self, message: str):
        """Print ``message`` with a ``WARNING:`` prefix."""
        print(f"WARNING: {message}")

    def save_milestone(self, milestone: str, output):
        """Print the milestone name together with the output produced at it."""
        print(f"MILESTONE: {milestone}, output: {output}")

    def log_execution(self, message: str):
        """Print ``message`` with the execution-trace prefix."""
        print(f"EXECUION: {message}")

    def retrieve_knowledge_embedded_chunks(self, query: str, top_k: int = 5) -> list:
        """Return the text of the chunks retrieved from ``vector_index`` for ``query``.

        ``top_k`` is passed to the retriever as ``similarity_top_k``; it used
        to be ignored in favour of a hard-coded 5.
        """
        retriever = vector_index.as_retriever(
            similarity_top_k=top_k
        )
        return [node.text for node in retriever.retrieve(query)]

    def llm_generate(self, prompt: str) -> str:
        """Delegate to the module-level ``llm_generate`` helper."""
        return llm_generate(prompt)

    def retrieve_knowledge_graph(self, query: str):
        """Return ``graph_index.retrieve_with_weight(query, [], depth=1)`` unchanged."""
        return graph_index.retrieve_with_weight(
            query,
            [],
            depth=1,
        )

    def _run_tool(self, step: "Step", arg_name: str, tool, milestone: str) -> tuple:
        """Call ``tool`` with one argument of ``step`` and record its result.

        Returns ``(False, None)`` when the argument is missing or empty,
        otherwise ``(True, result)`` after storing ``result`` under the step's
        ``output_tags`` and reporting the milestone.
        """
        arg_value = (step.arguments or {}).get(arg_name)
        if not arg_value:
            self.log_error(f"No {arg_name} provided for '{step.tool_used}' instruction.")
            return False, None
        result = tool(arg_value)
        self.state['variables'][step.output_tags] = result
        self.save_milestone(f"AfterStep{step.id}_{milestone}", result)
        return True, result

    # Step Execution Handler
    def execute_step_handler(self, step: "Step") -> tuple:
        """Run the tool named by ``step.tool_used``.

        Returns:
            ``(True, result)`` when the tool ran, ``(False, None)`` when the
            tool name is unknown or a required argument is missing or invalid.
        """
        step_type = step.tool_used
        params = step.arguments or {}

        self.log_execution(f"execute step {step}")
        if step_type == "retrieve_knowledge_graph":
            return self._run_tool(step, 'query', self.retrieve_knowledge_graph, "KnowledgeGraphRetrieval")

        if step_type == "retrieve_knowledge_embedded_chunks":
            # Argument values arrive as strings, so top_k has to be converted;
            # a value that is not a number fails the step instead of raising.
            try:
                top_k = int(params.get('top_k', 5))
            except (TypeError, ValueError):
                self.log_error(
                    f"Invalid top_k {params.get('top_k')!r} for 'retrieve_knowledge_embedded_chunks' instruction."
                )
                return False, None
            return self._run_tool(
                step,
                'query',
                lambda query: self.retrieve_knowledge_embedded_chunks(query, top_k),
                "EmbeddedChunksRetrieval",
            )

        if step_type == "llm_generate":
            return self._run_tool(step, 'prompt', self.llm_generate, "LLMGeneration")

        if step_type == "QA":
            # QA is the module-level helper, not a method of this class.
            return self._run_tool(step, 'query', QA, "QA")

        self.log_warning(f"Unknown step type: {step_type}")
        return False, None

    # Plan Execution
    def execute_plan(self, plan: "QuestionAnalysis") -> tuple:
        """Execute every step of ``plan`` in list order.

        Placeholders of the form ``{tag}`` in a step's arguments are replaced
        with the stored result of the step whose ``output_tags`` is ``tag``.

        Returns:
            ``(True, result_of_last_step)`` on success (``result`` is ``None``
            for a plan without steps), or ``(False, None)`` as soon as a
            placeholder cannot be resolved or a step fails.
        """
        # Work on a copy so substitution never changes the caller's plan.
        plan = copy.deepcopy(plan)
        self.log_info("Starting plan execution.")
        variables = self.state['variables']
        result = None  # keeps the final return defined for an empty plan
        # Determine execution order based on dependency graph
        # Simple approach: execute steps in the order they appear, assuming dependencies are met
        for step in plan.steps:
            # Replace placeholders in arguments
            if step.arguments:
                for key, value in step.arguments.items():
                    tags = self._PLACEHOLDER.findall(value)
                    # Only a truly absent value counts as missing; an empty
                    # string or list produced by an earlier step is substituted.
                    missing = next((tag for tag in tags if variables.get(tag) is None), None)
                    if missing is not None:
                        self.log_error(f"Missing value for placeholder '{missing}' in step {step.id}.")
                        return False, None
                    # Single pass over the original text: braces that occur
                    # inside an inserted result are never substituted again.
                    step.arguments[key] = self._PLACEHOLDER.sub(
                        lambda match: str(variables[match.group(1)]), value
                    )

            # Execute the step
            success, result = self.execute_step_handler(step)
            if not success:
                self.log_error(f"Execution failed at step {step.id}.")
                return False, None

        self.state['goal_completed'] = True
        self.log_info("Plan executed successfully.")
        return True, result

In [13]:
# Run the parsed plan with a fresh executor; execute_plan returns (success, result).
# NOTE(review): the AttributeError about 'QA' in this cell's saved output does
# not match the PlanExecutor source, which calls the module-level QA() — the
# output looks stale; re-run to confirm.
executor = PlanExecutor()
success, result = executor.execute_plan(message.parsed)

if success:
    # `result` is the output of the last executed step.
    print("\nFinal Summary:")
    print(result)
else:
    print("\nPlan execution failed.")

INFO: Starting plan execution.
EXECUION: execute step id=1 subquestion='What is the newest version of TiDB?' tool_used='QA' arguments={'query': 'Newest version of TiDB'} output_tags='newest_version'


AttributeError: 'PlanExecutor' object has no attribute 'QA'

In [11]:
print(['## Rewrite DDL statements\n\nThe following statements are rewritten before being replicated to the downstream.\n\n|Original statement|Rewritten statement|\n|-|-|\n|`^CREATE DATABASE...`|`^CREATE DATABASE...IF NOT EXISTS`|\n|`^CREATE TABLE...`|`^CREATE TABLE..IF NOT EXISTS`|\n|`^DROP DATABASE...`|`^DROP DATABASE...IF EXISTS`|\n|`^DROP TABLE...`|`^DROP TABLE...IF EXISTS`|\n|`^DROP INDEX...`|`^DROP INDEX...IF EXISTS`|', '---\ntitle: Special Handling of DM DDLs\nsummary: Learn how DM parses and handles DDL statements according to the statement types.\n---\n\n# Special Handling of DM DDLs\n\nWhen TiDB Data Migration (DM) migrates data, it parses the DDL statements and handles them according to the statement type and the current migration stage.', '## Rewrite DDL statements\n\nThe following statements are rewritten before being replicated to the downstream.\n\n|Original statement|Rewritten statement|\n|-|-|\n|`^CREATE DATABASE...`|`^CREATE DATABASE...IF NOT EXISTS`|\n|`^CREATE TABLE...`|`^CREATE TABLE..IF NOT EXISTS`|\n|`^DROP DATABASE...`|`^DROP DATABASE...IF EXISTS`|\n|`^DROP TABLE...`|`^DROP TABLE...IF EXISTS`|\n|`^DROP INDEX...`|`^DROP INDEX...IF EXISTS`|', '## Skip DDL statements\n\nThe following statements are not supported by DM, so DM skips them directly after parsing.\n\n<table>\n    <tr>\n        <th>Description</th>\n        <th>SQL</th>\n    </tr>\n    <tr>\n        <td>transaction</td>\n        <td><code>^SAVEPOINT</code></td>\n    </tr>\n    <tr>\n        <td>skip all flush sqls</td>\n        <td><code>^FLUSH</code></td>\n    </tr>\n    <tr>\n        <td rowspan="3">table maintenance</td>\n        <td><code>^OPTIMIZE\\\\s+TABLE</code></td>\n    </tr>\n    <tr>\n        <td><code>^ANALYZE\\\\s+TABLE</code></td>\n    </tr>\n    <tr>\n        <td><code>^REPAIR\\\\s+TABLE</code></td>\n    </tr>\n    <tr>\n        <td>temporary table</td>\n        <td><code>^DROP\\\\s+(\\\\/\\\\*\\\\!40005\\\\s+)?TEMPORARY\\\\s+(\\\\*\\\\/\\\\s+)?TABLE</code></td>\n    
</tr>\n    <tr>\n        <td rowspan="2">trigger</td>\n        <td><code>^CREATE\\\\s+(DEFINER\\\\s?=.+?)?TRIGGER</code></td>\n    </tr>\n    <tr>\n        <td><code>^DROP\\\\s+TRIGGER</code></td>\n    </tr>\n    <tr>\n        <td rowspan="3">procedure</td>\n        <td><code>^DROP\\\\s+PROCEDURE</code></td>\n    </tr>\n    <tr>\n        <td><code>^CREATE\\\\s+(DEFINER\\\\s?=.+?)?PROCEDURE</code></td>\n    </tr>\n    <tr>\n        <td><code>^ALTER\\\\s+PROCEDURE</code></td>\n    </tr>\n    <tr>\n        <td rowspan="3">view</td>\n        <td><code>^CREATE\\\\s*(OR REPLACE)?\\\\s+(ALGORITHM\\\\s?=.+?)?(DEFINER\\\\s?=.+?)?\\\\s+(SQL SECURITY DEFINER)?VIEW</code></td>\n    </tr>\n    <tr>\n        <td><code>^DROP\\\\s+VIEW</code></td>\n    </tr>\n    <tr>\n        <td><code>^ALTER\\\\s+(ALGORITHM\\\\s?=.+?)?(DEFINER\\\\s?=.+?)?(SQL SECURITY DEFINER)?VIEW</code></td>\n    </tr>\n    <tr>\n        <td rowspan="4">function</td>\n        <td><code>^CREATE\\\\s+(AGGREGATE)?\\\\s*?FUNCTION</code></td>\n    </tr>\n    <tr>\n        <td><code>^CREATE\\\\s+(DEFINER\\\\s?=.+?)?FUNCTION</code></td>\n    </tr>\n    <tr>\n        <td><code>^ALTER\\\\s+FUNCTION</code></td>\n    </tr>\n    <tr>\n        <td><code>^DROP\\\\s+FUNCTION</code></td>\n    </tr>\n    <tr>\n        <td rowspan="3">tableSpace</td>\n        <td><code>^CREATE\\\\s+TABLESPACE</code></td>\n    </tr>\n    <tr>\n        <td><code>^ALTER\\\\s+TABLESPACE</code></td>\n    </tr>\n    <tr>\n        <td><code>^DROP\\\\s+TABLESPACE</code></td>\n    </tr>\n    <tr>\n        <td rowspan="3">event</td>\n        <td><code>^CREATE\\\\s+(DEFINER\\\\s?=.+?)?EVENT</code></td>\n    </tr>\n    <tr>\n        <td><code>^ALTER\\\\s+(DEFINER\\\\s?=.+?)?EVENT</code></td>\n    </tr>\n    <tr>\n        <td><code>^DROP\\\\s+EVENT</code></td>\n    </tr>\n    <tr>\n        <td rowspan="7">account management</td>\n        <td><code>^GRANT</code></td>\n    </tr>\n    <tr>\n        <td><code>^REVOKE</code></td>\n    </tr>\n    <tr>\n        
<td><code>^CREATE\\\\s+USER</code></td>\n    </tr>\n    <tr>\n        <td><code>^ALTER\\\\s+USER</code></td>\n    </tr>\n    <tr>\n        <td><code>^RENAME\\\\s+USER</code></td>\n    </tr>\n    <tr>\n        <td><code>^DROP\\\\s+USER</code></td>\n    </tr>\n    <tr>\n        <td><code>^DROP\\\\s+USER</code></td>\n    </tr>\n</table>', '## DDL related statement\n\n<CustomContent platform="tidb-cloud">\n\n| Statement                                                                                | Description                 |\n|------------------------------------------------------------------------------------------|-----------------------------|\n| [`ADMIN CANCEL DDL JOBS`](/sql-statements/sql-statement-admin-cancel-ddl.md)             | Cancels a currently running DDL jobs. |\n| [`ADMIN CHECKSUM TABLE`](/sql-statements/sql-statement-admin-checksum-table.md)          | Calculates the CRC64 of all rows + indexes of a table. |\n| [<code>ADMIN CHECK [TABLE\\|INDEX]</code>](/sql-statements/sql-statement-admin-check-table-index.md) | Checks for consistency of a table or index. |\n| [<code>ADMIN SHOW DDL [JOBS\\|QUERIES]</code>](/sql-statements/sql-statement-admin-show-ddl.md)      | Shows details about currently running or recently completed DDL jobs. |\n\n</CustomContent>\n\n<CustomContent platform="tidb">\n\n| Statement                                                                                | Description                 |\n|------------------------------------------------------------------------------------------|-----------------------------|\n| [`ADMIN CANCEL DDL JOBS`](/sql-statements/sql-statement-admin-cancel-ddl.md)             | Cancels a currently running DDL jobs. |\n| [`ADMIN CHECKSUM TABLE`](/sql-statements/sql-statement-admin-checksum-table.md)          | Calculates the CRC64 of all rows + indexes of a table. 
|\n| [<code>ADMIN CHECK [TABLE\\|INDEX]</code>](/sql-statements/sql-statement-admin-check-table-index.md) | Checks for consistency of a table or index. |\n| [<code>ADMIN SHOW DDL [JOBS\\|QUERIES]</code>](/sql-statements/sql-statement-admin-show-ddl.md)      | Shows details about currently running or recently completed DDL jobs. |\n| [`ADMIN SHOW TELEMETRY`](/sql-statements/sql-statement-admin-show-telemetry.md)      | Shows information that will be reported back to PingCAP as part of the telemetry feature. |\n\n</CustomContent>'])

['## Rewrite DDL statements\n\nThe following statements are rewritten before being replicated to the downstream.\n\n|Original statement|Rewritten statement|\n|-|-|\n|`^CREATE DATABASE...`|`^CREATE DATABASE...IF NOT EXISTS`|\n|`^CREATE TABLE...`|`^CREATE TABLE..IF NOT EXISTS`|\n|`^DROP DATABASE...`|`^DROP DATABASE...IF EXISTS`|\n|`^DROP TABLE...`|`^DROP TABLE...IF EXISTS`|\n|`^DROP INDEX...`|`^DROP INDEX...IF EXISTS`|', '---\ntitle: Special Handling of DM DDLs\nsummary: Learn how DM parses and handles DDL statements according to the statement types.\n---\n\n# Special Handling of DM DDLs\n\nWhen TiDB Data Migration (DM) migrates data, it parses the DDL statements and handles them according to the statement type and the current migration stage.', '## Rewrite DDL statements\n\nThe following statements are rewritten before being replicated to the downstream.\n\n|Original statement|Rewritten statement|\n|-|-|\n|`^CREATE DATABASE...`|`^CREATE DATABASE...IF NOT EXISTS`|\n|`^CREATE TABLE...`|`^