# 松原2021/12 20%ルール TensorFlowによる文字列項目の異常検知

## 1. 使い方

id, 文字列形式のCSVを用意し
用意したらファイル名を
*namedata.csv* にして
本jupyter notebookのあるディレクトリにアップロードしてください。

次にそれぞれのブロックを▶︎ボタンを押して実行していきます。
「5. 文字列を入力して異常か検証」でテキストボックスが表示されるので、
そこで検証したい文字列を入力したあと、▶︎ボタンを押してブロックを実行してください。
入力文字列が異常値かどうかが判定されます。

In [None]:
import json
import requests
from bs4 import BeautifulSoup
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, ConfusionMatrixDisplay
import tensorflow_hub as hub
import tensorflow_text
import tensorflow as tf
from scipy.stats import chi2
import csv

def fetch_name_from_csv(path):
    """
    csvを読み込み名前を取ってくる
    """
    csv_file = open(path, "r", encoding="utf-8", errors="", newline="")
    f = csv.reader(csv_file, delimiter=",", doublequote=True, lineterminator="\n", quotechar='"', skipinitialspace=True)
    header = next(f)
    
    names = []
    for row in f:
        names.append(row[1])

    return names    

names = fetch_name_from_csv("./namedata.csv")

# print(len(names))
# print(names[1])

print("処理が完了したので次の処理を実行してください。")

## 2. データセット作成

データセットはモデル開発用(dev)とテスト用(test)の2つ用意します。8割を開発用に、残り2割をテスト用に使います。

In [None]:
# 正しい名前にラベル0、正しくない名前にラベル1を付与
name_length = len(names)
name_arr = np.hstack([np.reshape(np.zeros(name_length), (-1, 1)), np.reshape(names, (-1, 1))])
# chabsa_length = len(chabsa_texts)
# chabsa_arr = np.hstack([np.reshape(np.ones(chabsa_length), (-1, 1)), np.reshape(chabsa_texts, (-1, 1))])


# 各データの数を決定
num_s_dev = int(name_length * 0.8)
num_c_dev = int(num_s_dev * 0.01)
num_s_test = name_length - num_s_dev
num_c_test = num_s_test

# print("開発データ　 正しいやつ:{}, 正しくないやつ:{}".format(num_s_dev, num_c_dev))
# print("テストデータ 正しいやつ:{}, 正しくないやつ:{}".format(num_s_test, num_c_test))

# data split
s_dev, s_test = train_test_split(name_arr, train_size=num_s_dev)
# c_dev, c_test = train_test_split(chabsa_arr, train_size=num_c_dev)
# c_test, _ = train_test_split(c_test, train_size=num_c_test)

# シャッフル
# dev_arr = np.vstack([s_dev, c_dev])
dev_arr = np.vstack([s_dev])
np.random.shuffle(dev_arr)
# test_arr = np.vstack([s_test, c_test])
test_arr = np.vstack([s_test])
np.random.shuffle(test_arr)

# print(dev_arr.shape, test_arr.shape)

print("処理が完了したので次の処理を実行してください。")

## 3. 異常検知モデル開発

Universal Sentence Encoder (USE) Multilingual, CNN版を使用

In [None]:
print("処理実行中です。画像が出力されるまでしばらくお待ちください。")

# モデルをtfhubからインポート
use_url = 'https://tfhub.dev/google/universal-sentence-encoder-multilingual/3'
embed = hub.load(use_url)

def get_embeddings(texts, batch_size=100):
    """
    文字列のリストを埋め込みベクトルのリスト(np.ndarray)に変換
    """
    length = len(texts)
    n_loop = int(length / batch_size) + 1
    embeddings = embed(texts[: batch_size])
    for i in range(1, n_loop):
        arr = embed(texts[batch_size*i: min(batch_size*(i+1), length)])
        embeddings = tf.concat([embeddings, arr], axis=0)
    return np.array(embeddings)

dev_embeddings = get_embeddings(dev_arr[:, 1])
test_embeddings = get_embeddings(test_arr[:, 1])
# print(dev_embeddings.shape, test_embeddings.shape)


# 平均方向の推定値$\hat{\mu}$を求める
mu = np.mean(dev_embeddings, axis=0)
mu /= np.linalg.norm(mu)
# print(mu.shape)


# 異常度を計算し、異常度の従うカイ2乗分布のパラメータ$\hat{m}$と$\hat{s}$を推定
anom = 1 - np.inner(mu, dev_embeddings)
anom_mean = np.mean(anom)
anom_mse = np.mean(anom**2) - anom_mean**2
mhat = 2 * anom_mean**2 / anom_mse
shat = 0.5 * anom_mse / anom_mean
# print("mhat: {:.1f}".format(mhat))
# print("shat: {:.3e}".format(shat))


