In [1]:
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Dict, Any
import json

In [2]:
# Define models

o3_mini = init_chat_model("openai:o3-mini")
claude_sonnet = init_chat_model("anthropic:claude-3-5-sonnet-latest", temperature=0)
gemma3 = init_chat_model("ollama:gemma3:12b", temperature=0)
llama3_1 = init_chat_model("ollama:llama3.1:latest", temperature=0)
deepseek_r1 = init_chat_model("ollama:deepseek-r1:8b", temperature=0)

In [3]:
# Preparing benchmark data

file_path = "biggen_bench_instruction_idx0.json"

def load_benchmark_data(file_path: str) -> List[Dict]:
    """Load benchmark data from a JSON file."""
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data

def prepare_prompts(benchmark_data: List[Dict]) -> List[Dict]:
    """Prepare prompts by excluding reference_answer and score_rubric."""
    prompts = []
    for item in benchmark_data:
        prompt = {
            "id": item["id"],
            "capability": item["capability"],
            "task": item["task"],
            "instance_idx": item["instance_idx"],
            "system_prompt": item["system_prompt"],
            "input": item["input"],
            # Exclude reference_answer and score_rubric
        }
        prompts.append(prompt)
    return prompts

def prepare_rubric(benchmark_data: List[Dict]) -> List[Dict]:
    """Prepare rubric including reference_answer and score_rubric."""
    rubric = []
    for item in benchmark_data:
        prompt = {
            "id": item["id"],
            "reference_answer": item["reference_answer"],
            "score_rubric": item["score_rubric"]
        }
        rubric.append(prompt)
    return rubric

In [4]:
# run function
benchmark_data = load_benchmark_data(file_path)
prompts = prepare_prompts(benchmark_data)
rubric = prepare_rubric(benchmark_data)

In [5]:
# Define the state structure
class State(TypedDict):
    prompts: List[Dict[str, Any]]
    processed_count: int
    gemma3_results: List[Dict[str, Any]]
    llama3_1_results: List[Dict[str, Any]]
    deepseek_r1_results: List[Dict[str, Any]]

In [6]:
def create_model_processor(model, model_name):
    def process_model(state: State) -> State:
        results = []
        
        for prompt in state["prompts"]:
            system_prompt = prompt.get('system_prompt', '')
            user_input = prompt.get('input', '')
            
            try:
                response = model.invoke(
                    user_input,
                    config={"system_prompt": system_prompt}
                )
                
                response_content = response.content if hasattr(response, 'content') else str(response)

                result = {
                    "id": prompt.get('id', ''),
                    "model_name": model_name,
                    "response": response_content
                }
            except Exception as e:
                result = {
                    "id": prompt.get('id', ''),
                    "model_name": model_name,
                    "error": str(e)
                }
                
            results.append(result)
        
        results_key = f"{model_name}_results"
        return {
            **state,
            results_key: results
        }
    
    return process_model

In [7]:
process_gemma3 = create_model_processor(gemma3, "gemma3")
process_llama3_1 = create_model_processor(llama3_1, "llama3_1")
process_deepseek_r1 = create_model_processor(deepseek_r1, "deepseek_r1")

In [8]:
# 초기 상태 설정
initial_state = {
    "prompts": prompts,
    "processed_count": 0,
    "gemma3_results": [],
    "llama3_1_results": [],
    "deepseek_r1_results": []
}

# 그래프 생성
workflow = StateGraph(State)
workflow.add_node("process_gemma3", process_gemma3)
workflow.add_node("process_llama3", process_llama3_1)
workflow.add_node("process_deepseek", process_deepseek_r1)

# 노드 연결
workflow.set_entry_point("process_gemma3")
workflow.add_edge("process_gemma3", "process_llama3")
workflow.add_edge("process_llama3", "process_deepseek")

# 그래프 컴파일
app = workflow.compile()

# 실행
final_state = app.invoke(initial_state)

In [None]:
print(type(result))

In [None]:
# final_state가 dict 타입인지 확인
if isinstance(final_state, dict):
    # JSON 파일로 저장
    output_file_path = "output_parallel_results.json"
    try:
        with open(output_file_path, 'w', encoding='utf-8') as f:
            json.dump(final_state, f, ensure_ascii=False, indent=4) # ensure_ascii=False는 한글 깨짐 방지, indent는 가독성 향상
        print(f"결과가 {output_file_path} 에 성공적으로 저장되었습니다.")
    except TypeError as e:
        print(f"JSON 직렬화 오류: {e}")
        # 만약 final_state 딕셔너리 내부에 JSON으로 변환할 수 없는 타입 (예: 모델 객체 자체)이 있다면
        # 해당 부분을 처리해야 합니다. 현재 코드에서는 LLM 응답(response)이 문자열 또는 LangChain 메시지 객체일 가능성이 높으며,
        # LangChain 메시지 객체는 기본적으로 JSON 직렬화가 어려울 수 있습니다.
        # 이 경우, response 내용을 문자열 등으로 변환하는 과정이 필요할 수 있습니다.
        # 예를 들어, process_model 함수 내에서 response를 저장할 때:
        # "response": response.content # AIMessage 등의 객체일 경우 .content 사용
else:
    print(f"오류: 최종 결과의 타입이 dict가 아닙니다. 타입: {type(final_state)}")
    # 만약 정말로 str이라면, 어떤 과정에서 문자열로 변환되었는지 확인 필요
    # 예: final_state = str(app.invoke(initial_state)) 와 같이 실수로 변환했을 수 있음