# Import your trained model

In [None]:
import torch
import matplotlib.pyplot as plt
from transformers import AutoTokenizer, AutoModelForCausalLM, set_seed
from utils.prompts import get_messages, get_cot_Ex
# 0) Environment setup
# DEVICE records whether CUDA is available; model placement itself is
# delegated to device_map="auto" below.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
set_seed(42)

# 1) Load the model and tokenizer
# The Hugging Face repo name includes the author's name, so it cannot be
# provided at the review stage.
HF_REPO = "YOUR MODEL"
tokenizer = AutoTokenizer.from_pretrained(HF_REPO)
# Reuse the EOS token as the padding token.
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
    HF_REPO,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
model.eval()


# Response Generation with Summary Tokens

In [None]:
from utils.lsc_generate import add_enhanced_generation
# Replace the model with the object returned by add_enhanced_generation; the
# generate_with_special_tokens call further down is made on this object.
model = add_enhanced_generation(model)

# For each support marker keep the first token ID it encodes to.
# NOTE(review): this assumes every marker is a single token in the
# tokenizer's vocabulary -- confirm for the checkpoint being loaded.
special_tokens = []
for marker in ("<Support1>", "<Support2>", "<Support3>",
               "<Support4>", "<Support5>", "<Support6>"):
    marker_ids = tokenizer.encode(marker, add_special_tokens=False)
    special_tokens.append(marker_ids[0])
print(f"Special IDs: {special_tokens}")
# Raw string on purpose: the LaTeX commands must keep their backslashes.
# In a normal string literal "\b" (from \begin), "\t" (from \text) and "\r"
# (from \right) are control-character escapes and "\\" collapses to a single
# backslash, which silently corrupts the prompt sent to the model.
question = r"""Let \[f(x) = \left\{
\begin{array}{cl} ax+3, &\text{ if }x>2, \\
x-5 &\text{ if } -2 \le x \le 2, \\
2x-b &\text{ if } x <-2.
\end{array}
\right.\]Find $a+b$ if the piecewise function is continuous (which means that its graph can be drawn without lifting your pencil from the paper)."""

# get_cot_Ex and get_messages are imported from utils.prompts; both are
# called for the "MATH" dataset with the same model identifier.
model_name = "llama3_8b"
cot_ex = get_cot_Ex("MATH", model_name)

# Example messages
messages = get_messages(
    question=question, cot_ex=cot_ex, model_name=model_name, dataset="MATH"
)

# 1) Prepare prompt
# Render the chat messages to a PyTorch tensor, then move it to the model's device.
inputs = tokenizer.apply_chat_template(
    messages,
    add_generation_prompt=True,
    return_tensors="pt",
    enable_thinking=False,
)
inputs = inputs.to(model.device)
set_seed(42)

# 2) Generate; the returned object exposes `.sequences` here and
# `.mean_embeddings` in the LSC cell below.
result = model.generate_with_special_tokens(
    inputs=inputs,
    special_tokens=special_tokens,
    remove_eos=True,
    return_embeddings=True,
    max_new_tokens=1024,
    num_return_sequences=10,
)

set_seed(42)
print(f"Lengths: {result.sequences.shape}")

# Token IDs of every returned sequence, as plain Python lists.
seq_ids = [seq.tolist() for seq in result.sequences]
prompt_len = inputs.shape[1]
print(prompt_len)

# Decode the part after the prompt for EVERY sequence. dynamic_topk_lsc
# indexes ans_list by row of the similarity-weight matrix, so ans_list needs
# one entry per returned sequence, not only the ones previewed below.
ans_list = [
    tokenizer.decode(
        token_ids[prompt_len:],
        skip_special_tokens=False,
        clean_up_tokenization_spaces=True,
    )
    for token_ids in seq_ids
]

# Preview only the first two completions to keep the cell output short.
for decoded in ans_list[:2]:
    print("\nDecoded text:\n", decoded)


# LSC Calculation

In [None]:
import numpy as np
import torch.nn.functional as F
# Embeddings returned alongside the generated sequences; the operations below
# treat them as a 2-D tensor (presumably one row per sequence -- TODO confirm).
embs = result.mean_embeddings
# L2-normalise each row so the matrix product below is a cosine similarity.
embs_norm = F.normalize(embs.to(torch.float32), p=2, dim=1)
sim_matrix = torch.matmul(embs_norm, embs_norm.T)
sim_np = sim_matrix.cpu().float().numpy()
# Zero out self-similarity.
np.fill_diagonal(sim_np, 0.0)

# Exponentiated similarities with temperature 0.5; the diagonal is zeroed
# again because exp(0) == 1 would otherwise count each row's self-term.
weights = np.exp(sim_np / 0.5)
np.fill_diagonal(weights, 0.0)
# Row mean over all columns (the zeroed diagonal entry is included).
avg_weight = np.mean(weights, axis=1)
best_idx = int(avg_weight.argmax())
lsc_calib = avg_weight[best_idx]

