# Create a Distillation Pipeline to Distill DeepSeek-R1 into a Qwen Model with the NeMo 2.0 Framework

Reasoning models think through a problem step by step before answering, which significantly improves LLM performance on complex tasks. According to the [DeepSeek-R1](https://arxiv.org/abs/2501.12948) paper, the reasoning patterns of larger models can be distilled into smaller models. Specifically, we can collect long chain-of-thought (long-CoT) data, including the reasoning process, from DeepSeek-R1 and use it to directly fine-tune open-source models such as Qwen and Llama. This straightforward distillation method significantly improves the reasoning abilities of smaller models.

To demonstrate the complete distillation process, we have prepared two notebooks that cover how to distill reasoning data from DeepSeek-R1 using the NIM API, and how to train models using the distilled data.


- [generate_reasoning_data.ipynb](./generate_reasoning_data.ipynb) demonstrates how to distill reasoning data from DeepSeek-R1 using the NIM API. 
- [qwen2_distill_nemo.ipynb](./qwen2_distill_nemo.ipynb) (⭐) shows how to train open-source models using the distilled data.



This notebook is part 2 of the series. It shows how to distill the reasoning ability of DeepSeek-R1 into a Qwen model using the NeMo Framework. Training uses [Bespoke-Stratos-17k](https://huggingface.co/datasets/bespokelabs/Bespoke-Stratos-17k), a dataset distilled from DeepSeek-R1 that contains 17k reasoning chains covering mathematics, code, Olympiad problems, science, and puzzles. You can also use [generate_reasoning_data.ipynb](./generate_reasoning_data.ipynb) to generate your own reasoning data.


## Preparation
We use NeMo 2.0 and NeMo-Run to run supervised fine-tuning (SFT) on the Qwen model. Start the `nvcr.io/nvidia/nemo:25.02` container and run this notebook inside it.


## Step-By-Step Instructions

This notebook contains five steps:

1. Download the Qwen model and convert it to NeMo 2.0 format.
2. Prepare the fine-tuning data.
3. Fine-tune the Qwen model with NeMo 2.0 and NeMo-Run.
4. Evaluate the model.
5. Convert the output model from NeMo 2.0 to HF format.

### Step 1: Download Qwen and Convert to NeMo 2.0 Format

First, download the Qwen2.5-7B-Instruct model from the Hugging Face Hub and convert it to NeMo format.

We use the `llm.import_ckpt` API to download the model using `hf://<huggingface_model_id>` and convert it to NeMo 2.0 format.


In [None]:
import nemo_run as run
import lightning.pytorch as pl
from nemo.collections import llm

# llm.import_ckpt is the NeMo 2.0 API for converting a Hugging Face checkpoint to NeMo format
# example python usage:
# llm.import_ckpt(model=llm.llama3_8b.model(), source="hf://meta-llama/Meta-Llama-3-8B")
#
# We use run.Partial to configure this function
def configure_checkpoint_conversion():
    return run.Partial(
        llm.import_ckpt,
        model=llm.qwen2_7b.model(),
        source="hf://Qwen/Qwen2.5-7B-Instruct",
        overwrite=True,
    )

# configure your function
import_ckpt = configure_checkpoint_conversion()
# define your executor
local_executor = run.LocalExecutor()

# run your experiment
run.run(import_ckpt, executor=local_executor)


### Step 2: Prepare Fine-tuning Data

[Bespoke-Stratos-17k](https://huggingface.co/datasets/bespokelabs/Bespoke-Stratos-17k) is an open-source dataset created by Bespoke Labs. Distilled from DeepSeek-R1, it contains 17k reasoning questions covering math, code, Olympiad problems, science, and puzzles.

Each example contains a system prompt and a user-assistant conversation; the assistant turn holds the reasoning chain followed by the final solution. Here is an example:

```
{
    "system": "Your role as an assistant involves thoroughly exploring questions through a systematic long thinking process before providing the final precise and accurate solutions. This requires engaging in a comprehensive cycle of analysis, summarizing, exploration, reassessment, reflection, backtracing, and iteration to develop well-considered thinking process. Please structure your response into two main sections: Thought and Solution. In the Thought section, detail your reasoning process using the specified format: <|begin_of_thought|> {thought with steps separated with '\\n\\n'} <|end_of_thought|> Each step should include detailed considerations such as analisying questions, summarizing relevant findings, brainstorming new ideas, verifying the accuracy of the current steps, refining any errors, and revisiting previous steps. In the Solution section, based on various attempts, explorations, and reflections from the Thought section, systematically present the final solution that you deem correct. The solution should remain a logical, accurate, concise expression style and detail necessary step needed to reach the conclusion, formatted as follows: <|begin_of_solution|> {final formatted, precise, and clear solution} <|end_of_solution|> Now, try to solve the following question through the above guidelines:",
    "conversations": [
      {
        "from": "user",
        "value": "Return your final response within \\boxed{}. Two counterfeit coins of equal weight are mixed with $8$ identical genuine coins. The weight of each of the counterfeit coins is different from the weight of each of the genuine coins. A pair of coins is selected at random without replacement from the $10$ coins. A second pair is selected at random without replacement from the remaining $8$ coins. The combined weight of the first pair is equal to the combined weight of the second pair. What is the probability that all $4$ selected coins are genuine?\n$\\textbf{(A)}\\ \\frac{7}{11}\\qquad\\textbf{(B)}\\ \\frac{9}{13}\\qquad\\textbf{(C)}\\ \\frac{11}{15}\\qquad\\textbf{(D)}\\ \\frac{15}{19}\\qquad\\textbf{(E)}\\ \\frac{15}{16}$"
      },
      {
        "from": "assistant",
        "value": "<|begin_of_thought|>\n\nOkay, so I need to solve this probability problem. Let me read it again and make sure I understand what's being asked. \n\nThere are two counterfeit coins that have the same weight, and they're mixed with 8 genuine coins. ...
        ...the probability that all four selected coins are genuine is \\(\\boxed{D}\\).\n\n<|end_of_solution|>"
      }
    ]
}
```

To use this dataset, we write a custom data module, `BespokeDataModule`, that downloads the dataset, converts each record into the chat format expected by NeMo's SFT dataset, and splits it into training, validation, and test files.
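The heart of that conversion is a per-record rewrite. The sketch below reproduces it outside NeMo so it can be tried on a toy record: roles `user`/`assistant` become `User`/`Assistant`, and two fields are added, `mask` (the speaker whose turns, as we understand NeMo's chat SFT dataset, are excluded from the loss) and `type`. The function name `to_nemo_chat_record` and the toy record are hypothetical; unlike the data module, this version copies the record instead of mutating it.

```python
import json

# Raw role names in Bespoke-Stratos-17k -> speaker names written by BespokeDataModule.
ROLE_MAP = {"user": "User", "assistant": "Assistant"}


def to_nemo_chat_record(example: dict) -> dict:
    """Hypothetical helper: same role mapping and extra fields as the data module."""
    record = dict(example)  # shallow copy; the conversation list is rebuilt below
    conversations = []
    for turn in example["conversations"]:
        role = turn["from"]
        if role not in ROLE_MAP:
            raise ValueError(f"Unknown role: {role}")
        conversations.append({"from": ROLE_MAP[role], "value": turn["value"]})
    record["conversations"] = conversations
    record["mask"] = "User"          # turns spoken by "User" are masked out of the loss
    record["type"] = "VALUE_TO_TEXT"  # record type field, copied from the data module
    return record


# Toy record with the same keys as the dataset (content made up and shortened).
raw = {
    "system": "Your role as an assistant involves ...",
    "conversations": [
        {"from": "user", "value": "What is 2 + 2?"},
        {"from": "assistant", "value": "<|begin_of_thought|> ... <|end_of_thought|>"},
    ],
}
line = json.dumps(to_nemo_chat_record(raw))  # one line of training.jsonl
print(line)
```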

In [2]:
%%writefile bespoke.py

import json
import shutil
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from datasets import load_dataset

from nemo.collections.llm.gpt.data.core import get_dataset_root
from nemo.collections.llm.gpt.data.fine_tuning import FineTuningDataModule
from nemo.lightning.io.mixin import IOMixin
from nemo.utils import logging

from functools import lru_cache

from nemo.collections.llm.gpt.data.core import create_sft_dataset

if TYPE_CHECKING:
    from nemo.collections.common.tokenizers import TokenizerSpec
    from nemo.collections.llm.gpt.data.packed_sequence import PackedSequenceSpecs


class BespokeDataModule(FineTuningDataModule, IOMixin):
    """A data module for fine-tuning on the Bespoke dataset.

    This class inherits from the `FineTuningDataModule` class and is specifically designed for fine-tuning models on the
    "bespokelabs/Bespoke-Stratos-17k" dataset. It handles data download, preprocessing, splitting, and preparing the data
    in a format suitable for training, validation, and testing.

    Args:
        force_redownload (bool, optional): Whether to force re-download the dataset even if it exists locally. Defaults to False.
        delete_raw (bool, optional): Whether to delete the raw downloaded dataset after preprocessing. Defaults to True.
        See FineTuningDataModule for the other args
    """

    def __init__(
        self,
        seq_length: int = 2048,
        tokenizer: Optional["TokenizerSpec"] = None,
        micro_batch_size: int = 4,
        global_batch_size: int = 8,
        rampup_batch_size: Optional[List[int]] = None,
        force_redownload: bool = False,
        delete_raw: bool = True,
        seed: int = 1234,
        memmap_workers: int = 1,
        num_workers: int = 8,
        pin_memory: bool = True,
        persistent_workers: bool = False,
        packed_sequence_specs: Optional["PackedSequenceSpecs"] = None,
        dataset_kwargs: Optional[Dict[str, Any]] = None,
        dataset_root: str = "./bespoke",
    ):
        self.force_redownload = force_redownload
        self.delete_raw = delete_raw

        super().__init__(
            dataset_root=dataset_root,
            seq_length=seq_length,
            tokenizer=tokenizer,
            micro_batch_size=micro_batch_size,
            global_batch_size=global_batch_size,
            rampup_batch_size=rampup_batch_size,
            seed=seed,
            memmap_workers=memmap_workers,
            num_workers=num_workers,
            pin_memory=pin_memory,
            persistent_workers=persistent_workers,
            packed_sequence_specs=packed_sequence_specs,
            dataset_kwargs=dataset_kwargs,
        )

    def prepare_data(self) -> None:
        # Download and preprocess only if training.jsonl is missing or a re-download is forced
        if not self.train_path.exists() or self.force_redownload:
            dset = self._download_data()
            self._preprocess_and_split_data(dset)
        super().prepare_data()

    def _download_data(self):
        logging.info(f"Downloading {self.__class__.__name__}...")
        return load_dataset(
            "bespokelabs/Bespoke-Stratos-17k",
            cache_dir=str(self.dataset_root),
            download_mode="force_redownload" if self.force_redownload else None,
        )

    def _preprocess_and_split_data(self, dset, train_ratio: float = 0.80, val_ratio: float = 0.15):
        logging.info(f"Preprocessing {self.__class__.__name__} to jsonl format and splitting...")

        test_ratio = 1 - train_ratio - val_ratio
        save_splits = {}
        dataset = dset.get('train')
        split_dataset = dataset.train_test_split(test_size=val_ratio + test_ratio, seed=self.seed)
        split_dataset2 = split_dataset['test'].train_test_split(
            test_size=test_ratio / (val_ratio + test_ratio), seed=self.seed
        )
        save_splits['training'] = split_dataset['train']
        save_splits['validation'] = split_dataset2['train']
        save_splits['test'] = split_dataset2['test']

        print("len training: ", len(save_splits['training']))
        print("len validation: ", len(save_splits['validation']))
        print("len test: ", len(save_splits['test']))

        for split_name, dataset in save_splits.items():
            output_file = self.dataset_root / f"{split_name}.jsonl"
            with output_file.open("w", encoding="utf-8") as f:
                for example in dataset:
                    conversations = example["conversations"]

                    # Rename roles to the speaker names used in the NeMo chat format
                    for conversation in conversations:
                        if conversation["from"] == "user":
                            conversation["from"] = "User"
                        elif conversation["from"] == "assistant":
                            conversation["from"] = "Assistant"
                        else:
                            raise ValueError(f"Unknown role: {conversation['from']}")

                    # Fields read by NeMo's chat SFT dataset: "User" turns are masked out of the loss
                    example["mask"] = "User"
                    example["type"] = "VALUE_TO_TEXT"

                    f.write(json.dumps(example) + "\n")

            logging.info(f"{split_name} split saved to {output_file}")

        if self.delete_raw:
            for p in self.dataset_root.iterdir():
                if p.is_dir():
                    shutil.rmtree(p)
                elif '.jsonl' not in str(p.name):
                    p.unlink()


    @lru_cache
    def _create_dataset(self, path, pack_metadata_path=None, is_test=False, **kwargs):
        # pylint: disable=C0115,C0116
        return create_sft_dataset(
            path,
            tokenizer=self.tokenizer,
            seq_length=(self.seq_length if is_test or self.packed_sequence_size <= 0 else self.packed_sequence_size),
            memmap_workers=self.memmap_workers,
            seed=self.seed,
            chat=True,
            is_test=is_test,
            pack_metadata_file_path=None,  # packing is not supported
            pad_cu_seqlens=False,
            **kwargs,
        )
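
`_preprocess_and_split_data` splits twice: first it holds out `val_ratio + test_ratio` of the data, then it splits that holdout again, so the second `test_size` is relative to the holdout rather than to the full dataset. The sketch below reproduces only that arithmetic for the default ratios (0.80 / 0.15, leaving 0.05 for test); the helper name `split_fractions` is hypothetical.

```python
def split_fractions(train_ratio: float = 0.80, val_ratio: float = 0.15):
    """Hypothetical helper mirroring the two-stage split arithmetic."""
    test_ratio = 1 - train_ratio - val_ratio
    # Stage 1: fraction of the full dataset held out for validation + test.
    holdout = val_ratio + test_ratio
    # Stage 2: test_size passed to the second train_test_split (relative to the holdout).
    stage2_test_size = test_ratio / holdout
    # Resulting fractions of the full dataset.
    val = holdout * (1 - stage2_test_size)
    test = holdout * stage2_test_size
    return holdout, stage2_test_size, val, test


holdout, stage2, val, test = split_fractions()
print(f"stage-1 test_size={holdout:.2f}, stage-2 test_size={stage2:.2f}")
print(f"validation={val:.2f}, test={test:.2f}")
```

With the defaults, the second split uses `test_size=0.25`, which turns the 20% holdout into 15% validation and 5% test.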

With `BespokeDataModule` defined, we configure it through the `run.Config` API. Because long-CoT samples are typically much longer than ordinary SFT data, we set `seq_length=16384`, together with `micro_batch_size=1` and `global_batch_size=32`.

In [None]:
from bespoke import BespokeDataModule

def bespoke() -> run.Config[pl.LightningDataModule]:
    return run.Config(BespokeDataModule, seq_length=16384, micro_batch_size=1, global_batch_size=32, num_workers=1)

### Step 3: Fine-tune Qwen with NeMo 2.0 and NeMo-Run

In this part, we fine-tune the Qwen model with NeMo 2.0 and NeMo-Run: we configure the SFT components (trainer, logger, optimizer, model, data, and resume settings) and then launch training through the `llm.finetune` API.


#### Step 3.1: Configure SFT with NeMo 2.0
First, we need to configure the trainer, logger, optimizer, and other components.

In [None]:
import nemo_run as run
from nemo import lightning as nl
from nemo.collections import llm
from megatron.core.optimizer import OptimizerConfig
import torch
import lightning.pytorch as pl
from pathlib import Path
from nemo.collections.llm.recipes.precision.mixed_precision import bf16_mixed


# Configure the trainer
# Train on 4 GPUs with 4-way tensor parallelism for 300 steps; validation is disabled (limit_val_batches=0).
def trainer() -> run.Config[nl.Trainer]:
    strategy = run.Config(
        nl.MegatronStrategy,
        tensor_model_parallel_size=4,
    )
    trainer = run.Config(
        nl.Trainer,
        devices=4,
        max_steps=300,
        accelerator="gpu",
        strategy=strategy,
        plugins=bf16_mixed(),
        log_every_n_steps=1,
        limit_val_batches=0,
        val_check_interval=0,
        num_sanity_val_steps=0,
    )
    return trainer


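# Configure the logger and checkpointing
# Save a checkpoint every 100 training steps, always keep the last one, and write results to ./results/qwen_sft.
# (Training metrics are logged every step via log_every_n_steps=1 in the trainer.) Adjust as needed.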
def logger() -> run.Config[nl.NeMoLogger]:
    ckpt = run.Config(
        nl.ModelCheckpoint,
        save_last=True,
        every_n_train_steps=100,
        monitor="reduced_train_loss",
        save_top_k=1,
        save_on_train_epoch_end=True,
        save_optim_on_train_end=True,
    )

    return run.Config(
        nl.NeMoLogger,
        name="qwen_sft",
        log_dir="./results",
        use_datetime_version=False,
        ckpt=ckpt,
        wandb=None
    )


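# Configure the optimizer
# Megatron distributed Adam, configured through OptimizerConfig.
# Note: no lr_scheduler is passed to MegatronOptimizerModule, so despite the
# function name the learning rate stays at lr=2e-5 throughout training.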
def adam_with_cosine_annealing() -> run.Config[nl.OptimizerModule]:
    opt_cfg = run.Config(
        OptimizerConfig,
        optimizer="adam",
        lr=2e-5,
        adam_beta2=0.98,
        use_distributed_optimizer=True,
        clip_grad=1.0,
        bf16=True,
    )
    return run.Config(
        nl.MegatronOptimizerModule,
        config=opt_cfg
    )


# Configure the model
# We use Qwen2Config7B to configure the model.
def qwen() -> run.Config[pl.LightningModule]:
    return run.Config(llm.Qwen2Model, config=run.Config(llm.Qwen2Config7B))

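# Configure resumption: initialize from the checkpoint imported in Step 1 (nemo://Qwen/Qwen2.5-7B-Instruct),
# or resume from the latest checkpoint in the log directory if one already exists (resume_if_exists=True).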
def resume() -> run.Config[nl.AutoResume]:
    return run.Config(
        nl.AutoResume,
        restore_config=run.Config(nl.RestoreConfig,
            path="nemo://Qwen/Qwen2.5-7B-Instruct"
        ),
        resume_if_exists=True,
    )
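
As written, `adam_with_cosine_annealing` passes only an `OptimizerConfig` to `MegatronOptimizerModule`, so no learning-rate schedule is applied. NeMo 2.0 provides a `CosineAnnealingScheduler` (in `nemo.lightning.pytorch.optim`) that can be passed through the module's `lr_scheduler` argument; check its exact parameters in your container. To show what such a schedule does, the sketch below implements a generic linear-warmup plus cosine-decay rule in plain Python. It is not NeMo's exact implementation, and `warmup_steps=30` and `min_lr=2e-6` are hypothetical values; only `max_lr=2e-5` and `max_steps=300` come from this notebook.

```python
import math


def cosine_lr(step: int, max_steps: int, max_lr: float = 2e-5,
              min_lr: float = 2e-6, warmup_steps: int = 30) -> float:
    """Generic warmup + cosine decay (illustrative; min_lr and warmup_steps are hypothetical)."""
    if step < warmup_steps:
        # Linear warmup from max_lr / warmup_steps up to max_lr.
        return max_lr * (step + 1) / warmup_steps
    # Fraction of the decay phase completed, clamped to [0, 1].
    progress = min((step - warmup_steps) / max(1, max_steps - warmup_steps), 1.0)
    # Cosine decay from max_lr (progress=0) down to min_lr (progress=1).
    return min_lr + 0.5 * (max_lr - min_lr) * (1 + math.cos(math.pi * progress))


for step in (0, 29, 30, 165, 300):
    print(step, f"{cosine_lr(step, max_steps=300):.2e}")
```

The learning rate peaks at 2e-5 when warmup ends, falls to 1.1e-5 halfway through the decay phase, and reaches `min_lr` at step 300.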

#### Step 3.2: Configure NeMo 2.0 `llm.finetune` API

To use the components defined above, we wrap the `llm.finetune` API in `run.Partial` and pass each component as an argument.


In [None]:
def configure_finetuning_recipe():
    return run.Partial(
        llm.finetune,
        model=qwen(),
        trainer=trainer(),
        data=bespoke(),
        log=logger(),
        optim=adam_with_cosine_annealing(),
        resume=resume(),
    )
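
With 4 GPUs and `tensor_model_parallel_size=4`, all GPUs hold shards of a single model replica, so the data-parallel size is 1. Under the usual Megatron relation `global_batch_size = micro_batch_size × data_parallel_size × gradient_accumulation_steps`, each optimizer step therefore accumulates 32 micro-batches. The sketch below does this arithmetic; the helper name `batch_plan` is hypothetical, and pipeline parallelism is taken as 1 because the strategy above does not set it.

```python
def batch_plan(devices: int, tp: int, pp: int, micro_batch_size: int,
               global_batch_size: int, max_steps: int):
    """Hypothetical helper: derive data parallelism, accumulation steps, and samples seen."""
    model_parallel = tp * pp
    if devices % model_parallel:
        raise ValueError("devices must be divisible by tp * pp")
    dp = devices // model_parallel                 # number of data-parallel replicas
    per_micro_step = micro_batch_size * dp         # sequences per micro-step across replicas
    if global_batch_size % per_micro_step:
        raise ValueError("global_batch_size must be divisible by micro_batch_size * dp")
    grad_accum = global_batch_size // per_micro_step
    samples_seen = global_batch_size * max_steps   # sequences consumed over the whole run
    return dp, grad_accum, samples_seen


# Values from trainer() and bespoke() above.
dp, accum, seen = batch_plan(devices=4, tp=4, pp=1, micro_batch_size=1,
                             global_batch_size=32, max_steps=300)
print(f"data-parallel size={dp}, grad accumulation steps={accum}, samples seen={seen}")
# data-parallel size=1, grad accumulation steps=32, samples seen=9600
```

Comparing the 9,600 samples with the `len training:` count printed during data preparation shows roughly how many epochs 300 steps cover.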

#### Step 3.3: Launch Training

To launch training, we execute the configured fine-tuning recipe with a `LocalExecutor` that uses `torchrun` to start one process per GPU (4 GPUs in this example).

For more details on NeMo-Run executors, refer to [Execute NeMo Run](https://github.com/NVIDIA/NeMo-Run/blob/main/docs/source/guides/execution.md) in the NeMo-Run guides.


In [None]:

def local_executor_torchrun(nodes: int = 1, devices: int = 4) -> run.LocalExecutor:
    # Env vars for jobs are configured here
    env_vars = {
        "TORCH_NCCL_AVOID_RECORD_STREAMS": "1",
        "NCCL_NVLS_ENABLE": "0",
        "NVTE_DP_AMAX_REDUCE_INTERVAL": "0",
        "NVTE_ASYNC_AMAX_REDUCTION": "1",
    }

    executor = run.LocalExecutor(ntasks_per_node=devices, launcher="torchrun", env_vars=env_vars)

    return executor

if __name__ == '__main__':
    run.run(configure_finetuning_recipe(), executor=local_executor_torchrun())

### Step 4: Evaluate the Model

This step generates responses from the fine-tuned checkpoint for two sample prompts on a single GPU. To use multiple GPUs for faster inference, increase `tensor_model_parallel_size` and `devices` in `trainer()`, and the `devices` argument of `local_executor_torchrun()`.
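The inference cell below samples with `top_p=0.6`. In generic nucleus (top-p) sampling, the next token is drawn only from the smallest set of most likely tokens whose cumulative probability reaches `top_p`, after renormalizing. The sketch below shows that filtering on a made-up distribution; it illustrates the technique and is not Megatron's implementation. `nucleus_filter` is a hypothetical name.

```python
def nucleus_filter(probs: dict[str, float], top_p: float) -> dict[str, float]:
    """Keep the smallest set of most likely tokens whose mass reaches top_p, renormalized."""
    ranked = sorted(probs.items(), key=lambda kv: kv[1], reverse=True)
    kept, total = [], 0.0
    for token, p in ranked:
        kept.append((token, p))
        total += p
        if total >= top_p:  # stop as soon as the nucleus covers top_p
            break
    return {token: p / total for token, p in kept}


# Made-up next-token distribution, for illustration only.
probs = {"3": 0.5, "2": 0.3, "4": 0.15, "5": 0.05}
print(nucleus_filter(probs, top_p=0.6))
```

Here only `"3"` and `"2"` survive (0.5 + 0.3 ≥ 0.6), renormalized to 0.625 and 0.375; a lower `top_p` makes generation more deterministic.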

In [None]:
from megatron.core.inference.common_inference_params import CommonInferenceParams

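# Locate the final ("-last") checkpoint saved by the ModelCheckpoint callback in Step 3
ckpt_dir = Path("./results/qwen_sft/checkpoints/")
last_ckpts = [d for d in ckpt_dir.iterdir() if d.is_dir() and d.name.endswith("-last")]
if not last_ckpts:
    raise FileNotFoundError(f"No '-last' checkpoint found in {ckpt_dir}")
sft_ckpt_path = str(last_ckpts[0])
print("We will load SFT checkpoint from:", sft_ckpt_path)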



def trainer() -> run.Config[nl.Trainer]:
    strategy = run.Config(
        nl.MegatronStrategy,
        tensor_model_parallel_size=1,
    )
    trainer = run.Config(
        nl.Trainer,
        accelerator="gpu",
        devices=1,
        num_nodes=1,
        strategy=strategy,
        plugins=bf16_mixed(),
    )
    return trainer

prompts = [
    "How many r's are in the word 'strawberry'?",
    "Which number is bigger? 10.119 or 10.19?",
]

def configure_inference():
    return run.Partial(
        llm.generate,
        path=str(sft_ckpt_path),
        trainer=trainer(),
        prompts=prompts,
        inference_params=CommonInferenceParams(num_tokens_to_generate=16384, top_p=0.6),
        output_path="sft_prediction.jsonl",
    )


def local_executor_torchrun(nodes: int = 1, devices: int = 1) -> run.LocalExecutor:
    # Env vars for jobs are configured here
    env_vars = {
        "TORCH_NCCL_AVOID_RECORD_STREAMS": "1",
        "NCCL_NVLS_ENABLE": "0",
        "NVTE_DP_AMAX_REDUCE_INTERVAL": "0",
        "NVTE_ASYNC_AMAX_REDUCTION": "1",
    }

    executor = run.LocalExecutor(ntasks_per_node=devices, launcher="torchrun", env_vars=env_vars)

    return executor

if __name__ == '__main__':
    run.run(configure_inference(), executor=local_executor_torchrun())
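
Both sample prompts have known answers (3 r's in "strawberry"; 10.19 is larger than 10.119), which makes a quick sanity check easy. Because the training data asks for final answers inside `\boxed{}`, the fine-tuned model may use that format, but these prompts do not request it, so this is not guaranteed. The sketch below computes the reference answers and extracts the last `\boxed{...}` from a completion; `last_boxed` is a hypothetical helper, the completion string is made up, and we make no assumption about the layout of `sft_prediction.jsonl`, so pass the generated text in yourself.

```python
import re


def last_boxed(text: str):
    """Return the content of the last \\boxed{...} in text (no nested braces), or None."""
    matches = re.findall(r"\\boxed\{([^{}]*)\}", text)
    return matches[-1] if matches else None


# Reference answers for the two prompts, computed directly.
reference = {
    "How many r's are in the word 'strawberry'?": str("strawberry".count("r")),
    "Which number is bigger? 10.119 or 10.19?": str(max(10.119, 10.19)),
}

# A made-up completion standing in for real model output.
fake_completion = r"<|begin_of_thought|> ... <|end_of_thought|> The answer is \boxed{3}."
print(reference)
print(last_boxed(fake_completion) == reference["How many r's are in the word 'strawberry'?"])
```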

### Step 5: Convert the Fine-tuned Model to HF Format

After training, convert the output checkpoint to Hugging Face format with the `llm.export_ckpt` API. The converted model is written to `./model`.

In [None]:

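# Locate the final ("-last") checkpoint saved by the ModelCheckpoint callback in Step 3
ckpt_dir = Path("./results/qwen_sft/checkpoints/")
last_ckpts = [d for d in ckpt_dir.iterdir() if d.is_dir() and d.name.endswith("-last")]
if not last_ckpts:
    raise FileNotFoundError(f"No '-last' checkpoint found in {ckpt_dir}")
sft_ckpt_path = str(last_ckpts[0])

print("We will load SFT checkpoint from:", sft_ckpt_path)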

# llm.export_ckpt is the nemo2 API for exporting a NeMo checkpoint to Hugging Face format
# example python usage:
# llm.export_ckpt(path="/path/to/model.nemo", target="hf", output_path="/path/to/save")
def configure_checkpoint_conversion():
    return run.Partial(
        llm.export_ckpt,
        path=sft_ckpt_path,
        target="hf",
        output_path="./model"
    )

# configure your function
export_ckpt = configure_checkpoint_conversion()
# define your executor
local_executor = run.LocalExecutor()

# run your experiment
run.run(export_ckpt, executor=local_executor)