In [1]:
import os, psutil, gc
import time 
import json
import pprint
import copy 
import logging 
logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)

from collections import defaultdict
import random
import numpy as np

import multiprocessing as mp

from dataclasses import dataclass

In [2]:
import torch 
from torch.nn import CrossEntropyLoss
import torch.nn.functional as F
import torch.distributed as dist
from transformers import AutoModelForCausalLM, AutoTokenizer
from vllm import LLM, SamplingParams, PoolingParams

from sal.config import Config
from sal.search.utils import Beam, build_conv, generate_k_steps, last
from sal.utils.score import aggregate_scores
# from sal.models.reward_models import RLHFFlow

from core.reward_models import RLHFFlow
from core import select_diverse
from utils.load_data import load_data_prm800k

In [4]:
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
if torch.cuda.is_available():
    GPUS = os.environ.get('CUDA_VISIBLE_DEVICES', "0").split(',')
    print(GPUS)
else:
    print("CUDA is not available.")

['2', '3']


In [5]:
# base_dir
base_dir = '/groups/kjun/tnn/datasets/'

# dataset path
data_dir = base_dir + "/prm800k/math_splits"

# llm and prm path
llm_dir = base_dir + "/Llama-3.2-1B-Instruct-GGUF/Llama-3.2-1B-Instruct.Q4_K_M.gguf"
prm_dir = base_dir + "/Llama3.1-8B-PRM-Deepseek-Data-GGUF/Llama3.1-8B-PRM-Deepseek-Data.Q4_K_M.gguf"

llm_tokenizer_dir = base_dir + "/Llama-3.2-1B-Instruct"
prm_tokenizer_dir = base_dir + "/Llama3.1-8B-PRM-Deepseek-Data"

In [6]:
#  load data 
data_by_levels = load_data_prm800k(data_dir)

# load random_seeds     
# random_seeds = np.loadtxt("random_seeds.txt").astype("int64")
# random_seeds = [int(seed) for seed in random_seeds]

1: 43
2: 90
3: 105
4: 128
5: 134


In [7]:
# baseline: gpu_memory_utilization=0.2
# use the standard model 
llm_vllm = LLM(
        model = llm_tokenizer_dir,
        tensor_parallel_size=1,
        gpu_memory_utilization = 0.7,  # Utilize 50% of GPU memory
        # enable_prefix_caching=True,  # V100 doesn't support enable_prefix_caching 
        # enable_chunked_prefill=False, # and enable_chunked_prefill
        max_model_len = 5000,
        dtype = "float16",
        seed = 123)
    
    # # use the gguf quantized model 
    # llm_regular = LLM(
    #     model = llm_dir,
    #     tokenizer = llm_tokenizer_dir,
    #     tensor_parallel_size=1,
    #     gpu_memory_utilization = 0.2,  # Utilize 50% of GPU memory
    #     max_model_len = 5000,
    #     dtype = "float16",
    #     seed = 123)


gc.collect();torch.cuda.empty_cache();
print('#--- memory:', torch.cuda.memory_allocated(0)/(1024**3))
print('#--- memory:', torch.cuda.memory_allocated(1)/(1024**3))

INFO 04-20 14:07:57 __init__.py:207] Automatically detected platform cuda.
INFO 04-20 14:08:04 config.py:549] This model supports multiple tasks: {'classify', 'score', 'embed', 'reward', 'generate'}. Defaulting to 'generate'.
INFO 04-20 14:08:04 llm_engine.py:234] Initializing a V0 LLM engine (v0.7.3) with config: model='/groups/kjun/tnn/datasets//Llama-3.2-1B-Instruct', speculative_config=None, tokenizer='/groups/kjun/tnn/datasets//Llama-3.2-1B-Instruct', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config=None, tokenizer_revision=None, trust_remote_code=False, dtype=torch.float16, max_seq_len=5000, download_dir=None, load_format=LoadFormat.AUTO, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=None, enforce_eager=False, kv_cache_dtype=auto,  device_config=cuda, decoding_config=DecodingConfig(guided_decoding_backend='xgrammar'), observability_config=ObservabilityConfig(otlp_traces_endpoint=None, collect_m

Loading safetensors checkpoint shards:   0% Completed | 0/1 [00:00<?, ?it/s]


INFO 04-20 14:08:08 model_runner.py:1115] Loading model weights took 2.3185 GB
INFO 04-20 14:08:09 worker.py:267] Memory profiling takes 0.50 seconds
INFO 04-20 14:08:09 worker.py:267] the current vLLM instance can use total_gpu_memory (31.73GiB) x gpu_memory_utilization (0.70) = 22.21GiB
INFO 04-20 14:08:09 worker.py:267] model weights take 2.32GiB; non_torch_memory takes 0.09GiB; PyTorch activation peak memory takes 1.19GiB; the rest of the memory reserved for KV Cache is 18.62GiB.
INFO 04-20 14:08:09 executor_base.py:111] # cuda blocks: 38125, # CPU blocks: 8192
INFO 04-20 14:08:09 executor_base.py:116] Maximum concurrency for 5000 tokens per request: 122.00x


