In [1]:
# import torch
# import re
# from typing import List, Dict, Optional
# from dataclasses import dataclass
# from transformers import AutoModelForCausalLM, AutoTokenizer
# from vllm import LLM, SamplingParams

# model = LLM(model="google/gemma-2-2B", dtype='float16')
# prompt = '아이의 아버지인 외과의사가 말했어. "난 수술 못해! 이 아이는 내 아들이라고!" 이 외과의사는 소년에게 누구일까요?'
# model.generate(prompt, use_tqdm=False)[0].outputs[0].text

In [1]:
import torch
import re
from typing import List, Dict, Optional
from dataclasses import dataclass
from transformers import AutoModelForCausalLM, AutoTokenizer
from vllm import LLM, SamplingParams

@dataclass
class Path:
    reasoning_text: str
    score: float
    answer_span: str
    num_path: int

@dataclass
class DecodingInfo:
    question: str
    paths: List[Path]

class CoTDecoder:
    """논문에서는 greedy decoding 대신 top-k를 이용하여 다양한 경로를 탐색하는 것을 권장합니다.
    특히 각 경로에서 어떻게 생각하는지 평가할 수 있도록 여러 경로에서 다양한 토큰을 샘플링 해야한다고 설명합니다."""
    def __init__(self, model_name: str, 
                 device: str = 'cuda', 
                 max_new_tokens: int = 1000, 
                 topk: int = 5, 
                 stop: List[str] = ['\n\n질문', '질문', 'Q:', '\n\nQ:', '\n\nExercise'],
                 prompt: str = '', 
                 pattern: str = r'[가-힣a-zA-Z0-9\s]+'):
        self.model = LLM(model=model_name, dtype='float16')
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.device = device
        self.max_new_tokens = max_new_tokens
        self.stop = stop
        self.topk = topk
        self.model.llm_engine.model_config.max_logprobs = self.topk + 1
        self.prompt = prompt
        self.pattern = pattern

    def search_cots(self, raw_prompt: str) -> DecodingInfo:
        # 질문과 답변의 형태로 포맷을 변경합니다. 
        formatted_prompt = self.format_prompt(raw_prompt)
        # 질문에 이어 생성된 단어를 top_k개만큼 생성하고, token_id, 생성된 토큰, 확률값을 저장한 topk_token을 생성합니다.
        topk_tokens = self.get_first_topk_tokens(formatted_prompt)
        # 생성된 토큰과 질문을 각각 합쳐주고 prompts라는 리스트에 저장합니다. 
        # 1개의 세트 예시  
        # 질문:아이의 아버지인 외과의사가 말했어. "난 수술 못해! 이 아이는 내 아들이라고!" 이 외과의사는 소년에게 누구일까요?
        # 답변:아이
        prompts = [formatted_prompt + token for token in topk_tokens['decoded']]
        outputs = self.generate_paths(prompts)
        return self.calculate_score(raw_prompt, topk_tokens, outputs)

    @torch.inference_mode()
    def get_first_topk_tokens(self, prompt: str) -> Dict[str, List]:
        sampling_params = SamplingParams(n=1, temperature=0, top_p=1, max_tokens=1, logprobs=self.topk, stop=self.stop)
        # 모델이 입력된 prompt을 기준으로 prompt 뒤에 올 10개의 단어를 예측합니다. 
        outputs = self.model.generate(prompt, sampling_params, use_tqdm=False)[0].outputs[0].logprobs[0]
        # decoded는 "그", "\n\n", "소", "아이", "이", "아", "어" 등의 단어가 생성되어 저장되어 있습니다. 
        # probs는 -2.064455270767212, -3.392580270767212 등의 로그 확률이 저정되어 있습니다. 
        # token_id는 당연히 token_id가 저장되어 있습니다. 
        topk_tokens = {'decoded': [], 'probs': [], 'token_id': [], 'logprobs': []}
        for token_id, logprob_obj in outputs.items():
            topk_tokens['logprobs'].append({token_id: logprob_obj})
            topk_tokens['decoded'].append(logprob_obj.decoded_token)
            topk_tokens['probs'].append(logprob_obj.logprob)
            topk_tokens['token_id'].append(token_id)

        # 로그 확률을 실제 확률로 변환합니다. 
        topk_tokens['probs'] = torch.exp(torch.tensor(topk_tokens['probs'])).tolist()
        return topk_tokens

    @torch.inference_mode()
    def generate_paths(self, prompts: List[str]) -> Dict[int, Dict]:
        # 논문에서는 top-k개의 토큰을 기반으로 다양한 경로를 생성해야한다고 설명합니다. 그 과정을 코드로 구현하였습니다. 
        # 리스트를 입력 받으면 배치로 생성이 됩니다. 
        sampling_params = SamplingParams(n=1, temperature=0, top_p=1, max_tokens=self.max_new_tokens, logprobs=2, stop=self.stop)
        return self.model.generate(prompts, sampling_params, use_tqdm=False)

    def format_prompt(self, raw_prompt: str) -> str:
        return f'질문:{raw_prompt}\n답변:{self.prompt}'


    def calculate_score(self, prompt: str, topk_tokens: Dict, outputs: Dict) -> DecodingInfo:
        paths = []
        for k, output in enumerate(outputs):
            reasoning = topk_tokens['decoded'][k] + output.outputs[0].text
            reasoning = reasoning.strip()
            
            # 질문과 reasoning 간의 유사도를 계산 (간단한 방식으로 질문이 포함되었는지 확인)
            question_similarity = self.calculate_question_similarity(prompt, reasoning)
            
            encode = self.tokenizer(reasoning, return_offsets_mapping=True)
            answer_span = re.findall(self.pattern, reasoning)
            
            score = 0
            if len(answer_span):
                answer_span = answer_span[-1]
                last_pattern_span = (reasoning.rfind(answer_span), reasoning.rfind(answer_span) + len(answer_span))
                idx_answer = [i for i, span in enumerate(encode.offset_mapping)
                            if (span[0] >= last_pattern_span[0] and span[1] <= last_pattern_span[1]) or
                                (span[0] <= last_pattern_span[0] and span[1] >= last_pattern_span[1]) or
                                (span[0] <= last_pattern_span[0] and span[1] > last_pattern_span[0])]

                token_id = [encode.input_ids[idx] for idx in idx_answer]
                output.outputs[0].logprobs.insert(0, topk_tokens['logprobs'][k])
                filtered_answer = [output for i, output in enumerate(output.outputs[0].logprobs) if i in idx_answer]

                sum_answer_span_probs = 0
                for logprob_dict in filtered_answer:
                    logprob_list = list(logprob_dict.items())
                    if len(logprob_list) == 2:
                        prob_diff = (torch.exp(torch.tensor([logprob_list[0][1].logprob])) - torch.exp(torch.tensor([logprob_list[1][1].logprob]))).item()
                    else:
                        prob_diff = torch.exp(torch.tensor([logprob_list[0][1].logprob])).item()
                    sum_answer_span_probs += prob_diff
                
                # 질문과 비슷한 답변일 경우 페널티 적용
                if question_similarity > 0.5:  # 질문과의 유사도가 높을수록 점수를 낮추기 위해 0.5 이상의 유사도에 패널티 적용
                    sum_answer_span_probs *= (1 - question_similarity)  # 유사도가 높을수록 점수를 감소시킴

                score = 0 if len(filtered_answer) == 0 else sum_answer_span_probs / len(filtered_answer)
                answer_span = self.tokenizer.decode(token_id, skip_special_tokens=True).strip()
            else:
                answer_span = '|<NotFound>|'

            paths.append(Path(reasoning_text=reasoning, 
                            score=score,
                            answer_span=answer_span,
                            num_path=k))

        return DecodingInfo(question=prompt, paths=paths)

    # 질문과 Reasoning의 유사도 계산하는 함수
    def calculate_question_similarity(self, question: str, reasoning: str) -> float:
        """ 질문과 reasoning 간의 유사도를 계산하는 간단한 함수. 유사도가 높으면 패널티를 부여한다 """
        question_words = set(question.split())
        reasoning_words = set(reasoning.split())
        
        # 질문과 reasoning 간에 공통된 단어의 비율 계산
        common_words = question_words.intersection(reasoning_words)
        similarity = len(common_words) / len(question_words) if question_words else 0
        
        return similarity
