# 作业1，RAG agent
通过 llama-cpp-python 加载 Llama 3.1 8B 模型，实现一个多 Agent 的 RAG 问答系统。

In [None]:
!python3 -m pip install --no-cache-dir llama-cpp-python==0.3.4 --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu122
!python3 -m pip install googlesearch-python bs4 charset-normalizer requests-html lxml_html_clean

In [None]:
from pathlib import Path
if not Path('./Meta-Llama-3.1-8B-Instruct-Q8_0.gguf').exists():
    !wget https://huggingface.co/bartowski/Meta-Llama-3.1-8B-Instruct-GGUF/resolve/main/Meta-Llama-3.1-8B-Instruct-Q8_0.gguf
if not Path('./public.txt').exists():
    !wget https://www.csie.ntu.edu.tw/~ulin/public.txt
if not Path('./private.txt').exists():
    !wget https://www.csie.ntu.edu.tw/~ulin/private.txt

In [None]:
from llama_cpp import Llama

# Load the model onto GPU
MODEL_PATH = "./Meta-Llama-3.1-8B-Instruct-Q8_0.gguf"

llama3 = Llama(
    model_path=MODEL_PATH,
    n_ctx=16384,        # How many tokens the model can take. Longer is better but consumes more memory; 16384 is a proper value for a GPU with 16GB VRAM.
    n_gpu_layers=-1,
    verbose=False,
)

def generate_response(
    _model: "Llama",
    _messages: list,
    max_tokens: int = 512,
    temperature: float = 0,
    repeat_penalty: float = 2.0,
) -> str:
    '''
    Run one chat completion on the given model and return the reply text.

    Args:
        _model: Object exposing `create_chat_completion` (here, the loaded `Llama` instance).
        _messages: Chat history as a list of `{"role": ..., "content": ...}` dicts
            (a list, not a plain string).
        max_tokens: Upper bound on the number of tokens the model may generate.
        temperature: Sampling randomness; 0 gives the same output for the same input.
        repeat_penalty: Penalty applied to repeated tokens.

    Returns:
        The `content` of the first choice's message.
    '''
    _completion = _model.create_chat_completion(
        _messages,
        stop=["<|eot_id|>", "<|end_of_text|>"],
        max_tokens=max_tokens,
        temperature=temperature,
        repeat_penalty=repeat_penalty,
    )
    return _completion["choices"][0]["message"]["content"]

In [None]:
from typing import List
from googlesearch import search as _search
from bs4 import BeautifulSoup
from charset_normalizer import detect
import asyncio
from requests_html import AsyncHTMLSession
import urllib3
urllib3.disable_warnings()

async def worker(s: "AsyncHTMLSession", url: str, timeout: float = 10):
    """
    Fetch one URL and return its body text, or None if it is not usable.

    A HEAD request is issued first so that non-HTML resources are skipped without
    downloading them; only when the Content-Type header mentions `text/html` is the
    page fetched with GET.

    Args:
        s: Session providing awaitable `head` / `get` methods.
        url: Address to fetch. Certificate verification is disabled for both requests.
        timeout: Seconds allowed for each of the two requests.

    Returns:
        The response text, or None when the resource is not HTML, a request times
        out, or any other ordinary error occurs.
    """
    try:
        head_response = await asyncio.wait_for(s.head(url, verify=False), timeout=timeout)
        if 'text/html' not in head_response.headers.get('Content-Type', ''):
            return None
        page_response = await asyncio.wait_for(s.get(url, verify=False), timeout=timeout)
        return page_response.text
    except Exception:
        # Best effort: a single bad URL must not abort the whole batch. `Exception`
        # (not a bare except) lets task cancellation and KeyboardInterrupt propagate.
        return None

async def get_htmls(urls):
    """
    Fetch all URLs concurrently through one shared session.

    Args:
        urls: Iterable of URL strings.

    Returns:
        A list aligned with `urls`; each entry is whatever `worker` returned for that
        URL (page text, or None if the URL was skipped or failed).
    """
    session = AsyncHTMLSession()
    try:
        return await asyncio.gather(*(worker(session, url) for url in urls))
    finally:
        # A new session is created on every call, so release it here rather than
        # leaking one per search.
        await session.close()

async def search(keyword: str, n_results: int=3) -> List[str]:
    '''
    Search the web for `keyword` and return the text of up to `n_results` result pages.

    Only the first 100 characters of `keyword` are used as the query. Twice as many
    results as requested are fetched, because some will be dropped (non-HTML, failed
    download, or not detected as utf-8). Each returned string is the page text with
    all whitespace removed.

    Warning: searching too often in a short period can trigger HTTP 429 errors from
    Google. The rate limit is not published, so the only remedies are to change IP
    or wait until the ban is lifted (its duration is unknown as well).
    '''
    query = keyword[:100]
    urls = list(_search(query, n_results * 2, lang="zh", unique=True))
    pages = await get_htmls(urls)

    texts = []
    for html in pages:
        # None marks a URL that was skipped or could not be downloaded.
        if html is None:
            continue
        soup = BeautifulSoup(html, 'html.parser')
        # Keep only pages whose re-encoded content is detected as utf-8.
        if detect(soup.encode()).get('encoding') != 'utf-8':
            continue
        # split() + join strips every whitespace character from the page text.
        texts.append(''.join(soup.get_text().split()))
    return texts[:n_results]

