diff --git a/neurons/miners/epistula_miner/miner.py b/neurons/miners/epistula_miner/miner.py
index 30f908525..5e70038d6 100644
--- a/neurons/miners/epistula_miner/miner.py
+++ b/neurons/miners/epistula_miner/miner.py
@@ -6,6 +6,7 @@
 import asyncio
 import json
+import random
 import time

 import httpx
@@ -128,14 +129,27 @@ async def word_stream(body, headers):
     async def create_chat_completion(self, request: Request):
         data = await request.json()
         headers = request.headers
-        if self.llm and request.headers.get("task", None) == "InferenceTask":
-            return await self.create_inference_completion(request)
+        if (
+            request.headers.get("task", None) == "multi_step_reasoning_v2"
+            and request.headers.get("stage", None) == "discriminative"
+        ):
+            return await self.create_multi_step_reasoning_completion(request)
         if request.headers.get("task", None) == "WebRetrievalTask":
             return await self.stream_web_retrieval(data, headers)
+        if self.llm and request.headers.get("task", None) == "InferenceTask":
+            return await self.create_inference_completion(request)
         req = self.client.build_request("POST", "chat/completions", json=await self.format_openai_query(request))
         r = await self.client.send(req, stream=True)
         return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose), headers=r.headers)

+    async def create_multi_step_reasoning_completion(self, request: Request):
+        """Randomly guess a float as the discriminator answer and stream it as a single SSE chunk."""
+        async def guess_stream():
+            data = {"choices": [{"delta": {"content": str(random.random())}, "index": 0, "finish_reason": None}]}
+            yield f"data: {json.dumps(data)}\n\n"
+            yield "data: [DONE]\n\n"
+        return StreamingResponse(guess_stream(), media_type="text/event-stream")
+
     async def create_inference_completion(self, request: Request):
         async def word_stream():
             data = await request.json()
diff --git a/neurons/validator.py b/neurons/validator.py
index eff556605..87c437ae7 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -73,7 +73,14 @@ async def spawn_loops(task_queue: list, scoring_queue: list, reward_events: list
     )
     model_scheduler_task = asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler")
     task_scorer_task = asyncio.create_task(
-        task_scorer.start(model_scheduler, scoring_queue, reward_events, mp_lock=mp_lock, simultaneous_loops=1),
+        task_scorer.start(
+            model_scheduler,
+            scoring_queue,
+            reward_events,
+            mp_lock=mp_lock,
+            task_queue=task_queue,
+            simultaneous_loops=1,
+        ),
         name="TaskScorer",
     )
     all_tasks.extend([profile, task_loop_task, model_scheduler_task, task_scorer_task])
diff --git a/prompting/rewards/MSRv2_reward.py b/prompting/rewards/MSRv2_reward.py
new file mode 100644
index 000000000..460ec1d4b
--- /dev/null
+++ b/prompting/rewards/MSRv2_reward.py
@@ -0,0 +1,80 @@
+from typing import TYPE_CHECKING
+
+import numpy as np
+from loguru import logger
+from pydantic import ConfigDict
+
+from prompting.rewards.reward import BaseRewardModel, BatchRewardOutput
+from shared import settings
+from shared.dendrite import DendriteResponseEvent
+from shared.uids import get_uids
+
+if TYPE_CHECKING:
+    from prompting.tasks.MSRv2_task import MSRv2Task
+
+shared_settings = settings.shared_settings
+
+
+uids_to_sample = get_uids(sampling_mode="all")
+
+
+class MSRv2RewardModel(BaseRewardModel):
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+
+    async def reward(
+        self,
+        reference: str,
+        response_event: DendriteResponseEvent,
+        task: "MSRv2Task",
+        task_queue: list,
+        **kwargs,
+    ) -> BatchRewardOutput:
+        completions: list[str] = response_event.completions
+
+        if task.stage == "generative":
+            if len(completions) > 1:
logger.warning(f"Received {len(completions)} completions in generative stage, only using the first one") + + if completions: + task.generative_miner_answer = completions[0] if completions[0] else "Miner did not return a response" + task.generator_uid = response_event.uids[0] + + # Add task back to the task queue but now in the discriminative stage + task_queue.append(task) + + logger.debug(f"Generate stage with answer: {task.generative_miner_answer} scored and re-appended") + output = BatchRewardOutput(rewards=np.array([]), timings=np.array([]), threshold=None, uids=np.array([])) + + return output + + elif task.stage == "discriminative": + discriminator_rewards = [] + for comp in completions: + try: + # discriminator reward is (1-Squared Error)/N_Discriminators + comp_value = float(comp) + discriminator_rewards.append((1 - (task.ground_truth - comp_value) ** 2) / len(completions)) + except (ValueError, TypeError): + # logger.error(f"Error converting completion to float: {e}") + discriminator_rewards.append(0.0) # Assign zero reward for invalid responses + generator_reward = 1 - sum(discriminator_rewards) + + # If the answer was 'real' (hence no generator uid), we need to average the reward over all miners + if task.generator_uid is None: + assert task.ground_truth == 1, "If the answer was 'real', there should NOT be a generator uid" + generator_uids = get_uids(sampling_mode="all", exclude=response_event.uids) + generator_reward /= len(generator_uids) + else: + generator_uids = [task.generator_uid] + + logger.debug( + f"Discriminative stage for task: {task.task_id} Generator rewards: {generator_reward} Discriminator rewards: {discriminator_rewards}, Ground truth: {task.ground_truth}" + ) + return BatchRewardOutput( + rewards=np.array([generator_reward] * len(generator_uids) + discriminator_rewards), + timings=np.array([0] * (len(generator_uids) + len(discriminator_rewards))), + threshold=None, + uids=np.array(generator_uids + response_event.uids), + ) + else: + raise ValueError(f"Invalid task stage: {task.stage}") diff --git a/prompting/rewards/reward.py b/prompting/rewards/reward.py index 9cffabe57..f7d3044b0 100644 --- a/prompting/rewards/reward.py +++ b/prompting/rewards/reward.py @@ -50,6 +50,7 @@ class BatchRewardOutput(BaseModel): threshold: float | None = None extra_info: dict = {} model_config = ConfigDict(arbitrary_types_allowed=True) + uids: list[int] | None = None @model_validator(mode="after") def validate_rewards_and_timings(cls, v): @@ -71,7 +72,12 @@ class BaseRewardModel(ABC, BaseModel): @abstractmethod async def reward( - self, reference: str, response_event: DendriteResponseEvent, model_manager: ModelManager = None, **kwargs + self, + reference: str, + response_event: DendriteResponseEvent, + model_manager: ModelManager = None, + task_queue: list[BaseTextTask] | None = None, + **kwargs, ) -> BatchRewardOutput: raise NotImplementedError("You must implement the reward method") @@ -83,14 +89,18 @@ async def apply( reward_type: Literal["reward", "penalty"] = "reward", task: BaseTextTask | None = None, model_manager: ModelManager | None = None, + task_queue: list[BaseTextTask] | None = None, **kwargs, ) -> WeightedRewardEvent: + if task_queue is None: + raise ValueError("Task queue must be provided to BaseRewardModel.apply()") t0 = time.time() comparator = reference if reward_type == "reward" else challenge batch_rewards_output: BatchRewardOutput = await self.reward( - comparator, response_event, task=task, model_manager=model_manager, **kwargs + comparator, response_event, 
+            comparator, response_event, task=task, model_manager=model_manager, task_queue=task_queue, **kwargs
         )
         batch_rewards_time = time.time() - t0
+        uids = batch_rewards_output.uids if batch_rewards_output.uids is not None else response_event.uids

         return WeightedRewardEvent(
             weight=self.weight,
@@ -103,7 +113,7 @@
             threshold=batch_rewards_output.threshold,
             timings=batch_rewards_output.timings,
             extra_info=kwargs,
-            uids=response_event.uids,
+            uids=uids,
         )


@@ -150,7 +160,10 @@ async def apply(
         model_id: str | None = None,
         task: BaseTextTask | None = None,
         model_manager: ModelManager | None = None,
+        task_queue: list[BaseTextTask] | None = None,
     ) -> list[WeightedRewardEvent]:
+        if task_queue is None:
+            raise ValueError("Task queue must be provided to BaseRewardConfig.apply()")
         reward_events = []
         for weighted_reward in cls.reward_definitions:
             reward_events.append(
@@ -162,6 +175,7 @@
                     model_id=model_id,
                     task=task,
                     model_manager=model_manager,
+                    task_queue=task_queue,
                 ),
             )
         return reward_events
diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py
index 3848cb5ab..adb35338f 100644
--- a/prompting/rewards/scoring.py
+++ b/prompting/rewards/scoring.py
@@ -29,7 +29,7 @@ class TaskScorer(AsyncLoopRunner):
     interval: int = 1
     scoring_queue: list | None = None
     reward_events: list | None = None
-
+    task_queue: list | None = None
     model_config = ConfigDict(arbitrary_types_allowed=True)

     async def start(
@@ -39,12 +39,14 @@
         reward_events,
         mp_lock: AcquirerProxy,
         name: str | None = None,
+        task_queue: list | None = None,
         **kwargs,
     ):
         self.scoring_queue = scoring_queue
         self.reward_events = reward_events
         self.model_scheduler = model_scheduler
         self.mp_lock = mp_lock
+        self.task_queue = task_queue
         return await super().start(name=name, **kwargs)

     def add_to_queue(
@@ -93,6 +95,8 @@
         # and there we then calculate the reward
         reward_pipeline = TaskRegistry.get_task_reward(scoring_config.task)
         with Timer(label=f"Scoring {scoring_config.task.__class__.__name__}"):
+            if self.task_queue is None:
+                raise ValueError("Task queue must be provided to TaskScorer.run_step()")
             reward_events = await reward_pipeline.apply(
                 response_event=scoring_config.response,
                 challenge=scoring_config.task.query,
@@ -100,6 +104,7 @@
                 model_id=scoring_config.task.llm_model,
                 task=scoring_config.task,
                 model_manager=self.model_scheduler.llm_model_manager,
+                task_queue=self.task_queue,
             )
         self.reward_events.append(reward_events)
diff --git a/prompting/tasks/MSRv2_task.py b/prompting/tasks/MSRv2_task.py
new file mode 100644
index 000000000..56aeff744
--- /dev/null
+++ b/prompting/tasks/MSRv2_task.py
@@ -0,0 +1,80 @@
+import random
+from typing import ClassVar, Literal
+
+from loguru import logger
+
+from prompting.datasets.random_website import DDGDatasetEntry
+from prompting.llms.model_manager import ModelManager
+from prompting.rewards.MSRv2_reward import MSRv2RewardModel
+from prompting.rewards.reward import BaseRewardConfig, BaseRewardModel
+from prompting.tasks.multi_step_reasoning import MultiStepReasoningTask
+from shared.base import Context
+
+
+class MSRv2RewardConfig(BaseRewardConfig):
+    reward_definitions: ClassVar[list[BaseRewardModel]] = [
+        MSRv2RewardModel(weight=1),
+    ]
+
+
+class MSRv2Task(MultiStepReasoningTask):
+    """Two-stage task: a generator (a miner, or the validator with probability REAL_REFERENCE_PROBABILITY)
+    produces an answer, and discriminator miners then estimate whether that answer was validator-generated."""
+
"multi_step_reasoning_v2" + augmentation_system_prompt: ClassVar[str] = "" + generative_miner_answer: str | None = None + reference: str | None = None + REAL_REFERENCE_PROBABILITY: float = 0.1 + generator_uid: int | None = None + + @property + def stage(self) -> Literal["generative", "discriminative"]: + if self.generative_miner_answer or self.reference: + return "discriminative" + return "generative" + + @property + def ground_truth(self) -> int | None: + """Returns 1 if the reference was generated by the validator, 0 if it was generated by the miner""" + if self.reference: + return 1 + elif self.generative_miner_answer: + return 0 + logger.error("No ground truth for MSRv2Task available yet") + return None + + def make_query(self, dataset_entry: DDGDatasetEntry): + if self.stage == "generative": + # Question to send to miner + self.query = super().make_query(dataset_entry) + # Wrapped Query + self.messages = [{"role": "user", "content": self.query}] + return self.query + else: + return self.reference or self.generative_miner_answer + + async def make_reference(self, dataset_entry: Context, model_manager: ModelManager): + if self.stage == "generative": + # Generates a real reference with probability REAL_REFERENCE_PROBABILITY, otherwise waits for miner to generate an answer + if random.random() < self.REAL_REFERENCE_PROBABILITY: + return super().make_reference(dataset_entry, model_manager=model_manager) + else: + return self.reference + else: + # return 1 if it's validator generated, 0 if it's miner generated + return 1 if self.reference else 0 + + @property + def request_body(self) -> dict: + body = super().request_body + # By sending this over, we can allow miners to scale their prediction based on the probability of the reference being real + # so that validators can adjust the probability based on load in later iterations + body["real_reference_probability"] = self.REAL_REFERENCE_PROBABILITY + body["stage"] = self.stage + # if we're in the discriminative stage, we need to send the messages and the miner's answer, otherwise we just send the query + if self.stage == "discriminative": + body["messages"] = self.messages + [ + {"role": "assistant", "content": self.reference or self.generative_miner_answer} + ] + return body diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index a4746862b..ae6469979 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -33,6 +33,7 @@ class BaseTask(BaseModel, ABC): reference: Any = None task_id: str = Field(default_factory=lambda: str(uuid4()), allow_mutation=False) organic: bool = False + timeout: int = settings.shared_settings.NEURON_TIMEOUT model_config = ConfigDict(arbitrary_types_allowed=True) @@ -44,6 +45,14 @@ async def make_query(self, **kwargs): async def make_reference(self, **kwargs): raise NotImplementedError("Method make_reference must be implemented") + @property + def request_body(self) -> dict: + body = { + "task": self.__class__.__name__, + "timeout": self.timeout, + } + return body + class BaseTextTask(BaseTask): query: str | None = None @@ -58,7 +67,6 @@ class BaseTextTask(BaseTask): augmentation_system_prompt: ClassVar[str | None] = None dataset_entry: DatasetEntry | None = None sampling_params: dict[str, float] = settings.shared_settings.SAMPLING_PARAMS - timeout: int = settings.shared_settings.NEURON_TIMEOUT max_tokens: int = settings.shared_settings.NEURON_MAX_TOKENS organic: bool = False @@ -114,3 +122,12 @@ def augment_query( ) self.query = challenge return challenge + + @property + 
+    def request_body(self) -> dict:
+        body = super().request_body
+        body["seed"] = self.seed
+        body["sampling_parameters"] = self.sampling_params
+        body["model"] = self.llm_model_id
+        body["messages"] = self.task_messages
+        return body
diff --git a/prompting/tasks/multi_step_reasoning.py b/prompting/tasks/multi_step_reasoning.py
index 4b6a87948..e3ebcc99d 100644
--- a/prompting/tasks/multi_step_reasoning.py
+++ b/prompting/tasks/multi_step_reasoning.py
@@ -85,9 +85,8 @@ class MultiStepReasoningTask(WebQuestionAnsweringTask):
     async def make_query(self, dataset_entry: DDGDatasetEntry):
         query_prompt = QUERY_PROMPT_TEMPLATE.format(context=dataset_entry.website_content)
         question = await self.generate_query(messages=[query_prompt])
-        msgs = [p + ". " if i < len(question.split(". ")) - 1 else p for i, p in enumerate(question.split(". ")) if p]
         self.messages = [{"role": "system", "content": random.choice(SAMPLE_SYSTEM_PROMPTS)}] + [
-            {"role": random.choice(["user", "assistant"]), "content": msg} for msg in msgs
+            {"role": "user", "content": question}
         ]
         return self.query
diff --git a/prompting/tasks/task_registry.py b/prompting/tasks/task_registry.py
index 75ddbcc71..879f07297 100644
--- a/prompting/tasks/task_registry.py
+++ b/prompting/tasks/task_registry.py
@@ -11,7 +11,7 @@
 from prompting.rewards.reward import BaseRewardConfig
 from prompting.tasks.base_task import BaseTextTask
 from prompting.tasks.inference import InferenceRewardConfig, InferenceTask
-from prompting.tasks.multi_step_reasoning import MultiStepReasoningRewardConfig, MultiStepReasoningTask
+from prompting.tasks.MSRv2_task import MSRv2RewardConfig, MSRv2Task
 from prompting.tasks.programming_task import ProgrammingRewardConfig, ProgrammingTask
 from prompting.tasks.qa import QARewardConfig, WebQuestionAnsweringTask
 from prompting.tasks.web_retrieval import WebRetrievalRewardConfig, WebRetrievalTask
@@ -32,6 +32,7 @@ def __hash__(self):

 class TaskRegistry(BaseModel):
     task_configs: ClassVar[list[TaskConfig]] = [
+        TaskConfig(task=MSRv2Task, probability=0.05, datasets=[DDGDataset], reward_model=MSRv2RewardConfig),
         TaskConfig(
             task=WebQuestionAnsweringTask,
             probability=0.05,
@@ -40,27 +41,21 @@ class TaskRegistry(BaseModel):
         ),
         TaskConfig(
             task=InferenceTask,
-            probability=0.3,
+            probability=0.40,
             datasets=[SN13Dataset],
             reward_model=InferenceRewardConfig,
         ),
         TaskConfig(
             task=ProgrammingTask,
-            probability=0.10,
+            probability=0.20,
             datasets=[HuggingFaceGithubDataset],
             reward_model=ProgrammingRewardConfig,
         ),
         TaskConfig(
             task=WebRetrievalTask,
-            probability=0.25,
-            datasets=[DDGDataset],
-            reward_model=WebRetrievalRewardConfig,
-        ),
-        TaskConfig(
-            task=MultiStepReasoningTask,
             probability=0.3,
             datasets=[DDGDataset],
-            reward_model=MultiStepReasoningRewardConfig,
+            reward_model=WebRetrievalRewardConfig,
         ),
     ]
diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py
index 1afc7f00d..4f2627e99 100644
--- a/prompting/tasks/task_sending.py
+++ b/prompting/tasks/task_sending.py
@@ -7,7 +7,6 @@
 from prompting.rewards.scoring_config import ScoringConfig
 from prompting.tasks.base_task import BaseTextTask
 from prompting.tasks.inference import InferenceTask
-from prompting.tasks.web_retrieval import WebRetrievalTask
 from shared import settings
 from shared.dendrite import DendriteResponseEvent
 from shared.epistula import query_miners
@@ -45,17 +44,7 @@ async def collect_responses(task: BaseTextTask, miners_dict: dict) -> DendriteResponseEvent:
         logger.warning("No available miners. This should already have been caught earlier.")
         return

-    body = {
-        "seed": task.seed,
-        "sampling_parameters": task.sampling_params,
-        "task": task.__class__.__name__,
-        "model": task.llm_model_id,
-        "messages": task.task_messages,
-    }
-    if isinstance(task, WebRetrievalTask):
-        body["target_results"] = task.target_results
-    body["timeout"] = task.timeout
-    stream_results = await query_miners(uids, body, timeout_seconds=task.timeout)
+    stream_results = await query_miners(uids, task.request_body, timeout_seconds=task.timeout)

     response_event = DendriteResponseEvent(
         stream_results=stream_results,
diff --git a/prompting/tasks/web_retrieval.py b/prompting/tasks/web_retrieval.py
index fdcd9f75f..aed37b945 100644
--- a/prompting/tasks/web_retrieval.py
+++ b/prompting/tasks/web_retrieval.py
@@ -53,3 +53,9 @@ async def make_reference(self, dataset_entry: DDGDatasetEntry, model_manager: ModelManager):
         ref_dict = dataset_entry.model_dump_json()
         self.reference = json.dumps(ref_dict)
         return self.reference
+
+    @property
+    def request_body(self) -> dict:
+        body = super().request_body
+        body["target_results"] = self.target_results
+        return body
diff --git a/shared/logging/logging.py b/shared/logging/logging.py
index 3f361d0a6..1ba74edcd 100644
--- a/shared/logging/logging.py
+++ b/shared/logging/logging.py
@@ -177,7 +177,7 @@ class RewardLoggingEvent(BaseEvent):
     response_event: DendriteResponseEvent
     reward_events: list[WeightedRewardEvent]
     task_id: str
-    reference: str
+    reference: str | None
     challenge: str | list[dict]
     task: str
     task_dict: dict
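
Reviewer note (not part of the patch): a minimal sketch of how the discriminative-stage payout in MSRv2RewardModel.reward splits value between discriminators and the generator. The three guesses and the ground-truth value below are hypothetical; only the arithmetic mirrors the diff.

# Illustrative only: mirrors the discriminative-stage arithmetic in MSRv2_reward.py.
ground_truth = 0  # 0 = answer came from a miner, 1 = answer came from the validator
completions = ["0.2", "0.9", "not-a-number"]  # hypothetical discriminator guesses

discriminator_rewards = []
for comp in completions:
    try:
        guess = float(comp)
        # Each discriminator earns (1 - squared error) / N_discriminators.
        discriminator_rewards.append((1 - (ground_truth - guess) ** 2) / len(completions))
    except (ValueError, TypeError):
        discriminator_rewards.append(0.0)  # invalid guesses earn nothing

# The generator receives whatever the discriminators fail to claim.
generator_reward = 1 - sum(discriminator_rewards)

print(discriminator_rewards)  # ≈ [0.32, 0.063, 0.0]
print(generator_reward)       # ≈ 0.617

When the reference was validator-written (ground_truth == 1) there is no generator miner, so the diff instead divides generator_reward by the number of non-discriminator UIDs and credits each of them.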