Skip to content
Merged
6 changes: 3 additions & 3 deletions neurons/miners/epistula_miner/web_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from openai import OpenAI

from prompting.base.duckduckgo_patch import PatchedDDGS
from shared.settings import shared_settings
from shared import settings

# Import the patched DDGS and use that

Expand Down Expand Up @@ -56,7 +56,7 @@ async def get_websites_with_similarity(
List of dictionaries containing website URLs and their best matching chunks
"""
logger.debug("Getting results")
ddgs = PatchedDDGS(proxy=shared_settings.PROXY_URL, verify=False)
ddgs = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False)
results = list(ddgs.text(query))
logger.debug(f"Got {len(results)} results")
urls = [r["href"] for r in results][:n_results]
Expand All @@ -66,7 +66,7 @@ async def get_websites_with_similarity(
extracted = await asyncio.gather(*[extract_content(c) for c in content])

# Create embeddings
client = OpenAI(api_key=shared_settings.OPENAI_API_KEY)
client = OpenAI(api_key=settings.shared_settings.OPENAI_API_KEY)
query_embedding = client.embeddings.create(model="text-embedding-ada-002", input=query).data[0].embedding
# Process each website
results_with_similarity = []
Expand Down
45 changes: 14 additions & 31 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

# ruff: noqa: E402
from shared import settings
from shared.logging import init_wandb

shared_settings = settings.shared_settings
settings.shared_settings = settings.SharedSettings.load(mode="validator")


Expand All @@ -26,13 +26,12 @@


def create_loop_process(task_queue, scoring_queue, reward_events):
settings.shared_settings = settings.SharedSettings.load(mode="validator")
if settings.shared_settings.WANDB_ON:
init_wandb(neuron="validator")

async def spawn_loops(task_queue, scoring_queue, reward_events):
# ruff: noqa: E402
wandb.setup()
from shared import settings

settings.shared_settings = settings.SharedSettings.load(mode="validator")

from prompting.llms.model_manager import model_scheduler
from prompting.miner_availability.miner_availability import availability_checking_loop
from prompting.rewards.scoring import task_scorer
Expand Down Expand Up @@ -88,10 +87,11 @@ async def start():
# TODO: We should not use 2 availability loops for each process; in reality,
# we should only be sharing the miner availability data between processes.
from prompting.miner_availability.miner_availability import availability_checking_loop
from prompting.rewards.scoring import task_scorer

asyncio.create_task(availability_checking_loop.start())

await start_scoring_api(scoring_queue, reward_events)
await start_scoring_api(task_scorer, scoring_queue, reward_events)

while True:
await asyncio.sleep(10)
Expand All @@ -100,23 +100,6 @@ async def start():
asyncio.run(start())


# def create_task_loop(task_queue, scoring_queue):
# async def start(task_queue, scoring_queue):
# logger.info("Starting AvailabilityCheckingLoop...")
# asyncio.create_task(availability_checking_loop.start())

# logger.info("Starting TaskSender...")
# asyncio.create_task(task_sender.start(task_queue, scoring_queue))

# logger.info("Starting TaskLoop...")
# asyncio.create_task(task_loop.start(task_queue, scoring_queue))
# while True:
# await asyncio.sleep(10)
# logger.debug("Running task loop...")

# asyncio.run(start(task_queue, scoring_queue))


async def main():
# will start checking the availability of miners at regular intervals, needed for API and Validator
with torch.multiprocessing.Manager() as manager:
Expand All @@ -130,7 +113,7 @@ async def main():
try:
# # Start checking the availability of miners at regular intervals

if shared_settings.DEPLOY_SCORING_API:
if settings.shared_settings.DEPLOY_SCORING_API:
# Use multiprocessing to bypass API blocking issue
api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process")
api_process.start()
Expand All @@ -152,17 +135,17 @@ async def main():
while True:
await asyncio.sleep(30)
if (
shared_settings.SUBTENSOR.get_current_block()
- shared_settings.METAGRAPH.last_update[shared_settings.UID]
settings.shared_settings.SUBTENSOR.get_current_block()
- settings.shared_settings.METAGRAPH.last_update[settings.shared_settings.UID]
> 500
and step > 120
):
current_block = settings.shared_settings.SUBTENSOR.get_current_block()
last_update_block = settings.shared_settings.METAGRAPH.last_update[settings.shared_settings.UID]
logger.warning(
f"UPDATES HAVE STALED FOR {shared_settings.SUBTENSOR.get_current_block() - shared_settings.METAGRAPH.last_update[shared_settings.UID]} BLOCKS AND {step} STEPS"
)
logger.warning(
f"STALED: {shared_settings.SUBTENSOR.get_current_block()}, {shared_settings.METAGRAPH.block}"
f"UPDATES HAVE STALED FOR {current_block - last_update_block} BLOCKS AND {step} STEPS"
)
logger.warning(f"STALED: {current_block}, {settings.shared_settings.METAGRAPH.block}")
sys.exit(1)
step += 1

Expand Down
20 changes: 13 additions & 7 deletions prompting/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

from prompting.api.miner_availabilities.api import router as miner_availabilities_router
from prompting.api.scoring.api import router as scoring_router
from prompting.rewards.scoring import task_scorer
from shared.settings import shared_settings
from shared import settings

app = FastAPI()
app.include_router(miner_availabilities_router, prefix="/miner_availabilities", tags=["miner_availabilities"])
Expand All @@ -18,10 +17,17 @@ def health():
return {"status": "healthy"}


async def start_scoring_api(scoring_queue, reward_events):
task_scorer.scoring_queue = scoring_queue
task_scorer.reward_events = reward_events
logger.info(f"Starting Scoring API on https://0.0.0.0:{shared_settings.SCORING_API_PORT}")
async def start_scoring_api(task_scorer, scoring_queue, reward_events):
# We pass in a task scorer object and then override its attributes to ensure that they are managed by mp
app.state.task_scorer = task_scorer
app.state.task_scorer.scoring_queue = scoring_queue
app.state.task_scorer.reward_events = reward_events

logger.info(f"Starting Scoring API on https://0.0.0.0:{settings.shared_settings.SCORING_API_PORT}")
uvicorn.run(
"prompting.api.api:app", host="0.0.0.0", port=shared_settings.SCORING_API_PORT, loop="asyncio", reload=False
"prompting.api.api:app",
host="0.0.0.0",
port=settings.shared_settings.SCORING_API_PORT,
loop="asyncio",
reload=False,
)
23 changes: 14 additions & 9 deletions prompting/api/scoring/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,33 @@

from prompting.datasets.random_website import DDGDatasetEntry
from prompting.llms.model_zoo import ModelZoo
from prompting.rewards.scoring import task_scorer
from prompting.tasks.inference import InferenceTask
from prompting.tasks.web_retrieval import WebRetrievalTask
from shared import settings
from shared.base import DatasetEntry
from shared.dendrite import DendriteResponseEvent
from shared.epistula import SynapseStreamResult
from shared.settings import shared_settings

router = APIRouter()


def validate_scoring_key(api_key: str = Header(...)):
if api_key != shared_settings.SCORING_KEY:
if api_key != settings.shared_settings.SCORING_KEY:
raise HTTPException(status_code=403, detail="Invalid API key")


def get_task_scorer(request: Request):
    """Dependency provider: return the task scorer stored on the app's state.

    Reads the ``task_scorer`` attribute from ``request.app.state``.
    """
    app_state = request.app.state
    return app_state.task_scorer


@router.post("/scoring")
async def score_response(request: Request, api_key_data: dict = Depends(validate_scoring_key)):
async def score_response(
request: Request, api_key_data: dict = Depends(validate_scoring_key), task_scorer=Depends(get_task_scorer)
):
model = None
payload: dict[str, Any] = await request.json()
body = payload.get("body")
timeout = payload.get("timeout", shared_settings.NEURON_TIMEOUT)
timeout = payload.get("timeout", settings.shared_settings.NEURON_TIMEOUT)
uids = payload.get("uid", [])
chunks = payload.get("chunks", {})
if not uids or not chunks:
Expand All @@ -54,7 +59,7 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate
llm_model=llm_model,
llm_model_id=body.get("model"),
seed=int(body.get("seed", 0)),
sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS),
sampling_params=body.get("sampling_parameters", settings.shared_settings.SAMPLING_PARAMS),
query=body.get("messages"),
)
logger.info(f"Task created: {organic_task}")
Expand All @@ -66,7 +71,7 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate
timeout=timeout,
),
dataset_entry=DatasetEntry(),
block=shared_settings.METAGRAPH.block,
block=settings.shared_settings.METAGRAPH.block,
step=-1,
task_id=str(uuid.uuid4()),
)
Expand All @@ -90,10 +95,10 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate
stream_results=[
SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
],
timeout=body.get("timeout", shared_settings.NEURON_TIMEOUT),
timeout=body.get("timeout", settings.shared_settings.NEURON_TIMEOUT),
),
dataset_entry=DDGDatasetEntry(search_term=search_term),
block=shared_settings.METAGRAPH.block,
block=settings.shared_settings.METAGRAPH.block,
step=-1,
task_id=str(uuid.uuid4()),
)
Expand Down
4 changes: 2 additions & 2 deletions prompting/datasets/random_website.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

from prompting.base.duckduckgo_patch import PatchedDDGS
from prompting.datasets.utils import ENGLISH_WORDS
from shared import settings
from shared.base import BaseDataset, Context, DatasetEntry
from shared.settings import shared_settings

MAX_CHARS = 5000

Expand All @@ -25,7 +25,7 @@ class DDGDataset(BaseDataset):
english_words: list[str] = None

def search_random_term(self, retries: int = 3) -> tuple[Optional[str], Optional[list[dict[str, str]]]]:
ddg = PatchedDDGS(proxy=shared_settings.PROXY_URL, verify=False)
ddg = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False)
for _ in range(retries):
random_words = " ".join(random.sample(ENGLISH_WORDS, 3))
try:
Expand Down
4 changes: 3 additions & 1 deletion prompting/llms/apis/gpt_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from pydantic import BaseModel

from prompting.llms.apis.llm_messages import LLMMessage, LLMMessages
from shared.settings import shared_settings
from shared import settings

shared_settings = settings.shared_settings


class GPT(BaseModel):
Expand Down
4 changes: 3 additions & 1 deletion prompting/llms/apis/llm_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from prompting.llms.apis.gpt_wrapper import openai_client
from prompting.llms.apis.llm_messages import LLMMessages
from prompting.llms.apis.sn19_wrapper import chat_complete
from shared.settings import shared_settings
from shared import settings

shared_settings = settings.shared_settings


class LLMWrapper:
Expand Down
4 changes: 3 additions & 1 deletion prompting/llms/apis/sn19_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from tenacity import retry, stop_after_attempt, wait_exponential

from prompting.llms.apis.llm_messages import LLMMessages
from shared.settings import shared_settings
from shared import settings

shared_settings = settings.shared_settings


# TODO: key error in response.json() when response is 500
Expand Down
10 changes: 5 additions & 5 deletions prompting/llms/hf_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from loguru import logger
from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedModel, pipeline

from shared.settings import shared_settings
from shared import settings
from shared.timer import Timer


Expand All @@ -31,7 +31,7 @@ def __init__(self, model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4

self.llm = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer)

self.sampling_params = shared_settings.SAMPLING_PARAMS
self.sampling_params = settings.shared_settings.SAMPLING_PARAMS

@torch.inference_mode()
def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=None):
Expand All @@ -46,15 +46,15 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed=
add_generation_prompt=True,
return_tensors="pt",
return_dict=True,
).to(shared_settings.NEURON_DEVICE)
).to(settings.shared_settings.NEURON_DEVICE)

params = sampling_params if sampling_params else self.sampling_params
filtered_params = {k: v for k, v in params.items() if k in self.valid_generation_params}

with Timer() as timer:
with Timer():
# Generate with optimized settings
outputs = self.model.generate(
**inputs.to(shared_settings.NEURON_DEVICE),
**inputs.to(settings.shared_settings.NEURON_DEVICE),
**filtered_params,
eos_token_id=self.tokenizer.eos_token_id,
)
Expand Down
6 changes: 3 additions & 3 deletions prompting/llms/model_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from prompting.llms.hf_llm import ReproducibleHF
from prompting.llms.model_zoo import ModelConfig, ModelZoo
from prompting.llms.utils import GPUInfo
from shared import settings
from shared.loop_runner import AsyncLoopRunner
from shared.settings import shared_settings

# This maintains a list of tasks for which we need to generate references. Since
# we can only generate the references, when the correct model is loaded, we work
Expand All @@ -20,7 +20,7 @@

class ModelManager(BaseModel):
always_active_models: list[ModelConfig] = []
total_ram: float = shared_settings.LLM_MODEL_RAM
total_ram: float = settings.shared_settings.LLM_MODEL_RAM
active_models: dict[ModelConfig, ReproducibleHF] = {}
used_ram: float = 0.0
model_config = ConfigDict(arbitrary_types_allowed=True)
Expand Down Expand Up @@ -71,7 +71,7 @@ def load_model(self, model_config: ModelConfig, force: bool = True):
model = ReproducibleHF(
model=model_config.llm_model_id,
gpu_memory_utilization=model_config.min_ram / GPUInfo.free_memory,
max_model_len=shared_settings.LLM_MAX_MODEL_LEN,
max_model_len=settings.shared_settings.LLM_MAX_MODEL_LEN,
)

self.active_models[model_config] = model
Expand Down
8 changes: 6 additions & 2 deletions prompting/llms/model_zoo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from loguru import logger
from pydantic import BaseModel, ConfigDict

from shared.settings import shared_settings
from shared import settings


class ModelConfig(BaseModel):
Expand All @@ -20,7 +20,11 @@ def __hash__(self):
class ModelZoo:
# Currently, we are only using one single model - the one the validator is running
models_configs: ClassVar[list[ModelConfig]] = [
ModelConfig(llm_model_id=shared_settings.LLM_MODEL, reward=1, min_ram=shared_settings.MAX_ALLOWED_VRAM_GB),
ModelConfig(
llm_model_id=settings.shared_settings.LLM_MODEL,
reward=1,
min_ram=settings.shared_settings.MAX_ALLOWED_VRAM_GB,
),
]

# The code below can be uncommented for testing purposes and demonstrates how we will rotate multiple LLMs in the future
Expand Down
4 changes: 3 additions & 1 deletion prompting/miner_availability/miner_availability.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
from prompting.llms.model_zoo import ModelZoo
from prompting.tasks.base_task import BaseTask
from prompting.tasks.task_registry import TaskRegistry
from shared import settings
from shared.epistula import query_availabilities
from shared.loop_runner import AsyncLoopRunner
from shared.settings import shared_settings
from shared.uids import get_uids

shared_settings = settings.shared_settings

task_config: dict[str, bool] = {str(task_config.task.__name__): True for task_config in TaskRegistry.task_configs}
model_config: dict[str, bool] = {conf.llm_model_id: False for conf in ModelZoo.models_configs}

Expand Down
3 changes: 2 additions & 1 deletion prompting/rewards/exact_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from loguru import logger

from prompting.rewards.reward import BaseRewardModel, BatchRewardOutput
from shared import settings
from shared.dendrite import DendriteResponseEvent
from shared.settings import shared_settings

shared_settings = settings.shared_settings
INCORRECT_PENALTY = 3
INCOMPLETE_PENALTY = 1

Expand Down
Loading