# 関数化

In [None]:
from langchain import HuggingFacePipeline, PromptTemplate, LLMChain
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from transformers import StoppingCriteria
from transformers import StoppingCriteriaList
from langchain.memory import ConversationBufferMemory
import torch

def model_setup(model_id:str):
    """Download a causal language model and its tokenizer.

    Args:
        model_id: Identifier passed unchanged to both ``from_pretrained`` calls.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    print(f"!!! Downloading Model from {model_id} !!!")

    # The model is fetched first, then the tokenizer, as two separate calls.
    loaded_model = AutoModelForCausalLM.from_pretrained(model_id)
    loaded_tokenizer = AutoTokenizer.from_pretrained(model_id)
    return loaded_model, loaded_tokenizer


def pipeline_setup(model, tokenizer, isGPU:bool, **kwargs) -> HuggingFacePipeline:
    """Build a text-generation pipeline and wrap it for use with LangChain.

    Args:
        model: Model passed to ``pipeline``; moved with ``model.to(...)`` when
            ``isGPU`` is true.
        tokenizer: Tokenizer passed to ``pipeline``.
        isGPU: Whether GPU execution is requested. If CUDA is not available,
            both the model and the pipeline fall back to CPU.
        **kwargs: Forwarded unchanged to ``pipeline``.

    Returns:
        A ``HuggingFacePipeline`` wrapping the created pipeline.
    """
    # Only use CUDA when it was requested and is actually present.
    use_cuda = bool(isGPU) and torch.cuda.is_available()

    if isGPU:
        device = torch.device("cuda" if use_cuda else "cpu")
        print(f"\n!!! current device is {device} !!!\n")
        model = model.to(device)
        framework = 'pt'
    else:
        framework = None

    # pipeline() device index: 0 selects the first CUDA device, -1 selects CPU.
    # The index has to agree with where the model was just placed; passing 0
    # after a CPU fallback would request a GPU that does not exist.
    pipe_device = 0 if use_cuda else -1

    # Create the pipeline
    task = "text-generation"
    pipe = pipeline(
        task,
        model=model,
        tokenizer=tokenizer,
        device=pipe_device,
        framework=framework,
        **kwargs
    )

    # LLMs: wrap the pipeline so LangChain can use it
    llm = HuggingFacePipeline(pipeline=pipe)

    print("!!! Pipeline Setup Completed !!!")

    return llm



# Custom stop condition for generate() (subclasses StoppingCriteria)
class MyStoppingCriteria(StoppingCriteria):
    """Stop generation once the output ends with repeated stop strings.

    Generation stops when either
      * the last ``num_iter`` tokens all equal the first token id of the
        tokenized ``stop_str``, or
      * the newest token equals the first token id of the tokenized
        ``stop_str * 2``.

    Only the first sequence of the batch (``input_ids[0]``) is inspected.

    Args:
        stop_str: String whose repetition ends generation.
        num_iter: Number of consecutive stop tokens required to stop.
        tokenizer: Callable tokenizer returning a mapping with ``"input_ids"``.
        isGPU: Whether the stop-token tensors should be placed on CUDA.
            Falls back to CPU when CUDA is not available.
    """

    def __init__(self, stop_str, num_iter, tokenizer, isGPU):
        # Do not assume CUDA exists just because a GPU was requested.
        device = 'cuda' if isGPU and torch.cuda.is_available() else 'cpu'

        self.stop_token_ids = tokenizer(stop_str, return_tensors='pt')["input_ids"].to(device)
        # The doubled stop string is tokenized on both code paths; the CPU path
        # previously tokenized the single string, so one stop token already
        # counted as the "double" match.
        self.stop_token_ids_iter = tokenizer(stop_str*2, return_tensors='pt')["input_ids"].to(device)

        self.num_iter = num_iter
        self.tokenizer = tokenizer

    def __call__(self, input_ids:torch.LongTensor, score:torch.FloatTensor, **kwargs):
        """Return True when generation should stop for the current output."""
        sequence = input_ids[0]
        seq_len = sequence.shape[0]
        if seq_len == 0:
            # Nothing to compare against yet
            return False

        # Compare plain ints so the result does not depend on which device
        # the stored stop-token tensors and input_ids live on.
        stop_id = self.stop_token_ids[0][0].item()
        stop_iter_id = self.stop_token_ids_iter[0][0].item()

        # A run of stop tokens needs at least num_iter tokens in the sequence;
        # shorter sequences cannot match (and must not raise IndexError).
        tail = sequence[seq_len - self.num_iter:].tolist() if seq_len >= self.num_iter else None
        has_run = tail is not None and all(token == stop_id for token in tail)

        # The doubled stop string is checked against the newest token.
        has_double = sequence[-1].item() == stop_iter_id

        return has_run or has_double
    
    
def chat_chain_setup(template, llm) -> LLMChain:
    """Assemble an ``LLMChain`` that keeps the conversation in a buffer memory.

    Args:
        template: Prompt template text using the ``chat_history`` and ``input``
            variables.
        llm: Language model object registered on the chain.

    Returns:
        The ``LLMChain`` built from the prompt, the model and the memory.
    """
    history_key = "chat_history"

    # Prompts: build the prompt from the template
    chat_prompt = PromptTemplate(template=template, input_variables=["chat_history", "input"])

    # Memory: conversation buffer stored under the history key, with an empty AI prefix
    chat_memory = ConversationBufferMemory(memory_key=history_key, ai_prefix="")

    # Chains: register prompt, model and memory on the chain
    return LLMChain(llm=llm, prompt=chat_prompt, memory=chat_memory)

In [None]:
# Download the model and tokenizer
model_id = "andreaskoepf/pythia-1.4b-gpt4all-pretrain"
model, tokenizer = model_setup(model_id)

In [None]:
# Stop string, and how many consecutive occurrences end generation
stop_str, num_iter = "\n", 2  # generate() stops once "\n" is repeated twice

# Wrap the custom criterion in a StoppingCriteriaList
stopcriteria_list = StoppingCriteriaList(
    [MyStoppingCriteria(stop_str, num_iter, tokenizer, isGPU=True)]
)
print(stopcriteria_list)

# Build the HuggingFacePipeline; these settings are forwarded to pipeline()
# NOTE(review): max_length presumably counts the prompt tokens as well — confirm
model_args = dict(
    temperature=0.1,
    max_length=256,
    stopping_criteria=stopcriteria_list,
)
llm = pipeline_setup(model=model, tokenizer=tokenizer, isGPU=True, **model_args)

In [None]:
# Prompt template with slots for the chat history and the new user input
template = """
You are an AI who responds to user Input.
Please provide an answer to the human's question.
Additonaly, you are having a conversation with a human based on past interactions.

