In [1]:
import tensorflow as tf
from tensorflow import keras

# 12.3. 사용자 정의 모델과 훈련 알고리즘
- 가장 간단하고 많이 사용하는 사용자 정의 손실함수를 만들어보자
- 전체적으로 클래스 상속까지는 모르더라도, 적어도 def 하는 부분과 def한 놈을 불러오는 놈까지는 알고 넘어가자

### 12.3.1. 사용자 정의 손실함수
- 만약 회귀모델을 훈련하는데 훈련 세트에 잡음이 있다고 가정
- 이상치를 제거하거나 고쳐서 데이터셋을 수정해보는 방법이 가장 원론적임
    - 하지만 비효율적이고 잡음을 백프로 제거하기는 어려움
- 이럴 때는 손실함수를 바꿔서 시도하는 것도 괜춘함
    - 1) MSE : 큰 오차에 너무 큰 벌칙을 가하기 때문에 정확도가 떨어질 것
    - 2) MAE : 이상치에 너무 관대하여 최적값을 찾는 데 시간이 오래걸릴것이며 정밀도가 떨어질 것
- 이럴 때 사용하기 좋은 것이 **후버 손실**임
    - 아직 keras 공식 API지원은 없고, tf.keras.losses.Huber에서 지원하긴 함
- 이를 구현하려면 아래와 같이 레이블과 예측을 매개변수로 받는 함수를 만들고 텐서플로 연산을 이용하여 손실을 계산하면 됨

In [2]:
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

- 이 때 주의할 점은 **전체 손실의 평균이 아니라, 하나의 손실을 담은 텐서를 반환하는 것이 좋음**
    - 이렇게 해야 필요할 때 케라스가 클래스 가중치나 샘플 가중치를 적용할 수 있음
- 해당 손실함수를 이용하여 컴파일하는 방식은 아래와 같음

In [3]:
# model.compile(loss = huber_fn, optimizer = "nadam")
# model.fit(X_tr, y_tr, [...])

- 훈련은 이런 식으로 매우 간단하게 할 수 있음
- 하지만 모델을 저장하고 로드할 때는 문제가 발생할 수 있다!

### 12.3.2. 사용자 정의 요소를 가진 모델을 저장하고 로드하기
- 케라스가 함수 이름을 저장하므로, 사실 저장 자체에는 문제가 없음
- 하지만, 모델을 로드할 때는 함수 이름과 실제 함수를 매핑한 **딕셔너리 객체**를 전달해야함
- 좀 더 일반적으로 정리하자면, 사용자 정의 객체를 포함한 모델을 로드할 때는 그 이름과 객체를 **매핑**해줘야함
    - model을 load해올 때 custom_object 인자를 이용

In [4]:
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

# model = keras.models.load_models("my_model.h5", custom_object = {"huber_fn" : huber_fn})

- 그런데 위와 같이 후버 함수를 만들어서 저장할 때, "작은 것"의 기준을 따로 threshold로 정하고 싶을 수 있음
- 그럴 때는 huber_fn의 함수를 정의하는 함수를 만들어 threshold를 인자로 건내주면 됨

