# 演習２：用例ベース雑談システム

概要
* 用例ベース（検索ベース）の雑談対話システムを作成
* 入力発話とシステム応答の候補との類似度計算には、SentenceBERTならびにコサイン類似度を使用

目的
* 「こう聞かれたら、こう答える」という一問一答型の音声対話システムの実装を体験する

下記のような用例ベース対話システムを実装します。
あらかじめ想定されるユーザ発話とそれに対応するシステム応答のペアを複数用意しておき、ユーザ発話が入力されたら最も近い想定ユーザ発話を検索します。
そして、それに対応するシステム応答を出力します。
類似度の計算には、SentenceBERTとコサイン距離を用います。

<img src="./img/example.png" style="width: 600px;"/>

## 必要なライブラリのインストール

はじめに、必要となるライブラリをインストールします。ここではpipを使ってインストールします。

In [None]:
# SentenceBERT
! pip install sentence-transformers
! pip install fugashi
! pip install ipadic

# 音声認識
! pip install pyaudio
! pip install SpeechRecognition

# 音声合成
! pip install gTTS
! pip install pygame

## Step 1: 用例データの用意

想定されるユーザ発話とそれに対応するシステム応答のペアデータ（用例データ）を読み込みます。
データは data/example-base-data.csvに格納されており、各行が１つのペアデータ、１列目が想定ユーザ発話、２列目がシステム応答です。

In [None]:
# 用例データを読み込む
pair_data = []
filename = './data/example-base-data.csv'
print('Load from %s' % filename)
with open(filename, 'r', encoding='utf8') as f:
    lines = f.readlines()
    for line in lines:
        u1 = line.split(',')[0].strip()
        u2 = line.split(',')[1].strip()
        pair_data.append([u1, u2])
        
        print('%s -> %s' % (u1, u2))

## Step 2: SentenceBERTによる文ベクトル変換

読み込んだ用例のデータのうち、想定ユーザ発話の各発話文をSentenceBERTを用いてベクトル化します。

SentenceBERTのモデルには、今回は下記のものを用います。
https://huggingface.co/sonoisa/sentence-bert-base-ja-mean-tokens-v2

はじめに、上記のサイトを参考に、SentenceBERTを利用するためのクラスを作成します。

In [None]:
# SentenceBERTで使用
from transformers import BertJapaneseTokenizer, BertModel
import torch

# Sentence-BERTの日本語版モデルを操作するためのクラス
class SentenceBertJapanese:
    
    # コンストラクタ
    # model_name_or_path: Sentence-BERTのモデル名またはパス
    # device: 使用するデバイス（CPU or GPU）の指定。デフォルトでは利用可能な場合はGPUを使用。今回の演習ではCPUを想定する。
    def __init__(self, model_name_or_path, device=None):
        
        # トークナイザの初期化
        self.tokenizer = BertJapaneseTokenizer.from_pretrained(model_name_or_path)
        
        # モデルの初期化
        self.model = BertModel.from_pretrained(model_name_or_path)
        
        # 推論モードにモデルを設定
        self.model.eval()

        # 使用するデバイスの設定
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        
        # モデルを指定したデバイスに移動
        self.model.to(device)

    def _mean_pooling(self, model_output, attention_mask):

        # モデルの出力からトークンの埋め込みを取得
        token_embeddings = model_output[0]
        
        # attention_maskをトークン埋め込みの次元に展開
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        
        # トークンの埋め込みを平均プーリングして文の埋め込みを取得
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    # 文のリストをベクトルに変換するメソッド
    @torch.no_grad()
    def encode(self, sentences, batch_size=8):
        
        all_embeddings = []
        iterator = range(0, len(sentences), batch_size)
        
        for batch_idx in iterator:
            
            # 文をバッチ処理するための部分集合を取得
            batch = sentences[batch_idx:batch_idx + batch_size]
            
            # 文をトークン化してモデル入力用にエンコード
            encoded_input = self.tokenizer.batch_encode_plus(batch, padding="longest", 
                                           truncation=True, return_tensors="pt").to(self.device)
            
            # モデルを使用してエンコードされた入力から出力を取得
            model_output = self.model(**encoded_input)
            
            # 平均プーリングを使用して文の埋め込みを取得
            sentence_embeddings = self._mean_pooling(model_output, encoded_input["attention_mask"]).to('cpu')
            
            # 全ての文の埋め込みをリストに追加
            all_embeddings.extend(sentence_embeddings)

        # 最終的な文の埋め込みのテンソルを返す
        return torch.stack(all_embeddings)


SentenceBERTを試してみます。

