# 第9章 質問応答
## 9.3 ChatGPTにクイズを答えさせる

In [None]:
!pip install "datasets==2.19.1" "huggingface_hub<0.26" openai==0.27 tiktoken tqdm

### 9.3.1 OpenAI API

In [None]:
import os
from getpass import getpass

# Never hard-code an API key in the notebook: it leaks through version control
# and every shared copy of the file. Reuse a key that is already exported in
# the environment; otherwise ask for it interactively without echoing it.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")

In [None]:
import openai

# Message to send to ChatGPT: a single user turn
messages = [{"role": "user", "content": "日本で一番高い山は何？"}]
# Request a chat completion from gpt-3.5-turbo with temperature 0.0
completion = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=messages,
    temperature=0.0,
)
# Print the text of the first returned choice
print(completion["choices"][0]["message"]["content"])

In [None]:
# Conversation history: the earlier user question, an assistant reply, and a
# follow-up user request that refers back to that reply
messages = [
    {"role": "user", "content": "日本で一番高い山は何？"},
    {
        "role": "assistant",
        "content": "日本で一番高い山は富士山です。",
    },
    {
        "role": "user",
        "content": "一つ前の発言をひらがなに変換してください。",
    },
]
# Send the whole history so the request carries the earlier turns
completion = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=messages,
    temperature=0.0,
)
# Print the text of the first returned choice
print(completion["choices"][0]["message"]["content"])

In [None]:
# Ask for a reply that would normally span several tokens
messages = [
    {
        "role": "user",
        "content": "数字を1から10まで読み上げてください。",
    }
]
# Same request as before, but with max_tokens=1 to demonstrate limiting the
# length of the generated reply
completion = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=messages,
    temperature=0.0,
    max_tokens=1,
)
# Print the text of the first returned choice
print(completion["choices"][0]["message"]["content"])

In [None]:
# Same counting request as in the max_tokens demonstration
messages = [
    {
        "role": "user",
        "content": "数字を1から10まで読み上げてください。",
    }
]
# This time pass stop="5" to demonstrate ending generation at a stop sequence
completion = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=messages,
    temperature=0.0,
    stop="5",
)
# Print the text of the first returned choice
print(completion["choices"][0]["message"]["content"])

### 9.3.2 効率的なリクエストの送信

In [None]:
import asyncio
from typing import Awaitable, Callable, TypeVar
from openai.error import OpenAIError

T = TypeVar("T")

async def retry_on_error(
    openai_call: Callable[[], Awaitable[T]],
    max_num_trials: int = 5,
    first_wait_time: int = 10,
) -> T:
    """Await an OpenAI API call, retrying when it raises ``OpenAIError``.

    Args:
        openai_call: Zero-argument callable that returns a fresh awaitable
            for every attempt.
        max_num_trials: Maximum number of attempts; must be at least 1.
        first_wait_time: Seconds to wait after the first failure. The wait
            doubles after each further failure.

    Returns:
        The value produced by the first attempt that succeeds.

    Raises:
        ValueError: If ``max_num_trials`` is smaller than 1.
        OpenAIError: Re-raised from the last attempt when every attempt fails.
    """
    # With zero attempts the loop below would never run and the function
    # would silently return None, so reject that configuration up front.
    if max_num_trials < 1:
        raise ValueError("max_num_trials must be at least 1")

    for i in range(max_num_trials):
        try:
            # Build and await a new awaitable for this attempt
            return await openai_call()
        except OpenAIError as e:
            # Out of attempts: let the last error propagate to the caller
            if i == max_num_trials - 1:
                raise
            print(f"エラーを受け取りました：{e}")
            # Exponential back-off: first_wait_time, then 2x, 4x, ...
            wait_time_seconds = first_wait_time * (2**i)
            print(f"{wait_time_seconds}秒待機します")
            await asyncio.sleep(wait_time_seconds)

    # Every iteration either returns or raises; this only satisfies the
    # declared return type.
    raise AssertionError("unreachable")

