<a href="https://colab.research.google.com/github/robomoan/Data_Science_Study/blob/main/Handson_ML/handson_ml_12.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 핸즈온 머신러닝 12장
## 텐서플로를 사용한 사용자 정의 모델과 훈련

### 텐서와 연산

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf

In [2]:
# 벡터
tf.constant([[1, 2, 3], [4, 5, 6]])

<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6]], dtype=int32)>

In [3]:
# 스칼라
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [4]:
t = tf.constant([[1, 2, 3], [4, 5, 6]])

# Shape
t.shape

TensorShape([2, 3])

In [5]:
# 데이터 타입
t.dtype

tf.int32

In [6]:
# 인덱스 참조
t[:, 1:] 

<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[2, 3],
       [5, 6]], dtype=int32)>

In [7]:
t[..., 1]

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([2, 5], dtype=int32)>

In [8]:
# tf.newaxis: 새로운 축 생성
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=int32, numpy=
array([[2],
       [5]], dtype=int32)>

In [9]:
# 텐서 연산
# 더하기
t + 10
# tf.add(t, 10)

<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[11, 12, 13],
       [14, 15, 16]], dtype=int32)>

In [10]:
# 제곱
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[ 1,  4,  9],
       [16, 25, 36]], dtype=int32)>

In [11]:
# 행렬 곱셈
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[14, 32],
       [32, 77]], dtype=int32)>

### 케라스 저수준 API

In [12]:
# Keras 저수준 API
from tensorflow import keras
K = keras.backend
K.square(K.transpose(t)) + 10

<tf.Tensor: shape=(3, 2), dtype=int32, numpy=
array([[11, 26],
       [14, 35],
       [19, 46]], dtype=int32)>

### 텐서와 넘파이

In [13]:
# 넘파이 배열의 텐서화
a = np.array([2., 4., 5.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [14]:
# 텐서의 넘파이 배열화
t.numpy()

array([[1, 2, 3],
       [4, 5, 6]], dtype=int32)

In [15]:
# 넘파이 배열의 텐서 연산
# 연산 결과는 텐서로 나온다.
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [16]:
# 텐서의 넘파이 연산
# 연산 결과는 넘파이 배열로 나온다.
np.square(t)

array([[ 1,  4,  9],
       [16, 25, 36]], dtype=int32)

### 타입 변환

In [17]:
# float32 자료형 + int32 자료형: 오류
tf.constant(2.) + tf.constant(40)

InvalidArgumentError: ignored

In [18]:
# float32 자료형 + float64 자료형: 오류
tf.constant(2.) + tf.constant(40., dtype = tf.float64)

InvalidArgumentError: ignored

In [19]:
# tf.cast: 타입 변환
t2 = tf.constant(40., dtype = tf.float64)
tf.constant(2.) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

### 변수

In [20]:
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [21]:
# 변숫값 변경
v.assign(2 * v)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [22]:
v[0, 1].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [23]:
v[:, 2].assign([0, 1])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [24]:
v.scatter_nd_update(indices = [[0, 0], [1, 2]], updates = [100, 200])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

### 사용자 정의 모델 및 알고리즘

In [25]:
# 후버 손실
def huber_fn(y_true, y_pred):
  error = y_true - y_pred
  is_small_error = tf.abs(error) < 1
  squared_loss = tf.square(error) / 2
  linear_loss = tf.abs(error) - 0.5
  return tf.where(is_small_error, squared_loss, linear_loss)


In [26]:
# 캘리포니아 주택가격 데이터 부르기
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, housing.target.reshape(-1, 1))
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

Downloading Cal. housing from https://ndownloader.figshare.com/files/5976036 to /root/scikit_learn_data


In [27]:
X_train.shape[1:]

(8,)

In [28]:
# 신경망 모델 만들기
model = keras.models.Sequential()

model.add(keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=X_train.shape[1:]))
model.add(keras.layers.Dense(1))

In [29]:
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 30)                270       
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 31        
Total params: 301
Trainable params: 301
Non-trainable params: 0
_________________________________________________________________


In [30]:
# 컴파일: 후버 손실 적용
model.compile(loss=huber_fn, optimizer="nadam", metrics = ["mae"])

In [31]:
# 훈련          
history = model.fit(X_train_scaled, y_train, epochs = 30, validation_data = (X_valid_scaled, y_valid))

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


In [32]:
# 사용자 정의 모델 저장
model.save("my_model_with_a_custom_loss.h5")

In [33]:
# 사용자 정의 모델 불러오기
model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"huber_fn":huber_fn})

In [34]:
# 불러들인 모델 바로 훈련
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fc5097fea50>

In [35]:
# 후버 손실의 임계점(threshold)을 하이퍼파라미터로 같는 손실함수 만들기
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