### Answer Sample
Human: Hi!
AI: Hi, nice to meet you.

### Past Interactions
{chat_history}

### 
Human:{input}
"""

# Create the chat chain
llm_chain = chat_chain_setup(template, llm)

# 手順①: シンプルなQAをする

In [None]:
from langchain import HuggingFacePipeline, PromptTemplate, LLMChain
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import torch

In [None]:
# Check for a GPU: use CUDA when available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"\n!!! current device is {device} !!!\n")

In [None]:
# Download the tokenizer and the model, and move the model to the selected device
model_id = "andreaskoepf/pythia-1.4b-gpt4all-pretrain"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id).to(device)

In [None]:
# Create the text-generation pipeline
task = "text-generation"
pipe = pipeline(
    task,
    model=model,
    tokenizer=tokenizer,
    # Device index: 0 = first CUDA device, -1 = CPU. Fall back to CPU when
    # CUDA is unavailable instead of always requesting GPU 0.
    device=0 if torch.cuda.is_available() else -1,
    framework='pt',
    temperature=0.1,
    max_new_tokens=128
)

# LLMs: wrap the pipeline so LangChain can use it
llm = HuggingFacePipeline(pipeline=pipe)

In [None]:
# Prompts: build the prompt (single "question" variable)
template = """You are an assistant who responds to user Input. \nPlease provide an answer to the user's question, as shown in the following example.\n\nExample:\nQuestion: What is the height of Tokyo Tower in meters?\nAnswer: The height of Tokyo Tower is 333 meters.\n\n###\n\nInput:\n{question}"""
prompt = PromptTemplate(template=template, input_variables=["question"])