In [None]:
async def _async_batch_run_chatgpt(
    messages_list: list[list[dict[str, str]]],
    temperature: float,
    max_tokens: int | None,
    stop: str | list[str] | None,
) -> list[str]:
    """Send one chat-completion request per conversation, concurrently.

    Each element of ``messages_list`` is one conversation. Every request goes
    to gpt-3.5-turbo through ``retry_on_error``. The returned list holds the
    text of the first choice of each response, in input order.
    """

    def make_request(conversation: list[dict[str, str]]):
        # Bind this conversation now so each request keeps its own messages
        # instead of all of them seeing the last loop value.
        def call():
            return openai.ChatCompletion.acreate(
                model="gpt-3.5-turbo",
                messages=conversation,
                temperature=temperature,
                max_tokens=max_tokens,
                stop=stop,
            )

        return call

    # One retry-wrapped coroutine per conversation
    pending = [
        retry_on_error(openai_call=make_request(conversation))
        for conversation in messages_list
    ]
    # Run them together; gather returns the results in submission order
    responses = await asyncio.gather(*pending)
    return [
        response["choices"][0]["message"]["content"]
        for response in responses
    ]

In [None]:
def batch_run_chatgpt(
    messages_list: list[list[dict[str, str]]],
    temperature: float = 0.0,
    max_tokens: int | None = None,
    stop: str | list[str] | None = None,
) -> list[str]:
    """Synchronous wrapper that runs the async batch request to completion.

    Forwards all arguments to ``_async_batch_run_chatgpt`` and returns its
    result: one reply string per conversation in ``messages_list``.
    """
    coroutine = _async_batch_run_chatgpt(
        messages_list, temperature, max_tokens, stop
    )
    return asyncio.run(coroutine)

In [None]:
import nest_asyncio

# Patch asyncio so that asyncio.run() can be used while an event loop is
# already running (presumably the notebook kernel's own loop — confirm for
# your environment)
nest_asyncio.apply()

### 9.3.3 クイズ用のプロンプトの作成

In [None]:
from abc import ABCMeta, abstractmethod

class PromptMaker(metaclass=ABCMeta):
    """Abstract interface for objects that turn quiz questions into prompts."""

    @abstractmethod
    def run(self, questions: list[str]) -> list[str]:
        """Build prompts for the given questions.

        The base class provides no logic; concrete subclasses implement it.
        """
        ...

In [None]:
class SimplePromptMaker(PromptMaker):
    """Prompt maker that wraps each question in one fixed instruction."""

    def run(self, questions: list[str]) -> list[str]:
        """Return one prompt per question, in the same order as the input."""
        # Instruction text shared by every prompt
        instruction = (
            "あなたには今からクイズに答えてもらいます。"
            "問題を与えますので、その解答のみを簡潔に出力してください。\n"
        )
        return [
            f"{instruction}問題：{question}\n解答："
            for question in questions
        ]

In [None]:
# A few sample quiz questions for trying out the prompt maker
questions = [
    "日本で一番高い山は何？",
    "日本で一番長い川は何？",
    "日本で一番面積の大きい都道府県はどこ？",
    "日本で一番人口の多い都道府県はどこ？",
]
simple_prompt_maker = SimplePromptMaker()
# Show the prompt generated for the first question
print(simple_prompt_maker.run(questions)[0])

In [None]:
# Wrap each prompt as a single-turn user conversation and send them all in
# one batch
answers = batch_run_chatgpt(
    [
        [{"role": "user", "content": p}]
        for p in simple_prompt_maker.run(questions)
    ],
    temperature=0.0,
)
print(answers)

### 9.3.4 API使用料金の見積もり

In [None]:
import tiktoken

# Look up the tiktoken encoding registered for gpt-3.5-turbo
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
# Encode a sample question; as the last expression of the cell, its value is
# what the notebook displays
encoding.encode("日本で一番高い山は何？")

In [None]:
from huggingface_hub import hf_hub_download

# Download the dataset loading script from the Hugging Face Hub
src = hf_hub_download("llm-book/aio", "aio.py", repo_type="dataset")

# Re-save the script as UTF-8. Decode it as UTF-8 first so that non-ASCII text
# in the script is preserved; decoding UTF-8 bytes as Latin-1 and writing them
# back as UTF-8 would turn every multi-byte character into mojibake. Latin-1
# (which accepts any byte sequence) is kept only as a fallback for content
# that is not valid UTF-8.
with open(src, "rb") as f:
    raw = f.read()
try:
    code = raw.decode("utf-8")
except UnicodeDecodeError:
    code = raw.decode("latin-1")
fixed_path = "/tmp/aio_utf8.py"
with open(fixed_path, "w", encoding="utf-8") as f:
    f.write(code)

from datasets import load_dataset