In [12]:
# Sanity check: ask the bare model (no search, no agents) a single question.
# You can try out different questions here.
test_question='請問誰是 Taylor Swift？'

system_message = {"role": "system", "content": "你是 LLaMA-3.1-8B，是用來回答問題的 AI。使用中文時只會使用繁體中文來回問題。"}    # System prompt
user_message = {"role": "user", "content": test_question}    # User prompt
messages = [system_message, user_message]

reply = generate_response(llama3, messages)
print(reply)

泰勒絲（Taylor Swift）是一位美國歌手、詞曲作家和製作人。她出生於1989年，來自田納西州。她的音樂風格從鄉村搖滾開始逐漸轉變為流行電音。

她早期的作品如《泰勒絲第一輯》、《愛情故事第二章：睡美人的秘密》，獲得了廣泛認可和獎項，包括多個告示牌音樂大奖。後來，她推出了更具商業成功性的專辑，如 《1989》（2014）、_reputation（《名聲_(泰勒絲专輯)》） （ 20 ） 和 _Lover(2020)，並且在全球取得了巨大的影響力。

她以她的歌曲如 "Shake It Off"、"_Blank Space_"和 "_Bad Blood_",以及與其他藝人合作的作品，如 《Look What You Made Me Do》（2017）而聞名。泰勒絲還是知識產權運動的一部分，對於音樂創作者在數字時代獲得公平報酬有所關注。

她被譽為當代最成功和影響力最大的人物之一，並且她的歌曲經常成為流行文化的話題。


In [13]:
class LLMAgent():
    """
    A single-purpose agent: a role (system prompt) plus a task template (user prompt)
    evaluated by one LLM backend.
    """

    def __init__(self, role_description: str, task_description: str, llm:str="bartowski/Meta-Llama-3.1-8B-Instruct-GGUF"):
        # Who the agent should act like, e.g. a history expert or a manager.
        self.role_description = role_description
        # Instructions for the task; must contain a `{}` placeholder for the message.
        self.task_description = task_description
        # Identifier of the LLM backend this agent uses.
        self.llm = llm

    def inference(self, message:str) -> str:
        """
        Run the agent on `message` and return the model's reply.

        The role description is sent as the system prompt and the task description,
        with `message` substituted into its placeholder, as the user prompt. Any
        backend other than the default one is not implemented and yields "".
        """
        if self.llm != 'bartowski/Meta-Llama-3.1-8B-Instruct-GGUF':
            # TODO: If you want to use LLMs other than the given one, please implement the inference part on your own.
            return ""

        # TODO: Design the system prompt and user prompt here.
        # Hint: you may want the agents to speak Traditional Chinese only.
        system_prompt = f"{self.role_description}"
        # Hint: you may want the agents to clearly distinguish the task descriptions and the
        # user messages. A proper separation text rather than a simple line break is recommended.
        user_prompt = self.task_description.format(message)

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        return generate_response(llama3, messages)

随便写的prompt，可能不太好用，仅供参考。

In [14]:
# Prompts for the three agents. In every task template, `{}` is replaced by the agent's input.
QUESTION_EXTRACTION_ROLE = "你擅长总结问题。你会在用户输入中找到用户提出的问题，确定其中的关键信息，并且用最精简的语言提取出用户提出的问题。输出内容只有总结好的问题，不要回答问题，不要有多余内容。"
QUESTION_EXTRACTION_TASK = "以下是用户输入:\"{}\"，根据用户的输入，总结成一个精炼的问题。"

KEYWORD_EXTRACTION_ROLE = "你是Google的熟练使用者。根据用户的问题，你要为其设计出包含关键信息，且最适合用于互联网搜索的关键字，数量为2-4个，关键字之间以空格相连。不要回答问题，不要输出额外内容"
KEYWORD_EXTRACTION_TASK = "以下是用户问题:\"{}\"，根据问题总结出2-4个最适合其用于Google搜索的关键词。"

QA_ROLE = "你是 LLaMA-3.1-8B，是用來回答問題的AI。会根据用户的提问和网络信息回答，你的输出只包含对应问题的答案，回答长度为一个词，或者最长为一句话"
QA_TASK = "以下是用户的原始问题，经过总结的问题，和根据总结的问题搜索到的网络信息:\"{}\"。用户原始问题的最终答案是?"

# Filters out the irrelevant parts of a question description.
question_extraction_agent = LLMAgent(QUESTION_EXTRACTION_ROLE, QUESTION_EXTRACTION_TASK)