# model_name = "google/gemma-2-2B"  
model_name = "Qwen/Qwen2.5-3B-Instruct"
# model_name = "meta-llama/Llama-3.2-3B"
decoder = CoTDecoder(model_name)


INFO 10-06 16:34:36 llm_engine.py:226] Initializing an LLM engine (v0.6.1.dev238+ge2c6e0a82) with config: model='Qwen/Qwen2.5-3B-Instruct', speculative_config=None, tokenizer='Qwen/Qwen2.5-3B-Instruct', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config=None, rope_scaling=None, rope_theta=None, tokenizer_revision=None, trust_remote_code=False, dtype=torch.float16, max_seq_len=32768, download_dir=None, load_format=LoadFormat.AUTO, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=None, enforce_eager=False, kv_cache_dtype=auto, quantization_param_path=None, device_config=cuda, decoding_config=DecodingConfig(guided_decoding_backend='outlines'), observability_config=ObservabilityConfig(otlp_traces_endpoint=None, collect_model_forward_time=False, collect_model_execute_time=False), seed=0, served_model_name=Qwen/Qwen2.5-3B-Instruct, use_v2_block_manager=False, num_scheduler_steps=1, multi_step_stream_outputs=Fal

Loading safetensors checkpoint shards:   0% Completed | 0/2 [00:00<?, ?it/s]


