Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,4 @@ wandb
**/api_keys.json
weights.csv
past_websites.csv
timer_logs*
2 changes: 1 addition & 1 deletion neurons/miners/epistula_miner/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def run(self):
async def run_inference(self, request: Request) -> str:
data = await request.json()
try:
response = self.llm.generate(
response = await self.llm.generate(
data.get("messages"), sampling_params=data.get("sampling_parameters"), seed=data.get("seed")
)
return response
Expand Down
137 changes: 103 additions & 34 deletions neurons/validator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import asyncio
import multiprocessing as mp
import sys

import loguru
import netaddr
import requests
import torch

# import multiprocessing as mp
import torch.multiprocessing as mp
import wandb
from bittensor.core.extrinsics.serving import serve_extrinsic

Expand All @@ -29,40 +31,29 @@
NEURON_SAMPLE_SIZE = 100 # TODO: Should add this to constants.py


def create_loop_process(task_queue, scoring_queue, reward_events):
def create_loop_process(task_queue, scoring_queue, reward_events, miners_dict):
settings.shared_settings = settings.SharedSettings.load(mode="validator")
if settings.shared_settings.WANDB_ON:
init_wandb(neuron="validator")

async def spawn_loops(task_queue, scoring_queue, reward_events):
async def spawn_loops(task_queue, scoring_queue, reward_events, miners_dict):
# ruff: noqa: E402
from prompting.llms.model_manager import model_scheduler
from prompting.miner_availability.miner_availability import availability_checking_loop

# from prompting.miner_availability.miner_availability import availability_checking_loop
from prompting.tasks.task_creation import task_loop
from prompting.tasks.task_sending import task_sender
from prompting.weight_setting.weight_setter import weight_setter
from shared.profiling import profiler

logger.info("Starting Profiler...")
asyncio.create_task(profiler.print_stats(), name="Profiler"),

# -------- Duplicate of create_task_loop ----------
logger.info("Starting AvailabilityCheckingLoop...")
asyncio.create_task(availability_checking_loop.start())

logger.info("Starting TaskSender...")
asyncio.create_task(task_sender.start(task_queue, scoring_queue))

logger.info("Starting TaskLoop...")
asyncio.create_task(task_loop.start(task_queue, scoring_queue))
# -------------------------------------------------
asyncio.create_task(task_loop.start(task_queue, scoring_queue, miners_dict, simultaneous_loops=4))

logger.info("Starting ModelScheduler...")
asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler"),
logger.info("Starting TaskScorer...")
asyncio.create_task(task_scorer.start(scoring_queue, reward_events), name="TaskScorer"),
logger.info("Starting WeightSetter...")
asyncio.create_task(weight_setter.start(reward_events))
asyncio.create_task(task_scorer.start(scoring_queue, reward_events, simultaneous_loops=4), name="TaskScorer"),

while True:
await asyncio.sleep(5)
Expand All @@ -73,9 +64,9 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}")

try:
asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events))
asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events, miners_dict))
except Exception as e:
logger.info(f"Terminating loop process: {e}")
logger.exception(f"Terminating loop process: {e}")
finally:
logger.info("Cleaning up resources...")

Expand All @@ -85,16 +76,10 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
logger.info("WandB run finished.")


def start_api(scoring_queue, reward_events):
def start_api(scoring_queue, reward_events, miners_dict):
async def start():
from prompting.api.api import start_scoring_api # noqa: F401

# TODO: We should not use 2 availability loops for each process, in reality
# we should only be sharing the miner availability data between processes.
from prompting.miner_availability.miner_availability import availability_checking_loop

asyncio.create_task(availability_checking_loop.start())

try:
external_ip = requests.get("https://checkip.amazonaws.com").text.strip()
netaddr.IPAddress(external_ip)
Expand All @@ -111,37 +96,121 @@ async def start():
logger.debug(f"Serve success: {serve_success}")
except Exception as e:
logger.warning(f"Failed to serve scoring api to chain: {e}")
await start_scoring_api(task_scorer, scoring_queue, reward_events)
await start_scoring_api(task_scorer, scoring_queue, reward_events, miners_dict)

while True:
await asyncio.sleep(10)

asyncio.run(start())