In [None]:
MODEL_NAME = "sonoisa/sentence-bert-base-ja-mean-tokens-v2"
model = SentenceBertJapanese(MODEL_NAME)

sentences = ["京都大学へようこそ", "京都でおいしいごはんを食べた"]
sentence_embeddings = model.encode(sentences, batch_size=8)

print("Sentence embeddings 1:", sentence_embeddings[0])
print("len(Sentence embeddings 1):", len(sentence_embeddings[0]))
      
print("Sentence embeddings 2:", sentence_embeddings[1])
print("len(Sentence embeddings 2):", len(sentence_embeddings[1]))

上記のmodelを用いて、各用例データを文ベクトル化します。

In [None]:
# 用例データをSentence-BERTでベクトル化
pair_data_vec = []
for d in pair_data:
    u1 = model.encode([d[0]])[0]
    u2 = d[1]
    pair_data_vec.append([u1, u2])
    
print(pair_data_vec[0][0])
print(pair_data_vec[0][1])

## Step 3: 類似度計算

次に、類似度を計算する関数を用意します。
ここでは、入力ユーザ発話と、用例データを受け取り、入力ユーザ発話に最も類似するシステム応答を返します。

In [None]:
# コサイン類似度を計算する際にnumpyを使用
import numpy as np

# 類似度計算
# input_sentence_vec: ベクトル化された入力ユーザ発話
# pair_data_vec: ベクトル化された用例データ
def matching(input_sentence_vec: np.array, pair_data_vec: list):
    
    # コサイン類似度が最も高いものを採用
    cos_dist_max = 0.
    response = None
    
    # 用例毎に処理
    for pair_each in pair_data_vec:
        
        # コサイン類似度を計算
        v1 = input_sentence_vec
        v2 = pair_each[0]
        cos_sim = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        
        # 最大値を更新
        if cos_dist_max < cos_sim:
            cos_dist_max = cos_sim
            response = pair_each[1]
    
    return response, cos_dist_max
        

## Step 3: テスト

では、用例ベース対話システムをテストしてみましょう。

In [None]:
# テスト

# 入力発話　その１
input_sentence = '趣味は何ですか'
input_sentence_vec = model.encode([input_sentence])[0]

response, cos_dist_max = matching(input_sentence_vec, pair_data_vec)

print('入力：%s' % input_sentence)
print('応答：%s' % response)
print('類似度：%.3f' % cos_dist_max)
print()

# 入力発話　その２
input_sentence = '最近面白かったものは何ですか'
input_sentence_vec = model.encode([input_sentence])[0]

response, cos_dist_max = matching(input_sentence_vec, pair_data_vec)

print('入力：%s' % input_sentence)
print('応答：%s' % response)
print('類似度：%.3f' % cos_dist_max)
print()

### Step 4: 音声対話システムとしての統合

最後に、演習1で実装した方法を用いて、音声対話システムとして動作するように上記の機能を統合します。

In [None]:
# ライブラリのインポート
import speech_recognition as sr
from gtts import gTTS
import pygame

# 音声認識を関数化
def get_asr():
    
    r = sr.Recognizer()
    r.pause_threshold = 0.5
    
    with sr.Microphone() as source:
        r.adjust_for_ambient_noise(source) # 背景雑音へ適応する（１秒間）
        print("どうぞ話してください >> ")
        audio = r.listen(source)
    
    try:
        result = r.recognize_google(audio, language="ja-JP")
    except sr.UnknownValueError:
        result = ""
    except sr.RequestError as e:
        result = ""
    
    return result

# 音声合成を関数化
def play_tts(text):
    
    speech = gTTS(text=text, lang="ja")

    try:
        speech.save("./data/test.mp3")
    except Exception as e:
        print('ファイル保存エラー')
    
    pygame.mixer.init()
    pygame.mixer.music.load("./data/test.mp3")
    pygame.mixer.music.play()

    while pygame.mixer.music.get_busy():
        pygame.time.Clock().tick(10)

    pygame.mixer.music.stop()
    pygame.mixer.quit()

In [None]:
# 対話が終了状態に移るまで対話を続ける
while True:
    
    # 音声入力＆音声認識
    result_asr_utterance = get_asr()
    print("ユーザ： " + result_asr_utterance)
    
    # 「終了」がユーザ発話に含まれていれば対話を終了
    if "終了" in result_asr_utterance:
        break

    # 用例を検索
    input_sentence_vec = model.encode([result_asr_utterance])[0]
    system_utterance, cos_dist_max = matching(input_sentence_vec, pair_data_vec)

    print("システム： " + system_utterance)
    play_tts(system_utterance)
    
    print()

# 対話終了
print("対話終了")