INFO 10-06 16:34:39 model_runner.py:1025] Loading model weights took 5.7915 GB
INFO 10-06 16:34:42 gpu_executor.py:122] # GPU blocks: 20335, # CPU blocks: 7281
INFO 10-06 16:34:43 model_runner.py:1329] Capturing the model for CUDA graphs. This may lead to unexpected consequences if the model is not static. To run the model in eager mode, set 'enforce_eager=True' or use '--enforce-eager' in the CLI.
INFO 10-06 16:34:43 model_runner.py:1333] CUDA graphs can take additional 1~3 GiB memory per GPU. If you are running out of memory, consider decreasing `gpu_memory_utilization` or enforcing eager mode. You can also reduce the `max_num_seqs` as needed to decrease memory usage.
INFO 10-06 16:34:53 model_runner.py:1456] Graph capturing finished in 10 secs.


In [2]:
prompt = '아이의 아버지인 외과의사가 말했어. "난 수술 못해! 이 아이는 내 아들이라고!" 이 외과의사는 소년에게 누구일까요?'
# prompt = "I have 3 apples, my dad has 2 more apples than me, how many apples do we have in total?"
result = decoder.search_cots(prompt)

print(f"Question: {result.question}")
for path in result.paths:
    print(f"Path {path.num_path}:")
    print(f"  Reasoning: {path.reasoning_text}")
    print(f"  Answer: {path.answer_span}")
    print(f"  Score: {path.score:.4f}")
    print()

Question: 아이의 아버지인 외과의사가 말했어. "난 수술 못해! 이 아이는 내 아들이라고!" 이 외과의사는 소년에게 누구일까요?
Path 0:
  Reasoning: 이
  Answer: 이
  Score: 0.3357

Path 1:
  Reasoning: 이 외과의사는 소년의 아버지일 것입니다. 외과의사가 소년에게 "난 수술 못해! 이 아이는 내 아들이라고!"라고 말했기 때문에, 그는 소년의 아버지일 가능성이 높습니다. 외과의사가 소년의 아버지라면, 그는 소년에게 "난 수술 못해! 이 아이는 내 아들이라고!"라고 말할 수 있습니다. 

그러나 이 말은 외과의사가 소년의 수술을 맡지 못했다는 것을 의미하는 것이 아니라, 그가 소년의 아버지라는 것을 강조하고 있는 것으로 해석될 수 있습니다. 

따라서, 외과의사는 소년의 아버지일 가능성이 높지만, 확실한 것은 아닙니다. 외과의사가 소년의 아버지인지 확인하기 위해서는 추가적인 정보나 증거가 필요합니다. 

결론적으로, 외과의사는 소년의 아버지일 가능성이 높지만, 확실한 것은 아닙니다. 

(이 답변은 외과의사가 소년의 아버지라는 가정하에 작성되었습니다. 실제 상황은 더 많은 정보가 필요합니다.) 

이 답변은 외과의사가 소년의 아버지라는 가정하에 작성되었습니다. 실제 상황은 더 많은 정보가 필요합니다. 

따라서, 외과의사는 소년의 아버지일 가능성이 높지만, 확실한 것은 아닙니다. 

(이 답변은 외과의사가 소년의 아버지라는 가정하에 작성되었습니다. 실제 상황은 더 많은 정보가 필요합니다.) 

이 답변은 외과의사가 소년의 아버지라는 가정하에 작성되었습니다. 실제 상황은 더 많은 정보가 필요합니다. 

따라서, 외과의사는 소년의 아버지일 가능성이 높지만, 확실한 것은 아닙니다. 

(이 답변은 외과의사가 소년의 아버지라는 가정하에 작성되었습니다. 실제 상황은 더 많은 정보가 필요합니다.) 

이 답변은 외과의사가 소년의 아버지라는 가정하에 작성되었습니다. 실제 상황은 더 많은 정보가 필요합니다. 

따라

