In [15]:
from datetime import datetime

from langchain_community.document_loaders import WebBaseLoader
# Wikipedia
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langchain_community.utilities import WikipediaAPIWrapper


def search_wikipedia(parameters):
    api_wrapper = WikipediaAPIWrapper(top_k_results=2)
    tool = WikipediaQueryRun(api_wrapper=api_wrapper)
    return tool._run(parameters["query"])


def search_duckduckgo(parameters):
    duckduck_api_wrapper = DuckDuckGoSearchAPIWrapper(time="m")
    docs = duckduck_api_wrapper.results(parameters["query"], max_results=1)
    links = []

    for doc in docs:
        links.append(doc["link"])

    loader = WebBaseLoader(
        web_path=links,
        bs_get_text_kwargs={
            "strip": True,
        }
    )

    load = loader.load()
    return "\n".join([content.page_content for content in load])


def save_file(parameters):
    now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    filename = f"research_result_{now}.txt"
    with open(filename, 'w', encoding="utf-8") as f:
        f.write(parameters["content"] + "\n")
    return parameters["content"]

In [16]:
functions = [
    {
        "type": "function",
        "function": {
            "name": "search_wikipedia",
            "description": "query에 대한 답을 Wikipedia를 활용해 검색합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The query you will search",
                    },
                },
                "required": ["query"],
            },

        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_duckduckgo",
            "description": "query에 대한 답을 DuckDuckGo를 활용해 검색합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The query you will search",
                    },
                },
                "required": ["query"]
            },

        }
    },
    {
        "type": "function",
        "function": {
            "name": "save_file",
            "description": """ 리서치 작업이 완료되면 반드시 사용해야 하는 도구입니다.
                            Wikipedia와 DuckDuckGo에서 수집한 모든 정보를 정리하여 content 파라미터에 전달하면 자동으로 파일로 저장됩니다.
                            주의: content에는 파일명이 아닌, 실제로 조사한 내용 요약/정리해서 넣어야 합니다.
                            """,
            "parameters": {
                "type": "object",
                "properties": {
                    "content": {
                        "type": "string",
                        "description": "검색 도구들로부터 수집한 전체 리서치 내용과 결과를 여기에 전달하세요. 파일명이 아닌 실제 조사한 내용 전체를 포함해야 합니다.",
                    },
                },
                "required": ["content"]
            },

        }
    },
]

functions_map = {
    "search_wikipedia": search_wikipedia,
    "search_duckduckgo": search_duckduckgo,
    "save_file": save_file,
}

In [None]:
import json
import time


def get_run(run_id: str, thread_id: str):
    """실행 상태 조회"""
    return client.beta.threads.runs.retrieve(
        run_id=run_id,
        thread_id=thread_id,
    )


def get_messages(thread_id: str):
    """스레드의 모든 메시지 조회"""
    messages = client.beta.threads.messages.list(thread_id=thread_id)
    messages = list(messages)
    messages.reverse()
    for message in messages:
        print(f"{message.role}: {message.content[0].text.value}")


def send_message(thread_id: str, content: str):
    """메시지 전송"""
    return client.beta.threads.messages.create(
        thread_id=thread_id, role="user", content=content
    )


def get_tool_outputs(run_id: str, thread_id: str):
    """Tool 호출 결과 수집"""
    run = get_run(run_id, thread_id)
    outputs = []
    for action in run.required_action.submit_tool_outputs.tool_calls:
        action_id = action.id
        function = action.function
        print(f"Calling function: {function.name} with arg {function.arguments}")
        outputs.append(
            {
                "output": functions_map[function.name](json.loads(function.arguments)),
                "tool_call_id": action_id,
            }
        )
    return outputs


def submit_tool_outputs(run_id: str, thread_id: str):
    """Tool 출력 제출"""
    outputs = get_tool_outputs(run_id, thread_id)
    return client.beta.threads.runs.submit_tool_outputs(
        run_id=run_id,
        thread_id=thread_id,
        tool_outputs=outputs,
    )


def wait_for_run_completion(run_id: str, thread_id: str, max_attempts: int = 100, check_interval: float = 1.0):
    """Run 완료 대기 (폴링 방식)"""
    for attempt in range(max_attempts):
        run = get_run(run_id, thread_id)
        print(f"[Attempt {attempt + 1}] Run status: {run.status}")

        if run.status == "completed":
            print("✓ Run completed successfully")
            return run
        elif run.status == "requires_action":
            print("⚙️ Run requires action - submitting tool outputs")
            submit_tool_outputs(run_id, thread_id)
        elif run.status in ["failed", "expired", "cancelled"]:
            print(f"✗ Run failed with status: {run.status}")
            return run

        time.sleep(check_interval)

    raise TimeoutError(f"Run did not complete within {max_attempts * check_interval} seconds")

