diff --git a/README.md b/README.md index b2d83a769..164cee111 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,12 @@ If you are running a miner, you will also need to uninstall uvloop. pip uninstall uvloop -y ``` +If you are running a validator, logging in to Hugging Face is required: +```shell +huggingface-cli login +``` +You also need to accept the License Agreement for the LMSYS-Chat-1M dataset: https://huggingface.co/datasets/lmsys/lmsys-chat-1m + # Compute Requirements diff --git a/prompting/agent.py b/prompting/agent.py index b4b67255b..615bff1d1 100644 --- a/prompting/agent.py +++ b/prompting/agent.py @@ -18,10 +18,11 @@ import time import bittensor as bt from dataclasses import asdict +from typing import Optional + from prompting.tasks import Task -from prompting.llms import HuggingFaceLLM, vLLM_LLM +from prompting.llms import vLLM_LLM from prompting.cleaners.cleaner import CleanerPipeline - from prompting.persona import Persona, create_persona from transformers import Pipeline @@ -42,7 +43,7 @@ def finished(self): """This is a roleplaying game where you are impersonating {mood} human user with a specific persona. As a human, you are using AI assistant to {desc} related to {topic} ({subtopic}) in a {tone} tone. You don't need to greet the assistant or be polite, unless this is part of your persona. The spelling and grammar of your messages should also reflect your persona. 
Your singular focus is to use the assistant to {goal}: {query} - """ + """ ) def __init__( @@ -52,10 +53,8 @@ def __init__( system_template: str = None, persona: Persona = None, begin_conversation=True, + system_prompt: Optional[str] = None, ): - if persona is None: - persona = create_persona() - self.persona = persona self.task = task self.llm_pipeline = llm_pipeline @@ -63,11 +62,15 @@ def __init__( if system_template is not None: self.system_prompt_template = system_template - self.system_prompt = self.system_prompt_template.format( - mood=self.persona.mood, - tone=self.persona.tone, - **self.task.__state_dict__(), # Adds desc, subject, topic - ) + self.system_prompt = system_prompt + if self.system_prompt is None: + if self.persona is None: + self.persona = create_persona() + self.system_prompt = self.system_prompt_template.format( + mood=self.persona.mood, + tone=self.persona.tone, + **self.task.__state_dict__(), # Adds desc, subject, topic + ) super().__init__( llm_pipeline=llm_pipeline, diff --git a/prompting/base/neuron.py b/prompting/base/neuron.py index 80fe42260..99400a037 100644 --- a/prompting/base/neuron.py +++ b/prompting/base/neuron.py @@ -17,6 +17,7 @@ import copy import sys +import threading import bittensor as bt diff --git a/prompting/base/validator.py b/prompting/base/validator.py index 93240ad5b..7b4f27e6b 100644 --- a/prompting/base/validator.py +++ b/prompting/base/validator.py @@ -15,19 +15,21 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. 
-import sys -import copy -import torch -import asyncio import argparse +import asyncio +import copy +import sys import threading -import bittensor as bt - -from typing import List from traceback import print_exception +from typing import Optional + +import bittensor as bt +import torch +from organic_scoring.synth_dataset import SynthDatasetConversation from prompting.base.neuron import BaseNeuron from prompting.mock import MockDendrite +from prompting.organic.organic_scoring_prompting import OrganicScoringPrompting from prompting.utils.config import add_validator_args from prompting.utils.exceptions import MaxRetryError @@ -64,9 +66,9 @@ def __init__(self, config=None): # Init sync with the network. Updates the metagraph. self.sync() - # Serve axon to enable external connections. + self.axon: Optional[bt.axon] = None if not self.config.neuron.axon_off: - self.serve_axon() + self.axon = bt.axon(wallet=self.wallet, config=self.config) else: bt.logging.warning("axon off, not serving ip to chain.") @@ -79,23 +81,39 @@ def __init__(self, config=None): self.thread: threading.Thread = None self.lock = asyncio.Lock() - def serve_axon(self): - """Serve axon to enable external connections.""" - - bt.logging.info("serving ip to chain...") - try: - self.axon = bt.axon(wallet=self.wallet, config=self.config) - - try: - self.subtensor.serve_axon( - netuid=self.config.netuid, - axon=self.axon, + self._organic_scoring: Optional[OrganicScoringPrompting] = None + if self.axon is not None and not self.config.neuron.organic_disabled: + dataset = SynthDatasetConversation() + if dataset.exception is not None: + bt.logging.error( + f"Organic scoring on synthetic data is disabled. 
Failed to load dataset: {dataset.exception}" ) - except Exception as e: - bt.logging.error(f"Failed to serve Axon with exception: {e}") + dataset = None + self._organic_scoring = OrganicScoringPrompting( + axon=self.axon, + synth_dataset=dataset, + trigger_frequency=self.config.neuron.organic_trigger_frequency, + trigger_frequency_min=self.config.neuron.organic_trigger_frequency_min, + trigger=self.config.neuron.organic_trigger, + trigger_scaling_factor=self.config.neuron.organic_scaling_factor, + validator=self, + ) + else: + bt.logging.warning( + "Organic scoring is not enabled. To enable, remove '--neuron.axon_off' and '--neuron.organic_disabled'" + ) + + if self.axon is not None: + self._serve_axon() - except Exception as e: - bt.logging.error(f"Failed to create Axon initialize with exception: {e}") + if self._organic_scoring is not None: + self.loop.create_task(self._organic_scoring.start_loop()) + + def _serve_axon(self): + """Serve axon to enable external connections""" + validator_uid = self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address) + bt.logging.info(f"Serving validator IP of UID {validator_uid} to chain...") + self.axon.serve(netuid=self.config.netuid, subtensor=self.subtensor).start() def run(self): """ @@ -116,7 +134,6 @@ def run(self): KeyboardInterrupt: If the miner is stopped by a manual interruption. Exception: For unforeseen errors during the miner's operation, which are logged for diagnosis. """ - # Check that validator is registered on the network. self.sync() @@ -313,7 +330,7 @@ def resync_metagraph(self): # Update the hotkeys. self.hotkeys = copy.deepcopy(self.metagraph.hotkeys) - def update_scores(self, rewards: torch.FloatTensor, uids: List[int]): + def update_scores(self, rewards: torch.FloatTensor, uids: list[int]): """Performs exponential moving average on the scores based on the rewards received from the miners.""" # Check if rewards contains NaN values. 
@@ -327,6 +344,7 @@ def update_scores(self, rewards: torch.FloatTensor, uids: List[int]): step_rewards = self.scores.scatter( 0, torch.tensor(uids).to(self.device), rewards.to(self.device) ).to(self.device) + bt.logging.debug(f"Scattered rewards: {rewards}") # Update scores with rewards produced by this step. diff --git a/prompting/forward.py b/prompting/forward.py index a3a63272c..0cbb1d368 100644 --- a/prompting/forward.py +++ b/prompting/forward.py @@ -23,6 +23,7 @@ import numpy as np import bittensor as bt from typing import List, Dict, Awaitable + from prompting.agent import HumanAgent from prompting.dendrite import DendriteResponseEvent, SynapseStreamResult from prompting.conversation import create_task @@ -33,16 +34,9 @@ from prompting.utils.logging import log_event from prompting.utils.misc import async_log, serialize_exception_to_string from transformers import PreTrainedTokenizerFast as Tokenizer -from prompting.utils.uids import get_random_uids -from dataclasses import dataclass -SINGLE_TURN_TASKS = ['sentiment', 'translation'] +SINGLE_TURN_TASKS = ('sentiment', 'translation') -@async_log -async def generate_reference(agent): - loop = asyncio.get_running_loop() - result = await loop.run_in_executor(None, agent.task.generate_reference, agent.llm_pipeline) - return result @async_log async def execute_dendrite_call(dendrite_call): @@ -59,7 +53,8 @@ async def process_stream(uid: int, async_iterator: Awaitable, tokenizer: Tokeniz accumulated_tokens_per_chunk = [] start_time = time.time() - try: + try: + chunk = None async for chunk in async_iterator: # most important loop, as this is where we acquire the final synapse. if isinstance(chunk, str): accumulated_chunks.append(chunk) @@ -76,7 +71,7 @@ async def process_stream(uid: int, async_iterator: Awaitable, tokenizer: Tokeniz raise ValueError( f"Something went wrong with miner uid {uid}, Synapse is not StreamPromptingSynapse." 
) - except Exception as e: + except Exception as e: exception = e traceback_details = traceback.format_exc() bt.logging.error( @@ -204,10 +199,11 @@ async def run_step( handle_stream_responses_task = asyncio.create_task(handle_response(stream_results_dict, tokenizer)) if not agent.task.static_reference: - reference_generation_task = generate_reference(agent) - _, stream_results = await asyncio.gather( - reference_generation_task, handle_stream_responses_task - ) + async with self.lock: + reference_generation_task = generate_reference(agent) + _, stream_results = await asyncio.gather( + reference_generation_task, handle_stream_responses_task + ) else: stream_results = await handle_stream_responses_task @@ -244,7 +240,7 @@ async def run_step( "best": best_response, "block": self.block, "step": self.step, - "step_time": time.time() - start_time, + "step_time": time.time() - start_time, **agent.__state_dict__(full=self.config.neuron.log_full), **reward_result.__state_dict__(full=self.config.neuron.log_full), **response_event.__state_dict__(), @@ -292,7 +288,7 @@ async def forward(self): turn = 0 exclude_uids = [] - roles = ['user'] + roles = ["user"] messages = [agent.challenge] while True: # Note: The try catch is a safe clause to ensure that the forward loop continues even if an error occurs in run_step. @@ -314,13 +310,13 @@ async def forward(self): event["turn"] = turn log_event(self, event) task.complete = True - + accepted_answer = event["best"] if random.random() < 0.5 else agent.task.reference roles.append("assistant") messages.append(accepted_answer) - # 50% chance of single turn conversation, 25% of two turns, 12.5% chance of 3 turns, 6.25% chance of 4 turns, 3.63% chance of 5... - if random.random()<0.5 or turn>=1: + # 50% chance of single turn conversation, 25% of two turns. 
+ if random.random() < 0.5 or turn >= 1: break if task.name in SINGLE_TURN_TASKS: @@ -341,13 +337,16 @@ async def forward(self): except BaseException as e: unexpected_errors = serialize_exception_to_string(e) bt.logging.error( - f"Error in run_step: Skipping to next round. \n {unexpected_errors}" + f"Error in run_step: Skipping to next round.\n" + f"Task: {task_name}\nMessages: {messages}\nRoles: {roles}\nTurn: {turn}.\n" + f"{unexpected_errors}\n" ) event = {"unexpected_errors": unexpected_errors} log_event(self, event) - continue + await asyncio.sleep(1) + continue del agent del task diff --git a/prompting/llms/vllm_llm.py b/prompting/llms/vllm_llm.py index 85ee7b974..995a60de1 100644 --- a/prompting/llms/vllm_llm.py +++ b/prompting/llms/vllm_llm.py @@ -14,11 +14,10 @@ # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. 
-import gc +import threading import time -import torch import bittensor as bt -from typing import List, Dict +from typing import List, Dict, Optional, Any from vllm import LLM, SamplingParams from prompting.cleaners.cleaner import CleanerPipeline from prompting.llms import BasePipeline, BaseLLM @@ -55,6 +54,8 @@ def load_vllm_pipeline(model_id: str, device: str, gpus: int, max_allowed_memory class vLLMPipeline(BasePipeline): + _LOCK = threading.Lock() + def __init__( self, model_id: str, @@ -81,7 +82,8 @@ def __call__(self, composed_prompt: str, **model_kwargs: Dict) -> str: sampling_params = SamplingParams( temperature=temperature, top_p=top_p, max_tokens=max_tokens ) - output = self.llm.generate(composed_prompt, sampling_params, use_tqdm=True) + with self._LOCK: + output = self.llm.generate(composed_prompt, sampling_params, use_tqdm=True) response = output[0].outputs[0].text return response @@ -112,6 +114,30 @@ def __init__( "end": "<|start_header_id|>assistant<|end_header_id|>", } + def query_conversation( + self, + messages: list[str], + roles: list[str], + cleaner: Optional[CleanerPipeline] = None, + ): + """Query LLM with the given lists of conversation history and roles + + Args: + messages (list[str]): List of messages in the conversation. + roles (list[str]): List of roles for each message. + cleaner (Optional[CleanerPipeline], optional): Cleaner pipeline to use, if any. 
+ """ + assert len(messages) == len(roles), "Length of messages and roles must be the same" + inputs: list[dict[str, Any]] = [{"content": self.system_prompt, "role": "system"}] + for role, message in zip(roles, messages): + inputs.append({"content": message, "role": role}) + + t0 = time.perf_counter() + response = self.forward(messages=inputs) + response = self.clean_response(cleaner, response) + self.times.extend((0, time.perf_counter() - t0)) + return response + def query( self, message: str, diff --git a/prompting/organic/__init__.py b/prompting/organic/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/prompting/organic/organic_scoring_prompting.py b/prompting/organic/organic_scoring_prompting.py new file mode 100644 index 000000000..68fb93791 --- /dev/null +++ b/prompting/organic/organic_scoring_prompting.py @@ -0,0 +1,351 @@ +import asyncio +import json +import time +from functools import partial +from typing import Any, AsyncGenerator, Literal, Sequence, Tuple, Union + +import bittensor as bt +import torch +import numpy as np +from organic_scoring import OrganicScoringBase +from organic_scoring.synth_dataset import SynthDatasetBase +from starlette.types import Send +from typing_extensions import override +from bittensor.dendrite import dendrite + +from prompting.agent import HumanAgent +from prompting.base.neuron import BaseNeuron +from prompting.cleaners.cleaner import CleanerPipeline +from prompting.dendrite import DendriteResponseEvent, SynapseStreamResult +from prompting.forward import handle_response +from prompting.llms.vllm_llm import vLLM_LLM +from prompting.organic.organic_task import OrganicTask +from prompting.organic.synth_organic_task import SynthOrganicTask +from prompting.protocol import StreamPromptingSynapse +from prompting.rewards.pipeline import RewardPipeline +from prompting.rewards.reward import RewardResult +from prompting.tasks.task import make_system_prompt +from prompting.utils.logging import log_event +from 
prompting.utils.uids import get_random_uids, get_uids + + +class OrganicScoringPrompting(OrganicScoringBase): + def __init__( + self, + axon: bt.axon, + synth_dataset: Union[SynthDatasetBase, Sequence[SynthDatasetBase]], + trigger_frequency: Union[float, int], + trigger: Literal["seconds", "steps"], + validator: BaseNeuron, + trigger_frequency_min: Union[float, int] = 5, + trigger_scaling_factor: Union[float, int] = 5, + ): + """Organic Scoring implementation. + + Organic scoring runs in a separate `asyncio` task and is triggered by a timer or a step counter. + + Process Workflow: + 1. Trigger Check: Upon triggering the rewarding process, the system checks if the organic queue is empty. + If the queue is empty, synthetic datasets are used to bootstrap the organic scoring mechanism. + Otherwise, samples from the organic queue are utilized. + 2. Data Processing: The sampled data is concurrently passed to the `_query_miners` and `_generate_reference` + methods. + 3. Reward Generation: After receiving responses from miners and any reference data, the information + is processed by the `_generate_rewards` method. + 4. Weight Setting: The generated rewards are then applied through the `_set_weights` method. + 5. Logging: Finally, the results can be logged using the `_log_results` method, along with all relevant data + provided as arguments, and default time elapsed on each step of rewarding process. + """ + super().__init__( + axon=axon, + synth_dataset=synth_dataset, + trigger_frequency=trigger_frequency, + trigger=trigger, + trigger_frequency_min=trigger_frequency_min, + trigger_scaling_factor=trigger_scaling_factor, + ) + self._val = validator + # Organic scoring reward pipeline. 
+ self._reward_pipeline = RewardPipeline( + selected_tasks=[OrganicTask.name, SynthOrganicTask.name], + device=self._val.device, + available_tasks={ + OrganicTask.name: OrganicTask, + SynthOrganicTask.name: SynthOrganicTask, + }, + ) + + @override + async def _priority_fn(self, synapse: StreamPromptingSynapse) -> float: + """Priority function for the axon.""" + return 10000000.0 + + @override + async def _blacklist_fn(self, synapse: StreamPromptingSynapse) -> Tuple[bool, str]: + """Blacklist function for the axon.""" + # ! DO NOT CHANGE `Tuple` return type to `tuple`, it will break the code (bittensor internal signature checks). + # We expect the API to be run with one specific hotkey (e.g. OTF). + return synapse.dendrite.hotkey != self._val.config.neuron.organic_whitelist_hotkey, "" + + @override + async def _on_organic_entry(self, synapse: StreamPromptingSynapse) -> StreamPromptingSynapse: + """Organic query handle.""" + bt.logging.info(f"[Organic] Received from {synapse.dendrite.hotkey}, IP: {synapse.dendrite.ip}") + + uids = get_uids( + self._val, + sampling_mode=self._val.config.neuron.organic_sampling_mode, + k=self._val.config.neuron.organic_sample_size, + exclude=[], + ) + uids_list = uids.cpu().tolist() + completions: dict[int, dict] = {} + token_streamer = partial( + self._stream_miner_response, + synapse, + uids_list, + completions, + ) + + streaming_response = synapse.create_streaming_response(token_streamer) + self._organic_queue.add( + { + "roles": synapse.roles, + "messages": synapse.messages, + "organic": True, + "synapse": synapse, + "streaming_response": streaming_response, + "uids": uids_list, + "completions": completions, + } + ) + return streaming_response + + async def _stream_miner_response( + self, + synapse: StreamPromptingSynapse, + uids: list[int], + completions: dict[int, dict], + send: Send, + ): + """Stream back miner's responses.""" + bt.logging.info(f"[Organic] Querying miner UIDs: {uids}") + try: + async with 
dendrite(wallet=self._val.wallet) as dend: + responses = await dend( + axons=[self._val.metagraph.axons[uid] for uid in uids], + synapse=synapse, + timeout=self._val.config.neuron.organic_timeout, + deserialize=False, + streaming=True, + ) + except Exception as e: + bt.logging.error(f"[Organic] Error querying dendrite: {e}") + return + + async def stream_miner_chunks(uid: int, chunks: AsyncGenerator): + accumulated_chunks: list[str] = [] + accumulated_chunks_timings: list[float] = [] + accumulated_tokens_per_chunk: list[int] = [] + synapse: StreamPromptingSynapse | None = None + completions[uid] = {"completed": False} + timer_start = time.perf_counter() + async for chunk in chunks: + try: + if isinstance(chunk, str): + accumulated_chunks.append(chunk) + accumulated_chunks_timings.append(time.perf_counter() - timer_start) + json_chunk = json.dumps({"uid": uid, "chunk": chunk}) + await send( + { + "type": "http.response.body", + "body": json_chunk.encode("utf-8"), + "more_body": True, + } + ) + elif isinstance(chunk, StreamPromptingSynapse): + synapse = chunk + except Exception as e: + bt.logging.error(f"[Organic] Error while streaming chunks: {e}") + break + # TODO: Do we need to identify the end of each miner's response? 
+ # json_chunk = json.dumps({"uid": uid, "chunk": b"", "completed": True}) + # await send({"type": "http.response.body", "body": json_chunk, "more_body": False}) + await send({"type": "http.response.body", "body": b"", "more_body": False}) + completions[uid]["accumulated_chunks"] = accumulated_chunks + completions[uid]["accumulated_chunks_timings"] = accumulated_chunks_timings + completions[uid]["accumulated_tokens_per_chunk"] = accumulated_tokens_per_chunk + completions[uid]["completed"] = True + completions[uid]["synapse"] = synapse + # bt.logging.debug(f"[Organic] Streaming {uid}: {''.join(accumulated_chunks)}") + + bt.logging.info(f"[Organic] Awaiting miner streams UIDs: {uids}") + await asyncio.gather( + *[stream_miner_chunks(uid, chunks) for uid, chunks in zip(uids, responses)], + return_exceptions=True, + ) + + async def _reuse_organic_response(self, sample: dict[str, Any]) -> dict[int, SynapseStreamResult]: + """Return a dictionary where the keys are miner UIDs and the values are their corresponding streaming responses. + + This method reuses miner responses for organic data. It waits for each miner to complete within the + `neuron.organic_timeout` specified timeout and returns the responses. For miners who exceed the timeout, + an empty synapse response is returned. + + Args: + sample: Dict where the keys are miner UIDs and the values are the input streaming synapses. 
+ """ + if not sample.get("organic", False): + return None + + uids_cpu = sample["uids"] + responses: dict[int, SynapseStreamResult] = {} + bt.logging.info(f"[Organic] Reusing miner responses for organic data, UIDs: {uids_cpu}") + + async def _check_completion(sample: dict[str, Any], uid: int): + while not sample["completions"][uid]["completed"]: + await asyncio.sleep(0.1) + + async def _wait_for_completion(uid: int): + try: + await asyncio.wait_for( + _check_completion(sample, uid), + self._val.config.neuron.organic_timeout, + ) + response = SynapseStreamResult( + accumulated_chunks=sample["completions"][uid]["accumulated_chunks"], + accumulated_chunks_timings=sample["completions"][uid]["accumulated_chunks_timings"], + tokens_per_chunk=sample["completions"][uid]["accumulated_tokens_per_chunk"], + synapse=sample["completions"][uid]["synapse"], + uid=uid, + exception=None, + ) + except asyncio.TimeoutError: + response = SynapseStreamResult( + accumulated_chunks=[], + accumulated_chunks_timings=[], + tokens_per_chunk=[], + synapse=None, + uid=uid, + exception=None, + ) + responses[uid] = response + + await asyncio.gather(*[_wait_for_completion(uid) for uid in uids_cpu]) + return responses + + @override + async def _query_miners(self, sample: dict[str, Any]) -> dict[str, SynapseStreamResult]: + """Query miners with the given synthetic or organic sample.""" + if sample.get("organic", False) and not self._val.config.neuron.organic_reuse_response_disabled: + responses = await self._reuse_organic_response(sample) + return responses + + # Get the list of uids to query. 
+ uids = get_random_uids(self._val, k=self._val.config.neuron.organic_sample_size, exclude=None).to( + self._val.device + ) + uids_cpu = uids.cpu().tolist() + bt.logging.info(f"[Organic] Querying miners with synthetic data, UIDs: {uids_cpu}") + streams_responses = await self._val.dendrite.forward( + axons=[self._val.metagraph.axons[uid] for uid in uids_cpu], + synapse=StreamPromptingSynapse(roles=sample["roles"], messages=sample["messages"]), + timeout=self._val.config.neuron.organic_timeout, + deserialize=False, + streaming=True, + ) + stream_results_dict = dict(zip(uids_cpu, streams_responses)) + responses = await handle_response(stream_results_dict, self._val.llm_pipeline.tokenizer) + return dict(zip(uids_cpu, responses)) + + @override + async def _generate_rewards( + self, + sample: dict[str, Any], + responses: dict[str, Any], + reference: dict[str, Any], + ) -> dict[str, Any]: + """Generate rewards for the given sample, responses, and reference.""" + assert reference is not None + if sample.get("organic", False): + task = OrganicTask(context=sample, reference=reference) + else: + task = SynthOrganicTask(context=sample, reference=reference) + stream_results = list(responses.values()) + uids_list = list(responses.keys()) + uids = torch.tensor(uids_list) + timeout = self._val.config.neuron.organic_timeout + response_event = DendriteResponseEvent(stream_results=stream_results, uids=uids, timeout=timeout) + + bt.logging.debug(f"[Organic] Miner stream results: {stream_results}") + + # Dummy HumanAgent used to reuse existing reward pipeline. 
+ agent = HumanAgent( + task=task, + llm_pipeline=self._val.llm_pipeline, + begin_conversation=True, + system_prompt=make_system_prompt(), + ) + reward_result = RewardResult( + self._reward_pipeline, + agent=agent, + response_event=response_event, + device=self._val.device, + ) + bt.logging.info(f"[Organic] RewardResult: {reward_result}") + return {"reward": reward_result, "uids": uids_list, "agent": agent, "organic": sample.get("organic", False)} + + @override + async def _set_weights(self, reward: dict[str, Any]): + """Set weights based on the given reward.""" + uids = reward["uids"] + reward_result = reward["reward"] + if not reward.get("organic", False): + reward_result.rewards *= self._val.config.neuron.organic_synth_reward_scale + + uids_to_reward = dict(zip(uids, reward_result.rewards)) + bt.logging.info(f"[Organic] Rewards for miner's UIDs: {uids_to_reward}") + bt.logging.info(f"[Organic] Weight setting enabled: {self._val.config.neuron.organic_set_weights_enabled}") + if self._val.config.neuron.organic_set_weights_enabled: + self._val.update_scores(reward_result.rewards, uids) + # Sync is not needed as it's done in the benchmarks loop. + # self._val.sync() + + @override + async def _log_results( + self, + logs: dict[str, Any], + reference: str, + responses: dict[int, SynapseStreamResult], + rewards: dict[str, Any], + sample: dict[str, Any], + *args, + **kwargs, + ): + logs["block"] = self._val.block + logs["step"] = self._val.step + # Length of messages is incremented by 2 every step: query and response. 
+ logs["turn"] = len(sample["messages"]) // 2 + completions_len: list[int] = [len(response.synapse.completion) for response in responses.values()] + logs["organic_response_mean_chars"] = np.mean(completions_len) + logs["organic_response_std_chars"] = np.std(completions_len) + logs["organic_reference_chars"] = len(reference) + logs.update(rewards["reward"].__state_dict__(full=self._val.config.neuron.log_full)) + log_event(self._val, logs) + + return logs + + @override + async def _generate_reference(self, sample: dict[str, Any]) -> str: + """Generate reference for the given organic or synthetic sample.""" + async with self._val.lock: + reference = vLLM_LLM( + self._val.llm_pipeline, + system_prompt=make_system_prompt(), + max_new_tokens=self._val.config.neuron.organic_reference_max_tokens, + ).query_conversation( + messages=sample["messages"], + roles=sample["roles"], + cleaner=CleanerPipeline(cleaning_pipeline=[]), + ) + return reference diff --git a/prompting/organic/organic_task.py b/prompting/organic/organic_task.py new file mode 100644 index 000000000..e8faee479 --- /dev/null +++ b/prompting/organic/organic_task.py @@ -0,0 +1,44 @@ +from dataclasses import dataclass + +from prompting.tasks import Task + + +@dataclass +class OrganicTask(Task): + """Task with defined reward and penalty mechanisms for organic prompts.""" + + name = "organic" + # Use challenge as a query. 
+ challenge_type = "query" + + reward_definition = [ + dict(name="relevance", weight=0.8), + dict(name="rouge", ngram="rouge-1", metric="f", weight=0.2), + ] + + penalty_definition = [ + dict(name="relevance", weight=0.5), + ] + + cleaning_pipeline = [] + + def __init__(self, context: dict, reference: str): + self.context = context + self.messages = context["messages"] + self.roles = context["roles"] + self.query = context["messages"][-1] + self.topic = "Organic" + self.reference = reference + self.subtopic = "" + self.tags = [""] + + def __str__(self): + return f"{self.__class__.__name__}(name={self.name!r}, query={self.query!r}, reference={self.reference!r})" + + def __repr__(self): + return str(self) + + def __state_dict__(self, full=False): + # Disable any logs for organic queries. + state = {} + return state diff --git a/prompting/organic/synth_organic_task.py b/prompting/organic/synth_organic_task.py new file mode 100644 index 000000000..a30858021 --- /dev/null +++ b/prompting/organic/synth_organic_task.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass + +from prompting.organic.organic_task import OrganicTask + + +@dataclass +class SynthOrganicTask(OrganicTask): + """Task with defined reward and penalty mechanisms for synthetic organic prompts.""" + + name = "synthetic-organic" + + reward_definition = [ + dict(name="relevance", weight=0.8), + dict(name="rouge", ngram="rouge-1", metric="f", weight=0.2), + ] + + penalty_definition = [ + dict(name="relevance", weight=0.5), + ] + + cleaning_pipeline = [] + + def __init__(self, context: dict, reference: str): + self.context = context + self.messages = context["messages"] + self.roles = context["roles"] + self.query = context["messages"][-1] + self.topic = "Organic" + self.reference = reference + self.subtopic = "" + self.tags = [""] diff --git a/prompting/rewards/pipeline.py b/prompting/rewards/pipeline.py index 2b126c401..27419e5ed 100644 --- a/prompting/rewards/pipeline.py +++ 
b/prompting/rewards/pipeline.py @@ -1,5 +1,4 @@ -from typing import List - +from typing import Type, Optional from prompting.tasks import TASKS from prompting.rewards import ( BaseRewardModel, @@ -11,6 +10,7 @@ OrdinalRewardModel, StreamingRewardModel ) +from prompting.tasks.task import Task REWARD_MODELS = { "rouge": RougeRewardModel, @@ -24,7 +24,10 @@ class RewardPipeline: - def __init__(self, selected_tasks: List[str], device): + def __init__(self, selected_tasks: list[str], device, available_tasks: Optional[dict[str, Type[Task]]] = None): + self.available_tasks = available_tasks + if self.available_tasks is None: + self.available_tasks = TASKS self.selected_tasks = selected_tasks self.device = device self.validate_tasks() @@ -41,9 +44,9 @@ def __repr__(self): def validate_tasks(self): for task in self.selected_tasks: - if task not in TASKS: + if task not in self.available_tasks: raise ValueError( - f"Task {task} not supported. Please choose from {TASKS.keys()}" + f"Task {task} not supported. 
Please choose from {self.available_tasks.keys()}" ) # Check that the reward_definition and penalty_definition are lists of dictionaries whose weights sum to one self._check_weights(task, "reward_definition", expected_weight=1) @@ -52,7 +55,7 @@ def validate_tasks(self): def _check_weights(self, task, definition, expected_weight): total_weight = 0 - model_infos = getattr(TASKS[task], definition) + model_infos = getattr(self.available_tasks[task], definition) for model_info in model_infos: if not isinstance(model_info, dict): @@ -90,9 +93,9 @@ def load_reward_pipeline(self): active_reward_models = [] for task in self.selected_tasks: - active_reward_models += TASKS[task].reward_definition - active_reward_models += TASKS[task].penalty_definition - active_reward_models += TASKS[task].global_penalty_definition + active_reward_models += self.available_tasks[task].reward_definition + active_reward_models += self.available_tasks[task].penalty_definition + active_reward_models += self.available_tasks[task].global_penalty_definition # Instantiate only the required reward models reward_models = {} diff --git a/prompting/tasks/qa.py b/prompting/tasks/qa.py index d8a6e91a8..e9564ec2b 100644 --- a/prompting/tasks/qa.py +++ b/prompting/tasks/qa.py @@ -92,7 +92,7 @@ def __init__(self, llm_pipeline, context, create_reference=True, history=None): self.query_prompt = FOLLOWUP_PROMPT_TEMPLATE.format(context=context.content, history=history) bt.logging.warning(f'Using history!!\n{history=}\n\n{context=}\n\n{self.query_prompt=}') else: - self.query_prompt = QUERY_PROMPT_TEMPLATE.format(context=context.content) + self.query_prompt = QUERY_PROMPT_TEMPLATE.format(context=context.content) self.query = self.generate_query(llm_pipeline) diff --git a/prompting/utils/config.py b/prompting/utils/config.py index 67ae5b94d..8fa1eea35 100644 --- a/prompting/utils/config.py +++ b/prompting/utils/config.py @@ -404,6 +404,106 @@ def add_validator_args(cls, parser): default=120, ) + parser.add_argument( 
+ "--neuron.organic_sample_size", + type=int, + help="The number of miners to organic query in a single step.", + default=5, + ) + + parser.add_argument( + "--neuron.organic_sampling_mode", + type=str, + help="The mode for sampling miners using organic queries. Options include 'random' for random selection, " + "'top_incentive' for selecting based on highest incentives.", + default="random", + ) + + parser.add_argument( + "--neuron.organic_disabled", + action="store_true", + help="Set this flag to disable organic scoring.", + default=False, + ) + + # TODO: Set organic weight setting enabled by default after Aug 1, 2024. + parser.add_argument( + "--neuron.organic_set_weights_enabled", + action="store_true", + help="Set this flag to enable organic scoring weight setting.", + default=False, + ) + + parser.add_argument( + "--neuron.organic_synth_reward_scale", + type=float, + help="Scale factor for synthetic organic rewards.", + default=0.1, + ) + + parser.add_argument( + "--neuron.organic_reuse_response_disabled", + action="store_true", + help="If set, miner responses will be re-generated during reward generation. " + "The default behavior is to reuse responses.", + default=False, + ) + + parser.add_argument( + "--neuron.organic_timeout", + type=int, + help="Organic query timeout for each call in seconds.", + default=30, + ) + + parser.add_argument( + "--neuron.organic_reference_max_tokens", + type=int, + help="Organic query timeout for each call in seconds.", + default=1024, + ) + + # TODO: Increase sampling rate after after Aug 1, 2024. 
+ parser.add_argument( + "--neuron.organic_trigger_frequency", + type=float, + help="Organic query sampling frequency (seconds or steps value).", + default=120.0, + ) + + parser.add_argument( + "--neuron.organic_trigger_frequency_min", + type=float, + help="Minimum organic query sampling frequency (seconds or steps value).", + default=5.0, + ) + + parser.add_argument( + "--neuron.organic_scaling_factor", + type=float, + help=( + "The scaling factor to adjust the trigger frequency based on the size of the organic queue. " + "A higher value means the trigger frequency adjusts more slowly to the increase of organic queue size." + ), + default=1.0, + ) + + parser.add_argument( + "--neuron.organic_trigger", + type=str, + help="Organic query validation trigger mode (seconds or steps).", + default="seconds", + ) + + parser.add_argument( + "--neuron.organic_whitelist_hotkey", + type=str, + help="Allow request from specific hotkey. Defaults to OTF hotkey.", + # OTF hotkey. + default="5F4tQyWrhfGVcNhoqeiNsR6KjD4wMZ2kfhLj4oHYuyHbZAc3", + ) + + def config(cls): """ diff --git a/prompting/utils/uids.py b/prompting/utils/uids.py index 15f8aceee..2286fb1ed 100644 --- a/prompting/utils/uids.py +++ b/prompting/utils/uids.py @@ -1,7 +1,9 @@ import torch import random import bittensor as bt -from typing import List +from typing import List, Union + +from prompting.base.neuron import BaseNeuron def check_uid_availability( @@ -43,7 +45,7 @@ def check_uid_availability( return True -def get_random_uids(self, k: int, exclude: List[int] = None) -> torch.LongTensor: +def get_random_uids(self: BaseNeuron, k: int, exclude: List[int] = None) -> torch.LongTensor: """Returns k available random uids from the metagraph. Args: k (int): Number of uids to return. @@ -89,3 +91,34 @@ def get_random_uids(self, k: int, exclude: List[int] = None) -> torch.LongTensor return torch.tensor(random.sample(candidate_uids, k)) else: raise ValueError(f"No eligible uids were found. 
def get_top_incentive_uids(self, k: int, vpermit_tao_limit: int) -> torch.LongTensor:
    """Return up to ``k`` available miner uids, ordered by incentive (highest first).

    Args:
        self: Object exposing a ``metagraph`` attribute; the metagraph must
            provide ``uids`` (iterable of uids) and ``I`` (incentives indexable by uid).
        k (int): Maximum number of uids to return.
        vpermit_tao_limit (int): Forwarded unchanged to ``check_uid_availability``.

    Returns:
        torch.LongTensor: The top ``k`` uids by incentive. Fewer than ``k`` uids
        (possibly none) are returned when fewer uids pass the availability check.
    """
    metagraph = self.metagraph

    # Keep only the uids that pass the availability check, as plain ints.
    available_uids = [
        int(uid)
        for uid in metagraph.uids
        if check_uid_availability(metagraph, uid, vpermit_tao_limit)
    ]

    # ``sorted`` is stable, so uids with equal incentive keep their metagraph order.
    ranked_uids = sorted(available_uids, key=lambda uid: metagraph.I[uid], reverse=True)

    # Pin the dtype: ``torch.tensor([])`` would otherwise be float32, which
    # contradicts the declared LongTensor return type when no uid is available.
    return torch.tensor(ranked_uids[:k], dtype=torch.long)


def get_uids(
    self: "BaseNeuron",
    sampling_mode: str,
    k: int,
    exclude: Union[List[int], None] = None,
) -> torch.LongTensor:
    """Select ``k`` miner uids using the requested sampling strategy.

    Args:
        self: Neuron instance; ``self.config.neuron.vpermit_tao_limit`` is read
            in ``"top_incentive"`` mode.
        sampling_mode (str): ``"random"`` delegates to ``get_random_uids``;
            ``"top_incentive"`` delegates to ``get_top_incentive_uids``.
        k (int): Number of uids to request.
        exclude (List[int], optional): Uids to leave out. Only honoured in
            ``"random"`` mode; ``None`` is treated as an empty list.

    Returns:
        torch.LongTensor: The selected uids.

    Raises:
        ValueError: If ``sampling_mode`` is not one of the supported modes
            (previously this case fell through and returned ``None``).
    """
    if sampling_mode == "random":
        return get_random_uids(self, k=k, exclude=exclude or [])
    if sampling_mode == "top_incentive":
        return get_top_incentive_uids(
            self, k=k, vpermit_tao_limit=self.config.neuron.vpermit_tao_limit
        )
    raise ValueError(
        f"Unknown sampling mode {sampling_mode!r}; expected 'random' or 'top_incentive'."
    )
wikipedia_sections==2.0.0 vllm==0.4.3 loguru==0.7.2 argostranslate==1.9.6 -python-dotenv -wikipedia_sections -vllm -argostranslate transformers==4.41.2 autoawq==0.2.5 +git+https://github.com/macrocosm-os/organic-scoring.git@main