In [None]:
from __future__ import annotations

import json
import os
import pickle
from dataclasses import dataclass
from typing import Final

import IPython
from matplotlib import pyplot as plt
from tqdm import tqdm

import numpy as np

from sklearn.cluster import KMeans
from sklearn.decomposition import PCA

from PIL import Image, ImageFilter

import character_utility as charutil
import config
from ipywidgets_helper import render_images
from kvg import Kvg
from utility import pathstr, char2code

In [None]:
@dataclass
class PreparedKvg:
    """Everything `prepare_kvg` extracts for a single character.

    The fields are annotated ``Final``: they are meant to be set once at
    construction time and not reassigned afterwards.
    """

    # Root Kvg node of the character, as loaded from its JSON file.
    root: Final[Kvg]
    # kvgids of the components the root was decomposed into.
    decomposition: Final[list[str]]
    # kvgid -> 1-D feature vector built from the blurred component image.
    features: Final[dict[str, np.ndarray]]
    # kvgid -> sample image of the component, used for visual inspection.
    images_for_sample: Final[dict[str, Image.Image]]


def prepare_kvg(
    char: str,
    image_size: int,
    stroke_width: float,
    blur: float,
    log=False,
) -> tuple[PreparedKvg, dict[str, Kvg], dict[str, Image.Image], dict[str, list[str]]]:
    """Load a character's Kvg tree, decompose it into components and build their features.

    The tree is read from ``<charcode>.json`` in the directory returned by
    ``config.output_main_kvg_path``. Component images are read from the same
    directory.

    Args:
        char: The character to prepare.
        image_size: Size that appears in the file name of the feature images
            (``"{image_size}x,pad=0,sw={stroke_width} {kvgid}.png"``).
        stroke_width: Stroke width that appears in the same file name.
        blur: Radius of the Gaussian blur applied to every feature image.
        log: If true, pretty-print the loaded Kvg tree as JSON.

    Returns:
        A 4-tuple ``(pkvg, kvgid2kvg, images_for_features, decompositions)``:

        * ``pkvg`` -- the `PreparedKvg` for the character;
        * ``kvgid2kvg`` -- kvgid -> Kvg, restricted to the root and the
          components of its decomposition;
        * ``images_for_features`` -- kvgid -> blurred feature image;
        * ``decompositions`` -- kvgid -> list of component kvgids, restricted
          to the same ids as ``kvgid2kvg``.

    Raises:
        KeyError: If the root can neither be decomposed nor used as a single
            component (it has no usable children and its ``name`` is None).
        OSError: If the JSON file or one of the image files cannot be opened.
    """
    charcode = char2code(char)
    directory_path = config.output_main_kvg_path(charcode)
    with open(pathstr(directory_path, f"{charcode}.json")) as f:
        root_kvg = Kvg.from_dict(json.load(f))

    if log:
        print(json.dumps(root_kvg.to_dict(), ensure_ascii=False, indent=2))

    def feature_image_path(kvgid: str) -> str:
        # Single place that knows how the feature image of a component is named.
        return pathstr(
            directory_path,
            f"{image_size}x,pad=0,sw={stroke_width} {kvgid}.png",
        )

    def is_too_small(kvgid: str) -> bool:
        """Tell whether the component's nonzero pixels cover too little of the image."""
        # The context manager releases the file handle as soon as the pixel
        # data has been converted.
        with Image.open(feature_image_path(kvgid)) as image:
            mask = np.array(image.convert("1")).transpose()

        xs, ys = mask.nonzero()
        if xs.size == 0:
            # No nonzero pixel at all: there is no bounding box, so the
            # component is treated as too small instead of raising IndexError.
            return True

        # Bounding box of the nonzero pixels, relative to the image size.
        # The "- 1" on the lower edges makes the extent inclusive of the first pixel.
        left = (xs.min() - 1) / mask.shape[0]
        right = xs.max() / mask.shape[0]
        top = (ys.min() - 1) / mask.shape[1]
        bottom = ys.max() / mask.shape[1]

        width = right - left
        height = bottom - top

        if width < 0.25 and height < 0.25:
            return True
        return width * height < 0.125

    kvgid2kvg: dict[str, Kvg] = {}
    decompositions: dict[str, list[str]] = {}

    def dfs(kvg: Kvg, is_root=True) -> list[Kvg]:
        """Return the components `kvg` decomposes into; an empty list means "unusable"."""
        kvgid2kvg[kvg.kvgid] = kvg

        # Do not decompose into a component that is too small.
        if not is_root and is_too_small(kvg.kvgid):
            return []

        ret: list[Kvg] = []
        if len(kvg.svg) == 0:
            for kvg0 in kvg.children:
                ret0 = dfs(kvg0, is_root=False)
                if len(ret0) == 0:
                    # One unusable child invalidates the whole decomposition
                    # of this node; fall back to the node itself below.
                    ret.clear()
                    break
                ret += ret0

        if len(ret):
            decompositions[kvg.kvgid] = [kvg0.kvgid for kvg0 in ret]
            return ret

        # A node without a name cannot stand as a component on its own.
        if kvg.name is None:
            return []

        ret = [kvg]
        decompositions[kvg.kvgid] = [kvg0.kvgid for kvg0 in ret]
        return ret

    dfs(root_kvg)

    decomposition = decompositions[root_kvg.kvgid]

    # Drop everything that is not part of the root's decomposition.
    kept_kvgids = {root_kvg.kvgid, *decomposition}
    kvgid2kvg = {kvgid: kvg for kvgid, kvg in kvgid2kvg.items() if kvgid in kept_kvgids}
    decompositions = {kvgid: kvgids for kvgid, kvgids in decompositions.items() if kvgid in kept_kvgids}

    images_for_features: dict[str, Image.Image] = {}
    features: dict[str, np.ndarray] = {}
    images_for_sample: dict[str, Image.Image] = {}
    for kvgid in decomposition:
        with Image.open(feature_image_path(kvgid)) as raw_image:
            images_for_features[kvgid] = raw_image.filter(ImageFilter.GaussianBlur(blur))

        pixels = np.array(images_for_features[kvgid])
        # Concatenate the row-major and the column-major flattening: with a
        # single direction, e.g. "kanmuri" (crown) and "tare" (hanging)
        # radicals are hard to tell apart.
        features[kvgid] = np.concatenate((pixels.reshape(-1), pixels.transpose().reshape(-1)))

        # The sample image parameters are hard-coded (a rough shortcut).
        sample_image = Image.open(pathstr(
            directory_path,
            f"64x,pad=4,sw=2 {kvgid}.png",
        ))
        # Image.open is lazy and keeps the file open until the data is read.
        # Load now so that preparing thousands of characters does not
        # accumulate one open file handle per component.
        sample_image.load()
        images_for_sample[kvgid] = sample_image

    pkvg = PreparedKvg(
        root=root_kvg,
        decomposition=decomposition,
        features=features,
        images_for_sample=images_for_sample,
    )

    return pkvg, kvgid2kvg, images_for_features, decompositions


