<a href="https://colab.research.google.com/github/yuchenhe-xai/yccolab/blob/main/241113_scoring_data_distribution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Scores for data learnability
As discussed in [doc](https://docs.google.com/document/d/1msVlIjZwDUNpwalNuJ60P-9ieSINEVsC5ItyIBPUHPk/edit?tab=t.0#heading=h.y24asf17gdl1) - we want to find the data distributions that worth labeling and RL against. Besides matching the distribution based on clustering and categories, here we are defining a score approximating the learnability. Our assumption is that examples with higher score is more learnable - aka policy would differs before and after training - model could learn from it.

Here are 3 different options, note ${a}$ is the LLM token output, $p = p_\text{target}(a|s)$ is the target policy after RL, $q = p_\text{ref}(a|s)$ is the original SFT policy.
- importance weight (similar as in ppo's off-policy importance weight) : $s = p/q$
- cross entropy: $s = - p \log q$
- kl divergence: $s = p \log (p/q)$

Here we wrap this into a reward client to be used downstream.

In [1]:
#@title imports

import aiohttp
import asyncio
import numpy as np
import pandas as pd
import glob
from enum import Enum
from tqdm import tqdm
from xlm.config import Config
from xlm.config import configclass
from xlm.posttrain import utils
from xlm.posttrain.data import formatting
from xlm.sampling_client import SamplingClient
from xlm.tokenizers.constants import get_tokenizer, Tokenizer


In [2]:
#@title define scorer
DEFAULT_TOKENIZER = get_tokenizer("v4")
SFT_MODEL = "v5l-1010-sft-rubrics-sglang.yuchen.svc.max.x.ai"
POLICY_MODEL = "v5l-inf-mb-sglang.yuchen.svc.max.x.ai"

class ScoreType(Enum):
  P = 0
  CE = 1
  KL = 2

@configclass
class RelativeScorer(Config):
  tokenizer: Tokenizer = DEFAULT_TOKENIZER
  timeout: int = 100
  model_addr: str = POLICY_MODEL
  model_addr_ref: str = SFT_MODEL
  score_type : ScoreType = ScoreType.KL
  max_parallel: int = 1024
  semaphore = asyncio.Semaphore(value=max_parallel)

  async def get_logprob(self, prompt: str, response: str, model_addr: str = None) -> float:
      """Compute the log probabilities for response"""
      prompt_tokens = self.tokenizer.tokenize_no_eos(prompt)
      suffix_prob = np.inf
      async with self.semaphore:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        ) as session:
          logp_response = await session.post(
              f"http://{model_addr}:30000/generate",
              json={
                  "text": prompt + response,
                  "sampling_params": {"temperature": 0, "max_new_tokens": 0},
                  "return_logprob": True,
                  "logprob_start_len": len(prompt_tokens) - 1,
              },
          )
          response_json = await logp_response.json()
          suffix_logprobs = response_json["meta_info"]["input_token_logprobs"]
          logprobs = [ri[0] for ri in suffix_logprobs]
          suffix_prob = np.exp(np.mean(logprobs)).item()
      return suffix_prob

  async def get_scores(self, prompt: str, response: str):
      target_score = await self.get_logprob(prompt, response, model_addr=self.model_addr)
      ref_score = await self.get_logprob(prompt, response, model_addr=self.model_addr_ref)
      gap_score = None
      match self.score_type :
        case ScoreType.P:
          gap_score = np.exp(target_score - ref_score)
        case ScoreType.CE:
          gap_score =  - target_score * np.exp(ref_score)
        case ScoreType.KL:
          gap_score = np.exp(target_score) * (target_score - ref_score)
      scores = {"score_type": self.score_type.name, "gap_score": gap_score, "target_score": target_score, "ref_score": ref_score}
      # we cares larger gap - more negative the more learning
      return scores

scorer = RelativeScorer()
await scorer.get_scores(prompt="asdfs", response="dx")

{'score_type': 'KL',
 'gap_score': 0.0001923118433048286,
 'target_score': 0.00025645714958583686,
 'ref_score': 6.419461970451348e-05}

In [3]:
#@title a new reward client that does scoring
from xlm.reward_client import RewardClient
from dataclasses import field

@configclass
class RelativeRewardClient(RewardClient):
    model_addr: str = POLICY_MODEL
    model_addr_ref: str = SFT_MODEL
    score_type: ScoreType = ScoreType.KL
    scorer: RelativeScorer = None

    def __post_init__(self):

        # if response is not provided or we want to use a specific model response, we set up the sampling client for it
        if self.address.endswith("/"):
          self.address = self.address[:-1]
        self.sampling_client = SamplingClient()
        self.model = self.address.split("/")[0]
        self.sampling_client._grok_client.register_model(
           model=self.model, model_address=self.address, max_parallel=self.max_parallel
        )
        self.semaphore = asyncio.Semaphore(value=self.max_parallel)
        if self.scorer is None:
          self.scorer = RelativeScorer(
              model_addr=self.model_addr, model_addr_ref=self.model_addr_ref, score_type=self.score_type
          )


    async def _generate(
        self, messages: list[dict[str, str]] = None, example: dict[str, Any] = None
    ) -> dict[str, Any]:
        try:
          async with self.semaphore:
            prompt = example.get("prompt", None)
            response = example.get("response", None)
            if messages is not None:
              if messages[-1]["role"] in ["ASSIS", "assistant"]:
                  prompt = formatting.render_conversation(
                      name="grok", messages=messages[:-1]
                  )
                  response = messages[-1]["content"]
              else:
                  prompt = formatting.render_conversation(
                      name="grok", messages=messages
                  )
            if response is None:
              response = await self.sampling_client.generate(prompt=prompt)
            scores = await self.scorer.get_scores(prompt=prompt, response=response)
            example["scores"] = scores
            return scores
        except Exception as e:
            print(e)
            traceback.print_exc()
            return None



In [4]:
prompt = """Human: You will classify what genre each book is based on its summary.  When answering give both the genre name and a short (1-2 sentences) justification for why it is that genre.  If a book could be classified as multiple genres pick the one that fits the best. Here are the genres:

[Fantasy]: A genre of imaginative fiction involving magic and adventure, especially in a setting other than the real world
[Science Fiction]: A form of fiction that deals principally with the impact of actual or imagined science upon society or individuals.
[Dystopian]: A popular genre of science fiction, dystopian novels offer a bleak and frightening vision of the future.
[Action & Adventure]: The protagonist has a very important goal to achieve, but they’re going to have to go through the wringer first! The hero experiences obstacle after obstacle and goes through downright dangerous situations but eventually, they triumph and return home transformed.
[Mystery]: Also called detective fiction, this book genre is characterized by a gripping plot that revolves around a mystery.
 [Historical Fiction]: This book genre encompasses fictional stories in a historical setting, carefully balancing creativity and facts. In most cases, the characters and events are imagined by the author and enriched with historically accurate details from a specific time period.
[Romance]:  The key thing to remember is that the romantic relationship must be the center point of the plot. (Other giveaways include a “happily ever after” ending and the warm fuzzies.)
[Contemporary Fiction]: This book genre is occasionally lumped in with others to indicate that the book takes place in the present day. But in its simplest form, contemporary fiction is better understood as the absence of a genre. Your book doesn’t need tropes and trappings, monsters and mysteries, when its tension, drama, and conflict lies in the quirks and quandaries of your protagonist’s everyday life: work, politics, relationships, and the struggles of the modern era.
[Literary Fiction]: Like contemporary fiction, books considered literary fiction can’t be neatly filed under any other genre. What distinguishes this genre from contemporary fiction is that works of literary fiction are thought to have considerable artistic value. If your prose is meant to engage the reader in thought, if your narrative is character-driven and introspective, and if you provide personal or social commentary on a “serious” theme, then chances are you’re writing lit-fic

Your output should consist of the name of the genre in bold followed by the 1-2 sentence justification for why the book is considered to be that genre.

Classify this book based on the summary:

Crown of Secrets by Melanie Cellier
Verene is a disappointment to her entire kingdom--the first royal ever born without power, despite her mother being the most powerful mage in history. So when she's sent to the Academy in neighboring Kallorway to forge ties with her people's traditional enemies, she's determined to succeed and prove she can still be of value to her kingdom.

Prince Darius of Kallorway is the strongest mage in his family--and the only reason his weak father is still clinging to his throne. Starting at the Academy at the same time as Verene, the crown prince is cold and distant and shows no desire to connect with her. Instead he seems suspicious of both her presence and her claimed lack of power.

Surrounded by unfamiliar politics and long-held enemies, Verene discovers that some at the Academy want her gone by whatever means necessary. As the threats grow ever more sinister, she starts to question all of her assumptions. The hardened prince might just be her best hope of survival and--even more shockingly--he might be right about her power. If Verene wants to survive Kallorway and the Academy, she must uncover her hidden powers and take her true place among the mages.<|separator|>

Assistant:""".strip()
response = "This book is classified as fantasy because it involves a world with magic and mages, which is a classic element of fantasy settings, and the plot revolves around magical powers and political intrigue in an imagined kingdom."
example = {"prompt": prompt, "response": response}

rr = RelativeRewardClient(address="v5l-1101-crm.yuchen.svc.max.x.ai")
await rr._generate(example=example)

[2m[2024-11-14 19:21:24,078 E] [2;36m[colabbox-0:1618030] sampling_client:1010:[0m bedrock setup failed: The config profile (key0) could not be found


{'score_type': 'KL',
 'gap_score': 0.007686059452784876,
 'target_score': 0.16672462110759176,
 'ref_score': 0.1602188892913625}

In [5]:
#@title example of wrapping the scoring into a data processing pipeline


# for new policy answer - if we dont have a model generated response yet
DEFAULT_MODEL = "v5l-1101-crm.yuchen.svc.max.x.ai"

sampling_client = SamplingClient()
sampling_client._grok_client.register_model(
    model=DEFAULT_MODEL, model_address=DEFAULT_MODEL
)
scorer = RelativeScorer()

async def process_convo(
    input_path, output_path, sampling_client = None, semaphore = None, pbar=None, scorer: RelativeScorer = scorer
):
    df1 = utils.read_df(glob.glob(input_path))
    row_pbar = tqdm(total=len(df1), desc=f"Processing {input_path}", leave=False)
    async def process_row(example):
        example = example.to_dict() if not isinstance(example, dict) else example
        reward = None
        conv = example.get('conversation0')[:-1]
        prompt = formatting.render_conversation(name="grok", messages=conv) if conv is not None else None
        if sampling_client:
          response = sampling_client.generate(prompt=prompt)
        else:
          model_id_0 = example.get("model_0_id")
          model_id_1 =  example.get("model_1_id")
          if "grok" in model_id_0 and "grok" in model_id_1:
            chosen_model = 0 if example.get("model_0_id") > example.get("model_1_id") else 1
          elif "grok" in model_id_0:
            chosen_model = 0
          elif "grok" in model_id_1:
            chosen_model = 1
          else:
            chosen_model = 0
          response = example.get(f'conversation{chosen_model}', [])[-1]["content"]
        scores = await scorer.get_scores(prompt, response)
        example.update({"entropy_scores": scores})
        row_pbar.update(1)
        return example
    # Process in batches
    batch_size = 10_000  # len(df1) #  1000  # Adjust this based on your system's capacity
    mapped_data = []
    for start in range(0, len(df1), batch_size):
        batch = df1.iloc[start : start + batch_size]
        batch_results = await asyncio.gather(
            *(process_row(example) for _, example in batch.iterrows())
        )
        mapped_data.extend(batch_results)

    df2 = pd.DataFrame(mapped_data)
    def getentropygap(row):
        return row['entropy_scores']['gap_scores']

    # Apply the function to create a new column or use directly for sorting
    # Here, we create a temporary column for clarity, but you can also sort directly
    df2['temp_entropy_gap'] = df2.apply(getentropygap, axis=1)

    # Now sort the DataFrame by the temporary column
    df2_sorted = df2.sort_values(by='temp_entropy_gap', ascending=True)

    utils.df_to_parquet(df2_sorted, output_path)
    return df2_sorted



[2m[2024-11-14 19:21:27,245 E] [2;36m[colabbox-0:1618030] sampling_client:1010:[0m bedrock setup failed: The config profile (key0) could not be found


In [6]:
input_folder = "/data/datasets/preferences-v2/filtered/xrenewNONE/train/surge_api_1/data/*.parquet"
output_path = "/data/datasets/preferences-v2/filtered/xrenewNONEentropy/surge_api_1/"
newdata = await process_convo(input_folder, output_path)


reading paths: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:00<00:00, 26.11it/s]
Processing /data/datasets/preferences-v2/filtered/xrenewNONE/train/surge_api_1/data/*.parquet:  99%|████████████████████████████▋| 755/762 [01:02<00:00, 21.77it/s]

KeyError: 'gap_scores'