def validate_api_key(api_key: str = Header(...)):
    """Authenticate a request by its API key header.

    Looks the key up in the module-level ``_keys`` registry and returns the
    record associated with it. Unknown keys are rejected with HTTP 403.
    """
    try:
        # EAFP: a single lookup both validates and resolves the key.
        return _keys[api_key]
    except KeyError:
        raise HTTPException(status_code=403, detail="Invalid API key") from None
+ - Mixture-of-miners strategy + - Test-time inference + + The endpoint automatically selects the appropriate strategy based on request parameters. + Results are streamed back to the client as they are generated. + """, +) +async def completions(request: ChatCompletionRequest, api_key: str = Depends(validate_api_key)): + """ + Executes a chat completion request. + + - **request**: JSON request body following `ChatCompletionRequest` model. + - **api_key**: Authentication header for API access. + + Determines whether to use: + - Standard chat completion (`chat_completion`). + - Mixture-of-miners strategy (`mixture_of_miners`). + - Test-time inference (`test_time_inference`). + """ try: body = await request.json() body["seed"] = int(body.get("seed") or random.randint(0, 1000000)) @@ -55,8 +99,35 @@ async def completions(request: Request, api_key: str = Depends(validate_api_key) return StreamingResponse(content="Internal Server Error", status_code=500) -@router.post("/web_retrieval") +@router.post( + "/web_retrieval", + response_model=WebSearchResponse, + responses={ + 200: {"description": "Successfully retrieved search results", "model": WebSearchResponse}, + 403: {"description": "Invalid API key provided", "model": ErrorResponse}, + 422: {"description": "Invalid request parameters", "model": ErrorResponse}, + 500: {"description": "Server error occurred", "model": ErrorResponse}, + }, + summary="Search the web using distributed miners", + description=""" + Executes web searches using a distributed network of miners: + + 1. Queries multiple miners in parallel + 2. Aggregates and deduplicates results + 3. Parses and validates all responses + 4. Returns a unified set of search results + + The search is performed using DuckDuckGo through the miner network. + """, +) async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = 5, max_response_time: int = 10): + """ + Handles web retrieval through distributed miners. 
+ + - **request**: JSON request body following `WebSearchQuery` model. + + If no miners are available, an HTTPException is raised. + """ uids = filter_available_uids(task="WebRetrievalTask", test=shared_settings.API_TEST_MODE, n_miners=n_miners) if not uids: raise HTTPException(status_code=500, detail="No available miners") @@ -79,36 +150,61 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = timeout_seconds = 30 stream_results = await query_miners(uids, body, timeout_seconds) + results = [ "".join(res.accumulated_chunks) for res in stream_results if isinstance(res, SynapseStreamResult) and res.accumulated_chunks ] + distinct_results = list(np.unique(results)) - loaded_results = [] + search_results = [] for result in distinct_results: try: - loaded_results.append(json.loads(result)) - logger.info(f"🔍 Result: {result}") + parsed_result = json.loads(result) + search_results.append(SearchResult(**parsed_result)) + logger.info(f"🔍 Parsed Result: {parsed_result}") except Exception: - logger.error(f"🔍 Result: {result}") - if len(loaded_results) == 0: + logger.error(f"🔍 Failed to parse result: {result}") + + if len(search_results) == 0: raise HTTPException(status_code=500, detail="No miner responded successfully") - collected_chunks_list = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results] - asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list)) - return loaded_results + asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=[])) + return WebSearchResponse(results=search_results) + + +@router.post( + "/test_time_inference", + responses={ + 200: {"description": "Successfully generated inference response", "content": {"text/event-stream": {}}}, + 500: {"description": "Server error occurred", "model": ErrorResponse}, + }, + summary="Generate responses using test-time inference", + description=""" + 
"""Pydantic request/response models for the validator API endpoints."""

from typing import List, Literal

from pydantic import BaseModel, Field


class WebSearchQuery(BaseModel):
    """Request model for web search queries."""

    search_query: str = Field(
        ...,
        description="The search query to execute using DuckDuckGo",
        example="latest developments in quantum computing",
    )
    n_miners: int = Field(
        default=10, description="Number of miners to query for results (between 1-100)", example=10, ge=1, le=100
    )
    uids: List[int] | None = Field(
        default=None,
        description="List of specific miner UIDs to query. If not provided, miners will be automatically selected.",
        example=[1, 2, 3, 4],
    )
    n_results: int = Field(
        default=5, description="Number of results each miner should return (between 1-30)", example=5, ge=1, le=30
    )

    class Config:
        schema_extra = {
            "example": {
                "search_query": "latest developments in quantum computing",
                "n_miners": 10,
                "uids": [1, 2, 3, 4],
                "n_results": 5,
            }
        }


