In [None]:
#@title Install dependencies such as `dataquality`

# Installs the third-party packages this notebook uses: `tokenizers` (tokenizing
# the response template), `cohere` (likelihood + embedding API calls) and
# `dataquality` (Galileo logging). The extra index URL points at NVIDIA's
# package index, presumably for wheels needed by the `cuda` extra.
#
# NOTE(review): versions are unpinned. `co_generate` further down calls the
# private `AsyncClient._request` and `cohere.GENERATE_URL`, which may not exist
# in every cohere SDK release — consider pinning a compatible version. TODO confirm.

# Upgrade pip (output silenced)
!pip install -U pip &> /dev/null

# Install all dependencies
!pip install tokenizers cohere
!pip install 'dataquality[cuda]' --extra-index-url=https://pypi.nvidia.com/

print('👋 Installed necessary libraries.')

In [None]:
#@title Imports

from typing import Any, Dict, List, Optional
import os
from collections import defaultdict
import pandas as pd
import torch
import asyncio
from dataclasses import dataclass

from datasets import Dataset
from tokenizers import Tokenizer

import cohere
from cohere import AsyncClient

import dataquality as dq
from dataquality.integrations.seq2seq.core import watch

from google.colab import output
output.enable_custom_widget_manager()

## Loading and Expanding Chat Data

Below we include helper functions for loading your chat data and "expanding" the conversational turns into individual chat-completion samples. The expanded dataset has one row per `User` + `Chatbot` pair, stored in the `input` and `target` columns.

**Note:** To handle chat history, the `input` column of each row includes all previous turns of the conversation.

In [None]:
#@title Formatting Helpers

def format_sample(
    sample: Dict[str, Any],
    user_role: str = "User",
    chatbot_role: str = "Chatbot",
    idx: Optional[int] = None
) -> Dict[str, Any]:
    """Explode one multi-turn chat sample into one row per chatbot reply.

    Each `chatbot_role` turn produces one row:

    * `input` is the conversation so far, one turn per line, each prefixed
      with its role (``"User: ..."`` / ``"Chatbot: ..."``), ending with the
      most recent `user_role` turn.
    * `target` is the chatbot's reply.
    * `turn_id` counts rows from 1, and `chat_id` is `idx`.

    Sample-level fields are copied onto every row. These are everything in
    ``sample["metadata"]`` plus any top-level key other than "metadata",
    "turns" and "id". Extra keys on a turn whose values are str/int/float/bool
    are added as ``f"{role}_{key}"`` columns. A trailing user turn with no
    chatbot reply produces no row.

    Args:
        sample: Chat sample with a "turns" list of ``{"role", "content", ...}``
            dicts and an optional "metadata" dict (may be missing or None).
        user_role: Role name identifying user turns.
        chatbot_role: Role name identifying chatbot turns.
        idx: Identifier written to the "chat_id" column of every row.

    Returns:
        Column-oriented mapping: column name -> list of values, one per row.

    Raises:
        ValueError: If a turn's role is neither `user_role` nor `chatbot_role`.

    Example:
        >>> sample = {
        ...     "turns": [
        ...         {"role": "User", "content": "Hello"},
        ...         {"role": "Chatbot", "content": "Hi"},
        ...         {"role": "User", "content": "How are you?"},
        ...         {"role": "Chatbot", "content": "I'm good, how are you?"},
        ...     ],
        ...     "metadata": {"unique_id": 1234, "dataset": "test"},
        ...     "score": 0.5,
        ... }
        >>> format_sample(sample, idx=5)
        {
            "chat_id": [5, 5],
            "turn_id": [1, 2],
            "input": ["User: Hello", "User: Hello\\nChatbot: Hi\\nUser: How are you?"],
            "target": ["Hi", "I'm good, how are you?"],
            "unique_id": [1234, 1234],
            "dataset": ["test", "test"],
            "score": [0.5, 0.5],
        }
    """
    unraveled_turns: Dict[str, Any] = defaultdict(list)
    valid_meta_types = (str, int, float, bool)
    turns: List[Dict[str, Any]] = sample["turns"]

    # Sample-level metadata shared by every row. Copy it so the caller's
    # `sample["metadata"]` dict is never mutated, and treat a missing or None
    # value as "no metadata".
    metadata: Dict[str, Any] = dict(sample.get("metadata") or {})
    for k, v in sample.items():
        if k not in ["metadata", "turns", "id"]:
            metadata[k] = v

    chat_history = ""
    turn_data: Dict[str, Any] = {"chat_id": None, "turn_id": None}
    turn_id = 1
    turn_default_cols = ["role", "content"]
    for turn in turns:
        role = turn["role"]
        content = turn["content"]

        # Extra primitive-valued keys on the turn become "<role>_<key>" columns
        turn_meta = {
            f"{role}_{col}": turn[col]
            for col in turn.keys()
            if col not in turn_default_cols
            and isinstance(turn[col], valid_meta_types)
        }
        turn_data.update(turn_meta)

        if role == user_role:
            # Input = history so far (if any), then this user turn on a new line
            new_line = "\n" if chat_history else ""
            turn_data["input"] = f"{chat_history}{new_line}{user_role}: {content}"
        elif role == chatbot_role:
            turn_data["target"] = content
            turn_data["turn_id"] = turn_id
            turn_data["chat_id"] = idx

            turn_data.update(metadata)
            for k, v in turn_data.items():
                unraveled_turns[k].append(v)

            # The formatted input already contains all earlier history, so the
            # new history is that input plus this reply on its own line.
            chat_history = f"{turn_data['input']}\n{chatbot_role}: {content}"

            # Reset turn data for the next user/chatbot pair
            turn_data = {}
            turn_id += 1
        else:
            raise ValueError(f"Role {role} not recognized")

    return unraveled_turns

