Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions prompting/tasks/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class BaseTextTask(BaseTask):
sampling_params: dict[str, float] = shared_settings.SAMPLING_PARAMS
timeout: int = shared_settings.NEURON_TIMEOUT

@property
def task_messages(self) -> list[str] | list[dict]:
return self.messages if self.messages else [{"role": "user", "content": self.query}]

@model_validator(mode="after")
def get_model_id_and_seed(self) -> "BaseTextTask":
if self.llm_model:
Expand Down
46 changes: 46 additions & 0 deletions prompting/tasks/multi_step_reasoning.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,45 @@ class MultiStepReasoningRewardConfig(BaseRewardConfig):
]


# System prompt used when generating a query from a context. It instructs the
# LLM to write a multi-sentence question whose information is spread across
# sentences; MultiStepReasoningTask.make_query passes it to generate_query and
# then splits the resulting question on ". " into separate user messages.
QUERY_SYSTEM_PROMPT = """\
You are a master of crafting intellectually stimulating questions that unfold across multiple sentences. Each question you generate should be structured as a brief narrative or scenario, where crucial information is deliberately distributed across multiple sentences. The complete question can only be understood and answered by carefully considering all the information provided across these sentences.

Your questions should:
1. Begin with context or background information
2. Introduce key variables or constraints in subsequent sentences
3. Present the actual question in the final sentence
4. Require analytical reasoning rather than mere fact recall
5. Draw from the provided context when available
6. Incorporate multiple related concepts or data points

EXAMPLE FORMATS:
✓ "The International Space Station orbits at an average height of 400km above Earth. At this height, it completes one orbit every 92 minutes. Assuming constant speed, how many kilometers does the ISS travel in one Earth day?"

✓ "A new streaming service launches with 500,000 subscribers in January. They observe that they lose 5% of their existing subscribers each month, but also gain 50,000 new subscribers in the same period. Their infrastructure costs increase by $100,000 for every 200,000 subscribers. What will their monthly infrastructure costs be after 6 months?"

✓ "The average American household generates 4.5 pounds of trash daily. Local recycling programs typically reduce landfill waste by 30%. Your city has just implemented a new composting initiative that diverts an additional 25% of waste from landfills. Considering there are 50,000 households in your city, how many pounds of waste would still reach landfills each week?"

AVOID:
- Single-sentence questions
- Questions answerable with simple facts
- Questions without context or background
- Obvious or straightforward calculations
- Questions that don't require analysis

Remember: The goal is to create questions where the context and parameters are revealed progressively, requiring the reader to integrate information across multiple sentences to fully understand and solve the problem.
"""

# Prompt template for query generation. The `{context}` placeholder is filled
# with the dataset entry's content via str.format() in make_query.
QUERY_PROMPT_TEMPLATE = """\
Ask a specific question about the following context:

#Context:
{context}

You must ask a question that can be answered by the context.
"""


class MultiStepReasoningTask(WikiQuestionAnsweringTask):
"""QuestionAnsweringTasks must be initialised with an LLM pipeline to generate query and reference plus
context from a dataset to base the query on"""
Expand All @@ -184,6 +223,13 @@ class MultiStepReasoningTask(WikiQuestionAnsweringTask):
query: str | None = None
reference: str | None = None