def start_task_sending_loop(task_queue, scoring_queue, miners_dict: dict):
    """Run the task sender inside a dedicated asyncio event loop.

    Used as an ``mp.Process`` target, so it blocks for the lifetime of the
    process.

    Args:
        task_queue: Shared container forwarded unchanged to ``task_sender.start``.
        scoring_queue: Shared container forwarded unchanged to ``task_sender.start``.
        miners_dict: Shared mapping forwarded unchanged to ``task_sender.start``.

    Raises:
        Exception: Any error that escapes the event loop is logged with its
            traceback and then re-raised.
    """

    async def spawn_loops(task_queue, scoring_queue, miners_dict: dict):
        # Imported at call time rather than at module import time.
        from prompting.tasks.task_sending import task_sender

        logger.info("Starting task sending loop in validator2...")
        # Keep a strong reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected before
        # it finishes.
        _sender_task = asyncio.create_task(
            task_sender.start(task_queue, scoring_queue, miners_dict, simultaneous_loops=10)
        )
        # Keep the event loop alive and emit a periodic heartbeat.
        while True:
            await asyncio.sleep(5)
            logger.debug("Task sending loop is running")

    try:
        logger.info("Starting task sending loop in validator...")
        asyncio.run(spawn_loops(task_queue, scoring_queue, miners_dict))

    except Exception as e:
        logger.exception(f"Task sending loop error: {e}")
        raise


def start_availability_checking_loop(miners_dict: dict):
    """Run the miner availability checker inside a dedicated asyncio event loop.

    Used as an ``mp.Process`` target, so it blocks for the lifetime of the
    process.

    Args:
        miners_dict: Shared mapping forwarded unchanged to
            ``availability_checking_loop.start``.

    Raises:
        Exception: Any error that escapes the event loop is logged with its
            traceback and then re-raised.
    """

    async def spawn_loops(miners_dict: dict):
        # Imported at call time rather than at module import time.
        from prompting.miner_availability.miner_availability import availability_checking_loop

        logger.info("Starting availability checking loop in validator2...")
        # Keep a strong reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected before
        # it finishes.
        _availability_task = asyncio.create_task(availability_checking_loop.start(miners_dict))
        # Keep the event loop alive and emit a periodic heartbeat.
        while True:
            await asyncio.sleep(5)
            logger.debug("Availability checking loop is running")

    try:
        logger.info("Starting availability checking loop in validator...")
        asyncio.run(spawn_loops(miners_dict))

    except Exception as e:
        logger.exception(f"Availability checking loop error: {e}")
        raise


def start_weight_setter_loop(reward_events):
    """Run the weight setter inside a dedicated asyncio event loop.

    Used as an ``mp.Process`` target, so it blocks for the lifetime of the
    process.

    Args:
        reward_events: Shared container forwarded unchanged to
            ``weight_setter.start``.

    Raises:
        Exception: Any error that escapes the event loop is logged with its
            traceback and then re-raised.
    """

    async def spawn_loops(reward_events):
        # Imported at call time rather than at module import time.
        from prompting.weight_setting.weight_setter import weight_setter

        logger.info("Starting weight setter loop in validator2...")
        # Keep a strong reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected before
        # it finishes.
        _weight_setter_task = asyncio.create_task(weight_setter.start(reward_events))
        # Keep the event loop alive and emit a periodic heartbeat.
        while True:
            await asyncio.sleep(5)
            logger.debug("Weight setter loop is running")

    try:
        logger.info("Starting weight setter loop in validator...")
        asyncio.run(spawn_loops(reward_events))

    except Exception as e:
        logger.exception(f"Weight setter loop error: {e}")
        raise


async def main():
# will start checking the availability of miners at regular intervals, needed for API and Validator
with torch.multiprocessing.Manager() as manager:
reward_events = manager.list()
scoring_queue = manager.list()
task_queue = manager.list()

# Create process pool for managed processes
miners_dict = manager.dict()
processes = []

try:
# # Start checking the availability of miners at regular intervals
# Start checking the availability of miners at regular intervals
if settings.shared_settings.DEPLOY_SCORING_API:
# Use multiprocessing to bypass API blocking issue
api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process")
api_process = mp.Process(
target=start_api, args=(scoring_queue, reward_events, miners_dict), name="API_Process"
)
api_process.start()
processes.append(api_process)

loop_process = mp.Process(
target=create_loop_process, args=(task_queue, scoring_queue, reward_events), name="LoopProcess"
availability_process = mp.Process(
target=start_availability_checking_loop,
args=(miners_dict,),
name="AvailabilityProcess",
)
availability_process.start()
processes.append(availability_process)

loop_process = mp.Process(
target=create_loop_process,
args=(task_queue, scoring_queue, reward_events, miners_dict),
name="LoopProcess",
)
loop_process.start()

task_sending_process = mp.Process(
target=start_task_sending_loop,
args=(task_queue, scoring_queue, miners_dict),
name="TaskSendingProcess",
)
task_sending_process.start()
processes.append(task_sending_process)

weight_setter_process = mp.Process(
target=start_weight_setter_loop,
args=(reward_events,),
name="WeightSetterProcess",
)
weight_setter_process.start()
processes.append(weight_setter_process)

processes.append(loop_process)
GPUInfo.log_gpu_info()

Expand Down
Loading