Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
246a482
Add draft organic dataset, task, validator axon
dbobrenko Jun 14, 2024
b158b90
Merge branch 'main' into feature/organic-task
dbobrenko Jun 14, 2024
8786155
[WIP] Add architecture, rewards, task, dataset etc
dbobrenko Jun 19, 2024
f36ed99
Update draft notebook
dbobrenko Jun 19, 2024
02d2de2
Add minor organic changes
dbobrenko Jun 19, 2024
6db5919
Merge with main branch
dbobrenko Jun 19, 2024
d9927d3
Add WIP miners response
dbobrenko Jun 19, 2024
785afb6
Finish end-to-end organic communication
dbobrenko Jun 26, 2024
b4f29fe
Remove commented code
dbobrenko Jun 26, 2024
d565732
WIP Organic module refactor
dbobrenko Jun 27, 2024
7782d5d
WIP-2 Organic thread refactor
dbobrenko Jun 28, 2024
ec51132
WIP concurrent streams
dbobrenko Jul 4, 2024
1b6c16b
[WIP] Fix issue with dendride streaming
dbobrenko Jul 5, 2024
7d26c26
WIP
dbobrenko Jul 9, 2024
5c5078e
SN1-109: Refactor organics to base organics framework
dbobrenko Jul 18, 2024
ba5641f
SN1-131: Clean up the code
dbobrenko Jul 18, 2024
262e5c5
Add organic sampling method to config
dbobrenko Jul 18, 2024
66c5655
Merge with staging
dbobrenko Jul 18, 2024
fcb653d
Add axon disabled warning
dbobrenko Jul 18, 2024
19094e7
Fix OrganicScoring args
dbobrenko Jul 18, 2024
abd352c
Run isort, remove Optional typing
dbobrenko Jul 18, 2024
1bbcc1d
Move Organic reward pipeline to a separate object
dbobrenko Jul 18, 2024
8e0f3e3
Add whitelist hotkey for axon
dbobrenko Jul 18, 2024
027d706
Add config flags to disable weight setting, docstring
dbobrenko Jul 19, 2024
e3b4e24
Set max_tokens to 1024 for organic reference
dbobrenko Jul 19, 2024
6e0fbd3
Wait for miners completion during reuse, add flags
dbobrenko Jul 20, 2024
8760b8a
Remove debugging blacklist hotkey
dbobrenko Jul 20, 2024
844cb9d
Rename config param to organic_set_weights_enabled
dbobrenko Jul 20, 2024
3574d52
Fix wrong config param
dbobrenko Jul 20, 2024
9386f67
Add timestamp to chunk
dbobrenko Jul 20, 2024
8345d52
Remove debugging blacklist hotkey
dbobrenko Jul 20, 2024
c541408
Add closing chunk; run isort, black, ruff
dbobrenko Jul 22, 2024
0de69f6
Change organic response logging to debug
dbobrenko Jul 22, 2024
9ac31bf
Add more logs to organics, increase trigger frequency
dbobrenko Jul 22, 2024
e016fed
SN1-136: Save organics to csv file
dbobrenko Jul 22, 2024
6a20477
Remove timestamp from chunk
dbobrenko Jul 22, 2024
0b64a2d
Split organic into 2 csv files
dbobrenko Jul 23, 2024
0daa5b1
Fix wrong csv files
dbobrenko Jul 23, 2024
f8f7deb
Remove unused import
dbobrenko Jul 23, 2024
d8203f0
Move axon serve after organic init
dbobrenko Jul 23, 2024
702ef8f
Move axon serve after organic init
dbobrenko Jul 23, 2024
fdf4749
Move run to async function, apply lock to LLM
dbobrenko Jul 23, 2024
8116e36
Revert debugging timeout to 15 sec
dbobrenko Jul 23, 2024
caeaa81
Revert debugging blacklist key
dbobrenko Jul 23, 2024
0a9f657
Add try except blocks for organic scoring
dbobrenko Jul 23, 2024
3b76700
Merge with debug csv branch
dbobrenko Jul 23, 2024
152f371
Remove unused import
dbobrenko Jul 23, 2024
358dbee
Move organic to main loop
dbobrenko Jul 23, 2024
41746dd
Remove debugging blacklist hotkey
dbobrenko Jul 23, 2024
204f3e4
Rvert asyncio loop
dbobrenko Jul 23, 2024
3d1342a
Clean up the code
dbobrenko Jul 23, 2024
6988799
Reduce rewards for synth
dbobrenko Jul 23, 2024
e14d56f
Merge branch 'feature/organic-csv-debug' into feature/organic-task
dbobrenko Jul 24, 2024
64d2ae3
Small fixes
dbobrenko Jul 24, 2024
8bc004e
Remove weights scale
dbobrenko Jul 24, 2024
aa456f6
Add LLM locks
dbobrenko Jul 24, 2024
0799aa1
Make synth dataset optional
dbobrenko Jul 24, 2024
ef8f820
Address comments
dbobrenko Jul 24, 2024
fdf8a38
Fix unavailable synth dataset
dbobrenko Jul 24, 2024
77caafa
Update README for validators
dbobrenko Jul 24, 2024
22d3ae1
Add rouge, reduce penalty
dbobrenko Jul 24, 2024
0629b29
Address Kalei's comments
dbobrenko Jul 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ If you are running a miner, you will also need to uninstall uvloop.
pip uninstall uvloop -y
```

If you are running a validator, logging in to Hugging Face is required:
```shell
huggingface-cli login
```
You also need to accept the License Agreement for the LMSYS-Chat-1M dataset: https://huggingface.co/datasets/lmsys/lmsys-chat-1m

</div>

# Compute Requirements
Expand Down
25 changes: 14 additions & 11 deletions prompting/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
import time
import bittensor as bt
from dataclasses import asdict
from typing import Optional

from prompting.tasks import Task
from prompting.llms import HuggingFaceLLM, vLLM_LLM
from prompting.llms import vLLM_LLM
from prompting.cleaners.cleaner import CleanerPipeline

from prompting.persona import Persona, create_persona

from transformers import Pipeline
Expand All @@ -42,7 +43,7 @@ def finished(self):
"""This is a roleplaying game where you are impersonating {mood} human user with a specific persona. As a human, you are using AI assistant to {desc} related to {topic} ({subtopic}) in a {tone} tone. You don't need to greet the assistant or be polite, unless this is part of your persona. The spelling and grammar of your messages should also reflect your persona.

Your singular focus is to use the assistant to {goal}: {query}
"""
"""
)

def __init__(
Expand All @@ -52,22 +53,24 @@ def __init__(
system_template: str = None,
persona: Persona = None,
begin_conversation=True,
system_prompt: Optional[str] = None,
):
if persona is None:
persona = create_persona()

self.persona = persona
self.task = task
self.llm_pipeline = llm_pipeline

if system_template is not None:
self.system_prompt_template = system_template

self.system_prompt = self.system_prompt_template.format(
mood=self.persona.mood,
tone=self.persona.tone,
**self.task.__state_dict__(), # Adds desc, subject, topic
)
self.system_prompt = system_prompt
if self.system_prompt is None:
if self.persona is None:
self.persona = create_persona()
self.system_prompt = self.system_prompt_template.format(
mood=self.persona.mood,
tone=self.persona.tone,
**self.task.__state_dict__(), # Adds desc, subject, topic
)

super().__init__(
llm_pipeline=llm_pipeline,
Expand Down
1 change: 1 addition & 0 deletions prompting/base/neuron.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import copy
import sys
import threading

import bittensor as bt

Expand Down
70 changes: 44 additions & 26 deletions prompting/base/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

import sys
import copy
import torch
import asyncio
import argparse
import asyncio
import copy
import sys
import threading
import bittensor as bt

from typing import List
from traceback import print_exception
from typing import Optional

import bittensor as bt
import torch
from organic_scoring.synth_dataset import SynthDatasetConversation

from prompting.base.neuron import BaseNeuron
from prompting.mock import MockDendrite
from prompting.organic.organic_scoring_prompting import OrganicScoringPrompting
from prompting.utils.config import add_validator_args
from prompting.utils.exceptions import MaxRetryError

Expand Down Expand Up @@ -64,9 +66,9 @@ def __init__(self, config=None):
# Init sync with the network. Updates the metagraph.
self.sync()

# Serve axon to enable external connections.
self.axon: Optional[bt.axon] = None
if not self.config.neuron.axon_off:
self.serve_axon()
self.axon = bt.axon(wallet=self.wallet, config=self.config)
else:
bt.logging.warning("axon off, not serving ip to chain.")

Expand All @@ -79,23 +81,39 @@ def __init__(self, config=None):
self.thread: threading.Thread = None
self.lock = asyncio.Lock()

def serve_axon(self):
"""Serve axon to enable external connections."""

bt.logging.info("serving ip to chain...")
try:
self.axon = bt.axon(wallet=self.wallet, config=self.config)

try:
self.subtensor.serve_axon(
netuid=self.config.netuid,
axon=self.axon,
self._organic_scoring: Optional[OrganicScoringPrompting] = None
if self.axon is not None and not self.config.neuron.organic_disabled:
dataset = SynthDatasetConversation()
if dataset.exception is not None:
bt.logging.error(
f"Organic scoring on synthetic data is disabled. Failed to load dataset: {dataset.exception}"
)
except Exception as e:
bt.logging.error(f"Failed to serve Axon with exception: {e}")
dataset = None
self._organic_scoring = OrganicScoringPrompting(
axon=self.axon,
synth_dataset=dataset,
trigger_frequency=self.config.neuron.organic_trigger_frequency,
trigger_frequency_min=self.config.neuron.organic_trigger_frequency_min,
trigger=self.config.neuron.organic_trigger,
trigger_scaling_factor=self.config.neuron.organic_scaling_factor,
validator=self,
)
else:
bt.logging.warning(
"Organic scoring is not enabled. To enable, remove '--neuron.axon_off' and '--neuron.organic_disabled'"
)

if self.axon is not None:
self._serve_axon()

except Exception as e:
bt.logging.error(f"Failed to create Axon initialize with exception: {e}")
if self._organic_scoring is not None:
self.loop.create_task(self._organic_scoring.start_loop())

def _serve_axon(self):
"""Serve axon to enable external connections"""
validator_uid = self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address)
bt.logging.info(f"Serving validator IP of UID {validator_uid} to chain...")
self.axon.serve(netuid=self.config.netuid, subtensor=self.subtensor).start()

def run(self):
"""
Expand All @@ -116,7 +134,6 @@ def run(self):
KeyboardInterrupt: If the miner is stopped by a manual interruption.
Exception: For unforeseen errors during the miner's operation, which are logged for diagnosis.
"""

# Check that validator is registered on the network.
self.sync()

Expand Down Expand Up @@ -313,7 +330,7 @@ def resync_metagraph(self):
# Update the hotkeys.
self.hotkeys = copy.deepcopy(self.metagraph.hotkeys)

def update_scores(self, rewards: torch.FloatTensor, uids: List[int]):
def update_scores(self, rewards: torch.FloatTensor, uids: list[int]):
"""Performs exponential moving average on the scores based on the rewards received from the miners."""

# Check if rewards contains NaN values.
Expand All @@ -327,6 +344,7 @@ def update_scores(self, rewards: torch.FloatTensor, uids: List[int]):
step_rewards = self.scores.scatter(
0, torch.tensor(uids).to(self.device), rewards.to(self.device)
).to(self.device)

bt.logging.debug(f"Scattered rewards: {rewards}")

# Update scores with rewards produced by this step.
Expand Down
41 changes: 20 additions & 21 deletions prompting/forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import numpy as np
import bittensor as bt
from typing import List, Dict, Awaitable

from prompting.agent import HumanAgent
from prompting.dendrite import DendriteResponseEvent, SynapseStreamResult
from prompting.conversation import create_task
Expand All @@ -33,16 +34,9 @@
from prompting.utils.logging import log_event
from prompting.utils.misc import async_log, serialize_exception_to_string
from transformers import PreTrainedTokenizerFast as Tokenizer
from prompting.utils.uids import get_random_uids
from dataclasses import dataclass

SINGLE_TURN_TASKS = ['sentiment', 'translation']
SINGLE_TURN_TASKS = ('sentiment', 'translation')

@async_log
async def generate_reference(agent):
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, agent.task.generate_reference, agent.llm_pipeline)
return result

@async_log
async def execute_dendrite_call(dendrite_call):
Expand All @@ -59,7 +53,8 @@ async def process_stream(uid: int, async_iterator: Awaitable, tokenizer: Tokeniz
accumulated_tokens_per_chunk = []
start_time = time.time()

try:
try:
chunk = None
async for chunk in async_iterator: # most important loop, as this is where we acquire the final synapse.
if isinstance(chunk, str):
accumulated_chunks.append(chunk)
Expand All @@ -76,7 +71,7 @@ async def process_stream(uid: int, async_iterator: Awaitable, tokenizer: Tokeniz
raise ValueError(
f"Something went wrong with miner uid {uid}, Synapse is not StreamPromptingSynapse."
)
except Exception as e:
except Exception as e:
exception = e
traceback_details = traceback.format_exc()
bt.logging.error(
Expand Down Expand Up @@ -204,10 +199,11 @@ async def run_step(
handle_stream_responses_task = asyncio.create_task(handle_response(stream_results_dict, tokenizer))

if not agent.task.static_reference:
reference_generation_task = generate_reference(agent)
_, stream_results = await asyncio.gather(
reference_generation_task, handle_stream_responses_task
)
async with self.lock:
reference_generation_task = generate_reference(agent)
_, stream_results = await asyncio.gather(
reference_generation_task, handle_stream_responses_task
)
else:
stream_results = await handle_stream_responses_task

Expand Down Expand Up @@ -244,7 +240,7 @@ async def run_step(
"best": best_response,
"block": self.block,
"step": self.step,
"step_time": time.time() - start_time,
"step_time": time.time() - start_time,
**agent.__state_dict__(full=self.config.neuron.log_full),
**reward_result.__state_dict__(full=self.config.neuron.log_full),
**response_event.__state_dict__(),
Expand Down Expand Up @@ -292,7 +288,7 @@ async def forward(self):

turn = 0
exclude_uids = []
roles = ['user']
roles = ["user"]
messages = [agent.challenge]
while True:
# Note: The try catch is a safe clause to ensure that the forward loop continues even if an error occurs in run_step.
Expand All @@ -314,13 +310,13 @@ async def forward(self):
event["turn"] = turn
log_event(self, event)
task.complete = True

accepted_answer = event["best"] if random.random() < 0.5 else agent.task.reference
roles.append("assistant")
messages.append(accepted_answer)

# 50% chance of single turn conversation, 25% of two turns, 12.5% chance of 3 turns, 6.25% chance of 4 turns, 3.63% chance of 5...
if random.random()<0.5 or turn>=1:
# 50% chance of single turn conversation, 25% of two turns.
if random.random() < 0.5 or turn >= 1:
break

if task.name in SINGLE_TURN_TASKS:
Expand All @@ -341,13 +337,16 @@ async def forward(self):
except BaseException as e:
unexpected_errors = serialize_exception_to_string(e)
bt.logging.error(
f"Error in run_step: Skipping to next round. \n {unexpected_errors}"
f"Error in run_step: Skipping to next round.\n"
f"Task: {task_name}\nMessages: {messages}\nRoles: {roles}\nTurn: {turn}.\n"
f"{unexpected_errors}\n"
)

event = {"unexpected_errors": unexpected_errors}

log_event(self, event)
continue

await asyncio.sleep(1)
continue
del agent
del task
34 changes: 30 additions & 4 deletions prompting/llms/vllm_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
import gc
import threading
import time
import torch
import bittensor as bt
from typing import List, Dict
from typing import List, Dict, Optional, Any
from vllm import LLM, SamplingParams
from prompting.cleaners.cleaner import CleanerPipeline
from prompting.llms import BasePipeline, BaseLLM
Expand Down Expand Up @@ -55,6 +54,8 @@ def load_vllm_pipeline(model_id: str, device: str, gpus: int, max_allowed_memory


class vLLMPipeline(BasePipeline):
_LOCK = threading.Lock()

def __init__(
self,
model_id: str,
Expand All @@ -81,7 +82,8 @@ def __call__(self, composed_prompt: str, **model_kwargs: Dict) -> str:
sampling_params = SamplingParams(
temperature=temperature, top_p=top_p, max_tokens=max_tokens
)
output = self.llm.generate(composed_prompt, sampling_params, use_tqdm=True)
with self._LOCK:
output = self.llm.generate(composed_prompt, sampling_params, use_tqdm=True)
response = output[0].outputs[0].text
return response

Expand Down Expand Up @@ -112,6 +114,30 @@ def __init__(
"end": "<|start_header_id|>assistant<|end_header_id|>",
}

def query_conversation(
self,
messages: list[str],
roles: list[str],
cleaner: Optional[CleanerPipeline] = None,
):
"""Query LLM with the given lists of conversation history and roles

Args:
messages (list[str]): List of messages in the conversation.
roles (list[str]): List of roles for each message.
cleaner (Optional[CleanerPipeline], optional): Cleaner pipeline to use, if any.
"""
assert len(messages) == len(roles), "Length of messages and roles must be the same"
inputs: list[dict[str, Any]] = [{"content": self.system_prompt, "role": "system"}]
for role, message in zip(roles, messages):
inputs.append({"content": message, "role": role})

t0 = time.perf_counter()
response = self.forward(messages=inputs)
response = self.clean_response(cleaner, response)
self.times.extend((0, time.perf_counter() - t0))
return response

def query(
self,
message: str,
Expand Down
Empty file added prompting/organic/__init__.py
Empty file.
Loading