In [8]:
%cd ~

/root


In [2]:
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

tqdm.pandas()

In [107]:
from work.config.path.livedoor_news_corpus_paths import LivedoorNewsCorpusPaths
from work.mecab.config.mecab_dic_paths import MecabDicPaths

In [87]:
import re
import unicodedata
from typing import Iterable

import demoji
import neologdn
import numpy as np
from tqdm.auto import tqdm

tqdm.pandas()


class TextPreprocessor:
    def __init__(
        self,
        use_neologdn: bool = True,
        remove_url: bool = True,
        remove_emoji: bool = True,
        remove_thousand_separator: bool = True,
        replace_digit: bool = False,
        remove_full_width_symbol: bool = True,
        remove_half_width_symbol: bool = True,
        use_unicode_normalization: bool = True,
    ) -> None:
        self.use_neologdn = use_neologdn
        self.remove_url = remove_url
        self.remove_emoji = remove_emoji
        self.remove_thousand_separator = remove_thousand_separator
        self.replace_digit = replace_digit
        self.remove_full_width_symbol = remove_full_width_symbol
        self.remove_half_width_symbol = remove_half_width_symbol
        self.use_unicode_normalization = use_unicode_normalization

    def run(
        self,
        text: str,
        use_neologdn: bool = True,
        remove_url: bool = True,
        remove_emoji: bool = True,
        remove_thousand_separator: bool = True,
        replace_digit: bool = False,
        remove_full_width_symbol: bool = True,
        remove_half_width_symbol: bool = True,
        use_unicode_normalization: bool = True,
    ) -> str:
        self.use_neologdn = use_neologdn
        self.remove_url = remove_url
        self.remove_emoji = remove_emoji
        self.remove_thousand_separator = remove_thousand_separator
        self.replace_digit = replace_digit
        self.remove_full_width_symbol = remove_full_width_symbol
        self.remove_half_width_symbol = remove_half_width_symbol
        self.use_unicode_normalization = use_unicode_normalization

        # URL の削除（空白文字に置換）
        # [Note] `separator` の前後に空白文字がないと、separator も URL の一部と見做されて消されることがあるので注意
        if self.remove_url:
            regex = (
                r"(https?|ftp?|http?)(:\/\/[-_\.!~*\’()a-zA-Z0-9;\/?:\@ &=\+\$,%#]+)"
            )
            text = re.sub(regex, " ", text)

        # 絵文字の削除（空白文字に置換）
        if remove_emoji:
            text = demoji.replace(string=text, repl=" ")

        # 数字の桁区切り記号 (,) の削除
        if remove_thousand_separator:
            text = re.sub(r"(\d)([,.])(\d+)", r"\1\3", text)

        # 数字を '0' に置換
        if replace_digit:
            text = re.sub(r"\d+", "0", text)

        # 半角記号を削除
        if remove_half_width_symbol:
            text = re.sub(r"[!-/:-@[-`{-~]", " ", text)

        # 全角記号を削除（ここでは 0x25A0 - 0x266F のブロックのみを削除）
        if remove_full_width_symbol:
            text = re.sub("[■-♯]", " ", text)

        # 全角・半角の統一と重ね表現の削除
        if self.use_neologdn:
            text = neologdn.normalize(text)

        # Unicode 正規化
        if use_unicode_normalization:
            text = unicodedata.normalize("NFKC", text)

        return text

    def run_all(
        self,
        documents: Iterable[str],
        use_neologdn: bool = True,
        remove_url: bool = True,
        remove_emoji: bool = True,
        remove_thousand_separator: bool = True,
        replace_digit: bool = False,
        remove_full_width_symbol: bool = True,
        remove_half_width_symbol: bool = True,
        use_unicode_normalization: bool = True,
    ) -> list[str]:
        def _func(text: str, pbar) -> list[str]:
            filtered_text = self.run(
                text,
                use_neologdn=use_neologdn,
                remove_url=remove_url,
                remove_emoji=remove_emoji,
                remove_thousand_separator=remove_thousand_separator,
                replace_digit=replace_digit,
                remove_full_width_symbol=remove_full_width_symbol,
                remove_half_width_symbol=remove_half_width_symbol,
                use_unicode_normalization=use_unicode_normalization,
            )
            pbar.update(1)
            return filtered_text

        with tqdm(total=len(documents)) as pbar:
            filtered_documents = np.vectorize(_func)(documents, pbar)

        return filtered_documents

    def run_all_with_sep(
        self,
        documents: Iterable[str],
        separator: str = "üßäö",
        use_neologdn: bool = True,
        remove_url: bool = True,
        remove_emoji: bool = True,
        remove_thousand_separator: bool = True,
        replace_digit: bool = False,
        remove_full_width_symbol: bool = True,
        remove_half_width_symbol: bool = True,
        use_unicode_normalization: bool = True,
    ) -> list[str]:
        """
        `run_all_with_sep()` は `run_all()` よりも高速ですが、データによってはうまく separator が機能しないことがあります。
        適切な separator を見つけるのが難しい場合、 `run_all()` の使用を推奨します。
        """
        tmp_separator = separator
        text: str = tmp_separator.join(documents)
        filtered_documents_text = self.run(
            text,
            use_neologdn=use_neologdn,
            remove_url=remove_url,
            remove_emoji=remove_emoji,
            remove_thousand_separator=remove_thousand_separator,
            replace_digit=replace_digit,
            remove_full_width_symbol=remove_full_width_symbol,
            remove_half_width_symbol=remove_half_width_symbol,
            use_unicode_normalization=use_unicode_normalization,
        )
        filtered_documents = filtered_documents_text.split(separator)
        # diff = len(documents) - len(filtered_documents)
        # assert diff == 0, f"documents length must be equal to filtered_documents. \ndiff: {diff}"

        return filtered_documents

