# Universal Sentence Encoderを使って文章の異常検知をする
詳細な説明は https://qiita.com/jovyan/items/e5d2dc7ffabc2353db38 をご覧ください。

In [None]:
import json
import requests
from bs4 import BeautifulSoup
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, ConfusionMatrixDisplay
import tensorflow_hub as hub
import tensorflow_text
import tensorflow as tf
from scipy.stats import chi2

## テキストデータ準備

In [None]:
# 青空文庫から夏目漱石の小説をダウンロード
url_list = [
    'https://www.aozora.gr.jp/cards/000148/files/773_14560.html',  # 彼岸過迄
    'https://www.aozora.gr.jp/cards/000148/files/765_14961.html',  # 行人
    'https://www.aozora.gr.jp/cards/000148/files/775_14942.html',  # こころ
]

def fetch_sentences(url):
    """
    urlからテキストをスクレイピングしてルビを除く
    """
    res = requests.get(url)
    soup = BeautifulSoup(res.content, 'html.parser', from_encoding='utf-8')
    main_text = soup.find("div", class_='main_text')
    #ルビが振ってあるのを削除
    for yomigana in main_text.find_all(["rp","h4","rt"]):
        yomigana.decompose()
    sentences = [line.strip() for line in main_text.text.strip().splitlines()]
    return sentences

souseki_texts = []
for url in url_list:
    souseki_texts += fetch_sentences(url)

print(len(souseki_texts))

In [None]:
def split_texts(texts_input):
    """
    文章を句点ごとに分割する
    """
    texts_output = []
    for t in texts_input:
        texts_output += t.split('。')
    return texts_output

souseki_texts = split_texts(souseki_texts)
print(len(souseki_texts))

In [None]:
def filter_by_length(texts_input, min_len=20, max_len=300):
    """
    短すぎる文章と長すぎる文章を除く
    """
    texts_output = []
    for s in texts_input:
        length = len(s)
        if length >= min_len and length <= max_len:
            texts_output.append(s)
    return texts_output

souseki_texts = filter_by_length(souseki_texts)
print(len(souseki_texts))

In [None]:
# chABSA-dataset（有価証券報告書）をダウンロード
!wget https://s3-ap-northeast-1.amazonaws.com/dev.tech-sketch.jp/chakki/public/chABSA-dataset.zip && \
 unzip chABSA-dataset.zip

In [None]:
# jsonファイルを展開して文章をリストに格納
data_path = Path("chABSA-dataset")
path_list = [p for p in data_path.glob("*.json")]

chabsa_texts = []
for p in path_list:
    with open(p, "br") as f:
        j =  json.load(f)
    for line in j["sentences"]:
        chabsa_texts += [line["sentence"].replace('\n', '')]

print(len(chabsa_texts))

In [None]:
# 短すぎる文章と長すぎる文章を除く
chabsa_texts = filter_by_length(chabsa_texts)
print(len(chabsa_texts))

## データセット作成
データセットはモデル開発用(dev)とテスト用(test)の2つ用意します。漱石の文章の8割を開発用に、残り2割をテスト用に使います。開発データには漱石の文章の1%の量だけ有価証券報告書の文章を混ぜます。テスト用に関しては、精度評価をしやすいように漱石の文章と有価証券報告書の文章は同数とします。

In [None]:
# 漱石にラベル0、有価証券報告書にラベル1を付与
souseki_length = len(souseki_texts)
souseki_arr = np.hstack([np.reshape(np.zeros(souseki_length), (-1, 1)), np.reshape(souseki_texts, (-1, 1))])
chabsa_length = len(chabsa_texts)
chabsa_arr = np.hstack([np.reshape(np.ones(chabsa_length), (-1, 1)), np.reshape(chabsa_texts, (-1, 1))])

In [None]:
# 各データの数を決定
num_s_dev = int(souseki_length * 0.8)
num_c_dev = int(num_s_dev * 0.01)
num_s_test = souseki_length - num_s_dev
num_c_test = num_s_test
print("開発データ　 漱石:{}, 有報:{}".format(num_s_dev, num_c_dev))
print("テストデータ 漱石:{}, 有報:{}".format(num_s_test, num_c_test))

# data split
s_dev, s_test = train_test_split(souseki_arr, train_size=num_s_dev)
c_dev, c_test = train_test_split(chabsa_arr, train_size=num_c_dev)
c_test, _ = train_test_split(c_test, train_size=num_c_test)

# 漱石と有価証券報告書をシャッフル
dev_arr = np.vstack([s_dev, c_dev])
np.random.shuffle(dev_arr)
test_arr = np.vstack([s_test, c_test])
np.random.shuffle(test_arr)

print(dev_arr.shape, test_arr.shape)

In [None]:
test_arr[:10]

## 異常検知モデル開発
Universal Sentence Encoder (USE) Multilingual, CNN版を使用

In [None]:
# モデルをtfhubからインポート
use_url = 'https://tfhub.dev/google/universal-sentence-encoder-multilingual/3'
embed = hub.load(use_url)