# NOTE: trust_remote_code=True runs the downloaded script on this machine;
# only use it with dataset repositories you trust.
quiz_dataset = load_dataset(fixed_path, split="validation", trust_remote_code=True)


In [None]:
def calculate_prompt_cost(
    prompts: list[str],
    num_output_tokens: int = 40,
    model: str = "gpt-3.5-turbo",
    usd_per_token: float = 0.0015 / 1000,
) -> None:
    """Estimate and print the cost of sending ``prompts`` to the OpenAI API.

    Prints the average number of prompt tokens and the estimated total cost
    in USD. The same ``usd_per_token`` price is applied to both prompt tokens
    and the assumed output tokens.

    Args:
        prompts: Prompt strings to be sent. An empty list is reported as an
            average of 0 tokens and a cost of 0.0 USD.
        num_output_tokens: Assumed number of output tokens per prompt.
        model: Model name used to select the tiktoken encoding.
        usd_per_token: Price in USD charged per token.
    """
    num_prompts = len(prompts)

    # Total number of tokens across all input prompts. The tokenizer is only
    # needed when there is something to encode.
    total_num_prompt_tokens = 0
    if num_prompts > 0:
        encoding = tiktoken.encoding_for_model(model)
        total_num_prompt_tokens = sum(
            len(encoding.encode(prompt)) for prompt in prompts
        )

    # Guard the division so an empty prompt list does not raise
    # ZeroDivisionError.
    avg_num_prompt_tokens = (
        total_num_prompt_tokens / num_prompts if num_prompts else 0
    )
    print(
        "入力プロンプトの平均トークン数:"
        f" {int(avg_num_prompt_tokens)}"
    )

    # Estimated total number of tokens the model will generate
    total_num_output_tokens = num_output_tokens * num_prompts

    # Cost = (input tokens + estimated output tokens) * price per token
    total_cost = (
        total_num_prompt_tokens + total_num_output_tokens
    ) * usd_per_token
    print(f"合計コスト: {round(total_cost, 3)} USD")

In [None]:
# Build the simple prompt for every question in the quiz dataset and print
# the estimated cost of sending them
questions = quiz_dataset["question"]
prompts = simple_prompt_maker.run(questions)
calculate_prompt_cost(prompts)

### 9.3.5 クイズデータセットによる評価

In [None]:
from datasets import Dataset
from tqdm import tqdm

def get_chatgpt_outputs_for_quiz(
    quiz_prompt_maker: PromptMaker,
    quiz_dataset: Dataset,
    batch_size: int,
) -> list[str]:
    """Collect the model's answer to every question in ``quiz_dataset``.

    The dataset is processed in batches of ``batch_size``. For each batch the
    questions are turned into prompts with ``quiz_prompt_maker``, sent through
    ``batch_run_chatgpt``, and each question/answer pair is printed. A tqdm
    progress bar tracks the number of answered questions.

    Returns:
        The answers for the whole dataset, in dataset order.
    """
    collected: list[str] = []
    with tqdm(total=len(quiz_dataset)) as progress:
        for batch in quiz_dataset.iter(batch_size=batch_size):
            batch_questions = batch["question"]

            # One single-turn user conversation per generated prompt
            conversations = [
                [{"role": "user", "content": prompt}]
                for prompt in quiz_prompt_maker.run(batch_questions)
            ]

            # Send the whole batch to the API
            batch_answers = batch_run_chatgpt(conversations)

            # Show each question next to the model's answer
            for question, answer in zip(batch_questions, batch_answers):
                print(f"問題：{question}")
                print(f"解答：{answer}")
                print()

            collected.extend(batch_answers)
            progress.update(len(batch_answers))
    return collected

In [None]:
def calculate_quiz_accuracy(
    output_answers: list[str], correct_answers_list: list[list[str]]
) -> float:
    """Compute the fraction of model answers that are judged correct.

    A model answer counts as correct when it contains at least one of the
    accepted answers for its question as a substring.

    Args:
        output_answers: Model outputs, one per question.
        correct_answers_list: Accepted answers for each question, aligned
            with ``output_answers``.

    Returns:
        Number of correct answers divided by ``len(output_answers)``, or 0.0
        when ``output_answers`` is empty.
    """
    # Nothing to score: avoid dividing by zero
    if not output_answers:
        return 0.0

    # Each any(...) is a bool, so summing them counts the correct answers
    num_correct = sum(
        any(a in output_answer for a in answers)
        for output_answer, answers in zip(
            output_answers, correct_answers_list
        )
    )
    return num_correct / len(output_answers)

