diff --git a/prompting/api/api.py b/prompting/api/api.py index 34b5dadc7..80fdd6841 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -18,16 +18,17 @@ def health(): async def start_scoring_api(task_scorer, scoring_queue, reward_events): - # We pass an object of task scorer then override it's attributes to ensure that they are managed by mp app.state.task_scorer = task_scorer app.state.task_scorer.scoring_queue = scoring_queue app.state.task_scorer.reward_events = reward_events logger.info(f"Starting Scoring API on https://0.0.0.0:{settings.shared_settings.SCORING_API_PORT}") - uvicorn.run( + config = uvicorn.Config( "prompting.api.api:app", host="0.0.0.0", port=settings.shared_settings.SCORING_API_PORT, loop="asyncio", reload=False, ) + server = uvicorn.Server(config) + await server.serve() diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index 0014fbcd5..ed202673d 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -61,6 +61,10 @@ class BaseTextTask(BaseTask): timeout: int = settings.shared_settings.NEURON_TIMEOUT max_tokens: int = settings.shared_settings.NEURON_MAX_TOKENS + @property + def task_messages(self) -> list[str] | list[dict]: + return self.messages if self.messages else [{"role": "user", "content": self.query}] + @model_validator(mode="after") def get_model_id_and_seed(self) -> "BaseTextTask": if self.llm_model: diff --git a/prompting/tasks/multi_step_reasoning.py b/prompting/tasks/multi_step_reasoning.py index ac2cb98c3..503654e49 100644 --- a/prompting/tasks/multi_step_reasoning.py +++ b/prompting/tasks/multi_step_reasoning.py @@ -175,6 +175,45 @@ class MultiStepReasoningRewardConfig(BaseRewardConfig): ] +# Used to instruct the LLM to provide a good query when given a context +QUERY_SYSTEM_PROMPT = """\ +You are a master of crafting intellectually stimulating questions that unfold across multiple sentences. Each question you generate should be structured as a brief narrative or scenario, where crucial information is deliberately distributed across multiple sentences. The complete question can only be understood and answered by carefully considering all the information provided across these sentences. + +Your questions should: +1. Begin with context or background information +2. Introduce key variables or constraints in subsequent sentences +3. Present the actual question in the final sentence +4. Require analytical reasoning rather than mere fact recall +5. Draw from the provided context when available +6. Incorporate multiple related concepts or data points + +EXAMPLE FORMATS: +✓ "The International Space Station orbits at an average height of 400km above Earth. At this height, it completes one orbit every 92 minutes. Assuming constant speed, how many kilometers does the ISS travel in one Earth day?" + +✓ "A new streaming service launches with 500,000 subscribers in January. They observe that they lose 5% of their existing subscribers each month, but also gain 50,000 new subscribers in the same period. Their infrastructure costs increase by $100,000 for every 200,000 subscribers. What will their monthly infrastructure costs be after 6 months?" + +✓ "The average American household generates 4.5 pounds of trash daily. Local recycling programs typically reduce landfill waste by 30%. Your city has just implemented a new composting initiative that diverts an additional 25% of waste from landfills. Considering there are 50,000 households in your city, how many pounds of waste would still reach landfills each week?" + +AVOID: +- Single-sentence questions +- Questions answerable with simple facts +- Questions without context or background +- Obvious or straightforward calculations +- Questions that don't require analysis + +Remember: The goal is to create questions where the context and parameters are revealed progressively, requiring the reader to integrate information across multiple sentences to fully understand and solve the problem. +""" + +QUERY_PROMPT_TEMPLATE = """\ +Ask a specific question about the following context: + +#Context: +{context} + +You must ask a question that can be answered by the context. +""" + + class MultiStepReasoningTask(WikiQuestionAnsweringTask): """QuestionAnsweringTasks must be initialised with an LLM pipeline to generate query and reference plus context from a dataset to base the query on""" @@ -184,6 +223,13 @@ class MultiStepReasoningTask(WikiQuestionAnsweringTask): query: str | None = None reference: str | None = None + def make_query(self, dataset_entry: Context): + query_prompt = QUERY_PROMPT_TEMPLATE.format(context=dataset_entry.content) + question = self.generate_query(messages=[QUERY_SYSTEM_PROMPT, query_prompt]) + msgs = [p + ". " if i < len(question.split(". ")) - 1 else p for i, p in enumerate(question.split(". ")) if p] + self.messages = [{"role": "user", "content": msg} for msg in msgs] + return self.query + def make_reference(self, dataset_entry: Context): logger.info(f"Generating reference for Multi Step Reasoning task with query: {self.query}") steps, total_thinking_time = execute_multi_step_reasoning(user_query=self.query) diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py index 486ecb9df..ae88b06e7 100644 --- a/prompting/tasks/task_sending.py +++ b/prompting/tasks/task_sending.py @@ -50,24 +50,13 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: logger.warning("No available miners. This should already have been caught earlier.") return - if isinstance(task, InferenceTask): - body = { - "seed": task.seed, - "sampling_parameters": task.sampling_params, - "task": task.__class__.__name__, - "model": task.llm_model_id, - "messages": task.query, - } - else: - body = { - "seed": task.seed, - "sampling_parameters": task.sampling_params, - "task": task.__class__.__name__, - "model": task.llm_model_id, - "messages": [ - {"role": "user", "content": task.query}, - ], - } + body = { + "seed": task.seed, + "sampling_parameters": task.sampling_params, + "task": task.__class__.__name__, + "model": task.llm_model_id, + "messages": task.task_messages, + } if isinstance(task, WebRetrievalTask): body["target_results"] = task.target_results body["timeout"] = task.timeout diff --git a/scripts/autoupdater.sh b/scripts/autoupdater.sh index 30c098d49..e02d0f852 100644 --- a/scripts/autoupdater.sh +++ b/scripts/autoupdater.sh @@ -135,8 +135,11 @@ update_and_restart() { fi if [[ -x "./run.sh" ]]; then - log INFO "Update successful, restarting application..." - exec ./run.sh + log INFO "Update successful, restarting validator..." + pm2 restart s1_validator_main_process + log INFO "Validator restart initiated via PM2" + # Let PM2 handle our own restart if needed + return 0 else log ERROR "run.sh not found or not executable" return 1 diff --git a/scripts/test_api.py b/scripts/test_api.py index bfa40bd3f..ecf97d3ff 100644 --- a/scripts/test_api.py +++ b/scripts/test_api.py @@ -47,7 +47,7 @@ async def make_completion(client: openai.AsyncOpenAI, prompt: str, stream: bool async def main(): PORT = 8005 # Example API key, replace with yours: - API_KEY = "0566dbe21ee33bba9419549716cd6f1f" + API_KEY = "" client = openai.AsyncOpenAI( base_url=f"http://0.0.0.0:{PORT}/v1", max_retries=0, diff --git a/shared/settings.py b/shared/settings.py index ab61340df..1486aa21d 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -88,16 +88,17 @@ class SharedSettings(BaseSettings): # API key used to access validator organic scoring mechanism (both .env.validator and .env.api). SCORING_KEY: str | None = Field(None, env="SCORING_KEY") # Scoring request rate limit in seconds. - SCORING_RATE_LIMIT_SEC: float = Field(0.5, env="SCORING_RATE_LIMIT_SEC") + SCORING_RATE_LIMIT_SEC: float = Field(5, env="SCORING_RATE_LIMIT_SEC") # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests. - SCORING_QUEUE_API_THRESHOLD: int = Field(5, env="SCORING_QUEUE_API_THRESHOLD") + SCORING_QUEUE_API_THRESHOLD: int = Field(1, env="SCORING_QUEUE_API_THRESHOLD") + API_TEST_MODE: bool = Field(False, env="API_TEST_MODE") # Validator scoring API (.env.validator). DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API") SCORING_API_PORT: int = Field(8094, env="SCORING_API_PORT") SCORING_ADMIN_KEY: str | None = Field(None, env="SCORING_ADMIN_KEY") SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS") - WORKERS: int = Field(2, env="WORKERS") + WORKERS: int = Field(1, env="WORKERS") # API Management (.env.api). API_PORT: int = Field(8005, env="API_PORT") @@ -202,6 +203,8 @@ def load_env_file(cls, mode: Literal["miner", "validator", "mock", "api"]): dotenv_file = ".env.validator" elif mode == "api": dotenv_file = ".env.api" + if os.getenv("API_TEST_MODE"): + logger.warning("API_TEST_MODE is set to true - THE API IS RUNNING IN TEST MODE.") else: raise ValueError(f"Invalid mode: {mode}") diff --git a/shared/uids.py b/shared/uids.py index 00b1a22bf..52fdd110a 100644 --- a/shared/uids.py +++ b/shared/uids.py @@ -32,12 +32,6 @@ def check_uid_availability( # Filter validator permit > 1024 stake. if metagraph.validator_permit[uid] and metagraph.S[uid] > shared_settings.NEURON_VPERMIT_TAO_LIMIT: - logger.debug( - f"uid: {uid} has vpermit and stake ({metagraph.S[uid]}) > {shared_settings.NEURON_VPERMIT_TAO_LIMIT}" - ) - logger.debug( - f"uid: {uid} has vpermit and stake ({metagraph.S[uid]}) > {shared_settings.NEURON_VPERMIT_TAO_LIMIT}" - ) return False if coldkeys and metagraph.axons[uid].coldkey in coldkeys: diff --git a/validator_api/api.py b/validator_api/api.py index af80c43c4..55c47ef0e 100644 --- a/validator_api/api.py +++ b/validator_api/api.py @@ -17,7 +17,6 @@ @contextlib.asynccontextmanager async def lifespan(app: FastAPI): - # Startup: start the background tasks. background_task = asyncio.create_task(update_miner_availabilities_for_api.start()) yield background_task.cancel() @@ -27,7 +26,6 @@ async def lifespan(app: FastAPI): pass -# Create the FastAPI app with the lifespan handler. app = FastAPI(lifespan=lifespan) app.include_router(gpt_router, tags=["GPT Endpoints"]) app.include_router(api_management_router, tags=["API Management"]) @@ -41,15 +39,21 @@ async def health(): async def main(): asyncio.create_task(update_miner_availabilities_for_api.start()) asyncio.create_task(scoring_queue.scoring_queue.start()) - uvicorn.run( + + config = uvicorn.Config( "validator_api.api:app", host=shared_settings.API_HOST, port=shared_settings.API_PORT, log_level="debug", timeout_keep_alive=60, + # Note: The `workers` parameter is typically only supported via the CLI. + # When running programmatically with `server.serve()`, only a single worker will run. workers=shared_settings.WORKERS, reload=False, ) + server = uvicorn.Server(config) + + await server.serve() if __name__ == "__main__": diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index b44c2b769..677dc1b09 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -8,10 +8,13 @@ from fastapi.responses import StreamingResponse from loguru import logger +from shared import settings + +shared_settings = settings.shared_settings + from shared.epistula import make_openai_query -from shared.settings import shared_settings -from shared.uids import get_uids from validator_api import scoring_queue +from validator_api.utils import filter_available_uids async def peek_until_valid_chunk( @@ -197,22 +200,20 @@ async def chat_completion( body: dict[str, any], uids: Optional[list[int]] = None, num_miners: int = 10 ) -> tuple | StreamingResponse: """Handle chat completion with multiple miners in parallel.""" - logger.debug(f"REQUEST_BODY: {body}") - # Get multiple UIDs if none specified - if uids is None: - uids = list(get_uids(sampling_mode="random", k=100)) - if uids is None or len(uids) == 0: # if not uids throws error, figure out how to fix - logger.error("No available miners found") - raise HTTPException(status_code=503, detail="No available miners found") - selected_uids = random.sample(uids, min(num_miners, len(uids))) - else: - selected_uids = uids[:num_miners] # If UID is specified, only use that one + body["seed"] = int(body.get("seed") or random.randint(0, 1000000)) + if not uids: + uids = body.get("uids") or filter_available_uids( + task=body.get("task"), model=body.get("model"), test=shared_settings.API_TEST_MODE, n_miners=num_miners + ) + if not uids: + raise HTTPException(status_code=500, detail="No available miners") + uids = random.sample(uids, min(len(uids), num_miners)) - logger.debug(f"Querying uids {selected_uids}") + logger.debug(f"Querying uids {uids}") STREAM = body.get("stream", False) # Initialize chunks collection for each miner - collected_chunks_list = [[] for _ in selected_uids] + collected_chunks_list = [[] for _ in uids] timeout_seconds = max( 30, max(0, math.floor(math.log2(body["sampling_parameters"].get("max_new_tokens", 256) / 256))) * 10 + 30 @@ -225,11 +226,11 @@ async def chat_completion( shared_settings.METAGRAPH, shared_settings.WALLET, timeout_seconds, body, uid, stream=True ) ) - for uid in selected_uids + for uid in uids ] return StreamingResponse( - stream_from_first_response(response_tasks, collected_chunks_list, body, selected_uids), + stream_from_first_response(response_tasks, collected_chunks_list, body, uids), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", @@ -240,7 +241,7 @@ async def chat_completion( # For non-streaming requests, wait for first valid response response_tasks = [ asyncio.create_task(get_response_from_miner(body=body, uid=uid, timeout_seconds=timeout_seconds)) - for uid in selected_uids + for uid in uids ] first_valid_response = None diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 119937542..b7070945d 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -11,6 +11,8 @@ from starlette.responses import StreamingResponse from shared import settings + +shared_settings = settings.shared_settings from shared.epistula import SynapseStreamResult, query_miners from validator_api import scoring_queue from validator_api.api_management import _keys @@ -19,8 +21,6 @@ from validator_api.test_time_inference import generate_response from validator_api.utils import filter_available_uids -shared_settings = settings.shared_settings - router = APIRouter() N_MINERS = 5 @@ -37,14 +37,15 @@ async def completions(request: Request, api_key: str = Depends(validate_api_key) try: body = await request.json() body["seed"] = int(body.get("seed") or random.randint(0, 1000000)) - uids = body.get("uids") or filter_available_uids(task=body.get("task"), model=body.get("model")) + uids = body.get("uids") or filter_available_uids( + task=body.get("task"), model=body.get("model"), test=shared_settings.API_TEST_MODE, n_miners=N_MINERS + ) if not uids: raise HTTPException(status_code=500, detail="No available miners") - uids = random.sample(uids, min(len(uids), N_MINERS)) # Choose between regular completion and mixture of miners. if body.get("test_time_inference", False): - return await test_time_inference(body["messages"], body.get("model")) + return await test_time_inference(body["messages"], body.get("model", None)) if body.get("mixture", False): return await mixture_of_miners(body, uids=uids) else: @@ -56,12 +57,10 @@ async def completions(request: Request, api_key: str = Depends(validate_api_key) @router.post("/web_retrieval") -async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = None): - if not uids: - uids = filter_available_uids(task="WebRetrievalTask") +async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = 5, max_response_time: int = 10): + uids = filter_available_uids(task="WebRetrievalTask", test=shared_settings.API_TEST_MODE, n_miners=n_miners) if not uids: raise HTTPException(status_code=500, detail="No available miners") - uids = random.sample(uids, min(len(uids), n_miners)) logger.debug(f"🔍 Querying uids: {uids}") if len(uids) == 0: logger.warning("No available miners. This should already have been caught earlier.") @@ -71,6 +70,8 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = "seed": random.randint(0, 1_000_000), "sampling_parameters": shared_settings.SAMPLING_PARAMS, "task": "WebRetrievalTask", + "target_results": n_results, + "timeout": max_response_time, "messages": [ {"role": "user", "content": search_query}, ], @@ -105,7 +106,7 @@ async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = @router.post("/test_time_inference") async def test_time_inference(messages: list[dict], model: str = None): async def create_response_stream(messages): - async for steps, total_thinking_time in generate_response(messages): + async for steps, total_thinking_time in generate_response(messages, model=model): if total_thinking_time is not None: logger.info(f"**Total thinking time: {total_thinking_time:.2f} seconds**") yield steps, total_thinking_time diff --git a/validator_api/miner_availabilities.py b/validator_api/miner_availabilities.py index c58d96c30..177b9d24d 100644 --- a/validator_api/miner_availabilities.py +++ b/validator_api/miner_availabilities.py @@ -7,10 +7,12 @@ from pydantic import BaseModel from shared import settings + +shared_settings = settings.shared_settings + from shared.loop_runner import AsyncLoopRunner from shared.uids import get_uids -shared_settings = settings.shared_settings router = APIRouter() diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 493bc55a8..ac1ff60f9 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -8,10 +8,11 @@ from pydantic import BaseModel from shared import settings -from shared.loop_runner import AsyncLoopRunner shared_settings = settings.shared_settings +from shared.loop_runner import AsyncLoopRunner + class ScoringPayload(BaseModel): payload: dict[str, Any] diff --git a/validator_api/test_time_inference.py b/validator_api/test_time_inference.py index f9bbc0167..05492a68a 100644 --- a/validator_api/test_time_inference.py +++ b/validator_api/test_time_inference.py @@ -1,4 +1,6 @@ +import asyncio import json +import random import re import time @@ -8,6 +10,7 @@ from validator_api.chat_completion import chat_completion MAX_THINKING_STEPS = 10 +ATTEMPTS_PER_STEP = 10 def parse_multiple_json(api_response): @@ -39,92 +42,151 @@ def parse_multiple_json(api_response): print(f"Failed to parse JSON object: {e}") continue + if len(parsed_objects) == 0: + logger.error( + f"No valid JSON objects found in the response - couldn't parse json. The miner response was: {api_response}" + ) + return None + if ( + not parsed_objects[0].get("title") + or not parsed_objects[0].get("content") + or not parsed_objects[0].get("next_action") + ): + logger.error( + f"Invalid JSON object found in the response - field missing. The miner response was: {api_response}" + ) + return None return parsed_objects async def make_api_call(messages, max_tokens, model=None, is_final_answer=False): logger.info(f"Making API call with messages: {messages}") - response = None - response_dict = None - for attempt in range(3): + + async def single_attempt(): try: response = await chat_completion( body={ "messages": messages, - "max_tokens": max_tokens, "model": model, - "stream": False, "task": "InferenceTask", + "test_time_inference": True, + "mixture": False, "sampling_parameters": { "temperature": 0.5, - "max_new_tokens": 1000, - "top_p": 1, + "max_new_tokens": 500, }, - } + "seed": (seed := random.randint(0, 1000000)), + }, + num_miners=3, + ) + logger.debug( + f"Making API call with\n\nMESSAGES: {messages}\n\nSEED: {seed}\n\nRESPONSE: {response.choices[0].message.content}" ) - # return response.choices[0].message.content response_dict = parse_multiple_json(response.choices[0].message.content)[0] return response_dict except Exception as e: - logger.error(f"Failed to get valid step back from miner: {e}") - if attempt == 2: - logger.exception(f"Error generating answer: {e}, RESPONSE DICT: {response_dict}") - if is_final_answer: - return { - "title": "Error", - "content": f"Failed to generate final answer after 3 attempts. Error: {str(e)}", - } - else: - return { - "title": "Error", - "content": f"Failed to generate step after 3 attempts. Error: {str(e)}", - "next_action": "final_answer", - } - time.sleep(1) # Wait for 1 second before retrying - - -async def generate_response(original_messages: list[dict[str, str]]): + logger.error(f"Failed to get valid response: {e}") + return None + + # Create three concurrent tasks for more robustness against invalid jsons + tasks = [asyncio.create_task(single_attempt()) for _ in range(ATTEMPTS_PER_STEP)] + + # As each task completes, check if it was successful + for completed_task in asyncio.as_completed(tasks): + try: + result = await completed_task + if result is not None: + # Cancel remaining tasks + for task in tasks: + task.cancel() + return result + except Exception as e: + logger.error(f"Task failed with error: {e}") + continue + + # If all tasks failed, return error response + error_msg = "All concurrent API calls failed" + logger.error(error_msg) + if is_final_answer: + return { + "title": "Error", + "content": f"Failed to generate final answer. Error: {error_msg}", + } + else: + return { + "title": "Error", + "content": f"Failed to generate step. Error: {error_msg}", + "next_action": "final_answer", + } + + +async def generate_response(original_messages: list[dict[str, str]], model: str = None): messages = [ { "role": "system", - "content": """You are an expert AI assistant with advanced reasoning capabilities. Your task is to provide detailed, step-by-step explanations of your thought process. For each step: - -1. Provide a clear, concise title describing the current reasoning phase. -2. Elaborate on your thought process in the content section. -3. Decide whether to continue reasoning or provide a final answer. - -Response Format: -Use JSON with keys: 'title', 'content', 'next_action' (values: 'continue' or 'final_answer') - -Key Instructions: -- Employ at least 5 distinct reasoning steps. -- Acknowledge your limitations as an AI and explicitly state what you can and cannot do. -- Actively explore and evaluate alternative answers or approaches. -- Critically assess your own reasoning; identify potential flaws or biases. -- When re-examining, employ a fundamentally different approach or perspective. -- Utilize at least 3 diverse methods to derive or verify your answer. -- Incorporate relevant domain knowledge and best practices in your reasoning. -- Quantify certainty levels for each step and the final conclusion when applicable. -- Consider potential edge cases or exceptions to your reasoning. -- Provide clear justifications for eliminating alternative hypotheses. -- Output only one step at a time to ensure a detailed and coherent explanation. - - -Example of a valid JSON response: -```json + "content": """You are a world-class expert in analytical reasoning and problem-solving. Your task is to break down complex problems through rigorous step-by-step analysis, carefully examining each aspect before moving forward. For each reasoning step: + +OUTPUT FORMAT: +Return a JSON object with these required fields: { - "title": "Initial Problem Analysis", - "content": "To approach this problem effectively, I'll first break down the given information into key components. This involves identifying...[detailed explanation]... By structuring the problem this way, we can systematically address each aspect.", - "next_action": "continue" -}``` -""", + "title": "Brief, descriptive title of current reasoning phase", + "content": "Detailed explanation of your analysis", + "next_action": "continue" or "final_answer" +} + +REASONING PROCESS: +1. Initial Analysis + - Break down the problem into core components + - Identify key constraints and requirements + - List relevant domain knowledge and principles + +2. Multiple Perspectives + - Examine the problem from at least 3 different angles + - Consider both conventional and unconventional approaches + - Identify potential biases in initial assumptions + +3. Exploration & Validation + - Test preliminary conclusions against edge cases + - Apply domain-specific best practices + - Quantify confidence levels when possible (e.g., 90% certain) + - Document key uncertainties or limitations + +4. Critical Review + - Actively seek counterarguments to your reasoning + - Identify potential failure modes + - Consider alternative interpretations of the data/requirements + - Validate assumptions against provided context + +5. Synthesis & Refinement + - Combine insights from multiple approaches + - Strengthen weak points in the reasoning chain + - Address identified edge cases and limitations + - Build towards a comprehensive solution + +REQUIREMENTS: +- Each step must focus on ONE specific aspect of reasoning +- Explicitly state confidence levels and uncertainty +- When evaluating options, use concrete criteria +- Include specific examples or scenarios when relevant +- Acknowledge limitations in your knowledge or capabilities +- Maintain logical consistency across steps +- Build on previous steps while avoiding redundancy + +CRITICAL THINKING CHECKLIST: +✓ Have I considered non-obvious interpretations? +✓ Are my assumptions clearly stated and justified? +✓ Have I identified potential failure modes? +✓ Is my confidence level appropriate given the evidence? +✓ Have I adequately addressed counterarguments? + +Remember: Quality of reasoning is more important than speed. Take the necessary steps to build a solid analytical foundation before moving to conclusions.""", } ] messages += original_messages messages += [ { "role": "assistant", - "content": "Thank you! I will now think step by step following my instructions, starting at the beginning after decomposing the problem.", + "content": "I understand. I will now analyze the problem systematically, following the structured reasoning process while maintaining high standards of analytical rigor and self-criticism.", } ] @@ -134,7 +196,7 @@ async def generate_response(original_messages: list[dict[str, str]]): for _ in range(MAX_THINKING_STEPS): with Timer() as timer: - step_data = await make_api_call(messages, 300) + step_data = await make_api_call(messages, 300, model=model) thinking_time = timer.final_time total_thinking_time += thinking_time @@ -146,27 +208,30 @@ async def generate_response(original_messages: list[dict[str, str]]): break step_count += 1 - - # Yield after each step yield steps, None - # Generate final answer messages.append( { "role": "user", - "content": "Please provide the final answer based on your reasoning above. You must return your answer in a valid json.", + "content": """Based on your thorough analysis, please provide your final answer. Your response should: +1. Clearly state your conclusion +2. Summarize the key supporting evidence +3. Acknowledge any remaining uncertainties +4. Include relevant caveats or limitations + +Return your answer in the same JSON format as previous steps.""", } ) start_time = time.time() - final_data = await make_api_call(messages, 200, is_final_answer=True) + final_data = await make_api_call(messages, 200, is_final_answer=True, model=model) end_time = time.time() thinking_time = end_time - start_time total_thinking_time += thinking_time if final_data["title"] == "Error": steps.append(("Error", final_data["content"], thinking_time)) - raise ValueError("Failed to generate final answer: {final_data['content']}") + raise ValueError(f"Failed to generate final answer: {final_data['content']}") steps.append(("Final Answer", final_data["content"], thinking_time)) diff --git a/validator_api/utils.py b/validator_api/utils.py index d9e7a2714..a24c708a6 100644 --- a/validator_api/utils.py +++ b/validator_api/utils.py @@ -1,3 +1,5 @@ +import random + import requests from loguru import logger @@ -12,6 +14,8 @@ class UpdateMinerAvailabilitiesForAPI(AsyncLoopRunner): miner_availabilities: dict[int, dict] = {} async def run_step(self): + if shared_settings.API_TEST_MODE: + return try: response = requests.post( f"http://{shared_settings.VALIDATOR_API}/miner_availabilities/miner_availabilities", @@ -27,26 +31,37 @@ async def run_step(self): logger.debug( f"MINER AVAILABILITIES UPDATED, TRACKED: {len(tracked_availabilities)}, UNTRACKED: {len(self.miner_availabilities) - len(tracked_availabilities)}" ) + logger.debug(f"SAMPLE AVAILABILITIES: {random.choice(list(self.miner_availabilities.values()))}") update_miner_availabilities_for_api = UpdateMinerAvailabilitiesForAPI() -def filter_available_uids(task: str | None = None, model: str | None = None) -> list[int]: - """ - Filter UIDs based on task and model availability. +def filter_available_uids( + task: str | None = None, + model: str | None = None, + test: bool = False, + n_miners: int = 10, + n_top_incentive: int = 100, +) -> list[int]: + """Filter UIDs based on task and model availability. Args: - uids: List of UIDs to filter - task: Task type to check availability for, or None if any task is acceptable - model: Model name to check availability for, or None if any model is acceptable + task (str | None, optional): The task to filter miners by. Defaults to None. + model (str | None, optional): The LLM model to filter miners by. Defaults to None. + test (bool, optional): Whether to run in test mode. Defaults to False. + n_miners (int, optional): Number of miners to return. Defaults to 10. + n_top_incentive (int, optional): Number of top incentivized miners to consider. Defaults to 10. Returns: - List of UIDs that can serve the requested task/model combination + list[int]: List of filtered UIDs that match the criteria. """ + if test: + return get_uids(sampling_mode="top_incentive", k=n_miners) + filtered_uids = [] - for uid in get_uids(sampling_mode="all"): + for uid in get_uids(sampling_mode="top_incentive", k=max(n_top_incentive, n_miners)): # Skip if miner data is None/unavailable if update_miner_availabilities_for_api.miner_availabilities.get(str(uid)) is None: continue @@ -64,5 +79,6 @@ def filter_available_uids(task: str | None = None, model: str | None = None) -> continue filtered_uids.append(uid) + filtered_uids = random.sample(filtered_uids, n_miners) return filtered_uids