In [1]:
import sys
sys.path.append("..")  # Add the project root to Python path

import requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional


from notebooks.optimize_plan import get_task_answer, update_plan, execute_task_using_new_plan, evaulate_task_answer, stackvm_host

def get_evaluation_pending_tasks(
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    evaluation_statuses: Optional[List[str]] = None,
    timeout: int = 60,
) -> List[Dict]:
    """
    Fetch the tasks matching the given evaluation statuses from the API.

    Sends a GET request to ``{stackvm_host}/api/tasks/evaluation``.

    Args:
        start_time (Optional[datetime]): If given, sent as the ``start_time``
            query parameter in ISO-8601 form.
        end_time (Optional[datetime]): If given, sent as the ``end_time``
            query parameter in ISO-8601 form.
        evaluation_statuses (Optional[List[str]]): Statuses to filter by; they
            are sent comma-separated. Defaults to ``NOT_EVALUATED`` when
            omitted or empty.
        timeout (int): Timeout in seconds for the API request, matching the
            default used by the other helpers in this notebook.

    Returns:
        List[Dict]: The decoded JSON list returned by the API.

    Raises:
        requests.exceptions.RequestException: If the request fails, times out,
            or returns a 4XX/5XX status.
        ValueError: If the response cannot be decoded or is not a list.
    """
    endpoint = f"{stackvm_host}/api/tasks/evaluation"
    params = {}

    if start_time:
        params['start_time'] = start_time.isoformat()
    if end_time:
        params['end_time'] = end_time.isoformat()
    # Multiple statuses are joined with commas; fall back to NOT_EVALUATED
    # when the caller supplies none.
    params['evaluation_status'] = (
        ','.join(evaluation_statuses) if evaluation_statuses else 'NOT_EVALUATED'
    )

    try:
        # An explicit timeout is required: requests waits indefinitely
        # otherwise, which would hang the notebook on a stalled server.
        response = requests.get(endpoint, params=params, timeout=timeout)
        response.raise_for_status()  # Raise an HTTPError for bad responses (4XX or 5XX)
        data = response.json()

        if not isinstance(data, list):
            raise ValueError("Unexpected response format: Expected a list of tasks.")

        return data
    except requests.exceptions.RequestException as e:
        # Network-level failures, timeouts and HTTP error statuses
        print(f"An error occurred while making the request: {e}")
        raise
    except ValueError as ve:
        # JSON decoding errors or an unexpected payload shape
        print(f"An error occurred while processing the response: {ve}")
        raise

def record_evaluation(
    task_id: str,
    evaluation_status: str,
    evaluation_reason: Optional[str] = "",
    timeout: int = 60
) -> Dict:
    """
    Post an evaluation verdict for a task to the API.

    Sends a JSON POST to ``{stackvm_host}/api/tasks/{task_id}/evaluation``.

    Args:
        task_id (str): The ID of the task being evaluated.
        evaluation_status (str): The evaluation status (e.g., "APPROVED", "REJECTED").
        evaluation_reason (Optional[str]): The reason for the evaluation decision.
        timeout (int): Timeout in seconds for the API request.

    Returns:
        Dict: The decoded JSON object returned by the API.

    Raises:
        requests.exceptions.RequestException: If the request fails or returns
            a 4XX/5XX status.
        ValueError: If the response is not a JSON object, or its ``success``
            field is missing or falsy.
    """
    url = f"{stackvm_host}/api/tasks/{task_id}/evaluation"
    body = {
        "evaluation_status": evaluation_status,
        "evaluation_reason": evaluation_reason,
    }

    try:
        resp = requests.post(
            url,
            json=body,
            headers={"Content-Type": "application/json"},
            timeout=timeout,
        )
        resp.raise_for_status()
        result = resp.json()

        if not isinstance(result, dict):
            raise ValueError("Unexpected response format: Expected a JSON object.")

        # The API signals its own failures through a falsy "success" flag.
        if result.get("success", False):
            return result
        raise ValueError(f"API Error: {result.get('error', 'Unknown error occurred.')}")

    except requests.exceptions.RequestException as exc:
        print(f"An error occurred while making the request: {exc}")
        raise
    except ValueError as exc:
        print(f"An error occurred while processing the response: {exc}")
        raise

