In [None]:
%pip install transformers accelerate sentencepiece

In [None]:
# LLMクラス
# ./libs/llm.py と同じ
import threading
from typing import Any, Generator, Optional

import torch
from transformers.models.auto.configuration_auto import AutoConfig
from transformers.models.auto.tokenization_auto import AutoTokenizer
from transformers.models.auto.modeling_auto import AutoModelForCausalLM
from transformers.models.auto.modeling_auto import AutoModelForSeq2SeqLM
from transformers.generation.streamers import TextIteratorStreamer


class LLM:
    def __init__(
        self,
        model_name: str = "elyza/ELYZA-japanese-Llama-2-7b-instruct",
        access_token: Optional[str] = None,
    ):
        """ LLMの初期化

        :param model_name: モデル名
        :param access_token: Hugging Faceのアクセストークン
        """
        self._model_name = model_name

        # トークナイザーとモデルを読み込み
        self._model = _load_model(model_name, access_token)
        self._tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            use_fast=True,
            token=access_token,
        )

    def infer(
        self,
        input_text: str,
        max_new_tokens: int = 128,
        do_sample: bool = True,
        temperature: float = 0.7,
        top_p: float = 0.9,
    ) -> Generator[str, None, None]:
        """ 推論

        :param input_text: 入力テキスト
        :param max_new_tokens: 生成する最大トークン数
        :param do_sample: 推論結果をサンプリングするか？Falseなら常に確率の一番高いトークンのみを出力する（結果は決定的になる）
        :param temperature: サンプリングの多様性を制御する温度パラメーター
        :param top_p: nucleus samplingの確率閾値
        :return: トークンのジェネレーター

        Examples:
            >>> for token in llm.infer("こんにちは。"):
            ...     print(token, end="", flush=True)
        """
        tokenizer = self._tokenizer
        model = self._model

        # プロンプト生成→トークン生成→GPUメモリーに転送
        prompt = self._prompt(input_text)
        inputs = tokenizer(prompt, return_tensors="pt")
        inputs = {k: v.to(model.device) for k, v in inputs.items()}

        # 出力取得用のストリーマー
        streamer = TextIteratorStreamer(tokenizer, skip_special_tokens=True)

        thread = threading.Thread(
            target=model.generate,
            kwargs={
                **inputs,
                "streamer": streamer,
                "max_new_tokens": max_new_tokens,
                "temperature": temperature,
                "top_p": top_p,
                "do_sample": do_sample,
            }
        )
        thread.start()

        for token in streamer:
            yield token

    def print_inference_result(
        self,
        input_text: str,
        max_new_tokens: int = 128,
        do_sample: bool = True,
        temperature: float = 0.7,
        top_p: float = 0.9,
    ) -> None:
        """ 推論結果を出力

        :param input_text: 入力テキスト
        :param max_new_tokens: 生成する最大トークン数
        :param do_sample: 推論結果をサンプリングするか？Falseなら常に確率の一番高いトークンのみを出力する（結果は決定的になる）
        :param temperature: サンプリングの多様性を制御する温度パラメーター
        :param top_p: nucleus samplingの確率閾値
        """
        # 入力プロンプト
        for token in self.infer(
            input_text,
            max_new_tokens,
            do_sample,
            temperature,
            top_p,
        ):
            # トークンを1つずつ出力
            print(token, end="", flush=True)

        # 出力の終端で改行
        print()

    def _prompt(self, input_text: str) -> str:
        """ 入力文字列から、モデルに合わせたプロンプトを生成する

        :param input_text: 入力テキスト
        :return: プロンプト
        """

        # seq2seqはプロンプト用の加工不要
        if isinstance(self._model, AutoModelForSeq2SeqLM):
            return input_text

        # chat_template に対応していれば使う（Mistral, Gemmaなど）
        if hasattr(self._tokenizer, "chat_template"):
            try:
                return self._tokenizer.apply_chat_template(
                    [
                        {"role": "user", "content": input_text},
                    ],
                    tokenize=False,
                    add_generation_prompt=True,
                )
            except Exception:
                # fallback
                pass

        # モデルに合わせたプロンプトを生成
        model_name = self._model_name.lower()
        if "elyza" in model_name:
            return f"[INST] {input_text} [/INST]"

        if "rinna" in model_name:
            return (
                f"ユーザー: {input_text}\n"
                "システム: "
            )

        if (
            "llama-2" in model_name or "llama2" in model_name or
            "mistral" in model_name
        ):
            return f"<s>[INST] {input_text} [/INST]</s>"

        if "openchat" in model_name:
            return f"<|user|>\n{input_text}\n<|assistant|>\n"

        if "llama-3" in model_name or "llama3" in model_name:
            return (
                "<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n"
                f"{input_text}\n"
                "<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n"
            )

        # デフォルト（そのまま）
        return input_text


def _load_model(model_name: str, access_token: Optional[str]) -> Any:
    """ モデルを読み込む

    :param model_name: モデル名
    :param access_token: Hugging Faceのアクセストークン
    :return: モデル
    """
    config = AutoConfig.from_pretrained(model_name)
    if hasattr(config, "is_encoder_decoder") and config.is_encoder_decoder:
        return AutoModelForSeq2SeqLM.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=_dtype(),
            token=access_token,
        )
    else:
        return AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=_dtype(),
            token=access_token,
        )


def _dtype() -> torch.dtype | str:
    """ データ型を取得

    :return: データ型
    """
    if torch.cuda.is_available():
        # CUDAが有効ならfloat16を使う（"auto"のままだとV100環境でfloat32を選ぶことがある）
        return torch.float16

    return "auto"


In [None]:
# チャットアプリケーション
# LLMクラスのインポートがないこととmain()の呼び出しがないこと以外は ./llm-chat.py と同じ
import argparse
from dataclasses import dataclass
from typing import Optional


@dataclass
class Parameters:
    """ アプリケーションパラメーター """
    model_name: str
    access_token: Optional[str]


def parse_args(args: Optional[list[str]]) -> Parameters:
    """ 引数を解析

    :return: 解析結果
    """
    parser = argparse.ArgumentParser(description="LLM Chat")
    parser.add_argument(
        "--model-name", "-m",
        help="使用するモデル名",
        default="elyza/ELYZA-japanese-Llama-2-7b-instruct",
    )
    parser.add_argument(
        "--access-token", "-t",
        help="Hugging Faceのアクセストークン",
    )

    # 引数を解析＆Parametersクラスに変換
    ns = parser.parse_args(args=args)
    return Parameters(**vars(ns))


def main(argv: Optional[list[str]] = None) -> None:
    """ メイン関数

    :param args: 解析済み引数
    :return: 終了ステータス
    """
    # コマンドライン引数を取り出す
    params = parse_args(argv)
    print(f"Model Name: {params.model_name}")
    print(f"Access Token: {params.access_token}")
    print()

    # モデルの初期化
    llm = LLM(params.model_name, params.access_token)

    print()
    print("Now, let's talk!")
    print("Type 'exit' to end the conversation.")
    print()

    # ひたすらチャット
    while True:
        try:
            # 入力プロンプト
            print("> ", end="", flush=True)

            input_text = input().strip()
            if input_text == "exit":
                break

            llm.print_inference_result(input_text)

            # 次の入力プロンプトとの間隔を空ける
            print()

        except EOFError:
            # EOFでも終了
            break

    print("See you!")

In [None]:
# モデルやトークンを指定する場合は、コマンドライン引数のように渡す
# 例) main(["-m", "MODEL_NAME", "-t", "YOUR_ACCESS_TOKEN"])
main([])