In [2]:
import requests

# Smoke test: request the /health endpoint of the service expected on localhost:8094.
# Printing the Response object shows the HTTP status code it returned.
response = requests.get("http://localhost:8094/health", timeout=10)
print(response)

<Response [200]>


In [None]:
# Initialise the settings
from prompting import settings
# Build a Settings object in validator mode and store it as the `settings` attribute
# of the object imported above.
settings.settings = settings.Settings(mode="validator")
# From here on the notebook-level name `settings` is rebound to that Settings instance
# and no longer refers to what was imported from `prompting`.
settings = settings.settings

## LLM Pipeline

In [None]:
# Initialise the LLM we use on the validator
from prompting.llms.vllm_llm import vLLMPipeline
# NOTE(review): the model id ends in "-awq" while quantization=False is passed — confirm this is intended.
pipeline = vLLMPipeline(llm_model_id="casperhansen/llama-3-8b-instruct-awq", llm_max_allowed_memory_in_gb=20, device="CUDA", quantization=False, llm_max_model_len=2048)

In [None]:
# As you can see, "pipeline" is an object that simply wraps around the LLM and is callable.
# Only the text before the first newline of the returned value is displayed here.
pipeline("What's 1+2?").split("\n")[0]

## Dataset

Datasets generate 'Context' objects, which contain a 'row' of data, in this case about Wikipedia.

In [None]:
from prompting.datasets.wiki import WikiDataset
# Create the Wikipedia dataset and draw one random context from it.
dataset = WikiDataset()
context = dataset.random()
# Display the sampled context as the cell output.
context

## Tasks

Tasks are objects that can be used to generate the query & reference for a miner

### Initialise with past data