def record_human_evaluation(
    task_id: str,
    evaluation_status: str,
    feedback: Optional[str] = "",
    timeout: int = 60
) -> Dict:
    """
    Post a human-evaluation status and feedback for a task to the API.

    Sends a JSON POST to ``{stackvm_host}/api/tasks/{task_id}/human_evaluation``.

    Args:
        task_id (str): The ID of the task being evaluated.
        evaluation_status (str): The evaluation status to record.
        feedback (Optional[str]): Free-form feedback stored with the status.
        timeout (int): Timeout in seconds for the API request.

    Returns:
        Dict: The decoded JSON object returned by the API.

    Raises:
        requests.exceptions.RequestException: If the request fails or returns
            a 4XX/5XX status.
        ValueError: If the response is not a JSON object, or its ``success``
            field is missing or falsy.
    """
    url = f"{stackvm_host}/api/tasks/{task_id}/human_evaluation"
    body = {"evaluation_status": evaluation_status, "feedback": feedback}
    request_headers = {"Content-Type": "application/json"}

    try:
        resp = requests.post(url, json=body, headers=request_headers, timeout=timeout)
        resp.raise_for_status()
        result = resp.json()

        if not isinstance(result, dict):
            raise ValueError("Unexpected response format: Expected a JSON object.")

        # A missing or falsy "success" flag is treated as an API-level failure.
        succeeded = result.get("success", False)
        if not succeeded:
            reason = result.get("error", "Unknown error occurred.")
            raise ValueError(f"API Error: {reason}")

        return result

    except requests.exceptions.RequestException as exc:
        print(f"An error occurred while making the request: {exc}")
        raise
    except ValueError as exc:
        print(f"An error occurred while processing the response: {exc}")
        raise


In [4]:
import json
from app.utils.json import extract_json

def optimize_plan(task_id: str, branch_name: Optional[str] = "main", max_iteration: int = 2) -> None:
    """
    Evaluate a task's answer and revise its plan until it is accepted.

    Each round fetches the task answer for the current branch, evaluates it,
    and records the verdict via ``record_evaluation``. If the answer is
    rejected, a revised plan is generated and executed, and the next round
    evaluates the branch produced by that execution.

    The task is handed over for human evaluation (status
    ``WAITTING_FOR_EVALUATION``) when:
      * the task answer cannot be fetched,
      * the answer is still rejected after ``max_iteration`` revisions,
      * the revised plan cannot be decoded or executed, or
      * executing the revised plan yields no branch name or final answer.

    If the evaluation result itself cannot be decoded, the function prints
    the failure and returns without recording anything.

    Args:
        task_id (str): The ID of the task to optimize.
        branch_name (Optional[str]): Branch whose answer is evaluated first.
        max_iteration (int): Maximum number of plan revisions; the answer is
            therefore evaluated at most ``max_iteration + 1`` times.

    Returns:
        None
    """
    current_branch_name = branch_name
    error_message = None
    iteration_round = 0

    while True:
        print(f"Start to evaluate plan for task(id={task_id},branch={current_branch_name})")
        detail = get_task_answer(task_id, current_branch_name)

        if detail is None:
            # Without this guard the loop would spin forever, re-fetching the
            # same missing answer with nothing to change the outcome.
            error_message = (
                f"Failed to get task answer for task(id={task_id},branch={current_branch_name})"
            )
            break

        goal = detail.get("goal")
        final_answer = detail.get("final_answer")
        plan = detail.get("plan")
        metadata = detail.get("metadata")

        response = evaulate_task_answer(goal, metadata, final_answer, plan)
        try:
            eval_res_str = extract_json(response)
            eval_res = json.loads(eval_res_str)
        except Exception as e:
            print(f"Failed to decode evaluation result {e}: {response}")
            return

        eval_status = "APPROVED" if eval_res.get("accept", False) else "REJECTED"
        eval_reason = json.dumps(eval_res, indent=2)

        record_evaluation(task_id, eval_status, eval_reason)

        if eval_res.get("accept", False) is True:
            print(f"Goal Pass! {goal}, evaluation result:{eval_reason}")
            return

        print(f"Goal Not Pass! {goal}, the evaluation result:{eval_reason}")

        # Revision budget exhausted: fall through to the human hand-off.
        if iteration_round >= max_iteration:
            break

        revised_plan_response = update_plan(goal, metadata, plan, eval_reason)

        try:
            revised_plan_str = extract_json(revised_plan_response)
            revised_plan = json.loads(revised_plan_str)
        except Exception as e:
            error_message = f"Failed to decode revised plan {e}: {revised_plan_response}"
            break

        print("revised plan:", revised_plan)

        try:
            updated_result = execute_task_using_new_plan(task_id, revised_plan)
            print(f"Revised plan execution result {updated_result}")
        except Exception as e:
            error_message = f"Failed to execute task using new plan {e}"
            break

        # The next round evaluates the branch created by the revised plan.
        current_branch_name = updated_result.get("branch_name", None)
        current_final_answer = updated_result.get("final_answer", None)
        if current_branch_name is None or current_final_answer is None:
            error_message = "Failed to execut task using new plan, get empty answer"
            break

        iteration_round += 1

    if error_message is None:
        error_message = "Still failed after two evaluations round."
    record_human_evaluation(task_id, "WAITTING_FOR_EVALUATION", error_message)
            

    