@dataclass
class BatchData:
    """Column-oriented batch (column name -> list of values) with row access."""

    batch: Dict[str, Any]

    def sample_from_idx(self, batch_idx: int) -> Dict[str, Any]:
        """Return position `batch_idx` of every column as a ``{column: value}`` dict."""
        return {column: values[batch_idx] for column, values in self.batch.items()}

def expand_batch(batch: Dict[str, List], idxs: List[int]) -> Dict[str, List]:
    """Expand a batch of chat samples into one row per chatbot turn.

    Used with ``Dataset.map(expand_batch, batched=True, with_indices=True)``.
    `batch` is column-oriented, and `idxs` holds the dataset-level index of
    each sample in the batch, in the same order. Each sample is passed
    through `format_sample` with its dataset index as the chat id. The
    resulting per-turn columns are concatenated.

    Args:
        batch: Column name -> list of values, one entry per sample.
        idxs: Dataset-level index of each sample in `batch`.

    Returns:
        Column name -> list of values, one entry per expanded turn.
    """
    result: Dict[str, List] = defaultdict(list)
    batch_data = BatchData(batch)
    # Pair each position in the batch with its dataset index directly.
    # Deriving the position as `idx % len(idxs)` is wrong whenever the batch
    # does not start at a multiple of its own length (e.g. a short final
    # batch), which silently assigns chat_ids to the wrong chats.
    for batch_idx, idx in enumerate(idxs):
        formatted_sample = format_sample(
            batch_data.sample_from_idx(batch_idx),
            idx=idx,
        )
        # format_sample returns zero or more rows per sample; append them all
        for k, v in formatted_sample.items():
            result[k] += v

    return result

In [None]:
# Path to the chat JSONL file (one chat sample with a "turns" list per line).
TRAIN_PATH = 'chat.jsonl'


def expand_chat_dataset(ds: Dataset) -> Dataset:
    """Expand a Dataset of chat samples to one row per chatbot turn.

    Runs `expand_batch` over `ds` in batched mode with row indices, so each
    sample's row index becomes its `chat_id`. The original columns are dropped.
    """
    return ds.map(
        expand_batch,
        batched=True,
        remove_columns=ds.column_names,
        with_indices=True,
    )


# Keep the raw DataFrame under its own name instead of re-binding `train_ds`
# from a DataFrame to a Dataset.
train_df = pd.read_json(TRAIN_PATH, lines=True)
train_ds = Dataset.from_pandas(train_df)
train_ds_expanded = expand_chat_dataset(train_ds)
train_ds_expanded

# TODO: Add if available
# val_ds = Dataset.from_pandas(pd.read_json('...', lines=True))
# val_ds_expanded = expand_chat_dataset(val_ds)

