<a href="https://colab.research.google.com/github/stayup24h/Hangul-to-Unicode-Obfuscation-Project/blob/main/unicode_similarity_search_by_OCR_vector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np
import tensorflow as tf
import keras
from tensorflow.keras import Model
from sklearn.model_selection import KFold
from copy import deepcopy # 각 폴드마다 새로운 모델을 시작하기 위함
from google.colab import drive
import os
import matplotlib.pyplot as plt

drive.mount('/content/drive')

Mounted at /content/drive


In [48]:
# autoencoder 정의

def compiled_autoencoder(input_dim=68, latent_dim=16):
    """Autoencoder 모델을 정의하고 컴파일하여 반환합니다."""

    # 1. Encoder 정의
    input_layer = tf.keras.layers.Input(shape=(input_dim,), name='input_vector')
    x = tf.keras.layers.Dense(32, activation='relu')(input_layer)
    latent_vector = tf.keras.layers.Dense(latent_dim, activation='relu', name='latent_vector')(x)
    encoder = Model(input_layer, latent_vector, name='encoder')

    # 2. Decoder 정의
    latent_input = tf.keras.layers.Input(shape=(latent_dim,), name='latent_input')
    y = tf.keras.layers.Dense(32, activation='relu')(latent_input)
    reconstruction = tf.keras.layers.Dense(input_dim, activation='sigmoid', name='reconstruction')(y)
    decoder = Model(latent_input, reconstruction, name='decoder')

    # 3. Autoencoder 정의
    autoencoder_output = decoder(encoder(input_layer))
    autoencoder = Model(input_layer, autoencoder_output, name='autoencoder')

    optimizer = tf.keras.optimizers.SGD(learning_rate=1e-5, momentum=0.9)

    early_stopping_cb = keras.callbacks.EarlyStopping(patience=10,
                                                  restore_best_weights=True,
                                                  monitor='loss')

    lr_scheduler_cb = keras.callbacks.ReduceLROnPlateau(monitor='loss',
                                                    factor=0.1,
                                                    patience=3)

    autoencoder.compile(optimizer=optimizer, loss='mse')

    return autoencoder, encoder

In [49]:
NPY_FILE_PATHS = []

for i in range(13):
    NPY_FILE_PATHS.append(f"/content/drive/MyDrive/기계학습기초/datasets/unicode_images/unicode_results_{i}.npy")
    NPY_FILE_PATHS.append(f"/content/drive/MyDrive/기계학습기초/datasets/unicode_images/combined_unicode_results_{i}.npy")

loaded_arrays = []
for file_path in NPY_FILE_PATHS:
    if os.path.exists(file_path):
        loaded_arrays.append(np.load(file_path))
        print(f"'{file_path}' loaded. Shape: {loaded_arrays[-1].shape}")
    else:
        print(f"Warning: File not found at '{file_path}'. Skipping.")

if not loaded_arrays:
    raise FileNotFoundError("No NPY files were loaded from the specified paths.")

X_features = np.concatenate(loaded_arrays, axis=0)
print(f"All NPY files concatenated successfully. Total Shape: {X_features.shape}")


N = X_features.shape[0]
INPUT_DIM = X_features.shape[1]

if INPUT_DIM != 68:
    print(f"경고: 로드된 데이터의 차원({INPUT_DIM})이 예상된 68차원과 다릅니다. 확인해주세요.")

LATENT_DIM = 16
EPOCHS = 200
BATCH_SIZE = 64

autoencoder, encoder_model = compiled_autoencoder(INPUT_DIM, LATENT_DIM)

print("\nStarting Training...")


history = autoencoder.fit(
    x= X_features,
    y= X_features,  # Autoencoder는 입력과 출력이 동일
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    verbose=1
)


plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label='Train Loss')
plt.title('Autoencoder Training History')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()


save_path = 'best_simple_encoder_for_faiss.h5'
encoder_model.save(save_path)
print(f"\nEncoder Model saved successfully at '{save_path}'")