def make_query(self, dataset_entry: Context):
    """Generate a multi-sentence question and store it as one user message per sentence.

    The context from ``dataset_entry.content`` is inserted into
    ``QUERY_PROMPT_TEMPLATE`` and passed, together with
    ``QUERY_SYSTEM_PROMPT``, to ``self.generate_query``. The returned
    question is split on ``". "`` and every non-empty piece becomes its own
    ``{"role": "user", "content": ...}`` entry in ``self.messages``.

    Args:
        dataset_entry: Context whose ``content`` the question is based on.

    Returns:
        The value of ``self.query``.
    """
    query_prompt = QUERY_PROMPT_TEMPLATE.format(context=dataset_entry.content)
    question = self.generate_query(messages=[QUERY_SYSTEM_PROMPT, query_prompt])

    # Split once up front rather than once per sentence inside the comprehension.
    sentences = question.split(". ")
    last_index = len(sentences) - 1
    # split() consumes the ". " separator, so re-attach it to every piece except
    # the final one; empty pieces are dropped.
    msgs = [
        sentence + ". " if index < last_index else sentence
        for index, sentence in enumerate(sentences)
        if sentence
    ]
    self.messages = [{"role": "user", "content": msg} for msg in msgs]
    # NOTE(review): self.query is not assigned in this method; presumably
    # generate_query sets it as a side effect — confirm.
    return self.query

def make_reference(self, dataset_entry: Context):
logger.info(f"Generating reference for Multi Step Reasoning task with query: {self.query}")
steps, total_thinking_time = execute_multi_step_reasoning(user_query=self.query)
Expand Down
25 changes: 7 additions & 18 deletions prompting/tasks/task_sending.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,13 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None:
logger.warning("No available miners. This should already have been caught earlier.")
return

if isinstance(task, InferenceTask):
body = {
"seed": task.seed,
"sampling_parameters": task.sampling_params,
"task": task.__class__.__name__,
"model": task.llm_model_id,
"messages": task.query,
}
else:
body = {
"seed": task.seed,
"sampling_parameters": task.sampling_params,
"task": task.__class__.__name__,
"model": task.llm_model_id,
"messages": [
{"role": "user", "content": task.query},
],
}
body = {
"seed": task.seed,
"sampling_parameters": task.sampling_params,
"task": task.__class__.__name__,
"model": task.llm_model_id,
"messages": task.task_messages,
}
if isinstance(task, WebRetrievalTask):
body["target_results"] = task.target_results
body["timeout"] = task.timeout
Expand Down
6 changes: 0 additions & 6 deletions shared/uids.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ def check_uid_availability(

# Filter validator permit > 1024 stake.
if metagraph.validator_permit[uid] and metagraph.S[uid] > shared_settings.NEURON_VPERMIT_TAO_LIMIT:
logger.debug(
f"uid: {uid} has vpermit and stake ({metagraph.S[uid]}) > {shared_settings.NEURON_VPERMIT_TAO_LIMIT}"
)
logger.debug(
f"uid: {uid} has vpermit and stake ({metagraph.S[uid]}) > {shared_settings.NEURON_VPERMIT_TAO_LIMIT}"
)
return False

if coldkeys and metagraph.axons[uid].coldkey in coldkeys:
Expand Down
4 changes: 2 additions & 2 deletions validator_api/gpt_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def completions(request: Request, api_key: str = Depends(validate_api_key)

# Choose between regular completion and mixture of miners.
if body.get("test_time_inference", False):
return await test_time_inference(body["messages"], body.get("model"))
return await test_time_inference(body["messages"], body.get("model", None))
if body.get("mixture", False):
return await mixture_of_miners(body, uids=uids)
else:
Expand Down Expand Up @@ -103,7 +103,7 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] =
@router.post("/test_time_inference")
async def test_time_inference(messages: list[dict], model: str = None):
async def create_response_stream(messages):
async for steps, total_thinking_time in generate_response(messages):
async for steps, total_thinking_time in generate_response(messages, model=model):
if total_thinking_time is not None:
logger.info(f"**Total thinking time: {total_thinking_time:.2f} seconds**")
yield steps, total_thinking_time
Expand Down
182 changes: 122 additions & 60 deletions validator_api/test_time_inference.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import json
import random
import re
import time

Expand Down Expand Up @@ -39,14 +41,28 @@ def parse_multiple_json(api_response):
print(f"Failed to parse JSON object: {e}")
continue

if len(parsed_objects) == 0:
logger.error(
f"No valid JSON objects found in the response - couldn't parse json. The miner response was: {api_response}"
)
return None
if (
not parsed_objects[0].get("title")
or not parsed_objects[0].get("content")
or not parsed_objects[0].get("next_action")
):
logger.error(
f"Invalid JSON object found in the response - field missing. The miner response was: {api_response}"
)
return None
return parsed_objects


async def make_api_call(messages, max_tokens, model=None, is_final_answer=False):
ATTEMPTS_PER_STEP = 10
logger.info(f"Making API call with messages: {messages}")
response = None
response_dict = None
for attempt in range(3):

async def single_attempt():
try:
response = await chat_completion(
body={
Expand All @@ -60,71 +76,114 @@ async def make_api_call(messages, max_tokens, model=None, is_final_answer=False)
"max_new_tokens": 1000,
"top_p": 1,
},
"seed": random.randint(0, 1000000),
}
)
# return response.choices[0].message.content
response_dict = parse_multiple_json(response.choices[0].message.content)[0]
return response_dict
except Exception as e:
logger.error(f"Failed to get valid step back from miner: {e}")
if attempt == 2:
logger.exception(f"Error generating answer: {e}, RESPONSE DICT: {response_dict}")
if is_final_answer:
return {
"title": "Error",
"content": f"Failed to generate final answer after 3 attempts. Error: {str(e)}",
}
else:
return {
"title": "Error",
"content": f"Failed to generate step after 3 attempts. Error: {str(e)}",
"next_action": "final_answer",
}
time.sleep(1) # Wait for 1 second before retrying