In [None]:
# ===== STREAMING 기능 =====

def process_stream(stream):
    """
    Assistant API Streaming 이벤트 처리
    
    주요 이벤트:
    - thread.created: 스레드 생성
    - thread.run.created: 실행 생성
    - thread.run.in_progress: 실행 진행 중
    - thread.run.requires_action: Tool 호출 필요
    - thread.message.delta: 메시지 스트리밍 중
    - thread.run.completed: 실행 완료
    - done: 스트림 종료
    """
    print("🔄 Starting stream processing...")

    current_message = ""
    tool_calls = {}

    for event in stream:
        event_type = event.event

        if event_type == "thread.created":
            print(f"✓ Thread created: {event.data.id}")

        elif event_type == "thread.run.created":
            print(f"✓ Run created: {event.data.id}")

        elif event_type == "thread.run.in_progress":
            print("⏳ Run in progress...")

        elif event_type == "thread.message.created":
            print(f"📝 Message created (role: {event.data.role})")

        elif event_type == "thread.message.delta":
            # 메시지 스트리밍 - 실시간 텍스트 수신
            if hasattr(event.data.delta, 'content') and event.data.delta.content:
                for content in event.data.delta.content:
                    if hasattr(content, 'text') and hasattr(content.text, 'value'):
                        text = content.text.value
                        print(text, end="", flush=True)
                        current_message += text

        elif event_type == "thread.message.completed":
            print(f"\n✓ Message completed")

        elif event_type == "thread.run.requires_action":
            print("\n⚙️ Run requires action - tool calls needed")
            if hasattr(event.data.required_action, 'submit_tool_outputs'):
                for tool_call in event.data.required_action.submit_tool_outputs.tool_calls:
                    print(f"  🔧 Tool call: {tool_call.function.name}")
                    tool_calls[tool_call.id] = tool_call

        elif event_type == "thread.run.completed":
            print("\n✓ Run completed successfully!")

        elif event_type == "thread.run.failed":
            print(f"\n✗ Run failed: {event.data.last_error}")

        elif event_type == "error":
            print(f"\n✗ Stream error: {event.data}")

        elif event_type == "done":
            print("\n✓ Stream ended")

    return current_message


def create_run_with_stream(thread_id: str, assistant_id: str):
    """
    Streaming을 활성화한 Run 생성
    
    stream=True를 설정하면 Server-Sent Events (SSE)로 실시간 업데이트를 받을 수 있습니다.
    """
    print(f"🚀 Creating run with streaming (thread: {thread_id}, assistant: {assistant_id})")

    with client.beta.threads.runs.stream(
            thread_id=thread_id,
            assistant_id=assistant_id,
    ) as stream:
        return process_stream(stream)


def submit_tool_outputs_with_stream(run_id: str, thread_id: str):
    """
    Tool 출력을 제출하고 Streaming으로 응답 받기
    """
    print(f"📤 Submitting tool outputs with streaming...")

    # 먼저 tool outputs 수집
    run = get_run(run_id, thread_id)
    outputs = []

    for action in run.required_action.submit_tool_outputs.tool_calls:
        action_id = action.id
        function = action.function
        print(f"  → Executing: {function.name}")

        output = functions_map[function.name](json.loads(function.arguments))
        outputs.append({
            "output": output,
            "tool_call_id": action_id,
        })

    # Tool 결과를 제출하면서 스트리밍 수신
    with client.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=thread_id,
            run_id=run_id,
            tool_outputs=outputs,
    ) as stream:
        return process_stream(stream)

In [None]:
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

# OpenAI 클라이언트 초기화 (v1.0+ 최신 방식)
client = OpenAI()

assistant = client.beta.assistants.create(
    name="search assistant",
    instructions="""
      당신은 주제에 대한 광범위한 조사를 수행하는 AI 어시스턴트입니다.

      **필수 워크플로우:**
      1. 사용자 질문을 받으면 Wikipedia와 DuckDuckGo로 조사
      2. 조사 결과를 종합하고 정리
      3. save_file 도구를 사용하여 결과를 반드시 파일에 저장
      4. "✓ 파일에 저장되었습니다:
      research_result_YYYY-MM-DDTHH:MM:SS.txt" 형식으로 알림
      5. 저장된 내용의 요약을 사용자에게 제공

      **주의:**
      - 모든 조사 결과는 반드시 파일로 저장되어야 함
      - 파일 저장 없이 조사만 하면 안 됨
      """,
    model="gpt-4-turbo",
    tools=functions,
)

In [19]:
assistant_id = "asst_jK97S5ZWiAvVoafRKkz2rPk4"

In [None]:
# ===== 예제 1: 폴링 방식 (기존 방식) =====
# 이 방식은 run 상태를 주기적으로 확인합니다.