# test_ds = Dataset.from_pandas(pd.read_json('...', lines=True))
# test_ds_expanded = expand_chat_dataset(test_ds)

## Formatting Chat Data Samples

Next, we format each chat sample from the `input` and `target` columns generated above. We call the result the **formatted_prompt**, and it must follow the input format expected by your generation API.

This format is as follows:
```
User: q_1
Chatbot: a_1
...
User: q_n
Chatbot:<EOP_TOKEN> a_n
```
Essentially we combine the `input` and `target` columns using the following rule:
```
{input}\nChatbot:<EOP_TOKEN> {target}
```

**Note:** One important variable in the `Prompt Formatting Function` cell below is `response_template`. It tells Galileo the separator between the model input and the desired completion, so that during processing we can focus on just the model's completion.

In [None]:
#@title Prompt Formatting Function

# Separator between the model input and the target completion. It is used both
# to build each formatted prompt and (tokenized) as Galileo's response template.
response_template = "Chatbot:<EOP_TOKEN>"

def create_formatted_prompt_chat(
    row: Dict,
    idx: int
) -> Dict:
    """Build the full prompt string for one expanded chat row.

    The result is ``"{input}\\n{response_template} {target}"``: the chat
    history ending in the last user turn, then the response separator, then
    the chatbot's reply. Building it from `response_template` (rather than
    repeating the literal) keeps the prompt in sync with the separator given
    to Galileo.

    Args:
        row: Expanded row with "input" and "target" fields.
        idx: Row index, stored as the sample "id".

    Returns:
        ``{"formatted_prompt": str, "id": idx}``.
    """
    formatted_prompt = f"{row['input']}\n{response_template} {row['target']}"
    return {"formatted_prompt": formatted_prompt, "id": idx}

In [None]:
# Add a `formatted_prompt` column, plus an integer `id` taken from the row
# index, to every expanded turn.
train_ds_formatted = train_ds_expanded.map(
    function=create_formatted_prompt_chat,
    with_indices=True,
)
train_ds_formatted
# NOTE: If including val / test data, uncomment the below lines
# val_ds_formatted = val_ds_expanded.map(create_formatted_prompt_chat, with_indices=True)
# test_ds_formatted = test_ds_expanded.map(create_formatted_prompt_chat, with_indices=True)

## API Helper Functions

Below are a number of helper functions for calling the Cohere async APIs (using `raw_prompting`) and processing the returned output. The main things we are interested in are the token likelihoods and the sample embeddings.

In [None]:
#@title `AsyncClient.generate` with `raw_prompting`

# Async Cohere client used by the API helpers below. The key is read from the
# CO_API_KEY environment variable so it is never hard-coded in the notebook.
a_co = AsyncClient(os.environ.get("CO_API_KEY", ""))  # TODO: export CO_API_KEY

async def co_generate(
    prompt: Optional[str] = None,
    prompt_vars: Optional[object] = None,
    model: Optional[str] = None,
    preset: Optional[str] = None,
    num_generations: Optional[int] = None,
    max_tokens: Optional[int] = None,
    temperature: Optional[float] = None,
    k: Optional[int] = None,
    p: Optional[float] = None,
    frequency_penalty: Optional[float] = None,
    presence_penalty: Optional[float] = None,
    end_sequences: Optional[List[str]] = None,
    stop_sequences: Optional[List[str]] = None,
    return_likelihoods: Optional[str] = None,
    truncate: Optional[str] = None,
    logit_bias: Optional[Dict[int, float]] = None,
    raw_prompting: bool = False,
) -> Dict:
    """Call the Cohere generate endpoint directly so `raw_prompting` can be set.

    Mirrors the arguments of `AsyncClient.generate` but builds the request
    body itself, so the internal `raw_prompting` flag can be sent. Every
    argument is forwarded as-is in the JSON body, including None values.
    `prompt_vars` and `logit_bias` default to empty dicts, and streaming is
    always disabled.

    NOTE(review): relies on the private `AsyncClient._request` and on
    `cohere.GENERATE_URL`; these may not exist in every cohere SDK version.

    Returns:
        Whatever `_request` returns — presumably the parsed JSON response as
        a dict (TODO confirm for the installed SDK version).
    """
    # Fresh dicts per call instead of shared mutable default arguments.
    if prompt_vars is None:
        prompt_vars = {}
    if logit_bias is None:
        logit_bias = {}

    json_body = {
        "model": model,
        "prompt": prompt,
        "prompt_vars": prompt_vars,
        "preset": preset,
        "num_generations": num_generations,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "k": k,
        "p": p,
        "frequency_penalty": frequency_penalty,
        "presence_penalty": presence_penalty,
        "end_sequences": end_sequences,
        "stop_sequences": stop_sequences,
        "return_likelihoods": return_likelihoods,
        "truncate": truncate,
        "logit_bias": logit_bias,
        "stream": False,
        "raw_prompting": raw_prompting,
    }
    response = await a_co._request(cohere.GENERATE_URL, json=json_body, stream=False)
    return response

