<a href="https://colab.research.google.com/github/yoyo0914/ML2025/blob/main/2025hw1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the next cell changes into a folder beneath it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Switch to the HW1 folder on the mounted Drive; later cells use paths relative to it.
%cd /content/drive/MyDrive/ML/HW1

In [None]:
# Install llama-cpp-python 0.3.4, adding the project's "cu122" wheel index as an extra
# package source (presumably prebuilt CUDA 12.2 wheels -- confirm against the runtime).
!python3 -m pip install --no-cache-dir llama-cpp-python==0.3.4 --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu122
# Install the web-search and HTML-scraping packages that the later cells import.
!python3 -m pip install googlesearch-python bs4 charset-normalizer requests-html lxml_html_clean

# Download the model weights and the two question files, skipping any file that is
# already present in the working directory.
from pathlib import Path
if not Path('./Meta-Llama-3.1-8B-Instruct-Q8_0.gguf').exists():
    !wget https://huggingface.co/bartowski/Meta-Llama-3.1-8B-Instruct-GGUF/resolve/main/Meta-Llama-3.1-8B-Instruct-Q8_0.gguf
if not Path('./public.txt').exists():
    !wget https://www.csie.ntu.edu.tw/~ulin/public.txt
if not Path('./private.txt').exists():
    !wget https://www.csie.ntu.edu.tw/~ulin/private.txt

In [None]:
from llama_cpp import Llama

# Load the downloaded GGUF model onto the GPU; `llama3` is the shared model instance
# that generate_response() is called with.
llama3 = Llama(
    "./Meta-Llama-3.1-8B-Instruct-Q8_0.gguf",
    verbose=False,
    n_gpu_layers=-1,
    n_ctx=16384,    # Context window in tokens. Larger values accept longer prompts but use more memory; 16384 is a proper value for a GPU with 16GB VRAM.
)
def generate_response(_model: Llama, _messages: list) -> str:
    '''
    Run one chat completion on the given model and return the reply text.

    Args:
        _model: Model object exposing ``create_chat_completion``.
        _messages: Chat turns as a list of ``{"role": ..., "content": ...}``
            dicts, e.g. one system turn followed by one user turn.

    Returns:
        The ``content`` string of the first choice in the completion.
    '''
    _completion = _model.create_chat_completion(
        _messages,
        stop=["<|eot_id|>", "<|end_of_text|>"],
        max_tokens=512,     # Upper bound on the number of generated tokens.
        temperature=0,      # No sampling randomness: the same input gives the same output.
        repeat_penalty=2.0,
    )
    return _completion["choices"][0]["message"]["content"]

In [None]:
from typing import List
from googlesearch import search as _search
from bs4 import BeautifulSoup
from charset_normalizer import detect
import asyncio
from requests_html import AsyncHTMLSession
import urllib3
urllib3.disable_warnings()

async def worker(s:AsyncHTMLSession, url:str):
    '''
    Fetch one URL and return its body text, or None when the page is unusable.

    Args:
        s: Session used to issue the HEAD and GET requests.
        url: Address of the page to download.

    Returns:
        The response text when the HEAD response advertises ``text/html``;
        None when the content type differs, a request takes longer than
        10 seconds, or any other error occurs.
    '''
    try:
        # Probe with HEAD first so non-HTML resources are skipped without downloading them.
        # NOTE: verify=False turns off TLS certificate verification for both requests.
        header_response = await asyncio.wait_for(s.head(url, verify=False), timeout=10)
        if 'text/html' not in header_response.headers.get('Content-Type', ''):
            return None
        r = await asyncio.wait_for(s.get(url, verify=False), timeout=10)
        return r.text
    except Exception:
        # Best effort: one bad page must not fail the whole batch. Catching Exception
        # rather than using a bare except lets task cancellation and KeyboardInterrupt
        # (both BaseException subclasses) propagate instead of being swallowed.
        return None

async def get_htmls(urls):
    '''
    Download all URLs concurrently through one shared session.

    Args:
        urls: Iterable of page addresses.

    Returns:
        A list with one entry per URL, in input order: whatever ``worker``
        returned for it (page text, or None for skipped or failed URLs).
    '''
    session = AsyncHTMLSession()
    try:
        return await asyncio.gather(*(worker(session, url) for url in urls))
    finally:
        # A new session is created on every call, so close it to release its
        # connections instead of leaking one session per search.
        await session.close()

async def search(keyword: str, n_results: int=10) -> List[str]:
    '''
    Search the web for the keyword and return the text of up to n_results pages.

    Each returned string is the page text with all whitespace removed.

    Warning: searching too often in a short period may trigger HTTP 429 errors.
    Google does not publish the rate limit, so the only remedies are to switch
    IP or to wait until the block is lifted (its duration is unknown as well).
    '''
    # Only the first 100 characters of the keyword are sent as the query.
    query = keyword[:100]
    # Ask for twice as many links as needed, since some are discarded below.
    urls = list(_search(query, n_results * 2, lang="zh", unique=True))
    # Download the pages; entries are None for URLs that were skipped or failed.
    pages = await get_htmls(urls)
    soups = [BeautifulSoup(html, 'html.parser') for html in pages if html is not None]

    texts = []
    for soup in soups:
        # Keep only documents whose encoded bytes are detected as utf-8.
        if detect(soup.encode()).get('encoding') != 'utf-8':
            continue
        # Splitting on whitespace and re-joining strips every space and line break.
        texts.append(''.join(soup.get_text().split()))
    return texts[:n_results]