We can either initialise the task with past data (this doesn't require an LLM to run), or generate the query and reference with the LLM pipeline, as the cell below does.

In [None]:
from prompting.tasks.summarization import SummarizationTask, SummarizationRewardConfig
# Ask the summarization task for a query and reference for the sampled context, using the LLM pipeline.
# NOTE(review): SummarizationRewardConfig is imported on the line above but not used in this cell.
SummarizationTask.generate_query_reference(llm_pipeline=pipeline, context=context)

## Miner Responses

Now let's say we have a few miners giving us responses:

In [6]:
import numpy as np
from prompting.base.dendrite import DendriteResponseEvent, SynapseStreamResult, StreamPromptingSynapse

# Two example miner answers to the same question: one with completion "4", one with completion "3".
miner_response_1 = SynapseStreamResult(synapse=StreamPromptingSynapse(completion="4", roles=["user"], messages=["What's 1+2?"]))
miner_response_2 = SynapseStreamResult(synapse=StreamPromptingSynapse(completion="3", roles=["assistant"], messages=["What's 1+2?"]))


# the synapses from all miners get collected into the DendriteResponseEvent
# (uids presumably pair up positionally with stream_results — TODO confirm)
dendrite_response = DendriteResponseEvent(stream_results=[miner_response_1, miner_response_2], uids=np.array([1, 2]), timeout=10)

# Scoring

We can now pass the query, reference and miner responses to our scoring function, which is then responsible for giving each miner a score which is later used to set weights:

In [None]:
from prompting.tasks.summarization import SummarizationRewardConfig

# Score the miner responses against the reference; `apply` returns three values, of which
# `rewards` is displayed as the cell output.
# NOTE(review): `penality_events` looks like a misspelling of "penalty_events".
reward_events, penality_events, rewards = SummarizationRewardConfig.apply(challenge="What's 1+2?", reference="1+2 is equal to 3", response_event=dendrite_response)
rewards

# Other tests/examples on different tasks

In [8]:
from prompting.tasks.qa import QuestionAnsweringTask, QARewardConfig
# NOTE(review): QARewardConfig is imported on the line above, yet the QA task is given
# SummarizationRewardConfig() — confirm which reward config is intended here.
qa = QuestionAnsweringTask(context=context.model_dump(), llm_pipeline=pipeline, reward_config=SummarizationRewardConfig())

In [9]:
# Used to obtain the query (which is a question about the context).
# The `{context}` placeholder is filled in with str.format before the prompt is used.
QUERY_PROMPT_TEMPLATE = """\
Ask a specific question about the following context:

#Context:
{context}

You must ask a question that can be answered by the context.
"""

In [None]:
# Fill the query template with the content of the sampled context, then ask the QA task for a query.
query_prompt = QUERY_PROMPT_TEMPLATE.format(context=context.content)
query = qa.generate_query(llm_pipeline=pipeline, message=query_prompt)
# Display the generated query as the cell output.
query


In [11]:
# Used to obtain the reference answer.
# The `{context}` and `{question}` placeholders are filled in with str.format before the prompt is used.
REFERENCE_PROMPT_TEMPLATE = """\
Answer the question you will receive in detail, utilizing the following context.

#Context:
{context}

# Question:
{question}
"""

In [None]:
# Fill the reference template with the context content and the generated query,
# then ask the QA task for a reference answer.
# NOTE(review): generate_query is called with `message=` (a string) while generate_reference
# is called with `messages=` (a list) — confirm both keyword names against the task API.
reference_prompt = REFERENCE_PROMPT_TEMPLATE.format(context=context.content, question=query)
reference = qa.generate_reference(llm_pipeline=pipeline, messages=[reference_prompt])
# Display the generated reference as the cell output.
reference

In [10]:
import asyncio
from abc import ABC, abstractmethod
from loguru import logger
from pydantic import BaseModel, model_validator
from datetime import timedelta
import datetime
import aiohttp


class AsyncLoopRunner(BaseModel, ABC):
    """Base class for a periodic asyncio loop.

    Subclasses implement ``run_step``. ``start`` schedules ``run_loop`` as a
    background task on the running event loop and ``stop`` cancels it again.
    Between two steps the loop sleeps until the next execution time, which is
    either ``interval`` seconds after the previously scheduled run, or (when
    ``sync`` is True) the next multiple of ``interval`` since the Unix epoch.
    """

    interval: int = 10  # interval to run the main function in seconds
    running: bool = False  # True while the loop should keep iterating
    sync: bool = False  # align runs to epoch-based multiples of `interval`, using the time server
    time_server_url: str = "http://worldtimeapi.org/api/ip"  # only queried when `sync` is True
    name: str | None = None  # label used in log messages; defaults to the class name
    step: int = 0  # number of finished loop iterations (iterations whose run_step raised count too)
    # Background task created by start(). Declared as a private attribute with a
    # None default so that stop() can be called safely before start().
    _task: asyncio.Task | None = None

    @model_validator(mode="after")
    def validate_name(self):
        """Fall back to the class name when no explicit ``name`` was given."""
        if self.name is None:
            self.name = self.__class__.__name__
        return self

    @abstractmethod
    async def run_step(self):
        """Implement this method with the logic that needs to run periodically."""
        raise NotImplementedError("run_step method must be implemented")

    async def get_time(self):
        """Return the current time as a timezone-aware datetime.

        Without ``sync`` this is the local clock in UTC. With ``sync`` the time
        server is queried (5 second total timeout); on any failure the local
        UTC clock is used instead and a warning is logged.
        """
        if not self.sync:
            now = datetime.datetime.now(datetime.timezone.utc)
            logger.debug(f"Time: {now}")
            return now
        try:
            async with aiohttp.ClientSession() as session:
                logger.info("Waiting for response time")
                # aiohttp expects a ClientTimeout object here rather than a bare number.
                request_timeout = aiohttp.ClientTimeout(total=5)
                async with session.get(self.time_server_url, timeout=request_timeout) as response:
                    if response.status == 200:
                        data = await response.json()
                        logger.info("Got response")
                        # fromisoformat needs an explicit offset instead of a trailing "Z".
                        return datetime.datetime.fromisoformat(data["datetime"].replace("Z", "+00:00"))
                    else:
                        # Caught by the handler below, which falls back to local time.
                        raise RuntimeError(f"Failed to get server time. Status: {response.status}")
        except Exception as ex:
            logger.warning(f"Could not get time from server: {ex}. Falling back to local time.")
            return datetime.datetime.now(datetime.timezone.utc)

    def next_sync_point(self, current_time):
        """Calculate the next sync point based on the current time and interval.

        The result is the smallest multiple of ``interval`` seconds since the
        Unix epoch that lies strictly after ``current_time``.
        """
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
        time_since_epoch = current_time - epoch
        seconds_since_epoch = time_since_epoch.total_seconds()
        next_interval = (seconds_since_epoch // self.interval + 1) * self.interval
        return epoch + timedelta(seconds=next_interval)

    async def wait_for_next_execution(self, last_run_time):
        """Wait until the next execution time, either synced or based on last run.

        Returns the scheduled time of the next run. If that time is already in
        the past, no sleep happens and the method returns immediately.
        """
        current_time = await self.get_time()
        logger.debug("Current time")
        if self.sync:
            next_run = self.next_sync_point(current_time)
        else:
            next_run = last_run_time + timedelta(seconds=self.interval)
        logger.debug(f"Next run: {next_run}")

        wait_time = (next_run - current_time).total_seconds()
        if wait_time > 0:
            logger.debug(
                f"{self.name}: Waiting for {wait_time:.2f} seconds until next {'sync point' if self.sync else 'execution'}"
            )
            await asyncio.sleep(wait_time)
        return next_run

    async def run_loop(self):
        """Run the loop periodically, optionally synchronizing across all instances.

        Exceptions raised by ``run_step`` are logged and do not end the loop.
        Cancellation is caught here, so the task finishes normally when it is
        cancelled; ``running`` is reset to False whenever the loop exits.
        """
        logger.debug(f"Starting loop {self.__class__.__name__}; running: {self.running}")

        last_run_time = await self.get_time()
        logger.debug(f"Got time of last run: {last_run_time}")
        try:
            while self.running:
                logger.debug("Waiting...")
                next_run = await self.wait_for_next_execution(last_run_time)
                logger.debug("Wait ended")
                try:
                    await self.run_step()
                except Exception as ex:
                    logger.exception(f"Error in loop iteration: {ex}")
                self.step += 1
                logger.debug(f"{self.name}: Step {self.step} completed at {next_run}")
                # The next wait is measured from the scheduled time, not from when the step finished.
                last_run_time = next_run
        except asyncio.CancelledError:
            logger.info("Loop was stopped.")
        except Exception as e:
            logger.error(f"Fatal error in loop: {e}")
        finally:
            self.running = False
            logger.info("Loop has been cleaned up.")
        logger.debug("Exiting run_loop")

    async def start(self):
        """Start the loop as a background task; does nothing if it is already running."""
        if self.running:
            logger.warning("Loop is already running.")
            return
        self.running = True
        logger.debug(f"{self.name}: Starting loop with {'synchronized' if self.sync else 'non-synchronized'} mode")
        self._task = asyncio.create_task(self.run_loop())

    async def stop(self):
        """Stop the loop and wait for the background task to finish.

        Safe to call when the loop was never started.
        """
        self.running = False
        task = self._task
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            logger.info("Loop task was cancelled.")
        finally:
            self._task = None


# ---------------------------------------------------------------------------------------
import numpy as np
# Shared in-memory store: ResponseGatherer appends single-entry {uid: score} dicts,
# WeightSetter reads them.
responses: list[dict[int, float]] = []

# simulates getting scored responses from miners
class ResponseGatherer(AsyncLoopRunner):
    """Loop that records one random ``{uid: score}`` entry in ``responses`` per step."""

    interval: int = 2

    async def run_step(self):
        """Draw a uid via ``np.random.randint(0, 1024)`` and a score via ``np.random.random()`` and store them."""
        # Key is evaluated before the value, so the uid is drawn first, then the score.
        scored_entry = {np.random.randint(0, 1024): np.random.random()}
        responses.append(scored_entry)
        logger.info("Added reward")

# simulates setting weights
class WeightSetter(AsyncLoopRunner):
    """Loop that merges everything in ``responses`` into one uid -> score mapping and logs it."""

    interval: int = 5

    async def run_step(self):
        """Collapse the gathered responses into a single dict; later entries win for a repeated uid."""
        logger.info("Setting weights")
        # Only the first (uid, score) pair of each response dict is considered.
        first_pairs = [list(entry.items())[0] for entry in responses]
        all_uids: dict[int, float] = dict(first_pairs)
        logger.info(f"All uids: {all_uids}")
        
response_gatherer = ResponseGatherer()
weight_setter = WeightSetter()

# start both loops
asyncio.create_task(response_gatherer.start())
asyncio.create_task(weight_setter.start())
# run for 20 seconds
await asyncio.sleep(20)
# stop both loops
await response_gatherer.stop()
await weight_setter.stop()



[32m2024-10-30 11:51:45.772[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mstart[0m:[36m108[0m - [34m[1mResponseGatherer: Starting loop with non-synchronized mode[0m
[32m2024-10-30 11:51:45.774[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mstart[0m:[36m108[0m - [34m[1mWeightSetter: Starting loop with non-synchronized mode[0m
[32m2024-10-30 11:51:45.776[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun_loop[0m:[36m77[0m - [34m[1mStarting loop ResponseGatherer; running: True[0m
[32m2024-10-30 11:51:45.778[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mget_time[0m:[36m33[0m - [34m[1mTime: 2024-10-30 11:51:45.778430+00:00[0m
[32m2024-10-30 11:51:45.780[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun_loop[0m:[36m80[0m - [34m[1mGot time of last run: 2024-10-30 11:51:45.778430+00:00[0m
[32m2024-10-30 11:51:45.782[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun_loop[0m:[36m83[0m - [34m[1mWaiting...[0m
[32m20

[32m2024-10-30 11:51:47.788[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun_loop[0m:[36m85[0m - [34m[1mWait ended[0m
[32m2024-10-30 11:51:47.790[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_step[0m:[36m133[0m - [1mAdded reward[0m
[32m2024-10-30 11:51:47.792[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun_loop[0m:[36m91[0m - [34m[1mResponseGatherer: Step 1 completed at 2024-10-30 11:51:47.778430+00:00[0m
[32m2024-10-30 11:51:47.794[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun_loop[0m:[36m83[0m - [34m[1mWaiting...[0m
[32m2024-10-30 11:51:47.797[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mget_time[0m:[36m33[0m - [34m[1mTime: 2024-10-30 11:51:47.797453+00:00[0m
[32m2024-10-30 11:51:47.799[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mwait_for_next_execution[0m:[36m60[0m - [34m[1mCurrent time[0m
[32m2024-10-30 11:51:47.800[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mwait_for_next_executio