# Chains: register the prompt and the model on the chain
llm_chain = LLMChain(prompt=prompt, llm=llm, verbose=True)

In [None]:
# Ask a question (read interactively from the user)
# Example question:
# question = "How can I get end of the list in Python?Take an example of Python Code."
question = input("Enter your question")
generated_text = llm_chain.run(question)
print(generated_text)

# 手順②: 会話を記録する

In [None]:
from langchain.memory import ConversationBufferMemory

# Memory: keep the conversation in an in-memory buffer under the "chat_history" key
memory_key = "chat_history"
memory = ConversationBufferMemory(memory_key=memory_key, ai_prefix="")

In [None]:
# Prompts: build the prompt; the chat history is also declared as an input variable
template = """
You are an AI who responds to user Input.
Please provide an answer to the human's question.
Additonaly, you are having a conversation with a human based on past interactions.

### Answer Sample
Human: Hi!
AI: Hi, nice to meet you.

### Past Interactions
{chat_history}

### 
Human:{input}
"""
prompt = PromptTemplate(template=template, input_variables=["chat_history", "input"])

# Chains: register prompt, model and memory on the chain
llm_chain = LLMChain(
    llm=llm,
    prompt=prompt,
    memory=memory,
    verbose=True,
)

# Run 1
user_input = "What is the Japanese word for mountain？"
response = llm_chain.predict(input=user_input)
print(response)

# Show the recorded history
memory.load_memory_variables({})

In [None]:
# Run 2
user_input = "Hi, I`m Matthew. Nice to meet you."
response = llm_chain.predict(input=user_input)
print(response)

# Show the recorded history
memory.load_memory_variables({})

In [None]:
# Run 3
user_input = "Please call my name."
response = llm_chain.predict(input=user_input)
print(response)

# Show the recorded history
memory.load_memory_variables({})

In [None]:
from transformers import StoppingCriteria
from transformers import StoppingCriteriaList

# Stop string, and how many consecutive occurrences end generation
stop_str = "\n"
num_iter = 2  # stop generate() when \n is repeated twice, or when \n\n appears


# Custom stop condition for generate() (subclasses StoppingCriteria); debug variant that prints its state
class MyStoppingCriteria(StoppingCriteria):
    """Stop generation once the output ends with repeated stop strings.

    Generation stops when either
      * the last ``num_iter`` tokens all equal the first token id of the
        tokenized ``stop_str``, or
      * the newest token equals the first token id of the tokenized
        ``stop_str * 2``.

    Only the first sequence of the batch (``input_ids[0]``) is inspected.
    Every call prints the decoded sequence and the raw ids.

    Args:
        stop_str: String whose repetition ends generation.
        num_iter: Number of consecutive stop tokens required to stop.
        tokenizer: Callable tokenizer returning a mapping with ``"input_ids"``;
            its ``decode`` method is used for the debug output.
        isGPU: Whether the stop-token tensors should be placed on CUDA.
            Falls back to CPU when CUDA is not available.
    """

    def __init__(self, stop_str, num_iter, tokenizer, isGPU):
        # Do not assume CUDA exists just because a GPU was requested.
        device = 'cuda' if isGPU and torch.cuda.is_available() else 'cpu'

        self.stop_token_ids = tokenizer(stop_str, return_tensors='pt')["input_ids"].to(device)
        # The doubled stop string is tokenized on both code paths; the CPU path
        # previously tokenized the single string.
        self.stop_token_ids_iter = tokenizer(stop_str*2, return_tensors='pt')["input_ids"].to(device)

        self.num_iter = num_iter
        self.tokenizer = tokenizer
        # 1 = never skip a stop. Set to 0 to ignore the first run of stop
        # tokens once (for models that begin their output with the stop string).
        self.num_stop = 1

    def __call__(self, input_ids:torch.LongTensor, score:torch.FloatTensor, **kwargs):
        """Return True when generation should stop for the current output."""
        # Use the instance's own tokenizer/num_iter rather than notebook globals,
        # so the criterion keeps working if those globals are reassigned.
        print(self.tokenizer.decode(input_ids[0]))
        print(input_ids)

        sequence = input_ids[0]
        seq_len = sequence.shape[0]
        if seq_len == 0:
            # Nothing to compare against yet
            return False

        # Compare plain ints so the result does not depend on tensor devices.
        stop_id = self.stop_token_ids[0][0].item()
        stop_iter_id = self.stop_token_ids_iter[0][0].item()

        # A run of stop tokens needs at least num_iter tokens in the sequence;
        # shorter sequences cannot match (and must not raise IndexError).
        tail = sequence[seq_len - self.num_iter:].tolist() if seq_len >= self.num_iter else None
        has_run = tail is not None and all(token == stop_id for token in tail)

        # The doubled stop string is checked against the newest token.
        has_double = sequence[-1].item() == stop_iter_id

        # If the model emits the stop run at the very start, ignore it once.
        if has_run and self.num_stop == 0:
            self.num_stop += 1
            print("!!! FirstStop was ignored!!!")
            return False

        # Stop on a run of stop tokens or on the doubled stop token.
        if has_run or has_double:
            print(f"!!! Generate() Stopped !!!\n!!!!!!!!!\n{self.tokenizer.decode(input_ids[0])} \n!!!!!!!!!")
            return True

        return False