async def generate_response(original_messages: list[dict[str, str]]):
logger.error(f"Failed to get valid response: {e}")
return None

# Launch ATTEMPTS_PER_STEP concurrent attempts
tasks = [asyncio.create_task(single_attempt()) for _ in range(ATTEMPTS_PER_STEP)]

# As each task completes, check if it was successful
for completed_task in asyncio.as_completed(tasks):
try:
result = await completed_task
if result is not None:
# Cancel remaining tasks
for task in tasks:
task.cancel()
return result
except Exception as e:
logger.error(f"Task failed with error: {e}")
continue

# If all tasks failed, return error response
error_msg = "All concurrent API calls failed"
logger.error(error_msg)
if is_final_answer:
return {
"title": "Error",
"content": f"Failed to generate final answer. Error: {error_msg}",
}
else:
return {
"title": "Error",
"content": f"Failed to generate step. Error: {error_msg}",
"next_action": "final_answer",
}


async def generate_response(original_messages: list[dict[str, str]], model: str = None):
messages = [
{
"role": "system",
"content": """You are an expert AI assistant with advanced reasoning capabilities. Your task is to provide detailed, step-by-step explanations of your thought process. For each step:

1. Provide a clear, concise title describing the current reasoning phase.
2. Elaborate on your thought process in the content section.
3. Decide whether to continue reasoning or provide a final answer.

Response Format:
Use JSON with keys: 'title', 'content', 'next_action' (values: 'continue' or 'final_answer')

Key Instructions:
- Employ at least 5 distinct reasoning steps.
- Acknowledge your limitations as an AI and explicitly state what you can and cannot do.
- Actively explore and evaluate alternative answers or approaches.
- Critically assess your own reasoning; identify potential flaws or biases.
- When re-examining, employ a fundamentally different approach or perspective.
- Utilize at least 3 diverse methods to derive or verify your answer.
- Incorporate relevant domain knowledge and best practices in your reasoning.
- Quantify certainty levels for each step and the final conclusion when applicable.
- Consider potential edge cases or exceptions to your reasoning.
- Provide clear justifications for eliminating alternative hypotheses.
- Output only one step at a time to ensure a detailed and coherent explanation.


Example of a valid JSON response:
```json
"content": """You are a world-class expert in analytical reasoning and problem-solving. Your task is to break down complex problems through rigorous step-by-step analysis, carefully examining each aspect before moving forward. For each reasoning step:

OUTPUT FORMAT:
Return a JSON object with these required fields:
{
"title": "Initial Problem Analysis",
"content": "To approach this problem effectively, I'll first break down the given information into key components. This involves identifying...[detailed explanation]... By structuring the problem this way, we can systematically address each aspect.",
"next_action": "continue"
}```
""",
"title": "Brief, descriptive title of current reasoning phase",
"content": "Detailed explanation of your analysis",
"next_action": "continue" or "final_answer"
}

REASONING PROCESS:
1. Initial Analysis
- Break down the problem into core components
- Identify key constraints and requirements
- List relevant domain knowledge and principles

2. Multiple Perspectives
- Examine the problem from at least 3 different angles
- Consider both conventional and unconventional approaches
- Identify potential biases in initial assumptions

3. Exploration & Validation
- Test preliminary conclusions against edge cases
- Apply domain-specific best practices
- Quantify confidence levels when possible (e.g., 90% certain)
- Document key uncertainties or limitations

4. Critical Review
- Actively seek counterarguments to your reasoning
- Identify potential failure modes
- Consider alternative interpretations of the data/requirements
- Validate assumptions against provided context

5. Synthesis & Refinement
- Combine insights from multiple approaches
- Strengthen weak points in the reasoning chain
- Address identified edge cases and limitations
- Build towards a comprehensive solution

REQUIREMENTS:
- Each step must focus on ONE specific aspect of reasoning
- Explicitly state confidence levels and uncertainty
- When evaluating options, use concrete criteria
- Include specific examples or scenarios when relevant
- Acknowledge limitations in your knowledge or capabilities
- Maintain logical consistency across steps
- Build on previous steps while avoiding redundancy

CRITICAL THINKING CHECKLIST:
✓ Have I considered non-obvious interpretations?
✓ Are my assumptions clearly stated and justified?
✓ Have I identified potential failure modes?
✓ Is my confidence level appropriate given the evidence?
✓ Have I adequately addressed counterarguments?

Remember: Quality of reasoning is more important than speed. Take the necessary steps to build a solid analytical foundation before moving to conclusions.""",
}
]
messages += original_messages
messages += [
{
"role": "assistant",
"content": "Thank you! I will now think step by step following my instructions, starting at the beginning after decomposing the problem.",
"content": "I understand. I will now analyze the problem systematically, following the structured reasoning process while maintaining high standards of analytical rigor and self-criticism.",
}
]

Expand All @@ -134,7 +193,7 @@ async def generate_response(original_messages: list[dict[str, str]]):

for _ in range(MAX_THINKING_STEPS):
with Timer() as timer:
step_data = await make_api_call(messages, 300)
step_data = await make_api_call(messages, 300, model=model)
thinking_time = timer.final_time
total_thinking_time += thinking_time

Expand All @@ -146,27 +205,30 @@ async def generate_response(original_messages: list[dict[str, str]]):
break

step_count += 1

# Yield after each step
yield steps, None

# Generate final answer
messages.append(
{
"role": "user",
"content": "Please provide the final answer based on your reasoning above. You must return your answer in a valid json.",
"content": """Based on your thorough analysis, please provide your final answer. Your response should:
1. Clearly state your conclusion
2. Summarize the key supporting evidence
3. Acknowledge any remaining uncertainties
4. Include relevant caveats or limitations

Return your answer in the same JSON format as previous steps.""",
}
)

start_time = time.time()
final_data = await make_api_call(messages, 200, is_final_answer=True)
final_data = await make_api_call(messages, 200, is_final_answer=True, model=model)
end_time = time.time()
thinking_time = end_time - start_time
total_thinking_time += thinking_time

if final_data["title"] == "Error":
steps.append(("Error", final_data["content"], thinking_time))
raise ValueError("Failed to generate final answer: {final_data['content']}")
raise ValueError(f"Failed to generate final answer: {final_data['content']}")

steps.append(("Final Answer", final_data["content"], thinking_time))

Expand Down