In [None]:
def get_embeddings(texts, batch_size=100):
    """
    文章のリストを埋め込みベクトルのリスト(np.ndarray)に変換
    """
    length = len(texts)
    n_loop = int(length / batch_size) + 1
    embeddings = embed(texts[: batch_size])
    for i in range(1, n_loop):
        arr = embed(texts[batch_size*i: min(batch_size*(i+1), length)])
        embeddings = tf.concat([embeddings, arr], axis=0)
    return np.array(embeddings)

In [None]:
dev_embeddings = get_embeddings(dev_arr[:, 1])
test_embeddings = get_embeddings(test_arr[:, 1])
print(dev_embeddings.shape, test_embeddings.shape)

In [None]:
# 平均方向の推定値$\hat{\mu}$を求める
mu = np.mean(dev_embeddings, axis=0)
mu /= np.linalg.norm(mu)
print(mu.shape)

In [None]:
# 異常度を計算し、異常度の従うカイ2乗分布のパラメータ$\hat{m}$と$\hat{s}$を推定
anom = 1 - np.inner(mu, dev_embeddings)
anom_mean = np.mean(anom)
anom_mse = np.mean(anom**2) - anom_mean**2
mhat = 2 * anom_mean**2 / anom_mse
shat = 0.5 * anom_mse / anom_mean
print("mhat: {:.1f}".format(mhat))
print("shat: {:.3e}".format(shat))

In [None]:
# カイ2乗分布の確率密度関数PDFと累積分布関数CDFをプロット
x = np.linspace(0, 1, 100)
plt.plot(x, chi2.pdf(x, mhat, loc=0, scale=shat), c='r')
plt.title("PDF", fontsize=18)
plt.xlabel("a", fontsize=18)
plt.hist(anom, bins=100, density=True, range=(0,1), color=(0,1,0,0.5))
plt.tick_params(labelsize=14)
plt.xlim(0,1)
plt.grid()
plt.savefig("./PDF.png", bbox_inches='tight')
plt.show()

x = np.linspace(0, 1, 100)
plt.plot(x, chi2.cdf(x, mhat, loc=0, scale=shat), c='b')
plt.title("CDF", fontsize=18)
plt.xlabel("a", fontsize=18)
plt.tick_params(labelsize=14)
plt.xlim(0,1)
plt.grid()
plt.savefig("./CDF.png", bbox_inches='tight')
plt.show()

In [None]:
def iteration_solver(alpha, mhat, shat, x_ini=0.8, eps=1.e-12, n_ite=100):
    """
    Solve equation
    $$1-\alpha = \int_0^x \\! dx\, \chi^2 (x|\hat{m}, \hat{s}) $$
    for x by Newtonian method.
    """
    x = x_ini
    for i in range(n_ite):
        xnew = x - (chi2.cdf(x, mhat, loc=0, scale=shat) - (1 - alpha)) / chi2.pdf(x, mhat, loc=0, scale=shat)
        if abs(xnew - x) < eps:
            print("iteration: ", i+1)
            break
        x = xnew
    return xnew

alpha = 0.01
ath = iteration_solver(alpha, mhat, shat)
print("ath: {:.4f}".format(ath))

## 精度評価

In [None]:
# テストデータの異常度
anom_test = 1 - np.inner(mu, test_embeddings)

# 閾値より異常度が大きければ異常標本と判定
predict = (anom_test > ath).astype(np.int)

# 正解データ
answer = test_arr[:, 0].astype(np.float)

In [None]:
# 各種精度指標計算
acc = accuracy_score(answer, predict)
precision = precision_score(answer, predict)
recall = recall_score(answer, predict)
f1 = f1_score(answer, predict)
cm = confusion_matrix(answer, predict)
print("Accuracy: {:.3f}, Precision: {:.3f}, Recall: {:.3f}, F1: {:.3f}".format(acc, precision, recall, f1))

# 混同行列
plt.rcParams["font.size"] = 18
cmd = ConfusionMatrixDisplay(cm, display_labels=[0,1])
cmd.plot(cmap=plt.cm.Blues, values_format='d')
plt.show()

In [None]:
# テストデータに対する異常値のヒストグラムを正常データと異常データに分けてプロット
souseki_anom_test = anom_test[test_arr[:, 0]=='0.0']
chabsa_anom_test = anom_test[test_arr[:, 0]=='1.0']

x = np.linspace(0, 1, 100)
plt.plot(x, chi2.pdf(x, mhat, loc=0, scale=shat), c='k')
plt.xlabel("a", fontsize=18)
plt.hist(souseki_anom_test, bins=100, density=True, range=(0,1.1), color=(0,1,0,0.8))
plt.hist(chabsa_anom_test, bins=100, density=True, range=(0,1.1), color=(0,0,1,0.5))
plt.plot([ath, ath], [0, 9], c='r')
plt.tick_params(labelsize=14)
plt.xlim(0.3,1.1)
plt.ylim(0,9)
plt.grid()
plt.savefig("./PDF_test.png", bbox_inches='tight')
plt.show()