In [None]:
class LLMAgent:
    '''
    An agent that pairs a persona and a task instruction with an LLM backend.

    Attributes are plain strings:
      role_description -- who the agent should act as; sent as the system message.
      task_description -- what the agent should do; prepended to each user message.
      llm              -- identifier of the backend that answers.
    '''
    def __init__(self, role_description: str, task_description: str, llm:str="bartowski/Meta-Llama-3.1-8B-Instruct-GGUF"):
        self.llm = llm
        self.role_description = role_description
        self.task_description = task_description

    def inference(self, message:str) -> str:
        '''
        Send the message to the backend and return its reply.

        Returns an empty string when ``llm`` is not the default backend,
        because no other backend is implemented.
        '''
        if self.llm != 'bartowski/Meta-Llama-3.1-8B-Instruct-GGUF':
            # TODO: implement inference here for backends other than the default one.
            return ""

        # The role goes in as the system turn; the task text and the caller's message
        # share the user turn, separated by a single line break.
        system_turn = {"role": "system", "content": f"{self.role_description}"}
        user_turn = {"role": "user", "content": f"{self.task_description}\n{message}"}
        return generate_response(llama3, [system_turn, user_turn])

In [None]:
# Question-extraction agent: distills the core question from a verbose description
# while keeping all key information.
question_extraction_agent = LLMAgent(
    role_description="你是一位精煉提問的專家。你只負責提取問題的核心，去除冗餘描述。如果原問題已經簡潔，則直接返回原問題。絕對不要嘗試回答問題或猜測答案。也不要自己隨意生成原問題沒有的文字。",
    task_description="###任務###\n請從以下輸入中提取出核心問題，去除冗餘描述，但保留所有關鍵專有名詞和時間地點人物等重要資訊，注意引號的內容要保留。如果原問題已經簡潔明確，則直接原封不動地返回原問題。注意：你的任務僅是提煉問題，絕對不是回答問題或猜測答案。\n\n###輸入###\n"
)

# Keyword-extraction agent: picks the best search keywords, with emphasis on proper nouns.
keyword_extraction_agent = LLMAgent(
    role_description="你是頂尖的搜索關鍵詞提取專家。你精通從問題中提取最適合用於Google搜尋的關鍵詞和短語，特別擅長保留所有專有名詞、地名、人名等核心實體。並且不可以生成出原本問題裡面沒有的關鍵字。",
    task_description="###任務###\n請從以下問題中提取3-5個最適合用於Google搜尋的關鍵詞或短語。這些關鍵詞應該能幫助找到最相關的資訊。只返回關鍵詞，以空格分隔，不要有任何其他文字或標點符號。請特別注意保留所有專有名詞（如人名、地名、機構名、產品名等），這些是搜尋的核心。\n\n###問題###\n",
)

# QA agent: answers the question from the supplied information, with emphasis on
# factual accuracy.
qa_agent = LLMAgent(
    role_description="你是一位專業的回答專家。你會根據得問題和得到的資訊來判斷出最有可能的答案，提供最精簡的回答，但是同時也要精準(例如行政區劃的範圍不能過大)。你會直接給出答案，不需要解釋資訊來源。對於數字、日期、人名、地名等事實性資訊，你會特別確保其準確性。使用中文時只會使用繁體中文來回答問題。",
    task_description="###任務###\n請根據提供的資訊來回答以下問題。你的回答必須明確且精準。不用解釋你的思考過程或資訊來源。優先使用搜尋結果中的資訊回答，特別關注專有名詞、數字、日期等具體事實資訊。確保數字和專有名詞的準確性是最高優先級。若是地名則給出詳細的鄉鎮市。如果搜尋結果中找不到答案，則基於你的知識提供最可能的答案。如果是未來事件或無法確定的資訊，請明確表示無法提供答案。\n\n",
)

In [None]:
async def _fetch_pages(query, start_label, found_label, item_label, error_label):
    '''Search for query, log a preview of each hit, and return the usable page texts.'''
    pages = []
    try:
        print(f"{start_label}: {query}")
        pages = await search(query, n_results=3)
        print(f"{found_label} {len(pages)} 個結果")
        for i, page in enumerate(pages, 1):
            print(f"{item_label} {i}:")
            print(f"內容預覽: {page[:300]}...")
    except Exception as e:
        # Best effort: a failed search is logged and treated as "no results".
        print(f"{error_label}: {e}")
    # Drop near-empty pages (100 characters or fewer) and cap each page at 8000 characters.
    return [page[:8000] for page in pages if len(page) > 100]