In [36]:
# 새로운 후버손실 활용
model.compile(loss=create_huber(2.0), optimizer = "nadam", metrics = ["mae"])
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fc509453f10>

In [37]:
# 새로운 후버손실 적용하여 사용자 정의 모델 저장 & 불러오기
model.save("my_model_with_a_custom_loss_threshold_2.h5")

model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5",
                                custom_objects={"huber_fn": create_huber(2.0)}) # 함수 이름에 huber_fn 주의

In [38]:
# 클래스 상속
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [39]:
# 클래스의 인스턴스 활용
model.compile(loss=HuberLoss(2.), optimizer = 'nadam')
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fc5093a2050>

In [40]:
# 모델 저장 후 불러오기 (임계값이 저장됨)
model.save("my_model_with_a_custom_loss_class.h5")

model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",
                                custom_objects={"HuberLoss": HuberLoss})

In [41]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fc5091ee190>

In [42]:
# 저장된 임계값
model.loss.threshold

2.0

### 사용자 활성화 함수, 초기화, 규제, 제한

In [43]:
# 소프트플러스
def my_softplus(z):
  return tf.math.log(tf.exp(z) + 1.0)

# 글로럿 초기화
def my_glorot_initializer(shape, dtype=tf.float32):
  stddev = tf.sqrt(2. / (shape[0] + shape[1]))
  return tf.random.normal(shape, stddev=stddev, dtype=dtype)

# l1 규제
def my_l1_regularizer(weights):
  return tf.reduce_sum(tf.abs(0.01 * weights))

# ReLU
def my_positive_weights(weights):
  return tf.where(weights < 0, tf.zeros_like(weights), weights)

In [44]:
# 활용
layer = keras.layers.Dense(1, activation=my_softplus,
                           kernel_initializer=my_glorot_initializer,
                           kernel_regularizer=my_l1_regularizer,
                           kernel_constraint=my_positive_weights)

In [45]:
# 모델 생성
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=X_train.shape[1:]))
model.add(layer)

In [46]:
# 컴파일과 훈련
model.compile(loss="mse", optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fc5066e1bd0>

In [47]:
# 사용자 정의 모델 저장하고 불러오기
model.save("my_model_with_many_custom_parts.h5")

model = keras.models.load_model(
    "my_model_with_many_custom_parts.h5",
    custom_objects={
       "my_l1_regularizer": my_l1_regularizer,
       "my_positive_weights": my_positive_weights,
       "my_glorot_initializer": my_glorot_initializer,
       "my_softplus": my_softplus,
    })

In [48]:
# 클래스 상속하여 L1 규제 만들기
class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return {"factor": self.factor}

In [49]:
# 활용
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=X_train.shape[1:]))
model.add(keras.layers.Dense(1, activation=my_softplus,
                       kernel_regularizer=MyL1Regularizer(0.01),
                       kernel_constraint=my_positive_weights,
                       kernel_initializer=my_glorot_initializer))

### 사용자 정의 지표

In [50]:
# 지표로 사용자 정의 후버 사용
model.compile(loss="mse", optimizer = "nadam", metrics = [create_huber(2.0)])