In [6]:
from app.controller.label_classifier import LabelClassifier


# optimize_plan("0eda1580-95db-4219-a23b-58336c3b8f4e", "main")

# Driver cell: fetch recently created tasks that are pending evaluation and
# run the plan-optimization loop on the first one.

# NOTE(review): `classifier` is not referenced anywhere else in the visible
# part of this notebook — confirm whether it is still needed.
classifier = LabelClassifier()

# Look-back window of two hours ending now. `utcnow()` yields a naive
# datetime, so the ISO string sent to the API carries no UTC offset.
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=2)

# Only the lower bound is passed; `end_time` is computed above but not sent,
# and the status filter falls back to the function's default.
pending_tasks = get_evaluation_pending_tasks(
    start_time=start_time
)

for task in pending_tasks:
    task_id = task["id"]
    optimize_plan(task_id, "main")
    # Stop after the first task: only one pending task is processed per run.
    break


Start to evaluate plan for task(id=858e373f-09df-4e1c-8b64-e7413c4aeb2d,branch=main)


2025-01-13 20:25:01,591 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Goal:How can we set up a replication strategy in TiDB for backing up over 50TB of data from a TiDB-v4.0.14 cluster and restoring it to a new cluster as part of a disaster recovery strategy, using BR and TiCDC, without impacting production performance or compromising storage utilization?, the evaluation result:{'accept': False, 'answer_quality_assessment_explanation': 'The final answer does not fully resolve the goal as it lacks specific guidance on how to manage garbage collection settings, which is a critical aspect mentioned in the supplementary information. The user seeks an alternative approach to manage large-scale data backup and restore in TiDB without disabling garbage collection or significantly increasing its duration. The answer provides a comprehensive plan for backup and replication but does not address the garbage collection issue, which could impact production performance and storage efficiency. This omission fails to meet the guideline of providing a direct problem reso

2025-01-13 20:25:37,328 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


revised plan: [{'seq_no': 0, 'type': 'reasoning', 'parameters': {'chain_of_thoughts': 'To set up a replication strategy in TiDB for backing up over 50TB of data from a TiDB-v4.0.14 cluster and restoring it to a new cluster as part of a disaster recovery strategy, we will use BR (Backup & Restore) for full data backup and TiCDC for incremental data replication. The plan will ensure minimal impact on production performance and efficient storage utilization.\n\n1. **Overall Strategy**:\n   - Use BR for full backup of the existing cluster.\n   - Use TiCDC to capture and replicate incremental changes.\n   - Restore the full backup to the new cluster.\n   - Apply incremental changes to the new cluster.\n\n2. **Key Decision Points**:\n   - Scheduling backups during low-traffic periods to minimize performance impact.\n   - Configuring TiCDC to efficiently handle incremental data without overwhelming the network or storage.\n\n3. **Assumptions**:\n   - The production environment can tolerate sc

2025-01-13 20:26:45,900 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Goal:How can we set up a replication strategy in TiDB for backing up over 50TB of data from a TiDB-v4.0.14 cluster and restoring it to a new cluster as part of a disaster recovery strategy, using BR and TiCDC, without impacting production performance or compromising storage utilization?, the evaluation result:{'accept': False, 'answer_quality_assessment_explanation': "The final answer does not fully resolve the goal as it lacks specific guidance on how to manage garbage collection without disabling it or significantly increasing its duration, which is a key concern for the user. Additionally, while the answer provides a comprehensive plan for backup and replication, it does not address the potential impact on production performance and storage efficiency in sufficient detail. The answer also does not include SQL examples, which could enhance understanding, as suggested by the supplementary information. This falls short of the 'Direct Problem Resolution' guideline, as it does not provid

2025-01-13 20:27:22,342 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


revised plan: [{'seq_no': 0, 'type': 'reasoning', 'parameters': {'chain_of_thoughts': 'To set up a replication strategy in TiDB for backing up over 50TB of data from a TiDB-v4.0.14 cluster and restoring it to a new cluster as part of a disaster recovery strategy, we will use BR (Backup & Restore) for full data backup and TiCDC for incremental data replication. The plan will ensure minimal impact on production performance and efficient storage utilization.\n\n1. **Overall Strategy**:\n   - Use BR for full backup of the existing cluster.\n   - Use TiCDC to capture and replicate incremental changes.\n   - Restore the full backup to the new cluster.\n   - Apply incremental changes to the new cluster.\n\n2. **Key Decision Points**:\n   - Scheduling backups during low-traffic periods to minimize performance impact.\n   - Configuring TiCDC to efficiently handle incremental data without overwhelming the network or storage.\n\n3. **Assumptions**:\n   - The production environment can tolerate sc