### RNNによるテキスト分類器の実装

#### 準備

In [3]:
%tensorflow_version 2.x

In [4]:
!pip install janome beautifulsoup4

Collecting janome
  Downloading Janome-0.4.1-py2.py3-none-any.whl (19.7 MB)
[K     |████████████████████████████████| 19.7 MB 1.5 MB/s 
Installing collected packages: janome
Successfully installed janome-0.4.1


In [None]:
!mkdir data
!mkdir models
!wget https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.ja.300.vec.gz -P data/

--2021-12-07 02:13:00--  https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.ja.300.vec.gz
Resolving dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)... 172.67.9.4, 104.22.74.142, 104.22.75.142, ...
Connecting to dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)|172.67.9.4|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1279641604 (1.2G) [binary/octet-stream]
Saving to: ‘data/cc.ja.300.vec.gz’


2021-12-07 02:14:08 (18.0 MB/s) - ‘data/cc.ja.300.vec.gz’ saved [1279641604/1279641604]



#### インポート

In [None]:
import string

import gensim
import numpy as np
import pandas as pd
import tensorflow as tf
from bs4 import BeautifulSoup
from janome.tokenizer import Tokenizer
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import Dense, Input, Embedding, SimpleRNN, LSTM, Conv1D, GlobalMaxPooling1D
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.preprocessing.sequence import pad_sequences

#### データの読み込み

In [None]:
def filter_by_ascii_rate(text, threshold=0.9):
    ascii_letters = set(string.printable)
    rate = sum(c in ascii_letters for c in text) / len(text)
    return rate <= threshold

def load_dataset(filename, n=5000):
    df = pd.read_csv(filename, sep='\t')

    # 2値分類に変換
    mapping = {1: 0, 2: 0, 4: 1, 5: 1}
    df = df[df.star_rating != 3]
    df.star_rating = df.star_rating.map(mapping)

    # 日本語レビューの抽出
    is_jp = df.review_body.apply(filter_by_ascii_rate)
    df = df[is_jp]

    # サンプリング
    df = df.sample(frac=1, random_state=7)  # shuffle
    grouped = df.groupby('star_rating')
    df = grouped.head(n=n)
    return df.review_body.values, df.star_rating.values

url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_multilingual_JP_v1_00.tsv.gz'
x, y = load_dataset(url)

#### 前処理

In [None]:
t = Tokenizer(wakati=True)

#ボキャブラリの作成
def build_vocabulary(texts, num_words=None):
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=num_words, oov_token='<UNK>'
    )
    tokenizer.fit_on_texts(texts)
    return tokenizer

#htmlタグの除去
def clean_html(html, strip=False):
    soup = BeautifulSoup(html, 'html.parser')
    text = soup.get_text(strip=strip)
    return text

#分かち書き
def tokenize(text):
    return t.tokenize(text)

def preprocess_dataset(texts):
    texts = [clean_html(text) for text in texts]
    texts = [' '.join(tokenize(text)) for text in texts]
    return texts

### モデルの作成

In [13]:
class RNNModel:

    def __init__(self, input_dim, output_dim,
                 emb_dim=300, hid_dim=100,
                 embeddings=None, trainable=True):
      #Inputの作成
        self.input = Input(shape=(None,), name='input') #shape=(None,)は、入力の形状が不明な場合
      #Embedding層の作成(imput_dimをoutput_dimに変換(ベクトル形式に変換))
        if embeddings is None:
            self.embedding = Embedding(input_dim=input_dim,
                                       output_dim=emb_dim,
                                       mask_zero=True,
                                       trainable=trainable,
                                       name='embedding')
        else:
            self.embedding = Embedding(input_dim=embeddings.shape[0],
                                       output_dim=embeddings.shape[1],
                                       mask_zero=True,
                                       trainable=trainable,
                                       embeddings_initializer=tf.keras.initializers.Constant(embeddings),
                                       name='embedding')
        self.rnn = SimpleRNN(hid_dim, name='rnn')
        self.fc = Dense(output_dim, activation='softmax')

    def build(self):
        x = self.input
        embedding = self.embedding(x)
        output = self.rnn(embedding)
        y = self.fc(output)
        return Model(inputs=x, outputs=y)