In [51]:
model.fit(X_train_scaled, y_train, epochs = 2, validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fc5064f0b10>

In [52]:
# 스트리밍 지표로서의 정밀도: 누적
precision = keras.metrics.Precision()
precision([0,1,1,1,0,1,0,1], [1,1,0,1,0,1,0,1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [53]:
# 해당 샘플의 정밀도가 아닌 누적 정밀도
precision([0,1,0,0,1,0,1,1], [1,0,1,1,0,0,0,0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [54]:
# 결과보기
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [55]:
# TP, TF 변수 확인
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [56]:
# 변수 초기화
precision.reset_states()

In [57]:
# 스트리밍 지표로서의 후버 지표 만들기
# keras.metrics.Metric 클래스를 상속
class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # 기본 매개변수 처리 (예를 들면, dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer="zeros")
        self.count = self.add_weight("count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [58]:
m = HuberMetric(2.)
m(tf.constant([[2.]]), tf.constant([[10.]]))

<tf.Tensor: shape=(), dtype=float32, numpy=14.0>

In [59]:
# 누적되는 데이터의 후버 지표 
m(tf.constant([[0.], [5.]]), tf.constant([[1.], [9.25]]))

<tf.Tensor: shape=(), dtype=float32, numpy=7.0>

### 사용자 정의 층

In [60]:
# 가중치가 없는 사용자 정의 층 만들기
# keras.layers.Lambda 사용
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

In [61]:
exponential_layer([-1., 0., 1.])

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.36787948, 1.        , 2.7182817 ], dtype=float32)>

In [63]:
# Dense층 구현
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = unit # 유닛 개수 지정
        self.activation = keras.activations.get(activation) # 활성화 함수 지정

    def build(self, batch_input_shape): # 층에 변수 생성 
        # 연결 가중치
        self.kernel = self.add_weight(
            # kernal shape: 입력 노드 개수 x 유닛 개수 행렬
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal") # 글로럿 초기화
        # 편향
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros") # 초기값:0
        super().build(batch_input_shape) # 마지막에 호출 

    def call(self, X): # 연산 수행 
        # 입력값과 커널 행렬곱 + 편향 → 활성화 함수 적용
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape): # 출력 크기 반환
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])

    def get_config(self): # 활성화함수의 전체 설정 저장 
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

In [65]:
# 두 개의 입력과 두 개의 출력을 만드는 층
class MyMultiLayer(keras.layers.Layer):
    def call(self, X):
        X1, X2 = X
        print("X1.shape: ", X1.shape ," X2.shape: ", X2.shape) # 사용자 정의 층 디버깅
        return X1 + X2, X1 * X2

    def compute_output_shape(self, batch_input_shape):
        batch_input_shape1, batch_input_shape2 = batch_input_shape
        return [batch_input_shape1, batch_input_shape2]

In [66]:
# 사용자 정의 층 호출 
inputs1 = keras.layers.Input(shape=[2])
inputs2 = keras.layers.Input(shape=[2])
outputs1, outputs2 = MyMultiLayer()((inputs1, inputs2))

X1.shape:  (None, 2)  X2.shape:  (None, 2)


In [67]:
# 훈련시에만 가우시안 노이즈를 추가하고 테스트에는 추가하지 않는 층 생성
class MyGaussianNoise(keras.layers.Layer):
  def __init__(self, stddev, **kwargs):
    super().__init__(**kwargs)
    self.stddev = stddev
  
  def call(self, X, training=None):
    if training:
      noise = tf.random.normal(tf.shape(X), stddev = self.stddev)
      return X + noise
    else:
      return X
  def compute_output_shape(self, batch_input_shape):
    return batch_input_shape

### 사용자 정의 모델

In [68]:
# 잔차 블록 만들기 
class ResidualBlock(keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(n_neurons, activation="elu",
                                          kernel_initializer="he_normal")
                       for _ in range(n_layers)] # n_layers개 만큼 Dense층 반복 생성

    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        # Input이 Dense층을 통과한 값과 Input값 합치기
        return inputs + Z

In [69]:
# 잔차 블록 모델
class ResidualRegressor(keras.models.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu",
                                          kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30) # Dense층 2개 각 층당 neuron 30개
        self.block2 = ResidualBlock(2, 30) # Dense층 2개 각 층당 neuron 30개
        self.out = keras.layers.Dense(output_dim) # output층 뉴런 개수 지정

    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

In [70]:
model = ResidualRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


### 모델 구성 요소에 기반한 손실과 지표

In [71]:
class ReconstructingRegressor(keras.models.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu",
                                          kernel_initializer="lecun_normal")
                       for _ in range(5)] # 5개의 은닉층
        self.out = keras.layers.Dense(output_dim) # 1개의 출력층

    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs, name='recon') # 보조 출력
        super().build(batch_input_shape)

    def call(self, inputs):
        Z = inputs
        for layer in self.hidden: # 입력이 은닉층 5개에 모두 통과
          Z = layer(Z)
        reconstruction = self.reconstruct(Z) # 은닉층과 별개로 재구성층 통과
        # 재구성 손실: 재구성과 입력 사이의 평균 제곱 오차
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        self.add_loss(0.05 * recon_loss) # 재구성 손실을 0.05의 가중치로 주 손실에 더함
        return self.out(Z)

In [72]:
class ReconstructingRegressor(keras.models.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu",
                                          kernel_initializer="lecun_normal")
                       for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
        self.reconstruct = keras.layers.Dense(8) # TF 이슈 #46858에 대한 대책
        self.reconstruction_mean = keras.metrics.Mean(name="reconstruction_error")

#     TF 이슈 #46858 때문에 주석 처리
#     def build(self, batch_input_shape):
#         n_inputs = batch_input_shape[-1]
#         self.reconstruct = keras.layers.Dense(n_inputs, name='recon')
#         super().build(batch_input_shape)

    def call(self, inputs, training=None):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        self.recon_loss = 0.05 * tf.reduce_mean(tf.square(reconstruction - inputs))
        
        if training:
           result = self.reconstruction_mean(recon_loss)
           self.add_metric(result)
        return self.out(Z)
    
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x)
            loss = self.compiled_loss(y, y_pred, regularization_losses=[self.recon_loss])

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        return {m.name: m.result() for m in self.metrics}

In [73]:
# 활용
keras.backend.clear_session()
model = ReconstructingRegressor(1) # output_dim = 1
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=2)
y_pred = model.predict(X_test_scaled)

Epoch 1/2
Epoch 2/2


### 자동 미분을 사용하여 그래디언트 계산