'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/unicode_results_0.npy' loaded. Shape: (14805, 68)
'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/combined_unicode_results_0.npy' loaded. Shape: (16384, 68)
'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/unicode_results_1.npy' loaded. Shape: (16374, 68)
'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/combined_unicode_results_1.npy' loaded. Shape: (16384, 68)
'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/unicode_results_2.npy' loaded. Shape: (11023, 68)
'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/combined_unicode_results_2.npy' loaded. Shape: (16384, 68)
'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/unicode_results_3.npy' loaded. Shape: (1504, 68)
'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/combined_unicode_results_3.npy' loaded. Shape: (16384, 68)
'/content/drive/MyDrive/기계학습기초/datasets/unicode_images/unicode_results_4.npy' loaded. Shape: (7334, 68)
'/content/drive/MyDri

KeyboardInterrupt: 

In [None]:
# 68차원 벡터들을 16차원 벡터들로 변환

best_encoder = load_model('best_kf_encoder_for_faiss.h5')

# TODO : 유니코드를 OCR한 68차원 numpy 배열 불러오기

# X_features_all = np.load('all_unicode_features.npy')
N = 200000 #numpy 배열 크기
INPUT_DIM = 68
X_features_all = np.random.rand(N, INPUT_DIM).astype('float32') # 예시 데이터

X_16dim_vectors = best_encoder.predict(X_features_all)

X_16dim_vectors = X_16dim_vectors.astype('float32')

In [None]:
# 변환한 벡터들 저장

VECTOR_OUTPUT_FILENAME = "faiss_16dim_vectors.npy"

np.save(VECTOR_OUTPUT_FILENAME, X_16dim_vectors)

print(f"16차원 잠재 벡터가 {VECTOR_OUTPUT_FILENAME}에 저장되었습니다. (Shape: {X_16dim_vectors.shape})")

In [None]:
# 유니코드 코드 포인트 배열 생성 (N개의 요소)
# 이 배열은 X_features_all을 생성했을 때의 순서와 정확히 일치해야 합니다.
# 예시: 한글 외 유니코드 코드 포인트만 순서대로 나열
# [0x0000, 0x0001, ..., 0x00A0, ..., 0x10FFFF] (제외 범위 제외)
unicode_codepoints = np.array([
    # 실제 데이터셋을 만들 때 사용된 코드 포인트를 순서대로 여기에 배치해야 함.
    i for i in range(N) # 예시에서는 N개의 임의 인덱스라고 가정
])

MAPPING_OUTPUT_FILENAME = "faiss_unicode_map.npy"
np.save(MAPPING_OUTPUT_FILENAME, unicode_codepoints)

print(f"유니코드 코드 포인트 매핑이 {MAPPING_OUTPUT_FILENAME}에 저장되었습니다. (Shape: {unicode_codepoints.shape})")

In [1]:
%pip install faiss-cpu numpy
%pip install faiss-gpu

Collecting faiss-cpu
  Downloading faiss_cpu-1.13.0-cp39-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (7.7 kB)