#### 予測用クラス

In [None]:
class InferenceAPI:

    def __init__(self, model, vocab, preprocess):
        self.model = model
        self.vocab = vocab
        self.preprocess = preprocess
    
    #予測結果のリストを返す
    def predict_from_texts(self, texts):
        x = self.preprocess(texts)
        x = self.vocab.texts_to_sequences(x)
        return self.predict_from_sequences(x)

    #最も確率値の高いクラスを取得する
    def predict_from_sequences(self, sequences):
        sequences = pad_sequences(sequences, truncating='post')#pad_sequences：複数シーケンスを同じ長さになるように詰める
        y = self.model.predict(sequences)
        return np.argmax(y, -1)

#### モデルの学習

In [None]:
maxlen = 300
num_words = 40000
num_label = 2
batch_size = 128
epochs = 100
model_path = 'models/rnn_model.h5'

In [None]:
x = preprocess_dataset(x)
x_traain, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)
vocab = build_vocabulary(x_train, num_words)
x_train = vocab.texts_to_sequences(x_train)
x_test = vocab.texts_to_sequences(x_test)
x_train = pad_sequences(x_train, maxlen=maxlen, truncating='post', padding='post')
x_test = pad_sequences(x_test, maxlen=maxlen, truncating='post', padding='post')

NameError: ignored

In [23]:
#モデルの構築
model = RNNModel(num_words, num_label, embeddings=None).build()
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['acc']
)
#コールバックの定義
callbacks = [
    EarlyStopping(patience=3),
    ModelCheckpoint(model_path, save_best_only=True)
]
#モデルの学習
model.fit(
    x=x_train, y=y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    callbacks=callbacks,
    shuffle=True)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100


<keras.callbacks.History at 0x7fc62dee1a90>

In [25]:
#予測
model = load_model(model_path)
api = InferenceAPI(model, vocab, preprocess_dataset)
y_pred = api.predict_from_sequences(x_test)
print('precision\t: {:.4f}'.format(precision_score(y_test, y_pred, average='binary')))
print('recall\t: {:.4f}'.format(recall_score(y_test, y_pred, average='binary')))
print('f1\t: {:.4f}'.format(f1_score(y_test, y_pred, average='binary')))
print()

precision	: 0.7822
recall	: 0.6944
f1	: 0.7357



### LSTMによるテキスト分類器の実装

In [26]:
class LSTMModel:

    def __init__(self, input_dim, output_dim,
                 emb_dim=300, hid_dim=100,
                 embeddings=None, trainable=True):
        self.input = Input(shape=(None,), name='input')
        if embeddings is None:
            self.embedding = Embedding(input_dim=input_dim,
                                       output_dim=emb_dim,
                                       mask_zero=True,
                                       trainable=trainable,
                                       name='embedding')
        else:
            self.embedding = Embedding(input_dim=embeddings.shape[0],
                                       output_dim=embeddings.shape[1],
                                       mask_zero=True,
                                       trainable=trainable,
                                       embeddings_initializer=tf.keras.initializers.Constant(embeddings),
                                       # weights=[embeddings],
                                       name='embedding')
        self.lstm = LSTM(hid_dim, name='lstm')
        self.fc = Dense(output_dim, activation='softmax')

    def build(self):
        x = self.input
        embedding = self.embedding(x)
        output = self.lstm(embedding)
        y = self.fc(output)
        return Model(inputs=x, outputs=y)

In [27]:
model_path = 'models/lstm_model.h5'
#モデルの構築
model = LSTMModel(num_words, num_label, embeddings=None).build()
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['acc']
)
#コールバックの定義
callbacks = [
    EarlyStopping(patience=3),
    ModelCheckpoint(model_path, save_best_only=True)
]
#モデルの学習
model.fit(
    x=x_train, y=y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    callbacks=callbacks,
    shuffle=True)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100


<keras.callbacks.History at 0x7fc62ee3ef90>

In [28]:
#予測
model = load_model(model_path)
api = InferenceAPI(model, vocab, preprocess_dataset)
y_pred = api.predict_from_sequences(x_test)
print('precision\t: {:.4f}'.format(precision_score(y_test, y_pred, average='binary')))
print('recall\t: {:.4f}'.format(recall_score(y_test, y_pred, average='binary')))
print('f1\t: {:.4f}'.format(f1_score(y_test, y_pred, average='binary')))
print()

