In [2]:
import os
import re
import json
import numpy as np
import shutil
import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm
from sklearn.model_selection import train_test_split

from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.trainers import WordLevelTrainer

import subprocess
# Fetch the Japanese BPE encoder repository on first run only.
# check=True makes a failed clone raise CalledProcessError right here instead
# of surfacing later as a confusing import error.
if not os.path.exists("Japanese_BPEEncoder_V2"):
    subprocess.run(
        ["git", "clone", "https://github.com/tanreinama/Japanese-BPEEncoder_V2.git", "Japanese_BPEEncoder_V2"],
        check = True,
    )
from Japanese_BPEEncoder_V2.encode_swe import SWEEncoder_ja

# CONFIG

In [3]:
# --- Image-level filters ---
# Skip images for which the OCR result is non-empty.
NO_WORD_IMAGE_ONLY = True
# Skip images whose photographic probability is below REAL_IMAGE_THRESHOLD.
REAL_IMAGE_ONLY = True
REAL_IMAGE_THRESHOLD = 0.75

# --- Sentence-level filters ---
# Skip captions that list any unique (proper) noun.
NO_UNIQUE_NOUN_SENTENCE_ONLY = True

# Allowed caption length, counted in SWE tokens (both bounds inclusive).
MAX_SENTENCE_LENGTH = 31
MIN_SENTENCE_LENGTH = 4
# Captions containing a token seen fewer times than this are dropped.
MIN_FREQUENCY = 16

# Every setting is encoded in the output directory name, so each
# configuration writes to its own directory.
data_dir = f"../datas/{NO_WORD_IMAGE_ONLY}_{REAL_IMAGE_ONLY}_{REAL_IMAGE_THRESHOLD}_{NO_UNIQUE_NOUN_SENTENCE_ONLY}_{MAX_SENTENCE_LENGTH}_{MIN_SENTENCE_LENGTH}_{MIN_FREQUENCY}/"
os.makedirs(data_dir, exist_ok = True)

# Store the settings next to the generated data.
data_config = {
    "NO_WORD_IMAGE_ONLY": NO_WORD_IMAGE_ONLY,
    "REAL_IMAGE_ONLY": REAL_IMAGE_ONLY,
    "REAL_IMAGE_THRESHOLD": REAL_IMAGE_THRESHOLD,
    "NO_UNIQUE_NOUN_SENTENCE_ONLY": NO_UNIQUE_NOUN_SENTENCE_ONLY,
    "MAX_SENTENCE_LENGTH": MAX_SENTENCE_LENGTH,
    "MIN_SENTENCE_LENGTH": MIN_SENTENCE_LENGTH,
    "MIN_FREQUENCY": MIN_FREQUENCY,
}
with open(f"{data_dir}data_config.json", "w") as f:
    f.write(json.dumps(data_config))

In [4]:
# Input locations: one JSON record per image, and the matching .jpg images.
DATA_DIR = "../datas/Bokete_Dataset/boke_data_assemble/"
IMAGE_DIR = "../datas/Bokete_Dataset/boke_image/"

# The vocabulary and emoji tables contain Japanese text and emoji, so read them
# as UTF-8 explicitly instead of relying on the platform default encoding.
with open('Japanese_BPEEncoder_V2/ja-swe32kfix.txt', encoding = 'utf-8') as f:
    bpe = f.read().split('\n')

with open('Japanese_BPEEncoder_V2/emoji.json', encoding = 'utf-8') as f:
    emoji = json.load(f)

# Sub-word encoder used below to tokenize every caption.
SWE_tokenizer = SWEEncoder_ja(bpe, emoji)

In [11]:
def is_contains_invalid_characters(text):
    """Return True when ``text`` has any character outside the allowed set.

    Allowed characters:
      * hiragana (U+3040-U+309F)
      * katakana (U+30A0-U+30FF)
      * kanji (U+4E00-U+9FFF)
      * the punctuation 、 。 ! ? ！ ？ and the long-vowel mark ー

    An empty string contains no disallowed character, so it yields False.
    """
    allowed_only = r"^[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF、。!?！？ー]*$"
    if re.fullmatch(allowed_only, text) is None:
        return True
    return False

In [None]:
# image id (file stem) -> list of captions ("boke") that passed every filter
data_dict = dict()
# SWE token id -> number of occurrences over all captions kept in the first pass
word_count_dict = dict()