Downloading faiss_cpu-1.13.0-cp39-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (23.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.6/23.6 MB[0m [31m49.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faiss-cpu
Successfully installed faiss-cpu-1.13.0
[31mERROR: Could not find a version that satisfies the requirement faiss-gpu (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for faiss-gpu[0m[31m
[0m

In [14]:
import numpy as np
import faiss
import os

UnicodeNPY_FILE_PATHS = []
LableNPY_FILE_PATHS = []

# 1. 파일 경로 생성
for i in range(13):
    UnicodeNPY_FILE_PATHS.append(f"/content/drive/MyDrive/기계학습기초/datasets/unicode_images/unicode_results_{i}.npy")
    LableNPY_FILE_PATHS.append(f"/content/drive/MyDrive/기계학습기초/datasets/unicode_images/unicode_labels_{i}.npy")
    UnicodeNPY_FILE_PATHS.append(f"/content/drive/MyDrive/기계학습기초/datasets/unicode_images/combined_unicode_results_{i}.npy")
    LableNPY_FILE_PATHS.append(f"/content/drive/MyDrive/기계학습기초/datasets/unicode_images/combined_unicode_labels_{i}.npy")

loaded_unicode_arrays = []
loaded_lable_arrays = []

# 2. Unicode 데이터 로드
print("--- Unicode 데이터 로딩 시작 ---")
for file_path in UnicodeNPY_FILE_PATHS:
    if os.path.exists(file_path):
        data = np.load(file_path)
        loaded_unicode_arrays.append(data)
        print(f"Loaded: '{os.path.basename(file_path)}' | Shape: {data.shape}")
    else:
        print(f"Warning: File not found at '{file_path}'. Skipping.")

# 3. Label 데이터 로드
print("\n--- Label 데이터 로딩 시작 ---")
for file_path in LableNPY_FILE_PATHS:
    if os.path.exists(file_path):
        # allow_pickle=True 유지
        data = np.load(file_path, allow_pickle=True)
        loaded_lable_arrays.append(data)
        print(f"Loaded: '{os.path.basename(file_path)}' | Shape: {data.shape}")
    else:
        print(f"Warning: File not found at '{file_path}'. Skipping.")

# ========================================================
# 4. 배열 합치기 (이 부분이 추가되었습니다)
# ========================================================
print("\n--- 데이터 병합 시작 ---")

# Unicode 데이터 합치기
if loaded_unicode_arrays:
    final_unicode_X = np.concatenate(loaded_unicode_arrays, axis=0)
    print(f"✅ 최종 Unicode 배열(X) 합치기 완료!")
    print(f"   최종 형태(Shape): {final_unicode_X.shape}")
else:
    print("❌ 병합할 Unicode 데이터가 없습니다.")

# Label 데이터 합치기
if loaded_lable_arrays:
    final_label_Y = np.concatenate(loaded_lable_arrays, axis=0)
    print(f"✅ 최종 Label 배열(Y) 합치기 완료!")
    print(f"   최종 형태(Shape): {final_label_Y.shape}")
else:
    print("❌ 병합할 Label 데이터가 없습니다.")


FAISS_INDEX_FILENAME = "unicode_faiss_index.faiss"


--- Unicode 데이터 로딩 시작 ---
Loaded: 'unicode_results_0.npy' | Shape: (14805, 68)
Loaded: 'combined_unicode_results_0.npy' | Shape: (16384, 68)
Loaded: 'unicode_results_1.npy' | Shape: (16374, 68)
Loaded: 'combined_unicode_results_1.npy' | Shape: (16384, 68)
Loaded: 'unicode_results_2.npy' | Shape: (11023, 68)
Loaded: 'combined_unicode_results_2.npy' | Shape: (16384, 68)
Loaded: 'unicode_results_3.npy' | Shape: (1504, 68)
Loaded: 'combined_unicode_results_3.npy' | Shape: (16384, 68)
Loaded: 'unicode_results_4.npy' | Shape: (7334, 68)
Loaded: 'combined_unicode_results_4.npy' | Shape: (16384, 68)
Loaded: 'unicode_results_5.npy' | Shape: (5785, 68)
Loaded: 'combined_unicode_results_5.npy' | Shape: (16384, 68)
Loaded: 'unicode_results_6.npy' | Shape: (4081, 68)
Loaded: 'combined_unicode_results_6.npy' | Shape: (16384, 68)
Loaded: 'unicode_results_7.npy' | Shape: (5024, 68)
Loaded: 'combined_unicode_results_7.npy' | Shape: (16384, 68)
Loaded: 'unicode_results_8.npy' | Shape: (907, 68)
Loaded: 

In [19]:
# --- FAISS 인덱스 설정 ---
NLIST = 500  # 클러스터(Partition) 개수. (데이터 개수의 sqrt 근처 값 권장)

D = final_unicode_X

# 1. Quantizer 정의: 벡터를 클러스터링할 때 사용할 기준 (일반적으로 L2 거리를 사용하는 Flat 인덱스 사용)
# 'D' 대신 벡터의 차원 (D.shape[1])을 전달해야 합니다.
quantizer = faiss.IndexFlatL2(D.shape[1])

# 2. IVF 인덱스 생성: D차원, NLIST개의 클러스터, L2 거리 사용
# 'D' 대신 벡터의 차원 (D.shape[1])을 전달해야 합니다.
index = faiss.IndexIVFFlat(quantizer, D.shape[1], NLIST, faiss.METRIC_L2)

# 3. 인덱스 훈련 (Training)
# IVF 인덱스는 add 전에 클러스터 중심점을 계산하는 훈련이 필요합니다.
print("FAISS Index Training Start...")
index.train(final_unicode_X)
print("Training Complete. Index is trained:", index.is_trained)

# 4. 벡터 추가 (Adding)
# 16차원 잠재 벡터를 인덱스에 추가합니다.
index.add(final_unicode_X)
print(f"Total vectors added: {index.ntotal}")

# 5. 검색 속도 vs. 정확도 설정 (nprobe)
# 검색 시 확인할 클러스터의 개수. 높을수록 정확하나 느립니다.
# 20만 개에서는 10~30 사이가 적절하며, 20을 추천합니다.
index.nprobe = 20

# 6. 인덱스 저장
faiss.write_index(index, FAISS_INDEX_FILENAME)
print(f"FAISS Index saved as {FAISS_INDEX_FILENAME}")

FAISS Index Training Start...
Training Complete. Index is trained: True
Total vectors added: 265534
FAISS Index saved as unicode_faiss_index.faiss


In [None]:
#한글 이미지 생성기 로드


import os
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from fontTools.ttLib import TTFont
from tqdm import tqdm

from search_fonts import NOTO_FONTS_NAME


# --- 설정 ---
IMAGE_HEIGHT = 128
IMAGE_WIDTH = 128
FONT_SIZE = 96  # 이미지 크기에 맞게 조절 필요
OUTPUT_FILENAME_BASE = "unicode_tensors"  # unicode_tensors_{k}.npy
OUTPUT_LABEL_BASE = "unicode_labels"  # unicode_labels_{k}.npy
UNRENDERED_LOG_FILENAME = (
    "unrendered_characters.txt"  # 렌더 안 되는 빈 유니코드 목록 저장
)

def render_char_to_tensor(char, font_path = "./NotoSansKR-Medium.ttf"):
    """문자를 이미지로 렌더링하고 Numpy 텐서로 변환합니다."""
    try:
        font = ImageFont.truetype(font_path, FONT_SIZE)
    except IOError:
        # Pillow가 폰트 파일을 열지 못하는 경우
        return None

    image = Image.new("L", (IMAGE_WIDTH, IMAGE_HEIGHT), "white")
    draw = ImageDraw.Draw(image)

    # 문자를 이미지 중앙에 정렬하기 위해 바운딩 박스 계산
    try:
        bbox = draw.textbbox((0, 0), char, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]
        position = (
            (IMAGE_WIDTH - text_width) / 2 - bbox[0],
            (IMAGE_HEIGHT - text_height) / 2 - bbox[1],
        )
    except Exception:
        # 일부 특수 문자는 bbox 계산에 실패할 수 있음
        position = (5, 5)

    draw.text(position, char, font=font, fill="black")

    # 이미지를 Numpy 배열로 변환하고 0~1 사이 값으로 정규화
    tensor = np.array(image, dtype=np.float32) / 255.0

    # reshape()의 첫 번째 차원은 제거합니다. 배열에 직접 할당할 것이기 때문입니다.
    return tensor.reshape((IMAGE_WIDTH, IMAGE_HEIGHT, 1))

In [None]:
#OCR 모델 로드
MODEL_PATH = "./final.keras"

L_SLICE = slice(0, 19)
V_SLICE = slice(19, 40)
T_SLICE = slice(40, 68)

def hangul_loss(y_true, logits):
    cho  = tf.nn.softmax_cross_entropy_with_logits(labels=y_true[:, L_SLICE], logits=logits[:, L_SLICE])
    jung = tf.nn.softmax_cross_entropy_with_logits(labels=y_true[:, V_SLICE], logits=logits[:, V_SLICE])
    jong = tf.nn.softmax_cross_entropy_with_logits(labels=y_true[:, T_SLICE], logits=logits[:, T_SLICE])
    return cho + jung + jong

def acc_first(y_true, y_pred):
    return tf.reduce_mean(tf.cast(tf.equal(tf.argmax(y_true[:, L_SLICE], -1),
                                           tf.argmax(y_pred[:, L_SLICE], -1)), tf.float32))
def acc_middle(y_true, y_pred):
    return tf.reduce_mean(tf.cast(tf.equal(tf.argmax(y_true[:, V_SLICE], -1),
                                           tf.argmax(y_pred[:, V_SLICE], -1)), tf.float32))
def acc_last(y_true, y_pred):
    return tf.reduce_mean(tf.cast(tf.equal(tf.argmax(y_true[:, T_SLICE], -1),
                                           tf.argmax(y_pred[:, T_SLICE], -1)), tf.float32))
def acc_joint(y_true, y_pred):
    l_ok = tf.equal(tf.argmax(y_true[:, L_SLICE], -1), tf.argmax(y_pred[:, L_SLICE], -1))
    v_ok = tf.equal(tf.argmax(y_true[:, V_SLICE], -1), tf.argmax(y_pred[:, V_SLICE], -1))
    t_ok = tf.equal(tf.argmax(y_true[:, T_SLICE], -1), tf.argmax(y_pred[:, T_SLICE], -1))
    return tf.reduce_mean(tf.cast(l_ok & v_ok & t_ok, tf.float32))

# --- 모델 로드 ---
model = tf.keras.models.load_model(
    MODEL_PATH,
    custom_objects={
        "hangul_loss": hangul_loss,
        "acc_first": acc_first,
        "acc_middle": acc_middle,
        "acc_last": acc_last,
        "acc_joint": acc_joint,
    }
)

def ocr(character):
    X = render_char_to_tensor(character)                               # shape: (N, 128,128,1)

    if X.ndim == 3:
        X = np.expand_dims(X, -1)
    X = X.astype(np.float32)
    if np.max(X) > 1.5:
        X /= 255.0                                  # 정규화

    print(f"▶ Predicting {f_in} -> {out_path} (shape={X.shape})")

    # 모델 예측
    preds = model.predict(X, batch_size=256, verbose=1)  # (N,68)

    return pred


In [None]:
# FAISS 인덱스 로드
index = faiss.read_index(FAISS_INDEX_FILENAME)

def find_similar_unicode(korean_char, k=5):
    """
    한글 문자를 입력받아 시각적으로 가장 유사한 K개의 유니코드 문자를 반환합니다.
    """

    char_vector = ocr(korean_char)



    D, I = index.search(char_vector, k)

    # --- 3. 결과 해석 및 반환 ---
    results = []

    # 가장 가까운 이웃의 인덱스와 거리를 순회합니다.
    for rank, (idx, distance) in enumerate(zip(I[0], D[0])):
        if idx >= 0: # 유효한 인덱스일 경우
            code_point = U_map[idx]
            unicode_char = chr(code_point)

            results.append({
                "Rank": rank + 1,
                "Character": unicode_char,
                "CodePoint": f"U+{code_point:04X}",
                "Similarity_Distance": float(distance) # 거리가 낮을수록 유사
            })

    return results

# --- 사용 예시 ---
input_char = "한"
similar_chars = find_similar_unicode(input_char, k=10)

print(f"--- '{input_char}'와 시각적으로 유사한 상위 10개 유니코드 문자 ---")
for result in similar_chars:
    print(
        f"[{result['Rank']}] {result['Character']} "
        f"({result['CodePoint']}) - Distance: {result['Similarity_Distance']:.4f}"
    )

NameError: name 'query_encoder' is not defined