In [1]:
import tensorflow as tf
from tensorflow.keras import Input, models, layers
import matplotlib.pyplot as plt
# import keras.api.ops as K
import tensorflow.keras.backend as K
from keras import models, layers, optimizers

2025-06-23 12:09:59.248096: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-06-23 12:09:59.255214: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1750648199.263153   58640 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1750648199.265829   58640 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1750648199.272221   58640 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking 

In [2]:
# 1. Load MNIST dataset

mnist = tf.keras.datasets.mnist
(x_train, _), (x_test, _) = mnist.load_data()

x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

# 28*28 이미지를 1차원 벡터(784)로 변환
x_train = x_train.reshape((-1, 28*28))
x_test = x_test.reshape((-1, 28*28))

In [3]:
# 2. functional API로 구성한 autoencoder
def build_autoencoder(input_dim: int) -> models.Model:
    # encoder
    x = inputs = layers.Input(shape=(input_dim,))
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dense(32, activation='relu')(x)

    # decoder
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dense(128, activation='relu')(x)
    x = outputs = layers.Dense(input_dim, activation='sigmoid')(x)

    # model definition
    autoencoder = models.Model(inputs=inputs, outputs=outputs)
    autoencoder.compile(optimizer='adam', loss='mse')
    return autoencoder

In [8]:
# 3. MSE model
def make_model_mse(model: models.Model):
    assert isinstance(model, models.Model)
    input_s = model.inputs[0] 
    output_s = model.outputs[0] 

    # error_s = K.mean(K.square(input_s - output_s), axis=[1, ]) # MSE

    # diff = layers.Subtract()([input_s, output_s]) # 얘도 가능
    diff = input_s - output_s
    squared = layers.Multiply()([diff, diff]) # K.square
    error_s = layers.Lambda(lambda x: tf.reduce_mean(x, axis=1))(squared) # K.mean (axis=1)
    
    return models.Model(inputs=input_s, outputs=error_s)

# model_mse = make_model_mse(autoencoder)
# autoencoder.fit()으로 autoencoder를 학습시키면, model_mse의 파라미터 역시 자동으로 튜닝됩니다!

In [9]:
# 4. autoencoder 객체 생성 및 학습(fit)
autoencoder = build_autoencoder(input_dim=784)
autoencoder.fit(x_train, x_train, epochs=10, batch_size=256, shuffle=True, validation_split=0.1)

# 5. MSE 계산 모델 정의
model_mse = make_model_mse(autoencoder)

# 6. reconstruction error 계산
mse = model_mse.predict(x_test)
print(mse[:10])