# Dynamic LSC Calculation

In [None]:
def dynamic_topk_lsc(weights, ans_list):
    """
    Dynamic top-K LSC: select the answer whose K largest weights have the highest mean,
    with K chosen right before the largest drop of that best mean.

    For every K from 2 to ``len(ans_list) - 1`` each row of ``weights`` is
    scored by the mean of its K largest entries, and the best row score is
    recorded. The K finally used is the one immediately before the largest
    decrease of that best score between consecutive K values; if no decrease
    occurs, the largest K is used. With fewer than three answers the range
    2..N-1 is empty, so a single pass with K=1 is performed instead.

    Args:
        weights: 2-D array-like with one row per entry of ``ans_list``
            (row ``i`` holds the weights of answer ``i`` towards the others).
        ans_list: Sequence of answers, one per row of ``weights``.

    Returns:
        dict with keys:
            "answer": the selected entry of ``ans_list``.
            "optimal_k": the K that was used for the selection.
            "selected_idx": row index of the selected answer.
            "conf": fraction of ``ans_list`` entries equal to the selected answer.
            "calib": top-K mean weight of the selected row at ``optimal_k``.
            "max_scores": best top-K mean for every evaluated K.
            "score_diffs": decrease of ``max_scores`` between consecutive K values.
            "largest_drop": largest positive decrease found (0.0 if none).
            "largest_drop_idx": index into ``max_scores`` where that decrease
                landed, or -1 if none.
            "all_k_results": list of ``(k, max_score, best_idx)`` tuples.

    Raises:
        ValueError: if ``ans_list`` is empty or ``weights`` is not 2-D with
            exactly one row per answer.
    """
    num_paths = len(ans_list)
    if num_paths == 0:
        raise ValueError("ans_list must contain at least one answer")

    weights = np.asarray(weights)
    if weights.ndim != 2 or weights.shape[0] != num_paths:
        raise ValueError(
            "weights must be 2-D with one row per answer; "
            f"got shape {weights.shape} for {num_paths} answers"
        )

    # A row has at most num_paths - 1 peers, so that is the largest useful K.
    max_k = num_paths - 1
    # K = 2..max_k; with fewer than three answers fall back to a single K=1 pass
    # instead of failing on an empty range.
    candidate_ks = list(range(2, max_k + 1)) or [max(1, max_k)]

    # Sort every row once (ascending); the top-K of a row is then its last K
    # entries, so the sort does not have to be repeated for each K.
    sorted_rows = np.sort(weights, axis=1)

    max_scores = []
    best_indices = []
    avg_weights_list = []
    for k in candidate_ks:
        avg_weight_topk = np.array([np.mean(row[-k:]) for row in sorted_rows])
        best_idx = int(np.argmax(avg_weight_topk))

        avg_weights_list.append(avg_weight_topk)
        max_scores.append(avg_weight_topk[best_idx])
        best_indices.append(best_idx)

    # Decrease of the best score from each K to the next one.
    score_diffs = [prev - cur for prev, cur in zip(max_scores, max_scores[1:])]

    # First strictly-largest positive decrease; the index refers to the
    # position in max_scores where the decrease landed.
    largest_drop = 0.0
    largest_drop_idx = -1
    for i, score_diff in enumerate(score_diffs, start=1):
        if score_diff > largest_drop:
            largest_drop = score_diff
            largest_drop_idx = i

    # Use the K right before the largest decrease, or the last K if the
    # score never decreased.
    k_idx = largest_drop_idx - 1 if largest_drop_idx != -1 else len(candidate_ks) - 1
    optimal_k = candidate_ks[k_idx]

    selected_idx = best_indices[k_idx]
    selected_answer = ans_list[selected_idx]

    return {
        "answer": selected_answer,
        "optimal_k": optimal_k,
        "selected_idx": selected_idx,
        "conf": ans_list.count(selected_answer) / num_paths,
        "calib": avg_weights_list[k_idx][selected_idx],
        "max_scores": max_scores,
        "score_diffs": score_diffs,
        "largest_drop": largest_drop,
        "largest_drop_idx": largest_drop_idx,
        "all_k_results": list(zip(candidate_ks, max_scores, best_indices)),
    }

# NOTE: this rebinds `result`, which held the generation output until here;
# cells that read result.sequences / result.mean_embeddings need the
# generation cell to be re-run first.
result = dynamic_topk_lsc(weights, ans_list)
for label, key in (
    ("Selected Answer", "answer"),
    ("Optimal K", "optimal_k"),
    ("Selected Index", "selected_idx"),
):
    print(f"{label}: {result[key]}")