In [5]:
def create_huber(threshold = 1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = tf.abs(error) - threshold**2/2
        return tf.where(is_small_error, squared_loss, linear_loss)        
    return huber_fn

# model.compile(loss = create_huber(2.0), optimizer = "nadam")

- 그런데 이렇게 했을 때 아까처럼 모델로딩 시 문제가 또 발생하는데, threshold값이 저장되지 않기 때문임
- 그래서 모델을 로드할 때 위에서 custom_object를 잡아줬던 것 처럼 똑같이 threshold도 잡아줘야함
    - *json으로 모델 훈련시 썼던 하이퍼파라미터들을 다 저장하는 것이 좋을 것 같음...*

In [6]:
# model = keras.load_model("my_model.h5", custom_obejct   = {"huber_fn" : create_huber(2.0)})

- 참고로 새로 정의한 함수가 아닌, 컴파일 할 때 생성이 된 fn의 이름을 custom_object로 전달해야함
    - 즉 컴파일할때는 새로 정의된 함수를 썼지만, 결국 리턴된 값은 huber_fn

- 위의 문제는 keras.losses.Loss클래스를 상속하고 get_config()메서드를 구현하여 해결할 수도 있음

In [7]:
class HuberLoss(keras.losses.Loss):

    def __init__(self, threshold = 1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
        
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_samll_error, square_loss, linear_loss)
    
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold" : self.threshold}

- 이렇게하면 컴파일도 쉬워지며, 저장과정에서 threshold도 함께 저장이됨
- 대신 모델 로드 시 **클래스 이름**과 **클래스 자체**를 매핑시켜줘야함

In [8]:
# model.compile(loss = HuberLoss(2.), optimizer=  "nadam")
# model = kears.models.load_model("my_model.h5", custom_objects = {"HuberLoss" : HuberLoss})

- 모델을 저장할 떄 케라스가 손실 객체의 get_config()메서드를 호출해서 반환된 설정을 HDF5파일에 json형태로 저장
    - 그래서 모델을 로딩할 때 HuberLoss클래스의 (상속된 메서드인) from_config()클래스 메서드를 호출함
    - 이 메서드는 기본 손실 클래스에 구현되어있고 생성자에 \*\*config매개변수를 전달해서 클래스 인스턴스를 생성

### 12.3.3. 활성화 함수, 초기화, 규제, 제한 커스터마이징
- 대부분 위와 유사한 방식으로 커스터마이징 가능
    - 보통은 적절한 입/출력을 가진 간단한 함수를 작성하면 됨
- 다음은 사용자 정의 softplus활성화함수, 글로럿 초기화, L1규제, 정의 제한(양수인 가중치만 남기는 것)에 대한 예
    - 위 케이스들은 각각 keras.activations.softplus(), tf.nn.softplus()
    - keras.initializers.glorot_normal
    - keras.regularizers.l1(0.01)
    - keras.constraints.nonneg(), tf.nn.relu()와 동일함

In [9]:
def my_softplus(z):
    return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initializer(shape, dtype = tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev = stddev, dtype = dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights):
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

- 위에서 보듯 매개변수는 사용자가 정의하려는 함수에 따라 다름
- 이렇게 만들어진 함수는 일반 함수오 ㅏ동일하게 사용할 수 있음

In [10]:
layer = keras.layers.Dense(30, activation = my_softplus,
                          kernel_initializer = my_glorot_initializer,
                          kernel_regularizer = my_l1_regularizer,
                          kernel_constraint = my_positive_weights)

- 만약 함수과 모델과 함께 저장해야할 하이퍼파라미터를 가지고 있다면,
    - keras.regularizers.Regularizer, keras.constraints.Constraint, keras.initializers.Initializer, keras.layesr.Layer와 같이 적절한 클래스를 상속해야함
- 다음은 사용자 정의 손실을 만들었던 것처럼, factor 하이퍼파라미터를 저장하는 l1규제를 위한 간단한 클래스
    - 이 경우에는 부모 클래스 생성자와 get_config()메서드가 정의되어있지 않아 호출할 필요가 없음

In [11]:
class MyL1Regularizer(keras.regularizers.Regularizer):
    
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return {"factor" : self.factor}

- 손실, 활성화 함수를 포함하는 층, 모델의 경우 call()메서드를 구현해야함
- 규제, 초기화, 제한의 경우 \_\_call()\_\_ 메서드를 구현해야함
    - 지표의 경우 조금 다름

### 12.3.4. 사용자 정의 지표
- 손실과 지표는 개념적으로 다른 것은 아님
- 크로스엔트로피와 같은 손실은 모델을 훈련하기 위해 경사하강법에 이용되므로 미분 가능해야하며, 그래디언트가 0이 아니어야함
- 반면 정확도와 같은 지표는 손실에 비해 훨씬 이해하기 쉬워야하며, 미분이 불가하거나 그래디언트가 0이어도 됨

- 이런 차이가 있지만 일반적인 경우에는 사용자 지표 함수를만드는 것은 사용자 손실함수를 만드는 것과 동일함
    - 사실 후버 손실함수는 지표로도 잘 작동함
    - *모델 저장 시에도 동일하게 함수 이름 huber_fn만 저장되므로, 불러올 때는 새로 정의해야함
    - *물론 지표료 잘 사용하지는 않음(MAE나 MSE가 선호됨)

In [12]:
def create_huber(threshold = 1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = tf.abs(error) - threshold**2/2
        return tf.where(is_small_error, squared_loss, linear_loss)        
    return huber_fn

# model.compile(loss = "mse", optimizer = "nadam", metrics = [create_huber(2.0)])

- 훈련하는 동안 각 배치에 대해 케라스는 지표를 계산하고, 에포크가 시작할 때부터 평균을 기록함
- 보통은 이렇게 하는게 맞는데, 아닐 때도 있음
    - 예를 들면, 이진분류기의 정밀도 문제를 생각
        - *Precision = TP/(TP+FP)*
        - 모델이 첫 배치에서 5개의 양성 예측을 만들고 그 중 4개가 맞았다면 정밀도는 80%가 됨
        - 두 번째 배치에서 3개의 양성 예측을 만들었는데 모두 틀렸다면 정밀도는 0%가 됨
        - 그래서 일반적으로 평균을 기록하게 되면 40%가 되는 것
        - 그런데 다시 따져보면 총 8개의 양성 예측 중 4개가 맞은 것이므로 사실은 40%가 아니라 50%인거심!!
    - 따라서 TP와 FP의 개수를 기록하고 필요할 때 정밀도를 계산할 수 있는 객체가 필요함
    - 이 때 사용하는 것이 **keras.metrics.Precision**클래스
- 위에서 사용된 예시를 한 번 구현해보자

In [13]:
precision = keras.metrics.Precision()
print(precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1]))
print(precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0]))