def test():
    """Prepare the character "困" and render its sample and feature images side by side."""
    prepared, _kvgid2kvg, feature_images, decomps = prepare_kvg(
        "困", image_size=16, stroke_width=2, blur=1, log=True
    )

    print(f"decompositions: {len(decomps)}; {decomps}")

    # Sample images first, then the blurred feature images, each captioned with its kvgid.
    captioned = [(image, kvgid) for kvgid, image in prepared.images_for_sample.items()]
    captioned.extend((image, kvgid) for kvgid, image in feature_images.items())

    # Half as many columns as images, so the two groups end up on separate rows.
    return render_images(captioned, columns=(len(captioned) // 2))


# Run the single-character check of prepare_kvg defined above and display its images.
test()

In [None]:
@dataclass(frozen=True)
class KMeansResult:
    """Frozen record of one k-means run as produced by `train_kmeans`.

    ``kvgids``, ``features`` and ``labels`` are aligned: entry ``i`` of each
    describes the same sample.
    """
    # kvgid of every clustered sample.
    kvgids: list[str]
    # Stacked feature vectors, one row per sample.
    features: np.ndarray
    # Cluster label assigned to every sample.
    labels: list[int]
    # The fitted estimator.
    kmeans: KMeans


def train_kmeans(
    pkvgs: list[PreparedKvg],
    kvgid2kvg: dict[str, Kvg],
    n_clusters,
    log=False,
    plot=False,
    random_state=None,
    n_init=4,
) -> KMeansResult:
    """Cluster the feature vectors of all prepared components with k-means.

    Args:
        pkvgs: Prepared characters; every entry of their ``features`` dicts
            becomes one sample.
        kvgid2kvg: kvgid -> Kvg lookup, only used to annotate the plot.
        n_clusters: Number of clusters.
        log: If true, print the number of samples.
        plot: If true, project the samples to 2-D with PCA and draw a scatter
            plot coloured by cluster and annotated with the component names.
        random_state: Seed forwarded to ``KMeans``. The default ``None`` keeps
            the previous, non-reproducible behaviour; pass an int to get the
            same clustering on every run.
        n_init: Number of k-means initialisations, forwarded to ``KMeans``.

    Returns:
        A `KMeansResult` holding the sample ids, the stacked features, the
        assigned labels and the fitted estimator.

    Raises:
        ValueError: If ``pkvgs`` contains no feature vector at all.
    """
    kvgids: list[str] = []
    vectors = []
    for pkvg in pkvgs:
        for kvgid, feature in pkvg.features.items():
            kvgids.append(kvgid)
            vectors.append(feature)

    if not vectors:
        # np.stack would raise a ValueError as well, but with a message that
        # does not point at the actual cause.
        raise ValueError("train_kmeans needs at least one feature vector, but pkvgs contains none")
    features = np.stack(vectors)

    if log:
        print(f"data size: {len(kvgids)}")

    kmeans = KMeans(n_init=n_init, n_clusters=n_clusters, init="k-means++", random_state=random_state)
    labels = kmeans.fit_predict(features).tolist()

    if plot:
        pca = PCA(n_components=2)
        boundings2d = pca.fit_transform(features)
        print(f"{pca.explained_variance_ratio_=}")

        plt.figure(figsize=(16, 16))
        plt.scatter(boundings2d[:, 0], boundings2d[:, 1], c=labels)
        for kvgid, xy in zip(kvgids, boundings2d):
            kvg = kvgid2kvg[kvgid]
            assert kvg.name is not None
            plt.annotate(kvg.name, xy)
        # Label the figure so it can be read on its own.
        plt.xlabel("PC1")
        plt.ylabel("PC2")
        plt.title(f"k-means clusters (n_clusters={n_clusters}), PCA projection")

    return KMeansResult(kvgids=kvgids, features=features, labels=labels, kmeans=kmeans)


def test(characters, n_clusters, image_size, stroke_width, blur):
    """Prepare every character, then run `train_kmeans` once with logging and plotting on.

    Note: this rebinds the name ``test`` that an earlier cell already defined.
    """
    prepared = [prepare_kvg(c, image_size, stroke_width, blur) for c in characters]

    # First tuple element is the PreparedKvg, second the kvgid -> Kvg mapping.
    pkvgs: list[PreparedKvg] = [entry[0] for entry in prepared]
    kvgid2kvg: dict[str, Kvg] = {}
    for entry in prepared:
        kvgid2kvg.update(entry[1])

    train_kmeans(pkvgs=pkvgs, kvgid2kvg=kvgid2kvg, n_clusters=n_clusters, log=True, plot=True)


# Cluster the components of the characters in charutil.kanjis.education() into 64 clusters and plot them.
test(charutil.kanjis.education(), n_clusters=64, image_size=16, stroke_width=2, blur=1)

In [None]:
def plot_elbow(characters, n_clusters_candidates, image_size, stroke_width, blur):
    """Plot the k-means distortion for several cluster counts (elbow method).

    Args:
        characters: Characters whose components are clustered.
        n_clusters_candidates: Iterable of cluster counts to try. It may be a
            one-shot iterator; it is materialised once up front.
        image_size: Forwarded to `prepare_kvg`.
        stroke_width: Forwarded to `prepare_kvg`.
        blur: Forwarded to `prepare_kvg`.
    """
    # The candidates are needed twice (training loop and x-axis of the plot);
    # a generator would already be exhausted the second time.
    candidates = list(n_clusters_candidates)

    pkvgs: list[PreparedKvg] = []
    kvgid2kvg: dict[str, Kvg] = {}
    for c in characters:
        pkvg0, kvgid2kvg0, _, _ = prepare_kvg(c, image_size, stroke_width, blur)
        pkvgs.append(pkvg0)
        kvgid2kvg |= kvgid2kvg0

    distortions = []
    for n_clusters in tqdm(candidates):
        result = train_kmeans(pkvgs=pkvgs, kvgid2kvg=kvgid2kvg, n_clusters=n_clusters, log=False)
        # The distortion is taken as the negated KMeans.score() on the training data.
        distortions.append(-result.kmeans.score(result.features))

    plt.plot(candidates, distortions, marker="o")
    plt.xlabel("n_clusters")
    plt.ylabel("distortion")


# Elbow plot over charutil.kanjis.all() for n_clusters = 16, 32, ..., 512.
plot_elbow(charutil.kanjis.all(), range(16, 512 + 1, 16), image_size=16, stroke_width=2, blur=1)

In [None]:
def save(characters, dataset_name, n_clusters, image_size, stroke_width, blur):
    """Cluster the components of ``characters`` and write all results to a new directory.

    The output directory is ``config.output_radical_clustering_path(...)`` and
    receives four files:

    * ``decompositions.json`` -- kvgid -> component kvgids for every character;
    * ``kmeans.pickle`` -- the fitted KMeans estimator;
    * ``label2radicalname2kvgids.json`` -- per cluster label, radical name ->
      sorted kvgids;
    * ``result.html`` -- a visual summary of every cluster (UTF-8).

    Args:
        characters: Characters whose components are clustered.
        dataset_name: Name used to build the output directory path.
        n_clusters: Number of k-means clusters.
        image_size: Side length of the square feature images.
        stroke_width: Forwarded to `prepare_kvg`.
        blur: Forwarded to `prepare_kvg`.

    Returns:
        An ``IPython.display.HTML`` object showing the same summary that is
        written to ``result.html``.

    Raises:
        FileExistsError: If the output directory already exists, so the
            results of a previous run are never overwritten.
    """
    directory_path = config.output_radical_clustering_path(dataset_name, n_clusters, image_size, stroke_width, blur)
    print(directory_path)

    os.makedirs(directory_path, exist_ok=False)

    pkvgs: list[PreparedKvg] = []
    kvgid2kvg: dict[str, Kvg] = {}
    decompositions: dict[str, list[str]] = {}
    for c in characters:
        pkvg0, kvgid2kvg0, _, decompositions0 = prepare_kvg(c, image_size, stroke_width, blur)
        pkvgs.append(pkvg0)
        kvgid2kvg |= kvgid2kvg0
        decompositions |= decompositions0

    # Text files are written as UTF-8 explicitly instead of relying on the
    # platform default encoding.
    with open(pathstr(directory_path, "decompositions.json"), "w", encoding="utf-8") as f:
        json.dump(decompositions, f)

    result = train_kmeans(pkvgs=pkvgs, kvgid2kvg=kvgid2kvg, n_clusters=n_clusters, log=True)

    with open(pathstr(directory_path, "kmeans.pickle"), "wb") as f:
        pickle.dump(result.kmeans, f)

    # Index = cluster label; value = radical name -> kvgids of that radical in the cluster.
    label2radicalname2kvgids: list[dict[str, list[str]]] = [{} for _ in range(n_clusters)]
    for kvgid, label in zip(result.kvgids, result.labels):
        kvg = kvgid2kvg[kvgid]

        name = kvg.name
        assert name is not None
        if kvg.part is not None:
            # Different parts of the same radical are counted separately.
            name = f"{name}_{kvg.part}"

        label2radicalname2kvgids[label].setdefault(name, []).append(kvg.kvgid)

    for radicalname2kvgids in label2radicalname2kvgids:
        for kvgids in radicalname2kvgids.values():
            kvgids.sort()

    with open(pathstr(directory_path, "label2radicalname2kvgids.json"), "w", encoding="utf-8") as f:
        json.dump(label2radicalname2kvgids, f)

    def generate_result_html() -> str:
        """Build an HTML fragment with one row of images per cluster."""
        from base64 import b64encode
        from io import BytesIO
        from bs4 import BeautifulSoup

        label2size = [0 for _ in range(n_clusters)]
        for label in result.labels:
            label2size[label] += 1

        kvgid2sample_image: dict[str, Image.Image] = {}
        for pkvg in pkvgs:
            kvgid2sample_image |= pkvg.images_for_sample

        soup = BeautifulSoup("", "html.parser")

        container = soup.new_tag("div", style="display: flex; flex-direction: column; justify-content: center; gap: 2em")
        soup.append(container)

        for label, (radicalname2kvgids, size, cluster_center) in enumerate(
            zip(label2radicalname2kvgids, label2size, result.kmeans.cluster_centers_)
        ):
            cluster_el = soup.new_tag("div", style="display: flex; flex-direction: column; gap: 1em")
            container.append(cluster_el)

            title_el = soup.new_tag("div")
            title_el.string = f"cluster: {label}, radicals: {size}"
            cluster_el.append(title_el)

            images: list[tuple[str, Image.Image]] = []

            # A feature vector is the row-major flattening followed by the
            # column-major flattening of the same image; undo both and
            # average them to get a picture of the cluster centre.
            n_pixels = image_size ** 2
            row_major = cluster_center[:n_pixels].reshape(image_size, image_size)
            col_major = cluster_center[n_pixels:].reshape(image_size, image_size).transpose()
            center_image = Image.fromarray((row_major + col_major) / 2)
            center_image = center_image.resize((image_size, image_size)).convert("L")

            images.append(("center", center_image))

            # Most frequent radical first, ties broken by name. The explicit
            # key keeps the Image objects out of the comparison.
            entries = sorted(
                (
                    (-len(kvgids), radical, kvgid2sample_image[kvgids[0]])
                    for radical, kvgids in radicalname2kvgids.items()
                ),
                key=lambda entry: entry[:2],
            )
            for neg_num_kvgids, radical, image in entries:
                images.append((f"{radical} × {-neg_num_kvgids}", image))

            image_container_el = soup.new_tag("div", style="display: flex; gap: 1em")
            cluster_el.append(image_container_el)

            for caption, image in images:
                # Embed every image as a base64 data URI so the HTML is self-contained.
                buffer = BytesIO()
                image.save(buffer, "png")
                encoded_png = b64encode(buffer.getvalue()).decode("ascii")

                figure_el = soup.new_tag("figure", style="margin: 0; padding: 0; display: flex; flex-direction: column; align-items: center; gap: 0.5em")
                image_container_el.append(figure_el)

                figcaption_el = soup.new_tag("figcaption", style="line-height: 1")
                figcaption_el.string = caption
                figure_el.append(figcaption_el)

                img_el = soup.new_tag("img", style="width: 64px; aspect-ratio: 1", src=f"data:image/png;base64,{encoded_png}")
                figure_el.append(img_el)

        return str(soup)

    html = generate_result_html()
    # The captions contain non-ASCII text (radical names, "×"): write UTF-8
    # and declare it, so the file does not depend on the platform default
    # encoding or on the browser's charset guess.
    with open(pathstr(directory_path, "result.html"), "w", encoding="utf-8") as f:
        f.write('<meta charset="utf-8">\n')
        print(html, file=f)

    return IPython.display.HTML(html) # type: ignore


# Alternative invocation on charutil.kanjis.jis_row(16) with 16 clusters, currently disabled:
# save(charutil.kanjis.jis_row(16), "test", n_clusters=16, image_size=16, stroke_width=2, blur=2)
# Active run: cluster the components of every character in charutil.kanjis.all() into 384 clusters.
save(charutil.kanjis.all(), "edu+jis_l1,2(new_decomp)", n_clusters=384, image_size=16, stroke_width=2, blur=2)