diff --git a/neurons/miners/huggingface/miner.py b/neurons/miners/huggingface/miner.py index 29eaa6405..6c33730d1 100644 --- a/neurons/miners/huggingface/miner.py +++ b/neurons/miners/huggingface/miner.py @@ -17,10 +17,11 @@ import time import bittensor as bt from prompting.miners import HuggingFaceMiner +from deprecated import deprecated -# This is the main function, which runs the miner. -if __name__ == "__main__": +@deprecated(version="2.4.1+", reason="Class is deprecated, use openai miner for reference on example miner.") +def main(): with HuggingFaceMiner() as miner: while True: miner.log_status() @@ -29,3 +30,7 @@ if miner.should_exit: bt.logging.warning("Ending miner...") break + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/prompting/base/prompting_miner.py b/prompting/base/prompting_miner.py index f6b188185..1668f96ff 100644 --- a/prompting/base/prompting_miner.py +++ b/prompting/base/prompting_miner.py @@ -23,7 +23,7 @@ from prompting.protocol import StreamPromptingSynapse from prompting.base.miner import BaseStreamMinerNeuron from datetime import datetime - +from typing import List, Dict class BaseStreamPromptingMiner(BaseStreamMinerNeuron): """ @@ -159,27 +159,38 @@ def init_wandb(self): def log_event( self, + synapse: StreamPromptingSynapse, timing: float, - prompt: str, - completion: str, - system_prompt: str, + messages, + accumulated_chunks: List[str] = [], + accumulated_chunks_timings: List[float] = [], extra_info: dict = {}, ): if not getattr(self, "wandb_run", None): self.init_wandb() - + + dendrite_uid = self.metagraph.hotkeys.index(synapse.dendrite.hotkey) step_log = { "epoch_time": timing, - # "block": self.last_epoch_block, - "prompt": prompt, - "completion": completion, - "system_prompt": system_prompt, - "uid": self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address), - "stake": self.metagraph.S[self.uid].item(), - "trust": self.metagraph.T[self.uid].item(), - "incentive": 
self.metagraph.I[self.uid].item(), - "consensus": self.metagraph.C[self.uid].item(), - "dividends": self.metagraph.D[self.uid].item(), + # TODO: add block to logs in the future in a way that doesn't impact performance + # "block": self.block, + "messages": messages, + "accumulated_chunks": accumulated_chunks, + "accumulated_chunks_timings": accumulated_chunks_timings, + "validator_uid": dendrite_uid, + "validator_ip": synapse.dendrite.ip, + "validator_coldkey": self.metagraph.coldkeys[dendrite_uid], + "validator_hotkey": self.metagraph.hotkeys[dendrite_uid], + "validator_stake": self.metagraph.S[dendrite_uid].item(), + "validator_trust": self.metagraph.T[dendrite_uid].item(), + "validator_incentive": self.metagraph.I[dendrite_uid].item(), + "validator_consensus": self.metagraph.C[dendrite_uid].item(), + "validator_dividends": self.metagraph.D[dendrite_uid].item(), + "miner_stake": self.metagraph.S[self.uid].item(), + "miner_trust": self.metagraph.T[self.uid].item(), + "miner_incentive": self.metagraph.I[self.uid].item(), + "miner_consensus": self.metagraph.C[self.uid].item(), + "miner_dividends": self.metagraph.D[self.uid].item(), **extra_info, } diff --git a/prompting/miners/__init__.py b/prompting/miners/__init__.py index 9c3a34bf2..f53f9bb42 100644 --- a/prompting/miners/__init__.py +++ b/prompting/miners/__init__.py @@ -4,5 +4,4 @@ from .phrase import PhraseMiner # Real miners -from .hf_miner import HuggingFaceMiner -from .openai_miner import OpenAIMiner +from .openai_miner import OpenAIMiner \ No newline at end of file diff --git a/prompting/miners/hf_miner.py b/prompting/miners/hf_miner.py index 021361d91..659035fda 100644 --- a/prompting/miners/hf_miner.py +++ b/prompting/miners/hf_miner.py @@ -28,8 +28,9 @@ # import base miner class which takes care of most of the boilerplate from prompting.base.prompting_miner import BaseStreamPromptingMiner +from deprecated import deprecated - +@deprecated(version="2.4.1+", reason="Class is deprecated, use openai miner 
for reference on example miner.") class HuggingFaceMiner(BaseStreamPromptingMiner): """ Base miner which runs zephyr (https://huggingface.co/HuggingFaceH4/zephyr-7b-beta) diff --git a/prompting/miners/langchain_miner.py b/prompting/miners/langchain_miner.py new file mode 100644 index 000000000..b77d1433e --- /dev/null +++ b/prompting/miners/langchain_miner.py @@ -0,0 +1,169 @@ +# The MIT License (MIT) +# Copyright © 2024 Yuma Rao + +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. + +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. 
import time
import os
import bittensor as bt
import argparse
from starlette.types import Send
from functools import partial
from typing import Dict, Awaitable

# Bittensor Miner Template:
# import base miner class which takes care of most of the boilerplate
from prompting.base.prompting_miner import BaseStreamPromptingMiner
from prompting.protocol import StreamPromptingSynapse

from prompting.miners.utils import OpenAIUtils

from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.chat_models import ChatOpenAI
from dotenv import load_dotenv, find_dotenv
from langchain_core.runnables.base import RunnableSequence
from deprecated import deprecated


@deprecated(version="2.4.1+", reason="Class is deprecated, use openai miner for reference on example miner.")
class LangchainMiner(BaseStreamPromptingMiner, OpenAIUtils):
    """Langchain-based miner which uses OpenAI's API as the LLM.

    This miner does not use any tools or external APIs when processing
    requests - it relies entirely on the model's own representation and world
    model. In some cases, this can produce lower quality results.

    You should also install the dependencies for this miner, which can be
    found in the requirements.txt file in this directory.
    """

    @classmethod
    def add_args(cls, parser: argparse.ArgumentParser):
        """Register command line arguments on ``parser``.

        No miner-specific arguments are added here; the call is delegated to
        the parent classes.
        """
        super().add_args(parser)

    def __init__(self, config=None):
        """Build the ChatOpenAI model from ``self.config.neuron`` settings.

        The API key is read from the ``OPENAI_API_KEY`` environment variable
        after loading the nearest ``.env`` file (if any).
        """
        super().__init__(config=config)

        bt.logging.info(f"Initializing with model {self.config.neuron.model_id}...")

        if self.config.wandb.on:
            # NOTE(review): the tag still reads "openai_miner" for this
            # langchain miner - confirm whether that is intentional.
            self.identity_tags = ("openai_miner",) + (self.config.neuron.model_id,)

        _ = load_dotenv(find_dotenv())
        api_key = os.environ.get("OPENAI_API_KEY")

        # Set openai key and other args
        self.model = ChatOpenAI(
            api_key=api_key,
            model_name=self.config.neuron.model_id,
            max_tokens=self.config.neuron.max_tokens,
            temperature=self.config.neuron.temperature,
        )

        self.system_prompt = self.config.neuron.system_prompt
        self.accumulated_total_tokens = 0
        self.accumulated_prompt_tokens = 0
        self.accumulated_completion_tokens = 0
        self.accumulated_total_cost = 0

    def forward(self, synapse: StreamPromptingSynapse) -> Awaitable:
        """Answer the last message of ``synapse`` as a streamed response.

        A ``system prompt | user input`` chain is built around ``self.model``
        and its tokens are sent to the client in batches of
        ``config.neuron.streaming_batch_size``. Streaming stops early once
        ``synapse.timeout`` seconds have elapsed.

        Args:
            synapse: Incoming request; only its last role/message pair is
                forwarded to the model.

        Returns:
            Whatever ``synapse.create_streaming_response`` returns for the
            token streamer defined below.
        """

        async def _forward(
            self,
            message: str,
            init_time: float,
            timeout_threshold: float,
            chain: RunnableSequence,
            chain_formatter: Dict[str, str],
            send: Send,
        ):
            buffer = []
            # Every token produced by the chain plus its arrival time, kept
            # for wandb logging (this includes tokens that end up unsent
            # because of a timeout).
            accumulated_chunks = []
            accumulated_chunks_timings = []
            timeout_reached = False

            # The conversation actually given to the model, in the shape
            # log_event records under "messages".
            messages = [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": message},
            ]

            try:
                stream_start = time.time()
                # Langchain built in streaming. 'astream' also available for async
                for token in chain.stream(chain_formatter):
                    buffer.append(token)
                    accumulated_chunks.append(token)
                    accumulated_chunks_timings.append(time.time() - stream_start)

                    if time.time() - init_time > timeout_threshold:
                        bt.logging.debug("⏰ Timeout reached, stopping streaming")
                        timeout_reached = True
                        break

                    if len(buffer) == self.config.neuron.streaming_batch_size:
                        joined_buffer = "".join(buffer)
                        bt.logging.debug(f"Streamed tokens: {joined_buffer}")

                        await send(
                            {
                                "type": "http.response.body",
                                "body": joined_buffer.encode("utf-8"),
                                "more_body": True,
                            }
                        )
                        buffer = []

                # Don't send the last buffer of data if timeout.
                if buffer and not timeout_reached:
                    joined_buffer = "".join(buffer)
                    await send(
                        {
                            "type": "http.response.body",
                            "body": joined_buffer.encode("utf-8"),
                            "more_body": False,
                        }
                    )

            except Exception as e:
                bt.logging.error(f"Error in forward: {e}")
                if self.config.neuron.stop_on_forward_exception:
                    self.should_exit = True

            finally:
                synapse_latency = time.time() - init_time
                if self.config.wandb.on:
                    # log_event takes the synapse, the messages and the
                    # streamed chunks; the former prompt/completion/
                    # system_prompt keywords are no longer accepted.
                    self.log_event(
                        synapse=synapse,
                        timing=synapse_latency,
                        messages=messages,
                        accumulated_chunks=accumulated_chunks,
                        accumulated_chunks_timings=accumulated_chunks_timings,
                    )

        bt.logging.debug(f"📧 Message received, forwarding synapse: {synapse}")

        prompt = ChatPromptTemplate.from_messages(
            [("system", self.system_prompt), ("user", "{input}")]
        )
        chain = prompt | self.model | StrOutputParser()

        role = synapse.roles[-1]
        message = synapse.messages[-1]

        chain_formatter = {"role": role, "input": message}

        init_time = time.time()
        timeout_threshold = synapse.timeout

        # `send` is the only parameter left unbound; it is supplied when the
        # streamer is invoked.
        token_streamer = partial(
            _forward,
            self,
            message,
            init_time,
            timeout_threshold,
            chain,
            chain_formatter,
        )
        return synapse.create_streaming_response(token_streamer)
os.environ.get("OPENAI_API_KEY") # Set openai key and other args - self.model = ChatOpenAI( - api_key=api_key, - model_name=self.config.neuron.model_id, - max_tokens=self.config.neuron.max_tokens, - temperature=self.config.neuron.temperature, - ) - + self.model = OpenAI(api_key=api_key) + self.system_prompt = self.config.neuron.system_prompt self.accumulated_total_tokens = 0 self.accumulated_prompt_tokens = 0 @@ -79,21 +74,45 @@ def __init__(self, config=None): def forward(self, synapse: StreamPromptingSynapse) -> Awaitable: async def _forward( self, - message: str, + synapse: StreamPromptingSynapse, init_time: float, timeout_threshold: float, - chain: RunnableSequence, - chain_formatter: Dict[str, str], send: Send, ): buffer = [] + accumulated_chunks = [] + accumulated_chunks_timings = [] + messages = [] temp_completion = "" # for wandb logging timeout_reached = False - - try: - # Langchain built in streaming. 'astream' also available for async - for token in chain.stream(chain_formatter): - buffer.append(token) + + + try: + system_prompt_message = [{ 'role': 'system', 'content': self.system_prompt }] + synapse_messages = [{'role': role, 'content': message} for role, message in zip(synapse.roles, synapse.messages)] + + messages = system_prompt_message + synapse_messages + + start_time = time.time() + stream_response = self.model.chat.completions.create( + model=self.config.neuron.model_id, + messages=messages, + temperature=self.config.neuron.temperature, + max_tokens=self.config.neuron.max_tokens, + stream=True + ) + + for chunk in stream_response: + chunk_content = chunk.choices[0].delta.content + + if chunk_content is None: + bt.logging.info("OpenAI returned chunk content with None") + continue + + accumulated_chunks.append(chunk_content) + accumulated_chunks_timings.append(time.time() - start_time) + + buffer.append(chunk_content) if time.time() - init_time > timeout_threshold: bt.logging.debug(f"⏰ Timeout reached, stopping streaming") @@ -128,6 +147,7 @@ 
async def _forward( except Exception as e: bt.logging.error(f"Error in forward: {e}") + bt.logging.error(print_exception(type(e), e, e.__traceback__)) if self.config.neuron.stop_on_forward_exception: self.should_exit = True @@ -135,34 +155,23 @@ async def _forward( synapse_latency = time.time() - init_time if self.config.wandb.on: self.log_event( + synapse=synapse, timing=synapse_latency, - prompt=message, - completion=temp_completion, - system_prompt=self.system_prompt, + messages=messages, + accumulated_chunks=accumulated_chunks, + accumulated_chunks_timings = accumulated_chunks_timings, ) - bt.logging.debug(f"📧 Message received, forwarding synapse: {synapse}") - - prompt = ChatPromptTemplate.from_messages( - [("system", self.system_prompt), ("user", "{input}")] - ) - chain = prompt | self.model | StrOutputParser() - - role = synapse.roles[-1] - message = synapse.messages[-1] - - chain_formatter = {"role": role, "input": message} - + bt.logging.debug(f"📧 Message received from {synapse.dendrite.hotkey}, IP: {synapse.dendrite.ip}; \nForwarding synapse: {synapse}") + init_time = time.time() timeout_threshold = synapse.timeout token_streamer = partial( _forward, self, - message, + synapse, init_time, - timeout_threshold, - chain, - chain_formatter, + timeout_threshold, ) return synapse.create_streaming_response(token_streamer)