# カイ2乗分布の確率密度関数PDFと累積分布関数CDFをプロット
x = np.linspace(0, 1, 100)
plt.plot(x, chi2.pdf(x, mhat, loc=0, scale=shat), c='r')
plt.title("PDF", fontsize=18)
plt.xlabel("a", fontsize=18)
plt.hist(anom, bins=100, density=True, range=(0,1), color=(0,1,0,0.5))
plt.tick_params(labelsize=14)
plt.xlim(0,1)
plt.grid()
plt.savefig("./PDF.png", bbox_inches='tight')
plt.show()

x = np.linspace(0, 1, 100)
plt.plot(x, chi2.cdf(x, mhat, loc=0, scale=shat), c='b')
plt.title("CDF", fontsize=18)
plt.xlabel("a", fontsize=18)
plt.tick_params(labelsize=14)
plt.xlim(0,1)
plt.grid()
plt.savefig("./CDF.png", bbox_inches='tight')
plt.show()


def iteration_solver(alpha, mhat, shat, x_ini=0.8, eps=1.e-12, n_ite=100):
    """
    Solve equation
    $$1-\alpha = \int_0^x \\! dx\, \chi^2 (x|\hat{m}, \hat{s}) $$
    for x by Newtonian method.
    """
    x = x_ini
    for i in range(n_ite):
        xnew = x - (chi2.cdf(x, mhat, loc=0, scale=shat) - (1 - alpha)) / chi2.pdf(x, mhat, loc=0, scale=shat)
        if abs(xnew - x) < eps:
            print("iteration: ", i+1)
            break
        x = xnew
    return xnew

alpha = 0.01
ath = iteration_solver(alpha, mhat, shat)
print("ath: {:.4f}".format(ath))

print("次の処理を実行してください。")

## 4. 精度評価

In [None]:
import warnings
warnings.filterwarnings('ignore')

print("処理実行中です。しばらくお待ちください。")

# テストデータの異常度
anom_test = 1 - np.inner(mu, test_embeddings)

# 閾値より異常度が大きければ異常標本と判定
predict = (anom_test > ath).astype(np.int)

# 正解データ
answer = test_arr[:, 0].astype(np.float)


# 精度計算
acc = accuracy_score(answer, predict)
print("Accuracy: {:.3f}".format(acc))
precision = precision_score(answer, predict)
recall = recall_score(answer, predict)
f1 = f1_score(answer, predict)
cm = confusion_matrix(answer, predict)

# 混同行列
plt.rcParams["font.size"] = 18
cmd = ConfusionMatrixDisplay(cm, display_labels=[0,1])
cmd.plot(cmap=plt.cm.Blues, values_format='d')
plt.show()


# テストデータに対する異常値のヒストグラムを正常データと異常データに分けてプロット
s_anom_test = anom_test[test_arr[:, 0]=='0.0']
c_anom_test = anom_test[test_arr[:, 0]=='1.0']

x = np.linspace(0, 1, 100)
plt.plot(x, chi2.pdf(x, mhat, loc=0, scale=shat), c='k')
plt.xlabel("a", fontsize=18)
plt.hist(s_anom_test, bins=100, density=True, range=(0,1.1), color=(0,1,0,0.8))
plt.hist(c_anom_test, bins=100, density=True, range=(0,1.1), color=(0,0,1,0.5))
plt.plot([ath, ath], [0, 9], c='r')
plt.tick_params(labelsize=14)
plt.xlim(0.3,1.1)
plt.ylim(0,9)
plt.grid()
plt.savefig("./PDF_test.png", bbox_inches='tight')
plt.show()


print("次の処理を実行してください。")

## 5. 文字列を入力して異常か検証

In [None]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

text = widgets.Text(
    value='テスト太郎 aaaaa',
    placeholder='please type',
    description='ﾃｽﾄ文:',
    disabled=False
)

display(text)

In [None]:
print("入力された文字列は「" + text.value + "」は")
check_arr = [text.value]
check_embeddings = get_embeddings(check_arr)

# チェックデータの異常度
anom_check = 1 - np.inner(mu, check_embeddings) + 0.20  # 0.20は補正値

# 閾値より異常度が大きければ異常標本と判定
if anom_check > ath:
    print("異常なデータである可能性が高いです。")
else:
    print("異常なデータではありません。")

print("")
print("閾値: " + str(ath) + " 異常度: " + str(anom_check))