tf.Tensor(0.8, shape=(), dtype=float32)
tf.Tensor(0.5, shape=(), dtype=float32)


- 위에서 보면 각 배치의 사례를 만들어봄
    - 반환한 결과값을 보면 각 배치의 정밀도가 아니라, 두 번째 배치까지 돌고 나서의 정밀도임을 볼 수 있음
    - precision객체를 만들고 나서 y_hat과 y값이 입력된 것임!!
- 이러한 지표를 보통 **스트리밍 지표**라고 부름
- 위와 같이 계산된 지점에서 result()메서드를 호출하면 현재의 지표값을 얻을 수 있음
- vairables 속성을 사용하면 변수를 확인할 수도 있음
- reset_states()메서드를 이용하면 이 변수를 초기화시킬 수도 있음

In [14]:
precision.result()

<tf.Tensor: id=121, shape=(), dtype=float32, numpy=0.5>

In [15]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [16]:
precision.reset_states()
precision

<tensorflow.python.keras.metrics.Precision at 0x1c5a0771e88>

- 만약 만들어져있지 않은 스트리밍 지표를 만들고 싶다면 keras.metrics.Metric클래스를 상속
    - 다음 예시는 전체 후버손실과 지금까지 처리한 샘플 수를 기록하는 클래스고 결괏값을 요청할 경우 평균 후버손실이 반환되도록 함