class SearchResult(BaseModel):
    """Model representing a single search result."""

    url: str = Field(
        ..., description="URL of the search result", example="https://www.nature.com/articles/d41586-023-02192-x"
    )
    title: str = Field(
        ...,
        description="Title of the webpage or document",
        example="Quantum computing breakthrough: New superconducting qubits",
    )
    snippet: str = Field(
        ...,
        description="Brief excerpt or summary of the content",
        example="Researchers have developed a new type of superconducting qubit that increases stability by 50%.",
    )
    timestamp: str = Field(..., description="Timestamp when the result was retrieved", example="2024-01-01 12:00:00")


class WebSearchResponse(BaseModel):
    """Response model containing search results from distributed miners."""

    results: List[SearchResult] = Field(..., description="List of deduplicated and parsed search results")

    class Config:
        schema_extra = {
            "example": {
                "results": [
                    {
                        "url": "https://www.nature.com/articles/d41586-023-02192-x",
                        "title": "Quantum computing breakthrough: New superconducting qubits",
                        "snippet": "Researchers have developed a new type of superconducting qubit that increases stability by 50%.",
                        "timestamp": "2024-01-01 12:00:00",
                    },
                    {
                        "url": "https://arxiv.org/abs/2307.05230",
                        "title": "Arxiv paper: Enhancing Quantum Error Correction",
                        "snippet": "A novel quantum error correction technique reduces noise in superconducting circuits.",
                        "timestamp": "2024-01-02 09:30:00",
                    },
                ]
            }
        }


class ErrorResponse(BaseModel):
    """Model for API error responses."""

    detail: str = Field(
        ...,
        description="Detailed error message explaining what went wrong",
        example="No available miners found to process the request",
    )


class ChatMessage(BaseModel):
    """Model representing a single chat message."""

    # Literal enforces the allowed roles at validation time. A Field
    # ``enum=[...]`` kwarg is schema documentation only in pydantic — unknown
    # Field kwargs are passed through as schema extras and perform no
    # validation — so it silently accepted any role string.
    role: Literal["user", "assistant", "system"] = Field(
        ..., description="Role of the message sender", example="user"
    )
    content: str = Field(..., description="The message content", example="What is the meaning of life?")


class ChatCompletionRequest(BaseModel):
    """Request model for chat completion."""

    # NOTE(review): kept as List[dict] rather than List[ChatMessage] so that
    # handlers which subscript messages as dicts keep working — confirm
    # downstream usage before tightening this to the ChatMessage model.
    messages: List[dict] = Field(
        ...,
        description="List of chat messages containing user input and system responses",
        example=[
            {"role": "user", "content": "What is the meaning of life?"},
            {"role": "assistant", "content": "Philosophers have debated this question for centuries."},
        ],
    )
    # NOTE(review): the ``enum`` kwarg below is advisory (OpenAPI docs only);
    # model is deliberately left as a free-form str so callers may pass other
    # model names without being rejected.
    model: str = Field(
        default="gpt-4",
        description="Model to use for generating responses",
        example="gpt-4",
        enum=["gpt-4", "gpt-3.5-turbo", "custom-model"],
    )
    seed: int | None = Field(
        default=None,
        description="Random seed for response generation. If not provided, a random seed will be generated.",
        example=42,
    )
    test_time_inference: bool = Field(
        default=False, description="When true, enables test-time inference mode", example=False
    )
    mixture: bool = Field(default=False, description="When true, enables mixture-of-miners strategy", example=False)
    uids: List[int] | None = Field(
        default=None,
        title="Miner UIDs",
        description="List of specific miner UIDs to query. If not provided, miners will be automatically selected.",
        example=[1, 5, 7],
    )

    class Config:
        schema_extra = {
            "example": {
                "messages": [{"role": "user", "content": "What is quantum computing?"}],
                "model": "gpt-4",
                "seed": 42,
                "test_time_inference": False,
                "mixture": False,
                "uids": [1, 5, 7],
            }
        }