In [88]:
import re
import unicodedata

import MeCab
import numpy as np
from tqdm.auto import tqdm

tqdm.pandas()


class MecabTokenizer:
    def __init__(
        self,
        dict_path: str | None = None,
    ) -> None:
        self.tagger = (
            MeCab.Tagger() if dict_path is None else MeCab.Tagger(f"-d {dict_path}")
        )
        self.kana_re = re.compile("^[ぁ-ゖ]+$")
        self.stop_words = None

    def run(
        self,
        text: str,
        target_pos_0_list: list[str] | None = ["名詞", "動詞", "形容詞"],
        remove_hiragana_only: bool = False,
        lower_letter_case: bool = True,
        stop_words: list[str] | None = None,
    ) -> str:
        self.stop_words = stop_words if stop_words is not None else []

        # 分かち書き
        parsed_text = self.tagger.parse(text)
        parsed_lines = parsed_text.split("\n")[:-2]
        surfaces = [l.split("\t")[0] for l in parsed_lines]
        features = [l.split("\t")[1] for l in parsed_lines]

        # 原型を取得
        bases = [f.split(",")[6] for f in features]

        # 各単語を原型に変換する
        token_list = [b if b != "*" else s for s, b in zip(surfaces, bases)]

        # 品詞の絞り込み
        if (target_pos_0_list is not None) and (len(target_pos_0_list) > 0):
            pos = [f.split(",")[0] for f in features]
            token_list = [t for t, p in zip(token_list, pos) if (p in target_pos_0_list)]

        _token_list = []
        for token in token_list:
            # stopwords に含まれていれば除去
            if token in self.stop_words:
                continue

            # ひらがなのみの単語を除く
            if remove_hiragana_only and self.kana_re.match(token):
                continue

            # アルファベットを小文字に統一
            token = token.lower() if lower_letter_case else token

            _token_list.append(token)

        # 半角スペースを挟んで結合
        tokenized_text = " ".join(_token_list)

        # 再度ユニコード正規化
        tokenized_text = unicodedata.normalize("NFKC", tokenized_text)

        return tokenized_text

    def run_all(
        self,
        documents: list[str],
        target_pos_0_list: list[str] | None = ["名詞", "動詞", "形容詞"],
        remove_hiragana_only: bool = False,
        lower_letter_case: bool = True,
        stop_words: list[str] | None = None,
    ) -> list[str]:
        def _func(text: str, pbar) -> list[str]:
            tokenized_text = self.run(
                text,
                target_pos_0_list=target_pos_0_list,
                remove_hiragana_only=remove_hiragana_only,
                lower_letter_case=lower_letter_case,
                stop_words=stop_words,
            )
            pbar.update(1)
            return tokenized_text

        with tqdm(total=len(documents)) as pbar:
            tokenized_documents = np.vectorize(_func)(documents, pbar)

        return tokenized_documents

    def run_all_with_sep(
        self,
        documents: list[str],
        separator: str = "üßäö",
        target_pos_0_list: list[str] | None = ["名詞", "動詞", "形容詞"],
        remove_hiragana_only: bool = False,
        lower_letter_case: bool = True,
        stop_words: list[str] | None = None,
    ) -> list[str]:
        """
        `run_all_with_sep()` は `run_all()` よりも高速ですが、データによってはうまく separator が機能しないことがあります。
        適切な separator を見つけるのが難しい場合、 `run_all()` の使用を推奨します。
        """
        tmp_separator = separator
        text = tmp_separator.join(documents)
        tokenized_documents_text = self.run(
            text,
            target_pos_0_list=target_pos_0_list,
            remove_hiragana_only=remove_hiragana_only,
            lower_letter_case=lower_letter_case,
            stop_words=stop_words,
        )
        tokenized_documents = tokenized_documents_text.split(separator)
        # diff = len(documents) - len(tokenized_documents)
        # assert diff == 0, f"documents length must be equal to filtered_documents. \ndiff: {diff}"

        return tokenized_documents