def example_polling():
    """폴링 방식 예제"""
    # Step 1: 스레드 생성
    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": "Research about the XZ backdoor",
            }
        ]
    )
    print(f"Thread created: {thread.id}")

    # Step 2: Run 생성
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant_id,
    )
    print(f"Run created: {run.id}")

    # Step 3: Run 완료 대기 (폴링)
    wait_for_run_completion(run.id, thread.id)

    # Step 4: 최종 메시지 조회
    print("\n=== Final Response ===")
    get_messages(thread.id)

    return thread.id

In [None]:
# ===== 예제 2: Streaming 방식 (권장) =====
# 이 방식은 Server-Sent Events (SSE)를 통해 실시간 업데이트를 받습니다.
# 응답이 더 빠르고 실시간으로 메시지를 볼 수 있습니다.

def example_streaming():
    """Streaming 방식 예제"""
    # Step 1: 스레드와 함께 메시지 생성하고 Run 실행
    print("=== Using Streaming Mode ===\n")

    response = client.beta.threads.create_and_run(
        assistant_id=assistant_id,
        thread={"messages": [{"role": "user", "content": "Research about the XZ backdoor"}]},
    )

    # Step 2: Run이 완료될 때까지 스트리밍으로 이벤트 수신
    # (Tool call이 필요한 경우 자동으로 처리)
    with client.beta.threads.runs.stream(
            thread_id=response.thread_id,
            assistant_id=assistant_id,
    ) as stream:
        for event in stream:
            # 스트림 이벤트 처리
            if event.event == "thread.message.delta":
                if hasattr(event.data.delta, 'content'):
                    for content in event.data.delta.content:
                        if hasattr(content, 'text'):
                            print(content.text.value, end="", flush=True)
            elif event.event == "thread.run.requires_action":
                print("\n⚙️ Handling tool calls...")

    print("\n✓ Streaming completed!")
    return response.thread_id

In [20]:
thread = client.beta.threads.create(
    messages=[
        {
            "role": "user",
            "content": "Research about the XZ backdoor",
        }
    ]
)

  thread = client.beta.threads.create(


In [21]:
run = client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant_id,
)

  run = client.beta.threads.runs.create(


In [22]:
get_run(run.id, thread.id).status

  return client.beta.threads.runs.retrieve(


'queued'

In [24]:
submit_tool_outputs(run.id, thread.id)

  return client.beta.threads.runs.retrieve(


Calling function: search_wikipedia with arg {"query": "XZ backdoor"}
Calling function: search_duckduckgo with arg {"query": "XZ backdoor"}


  return client.beta.threads.runs.submit_tool_outputs(


Run(id='run_ckWHHydnNV5USDDY0u2EMBUa', assistant_id='asst_jK97S5ZWiAvVoafRKkz2rPk4', cancelled_at=None, completed_at=None, created_at=1761880037, expires_at=1761880637, failed_at=None, incomplete_details=None, instructions='검색한 결과를 파일로 저장하는 것을 도와줍니다.', last_error=None, max_completion_tokens=None, max_prompt_tokens=None, metadata={}, model='gpt-4.1-mini', object='thread.run', parallel_tool_calls=True, required_action=None, response_format='auto', started_at=1761880038, status='queued', thread_id='thread_IJYn9z261gUBNZ9Xa63Af7mm', tool_choice='auto', tools=[FunctionTool(function=FunctionDefinition(name='search_wikipedia', description='query에 대한 답을 Wikipedia를 활용해 검색합니다.', parameters={'type': 'object', 'properties': {'query': {'type': 'string', 'description': 'The query you will search'}}, 'required': ['query']}, strict=False), type='function'), FunctionTool(function=FunctionDefinition(name='search_duckduckgo', description='query에 대한 답을 DuckDuckGo를 활용해 검색합니다.', parameters={'type': 'object'

In [25]:
get_run(run.id, thread.id).status

  return client.beta.threads.runs.retrieve(


'completed'

In [26]:
get_messages(thread.id)

  messages = client.beta.threads.messages.list(thread_id=thread_id)


user: Research about the XZ backdoor
assistant: The "XZ backdoor" refers to a malicious backdoor discovered in the Linux build of the xz utility within the liblzma library in versions 5.6.0 and 5.6.1. It was introduced in February 2024 by an account named "Jia Tan." This backdoor allowed an attacker with a specific Ed448 private key to execute remote code via OpenSSH on affected Linux systems. The vulnerability is identified as CVE-2024-3094 and has the highest possible severity score of 10.0 (CVSS).

XZ Utils is a set of free software command-line lossless data compressors widely used in Unix-like systems and Windows. The backdoor resided in specially crafted .xz test files and a modification in the build scripts, which injected the backdoor into the built binaries. It was carefully disguised as legitimate test files and build scripts, making it extremely difficult to detect through standard code reviews or automated tools.

The backdoor was discovered by developer Andres Freund in Ma