In [1]:
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union

from transformers import AutoTokenizer, PreTrainedTokenizerBase
from transformers.utils import PaddingStrategy

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
tokenizer = AutoTokenizer.from_pretrained("/Users/pingzhili/huggingface-repo/allenai/OLMoE-1B-7B-0125-Instruct",
                                          trust_remote_code=True)
examples = {
    "question": ["Is 123 a prime?"],
    "response": ["No, 123 is not a prime number. It can be factored as 3 × 41."]
}

In [3]:
def apply_general_chat_template(
        question: str,
        tokenizer: PreTrainedTokenizerBase,
        response: Optional[str] = None,
):
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": question}
    ]
    if response is None:
        return tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
    else:
        messages.append({"role": "assistant", "content": response})
        return tokenizer.apply_chat_template(messages, add_generation_prompt=False, tokenize=False)


def sft_olmoe_train_batch_preprocess_fn(
        examples: Dict[str, List[Any]],
        tokenizer: PreTrainedTokenizerBase,
):
    if tokenizer is None:
        raise ValueError("Tokenizer is required for SFT training.")

    # 1. apply general chat template to each example
    all_chat_texts = []

    for question, response in zip(examples["question"], examples["response"]):
        chat_text = apply_general_chat_template(question, response=response, tokenizer=tokenizer)
        all_chat_texts.append(chat_text)

    # 2. Tokenize the chat
    all_input_ids = []
    all_attention_masks = []
    all_labels = []

    for chat_text in all_chat_texts:
        encoded = tokenizer(chat_text, padding=False, truncation=True)
        input_ids = encoded["input_ids"]
        attention_mask = encoded["attention_mask"]

        # 3. Only apply LM loss on the assistant's response & "<|endoftext|>"
        labels = [-100] * len(input_ids)

        assistant_token_id = tokenizer("<|assistant|>", add_special_tokens=False)["input_ids"]
        end_token_id = tokenizer.convert_tokens_to_ids("|||IP_ADDRESS|||")

        pos_assistant = -1
        pos_end_after_response = -1

        i = 0
        while i <= len(input_ids) - len(assistant_token_id):
            matched = True
            for j in range(len(assistant_token_id)):
                if input_ids[i + j] != assistant_token_id[j]:
                    matched = False
                    break

            if matched:
                pos_assistant = i + len(assistant_token_id) - 1
                break
            i += 1

        if pos_assistant != -1:
            for i in range(pos_assistant + 1, len(input_ids)):
                if input_ids[i] == end_token_id:
                    pos_end_after_response = i
                    break

        if pos_assistant != -1 and pos_end_after_response != -1:
            for i in range(pos_assistant + 1, pos_end_after_response):
                labels[i] = input_ids[i]

        all_input_ids.append(input_ids)
        all_attention_masks.append(attention_mask)
        all_labels.append(labels)
        print(pos_assistant, pos_end_after_response)

    return {
        "input_ids": all_input_ids,
        "attention_mask": all_attention_masks,
        "labels": all_labels
    }

In [4]:
results = sft_olmoe_train_batch_preprocess_fn(examples, tokenizer)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


28 49


In [9]:
def pad_without_fast_tokenizer_warning(tokenizer, *pad_args, **pad_kwargs):
    """
    Pads without triggering the warning about how using the pad function is sub-optimal when using a fast tokenizer.
    """

    # To avoid errors when using Feature extractors
    if not hasattr(tokenizer, "deprecation_warnings"):
        return tokenizer.pad(*pad_args, **pad_kwargs)

    # Save the state of the warning, then disable it
    warning_state = tokenizer.deprecation_warnings.get("Asking-to-pad-a-fast-tokenizer", False)
    tokenizer.deprecation_warnings["Asking-to-pad-a-fast-tokenizer"] = True

    try:
        padded = tokenizer.pad(*pad_args, **pad_kwargs)
    finally:
        # Restore the state of the warning.
        tokenizer.deprecation_warnings["Asking-to-pad-a-fast-tokenizer"] = warning_state

    return padded


@dataclass
class CustomDataCollatorWithPadding:
    """
    Data collator that will dynamically pad the inputs received.

    Args:
        tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
            The tokenizer used for encoding the data.
        padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:

            - `True` or `'longest'` (default): Pad to the longest sequence in the batch (or no padding if only a single
              sequence is provided).
            - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
              acceptable input length for the model if that argument is not provided.
            - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different lengths).
        max_length (`int`, *optional*):
            Maximum length of the returned list and optionally padding length (see above).
        pad_to_multiple_of (`int`, *optional*):
            If set will pad the sequence to a multiple of the provided value.

            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.0 (Volta).
        return_tensors (`str`, *optional*, defaults to `"pt"`):
            The type of Tensor to return. Allowable values are "np", "pt" and "tf".
    """

    tokenizer: PreTrainedTokenizerBase
    padding: Union[bool, str, PaddingStrategy] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    return_tensors: str = "pt"
    extra_keys_to_ignore: Optional[List[str]] = None

    def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, Any]:
        if "label" in features:
            features["labels"] = features["label"]
            del features["label"]
        if "label_ids" in features:
            features["labels"] = features["label_ids"]
            del features["label_ids"]

        features_to_ignore = {
            k: [item[k] for item in features] for k in self.extra_keys_to_ignore
        } if self.extra_keys_to_ignore else {}
        features = [
            {k: v for k, v in feature.items() if k not in self.extra_keys_to_ignore} for feature in features
        ] if self.extra_keys_to_ignore else features

        # take labels out of features
        labels_batch = [{"input_ids": feature["labels"]} for feature in features]  # Fake name for padding
        features = [{k: v for k, v in feature.items() if k != "labels"} for feature in features]
        batch = pad_without_fast_tokenizer_warning(
            self.tokenizer,
            features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors=self.return_tensors,
        )
        labels_batch = pad_without_fast_tokenizer_warning(
            self.tokenizer,
            labels_batch,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors=self.return_tensors,
        )
        if self.tokenizer.pad_token_id is not None:
            labels_batch["input_ids"][labels_batch["input_ids"] == self.tokenizer.pad_token_id] = -100
        labels_batch["labels"] = labels_batch["input_ids"]
        del labels_batch["input_ids"]
        batch = {**batch, **features_to_ignore, **labels_batch}
        return batch


In [10]:
data_collator = CustomDataCollatorWithPadding(tokenizer=tokenizer, pad_to_multiple_of=8, max_length=1024)

In [11]:
batch = [{"input_ids": [1, 2, 3, 4, 5, 6], "labels": [-100, -100, 3, 4, 5, 6]}]

In [12]:
data_collator(batch)



{'input_ids': tensor([[    1,     2,     3,     4,     5,     6, 50280, 50280]]),
 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 0, 0]]),
 'labels': tensor([[-100, -100,    3,    4,    5,    6, -100, -100]])}