In [9]:
# Load the Livedoor news corpus CSV.
# The visible cells import only `LivedoorNewsCorpusPaths`; the former
# `CorpusPaths` name is undefined on a fresh kernel and fails on Run All.
# NOTE(review): assumes LIVEDOOR_NEWS_CORPUS_PATH lives on the same class as
# LIVEDOOR_NEWS_CORPUS_PRP_PATH (used by the final cell) — confirm.
filepath = LivedoorNewsCorpusPaths.LIVEDOOR_NEWS_CORPUS_PATH
df = pd.read_csv(filepath)
df.head(3).T

Unnamed: 0,0,1,2
text,自由に転載・引用が可能です。このディレクトリの記事ファイル内容の提供元：独女通信http:/...,もうすぐジューン・ブライドと呼ばれる６月。独女の中には自分の式はまだなのに呼ばれてばかり……...,携帯電話が普及する以前、恋人への連絡ツールは一般電話が普通だった。恋人と別れたら、手帳に書か...
category,dokujo-tsushin,dokujo-tsushin,dokujo-tsushin
category_label,0,0,0


In [10]:
# Inspect column dtypes and non-null counts of the loaded frame.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7376 entries, 0 to 7375
Data columns (total 3 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   text            7376 non-null   object
 1   category        7376 non-null   object
 2   category_label  7376 non-null   int64 
dtypes: int64(1), object(2)
memory usage: 173.0+ KB


In [11]:
class LivedoorNewsColumns:
    """Column names of the Livedoor news DataFrame used in this notebook."""

    # Columns present in the loaded CSV.
    TEXT = "text"
    CATEGORY = "category"
    CATEGORY_LABEL = "category_label"
    # Not referenced by the visible cells.
    TOKEN = "token"
    # Columns added by the preprocessing and tokenization cells.
    FILTERED_TEXT = "filtered_text"
    FILTERED_TEXT_TOKEN = "filtered_text_token"

## テキストの前処理

In [90]:
# %%time
# text_preprocessor = TextPreprocessor()

# _df = df.sample(500)
# documents = _df[LivedoorNewsColumns.TEXT].to_numpy()
# filtered = text_preprocessor.run_all(documents)
# diff = len(filtered) - len(documents)
# assert diff == 0

  0%|          | 0/500 [00:00<?, ?it/s]

CPU times: user 24.2 s, sys: 84.3 ms, total: 24.3 s
Wall time: 23.9 s


In [45]:
# documents[0]

'12日、日本陸上競技連盟は、ロンドン五輪マラソンの日本代表メンバーを発表した。男子は藤原新（東京陸協）、山本亮（佐川急便）、中本健太郎（安川電機）、補欠・堀端宏行（旭化成）となり、川内優輝（埼玉県庁）は落選。女子は、重友梨佐（天満屋）、木崎良子（ダイハツ）、尾崎好美（第一生命）、補欠・赤羽有紀子（ホクレン）が選ばれた。'

In [44]:
# filtered[0]

'12日、日本陸上競技連盟は、ロンドン五輪マラソンの日本代表メンバーを発表した。男子は藤原新(東京陸協)、山本亮(佐川急便)、中本健太郎(安川電機)、補欠・堀端宏行(旭化成)となり、川内優輝(埼玉県庁)は落選。女子は、重友梨佐(天満屋)、木崎良子(ダイハツ)、尾崎好美(第一生命)、補欠・赤羽有紀子(ホクレン)が選ばれた。'

In [12]:
%%time
# Clean the raw text of every row with the default preprocessing steps.
text_preprocessor = TextPreprocessor()

# run_all() processes the documents one by one; the result is stored as a new column.
documents = df[LivedoorNewsColumns.TEXT].to_numpy()
df[LivedoorNewsColumns.FILTERED_TEXT] = text_preprocessor.run_all(documents)

CPU times: user 5min 36s, sys: 971 ms, total: 5min 37s
Wall time: 5min 35s


## 形態素解析

In [84]:
# def _tokenize(text: str, pbar) -> list[str]:
#     result = tokenizer.run(text)
#     pbar.update(1)

#     return result

In [92]:
# %%time
# _df = df.sample(3)
# tokenizer = MecabTokenizer()
# documents = _df[LivedoorNewsColumns.FILTERED_TEXT].to_numpy()

# with tqdm(total=len(documents)) as pbar:
#     result = np.vectorize(_tokenize)(documents, pbar)
#     _df[LivedoorNewsColumns.FILTERED_TEXT_TOKEN] = result

# print(f"{documents[0]}\n")
# print(result[0])
# print()

# diff = len(result) - len(documents)
# print(diff)
# assert diff == 0

In [93]:
%%time
# Tokenize the cleaned text with a MeCab tagger created without a dictionary path.
tokenizer = MecabTokenizer()
# NOTE: `documents` is rebound here to the cleaned text (it held the raw text in the preprocessing cell).
documents = df[LivedoorNewsColumns.FILTERED_TEXT].to_numpy()
print(documents[0])

# Store the space-separated tokens as a new column and show the first result.
result = tokenizer.run_all(documents)
df[LivedoorNewsColumns.FILTERED_TEXT_TOKEN] = result
result[0]

自由に転載・引用が可能です。このディレクトリの記事ファイル内容の提供元:独女通信


  0%|          | 0/7376 [00:00<?, ?it/s]

CPU times: user 1min 15s, sys: 1.46 s, total: 1min 17s
Wall time: 1min 16s


'自由 転載 引用 可能 ディレクトリ 記事 ファイル 内容 提供元 独女通信'

In [100]:
# Spot-check the first row, including the two derived columns.
df.head(1)

Unnamed: 0,text,category,category_label,filtered_text,filtered_text_token
0,自由に転載・引用が可能です。このディレクトリの記事ファイル内容の提供元：独女通信http:/...,dokujo-tsushin,0,自由に転載・引用が可能です。このディレクトリの記事ファイル内容の提供元:独女通信,自由 転載 引用 可能 ディレクトリ 記事 ファイル 内容 提供元 独女通信


In [111]:
# Write the frame, including the derived columns, to the preprocessed-corpus path.
# NOTE: `filepath` is rebound here; it held the input path in the loading cell.
filepath = LivedoorNewsCorpusPaths.LIVEDOOR_NEWS_CORPUS_PRP_PATH
# index=False keeps the row index out of the CSV.
df.to_csv(filepath, index=False)