# Wrap the custom criterion in a StoppingCriteriaList
stopcriteria_list = StoppingCriteriaList(
    [MyStoppingCriteria(stop_str, num_iter, tokenizer, isGPU=True)]
)
print(stopcriteria_list)

# Build the HuggingFacePipeline; these settings are forwarded to pipeline()
model_args = dict(
    temperature=0.1,
    max_length=256,
    stopping_criteria=stopcriteria_list,
)
llm = pipeline_setup(model=model, tokenizer=tokenizer, isGPU=True, **model_args)

# 手順③: チャットボット化

In [None]:
# Prompt template with slots for the chat history and the new user input
template = """
You are an AI who responds to user Input.
Please provide an answer to the human's question.
Additonaly, you are having a conversation with a human based on past interactions.

### Answer Sample
Human: Hi!
AI: Hi, nice to meet you.

### Past Interactions
{chat_history}

###
Human:{input}
"""

# Create the chat chain
llm_chain = chat_chain_setup(template, llm)

# Chat loop: keep answering until the user types "exit"
while True:
    user_input = input("\n> ")
    if user_input == "exit":
        break
    response = llm_chain.predict(input=user_input)
    print(response)

# その他、テストなど

In [None]:
# Scratch test used while implementing the StoppingCriteria
test1 = "This is Test\n"
stop = "\n"

test1_ids = tokenizer(test1, return_tensors='pt')["input_ids"]
stop_token_ids = tokenizer(stop*2, return_tensors='pt')["input_ids"]

num_iter = 2
print(test1_ids, stop_token_ids)
# Print the last num_iter token ids of the test string, newest first
# (the loop previously referenced the undefined name test1_ids1)
for i in range(1, num_iter+1):
    print(test1_ids[-1][-i])

In [None]:
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
from langchain.prompts.chat import (
    ChatPromptTemplate,
    MessagesPlaceholder, 
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

# Memory: keep the conversation in memory, returned as message objects
memory = ConversationBufferMemory(return_messages=True)

# System prompt text
template = """
You provides lots of specific information based on the context of the conversation. 
Here is a conversation between a human and an AI.
Human: Hi!
AI: Hi, nice to meet you.
"""

# Chat prompt: system message, then the stored "history" messages, then the new input
prompt = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template(template),
    MessagesPlaceholder(variable_name="history"),
    HumanMessagePromptTemplate.from_template("{input}")
])

# Conversation chain combining the model, the memory and the prompt
conversation = ConversationChain(llm=llm, memory=memory, prompt=prompt, verbose=True)

# Send one message and print the reply
user_input = "Hi, I`m Matthew."
response = conversation.predict(input=user_input)
print(response)

In [None]:
# Show the recorded history
memory.load_memory_variables({})