diff --git a/neurons/miners/epistula_miner/miner.py b/neurons/miners/epistula_miner/miner.py
index 7ed556f9f..e8f32fa7a 100644
--- a/neurons/miners/epistula_miner/miner.py
+++ b/neurons/miners/epistula_miner/miner.py
@@ -18,6 +18,7 @@
 from loguru import logger
 from starlette.background import BackgroundTask
 from starlette.responses import StreamingResponse
+from web_retrieval import get_websites_with_similarity

 from prompting.llms.hf_llm import ReproducibleHF
 from shared.epistula import verify_signature
@@ -60,9 +61,28 @@ async def format_openai_query(self, request: Request):

         return openai_request

+    async def stream_web_retrieval(self, body, headers):
+        async def word_stream(body, headers):
+            # `target_results` arrives as a header string; get_websites_with_similarity casts it when slicing top-k.
+            websites = await get_websites_with_similarity(body["messages"][0]["content"], 10, headers["target_results"])
+
+            # Simulate the OpenAI streaming response format
+            data = {"choices": [{"delta": {"content": json.dumps(websites)}, "index": 0, "finish_reason": None}]}
+            yield f"data: {json.dumps(data)}\n\n"
+            await asyncio.sleep(0.1)
+            # Indicate the end of the stream
+            data = {"choices": [{"delta": {}, "index": 0, "finish_reason": "stop"}]}
+            yield f"data: {json.dumps(data)}\n\n"
+            yield "data: [DONE]\n\n"
+
+        return StreamingResponse(word_stream(body, headers), media_type="text/event-stream")
+
     async def create_chat_completion(self, request: Request):
+        data = await request.json()
+        headers = request.headers
         if self.llm and request.headers.get("task", None) == "inference":
             return await self.create_inference_completion(request)
+        if request.headers.get("task", None) == "WebRetrievalTask":
+            return await self.stream_web_retrieval(data, headers)
         req = self.client.build_request("POST", "chat/completions", json=await self.format_openai_query(request))
         r = await self.client.send(req, stream=True)
         return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose), headers=r.headers)
@@ -71,6 +91,7 @@ async def create_inference_completion(self, request: Request):
         async def word_stream():
             inference = await self.run_inference(request)
             words = inference.split()
+            print(words)
             for word in words:
                 # Simulate the OpenAI streaming response format
                 data = {"choices": [{"delta": {"content": word + " "}, "index": 0, "finish_reason": None}]}
diff --git a/neurons/miners/epistula_miner/web_retrieval.py b/neurons/miners/epistula_miner/web_retrieval.py
new file mode 100644
index 000000000..344c2046e
--- /dev/null
+++ b/neurons/miners/epistula_miner/web_retrieval.py
@@ -0,0 +1,113 @@
+import asyncio
+from typing import Dict, List
+
+import numpy as np
+import trafilatura
+from loguru import logger
+from openai import OpenAI
+
+# Import the patched DDGS and use that
+from prompting.base.duckduckgo_patch import PatchedDDGS
+from shared.settings import shared_settings
+
+
+async def fetch_url(url: str) -> str:
+    return trafilatura.fetch_url(url)
+
+
+async def extract_content(content: str) -> str:
+    return trafilatura.extract(content)
+
+
+def create_chunks(text: str, chunk_size: int = 500, min_length: int = 301) -> List[str]:
+    """Split text into sentence-aligned chunks of roughly chunk_size characters, dropping chunks shorter than min_length."""
+    chunks = []
+    current_chunk = ""
+
+    for sentence in text.split(". "):
+        if len(current_chunk) + len(sentence) <= chunk_size:
+            current_chunk += sentence + ". "
+        else:
+            chunks.append(current_chunk.strip())
+            current_chunk = sentence + ". "
+
+    if current_chunk and current_chunk.strip():
+        chunks.append(current_chunk.strip())
+
+    return [chunk for chunk in chunks if chunk and len(chunk) > min_length]
+
+
+async def get_websites_with_similarity(
+    query: str = "What are the 5 best phones I can buy this year?", n_results: int = 5, k: int = 3
+) -> List[Dict[str, str]]:
+    """
+    Search for websites and return top K results based on embedding similarity.
+
+    Args:
+        query: Search query string
+        n_results: Number of initial results to process
+        k: Number of top similar results to return
+
+    Returns:
+        List of dictionaries containing website URLs and their best matching chunks
+    """
+    logger.debug("Getting results")
+    ddgs = PatchedDDGS(proxy=shared_settings.PROXY_URL, verify=False)
+    results = list(ddgs.text(query))
+    logger.debug(f"Got {len(results)} results")
+    urls = [r["href"] for r in results][:n_results]
+
+    # Fetch and extract content
+    content = await asyncio.gather(*[fetch_url(url) for url in urls])
+    extracted = await asyncio.gather(*[extract_content(c) for c in content])
+
+    # Create embeddings
+    client = OpenAI(api_key=shared_settings.OPENAI_API_KEY)
+    query_embedding = client.embeddings.create(model="text-embedding-ada-002", input=query).data[0].embedding
+
+    # Process each website
+    results_with_similarity = []
+    for url, html, text in zip(urls, content, extracted):
+        if not text:  # Skip if extraction failed
+            continue
+
+        chunks = create_chunks(text)
+        chunk_embeddings = client.embeddings.create(model="text-embedding-ada-002", input=chunks).data
+
+        # Find chunk with highest similarity
+        similarities = [np.dot(query_embedding, chunk.embedding) for chunk in chunk_embeddings]
+        best_chunk_idx = np.argmax(similarities)
+
+        results_with_similarity.append(
+            {
+                "website": url,
+                "best_chunk": chunks[best_chunk_idx],
+                "similarity_score": similarities[best_chunk_idx],
+                # "html": html,
+                "text": text,
+            }
+        )
+
+    # Sort by similarity score and return top K
+    top_k = sorted(results_with_similarity, key=lambda x: x["similarity_score"], reverse=True)[: int(k)]
+
+    return [
+        {
+            "url": result["website"],
+            "content": result["text"],
+            # "html": result["html"],
+            "relevant": result["best_chunk"],
+        }
+        for result in top_k
+    ]
+
+
+# Example:
+# await get_websites_with_similarity(
+#     "What are the 5 best phones I can buy this year?",
+#     n_results=5,  # number of initial websites to get
+#     k=3,  # number of top similar results to return
+# )
diff --git a/poetry.lock b/poetry.lock
index 1f8ded166..576c3896d 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -5366,6 +5366,20 @@ files = [
 [package.extras]
 tests = ["pytest", "pytest-cov"]

+[[package]]
+name = "thefuzz"
+version = "0.22.1"
+description = "Fuzzy string matching in python"
+optional = true
+python-versions = ">=3.8"
+files = [
+    {file = "thefuzz-0.22.1-py3-none-any.whl", hash = "sha256:59729b33556850b90e1093c4cf9e618af6f2e4c985df193fdf3c5b5cf02ca481"},
+    {file = "thefuzz-0.22.1.tar.gz", hash = "sha256:7138039a7ecf540da323792d8592ef9902b1d79eb78c147d4f20664de79f3680"},
+]
+
+[package.dependencies]
+rapidfuzz = ">=3.0.0,<4.0.0"
+
 [[package]]
 name = "tiktoken"
 version = "0.8.0"
@@ -6511,9 +6525,9 @@ cffi = {version = ">=1.11", markers = "platform_python_implementation == \"PyPy\""}

 cffi = ["cffi (>=1.11)"]

 [extras]
-validator = ["accelerate", "angle-emb", "autoawq", "bs4", "datasets", "duckduckgo-search", "huggingface-hub", "nltk", "numpy", "pandas", "pillow", "rouge", "tiktoken", "tiktoken", "torch", "torchvision", "trafilatura", "transformers", "wandb", "wikipedia"]
+validator = ["accelerate", "angle-emb", "autoawq", "bs4", "datasets", "duckduckgo-search", "huggingface-hub", "nltk", "numpy", "pandas", "pillow", "rouge", "thefuzz", "tiktoken", "tiktoken", "torch", "torchvision", "trafilatura", "transformers", "wandb", "wikipedia"]

 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.10 <3.11"
-content-hash = "c8c8792393f2c855bc5e090456bcebd07e249a27712ef2900778e167f601abc7"
+content-hash = "fab6ac48abb10a86530acc8695bdae77ccf876c6ea252f544983c919a7f34359"
diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py
index c78e68f54..1364f75f6 100644
--- a/prompting/api/scoring/api.py
+++ b/prompting/api/scoring/api.py
@@ -45,8 +45,8 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate
         return
     else:
         llm_model = None
-    task = body.get("task")
-    if task == "InferenceTask":
+    task_name = body.get("task")
+    if task_name == "InferenceTask":
         logger.info(f"Received Organic InferenceTask with body: {body}")
         logger.info(f"With model of type {type(body.get('model'))}")
         organic_task = InferenceTask(
@@ -70,7 +70,7 @@
             step=-1,
             task_id=str(uuid.uuid4()),
         )
-    elif task == "WebRetrievalTask":
+    elif task_name == "WebRetrievalTask":
         logger.info(f"Received Organic WebRetrievalTask with body: {body}")
         try:
             search_term = body.get("messages")[0].get("content")
@@ -90,7 +90,7 @@
             stream_results=[
                 SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
             ],
-            timeout=shared_settings.NEURON_TIMEOUT,  # TODO: Change this to read from the body
+            timeout=body.get("timeout", shared_settings.NEURON_TIMEOUT),
         ),
         dataset_entry=DDGDatasetEntry(search_term=search_term),
         block=shared_settings.METAGRAPH.block,
diff --git a/prompting/rewards/reward.py b/prompting/rewards/reward.py
index f8e9c55ac..cd886db5f 100644
--- a/prompting/rewards/reward.py
+++ b/prompting/rewards/reward.py
@@ -83,7 +83,7 @@ def apply(
     ) -> WeightedRewardEvent:
         t0 = time.time()
         comparator = reference if reward_type == "reward" else challenge
-        batch_rewards_output: BatchRewardOutput = self.reward(comparator, response_event, **kwargs)
+        batch_rewards_output: BatchRewardOutput = self.reward(comparator, response_event, task=task, **kwargs)
         batch_rewards_time = time.time() - t0

         return WeightedRewardEvent(
diff --git a/prompting/rewards/web_retrieval.py b/prompting/rewards/web_retrieval.py
index d09d102ed..dc736d686 100644
--- a/prompting/rewards/web_retrieval.py
+++ b/prompting/rewards/web_retrieval.py
@@ -9,19 +9,27 @@
 """

 import json
-import time

 import numpy as np
 from loguru import logger
+from pydantic import BaseModel
 from scipy import spatial
+from thefuzz import fuzz

 from prompting.datasets.random_website import DDGDataset, DDGDatasetEntry
 from prompting.rewards.relevance import RelevanceRewardModel
 from prompting.rewards.reward import BatchRewardOutput
+from prompting.tasks.base_task import BaseTextTask
 from shared.dendrite import DendriteResponseEvent

-_SEARCH_TERM_THRESH = 0.2
-_VALID_URL_SCORE = 0.8
+MIN_RELEVANT_CHARS = 300
+MIN_MATCH_THRESHOLD = 90
+
+
+class WebsiteResult(BaseModel):
+    url: str
+    content: str
+    relevant: str


 class WebRetrievalRewardModel(RelevanceRewardModel):
@@ -31,86 +39,88 @@ def _cosine_similarity(self, content1: str, content2: str) -> float:
         response_emb_flatten = self.embedding_model.encode(content2, to_numpy=True).flatten()
         return 1.0 - float(spatial.distance.cosine(reference_emb_flatten, response_emb_flatten))

+    def score_website_result(
+        self, dataset_entry: DDGDatasetEntry, response_url: str, response_content: str, response_relevant: str
+    ) -> float:
+        if not response_url or not response_content or not response_relevant:
+            return 0
+
+        # Content scraped from the URL provided in the completion.
+        reference_website_content = DDGDataset.extract_website_content(response_url)
+        if not reference_website_content or len(reference_website_content) == 0:
+            logger.debug(f"Failed to extract miner's content from website: {response_url}")
+            return 0
+
+        if fuzz.token_set_ratio(response_content, reference_website_content) < MIN_MATCH_THRESHOLD:
+            logger.info("Miner returned text that doesn't match the website, scoring 0")
+            return 0
+
+        if len(response_relevant) > len(response_content) or len(response_relevant) < MIN_RELEVANT_CHARS:
+            logger.info(
+                f"Relevant section is too short (<{MIN_RELEVANT_CHARS} chars) or longer than the whole website "
+                f"content: {len(response_relevant)} > {len(response_content)}"
+            )
+            return 0
+
+        return self._cosine_similarity(content1=dataset_entry.query, content2=response_relevant)
+
+    def score_miner_response(
+        self, dataset_entry: DDGDatasetEntry, completion: str, task: BaseTextTask | None = None
+    ) -> float:
+        scores = []
+        miner_websites: list[WebsiteResult] = self._parse_response(completion)
+        unique_websites = np.unique([website.url for website in miner_websites])
+        if unique_websites.size != len(miner_websites) and unique_websites.size != task.target_results:
+            # Miner returned duplicate URLs and still missed the requested number of unique results.
+            return 0
+
+        for website in miner_websites:
+            scores.append(self.score_website_result(dataset_entry, website.url, website.content, website.relevant))
+
+        if scores:
+            # Rank-descending weights: earlier results contribute more to the average.
+            weights = np.arange(len(scores), 0, -1)
+            return float(np.average(scores, weights=weights))
+        return 0
+
     # TODO: Change base class reference type to Reference pydantic model, in order to store additional data.
-    def reward(self, reference: str, response_event: DendriteResponseEvent, **kwargs) -> BatchRewardOutput:
+    def reward(
+        self, reference: str, response_event: DendriteResponseEvent, task: BaseTextTask | None = None, **kwargs
+    ) -> BatchRewardOutput:
         """Score response website content and URL based on the similarity to the search term and reference content."""
         rewards: list[float] = []
         timings: list[float] = []
         dataset_entry = DDGDatasetEntry.model_validate_json(json.loads(reference))
+        if not dataset_entry.query:
+            # If the dataset doesn't have a query, we can't score the completions.
+            return BatchRewardOutput(
+                rewards=np.array([0] * len(response_event.completions)),
+                timings=np.array([0] * len(response_event.completions)),
+            )
+
         for completion in response_event.completions:
-            timer_start = time.perf_counter()
-
-            if not completion:
-                timings.append(time.perf_counter() - timer_start)
-                rewards.append(0)
-                continue
-
-            # URL and the content provided in the completion.
-            response_url, response_content, response_relevant = self._parse_response(completion)
-            if response_url is None or response_content is None:
-                timings.append(time.perf_counter() - timer_start)
-                rewards.append(0)
-                continue
-
-            # Content scraped from the URL provided in the completion.
-            response_url_scraped = DDGDataset.extract_website_content(response_url)
-            if not response_url_scraped or len(response_url_scraped) == 0:
-                logger.debug(f"Failed to extract miner's content from website: {response_url}")
-                timings.append(time.perf_counter() - timer_start)
-                rewards.append(0)
-                continue
-
-            query = dataset_entry.query
-            reference_content = dataset_entry.website_content
-
-            # Similarity between search term and miner's scraped content.
-            search_response_sim = self._cosine_similarity(content1=query, content2=response_content)
-
-            # Similarity between search term and relevant section of content.
-            search_relevant_sim = 0
-            if response_relevant is not None:
-                search_relevant_sim = self._cosine_similarity(content1=query, content2=response_relevant)
-
-            # If the URL provided in the completion is valid.
-            valid_url_score = 0
-            if response_url_scraped is not None:
-                valid_url_score = self._cosine_similarity(content1=response_content, content2=response_url_scraped)
-
-            # Similarity between search term and reference content.
-            search_reference_sim = self._cosine_similarity(content1=query, content2=reference_content)
-            score = (search_response_sim + valid_url_score + search_relevant_sim) / 3
-            if abs(search_response_sim - search_reference_sim) > _SEARCH_TERM_THRESH:
-                logger.info(
-                    f"Response and reference scraped content relevance to the search term exceeds the threshold. "
-                    f"Similarity: response = {search_response_sim:.2f}; reference = {search_reference_sim:.2f}"
-                )
-                score = 0
-            elif valid_url_score < _VALID_URL_SCORE:
-                # If provided URL does not contain content.
-                logger.info(
-                    f"Search term is not relevant to the scraped content, similarity {valid_url_score} < {_VALID_URL_SCORE}"
-                )
-                score = 0
-            elif response_relevant is not None and len(response_relevant) > len(response_content):
-                logger.info(
-                    "Relevant section is longer than the whole website content "
-                    f"{len(response_relevant)} > {len(response_content)}"
-                )
-                score = 0
-            timings.append(time.perf_counter() - timer_start)
-            rewards.append(score)
+            rewards.append(self.score_miner_response(dataset_entry, completion, task=task))
+            timings.append(0)
+        logger.debug(f"REWARDWEBRETRIEVAL: {rewards}")
+        logger.debug(f"COMPLETIONS: {response_event.completions}")

         return BatchRewardOutput(rewards=np.array(rewards), timings=np.array(timings))

     @staticmethod
-    def _parse_response(completion: str) -> tuple[str | None, ...]:
+    def _parse_response(completion: str) -> list[WebsiteResult]:
+        result = []
         try:
             data = json.loads(completion)
-            response_url = data.get("url")
-            response_content = data.get("content")
-            response_relevant = data.get("relevant")
+            # Wrap a bare object into a one-element list.
+            if isinstance(data, dict):
+                data = [data]
+            for website in data:
+                response_url = website.get("url")
+                response_content = website.get("content")
+                response_relevant = website.get("relevant")
+                result.append(WebsiteResult(url=response_url, content=response_content, relevant=response_relevant))
+            return result
         except json.JSONDecodeError:
-            response_url = None
-            response_content = None
-            response_relevant = None
-        return response_url, response_content, response_relevant
+            result = []
+        return result
diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py
index 7b9e15b8a..2b7ac1d7e 100644
--- a/prompting/tasks/base_task.py
+++ b/prompting/tasks/base_task.py
@@ -58,6 +58,7 @@ class BaseTextTask(BaseTask):
     augmentation_system_prompt: ClassVar[str | None] = None
     dataset_entry: DatasetEntry | None = None
    sampling_params: dict[str, float] = shared_settings.SAMPLING_PARAMS
+    timeout: int = shared_settings.NEURON_TIMEOUT

     @model_validator(mode="after")
     def get_model_id_and_seed(self) -> "BaseTextTask":
diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py
index 6430ea2a5..36075e7a3 100644
--- a/prompting/tasks/task_sending.py
+++ b/prompting/tasks/task_sending.py
@@ -12,6 +12,7 @@
 from prompting.rewards.scoring_config import ScoringConfig
 from prompting.tasks.base_task import BaseTextTask
 from prompting.tasks.inference import InferenceTask
+from prompting.tasks.web_retrieval import WebRetrievalTask
 from shared.dendrite import DendriteResponseEvent, SynapseStreamResult
 from shared.epistula import query_miners
 from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent
@@ -65,6 +66,9 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None:
             {"role": "user", "content": task.query},
         ],
     }
+    if isinstance(task, WebRetrievalTask):
+        body["target_results"] = task.target_results
+        body["timeout"] = task.timeout
     logger.info(f"🔍 SENDING TASK {task.task_id} WITH BODY: {body}")
     stream_results = await query_miners(uids, body)
diff --git a/prompting/tasks/web_retrieval.py b/prompting/tasks/web_retrieval.py
index 81399ccb8..8fcd9feee 100644
--- a/prompting/tasks/web_retrieval.py
+++ b/prompting/tasks/web_retrieval.py
@@ -1,4 +1,5 @@
 import json
+import random
 import textwrap
 from typing import ClassVar, Optional
@@ -35,6 +36,8 @@ class WebRetrievalTask(BaseTextTask):
     name: ClassVar[str] = "web_retrieval"
     augmentation_system_prompt: ClassVar[str] = ""
     query_system_prompt: ClassVar[Optional[str]] = QUERY_SYSTEM_PROMPT
+    target_results: int = random.randint(1, 10)
+    timeout: int = random.randint(3, 20)

     def make_query(self, dataset_entry: DDGDatasetEntry) -> str:
         self.query = self.generate_query(
diff --git a/pyproject.toml b/pyproject.toml
index 54aa3091e..e7567345d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -161,10 +161,12 @@
 trafilatura = { version = ">=1.12.1", optional = true }
 datasets = { version = ">=3.1.0", optional = true }
 primp = "^0.10.0"
 nltk = { version = ">=3.8.1", optional = true }
+thefuzz = { version = ">=0.22.1", optional = true }
 wandb = { version = ">=0.19.4", optional = true }

 [tool.poetry.extras]
 validator = [
+    "thefuzz",
     "torch",
     "tiktoken",
     "transformers",
diff --git a/shared/epistula.py b/shared/epistula.py
index 1de2cf9a6..b2621b696 100644
--- a/shared/epistula.py
+++ b/shared/epistula.py
@@ -74,11 +74,12 @@
     return headers


-def create_header_hook(hotkey, axon_hotkey):
+def create_header_hook(hotkey, axon_hotkey, timeout_seconds=20):
     async def add_headers(request: httpx.Request):
         for key, header in generate_header(hotkey, request.read(), axon_hotkey).items():
             if key not in ["messages", "model", "stream"]:
                 request.headers[key] = str(header)
+        request.headers["X-Client-Timeout"] = str(timeout_seconds)
         return request

     return add_headers
@@ -213,7 +214,9 @@ async def make_openai_query(
         max_retries=0,
         timeout=Timeout(timeout_seconds, connect=5, read=timeout_seconds - 5),
         http_client=openai.DefaultAsyncHttpxClient(
-            event_hooks={"request": [create_header_hook(wallet.hotkey, axon_info.hotkey)]}
+            event_hooks={
+                "request": [create_header_hook(wallet.hotkey, axon_info.hotkey, timeout_seconds=timeout_seconds)]
+            }
         ),
     )
     extra_body = {k: v for k, v in body.items() if k not in ["messages", "model"]}
@@ -268,6 +271,7 @@ async def handle_inference(
     body: Dict[str, Any],
     uid: int,
     stream: bool = False,
+    timeout_seconds: int = shared_settings.NEURON_TIMEOUT,
 ) -> SynapseStreamResult:
     exception = None
     chunks = []
@@ -279,9 +283,11 @@ async def handle_inference(
             base_url=f"http://{axon_info.ip}:{axon_info.port}/v1",
             api_key="Apex",
             max_retries=0,
-            timeout=Timeout(shared_settings.NEURON_TIMEOUT, connect=5, read=10),
+            timeout=Timeout(timeout_seconds, connect=5, read=10),
             http_client=openai.DefaultAsyncHttpxClient(
-                event_hooks={"request": [create_header_hook(wallet.hotkey, axon_info.hotkey)]}
+                event_hooks={
+                    "request": [create_header_hook(wallet.hotkey, axon_info.hotkey, timeout_seconds=timeout_seconds)]
+                }
             ),
         )
         payload = json.loads(body)
diff --git a/shared/settings.py b/shared/settings.py
index a04fab2b6..8bb342caf 100644
--- a/shared/settings.py
+++ b/shared/settings.py
@@ -55,7 +55,6 @@ class SharedSettings(BaseSettings):
     # Neuron parameters.
     NEURON_TIMEOUT: int = Field(20, env="NEURON_TIMEOUT")
-    INFERENCE_TIMEOUT: int = Field(60, env="INFERENCE_TIMEOUT")
     NEURON_DISABLE_SET_WEIGHTS: bool = Field(False, env="NEURON_DISABLE_SET_WEIGHTS")
     NEURON_MOVING_AVERAGE_ALPHA: float = Field(0.1, env="NEURON_MOVING_AVERAGE_ALPHA")
     NEURON_DECAY_ALPHA: float = Field(0.001, env="NEURON_DECAY_ALPHA")
@@ -100,6 +99,8 @@
     API_HOST: str = Field("0.0.0.0", env="API_HOST")
     # Validator scoring API address.
     VALIDATOR_API: str = Field("0.0.0.0:8094", env="VALIDATOR_API")
+    # Default SN1 API address.
+    DEFAULT_SN1_API: str = Field("http://sn1.api.macrocosmos.ai:11198/v1", env="DEFAULT_SN1_API")
     # File with keys used to access API.
     API_KEYS_FILE: str = Field("api_keys.json", env="API_KEYS_FILE")
     # Admin key used to generate API keys.
diff --git a/tests/prompting/rewards/test_web_retrieval.py b/tests/prompting/rewards/test_web_retrieval.py
index 226e6cf54..6270cb321 100644
--- a/tests/prompting/rewards/test_web_retrieval.py
+++ b/tests/prompting/rewards/test_web_retrieval.py
@@ -19,22 +19,23 @@
         "This is some content.",
         "Section 1",
     ),
-    ('{"content": "This is some content.", "relevant": "Section 1"}', None, "This is some content.", "Section 1"),
-    ('{"url": "http://example.com", "relevant": "Section 1"}', "http://example.com", None, "Section 1"),
-    (
-        '{"url": "http://example.com", "content": "This is some content."}',
-        "http://example.com",
-        "This is some content.",
-        None,
-    ),
+    # Invalid JSON should return an empty list.
     ("Invalid JSON string", None, None, None),
     ],
 )
 def test_parse_response(completion, expected_url, expected_content, expected_relevant):
-    response_url, response_content, response_relevant = WebRetrievalRewardModel._parse_response(completion)
-    assert response_url == expected_url
-    assert response_content == expected_content
-    assert response_relevant == expected_relevant
+    response = WebRetrievalRewardModel._parse_response(completion)
+
+    if not response:  # empty list => invalid JSON
+        assert expected_url is None
+        assert expected_content is None
+        assert expected_relevant is None
+    else:
+        # For the valid test case, we expect exactly one WebsiteResult.
+        assert len(response) == 1
+        assert response[0].url == expected_url
+        assert response[0].content == expected_content
+        assert response[0].relevant == expected_relevant


 def test_cosine_similarity_identical_embeddings():
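
A minimal sketch (not part of the diff) of the completion format the reworked WebRetrievalRewardModel._parse_response accepts; the URL and strings below are hypothetical:

    import json

    # A miner completion is a JSON-encoded list of website results; a bare
    # object is wrapped into a one-element list, and invalid JSON parses to [].
    completion = json.dumps(
        [
            {
                "url": "http://example.com",
                "content": "Full text scraped from the page ...",
                "relevant": "The chunk most similar to the search query ...",
            }
        ]
    )

    # Each result is scored by score_website_result: the returned content must
    # fuzzy-match the live page (token_set_ratio >= MIN_MATCH_THRESHOLD), the
    # relevant section must be at least MIN_RELEVANT_CHARS and no longer than
    # the content, and the final score is the query/relevant cosine similarity.
    # Per-result scores are then averaged with rank-descending weights,
    # np.arange(len(scores), 0, -1), so earlier results carry more weight.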