Epoch 1/10
[1m211/211[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 9ms/step - loss: 0.0998 - val_loss: 0.0392
Epoch 2/10
[1m211/211[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.0357 - val_loss: 0.0274
Epoch 3/10
[1m211/211[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.0259 - val_loss: 0.0218
Epoch 4/10
[1m211/211[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 0.0213 - val_loss: 0.0194
Epoch 5/10
[1m211/211[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.0191 - val_loss: 0.0178
Epoch 6/10
[1m211/211[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.0176 - val_loss: 0.0165
Epoch 7/10
[1m211/211[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.0165 - val_loss: 0.0157
Epoch 8/10
[1m211/211[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.0154 - val_loss: 0.0148
Epoch 9/10
[1m211/211[0m [32m━━━━━━━━

In [12]:
print(tf.__version__)
import keras
print(keras.__version__)

2.19.0
3.10.0


### w/ Caching~~

In [16]:
from pathlib import Path
import pickle
import numpy as np
from keras.models import Model
from tqdm import trange

# 1. 캐시 폴더 생성
path_cache = Path('./cache_mse')
path_cache.mkdir(exist_ok=True)

In [17]:
# 2. 메모리에 담긴 데이터들을 모두 모아 MD5 hash를 만드는 함수
def calculate_md5(*args, __read_path_object__: bool = False, **kwargs) -> str:
    """
    메모리에 담긴 데이터들을 모두 모아 MD5 hash를 만드는 함수

    :param args: positional arguments to be hashed
    :param __read_path_object__: Path 인스턴스의 처리 방법을 결정
    False (기본값): Path 인스턴스, 즉, Python 프로세스의 메모리 값을 hash에 사용
    True: Path 인스턴스가 가리키는 파일에 담긴 디스크 값을 hash에 사용. 파일은 있을 것으로 가정. 파일이 없으면 FileNotFoundError 예외 발생.
    :param kwargs: keyword arguments
    :return: MD5 hash 문자열
    """
    import hashlib
    import pickle
    from pathlib import Path
    import pandas as pd

    def __polish_arg__(arg) -> bytes:
        if isinstance(arg, (pd.DataFrame, pd.Series)):  # pandas Dataframe needs attention.
            arg = pd.util.hash_pandas_object(arg)  # pd.Dataframe or pd.Series becomes pd.Series.
            arg = (arg.index.values, arg.values)  # make a series a tuple containing indices and values

        if __read_path_object__ and isinstance(arg, Path):
            arg = arg.read_bytes()  # (if necessary) read file content as bytes
        if isinstance(arg, str):
            arg = arg.encode()  # str to bytes
        elif not isinstance(arg, bytes):
            arg = pickle.dumps(arg)  # any other object to bytes
        return arg  # it must be bytes.

    m = hashlib.md5()
    for arg in args:
        arg = __polish_arg__(arg)
        m.update(arg)
    for key, value in kwargs.items():
        m.update(__polish_arg__(key))
        m.update(__polish_arg__(value))
    return m.hexdigest()

In [18]:
# 3. 캐싱 예측 함수 정의
def predict_mse(model_mse: Model, X: np.ndarray, label: str = "") -> np.ndarray:
    """
    입력 데이터 X에 대해 model_mse의 예측을 캐싱하여 중복 계산 방지

    :param model_mse: mse 계산용 Keras 모델
    :param X: 입력 데이터 (numpy.ndarray)
    :param label: 동일한 데이터라도 다른 설정일 경우 label로 캐시 분기 가능
    => label="v1-autoencoder-dropout0.2"처럼 모델 버전명, threshold 설정 등을 넣을 수 있음. 이렇게 하면 같은 X라도 모델 설정이 달라지면 새로운 캐시가 만들어진다.
    :return: reconstruction error
    """
    
    # 1. 캐시 키 생성
    key = calculate_md5(X, label)
    path = path_cache.joinpath(f'{key}.pkl')

    # 2-1. 캐시가 있다면 로드
    if path.exists():
        print(f"[Cache 사용] {path.name}")
        return pickle.loads(path.read_bytes())

    # 2-2. 없으면 계산
    print(f"[Cache 없음] 예측 수행 중...")
    errors = model_mse.predict(X, verbose=0)

    # 3. 결과 저장
    path.write_bytes(pickle.dumps(errors))
    print(f"[Cache 저장 완료] {path.name}")
    return errors

In [19]:
mse_scores = predict_mse(model_mse, x_test, label="mnist-ae-v1")

[Cache 없음] 예측 수행 중...
[Cache 저장 완료] f6e9ffe4847f542b3a8464ceeaa4f825.pkl


In [20]:
print("MSE shape", mse_scores.shape)
print(mse_scores[:10])

MSE shape (10000,)
[0.00801274 0.01849638 0.00284302 0.00903244 0.011913   0.00143948
 0.020179   0.02408656 0.01781524 0.01150522]


In [21]:
mse_scores = predict_mse(model_mse, x_test, label="mnist-ae-v1")

[Cache 사용] f6e9ffe4847f542b3a8464ceeaa4f825.pkl


In [22]:
print("MSE shape", mse_scores.shape)
print(mse_scores[:10])

MSE shape (10000,)
[0.00801274 0.01849638 0.00284302 0.00903244 0.011913   0.00143948
 0.020179   0.02408656 0.01781524 0.01150522]