# Extracts the keywords of a question so that the search tool can find more accurate results.
keyword_extraction_agent = LLMAgent(KEYWORD_EXTRACTION_ROLE, KEYWORD_EXTRACTION_TASK)

# The core component that answers the question.
qa_agent = LLMAgent(QA_ROLE, QA_TASK)

按照下图的流程，把上面定义的几个 agent 串成一条 pipeline：
- RAG with agents (strong baseline)

    ![](https://www.csie.ntu.edu.tw/~ulin/rag_agent.png)

In [15]:
async def pipeline(question: str, n_results: int = 3, max_chars_per_result: int = 4000) -> str:
    """
    Answer one question with the three-agent RAG flow.

    Steps: condense the raw question, derive search keywords from the condensed
    question, fetch web pages for those keywords, then let the QA agent answer from
    the original question, the condensed question and the page text.

    Args:
        question: The raw question text.
        n_results: Maximum number of web pages to retrieve.
        max_chars_per_result: Each page is cut to this many characters before being
            put into the prompt, to limit the prompt length.

    Returns:
        The QA agent's answer.
    """
    ex_question = question_extraction_agent.inference(question)
    print(f"question_extraction_agent: {ex_question}")
    keywords = keyword_extraction_agent.inference(ex_question)
    print(f"keyword_extraction_agent: {keywords}")
    search_results = await search(keywords, n_results)
    # Truncated pages are concatenated without a separator.
    context = ''.join(page[:max_chars_per_result] for page in search_results)
    return qa_agent.inference(f"原始问题{question},根据原始提取的问题：{ex_question},搜索结果：{context}")

In [None]:
from pathlib import Path

# Fill in your student ID first.
STUDENT_ID = "a"

STUDENT_ID = STUDENT_ID.lower()
with open('./public.txt','r') as input_f:
    questions = input_f.readlines()
    questions = [l.strip().split(',')[0] for l in questions]
    for id, question in enumerate(questions, 1):
        if Path(f"./{STUDENT_ID}_{id}.txt").exists():
            continue
        print(id,question)
        answer = await pipeline(question)
        answer = answer.replace('\n',' ')
        print(f"LLM:{answer}")
        # with open(f'./{STUDENT_ID}_{id}.txt', 'w') as output_f:
        #     print(answer, file=output_f)

with open('./private.txt', 'r') as input_f:
    questions = input_f.readlines()
    for id, question in enumerate(questions, 31):
        if Path(f"./{STUDENT_ID}_{id}.txt").exists():
            continue
        print(id,question)
        answer = await pipeline(question)
        answer = answer.replace('\n',' ')
        print(f"LLM:{answer}")
        # with open(f'./{STUDENT_ID}_{id}.txt', 'a') as output_f:
        #     print(answer, file=output_f)

1 校歌為學校（包括小學、中學、大學等）宣告或者規定的代表該校的歌曲。用於體現該校的治學理念、辦學理想等學校文化。「虎山雄風飛揚」是哪間學校的校歌歌詞？
question_extraction_agent: "虎山雄風飛揚是哪間學校的校歌?"
keyword_extraction_agent: 虎山雄風飛揚 校歌
LLM:光華國小
2 2025年初，NCC透過行政命令，規定民眾如果透過境外郵購無線鍵盤、滑鼠、藍芽耳機..等自用產品回台，每案一律加收審查費多少錢？
question_extraction_agent: 2025年初，境外郵購自用產品（如無線鍑盤、滑鼠等）回台需繳交多少審查費？
keyword_extraction_agent: 郵購自用產品審查費台湾
LLM:750
3 第一代 iPhone 是由哪位蘋果 CEO 發表？
question_extraction_agent: 第一代 iPhone 由哪位苹果 CEO 发表？
keyword_extraction_agent: 史蒂夫·乔布斯
LLM:史蒂夫·乔布斯
4 台灣大學進階英文免修申請規定中，托福網路測驗 TOEFL iBT 要達到多少分才能申請？
question_extraction_agent: 托福網路測驗 TOEFL iBT 達到多少分才能申請台灣大學進階英文免修？
keyword_extraction_agent: 托福網路測驗 TOEFL iBT 分數 台灣大學英文免修
LLM:92
5 Rugby Union 中觸地 try 可得幾分？
question_extraction_agent: Rugby Union 中觸地 try 可得幾分？
keyword_extraction_agent: Rugby Union 觸地 try 分数
LLM:觸地 try 可得 5 分。
6 卑南族是位在臺東平原的一個原住民族，以驍勇善戰、擅長巫術聞名，曾經統治整個臺東平原。相傳卑南族的祖先發源自 ruvuwa'an，該地位於現今的哪個行政區劃？
question_extraction_agent: 卑南族的祖先發源地在哪個行政區劃？
keyword_extraction_agent: 台東縣