In [17]:
class HuberMetric(keras.metrics.Metric):
    
    def __init__(self, threshold = 1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer = "zeros")
        self.total = self.add_weight("count", initializer = "zeors")
        
        # add_weight메서드로 여러 배치에 걸쳐 지표의 상태를 기록하기 위한 변수를 생성자에 할당
        # 이 예에서는 후버 손실의 합과 지금까지 처리한 샘플 수를 기록함
        
    def update_state(self, y_true, y_pred, sample_weight = None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
        
        # 위 클래스를 함수처럼 사용할 때 사용(위의 Precision 객체의 사례처럼)
        # 배치의 레이블과 예측을 바탕으로 변수를 업데이트
        
    def result(self):
        return self.total / self.count
    
        # 최종 결과를 계산하고 반환함

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold" : self.threshold}
    
        #get_config()메서드를 구현함으로써, threshold 변수를 모델과 함께 저장함
            

- 사실 이 예시보다 더 위의 후버함수의 케이스처럼, 지표를 간단한 함수로 정의하기만 해도 우리가 수동으로 했던 것처럼 케라스가 배치마다 자동으로 이 함수를 호출하고 에포크 동안 평균을 기록함
    - 그래서 이렇게 HuberMetrics 클래스를 구현하는 이점은 threshold를 저장할 수 있다는 것밖에 없기는 함
- 물론 정밀도와 같이 어떤 지표는 배치에 걸쳐 단순히 평균을 낼 수는 없음
    - 이런 경우에는 위와 같이 스트리밍 지표를 직접 구현하는 것 외에는 방법이 없음

### 12.3.5. 사용자 정의 층
- 가끔 텐서플로에 없는 특이한 층을 가진 네트워크를 만들어야하는 케이스가 있음
- 이럴 때 사용자 정의 층을 만들게 됨
- 동일한 층 블럭이 여러번 반복되는 네트워크를 만들 때는 각각의 층 블럭을 하나의 층으로 다루는 것이 편리함
    - 예를 들어 모델이 a, b, c, a, b, c, a, b, c 순서로 구성되었다면, a, b, c를 하나로 묶어 A로 만든 뒤 A, A, A로 구성된 층을 만들 수도 있음

- kears.layers.Flatten이나 keras.layers.ReLU와 같은 층은 가중치가 없는데, 이런 가중치가 없는 사용자 정의 층을 만들기 위해서는 파이썬 함수를 만든 후 kearas.layers.Lambda 층으로 감싸는 것
    - 아래는 가중치 없이 지수함수를 적용하는 층을 맨드러봄

In [18]:
exponential_layer = keras.layers.Lambda(lambda x : tf.exp(x))

- 이렇게 간단히 정의해주면, 사용자 정의 층을 시퀀셜 API나 함수형 API, 서브클래싱 API에서 모두 사용할 수 있음
    - 그리고 활성화 함수로도 사용할 수 있음
- 지수함수는 회귀 모델에서 예측값의 스케일이 매우 다를 때 출력층에 사용될 수 있음

- 상태가 있는(가중치가 있는) 층을 만드려면 kears.layesr.Layer를 상속해야함
- 다음 클래스는 Dense를 간소하게 구현해보는 것

In [20]:
class MyDense(keras.layers.Layer):
    
    def __init__(self, units, activation = None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
        
    # 모든 하이퍼파라미터를 생성자가 인자로 받아야함
    # **kwargs 매개변수를 추가하는 것도 중요 -> 부모 생성자를 호출하면서 kwargs를 전달
    # ### 이를 통해서 input_shape, trainable, name과 같은 기본 매개변수를 처리할 수 있음
    # 그 다음 하이퍼파라미터를 속성으로 저장하고 activation 인자를 keras.activation.get()함수를 사용하여 적절한 활성화함수로 변경가능하게 함
        
    def build(self, batch_input_shape):
        self.kernel = slef.add_weight(
            name = "kernel", shape = [batch_input_shape[-1], slef.units],
            initializer = "glorot_normal")
        self.bias = self.add_weights(
            names = "bias", shape = [self.units], initializers = "zeors")
        super().build(batch_input_shape)
        
    # 가중치마다 add_weight()메서드를 호출하여 층의 변수를 만듦
    # build() 메서드는 층이 처음 사용될 때 호출됨
    # ### 이 쯤이면 케라스가 층의 입력 크기를 알고 있을 것 -> 따라서 build 메서드의 입력으로 크기를 전달함
    # 가중치를 만들 때 크기가 꼭 필요한 경우가 있는데, 연결 가중치를 만드려면 이전 층의 뉴런 수를 알아야함
    # 이 크기는 입력의 마지막 차원 크기에 해당하며 build()메서드 끝에서 부모의 build메서드를 호출해야함
        
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)
    
    # 이 층에서 필요한 연산을 수행함
    # ### 지금의 경우 입력 X와 층의 커널의 행렬곱을 수행하고 편향을 더하며, 여기에 활성화함수를 적용하여 출력
    
    def compute_output_shape(self, batch_input_shape):
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
        
    # 층의 출력의 크기를 반환함
    # 지금 예시에서는 마지막 차원을 제외하고 입력과 크기가 같음
    # 마지막 차원은 이 층의 뉴런 수
        
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units" : self.units,
               "activation" : keras.activations.serialize(self.activation)}
    
    # keras.activations.serialize()를 이용하여 활성화 함수 전체 설정을 저장

- 여러 입력을 받는 층(ex : Concatenate)을 만드려면 call() 메서드에 모든 입력이 포함된 튜플을 매개변수 값으로 전달해야함
- 비슷하게 compute_output_shape() 메서드의 매개변수도 각 이볅의 배치 크기를 담은 튜플이어야함
- 여러 출력을 가진 층을 만드려면 call() 메서드가 출력의 리스트를 반환해야함
- compute_output_shape() 메서드는 배치 출력 크기의 리스트를 반환해야함
- 예를 들어 다음 에는 두 개의 입력과 세 개의 출력을 만드는 층임

In [22]:
class MyMultiLayer(keras.layers.Layer):

    def call(self, X):
        X1, X2 = X
        return [X1 + X2, X1 + X2, X1 / X2]
    
    def compute_output_shape(self, batch_input_shape):
        b1, b2 = batch_input_shape
        return [b1, b1, b1]

- 위 층은 다른 일반적인 층처럼 사용할 수 있지만, 함수형 API와 서브클래싱 API에만 사용할 수 있음
    - 시퀀셜의 경우 하나의 입력과 하나의 출력을 가진 층만 사용하니까

- 훈련과 테스트에서 다르게 동작하는 층이 필요하다면(ex : Dropout, BatchNormalization), call() 메서드에 training 매개변수를 추가하여 훈련인지 테스트인지를 결정해야함
- 예를 들어, 훈련하는 동안에는 규제 목적으로 가우스 잡음을 추가하고 테스트 시에는 아무것도 하지 않는 층을 만들어보자
    - *keras.layers.GaussianNoise층이 그 역할을 수행하기는 함*

In [23]:
class MyGaussianNoise(keras.layers.Layer):
    
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev
        
    def call(self, X, training = None):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev = self.stddev)
            return X + noise
        else:
            return X
        
    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape

- 추후 필요시 다시 진행