18 changes: 16 additions & 2 deletions neurons/miners/epistula_miner/miner.py
@@ -6,6 +6,7 @@

import asyncio
import json
import random
import time

import httpx
@@ -128,14 +129,27 @@ async def word_stream(body, headers):
async def create_chat_completion(self, request: Request):
data = await request.json()
headers = request.headers
if self.llm and request.headers.get("task", None) == "InferenceTask":
return await self.create_inference_completion(request)
if (
request.headers.get("task", None) == "multi_step_reasoning_v2"
and request.headers.get("stage", None) == "discriminative"
):
return await self.create_multi_step_reasoning_completion(request)
if request.headers.get("task", None) == "WebRetrievalTask":
return await self.stream_web_retrieval(data, headers)
if self.llm and request.headers.get("task", None) == "InferenceTask":
return await self.create_inference_completion(request)
req = self.client.build_request("POST", "chat/completions", json=await self.format_openai_query(request))
r = await self.client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose), headers=r.headers)

async def create_multi_step_reasoning_completion(self, request: Request):
    """Randomly guess a float as the discriminator answer."""

    async def guess_stream():
        # Emit a single OpenAI-style SSE chunk carrying the guess, then the terminator.
        data = {"choices": [{"delta": {"content": str(random.random())}, "index": 0, "finish_reason": None}]}
        yield f"data: {json.dumps(data)}\n\n"
        yield "data: [DONE]\n\n"

    # Wrap the generator so the awaiting caller receives a response object,
    # matching the other completion handlers in this file.
    return StreamingResponse(guess_stream(), media_type="text/event-stream")

async def create_inference_completion(self, request: Request):
async def word_stream():
data = await request.json()
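For context, a minimal standalone sketch (not part of the PR, assuming the chunk format yielded by the handler above) of how a consumer could parse the discriminator's SSE stream back into a float guess; parse_discriminator_stream is a hypothetical helper:

import json

def parse_discriminator_stream(lines: list[str]) -> float | None:
    """Extract the miner's float guess from OpenAI-style SSE lines (hypothetical helper)."""
    for line in lines:
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):].strip()
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        content = chunk["choices"][0]["delta"].get("content")
        if content is not None:
            try:
                return float(content)
            except (ValueError, TypeError):
                return None  # malformed guess; the reward model scores it as 0.0
    return None

# The two chunks yielded by the handler above:
stream = [
    'data: {"choices": [{"delta": {"content": "0.42"}, "index": 0, "finish_reason": null}]}\n\n',
    "data: [DONE]\n\n",
]
assert parse_discriminator_stream(stream) == 0.42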
9 changes: 8 additions & 1 deletion neurons/validator.py
@@ -73,7 +73,14 @@ async def spawn_loops(task_queue: list, scoring_queue: list, reward_events: list
)
model_scheduler_task = asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler")
task_scorer_task = asyncio.create_task(
task_scorer.start(model_scheduler, scoring_queue, reward_events, mp_lock=mp_lock, simultaneous_loops=1),
task_scorer.start(
model_scheduler,
scoring_queue,
reward_events,
mp_lock=mp_lock,
task_queue=task_queue,
simultaneous_loops=1,
),
name="TaskScorer",
)
all_tasks.extend([profile, task_loop_task, model_scheduler_task, task_scorer_task])
80 changes: 80 additions & 0 deletions prompting/rewards/MSRv2_reward.py
@@ -0,0 +1,80 @@
from typing import TYPE_CHECKING

import numpy as np
from loguru import logger
from pydantic import ConfigDict

from prompting.rewards.reward import BaseRewardModel, BatchRewardOutput
from shared import settings
from shared.dendrite import DendriteResponseEvent
from shared.uids import get_uids

if TYPE_CHECKING:
from prompting.tasks.MSRv2_task import MSRv2Task

shared_settings = settings.shared_settings


uids_to_sample = get_uids(sampling_mode="all")


class MSRv2RewardModel(BaseRewardModel):
model_config = ConfigDict(arbitrary_types_allowed=True)

async def reward(
self,
reference: str,
response_event: DendriteResponseEvent,
task: "MSRv2Task",
task_queue: list,
**kwargs,
) -> BatchRewardOutput:
completions: list[str] = response_event.completions

if task.stage == "generative":
if len(completions) > 1:
logger.warning(f"Received {len(completions)} completions in generative stage, only using the first one")

if completions:
task.generative_miner_answer = completions[0] if completions[0] else "Miner did not return a response"
task.generator_uid = response_event.uids[0]

# Add task back to the task queue but now in the discriminative stage
task_queue.append(task)

logger.debug(f"Generate stage with answer: {task.generative_miner_answer} scored and re-appended")
output = BatchRewardOutput(rewards=np.array([]), timings=np.array([]), threshold=None, uids=np.array([]))

return output

elif task.stage == "discriminative":
discriminator_rewards = []
for comp in completions:
try:
# discriminator reward is (1-Squared Error)/N_Discriminators
comp_value = float(comp)
discriminator_rewards.append((1 - (task.ground_truth - comp_value) ** 2) / len(completions))
except (ValueError, TypeError):
discriminator_rewards.append(0.0)  # Assign zero reward for invalid (non-numeric) responses
generator_reward = 1 - sum(discriminator_rewards)

# If the answer was 'real' (hence no generator uid), we need to average the reward over all miners
if task.generator_uid is None:
assert task.ground_truth == 1, "If the answer was 'real', there should NOT be a generator uid"
generator_uids = get_uids(sampling_mode="all", exclude=response_event.uids)
generator_reward /= len(generator_uids)
else:
generator_uids = [task.generator_uid]

logger.debug(
    f"Discriminative stage for task: {task.task_id}, Generator reward: {generator_reward}, "
    f"Discriminator rewards: {discriminator_rewards}, Ground truth: {task.ground_truth}"
)
return BatchRewardOutput(
rewards=np.array([generator_reward] * len(generator_uids) + discriminator_rewards),
timings=np.array([0] * (len(generator_uids) + len(discriminator_rewards))),
threshold=None,
uids=np.array(generator_uids + response_event.uids),
)
else:
raise ValueError(f"Invalid task stage: {task.stage}")
20 changes: 17 additions & 3 deletions prompting/rewards/reward.py
@@ -50,6 +50,7 @@ class BatchRewardOutput(BaseModel):
threshold: float | None = None
extra_info: dict = {}
model_config = ConfigDict(arbitrary_types_allowed=True)
uids: list[int] | None = None

@model_validator(mode="after")
def validate_rewards_and_timings(cls, v):
@@ -71,7 +72,12 @@ class BaseRewardModel(ABC, BaseModel):

@abstractmethod
async def reward(
self, reference: str, response_event: DendriteResponseEvent, model_manager: ModelManager = None, **kwargs
self,
reference: str,
response_event: DendriteResponseEvent,
model_manager: ModelManager = None,
task_queue: list[BaseTextTask] | None = None,
**kwargs,
) -> BatchRewardOutput:
raise NotImplementedError("You must implement the reward method")

@@ -83,14 +89,18 @@ async def apply(
reward_type: Literal["reward", "penalty"] = "reward",
task: BaseTextTask | None = None,
model_manager: ModelManager | None = None,
task_queue: list[BaseTextTask] | None = None,
**kwargs,
) -> WeightedRewardEvent:
if task_queue is None:
raise ValueError("Task queue must be provided to BaseRewardModel.apply()")
t0 = time.time()
comparator = reference if reward_type == "reward" else challenge
batch_rewards_output: BatchRewardOutput = await self.reward(
comparator, response_event, task=task, model_manager=model_manager, **kwargs
comparator, response_event, task=task, model_manager=model_manager, task_queue=task_queue, **kwargs
)
batch_rewards_time = time.time() - t0
uids = batch_rewards_output.uids if batch_rewards_output.uids is not None else response_event.uids

return WeightedRewardEvent(
weight=self.weight,
@@ -103,7 +113,7 @@ async def apply(
threshold=batch_rewards_output.threshold,
timings=batch_rewards_output.timings,
extra_info=kwargs,
uids=response_event.uids,
uids=uids,
)


@@ -150,7 +160,10 @@ async def apply(
model_id: str | None = None,
task: BaseTextTask | None = None,
model_manager: ModelManager | None = None,
task_queue: list[BaseTextTask] | None = None,
) -> list[WeightedRewardEvent]:
if task_queue is None:
raise ValueError("Task queue must be provided to BaseRewardConfig.apply()")
reward_events = []
for weighted_reward in cls.reward_definitions:
reward_events.append(
@@ -162,6 +175,7 @@
model_id=model_id,
task=task,
model_manager=model_manager,
task_queue=task_queue,
),
)
return reward_events
7 changes: 6 additions & 1 deletion prompting/rewards/scoring.py
@@ -29,7 +29,7 @@ class TaskScorer(AsyncLoopRunner):
interval: int = 1
scoring_queue: list | None = None
reward_events: list | None = None

task_queue: list | None = None
model_config = ConfigDict(arbitrary_types_allowed=True)

async def start(
Expand All @@ -39,12 +39,14 @@ async def start(
reward_events,
mp_lock: AcquirerProxy,
name: str | None = None,
task_queue: list | None = None,
**kwargs,
):
self.scoring_queue = scoring_queue
self.reward_events = reward_events
self.model_scheduler = model_scheduler
self.mp_lock = mp_lock
self.task_queue = task_queue
return await super().start(name=name, **kwargs)

def add_to_queue(
Expand Down Expand Up @@ -93,13 +95,16 @@ async def run_step(self) -> RewardLoggingEvent:
# and there we then calculate the reward
reward_pipeline = TaskRegistry.get_task_reward(scoring_config.task)
with Timer(label=f"Scoring {scoring_config.task.__class__.__name__}"):
if self.task_queue is None:
raise ValueError("Task queue must be provided to TaskScorer.run_step()")
reward_events = await reward_pipeline.apply(
response_event=scoring_config.response,
challenge=scoring_config.task.query,
reference=scoring_config.task.reference,
model_id=scoring_config.task.llm_model,
task=scoring_config.task,
model_manager=self.model_scheduler.llm_model_manager,
task_queue=self.task_queue,
)
self.reward_events.append(reward_events)

80 changes: 80 additions & 0 deletions prompting/tasks/MSRv2_task.py
@@ -0,0 +1,80 @@
import random
from typing import ClassVar, Literal

from loguru import logger

from prompting.datasets.random_website import DDGDatasetEntry
from prompting.llms.model_manager import ModelManager
from prompting.rewards.MSRv2_reward import MSRv2RewardModel
from prompting.rewards.reward import BaseRewardConfig, BaseRewardModel
from prompting.tasks.multi_step_reasoning import MultiStepReasoningTask
from shared.base import Context


class MSRv2RewardConfig(BaseRewardConfig):
reward_definitions: ClassVar[list[BaseRewardModel]] = [
MSRv2RewardModel(weight=1),
]


class MSRv2Task(MultiStepReasoningTask):
"""QuestionAnsweringTasks must be initialised with an LLM pipeline to generate query and reference plus
context from a dataset to base the query on"""

name: ClassVar[str] = "multi_step_reasoning_v2"
augmentation_system_prompt: ClassVar[str] = ""
generative_miner_answer: str | None = None
reference: str | None = None
REAL_REFERENCE_PROBABILITY: float = 0.1
generator_uid: int | None = None

@property
def stage(self) -> Literal["generative", "discriminative"]:
if self.generative_miner_answer or self.reference:
return "discriminative"
return "generative"

@property
def ground_truth(self) -> int | None:
"""Returns 1 if the reference was generated by the validator, 0 if it was generated by the miner"""
if self.reference:
return 1
elif self.generative_miner_answer:
return 0
logger.error("No ground truth for MSRv2Task available yet")
return None

def make_query(self, dataset_entry: DDGDatasetEntry):
if self.stage == "generative":
# Question to send to miner
self.query = super().make_query(dataset_entry)
# Wrapped Query
self.messages = [{"role": "user", "content": self.query}]
return self.query
else:
return self.reference or self.generative_miner_answer

async def make_reference(self, dataset_entry: Context, model_manager: ModelManager):
if self.stage == "generative":
# Generates a real reference with probability REAL_REFERENCE_PROBABILITY, otherwise waits for miner to generate an answer
if random.random() < self.REAL_REFERENCE_PROBABILITY:
return await super().make_reference(dataset_entry, model_manager=model_manager)
else:
return self.reference
else:
# return 1 if it's validator generated, 0 if it's miner generated
return 1 if self.reference else 0

@property
def request_body(self) -> dict:
body = super().request_body
# By sending this over, we can allow miners to scale their prediction based on the probability of the reference being real
# so that validators can adjust the probability based on load in later iterations
body["real_reference_probability"] = self.REAL_REFERENCE_PROBABILITY
body["stage"] = self.stage
# if we're in the discriminative stage, we need to send the messages and the miner's answer, otherwise we just send the query
if self.stage == "discriminative":
body["messages"] = self.messages + [
{"role": "assistant", "content": self.reference or self.generative_miner_answer}
]
return body
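A standalone sketch of the stage/ground-truth mechanics above, with the task reduced to the two fields the properties read (StageDemo is an illustrative stand-in, not the PR's class):

from typing import Literal

class StageDemo:
    """Minimal stand-in for MSRv2Task's stage bookkeeping."""

    def __init__(self, reference: str | None = None, miner_answer: str | None = None):
        self.reference = reference
        self.generative_miner_answer = miner_answer

    @property
    def stage(self) -> Literal["generative", "discriminative"]:
        # The task flips to discriminative once either answer source is attached.
        return "discriminative" if (self.reference or self.generative_miner_answer) else "generative"

    @property
    def ground_truth(self) -> int | None:
        if self.reference:
            return 1  # validator-generated ("real") reference
        if self.generative_miner_answer:
            return 0  # miner-generated answer
        return None

assert StageDemo().stage == "generative"
assert StageDemo(miner_answer="42").ground_truth == 0
assert StageDemo(reference="real").ground_truth == 1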
19 changes: 18 additions & 1 deletion prompting/tasks/base_task.py
@@ -33,6 +33,7 @@ class BaseTask(BaseModel, ABC):
reference: Any = None
task_id: str = Field(default_factory=lambda: str(uuid4()), allow_mutation=False)
organic: bool = False
timeout: int = settings.shared_settings.NEURON_TIMEOUT

model_config = ConfigDict(arbitrary_types_allowed=True)

@@ -44,6 +45,14 @@
async def make_reference(self, **kwargs):
raise NotImplementedError("Method make_reference must be implemented")

@property
def request_body(self) -> dict:
body = {
"task": self.__class__.__name__,
"timeout": self.timeout,
}
return body


class BaseTextTask(BaseTask):
query: str | None = None
@@ -58,7 +67,6 @@ class BaseTextTask(BaseTask):
augmentation_system_prompt: ClassVar[str | None] = None
dataset_entry: DatasetEntry | None = None
sampling_params: dict[str, float] = settings.shared_settings.SAMPLING_PARAMS
timeout: int = settings.shared_settings.NEURON_TIMEOUT
max_tokens: int = settings.shared_settings.NEURON_MAX_TOKENS
organic: bool = False

@@ -114,3 +122,12 @@ def augment_query(
)
self.query = challenge
return challenge

@property
def request_body(self) -> dict:
body = super().request_body
body["seed"] = self.seed
body["sampling_parameters"] = self.sampling_params
body["model"] = self.llm_model_id
body["messages"] = self.task_messages
return body
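A standalone sketch of how the request_body layering composes (BaseDemo and TextDemo are illustrative stand-ins): each subclass starts from its parent's dict via super(), which is how MSRv2Task's stage fields end up on top of the base task/timeout and text-task sampling fields.

class BaseDemo:
    timeout = 30

    @property
    def request_body(self) -> dict:
        return {"task": type(self).__name__, "timeout": self.timeout}

class TextDemo(BaseDemo):
    seed = 1234

    @property
    def request_body(self) -> dict:
        body = super().request_body  # start from the parent's fields
        body["seed"] = self.seed
        return body

assert TextDemo().request_body == {"task": "TextDemo", "timeout": 30, "seed": 1234}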
3 changes: 1 addition & 2 deletions prompting/tasks/multi_step_reasoning.py
@@ -85,9 +85,8 @@ class MultiStepReasoningTask(WebQuestionAnsweringTask):
async def make_query(self, dataset_entry: DDGDatasetEntry):
query_prompt = QUERY_PROMPT_TEMPLATE.format(context=dataset_entry.website_content)
question = await self.generate_query(messages=[query_prompt])
msgs = [p + ". " if i < len(question.split(". ")) - 1 else p for i, p in enumerate(question.split(". ")) if p]
self.messages = [{"role": "system", "content": random.choice(SAMPLE_SYSTEM_PROMPTS)}] + [
{"role": random.choice(["user", "assistant"]), "content": msg} for msg in msgs
{"role": "user", "content": question}
]
return self.query