# ---------------------------------------------------------------------------
# Pass 1: filter images and captions, and count token occurrences.
# ---------------------------------------------------------------------------
# Iterate in sorted file-name order so other machines produce the same result.
for JP in tqdm(sorted(os.listdir(DATA_DIR))):

    # Skip records that have no matching image file.
    n = JP.split(".")[0]
    if not os.path.exists(f"{IMAGE_DIR}{n}.jpg"):
        continue

    # Read as UTF-8 explicitly (the JSON default) rather than relying on the
    # platform default encoding; the records contain Japanese text.
    with open(f"{DATA_DIR}{JP}", "r", encoding = "utf-8") as f:
        d = json.load(f)

    image_information = d["image_infomation"]

    # Keep only images that are likely to be real photographs.
    if REAL_IMAGE_ONLY:
        if image_information["is_photographic_probability"] < REAL_IMAGE_THRESHOLD:
            continue

    # Keep only images for which OCR found no text.
    if NO_WORD_IMAGE_ONLY:
        if len(image_information["ocr"]) != 0:
            continue

    tmp_bokes = list()
    for B in d["bokes"]:

        # Drop captions containing digits, Latin letters, or other symbols.
        if is_contains_invalid_characters(B["boke"]):
            continue

        # Drop captions that contain unique (proper) nouns.
        if NO_UNIQUE_NOUN_SENTENCE_ONLY:
            if len(B["unique_nouns"]) != 0:
                continue

        boke = B["boke"].replace("�", "")
        tokenized_boke = SWE_tokenizer.encode(boke)

        # Drop captions whose token count is outside [MIN, MAX].
        if not (MIN_SENTENCE_LENGTH <= len(tokenized_boke) <= MAX_SENTENCE_LENGTH):
            continue

        tmp_bokes.append( {
            "boke": boke,
            "tmp_tokenized_boke": tokenized_boke
        } )

        # dict.get instead of a bare try/except, which would also have hidden
        # unrelated errors (including KeyboardInterrupt).
        for I in tokenized_boke:
            word_count_dict[I] = word_count_dict.get(I, 0) + 1

    data_dict[n] = tmp_bokes

# ---------------------------------------------------------------------------
# Pass 2: drop captions that contain a token rarer than MIN_FREQUENCY, and
# drop images left without any caption.
# ---------------------------------------------------------------------------
tmp_data_dict = dict()
vocabulary_ids = set()
for K, V in data_dict.items():
    tmp_bokes = [
        B for B in V
        if all(word_count_dict[I] >= MIN_FREQUENCY for I in B["tmp_tokenized_boke"])
    ]
    if len(tmp_bokes) == 0:
        continue

    # Collect into a set directly instead of growing a list of every token.
    for B in tmp_bokes:
        vocabulary_ids.update(B["tmp_tokenized_boke"])
    tmp_data_dict[K] = tmp_bokes

# Sort so that the id assignment below does not depend on set iteration order.
# NOTE(review): assumes the SWE token ids are mutually orderable (ints).
words = sorted(vocabulary_ids)
data_dict = tmp_data_dict

# ---------------------------------------------------------------------------
# Vocabulary: ids 0-2 are reserved for <PAD>/<BOS>/<EOS>; remaining SWE tokens
# are renumbered from 3.
# ---------------------------------------------------------------------------
# Decode each token once and reuse the result.
# NOTE(review): assumes decode() is deterministic for a given token id.
decoded_words = {W: SWE_tokenizer.decode([W]) for W in words}

# SWE token id -> new contiguous id
index_to_index = {
    W: i + 3 for i, W in enumerate(words)
}
# new id -> surface string; tokens that decode to the replacement character
# "�" are left out of the vocabulary (and removed from the captions below).
index_to_word = {
    index_to_index[W]: decoded_words[W] for W in words if decoded_words[W] != "�"
}
index_to_word[0] = "<PAD>"
index_to_word[1] = "<BOS>"
index_to_word[2] = "<EOS>"

# ---------------------------------------------------------------------------
# Pass 3: re-encode every caption with the new ids.
# ---------------------------------------------------------------------------
tmp_data_dict = dict()
for K, V in tqdm(data_dict.items()):
    tmp_bokes = list()
    for B in V:
        tokenized_boke = [
            index_to_index[I]
            for I in B["tmp_tokenized_boke"]
            if decoded_words[I] != "�"
        ]
        tmp_bokes.append( {
            "boke": B["boke"],
            "tokenized_boke": tokenized_boke
        } )

    tmp_data_dict[K] = tmp_bokes

data_dict = tmp_data_dict

# Tokenizer definition: a word-level model over the surface strings.
# NOTE(review): if two token ids decode to the same string, the inverted
# mapping keeps only one of them — confirm this cannot happen.
model = WordLevel(vocab = {V:K for K, V in index_to_word.items()})
tokenizer = Tokenizer(model)
tokenizer.pre_tokenizer = Whitespace()

# Summary: number of images, number of captions, vocabulary size, special-token ids.
len(data_dict), sum([len(V) for V in data_dict.values()]), len(index_to_word), tokenizer.get_vocab()["<PAD>"], tokenizer.get_vocab()["<BOS>"], tokenizer.get_vocab()["<EOS>"]