def _build_context(pages, max_context_len=15000):
    '''Join page texts into one numbered context string, truncated to max_context_len characters.'''
    context = "以下是從網路搜尋到的資訊：\n\n" + "".join(
        f"搜尋結果 {i}：\n{page}\n\n" for i, page in enumerate(pages, 1)
    )
    if len(context) > max_context_len:
        context = context[:max_context_len] + "...(資訊被截斷)"
    return context


async def improved_pipeline(question: str) -> str:
    '''
    Answer a question with search-augmented generation.

    Steps:
      1. Reduce the question to its core with the question-extraction agent.
      2. Search the web with the simplified question.
      3. If that yields no usable page, extract keywords and search again.
      4. Ask the QA agent to answer the ORIGINAL question, with the search
         context when there is one and from model knowledge alone otherwise.

    Args:
        question: The original question text.

    Returns:
        The QA agent's answer. If any step raises, the error is logged and the
        QA agent is asked the bare question as a last resort.
    '''
    try:
        # Step 1: extract the core question.
        print(f"處理問題: {question}")
        simplified_question = question_extraction_agent.inference(question)
        print(f"1.關鍵問題: {simplified_question}")

        # Step 2: search with the simplified question.
        pages = await _fetch_pages(
            simplified_question,
            "使用簡化問題進行搜尋",
            "使用簡化問題搜尋到",
            "簡化問題搜尋結果",
            "簡化問題搜尋錯誤",
        )

        # Step 3: keywords are extracted only when the first search found nothing usable.
        if not pages:
            print("簡化問題無搜尋結果，嘗試使用關鍵字搜尋...")
            keywords = keyword_extraction_agent.inference(simplified_question)
            print(f"2.關鍵詞: {keywords}")
            pages = await _fetch_pages(
                keywords,
                "使用關鍵詞進行搜尋",
                "使用關鍵字搜尋到",
                "關鍵詞搜尋結果",
                "關鍵字搜尋錯誤",
            )

        # Step 4: answer. Both search paths share one context builder, so the length
        # limit is defined regardless of which search produced the pages.
        if pages:
            context = _build_context(pages)
            print(f"3.參考的內容: \n{context}")
            # The original question, not the simplified one, is what gets answered.
            final_answer = qa_agent.inference(f"問題：{question}\n\n{context}")
        else:
            print("所有搜尋都無結果，使用模型知識生成答案...")
            final_answer = qa_agent.inference(f"問題：{question}")
            print("3.參考的內容: 無搜尋結果，使用模型知識")

        # Never return an empty answer: retry once from model knowledge alone.
        if not final_answer or not final_answer.strip():
            print("答案為空，使用模型知識生成答案...")
            final_answer = qa_agent.inference(f"問題：{question}")
            print("3.參考的內容: 無有效答案，使用模型知識")

        print(f"4.最終答案: {final_answer}")
        return final_answer

    except Exception as e:
        error_msg = f"Pipeline錯誤: {str(e)}"
        print(error_msg)
        # Last resort: ask the QA agent directly.
        return qa_agent.inference(question)

In [None]:
from pathlib import Path

STUDENT_ID = "r13044045"

STUDENT_ID = STUDENT_ID.lower()
with open('./public.txt', 'r') as input_f:
    questions = input_f.readlines()
    questions = [l.strip().split(',')[0] for l in questions]
    for id, question in enumerate(questions, 1):
        # 移除檔案存在的檢查條件，直接處理所有問題並覆寫
        print(f"\n處理問題 {id}: {question}")
        answer = await improved_pipeline(question)  # 使用improved_pipeline
        answer = answer.replace('\n',' ')
        print(f"問題 {id} 的答案: {answer}")
        with open(f'./{STUDENT_ID}_{id}.txt', 'w') as output_f:  # 使用'w'模式覆寫
            print(answer, file=output_f)

with open('./private.txt', 'r') as input_f:
    questions = input_f.readlines()
    for id, question in enumerate(questions, 31):
        # 移除檔案存在的檢查條件，直接處理所有問題並覆寫
        print(f"\n處理問題 {id}: {question}")
        answer = await improved_pipeline(question)  # 使用improved_pipeline
        answer = answer.replace('\n',' ')
        print(f"問題 {id} 的答案: {answer}")
        with open(f'./{STUDENT_ID}_{id}.txt', 'w') as output_f:  # 使用'w'模式覆寫
            print(answer, file=output_f)

In [None]:
# Combine the per-question answer files into one file, one answer per line.
# Opening with 'w' overwrites any existing combined file.
with open(f'./{STUDENT_ID}.txt', 'w', encoding='utf-8') as output_f:
    for question_id in range(1, 91):
        try:
            with open(f'./{STUDENT_ID}_{question_id}.txt', 'r', encoding='utf-8') as input_f:
                answer = input_f.readline().strip()
        except FileNotFoundError:
            # Write a placeholder so the line numbering still matches the question ids.
            answer = f"無法找到問題{question_id}的答案"
        print(answer, file=output_f)