In [74]:
def f(w1, w2):
  return 3 * w1 ** 2 + 2 * w1 * w2

In [75]:
w1, w2 = (5, 3)
eps = 1e-6 # 입실론, 매우 작은 값
(f(w1 + eps, w2) - f(w1, w2)) / eps # w1에 대한 도함수 정의

36.000003007075065

In [76]:
(f(w1, w2 + eps) - f(w1, w2)) / eps # w2에 대한 도함수 정의

10.000000003174137

In [77]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
  z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])

In [78]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [79]:
# gradient 매서드 호출 뒤에는 자동으로 tape가 지워짐
with tf.GradientTape() as tape:
  z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)

In [80]:
dz_dw2 = tape.gradient(z, w2) # 오류: 앞에서 이미 tape가 지워졌으므로

RuntimeError: ignored

In [81]:
# 지속가능한 tape
with tf.GradientTape(persistent=True) as tape:
  z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
del tape # 수동으로 지워줘야함

In [82]:
dz_dw1

<tf.Tensor: shape=(), dtype=float32, numpy=36.0>

In [83]:
dz_dw2

<tf.Tensor: shape=(), dtype=float32, numpy=10.0>

In [84]:
# 변수(Variable)가 아닌 다른 객체 (예: constant)를 사용했을 경우 None 출력
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
  z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
gradients

[None, None]

In [85]:
# 어떤 종류의 텐서더라도 연산을 기록하도록 강제 가능
with tf.GradientTape() as tape:
  tape.watch(c1)
  tape.watch(c2)
  z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [86]:
# tf.stop_gradient: 정방향계산시에는 입력 반환, 역전파 시에는 그래디언트 전파 안 함
def f(w1, w2):
    return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2)

with tf.GradientTape() as tape:
    z = f(w1, w2)

tape.gradient(z, [w1, w2])

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [87]:
# 소프트플러스 그래디언트 계산 불안정: NaN 출력(부동 소수점 오류로 인해 무한 나누기 무한 계산)
x = tf.Variable(100.)
with tf.GradientTape() as tape:
    z = my_softplus(x)

tape.gradient(z, [x])

[<tf.Tensor: shape=(), dtype=float32, numpy=nan>]

In [88]:
# 소프트플러스의 도함수 1 / (1 + 1 / exp(x)) 직접 대입
@tf.custom_gradient
def my_better_softplus(z):
    exp = tf.exp(z)
    def my_softplus_gradients(grad):
        return grad / (1 + 1 / exp)
    return tf.math.log(exp + 1), my_softplus_gradients

### 사용자 정의 훈련 반복

In [89]:
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation="elu", kernel_initializer='he_normal', kernel_regularizer=l2_reg))
model.add(keras.layers.Dense(1, kernel_regularizer=l2_reg))

In [90]:
# 훈련 세트에서 샘플 배치를 랜덤하게 추출
def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

In [91]:
# 현재 스텝수, 전체 스텝 수
def print_status_bar(iteration, total, loss, metrics=None):
    metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
                         for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics,
          end=end)

In [92]:
# 활용 전 하이퍼파라미터 정의
keras.backend.clear_session()

n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

In [93]:
for epoch in range(1, n_epochs + 1):
    print("에포크 {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train_scaled, y_train) # 훈련세트에서 배치 랜덤 샘플링
        with tf.GradientTape() as tape:
            y_pred = model(X_batch) # 배치 하나의 예측 만들기
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) # 손실 계산 배치에 대한 손실 평균 
            loss = tf.add_n([main_loss] + model.losses) # 주 손실 + 다른 손실
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        metric.reset_states()

에포크 1/5
11610/11610 - mean: 1.5109 - mean_absolute_error: 0.6027
에포크 2/5
11610/11610 - mean: 0.6956 - mean_absolute_error: 0.5419
에포크 3/5
11610/11610 - mean: 0.6724 - mean_absolute_error: 0.5371
에포크 4/5
11610/11610 - mean: 0.6449 - mean_absolute_error: 0.5270
에포크 5/5
11610/11610 - mean: 0.6720 - mean_absolute_error: 0.5380


### 텐서플로 함수와 그래프

In [94]:
# 세제곱
def cube(x):
  return x ** 3

In [95]:
cube(3)

27

In [96]:
cube(tf.constant(2.))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [97]:
# 텐서플로 함수로 바꾸기
tf_cube = tf.function(cube)
tf_cube

<tensorflow.python.eager.def_function.Function at 0x7fc4f57a4090>

In [98]:
# 텐서 결과를 출력
tf_cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [99]:
tf_cube(tf.constant(2.))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [100]:
# tf.function 데코레이터 사용
@tf.function
def tf_cube(x):
  return x ** 3

In [101]:
# 원본 파이썬 함수 변환
tf_cube.python_function(2)

8