In [None]:
def _build_arrays(image_numbers):
    """Turn the captions of the given images into model-ready arrays.

    For every caption of every image in ``image_numbers`` (looked up in the
    global ``data_dict``), the token ids are wrapped as
    ``<BOS>(1) + tokens + <EOS>(2)`` and right-padded with ``<PAD>(0)`` to
    ``MAX_SENTENCE_LENGTH + 2`` entries.

    Returns a tuple of three numpy arrays with one row per caption:
      * image ids (the image key converted with ``int``),
      * input token sequences (padded sequence without its last position),
      * teacher token sequences (padded sequence shifted left by one).
    """
    padded_length = MAX_SENTENCE_LENGTH + 2

    image_ids = list()
    input_sentences = list()
    teacher_signals = list()
    for N in tqdm(image_numbers):
        for B in data_dict[N]:
            image_ids.append( int(N) )

            tokenized_boke = [1] + B["tokenized_boke"] + [2]
            tokenized_boke += [0] * (padded_length - len(tokenized_boke))
            # Teacher forcing: the target at each step is the next input token.
            input_sentences.append( tokenized_boke[:-1] )
            teacher_signals.append( tokenized_boke[1:] )

    return np.array(image_ids), np.array(input_sentences), np.array(teacher_signals)


# Split by image (not by caption) so all captions of an image stay on one side.
train_image_numbers, test_image_numbers = train_test_split(sorted(list(data_dict.keys())), 
                                                           test_size = 0.01,
                                                           random_state = 42)

# *_inputs_1: image ids, *_inputs_2: input sentences
train_inputs_1, train_inputs_2, train_teacher_signals = _build_arrays(train_image_numbers)
test_inputs_1, test_inputs_2, test_teacher_signals = _build_arrays(test_image_numbers)

# Summary: number of train images, number of test images, shape of the train inputs.
len(train_image_numbers), len(test_image_numbers), train_inputs_2.shape

In [None]:
# Persist the tokenizer and every dataset array into the configuration directory.
tokenizer.save(f"{data_dir}tokenizer.json")
for npy_path, array in (
    (f"{data_dir}train_inputs_1.npy", train_inputs_1),
    (f"{data_dir}train_inputs_2.npy", train_inputs_2),
    (f"{data_dir}train_teacher_signals.npy", train_teacher_signals),
    (f"{data_dir}test_inputs_1.npy", test_inputs_1),
    (f"{data_dir}test_inputs_2.npy", test_inputs_2),
    (f"{data_dir}test_teacher_signals.npy", test_teacher_signals),
):
    np.save(npy_path, array)

# アンケート用の大喜利を選択

In [None]:
# Reload the saved test image ids and reduce them to the distinct ids.
test_inputs_1 = np.load(f"{data_dir}test_inputs_1.npy")
test_image_numbers = [*set(test_inputs_1.tolist())]

In [12]:
# Reload the saved test image ids. Sort the distinct ids so the iteration
# order, and therefore the seeded sampling below, is reproducible.
test_inputs_1 = np.load(f"{data_dir}test_inputs_1.npy")
test_image_numbers = sorted(set(test_inputs_1.tolist()))

# Fixed seed (same value as the train/test split) so the same human-written
# captions are selected on every run.
rng = np.random.default_rng(42)

# image id -> one randomly chosen human caption with its star count
data_dict = dict()
for N in tqdm(test_image_numbers):
    # Read as UTF-8 explicitly (the JSON default) rather than relying on the
    # platform default encoding; the records contain Japanese text.
    with open(f"{DATA_DIR}{N}.json", "r", encoding = "utf-8") as f:
        d = json.load(f)

    tmp_bokes = list()
    for B in d["bokes"]:
        # Drop captions containing digits, Latin letters, or other symbols.
        if is_contains_invalid_characters(B["boke"]):
            continue

        # Drop captions that contain unique (proper) nouns.
        if NO_UNIQUE_NOUN_SENTENCE_ONLY:
            if len(B["unique_nouns"]) != 0:
                continue

        # Tokenize the cleaned caption, as the dataset-building cell does.
        boke = B["boke"].replace("�", "")
        tokenized_boke = SWE_tokenizer.encode(boke)

        # Drop captions whose token count is outside [MIN, MAX].
        if not (MIN_SENTENCE_LENGTH <= len(tokenized_boke) <= MAX_SENTENCE_LENGTH):
            continue

        tmp_bokes.append( {
            "boke": boke,
            "star": B["star"]
        } )

    # An image with no eligible caption cannot be used in the questionnaire;
    # skip it instead of failing on an empty candidate list.
    if len(tmp_bokes) == 0:
        continue

    data_dict[N] = tmp_bokes[int(rng.integers(len(tmp_bokes)))]

100%|██████████| 2202/2202 [00:01<00:00, 1187.07it/s]


In [15]:
# ensure_ascii=False writes the Japanese text as-is, so the file must be opened
# as UTF-8 explicitly; the platform default encoding may not be able to
# represent it.
with open("human_bokes.json", "w", encoding = "utf-8") as f:
    json.dump(data_dict, f, ensure_ascii = False, indent = 4)