In [None]:
# Answer every quiz question with the simple prompt, four requests at a time
output_answers = get_chatgpt_outputs_for_quiz(
    quiz_prompt_maker=simple_prompt_maker,
    quiz_dataset=quiz_dataset,
    batch_size=4,
)
# Score the outputs against the dataset's accepted answers
accuracy = calculate_quiz_accuracy(output_answers, quiz_dataset["answers"])
print(f"正解率：{accuracy * 100}")

### 9.3.6 文脈内学習

In [None]:
# Load the train and validation splits from the re-saved loading script
quiz_train_dataset = load_dataset(fixed_path, split="train", trust_remote_code=True)
quiz_valid_dataset = load_dataset(fixed_path, split="validation", trust_remote_code=True)

# Use the first few training items as in-context examples
num_in_context_examples = 3
in_context_examples = [quiz_train_dataset[i] for i in range(num_in_context_examples)]

# Show each example's question together with the first of its listed answers
for ex in in_context_examples:
    print(f'問題：{ex["question"]}')
    print(f'解答：{ex["answers"][0]}')

# Find the longest answer in the validation split, measured in cl100k_base
# tokens
enc = tiktoken.get_encoding("cl100k_base")
max_answer_length = 0
for answers in quiz_valid_dataset["answers"]:
    for ans in answers:
        max_answer_length = max(max_answer_length, len(enc.encode(ans)))
print("max_answer_length:", max_answer_length)

In [None]:
class InContextPromptMaker(PromptMaker):
    """Prompt maker that prepends solved examples (in-context learning)."""

    def __init__(self, examples: list[tuple[str, str]]):
        """Pre-render the part of the prompt shared by every question.

        ``examples`` is a sequence of (question, answer) pairs; they are
        rendered after the instruction in the order given.
        """
        instruction = (
            "あなたには今からクイズに答えてもらいます。問題を与えますので、"
            "その解答のみを簡潔に出力してください。\n\n"
        )
        demonstrations = "".join(
            f"問題：{question}\n解答：{answer}\n\n"
            for question, answer in examples
        )
        self._prompt = instruction + demonstrations

    def run(self, questions: list[str]) -> list[str]:
        """Return one prompt per question: shared prefix plus the question."""
        return [f"{self._prompt}問題：{q}\n解答：" for q in questions]

# Pair each example question with the first of its listed answers
q_and_a_list = [
    (e["question"], e["answers"][0]) for e in in_context_examples
]
in_context_prompt_maker = InContextPromptMaker(q_and_a_list)
# Show the full prompt generated for one sample question
prompt = in_context_prompt_maker.run(["日本で一番高い山は何？"])[0]
print(prompt)

In [None]:
# Build the in-context prompt for every question in the quiz dataset and
# print the estimated cost of sending them
questions = quiz_dataset["question"]
prompts = in_context_prompt_maker.run(questions)
calculate_prompt_cost(prompts)

In [None]:
# Answer every quiz question with the in-context prompt, four at a time
in_context_output_answers = get_chatgpt_outputs_for_quiz(
    quiz_prompt_maker=in_context_prompt_maker,
    quiz_dataset=quiz_dataset,
    batch_size=4,
)
# Score the outputs against the dataset's accepted answers
in_context_accuracy = calculate_quiz_accuracy(
    in_context_output_answers, quiz_dataset["answers"]
)
print(f"正解率：{in_context_accuracy * 100}")

### 9.3.7 言語モデルの幻覚に注意

In [None]:
# Pick one item of the quiz dataset (index 87) to inspect in the next cells
quiz_example = quiz_dataset[87]
print("問題：" + quiz_example["question"])
# Print the first of the item's listed answers
print("解答：" + str(quiz_example["answers"][0]) + "\n")

In [None]:
# Ask the selected question using the simple prompt
prompt = simple_prompt_maker.run([quiz_example["question"]])[0]
answer = batch_run_chatgpt([[{"role": "user", "content": prompt}]])[0]
print(f"SimplePromptMakerを用いたモデルの解答：{answer}")

In [None]:
# Ask the same question using the in-context prompt, for comparison
prompt = in_context_prompt_maker.run([quiz_example["question"]])[0]
answer = batch_run_chatgpt([[{"role": "user", "content": prompt}]])[0]
print(f"InContextPromptMakerを用いたモデルの解答：{answer}")