OutOfMemoryError: CUDA out of memory. Tried to allocate 1.16 GiB. GPU 0 has a total capacity of 31.73 GiB of which 192.00 KiB is free. Process 480043 has 23.18 GiB memory in use. Including non-PyTorch memory, this process has 8.54 GiB memory in use. Of the allocated memory 8.15 GiB is allocated by PyTorch, and 9.71 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [14]:
prm = RLHFFlow(model_path=prm_tokenizer_dir, device_map='cuda:3')

gc.collect();torch.cuda.empty_cache();
print('#--- memory:', torch.cuda.memory_allocated(0)/(1024**3))
print('#--- memory:', torch.cuda.memory_allocated(1)/(1024**3))

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

#--- memory: 21.061745643615723
#--- memory: 14.95752763748169


In [None]:
stop

In [51]:
def _beam_search(batch_of_questions, config, llm, prm):
    sampling_params = SamplingParams(
        temperature=config.temperature,
        max_tokens=config.max_tokens,
        top_p=config.top_p,
        stop=["\n\n"],
        include_stop_str_in_output=True,
        n=1,
    )

    beams: list[Beam] = []
    for prompt in batch_of_questions:
        for i in range(config.n):
            beams.append(
                Beam(
                    prompt=prompt,
                    index=i,
                    current_text="",
                    next_texts=None,
                    lookahead_texts=None,
                    pruned=False,
                    completed=False,  # New flag to track completion
                    stop_reasons=None,
                    history=[],
                    best_scores=[],
                    all_scores=[],
                    previous_text=None,
                    completion_tokens=0,
                )
            )

    completed_beams: list[Beam] = []

    # for i in tqdm(range(config.num_iterations), desc="Beam search iterations"):
    for i in range(config.num_iterations):
        # print(f"\n-> i = {i}")
        if i == 0:
            active_beams = [b for b in beams if not b.pruned]
        else:
            active_beams = [b for b in active_beams if not b.pruned]
        # print("n-> pass 1")
        # print(len(active_beams))
        # for idx, beam in enumerate(active_beams):
        #     print(idx)
        #     print(beam.prompt[:10])

        # Duplicate active beams to ensure that we have config.n beams per iteration
        if len(active_beams) != config.n:
            repeats = (config.n // len(active_beams)) + 1
            logger.debug(
                f"Extending active_beams with {repeats} repetitions to reach size {config.n}"
            )
            extended_active_beams = [
                copy.deepcopy(b) for b in (active_beams * repeats)[: config.n]
            ]
            active_beams = extended_active_beams
            if len(active_beams) != config.n:
                raise ValueError(
                    f"Expected {config.n} active beams, but got {len(active_beams)}"
                )
        
        # print("n-> pass 2")
        # print(len(active_beams))
        # for idx, beam in enumerate(active_beams):
        #     print(idx)
        #     print(beam.prompt[:10])
            
        if i == config.num_iterations - 1:
            # Last iteration, generate to EOS
            sampling_params = SamplingParams(
                temperature=config.temperature,
                max_tokens=config.max_tokens,
                top_p=config.top_p,
                n=1,
            )

        convs = [
            build_conv(b.prompt, b.current_text, config.system_prompt)
            for b in active_beams
        ]
        continue_final_message = i > 0
        add_generation_prompt = i == 0

        tokenizer = llm.get_tokenizer()
        if config.custom_chat_template is not None:
            tokenizer.chat_template = config.custom_chat_template
        templated_convs = tokenizer.apply_chat_template(
            convs,
            add_generation_prompt=add_generation_prompt,
            continue_final_message=continue_final_message,
            tokenize=False,
        )
        lookahead = 0 if i == config.num_iterations - 1 else config.lookahead
        gen_results = generate_k_steps(
            templated_convs, lookahead, llm, sampling_params, 1
        )

        prompts, completions = [], []
        for beam, gen_result in zip(active_beams, gen_results, strict=True):
            beam.next_texts = gen_result.next_texts
            beam.stop_reasons = gen_result.stop_reasons
            beam.lookahead_texts = gen_result.lookahead_texts
            beam.completion_tokens += gen_result.completion_tokens
            beam.current_text += beam.next_texts[0]
            beam.history.append(beam.next_texts[0])

            if (
                beam.stop_reasons[0] == "EOS"
                or beam.stop_reasons[0] == "length"
                or beam.next_texts[0] == ""
            ):
                beam.completed = True
                completed_beams.append(beam)
            prompts.append(beam.prompt)
            completions.append([beam.current_text])

        scores = prm.score(prompts, completions)
        

        agg_scores = [
            [aggregate_scores(s, config.agg_strategy) for s in score]
            for score in scores
        ]
        # print(f"scores = {scores}")
        # print(f"agg_scores = {agg_scores}")

        for beam, score in zip(active_beams, scores, strict=True):
            beam.all_scores = score[0]

        # Now filter active_beams and agg_scores for beams that are completed
        agg_scores = [
            agg_scores[i] for i, b in enumerate(active_beams) if not b.completed
        ]
        active_beams = [b for b in active_beams if not b.completed]

        # logger.debug(len(active_beams))
        # print("n-> pass 3")
        # print(len(active_beams))
        # for idx, beam in enumerate(active_beams):
        #     print(idx)
        #     print(beam.prompt[:10])
        #     print(beam.completed)
        #     print(beam.pruned)
        
        # Early stopping if all beams are completed
        if len(active_beams) == 0:
            break

        # Filter duplicate active beams
        if config.filter_duplicates:
            # Create a dictionary to filter duplicates and retain order
            unique_beam_dict = {}
            for i, b in enumerate(active_beams):
                if b.current_text not in unique_beam_dict:
                    unique_beam_dict[b.current_text] = (
                        i  # Map the unique text to its index
                    )
            active_beams = [active_beams[i] for i in unique_beam_dict.values()]
            agg_scores = [agg_scores[i] for i in unique_beam_dict.values()]

        # Get indices for top (config.n / config.beam_width) completions
        top_indices = np.argsort(np.array(agg_scores).flatten())[
            -(config.n // config.beam_width) :
        ]

        for idx, beam in enumerate(active_beams):
            if idx not in top_indices:
                beam.pruned = True

    # Filter completed beams for those with top config.n scores
    if config.sort_completed:
        completed_beams = sorted(
            completed_beams,
            key=lambda b: aggregate_scores(b.all_scores, config.agg_strategy),
            reverse=True,
        )[: config.n]
    else:
        completed_beams = completed_beams[: config.n]

    if len(completed_beams) != config.n:
        # If we don't have enough completed_beams, duplicate until we reach config.n
        repeats = (config.n // len(completed_beams)) + 1
        logger.debug(
            f"Extending completed_beams with {repeats} repetitions to reach size {config.n}"
        )
        extended_completed_beams = [
            copy.deepcopy(b) for b in (completed_beams * repeats)[: config.n]
        ]
        completed_beams = extended_completed_beams

    return completed_beams


def beam_search(batch_of_questions, config, llm, prm):
    # Collect the completions from responses
    completions = [[] for _ in range(len(batch_of_questions))]
    completion_ntokens = [[] for _ in range(len(batch_of_questions))]

    for q_idx, question in enumerate(batch_of_questions):
        # print(f"question {q_idx}")
        beam_results = _beam_search([question], config, llm, prm)
        for b_idx, beam in enumerate(beam_results):
            # print(beam.current_text)
            completions[q_idx].append(beam.current_text)
            completion_ntokens[q_idx].append(beam.completion_tokens)

    # print(completions)
    # print(completion_ntokens)
    # stop
    results = defaultdict(list)
    results["completions"] = completions
    results["completion_ntokens"] = completion_ntokens

    return results
        


In [52]:
# general params
config = Config()
config.n = 4
config.beam_width = 2
config.lookahead = 0
config.num_iterations = 2
config.sort_completed = False 

level = '1'
num_questions = len(data_by_levels[level])
num_questions = 2
num_trials = 1
print(f"num_questions = {num_questions}")
print(f"num_trials = {num_trials}")

# get batch of questions
batch_of_questions = [data_by_levels[level][q_idx]['problem'] for q_idx in range(num_questions)]

start_time = time.time()
results = beam_search(batch_of_questions, config, llm_vllm, prm)
total_time = time.time() - start_time
trial_idx = 0
time_per_trial = total_time/(trial_idx+1)
time_per_question = time_per_trial/num_questions
print(f"trial {trial_idx}")
print(f"it takes {time_per_question:0.4f}s per question")
print(f"it takes {time_per_trial:0.4f}s per trial")

num_questions = 2
num_trials = 1


NameError: name 'trial_idx' is not defined

trial 0
it takes 2.7317s per question
it takes 5.4635s per trial


In [54]:
print(len(results))
pprint.pprint(results)

2
defaultdict(<class 'list'>,
            {'completion_ntokens': [[0, 0, 0, 0], [0, 0, 0, 0]],
             'completions': [['## Step 1:  The problem involves finding the '
                              'length of $DE$ in the given right-angled '
                              'triangle $DEF$.\n'
                              '## Step 2:  We can start by applying the '
                              'Pythagorean theorem to the entire triangle '
                              '$DEF$, which states that for any right-angled '
                              'triangle, the square of the length of the '
                              'hypotenuse (the side opposite the right angle) '
                              'is equal to the sum of the squares of the '
                              'lengths of the other two sides.\n'
                              '## Step 3:  Therefore, we have $DE^2 + DF^2 = '
                              'EF^2$.\n'
                              '## Step 4:  Since $\\sin D 