From 9b334547645fb805465109339c26b977e3795c3d Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Thu, 16 May 2024 19:14:47 +0000 Subject: [PATCH 01/18] clean up legacy prompting synapse --- prompting/protocol.py | 111 +----------------------------------------- 1 file changed, 2 insertions(+), 109 deletions(-) diff --git a/prompting/protocol.py b/prompting/protocol.py index 1367a4f45..5b8ad01c7 100644 --- a/prompting/protocol.py +++ b/prompting/protocol.py @@ -17,116 +17,9 @@ import pydantic import bittensor as bt - from typing import List, AsyncIterator from starlette.responses import StreamingResponse -import pdb - - -class PromptingSynapse(bt.Synapse): - """ - The PromptingSynapse subclass of the Synapse class encapsulates the functionalities related to prompting scenarios. - - It specifies three fields - `roles`, `messages` and `completion` - that define the state of the PromptingSynapse object. - The `roles` and `messages` are read-only fields defined during object initialization, and `completion` is a mutable - field that can be updated as the prompting scenario progresses. - - The Config inner class specifies that assignment validation should occur on this class (validate_assignment = True), - meaning value assignments to the instance fields are checked against their defined types for correctness. - - Attributes: - roles (List[str]): A list of roles in the prompting scenario. This field is both mandatory and immutable. - messages (List[str]): A list of messages in the prompting scenario. This field is both mandatory and immutable. - completion (str): A string that captures completion of the prompt. This field is mutable. - required_hash_fields List[str]: A list of fields that are required for the hash. - - Methods: - deserialize() -> "PromptingSynapse": Returns the instance of the current object. - - - The `PromptingSynapse` class also overrides the `deserialize` method, returning the - instance itself when this method is invoked. Additionally, it provides a `Config` - inner class that enforces the validation of assignments (`validate_assignment = True`). - - Here is an example of how the `PromptingSynapse` class can be used: - - ```python - # Create a PromptingSynapse instance - prompt = PromptingSynapse(roles=["system", "user"], messages=["Hello", "Hi"]) - - # Print the roles and messages - print("Roles:", prompt.roles) - print("Messages:", prompt.messages) - - # Update the completion - model_prompt =... # Use prompt.roles and prompt.messages to generate a prompt - for your LLM as a single string. - prompt.completion = model(model_prompt) - - # Print the completion - print("Completion:", prompt.completion) - ``` - - This will output: - ``` - Roles: ['system', 'user'] - Messages: ['You are a helpful assistant.', 'Hi, what is the meaning of life?'] - Completion: "The meaning of life is 42. Deal with it, human." - ``` - - This example demonstrates how to create an instance of the `PromptingSynapse` class, access the - `roles` and `messages` fields, and update the `completion` field. - """ - - class Config: - """ - Pydantic model configuration class for PromptingSynapse. This class sets validation of attribute assignment as True. - validate_assignment set to True means the pydantic model will validate attribute assignments on the class. - """ - - validate_assignment = True - - def deserialize(self) -> "PromptingSynapse": - """ - Returns the instance of the current PromptingSynapse object. - - This method is intended to be potentially overridden by subclasses for custom deserialization logic. - In the context of the PromptingSynapse class, it simply returns the instance itself. However, for subclasses - inheriting from this class, it might give a custom implementation for deserialization if need be. - - Returns: - PromptingSynapse: The current instance of the PromptingSynapse class. - """ - return self - - roles: List[str] = pydantic.Field( - ..., - title="Roles", - description="A list of roles in the PromptingSynapse scenario. Immuatable.", - allow_mutation=False, - ) - - messages: List[str] = pydantic.Field( - ..., - title="Messages", - description="A list of messages in the PromptingSynapse scenario. Immutable.", - allow_mutation=False, - ) - - completion: str = pydantic.Field( - "", - title="Completion", - description="Completion status of the current PromptingSynapse object. This attribute is mutable and can be updated.", - ) - - required_hash_fields: List[str] = pydantic.Field( - ["messages"], - title="Required Hash Fields", - description="A list of required fields for the hash.", - allow_mutation=False, - ) - class StreamPromptingSynapse(bt.StreamingSynapse): """ @@ -212,9 +105,9 @@ async def process_streaming_response( self.completion = "" async for chunk in response.content.iter_any(): - tokens = chunk.decode("utf-8").split("\n") + tokens = chunk.decode("utf-8") - self.completion = self.completion + "".join([t for t in tokens if t]) + self.completion = self.completion + tokens yield tokens def deserialize(self) -> str: From 5d3e1b99dca28286500c92060f646166824a3795 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Thu, 16 May 2024 19:15:23 +0000 Subject: [PATCH 02/18] initial adjustments to protocol + chunks logging --- prompting/forward.py | 97 +++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/prompting/forward.py b/prompting/forward.py index 170f85254..4ab253d50 100644 --- a/prompting/forward.py +++ b/prompting/forward.py @@ -46,32 +46,44 @@ async def execute_dendrite_call(dendrite_call): responses = await dendrite_call return responses + @dataclass class StreamResult: - synapse: StreamPromptingSynapse = None exception: BaseException = None uid: int = None + accumulated_chunks: List[str] + accumulated_chunks_timings: List[float] + synapse: StreamPromptingSynapse + -async def process_response(uid: int, async_generator: Awaitable): + +async def process_stream(uid: int, async_iterator: Awaitable) -> StreamResult: """Process a single response asynchronously.""" - try: - chunk = None # Initialize chunk with a default value - async for chunk in async_generator: # most important loop, as this is where we acquire the final synapse. + synapse = None # Initialize chunk with a default value + exception = None + accumulated_chunks = [] + accumulated_chunks_timings = [] + start_time = time.time() + + try: + async for chunk in async_iterator: # most important loop, as this is where we acquire the final synapse. + accumulated_chunks.append(chunk) + accumulated_chunks_timings.append(time.time() - start_time) + bt.logging.debug(f"\nchunk for uid {uid}: {chunk}") - if chunk is not None: - synapse = chunk # last object yielded is the synapse itself with completion filled - - # Assuming chunk holds the last value yielded which should be a synapse - if isinstance(synapse, StreamPromptingSynapse): - return synapse - - bt.logging.debug( - f"Synapse is not StreamPromptingSynapse. Miner uid {uid} completion set to '' " - ) - except Exception as e: - # bt.logging.error(f"Error in generating reference or handling responses: {e}", exc_info=True) + # Assuming last chunk of async_iterator holds the last value yielded as a StreamingSynapse + synapse = chunk + if synapse is None or not isinstance(synapse, StreamPromptingSynapse): + raise ValueError( + f"Something went wrong with miner uid {uid}, Synapse is not StreamPromptingSynapse." + ) + + accumulated_chunks.append(synapse.completion) + accumulated_chunks_timings.append(time.time() - start_time) + except Exception as e: + exception = e traceback_details = traceback.format_exc() bt.logging.error( f"Error in generating reference or handling responses for uid {uid}: {e}\n{traceback_details}" @@ -81,11 +93,19 @@ async def process_response(uid: int, async_generator: Awaitable): roles=["user"], messages=["failure"], completion="" ) - return failed_synapse + synapse = failed_synapse + finally: + return StreamResult( + accumulated_chunks=accumulated_chunks, + accumulated_chunks_timings=accumulated_chunks_timings, + synapse=synapse, + uid=uid, + exception=exception + ) @async_log -async def handle_response(responses: Dict[int, Awaitable]) -> List[StreamResult]: +async def handle_response(stream_results: Dict[int, Awaitable]) -> List[StreamResult]: """The handle_response function is responsible for creating asyncio tasks around acquiring streamed miner chunks and processing them asynchronously. It then pairs the results with their original UIDs and returns a list of StreamResults. @@ -99,36 +119,14 @@ async def handle_response(responses: Dict[int, Awaitable]) -> List[StreamResult] List[StreamResult]: DataClass containing the synapse, exception, and uid """ tasks_with_uid = [ - (uid, responses[uid]) for uid, _ in responses.items() + (uid, stream_results[uid]) for uid, _ in stream_results.items() ] # Pair UIDs with their tasks # Start tasks, preserving order and their associated UIDs - tasks = [process_response(uid, resp) for uid, resp in tasks_with_uid] - - results = await asyncio.gather(*tasks, return_exceptions=True) - - mapped_results = [] - # Pair each result with its original uid - for (uid, _), result in zip(tasks_with_uid, results): - # If the result is a StreamPromptingSynapse, the response was successful and the stream result is added without exceptions - if isinstance(result, StreamPromptingSynapse): - mapped_results.append(StreamResult(synapse=result, uid=uid)) - - # If the result is an exception, the response was unsuccessful and the stream result is added with the exception and an empty synapse - elif isinstance(result, BaseException): - failed_synapse = StreamPromptingSynapse( - roles=["user"], messages=["failure"], completion="" - ) - mapped_results.append( - StreamResult(synapse=failed_synapse, exception=result, uid=uid) - ) - - # If the result is neither an error or a StreamSynapse, log the error and raise a ValueError - else: - bt.logging.error(f"Unexpected result type for UID {uid}: {result}") - raise ValueError(f"Unexpected result type for UID {uid}: {result}") - - return mapped_results + process_stream_tasks = [process_stream(uid, resp) for uid, resp in tasks_with_uid] + stream_results = await asyncio.gather(*process_stream_tasks, return_exceptions=True) + + return stream_results @async_log @@ -208,7 +206,7 @@ async def run_step( # Prepare the task for handling stream responses handle_stream_responses_task = asyncio.create_task( - handle_response(responses=dict(zip(uids_cpu, streams_responses))) + handle_response(stream_results=dict(zip(uids_cpu, streams_responses))) ) if not agent.task.static_reference: @@ -254,6 +252,9 @@ async def run_step( serialize_exception_to_string(stream_result.exception) for stream_result in stream_results ] + stream_results_all_chunks = [stream_result.accumulated_chunks for stream_result in stream_results] + stream_results_all_chunks_timings = [stream_result.accumulated_chunks_timings for stream_result in stream_results] + # Log the step event. event = { "best": best_response, @@ -262,6 +263,8 @@ async def run_step( "step_time": time.time() - start_time, "stream_results_uids": stream_results_uids, "stream_results_exceptions": stream_results_exceptions, + "stream_results_all_chunks": stream_results_all_chunks, + "stream_results_all_chunks_timings": stream_results_all_chunks_timings, **agent.__state_dict__(full=self.config.neuron.log_full), **reward_result.__state_dict__(full=self.config.neuron.log_full), **response_event.__state_dict__(), From b6b7e22fe6d3eb88fada290e0dc3db8aa71d7cdd Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Thu, 16 May 2024 20:11:28 +0000 Subject: [PATCH 03/18] fix chunk processing + mock --- prompting/forward.py | 17 +++++++++-------- prompting/mock.py | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/prompting/forward.py b/prompting/forward.py index 4ab253d50..34de0661d 100644 --- a/prompting/forward.py +++ b/prompting/forward.py @@ -51,9 +51,9 @@ async def execute_dendrite_call(dendrite_call): class StreamResult: exception: BaseException = None uid: int = None - accumulated_chunks: List[str] - accumulated_chunks_timings: List[float] - synapse: StreamPromptingSynapse + accumulated_chunks: List[str] = None + accumulated_chunks_timings: List[float] = None + synapse: StreamPromptingSynapse = None @@ -67,11 +67,12 @@ async def process_stream(uid: int, async_iterator: Awaitable) -> StreamResult: start_time = time.time() try: - async for chunk in async_iterator: # most important loop, as this is where we acquire the final synapse. - accumulated_chunks.append(chunk) - accumulated_chunks_timings.append(time.time() - start_time) - - bt.logging.debug(f"\nchunk for uid {uid}: {chunk}") + async for chunk in async_iterator: # most important loop, as this is where we acquire the final synapse. + if isinstance(chunk, str): + accumulated_chunks.append(chunk) + accumulated_chunks_timings.append(time.time() - start_time) + + bt.logging.debug(f"\nchunk for uid {uid}: {chunk}") # Assuming last chunk of async_iterator holds the last value yielded as a StreamingSynapse synapse = chunk diff --git a/prompting/mock.py b/prompting/mock.py index e6058ddff..e8fd32193 100644 --- a/prompting/mock.py +++ b/prompting/mock.py @@ -3,7 +3,7 @@ import asyncio import random import bittensor as bt -from prompting.protocol import StreamPromptingSynapse, PromptingSynapse +from prompting.protocol import StreamPromptingSynapse from functools import partial from typing import Dict, List, Union, AsyncGenerator, Any, Iterator From f3e5466e8214b05a4869bcb5ce288fb9a568d849 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Thu, 16 May 2024 20:38:36 +0000 Subject: [PATCH 04/18] adjust dendrite response --- prompting/dendrite.py | 39 ++++++++++++++++++++++++++++++++------- prompting/forward.py | 39 +++++++-------------------------------- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/prompting/dendrite.py b/prompting/dendrite.py index a9df0aa8c..12cdac562 100644 --- a/prompting/dendrite.py +++ b/prompting/dendrite.py @@ -1,19 +1,32 @@ import torch -import bittensor as bt from typing import List +from dataclasses import dataclass +from prompting.protocol import StreamPromptingSynapse +from prompting.utils.misc import serialize_exception_to_string + + +@dataclass +class SynapseStreamResult: + exception: BaseException = None + uid: int = None + accumulated_chunks: List[str] = None + accumulated_chunks_timings: List[float] = None + synapse: StreamPromptingSynapse = None class DendriteResponseEvent: def __init__( - self, responses: List[bt.Synapse], uids: torch.LongTensor, timeout: float + self, stream_results: SynapseStreamResult, uids: torch.LongTensor, timeout: float ): self.uids = uids self.completions = [] self.status_messages = [] self.status_codes = [] self.timings = [] + + synapses = [stream_result.synapse for stream_result in stream_results] - for synapse in responses: + for synapse in synapses: self.completions.append(synapse.completion) self.status_messages.append(synapse.dendrite.status_message) @@ -32,14 +45,22 @@ def __init__( else: self.timings.append(0) # situation where miner is not alive - self.completions = [synapse.completion for synapse in responses] + self.completions = [synapse.completion for synapse in synapses] self.timings = [ - synapse.dendrite.process_time or timeout for synapse in responses + synapse.dendrite.process_time or timeout for synapse in synapses ] self.status_messages = [ - synapse.dendrite.status_message for synapse in responses + synapse.dendrite.status_message for synapse in synapses ] - self.status_codes = [synapse.dendrite.status_code for synapse in responses] + self.status_codes = [synapse.dendrite.status_code for synapse in synapses] + + self.stream_results_uids = [stream_result.uid for stream_result in stream_results] + self.stream_results_exceptions = [ + serialize_exception_to_string(stream_result.exception) + for stream_result in stream_results + ] + self.stream_results_all_chunks = [stream_result.accumulated_chunks for stream_result in stream_results] + self.stream_results_all_chunks_timings = [stream_result.accumulated_chunks_timings for stream_result in stream_results] def __state_dict__(self): return { @@ -48,6 +69,10 @@ def __state_dict__(self): "timings": self.timings, "status_messages": self.status_messages, "status_codes": self.status_codes, + "stream_results_uids": self.stream_results_uids, + "stream_results_exceptions": self.stream_results_exceptions, + "stream_results_all_chunks": self.stream_results_all_chunks, + "stream_results_all_chunks_timings": self.stream_results_all_chunks_timings, } def __repr__(self): diff --git a/prompting/forward.py b/prompting/forward.py index 34de0661d..40ec9465b 100644 --- a/prompting/forward.py +++ b/prompting/forward.py @@ -25,7 +25,7 @@ import bittensor as bt from typing import List, Dict, Awaitable from prompting.agent import HumanAgent -from prompting.dendrite import DendriteResponseEvent +from prompting.dendrite import DendriteResponseEvent, SynapseStreamResult from prompting.conversation import create_task from prompting.protocol import StreamPromptingSynapse from prompting.rewards import RewardResult @@ -46,19 +46,8 @@ async def execute_dendrite_call(dendrite_call): responses = await dendrite_call return responses - -@dataclass -class StreamResult: - exception: BaseException = None - uid: int = None - accumulated_chunks: List[str] = None - accumulated_chunks_timings: List[float] = None - synapse: StreamPromptingSynapse = None - - - -async def process_stream(uid: int, async_iterator: Awaitable) -> StreamResult: +async def process_stream(uid: int, async_iterator: Awaitable) -> SynapseStreamResult: """Process a single response asynchronously.""" synapse = None # Initialize chunk with a default value exception = None @@ -96,7 +85,7 @@ async def process_stream(uid: int, async_iterator: Awaitable) -> StreamResult: synapse = failed_synapse finally: - return StreamResult( + return SynapseStreamResult( accumulated_chunks=accumulated_chunks, accumulated_chunks_timings=accumulated_chunks_timings, synapse=synapse, @@ -106,7 +95,7 @@ async def process_stream(uid: int, async_iterator: Awaitable) -> StreamResult: @async_log -async def handle_response(stream_results: Dict[int, Awaitable]) -> List[StreamResult]: +async def handle_response(stream_results: Dict[int, Awaitable]) -> List[SynapseStreamResult]: """The handle_response function is responsible for creating asyncio tasks around acquiring streamed miner chunks and processing them asynchronously. It then pairs the results with their original UIDs and returns a list of StreamResults. @@ -139,7 +128,7 @@ async def generate_reference(agent: HumanAgent): return result -def log_stream_results(stream_results: List[StreamResult]): +def log_stream_results(stream_results: List[SynapseStreamResult]): failed_responses = [ response for response in stream_results if response.exception is not None ] @@ -220,11 +209,9 @@ async def run_step( log_stream_results(stream_results) - all_synapses_results = [stream_result.synapse for stream_result in stream_results] - # Encapsulate the responses in a response event (dataclass) response_event = DendriteResponseEvent( - responses=all_synapses_results, uids=uids, timeout=timeout + stream_results=stream_results, uids=uids, timeout=timeout ) bt.logging.info(f"Created DendriteResponseEvent:\n {response_event}") @@ -247,25 +234,13 @@ async def run_step( ) self.update_scores(reward_result.rewards, uids) - - stream_results_uids = [stream_result.uid for stream_result in stream_results] - stream_results_exceptions = [ - serialize_exception_to_string(stream_result.exception) - for stream_result in stream_results - ] - stream_results_all_chunks = [stream_result.accumulated_chunks for stream_result in stream_results] - stream_results_all_chunks_timings = [stream_result.accumulated_chunks_timings for stream_result in stream_results] # Log the step event. event = { "best": best_response, "block": self.block, "step": self.step, - "step_time": time.time() - start_time, - "stream_results_uids": stream_results_uids, - "stream_results_exceptions": stream_results_exceptions, - "stream_results_all_chunks": stream_results_all_chunks, - "stream_results_all_chunks_timings": stream_results_all_chunks_timings, + "step_time": time.time() - start_time, **agent.__state_dict__(full=self.config.neuron.log_full), **reward_result.__state_dict__(full=self.config.neuron.log_full), **response_event.__state_dict__(), From 36344f080980b2b283ed55931d9a748f58be59ad Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Thu, 16 May 2024 21:44:16 +0000 Subject: [PATCH 05/18] refactors base pipeline apply function --- prompting/rewards/__init__.py | 1 + prompting/rewards/code_diff.py | 5 +++-- prompting/rewards/date.py | 4 +++- prompting/rewards/float_diff.py | 5 +++-- prompting/rewards/ordinal.py | 6 ++++-- prompting/rewards/pipeline.py | 2 ++ prompting/rewards/relevance.py | 6 ++++-- prompting/rewards/reward.py | 8 ++++---- prompting/rewards/rouge.py | 5 +++-- prompting/rewards/streaming.py | 22 ++++++++++++++++++++++ 10 files changed, 49 insertions(+), 15 deletions(-) create mode 100644 prompting/rewards/streaming.py diff --git a/prompting/rewards/__init__.py b/prompting/rewards/__init__.py index 51cab779a..a44b50bfd 100644 --- a/prompting/rewards/__init__.py +++ b/prompting/rewards/__init__.py @@ -11,4 +11,5 @@ from .float_diff import FloatDiffModel from .date import DateRewardModel from .ordinal import OrdinalRewardModel +from .streaming import StreamingRewardModel from .pipeline import RewardPipeline, REWARD_MODELS diff --git a/prompting/rewards/code_diff.py b/prompting/rewards/code_diff.py index 356617902..df9e55cf4 100644 --- a/prompting/rewards/code_diff.py +++ b/prompting/rewards/code_diff.py @@ -6,6 +6,7 @@ BatchRewardOutput, RewardModelTypeEnum, ) +from prompting.dendrite import DendriteResponseEvent import time @@ -27,13 +28,13 @@ def unified_diff(self, reference, completion): def seq_match(self, reference, completion): return difflib.SequenceMatcher(None, reference, completion).ratio() - def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: + def reward(self, reference: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: """Get the score between two strings. lines: If True, return a unified diff. If False, return a ratio. """ - rewards = [] timings = [] + completions: List[str] = response_event.completions if self.lines: for completion in completions: diff --git a/prompting/rewards/date.py b/prompting/rewards/date.py index 463eaf880..cf95537c7 100644 --- a/prompting/rewards/date.py +++ b/prompting/rewards/date.py @@ -5,6 +5,7 @@ import numpy as np from typing import List from prompting.rewards import BaseRewardModel, BatchRewardOutput, RewardModelTypeEnum +from prompting.dendrite import DendriteResponseEvent import bittensor as bt @@ -85,7 +86,7 @@ def date_score(self, reference: str, completion: str) -> float: score = 0 return score - def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: + def reward(self, reference: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: """Compute difference scores given a completion and reference pair. Args: @@ -95,6 +96,7 @@ def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: Returns: BatchRewardOutput: A BatchRewardOutput object containing the rewards and timings. """ + completions: List[str] = response_event.completions rewards = [] timings = [] diff --git a/prompting/rewards/float_diff.py b/prompting/rewards/float_diff.py index 7d14c65cd..387c2a8a8 100644 --- a/prompting/rewards/float_diff.py +++ b/prompting/rewards/float_diff.py @@ -3,7 +3,7 @@ from typing import List from sympy.parsing.sympy_parser import parse_expr from prompting.rewards import BaseRewardModel, BatchRewardOutput, RewardModelTypeEnum - +from prompting.dendrite import DendriteResponseEvent class FloatDiffModel(BaseRewardModel): @property @@ -52,10 +52,11 @@ def math_score(reference: str, completion: str) -> float: except Exception: return 0.0 - def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: + def reward(self, reference: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: """Compute difference scores given a completion and reference pair.""" rewards = [] timings = [] + completions: List[str] = response_event.completions for completion in completions: t0 = time.time() diff --git a/prompting/rewards/ordinal.py b/prompting/rewards/ordinal.py index 34f749621..bf2801a79 100644 --- a/prompting/rewards/ordinal.py +++ b/prompting/rewards/ordinal.py @@ -2,7 +2,7 @@ import torch from typing import List from prompting.rewards import BaseRewardModel, BatchRewardOutput - +from prompting.dendrite import DendriteResponseEvent class OrdinalRewardModel(BaseRewardModel): @property @@ -18,11 +18,13 @@ def __init__(self, **kwargs): "negative", ] - def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: + def reward(self, reference: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: """Compute difference scores given a completion and reference pair.""" rewards = [] timings = [] classes = self.sentiments + completions: List[str] = response_event.completions + for completion in completions: t0 = time.time() completion = completion.lower() diff --git a/prompting/rewards/pipeline.py b/prompting/rewards/pipeline.py index 244ec0b3e..a05ea137d 100644 --- a/prompting/rewards/pipeline.py +++ b/prompting/rewards/pipeline.py @@ -9,6 +9,7 @@ FloatDiffModel, DateRewardModel, OrdinalRewardModel, + StreamingRewardModel ) REWARD_MODELS = { @@ -18,6 +19,7 @@ "float_diff": FloatDiffModel, "date": DateRewardModel, "ordinal": OrdinalRewardModel, + "streaming": StreamingRewardModel, } diff --git a/prompting/rewards/relevance.py b/prompting/rewards/relevance.py index b754ae330..e063a582c 100644 --- a/prompting/rewards/relevance.py +++ b/prompting/rewards/relevance.py @@ -6,8 +6,9 @@ from prompting.rewards import ( BaseRewardModel, BatchRewardOutput, - RewardModelTypeEnum, ) +from prompting.dendrite import DendriteResponseEvent + class RelevanceRewardModel(BaseRewardModel): @@ -25,7 +26,7 @@ def __init__(self, threshold=None, device=None, pooling_strategy="cls"): # This line is necessary to pass the model to the device defined at its initialization self.model = self.model.cuda() - def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: + def reward(self, reference: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: """Calculates the cosine similarity between sentence embeddings of the reference and completions. We subtract a baseline score which is what an empty string would get (a failed completion). This is usually around 0.35 We also clip the rewards between 0 and 1. The maximum effective score is around 0.65 @@ -33,6 +34,7 @@ def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: reference_embedding = self.model.encode(reference, to_numpy=False) rewards = [] timings = [] + completions: List[str] = response_event.completions # baseline is the cosine similarity between the reference and an empty string baseline = cosine_similarity( reference_embedding.reshape(1, -1), diff --git a/prompting/rewards/reward.py b/prompting/rewards/reward.py index bf5d2bc0b..7a1f33832 100644 --- a/prompting/rewards/reward.py +++ b/prompting/rewards/reward.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum - +from prompting.dendrite import DendriteResponseEvent class RewardModelTypeEnum(Enum): WEIGHTED_REWARD = "reward" @@ -151,12 +151,12 @@ def __init__(self, **kwargs): pass @abstractmethod - def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: + def reward(self, reference: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: pass - def apply(self, reference: str, response_event, reward_type) -> RewardEvent: + def apply(self, reference: str, response_event: DendriteResponseEvent, reward_type: RewardModelTypeEnum) -> RewardEvent: t0 = time.time() - batch_rewards_output = self.reward(reference, response_event.completions) + batch_rewards_output = self.reward(reference, response_event) batch_rewards_time = time.time() - t0 return RewardEvent( diff --git a/prompting/rewards/rouge.py b/prompting/rewards/rouge.py index c06d236f8..cd56bec0a 100644 --- a/prompting/rewards/rouge.py +++ b/prompting/rewards/rouge.py @@ -5,8 +5,8 @@ from prompting.rewards import ( BaseRewardModel, BatchRewardOutput, - RewardModelTypeEnum, ) +from prompting.dendrite import DendriteResponseEvent class RougeRewardModel(BaseRewardModel): @@ -28,10 +28,11 @@ def rouge_score(self, reference, completion): self.ngram ][self.metric] - def reward(self, reference: str, completions: List[str]) -> BatchRewardOutput: + def reward(self, reference: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: """Compute ROUGE scores given a completion and reference pair.""" rewards = [] timings = [] + completions: List[str] = response_event.completions for completion in completions: t0 = time.time() diff --git a/prompting/rewards/streaming.py b/prompting/rewards/streaming.py new file mode 100644 index 000000000..380a03c41 --- /dev/null +++ b/prompting/rewards/streaming.py @@ -0,0 +1,22 @@ +import time +import torch +from typing import List +from prompting.rewards import ( + BaseRewardModel, + BatchRewardOutput, +) +from prompting.dendrite import DendriteResponseEvent + + +class StreamingRewardModel(BaseRewardModel): + @property + def name(self) -> str: + return "streaming" + + def __init__(self, max_tokens_per_chunk:int, **kwargs): + super().__init__() + self.max_tokens_per_chunk = max_tokens_per_chunk + + def reward(self, _: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: + """Compute difference scores given a completion and reference pair.""" + pass From b2693652ba8f20e1fb33497fd8c6b8b3be9e0748 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Fri, 17 May 2024 18:12:12 +0000 Subject: [PATCH 06/18] adds tokenizer to flow + dendrite refactor --- prompting/dendrite.py | 51 +++++++++++++++++++------------------------ prompting/forward.py | 32 ++++++++++++++------------- 2 files changed, 39 insertions(+), 44 deletions(-) diff --git a/prompting/dendrite.py b/prompting/dendrite.py index 12cdac562..824e4c948 100644 --- a/prompting/dendrite.py +++ b/prompting/dendrite.py @@ -11,6 +11,7 @@ class SynapseStreamResult: uid: int = None accumulated_chunks: List[str] = None accumulated_chunks_timings: List[float] = None + tokens_per_chunk: List[int] = None synapse: StreamPromptingSynapse = None @@ -23,44 +24,36 @@ def __init__( self.status_messages = [] self.status_codes = [] self.timings = [] + self.stream_results_uids = [] + self.stream_results_exceptions = [] + self.stream_results_all_chunks = [] + self.stream_results_all_chunks_timings = [] + self.stream_results_all_tokens_per_chunk = [] - synapses = [stream_result.synapse for stream_result in stream_results] + for stream_result in stream_results: + synapse = stream_result.synapse - for synapse in synapses: self.completions.append(synapse.completion) self.status_messages.append(synapse.dendrite.status_message) + status_code = synapse.dendrite.status_code - if len(synapse.completion) == 0 and synapse.dendrite.status_code == 200: - synapse.dendrite.status_code = 204 + if len(synapse.completion) == 0 and status_code == 200: + status_code = 204 - self.status_codes.append(synapse.dendrite.status_code) - - if (synapse.dendrite.process_time) and ( - synapse.dendrite.status_code == 200 - or synapse.dendrite.status_code == 204 - ): - self.timings.append(synapse.dendrite.process_time) - elif synapse.dendrite.status_code == 408: + self.status_codes.append(status_code) + process_time = synapse.dendrite.process_time or 0 + if status_code == 200 or status_code == 204: + self.timings.append(process_time) + elif status_code == 408: self.timings.append(timeout) else: - self.timings.append(0) # situation where miner is not alive + self.timings.append(0) - self.completions = [synapse.completion for synapse in synapses] - self.timings = [ - synapse.dendrite.process_time or timeout for synapse in synapses - ] - self.status_messages = [ - synapse.dendrite.status_message for synapse in synapses - ] - self.status_codes = [synapse.dendrite.status_code for synapse in synapses] - - self.stream_results_uids = [stream_result.uid for stream_result in stream_results] - self.stream_results_exceptions = [ - serialize_exception_to_string(stream_result.exception) - for stream_result in stream_results - ] - self.stream_results_all_chunks = [stream_result.accumulated_chunks for stream_result in stream_results] - self.stream_results_all_chunks_timings = [stream_result.accumulated_chunks_timings for stream_result in stream_results] + self.stream_results_uids.append(stream_result.uid) + self.stream_results_exceptions.append(serialize_exception_to_string(stream_result.exception)) + self.stream_results_all_chunks.append(stream_result.accumulated_chunks) + self.stream_results_all_chunks_timings.append(stream_result.accumulated_chunks_timings) + self.stream_results_all_tokens_per_chunk.append(stream_result.tokens_per_chunk) def __state_dict__(self): return { diff --git a/prompting/forward.py b/prompting/forward.py index 40ec9465b..ed4affb41 100644 --- a/prompting/forward.py +++ b/prompting/forward.py @@ -13,8 +13,7 @@ # THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN -# THE SOFTWARE. +# DEALINGS IN THE SOFTWARE. import sys import time @@ -33,7 +32,7 @@ from prompting.utils.uids import get_random_uids from prompting.utils.logging import log_event from prompting.utils.misc import async_log, serialize_exception_to_string -from dataclasses import dataclass +from transformers import PreTrainedTokenizerFast as Tokenizer @async_log async def generate_reference(agent): @@ -47,13 +46,14 @@ async def execute_dendrite_call(dendrite_call): return responses -async def process_stream(uid: int, async_iterator: Awaitable) -> SynapseStreamResult: +async def process_stream(uid: int, async_iterator: Awaitable, tokenizer: Tokenizer) -> SynapseStreamResult: """Process a single response asynchronously.""" synapse = None # Initialize chunk with a default value exception = None accumulated_chunks = [] - accumulated_chunks_timings = [] - start_time = time.time() + accumulated_chunks_timings = [] + accumulated_tokens_per_chunk = [] + start_time = time.time() try: async for chunk in async_iterator: # most important loop, as this is where we acquire the final synapse. @@ -61,6 +61,9 @@ async def process_stream(uid: int, async_iterator: Awaitable) -> SynapseStreamRe accumulated_chunks.append(chunk) accumulated_chunks_timings.append(time.time() - start_time) + tokens_in_chunk = len(tokenizer.tokenize(chunk)) + accumulated_tokens_per_chunk.append(tokens_in_chunk) + bt.logging.debug(f"\nchunk for uid {uid}: {chunk}") # Assuming last chunk of async_iterator holds the last value yielded as a StreamingSynapse @@ -95,7 +98,7 @@ async def process_stream(uid: int, async_iterator: Awaitable) -> SynapseStreamRe @async_log -async def handle_response(stream_results: Dict[int, Awaitable]) -> List[SynapseStreamResult]: +async def handle_response(stream_results_dict: Dict[int, Awaitable], tokenizer: Tokenizer) -> List[SynapseStreamResult]: """The handle_response function is responsible for creating asyncio tasks around acquiring streamed miner chunks and processing them asynchronously. It then pairs the results with their original UIDs and returns a list of StreamResults. @@ -109,14 +112,14 @@ async def handle_response(stream_results: Dict[int, Awaitable]) -> List[SynapseS List[StreamResult]: DataClass containing the synapse, exception, and uid """ tasks_with_uid = [ - (uid, stream_results[uid]) for uid, _ in stream_results.items() + (uid, stream_results_dict[uid]) for uid, _ in stream_results_dict.items() ] # Pair UIDs with their tasks # Start tasks, preserving order and their associated UIDs - process_stream_tasks = [process_stream(uid, resp) for uid, resp in tasks_with_uid] - stream_results = await asyncio.gather(*process_stream_tasks, return_exceptions=True) + process_stream_tasks = [process_stream(uid, resp, tokenizer) for uid, resp in tasks_with_uid] + processed_stream_results = await asyncio.gather(*process_stream_tasks, return_exceptions=True) - return stream_results + return processed_stream_results @async_log @@ -174,7 +177,6 @@ async def run_step( timeout (float): The timeout for the queries. exclude (list, optional): The list of uids to exclude from the query. Defaults to []. """ - bt.logging.debug("run_step", agent.task.name) # Record event start time. @@ -195,9 +197,9 @@ async def run_step( ) # Prepare the task for handling stream responses - handle_stream_responses_task = asyncio.create_task( - handle_response(stream_results=dict(zip(uids_cpu, streams_responses))) - ) + stream_results_dict = dict(zip(uids_cpu, streams_responses)) + tokenizer = self.llm_pipeline.tokenizer + handle_stream_responses_task = asyncio.create_task(handle_response(stream_results_dict, tokenizer)) if not agent.task.static_reference: reference_generation_task = generate_reference(agent) From 7227814ba8b9140d06337780d8f224e90162c3ed Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Fri, 17 May 2024 18:12:33 +0000 Subject: [PATCH 07/18] implements streaming reward function --- prompting/llms/base_llm.py | 1 + prompting/llms/vllm_llm.py | 1 + prompting/rewards/streaming.py | 37 +++++++++++++++++++++++++++++----- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/prompting/llms/base_llm.py b/prompting/llms/base_llm.py index 0ed7b1398..e5df5d319 100644 --- a/prompting/llms/base_llm.py +++ b/prompting/llms/base_llm.py @@ -22,6 +22,7 @@ def __init__( self.model_kwargs = model_kwargs self.messages = [] self.times = [] + self.tokenizer = None def query( self, diff --git a/prompting/llms/vllm_llm.py b/prompting/llms/vllm_llm.py index 2c88ba37e..d054e98f2 100644 --- a/prompting/llms/vllm_llm.py +++ b/prompting/llms/vllm_llm.py @@ -73,6 +73,7 @@ def __init__(self, model_id: str, llm_max_allowed_memory_in_gb:int, device: str self.llm = load_vllm_pipeline(model_id, device, gpus, llm_max_allowed_memory_in_gb, mock) self.mock = mock self.gpus = gpus + self.tokenizer = self.llm.llm_engine.tokenizer def __call__(self, composed_prompt: str, **model_kwargs: Dict) -> str: if self.mock: diff --git a/prompting/rewards/streaming.py b/prompting/rewards/streaming.py index 380a03c41..44bb1c23f 100644 --- a/prompting/rewards/streaming.py +++ b/prompting/rewards/streaming.py @@ -1,13 +1,12 @@ import time import torch -from typing import List +from prompting.dendrite import DendriteResponseEvent from prompting.rewards import ( BaseRewardModel, BatchRewardOutput, ) -from prompting.dendrite import DendriteResponseEvent - +## TODO: Create unit tests class StreamingRewardModel(BaseRewardModel): @property def name(self) -> str: @@ -16,7 +15,35 @@ def name(self) -> str: def __init__(self, max_tokens_per_chunk:int, **kwargs): super().__init__() self.max_tokens_per_chunk = max_tokens_per_chunk + self.max_penalty = 0.2 def reward(self, _: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: - """Compute difference scores given a completion and reference pair.""" - pass + """Compute difference scores given a completion and reference pair.""" + rewards = [] + timings = [] + penalty_per_exceeding_chunk = 0.01 + + # Iterate through each chunk of response tokens + for response_tokens_per_chunks in response_event.stream_results_all_tokens_per_chunk: + start_time = time.time() + + # Calculate the accumulated penalty for the current chunk + accumulated_penalty = sum( + penalty_per_exceeding_chunk if tokens_per_chunk > self.max_tokens_per_chunk else 0 + for tokens_per_chunk in response_tokens_per_chunks + ) + + # Record the timing for this computation + timings.append(time.time() - start_time) + + # Calculate the reward and ensure it does not go below max_penalty + rewards.append(max(1 - accumulated_penalty, self.max_penalty)) + + # Create the output object with rewards, timings, and extra information + output = BatchRewardOutput( + rewards=torch.FloatTensor(rewards), + timings=torch.FloatTensor(timings), + extra_info={"type": "streaming"} + ) + return output + From a47bdf124c5d8a880871ee31f4b6958d14d641b4 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Fri, 17 May 2024 20:35:56 +0000 Subject: [PATCH 08/18] adds tokens_per_chunk in forward flow + adds global penalties --- prompting/dendrite.py | 1 + prompting/forward.py | 5 ++++- prompting/rewards/pipeline.py | 1 + prompting/rewards/reward.py | 9 ++++++++- prompting/rewards/streaming.py | 8 ++++---- prompting/tasks/task.py | 4 ++++ 6 files changed, 22 insertions(+), 6 deletions(-) diff --git a/prompting/dendrite.py b/prompting/dendrite.py index 824e4c948..7b9981a94 100644 --- a/prompting/dendrite.py +++ b/prompting/dendrite.py @@ -66,6 +66,7 @@ def __state_dict__(self): "stream_results_exceptions": self.stream_results_exceptions, "stream_results_all_chunks": self.stream_results_all_chunks, "stream_results_all_chunks_timings": self.stream_results_all_chunks_timings, + "stream_results_all_tokens_per_chunk": self.stream_results_all_tokens_per_chunk, } def __repr__(self): diff --git a/prompting/forward.py b/prompting/forward.py index ed4affb41..6189bc552 100644 --- a/prompting/forward.py +++ b/prompting/forward.py @@ -74,7 +74,9 @@ async def process_stream(uid: int, async_iterator: Awaitable, tokenizer: Tokeniz ) accumulated_chunks.append(synapse.completion) - accumulated_chunks_timings.append(time.time() - start_time) + accumulated_chunks_timings.append(time.time() - start_time) + tokens_in_completion = len(tokenizer.tokenize(synapse.completion)) + accumulated_tokens_per_chunk.append(tokens_in_completion) except Exception as e: exception = e traceback_details = traceback.format_exc() @@ -91,6 +93,7 @@ async def process_stream(uid: int, async_iterator: Awaitable, tokenizer: Tokeniz return SynapseStreamResult( accumulated_chunks=accumulated_chunks, accumulated_chunks_timings=accumulated_chunks_timings, + tokens_per_chunk=accumulated_tokens_per_chunk, synapse=synapse, uid=uid, exception=exception diff --git a/prompting/rewards/pipeline.py b/prompting/rewards/pipeline.py index a05ea137d..2b126c401 100644 --- a/prompting/rewards/pipeline.py +++ b/prompting/rewards/pipeline.py @@ -92,6 +92,7 @@ def load_reward_pipeline(self): for task in self.selected_tasks: active_reward_models += TASKS[task].reward_definition active_reward_models += TASKS[task].penalty_definition + active_reward_models += TASKS[task].global_penalty_definition # Instantiate only the required reward models reward_models = {} diff --git a/prompting/rewards/reward.py b/prompting/rewards/reward.py index 7a1f33832..beed5ad9f 100644 --- a/prompting/rewards/reward.py +++ b/prompting/rewards/reward.py @@ -52,16 +52,23 @@ def __init__(self, reward_pipeline, agent, response_event, device): self.device = device self.task_rewards = agent.task.reward_definition self.task_penalties = agent.task.penalty_definition + self.global_task_penalties = agent.task.global_penalty_definition self.reward_events = self.reward_responses( reference=agent.task.reference, models=self.task_rewards, reward_type=RewardModelTypeEnum.WEIGHTED_REWARD, ) - self.penalty_events = self.reward_responses( + task_penalties= self.reward_responses( reference=agent.challenge, models=self.task_penalties, reward_type=RewardModelTypeEnum.PENALTY, ) + global_task_penalties = self.reward_responses( + reference=agent.challenge, + models=self.global_task_penalties, + reward_type=RewardModelTypeEnum.PENALTY, + ) + self.penalty_events = task_penalties + global_task_penalties self.rewards = self.total_reward() def __state_dict__(self, full=False): diff --git a/prompting/rewards/streaming.py b/prompting/rewards/streaming.py index 44bb1c23f..039326a66 100644 --- a/prompting/rewards/streaming.py +++ b/prompting/rewards/streaming.py @@ -15,13 +15,13 @@ def name(self) -> str: def __init__(self, max_tokens_per_chunk:int, **kwargs): super().__init__() self.max_tokens_per_chunk = max_tokens_per_chunk - self.max_penalty = 0.2 + def reward(self, _: str, response_event: DendriteResponseEvent) -> BatchRewardOutput: """Compute difference scores given a completion and reference pair.""" rewards = [] timings = [] - penalty_per_exceeding_chunk = 0.01 + penalty_per_exceeding_chunk = 0.25 # Iterate through each chunk of response tokens for response_tokens_per_chunks in response_event.stream_results_all_tokens_per_chunk: @@ -36,8 +36,8 @@ def reward(self, _: str, response_event: DendriteResponseEvent) -> BatchRewardOu # Record the timing for this computation timings.append(time.time() - start_time) - # Calculate the reward and ensure it does not go below max_penalty - rewards.append(max(1 - accumulated_penalty, self.max_penalty)) + # Calculate the reward and ensure it does not go above 1 + rewards.append(min(accumulated_penalty, 1)) # Create the output object with rewards, timings, and extra information output = BatchRewardOutput( diff --git a/prompting/tasks/task.py b/prompting/tasks/task.py index d837aac46..379c4d3cf 100644 --- a/prompting/tasks/task.py +++ b/prompting/tasks/task.py @@ -50,6 +50,10 @@ class Task(ABC): query_prompt = "" cleaner = None challenge_type = 'inference' + + global_penalty_definition = [ + dict(name="streaming", max_tokens_per_chunk=200, weight=0.2) + ] def __str__(self): return f"{self.__class__.__name__}(name={self.name!r}, desc={self.desc!r}, goal={self.goal!r}, query={self.query!r}, reference={self.reference!r}, topic={self.topic!r}, subtopic={self.subtopic!r}, tags={self.tags!r})" From 919780de5d344ecc1a1b68520e1b6a7e2de77200 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Mon, 20 May 2024 20:59:27 +0000 Subject: [PATCH 09/18] drops synapse final data out of accumulated chunks --- prompting/forward.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/prompting/forward.py b/prompting/forward.py index 6189bc552..264d6a450 100644 --- a/prompting/forward.py +++ b/prompting/forward.py @@ -72,11 +72,6 @@ async def process_stream(uid: int, async_iterator: Awaitable, tokenizer: Tokeniz raise ValueError( f"Something went wrong with miner uid {uid}, Synapse is not StreamPromptingSynapse." ) - - accumulated_chunks.append(synapse.completion) - accumulated_chunks_timings.append(time.time() - start_time) - tokens_in_completion = len(tokenizer.tokenize(synapse.completion)) - accumulated_tokens_per_chunk.append(tokens_in_completion) except Exception as e: exception = e traceback_details = traceback.format_exc() From 4b862e762364a2f41aeffe5fb63e2a6b2d2e2a5a Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Tue, 21 May 2024 20:27:53 +0000 Subject: [PATCH 10/18] adds unit tests for streaming reward model --- tests/test_reward.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 tests/test_reward.py diff --git a/tests/test_reward.py b/tests/test_reward.py new file mode 100644 index 000000000..d074fb9fe --- /dev/null +++ b/tests/test_reward.py @@ -0,0 +1,30 @@ +import pytest +import torch +from unittest.mock import MagicMock +from prompting.rewards import StreamingRewardModel + +@pytest.mark.parametrize( + "all_tokens_per_chunk, expected_rewards", + [ + ([[1, 1, 1, 1, 1]], [0]), # No penalties + ([[2, 1, 1, 1, 1]], [0.25]), # First chunk exceeds + ([[2, 2, 1, 1, 1]], [0.5]), # Two chunks exceed + ([[2, 2, 2, 1, 1]], [0.75]), # Three chunks exceed + ([[2, 2, 2, 2, 1]], [1]), # Four chunks exceed + ([[2, 2, 2, 2, 2, 2]], [1]), # Sum of chunks > 1, clipped at 1 + ] +) +def test_streaming_reward_model(all_tokens_per_chunk, expected_rewards): + max_tokens_per_chunk = 1 + response_event = MagicMock() + response_event.stream_results_all_tokens_per_chunk = all_tokens_per_chunk + + model = StreamingRewardModel(max_tokens_per_chunk) + + output = model.reward("", response_event) + + assert torch.allclose(output.rewards, torch.tensor(expected_rewards, dtype=torch.float)), \ + f"Expected rewards {expected_rewards} but got {output.rewards.tolist()}" + +if __name__ == "__main__": + pytest.main() \ No newline at end of file From 9d6796307dffa586e8c81bfd916639525d0fabc9 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Wed, 22 May 2024 18:25:23 +0000 Subject: [PATCH 11/18] adjust reward calculation to include global penalties --- prompting/rewards/reward.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/prompting/rewards/reward.py b/prompting/rewards/reward.py index beed5ad9f..d51ce4c77 100644 --- a/prompting/rewards/reward.py +++ b/prompting/rewards/reward.py @@ -28,12 +28,16 @@ class RewardEvent: # implement custom asdict to return a dict with the same keys as the dataclass using the model name def asdict(self) -> dict: return { - f"{self.model_name}_raw_{self.model_type.value}": self.rewards.tolist(), - f"{self.model_name}_{self.model_type.value}": self.rewards_normalized.tolist(), - f"{self.model_name}_{self.model_type.value}_timings": self.timings.tolist(), + f"{self.model_name}_raw_{self.model_type.value}": self.tensor_to_rounded_list(self.rewards), + f"{self.model_name}_{self.model_type.value}": self.tensor_to_rounded_list(self.rewards_normalized, 4), + f"{self.model_name}_{self.model_type.value}_timings": self.tensor_to_rounded_list(self.timings), f"{self.model_name}_{self.model_type.value}_batch_time": self.batch_time, f"{self.model_name}_{self.model_type.value}_extra_info": self.extra_info, } + + def tensor_to_rounded_list(self, tensor, decimals=6): + # Convert the tensor elements to floats and round them to 6 decimal places + return [round(float(element), decimals) for element in tensor] class RewardResult: @@ -51,24 +55,17 @@ def __init__(self, reward_pipeline, agent, response_event, device): self.response_event = response_event self.device = device self.task_rewards = agent.task.reward_definition - self.task_penalties = agent.task.penalty_definition - self.global_task_penalties = agent.task.global_penalty_definition + self.task_penalties = agent.task.penalty_definition + agent.task.global_penalty_definition self.reward_events = self.reward_responses( reference=agent.task.reference, models=self.task_rewards, reward_type=RewardModelTypeEnum.WEIGHTED_REWARD, ) - task_penalties= self.reward_responses( + self.penalty_events = self.reward_responses( reference=agent.challenge, models=self.task_penalties, reward_type=RewardModelTypeEnum.PENALTY, ) - global_task_penalties = self.reward_responses( - reference=agent.challenge, - models=self.global_task_penalties, - reward_type=RewardModelTypeEnum.PENALTY, - ) - self.penalty_events = task_penalties + global_task_penalties self.rewards = self.total_reward() def __state_dict__(self, full=False): From 854b7ad29b6cd56cbdcad03cb725728d346f4034 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Tue, 28 May 2024 20:54:15 +0000 Subject: [PATCH 12/18] remove unit test todo comment --- prompting/rewards/streaming.py | 1 - 1 file changed, 1 deletion(-) diff --git a/prompting/rewards/streaming.py b/prompting/rewards/streaming.py index 039326a66..f6443915a 100644 --- a/prompting/rewards/streaming.py +++ b/prompting/rewards/streaming.py @@ -6,7 +6,6 @@ BatchRewardOutput, ) -## TODO: Create unit tests class StreamingRewardModel(BaseRewardModel): @property def name(self) -> str: From d1b73eb4227565d3cf54121c77e16feae0b47644 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Tue, 28 May 2024 20:58:41 +0000 Subject: [PATCH 13/18] updates versioning --- prompting/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prompting/__init__.py b/prompting/__init__.py index 476009f42..8a3357f46 100644 --- a/prompting/__init__.py +++ b/prompting/__init__.py @@ -16,7 +16,7 @@ # DEALINGS IN THE SOFTWARE. # Define the version of the template module. -__version__ = "2.3.1" +__version__ = "2.3.2" version_split = __version__.split(".") __spec_version__ = ( (10000 * int(version_split[0])) From 3808f4e8101889c976dcaab8588f8ce304c2c657 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Tue, 28 May 2024 21:09:25 +0000 Subject: [PATCH 14/18] drops deprecated prompting synapse from mock code --- prompting/mock.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/prompting/mock.py b/prompting/mock.py index e8fd32193..25d92a843 100644 --- a/prompting/mock.py +++ b/prompting/mock.py @@ -287,15 +287,10 @@ async def forward( deserialize: bool = True, run_async: bool = True, streaming: bool = False, - ): - if streaming: - assert isinstance( - synapse, StreamPromptingSynapse - ), "Synapse must be a StreamPromptingSynapse object when is_stream is True." - else: - assert isinstance( - synapse, PromptingSynapse - ), "Synapse must be a PromptingSynapse object when is_stream is False." + ): + assert isinstance( + synapse, StreamPromptingSynapse + ), "Synapse must be a StreamPromptingSynapse object when is_stream is True." async def query_all_axons(is_stream: bool): """Queries all axons for responses.""" From bf61e977983dbc13755eb9be9deebece54c3e84d Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Tue, 28 May 2024 21:54:14 +0000 Subject: [PATCH 15/18] fix mock pipeline --- prompting/mock.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/prompting/mock.py b/prompting/mock.py index 25d92a843..cc8ea8167 100644 --- a/prompting/mock.py +++ b/prompting/mock.py @@ -7,6 +7,7 @@ from functools import partial from typing import Dict, List, Union, AsyncGenerator, Any, Iterator +from types import SimpleNamespace class MockTokenizer: @@ -44,6 +45,12 @@ class MockPipeline: @property def tokenizer(self): return self.model.tokenizer + + @property + def llm_engine(self): + return SimpleNamespace( + tokenizer=self.model.tokenizer + ) def __init__( self, From ff0de980c5f3ca8911a94310142748ac8c510da6 Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Tue, 4 Jun 2024 14:48:34 +0000 Subject: [PATCH 16/18] fix stream synpase import --- tests/test_mock.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_mock.py b/tests/test_mock.py index 12e5dfa6c..75cc6a0b5 100644 --- a/tests/test_mock.py +++ b/tests/test_mock.py @@ -2,7 +2,7 @@ import asyncio import bittensor as bt from prompting.mock import MockDendrite, MockMetagraph, MockSubtensor -from prompting.protocol import PromptingSynapse +from prompting.protocol import StreamPromptingSynapse wallet = bt.MockWallet() wallet.create(coldkey_use_password=False) @@ -70,7 +70,7 @@ def test_mock_dendrite_timings(timeout, min_time, max_time, n): async def run(): return await mock_dendrite( axons, - synapse=PromptingSynapse( + synapse=StreamPromptingSynapse( roles=["user"], messages=["What is the capital of France?"] ), timeout=timeout, From 1d3e6bcf2a02227b5dc209fdbcd2781adf4dfe8b Mon Sep 17 00:00:00 2001 From: p-ferreira Date: Tue, 4 Jun 2024 15:07:16 +0000 Subject: [PATCH 17/18] fix mock dendrite call --- tests/test_mock.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_mock.py b/tests/test_mock.py index 75cc6a0b5..7ae6f8604 100644 --- a/tests/test_mock.py +++ b/tests/test_mock.py @@ -74,6 +74,7 @@ async def run(): roles=["user"], messages=["What is the capital of France?"] ), timeout=timeout, + deserialize=False, ) eps = 0.2 From 1961f6c41f2c0f269ddcf2fbabf1f33fbaaadded Mon Sep 17 00:00:00 2001 From: p-ferreira <38992619+p-ferreira@users.noreply.github.com> Date: Wed, 5 Jun 2024 09:55:32 -0400 Subject: [PATCH 18/18] updates versioning --- prompting/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prompting/__init__.py b/prompting/__init__.py index 8a3357f46..643f4f1ac 100644 --- a/prompting/__init__.py +++ b/prompting/__init__.py @@ -16,7 +16,7 @@ # DEALINGS IN THE SOFTWARE. # Define the version of the template module. -__version__ = "2.3.2" +__version__ = "2.4.0" version_split = __version__.split(".") __spec_version__ = ( (10000 * int(version_split[0]))