In [None]:
#@title API Response Processors

async def a_query_batch(prompts: List[str]) -> torch.Tensor:
    """Fetch per-token likelihoods for each prompt and pad them into one tensor.

    Each prompt is wrapped in ``<BOS_TOKEN>...<EOS_TOKEN>`` and scored with
    ``co_generate(return_likelihoods="ALL", raw_prompting=True, max_tokens=0)``.
    All requests are awaited together with `asyncio.gather`. For every
    response, the `likelihood` of each token in the first generation is
    collected.

    Returns:
        Tensor of shape ``(len(prompts), longest_sequence)`` built with
        `pad_sequence(batch_first=True)`, i.e. right-padded to the longest
        sequence.
    """
    # For now wrap each prompt with <BOS_TOKEN>/<EOS_TOKEN>.
    # NOTE: per the original author, the DQ tokenizer treats this end token as
    # EOP — verify before changing.
    pending = [
        co_generate(
            prompt=f"""<BOS_TOKEN>{text}<EOS_TOKEN>""",
            return_likelihoods="ALL",
            raw_prompting=True,
            max_tokens=0,
        )
        for text in prompts
    ]
    results = await asyncio.gather(*pending)

    per_prompt = [
        torch.Tensor(
            [tok['likelihood'] for tok in result['generations'][0]['token_likelihoods']]
        )
        for result in results
    ]

    # Pad to the max sequence length in the batch
    return torch.nn.utils.rnn.pad_sequence(per_prompt, batch_first=True)

async def batch_embedd(
    prompts: List[str],
    max_texts_per_call: int = 96,
) -> torch.Tensor:
    """Embed a batch of FULL prompts with the Cohere embed API.

    Each prompt is the complete formatted (input, completion) string. It is
    embedded with model `embed-english-v3.0` and ``input_type="clustering"``.

    The embed endpoint limits how many texts one request may contain (96 per
    Cohere's API reference). So `prompts` is sent in chunks of at most
    `max_texts_per_call`, and the per-chunk results are concatenated in
    order. Callers can therefore pass batches of any size.

    Args:
        prompts: Texts to embed.
        max_texts_per_call: Maximum number of texts sent in one embed request.

    Returns:
        Tensor with one embedding row per prompt, in input order. Returns an
        empty tensor when `prompts` is empty.

    Raises:
        ValueError: If `max_texts_per_call` is not positive.
    """
    if max_texts_per_call <= 0:
        raise ValueError("max_texts_per_call must be a positive integer")

    chunk_embeddings = []
    for start in range(0, len(prompts), max_texts_per_call):
        embedding_response = await a_co.embed(
            texts=prompts[start:start + max_texts_per_call],
            model='embed-english-v3.0',
            input_type='clustering'
        )
        chunk_embeddings.append(torch.Tensor(embedding_response.embeddings))

    if not chunk_embeddings:
        # Nothing to embed; avoid sending an empty request.
        return torch.empty(0)
    return torch.cat(chunk_embeddings, dim=0)

## Galileo Logging!

Now we can get to logging with Galileo. This will look similar to the `dq.auto` flow, with some extra logic to:
1. Log your formatted dataset
2. Log (from your API models) model likelihoods and embeddings.

### Logging In and Creating a Run

In [None]:
# TODO: Update the below
# Galileo connection settings. A non-empty value here overrides the
# environment. A blank value leaves any existing environment variable untouched
# (e.g. one exported before launching the notebook) instead of clobbering it.
# Prefer exporting the password (or using getpass) over typing it into the
# notebook.
galileo_settings = {
    "GALILEO_CONSOLE_URL": "",
    "GALILEO_USERNAME": "",
    "GALILEO_PASSWORD": "",
}
for env_name, env_value in galileo_settings.items():
    if env_value:
        os.environ[env_name] = env_value
    else:
        os.environ.setdefault(env_name, env_value)