In [3]:
prompt = "Elsa has 3 apples. Anna has 2 more apples than Elsa. How many apples do they have together?"
result = decoder.search_cots(prompt)

print(f"Question: {result.question}")
for path in result.paths:
    print(f"Path {path.num_path}:")
    print(f"  Reasoning: {path.reasoning_text}")
    print(f"  Answer: {path.answer_span}")
    print(f"  Score: {path.score:.4f}")
    print()

Question: Elsa has 3 apples. Anna has 2 more apples than Elsa. How many apples do they have together?
Path 0:
  Reasoning: Elsa가 3개의 사과를 가지고 있고, Anna는 Elsa보다 2개의 사과를 가지고 있습니다. 따라서 Anna는 3 + 2 = 5개의 사과를 가지고 있습니다. 따라서 두 사람의 사과의 총 개수는 3 + 5 = 8개입니다.

따라서, Elsa와 Anna는 총 8개의 사과를 가지고 있습니다. 

답: 8개의 사과. 

(단, 이 문제는 사과의 개수를 정확히 세는 것이 아니라, 간단한 계산 문제로 풀이하였습니다. 실제로 사과를 가지고 있는 상황에서는 각각의 사과를 세어야 합니다.) 

정확한 계산을 위해, 각각의 사과를 세어보면:
- Elsa: 3개
- Anna: 3 + 2 = 5개

따라서, 총 3 + 5 = 8개의 사과를 가지고 있습니다. 

답: 8개의 사과. 

(정확한 계산 결과) 

또는, 간단하게 계산하면:
- Anna가 가지고 있는 사과의 수는 Elsa의 사과 수 + 2 = 3 + 2 = 5개
- 따라서, 두 사람의 사과의 총 수는 3 + 5 = 8개

답: 8개의 사과. 

(간단한 계산 결과) 

따라서, 두 사람의 사과의 총 수는 8개입니다. 

답: 8개의 사과. 

(결론) 

(정확한 계산 결과와 간단한 계산 결과가 일치함을 확인하였습니다.) 

결론적으로, 두 사람의 사과의 총 수는 8개입니다. 

답: 8개의 사과. 

(결정적인 답변) 

(정확한 계산 결과와 간단한 계산 결과가 일치함을 확인하였습니다.) 

따라서, 두 사람의 사과의 총 수는 8개입니다. 

답: 8개의 사과. 

(결정적인 답변) 

(정확한 계산 결과와 간단한 계산 결과가 일치함을 확인하였습니다.) 

결론적으로, 두 사람의 사과의 총 수는 8개입니다. 

답: 8개의 사과. 

(결정적인 답변) 

(정확한 계산 결과와 간단한 계산 결과가 일치함

In [6]:
reasoning = "I have 3 apples, my dad has 2 more apples than me, so my dad has 3 apples more than me. So, we have 3 apples + 3 apples = 6 apples."
answer_span = ' 6 apples'
reasoning.rfind(answer_span)

121

In [4]:
prompt = "How many r's in Strawberry?"
result = decoder.search_cots(prompt)

print(f"Question: {result.question}")
for path in result.paths:
    print(f"Path {path.num_path}:")
    print(f"  Reasoning: {path.reasoning_text}")
    print(f"  Answer: {path.answer_span}")
    print(f"  Score: {path.score:.4f}")
    print()

Question: How many r's in Strawberry?
Path 0:
  Reasoning: Strawberry는 1개의 'r'을 가지고 있습니다. 

Strawberry는 영어로 '딸기'를 의미하며, 이 단어에서 'r'은 단어의 첫 글자로 사용되고 있습니다. 

단어의 다른 부분에는 'r'이 포함되어 있지 않습니다. 

따라서, Strawberry 단어 안에 있는 'r'의 수는 1개입니다. 

(단, 영어에서 단어의 첫 글자는 대개 소문자로 쓰이므로, 대문자 'R'은 고려하지 않습니다.) 

문제를 잘못 이해하신 것 같아요. 만약에 'r'이 단어 내에 중간에 위치해 있는 경우라면, Strawberry에는 'r'이 없습니다. 

만약 'r'이 단어의 중간에 위치해 있는 다른 단어를 생각하신다면, 그 단어를 알려주시면 더 정확한 답변을 드릴 수 있을 것 같습니다. 

(예를 들어, 'stranger'는 1개의 'r'을 가지고 있습니다.) 

문제를 다시 확인해 주세요. 

(이 답변은 'r'이 단어의 첫 글자로만 위치할 때의 경우에만 적용됩니다.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세요.) 

(문제를 다시 확인해 주세