precision	: 0.8415
recall	: 0.7555
f1	: 0.7962



In [None]:
class CNNModel:

    def __init__(self, input_dim, output_dim,
                 filters=250, kernel_size=3,
                 emb_dim=300, embeddings=None, trainable=True):
        self.input = Input(shape=(None,), name='input')
        if embeddings is None:
            self.embedding = Embedding(input_dim=input_dim,
                                       output_dim=emb_dim,
                                       trainable=trainable,
                                       name='embedding')
        else:
            self.embedding = Embedding(input_dim=embeddings.shape[0],
                                       output_dim=embeddings.shape[1],
                                       trainable=trainable,
                                       embeddings_initializer=tf.keras.initializers.Constant(embeddings),
                                       weights=[embeddings],
                                       name='embedding')
        self.conv = Conv1D(filters,
                           kernel_size,
                           padding='valid',
                           activation='relu',
                           strides=1)
        self.pool = GlobalMaxPooling1D()
        self.fc = Dense(output_dim, activation='softmax')

    def build(self):
        x = self.input
        embedding = self.embedding(x)
        conv = self.conv(embedding)
        pool = self.pool(conv)
        y = self.fc(pool)
        return Model(inputs=x, outputs=y)

In [31]:
model_path = 'models/cnn_model.h5'
#モデルの構築
model = CNNModel(num_words, num_label, embeddings=None).build()
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['acc']
)
#コールバックの定義
callbacks = [
    EarlyStopping(patience=3),
    ModelCheckpoint(model_path, save_best_only=True)
]
#モデルの学習
model.fit(
    x=x_train, y=y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    callbacks=callbacks,
    shuffle=True)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100


<keras.callbacks.History at 0x7fc62c3041d0>

In [32]:
#予測
model = load_model(model_path)
api = InferenceAPI(model, vocab, preprocess_dataset)
y_pred = api.predict_from_sequences(x_test)
print('precision\t: {:.4f}'.format(precision_score(y_test, y_pred, average='binary')))
print('recall\t: {:.4f}'.format(recall_score(y_test, y_pred, average='binary')))
print('f1\t: {:.4f}'.format(f1_score(y_test, y_pred, average='binary')))
print()

precision	: 0.8908
recall	: 0.7605
f1	: 0.8205



In [1]:
#FastTextの読み込み
def load_fasttext(filepath, binary=False):
    model = gensim.models.KeyedVectors.load_word2vec_format(filepath, binary=binary)
    return model

#必要なボキャブラリの単語分散表現のみを抽出
def filter_embeddings(embeddings, vocab, num_words, dim=300):
  _embeddings = np.zeros((num_words, dim))
  for word in vocab:
      if word in embeddings:
          word_id = vocab[word]
          if word_id >= num_words:
              continue
          _embeddings[word_id] = embeddings[word]

  return _embeddings

In [None]:
wv = load_fasttext('data/cc.ja.300.vec.gz')
wv = filter_embeddings(wv, vocab.word_index, num_words)

In [None]:
model_path = 'models/model.h5'
#モデルの構築
model = CNNModel(num_words, num_label, embeddings=wv).build()
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['acc']
)
#コールバックの定義
callbacks = [
    EarlyStopping(patience=3),
    ModelCheckpoint(model_path, save_best_only=True)
]
#モデルの学習
model.fit(
    x=x_train, y=y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    callbacks=callbacks,
    shuffle=True)

In [37]:
#予測
model = load_model(model_path)
api = InferenceAPI(model, vocab, preprocess_dataset)
y_pred = api.predict_from_sequences(x_test)
print('precision\t: {:.4f}'.format(precision_score(y_test, y_pred, average='binary')))
print('recall\t: {:.4f}'.format(recall_score(y_test, y_pred, average='binary')))
print('f1\t: {:.4f}'.format(f1_score(y_test, y_pred, average='binary')))
print()

precision	: 0.8467
recall	: 0.8357
f1	: 0.8411

