### doc2vecでベクトル化を行う

In [1]:
from gensim.models import Doc2Vec
from gensim.models.doc2vec import TaggedDocument
from typing import List, Any
import os

from lib763.fs import (
    load_str_from_file,
    get_all_file_path_in,
    save_str_to_file,
    ensure_path_exists,
)
from lib763.Logger import Logger

from CONST import (
    DATA_PATH,
    MODEL_D2V_PATH,
    SPLITED_DATA_PATH,
    TRAIN_LOG_PATH,
)

In [2]:
def divide_list(lst: List[Any], m: int) -> List[List[Any]]:
    """
    Split a list into ``m`` consecutive parts of near-equal length.

    When ``len(lst)`` is not divisible by ``m``, the leftover elements are
    handed out one each to the leading parts, so the first
    ``len(lst) % m`` parts are one element longer than the rest.

    Args:
        lst (List[Any]): The list to split.
        m (int): The number of parts to produce.

    Returns:
        List[List[Any]]: The ``m`` parts, in original order.
    """
    base_size, extra = divmod(len(lst), m)
    parts = []
    start = 0
    for idx in range(m):
        # The first `extra` parts absorb one leftover element each.
        size = base_size + (1 if idx < extra else 0)
        parts.append(lst[start:start + size])
        start += size
    return parts


def split_data(num_chunks: int = 200):
    """
    Merge the raw training files into ``num_chunks`` combined text files.

    Every file found under ``DATA_PATH`` is assigned to one of
    ``num_chunks`` groups by ``divide_list``. The contents of each group
    are joined with newlines and written to
    ``{SPLITED_DATA_PATH}NNNN.txt``, where ``NNNN`` is the zero-padded
    group index.

    NOTE(review): the original comment described this as "splitting the
    training data into groups of 10000"; with the default of 200 groups
    that only holds if DATA_PATH contains roughly 2,000,000 files -
    confirm against the dataset.

    Args:
        num_chunks (int): Number of combined files to produce.
            Defaults to 200, the value previously hard-coded here.
    """
    source_paths = get_all_file_path_in(DATA_PATH)
    for i, ls in enumerate(divide_list(source_paths, num_chunks)):
        save_str_to_file(
            "\n".join(load_str_from_file(p) for p in ls),
            f"{SPLITED_DATA_PATH}{str(i).zfill(4)}.txt",
        )


# Create the split-data directory if needed, then generate the split files
# only when the directory does not contain any yet.
ensure_path_exists(SPLITED_DATA_PATH)
existing_split_files = get_all_file_path_in(SPLITED_DATA_PATH)
if len(existing_split_files) == 0:
    split_data()

In [3]:
# Doc2Vec model
def get_model():
    """
    Return a Doc2Vec model, loading a saved one when any exists.

    The model directory ``MODEL_D2V_PATH`` is ensured to exist first. If
    it contains files, the last path returned by ``get_all_file_path_in``
    is loaded and its path is printed. Otherwise a new, untrained model
    is constructed.
    """
    ensure_path_exists(MODEL_D2V_PATH)
    saved_model_paths = get_all_file_path_in(MODEL_D2V_PATH)

    if len(saved_model_paths) == 0:
        # Nothing saved yet: start from a fresh model.
        return Doc2Vec(
            vector_size=30000,  # dimensionality of the vectors
            window=1000,  # size of the context window
            min_count=0,  # do not drop any word for being infrequent
            dm=0,
        )

    latest_path = saved_model_paths[-1]
    print(f'loaded model = {latest_path}')
    return Doc2Vec.load(latest_path)


# Module-level model instance: either the last saved model or a fresh one.
model = get_model()

loaded model = ./d2v_models/model_0199.d2v


In [4]:
# Logging setup: the training log holds one already-trained file path per line.
logger = Logger(TRAIN_LOG_PATH)
already_trained_path_ls = (
    logger.get_log().split("\n") if os.path.exists(TRAIN_LOG_PATH) else []
)

In [None]:
# Start training: each split file is trained for one epoch, then the model
# is saved and the file is recorded in the training log.
for i, path in enumerate(get_all_file_path_in(SPLITED_DATA_PATH)):
    # Skip files that are already recorded in the training log.
    if path in already_trained_path_ls:
        continue

    # One TaggedDocument per line of the split file; tokens are
    # comma-separated.
    # NOTE(review): tags are the row index within this file, so the same
    # tags recur in every split file and those documents share a vector.
    # Confirm this is intended before relying on per-document vectors.
    batch = [
        TaggedDocument(words=row.split(","), tags=[str(row_idx)])
        for row_idx, row in enumerate(load_str_from_file(path).split("\n"))
    ]

    # Choose between an initial vocabulary build and an incremental update
    # based on whether the model already has a vocabulary, not on the file
    # index. A resumed run may start at a later file with a loaded model,
    # or at file 0 with a loaded model; the index alone would then either
    # wipe the existing vocabulary or request an update on an empty one.
    has_vocab = len(model.wv) > 0
    model.build_vocab(corpus_iterable=batch, update=has_vocab)
    model.train(
        corpus_iterable=batch,
        total_examples=model.corpus_count,
        epochs=1,
    )

    # Checkpoint after every file so an interrupted run can be resumed.
    model.save(f"{MODEL_D2V_PATH}model_{str(i).zfill(4)}.d2v")
    logger.add_log(path)