dq.configure()

In [None]:
# Start a Galileo seq2seq run. Kept ahead of `watch` below.
# NOTE(review): dataquality presumably requires `init` before `watch` — confirm.
dq.init("seq2seq", project_name="Seq2Seq-API-Integration", run_name="My Run")

# Load the tokenizer published as "Cohere/command-nightly"; presumably this
# matches the model served by the Cohere API — TODO confirm.
tokenizer = Tokenizer.from_pretrained("Cohere/command-nightly")

# Tokenize the response_template, which marks the <Separator> between
# model inputs and target outputs.
response_template_ids = tokenizer.encode(response_template, add_special_tokens=False).ids
print(f"Separator template is: {response_template}")
print(f"This maps to tokenized ids {response_template_ids}")

# Register the tokenizer and separator token ids with dataquality for a
# decoder-only model.
watch(
    tokenizer=tokenizer,
    model_type="decoder_only",
    # Empty list: presumably no split runs generation — TODO confirm.
    generation_splits=[],
    # Presumably the max tokenized input length — TODO confirm.
    max_input_tokens=4096,
    response_template=response_template_ids
)

### Logging Data

In [None]:
default_columns = {
    "id",
    "input",
    "target",
    "formatted_prompt",
}


def log_dataset(
    ds: Dataset,
    split: str,
    input_col: str = "input",
    target_col: str = "target",
    formatted_prompt: str = "formatted_prompt"
) -> None:
    """Log a formatted chat dataset split to Galileo.

    Every column except the id, input, target and formatted-prompt columns is
    logged as metadata.

    Args:
        ds: Formatted dataset, e.g. the output of mapping
            `create_formatted_prompt_chat` over the expanded split.
        split: Galileo split name, e.g. "training".
        input_col: Column holding the model input text.
        target_col: Column holding the target completion.
        formatted_prompt: Column holding the full formatted prompt.
    """
    # Exclude the columns actually passed as text/label/prompt, not only the
    # default names, so custom column names are not also logged as metadata.
    excluded_columns = default_columns | {input_col, target_col, formatted_prompt}
    meta = [col for col in ds.features if col not in excluded_columns]
    dq.log_dataset(
        ds,
        text=input_col,
        label=target_col,
        formatted_prompt=formatted_prompt,
        split=split,
        meta=meta
    )

log_dataset(train_ds_formatted, "training")
# NOTE: Uncomment the below to include val / test data
# log_dataset(val_ds_formatted, "validation")
# log_dataset(test_ds_formatted, "test")

### Logging Model Outputs

Logging model outputs involves:
1. Iterating over your dataset(s) in batches, controlled by the `batch_size` param.
2. Retrieving likelihoods and embeddings from the respective Cohere API endpoints.
3. Logging results to Galileo!

In [None]:
batch_size = 50

async def log_model_outputs(ds: Dataset, split) -> None:
  """Log model outputs in batches

  Given the provided dataset split, query in batches
  the Cohere `generate` and `embeddings` APIs to log
  sample likelihoods and embeddings.
  """
  dq.set_epoch_and_split(epoch=0, split=split)
  for i in range(0, len(ds), batch_size):
      print (f"Processing batch {i // batch_size}")
      batch = ds[i: i + batch_size]
      batch_ids = batch['id']
      batch_model_inputs = batch['formatted_prompt']

      print ("Querying API...")
      logprobs = await a_query_batch(batch_model_inputs)
      embeddings = await batch_embedd(batch_model_inputs)
      print("Done querying! \n")

      dq.log_model_outputs(
          probs = logprobs,
          embs = embeddings,
          ids = batch_ids,
      )

response = await log_model_outputs(train_ds_formatted, split="training")
# NOTE: Uncomment the below to include val / test data
# response = await log_model_outputs(val_ds_formatted, split="validation")
# response = await log_model_outputs(test_ds_formatted, split="test")

In [None]:
# Finish the Galileo run once every split has been logged (presumably this
# uploads the logged data — TODO confirm).
dq.finish()