In [None]:
# 파이썬 ≥3.5 필수
import sys
assert sys.version_info >= (3, 5)

# 사이킷런 ≥0.20 필수
import sklearn
assert sklearn.__version__ >= "0.20"

try:
    # %tensorflow_version은 코랩 명령입니다.
    %tensorflow_version 2.x
except Exception:
    pass

# 이 노트북은 텐서플로 ≥2.4이 필요합니다
# 2.x 버전은 대부분 동일한 결과를 만들지만 몇 가지 버그가 있습니다.
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.4"

# 공통 모듈 임포트
import numpy as np
import os

# 노트북 실행 결과를 동일하게 유지하기 위해
np.random.seed(42)
tf.random.set_seed(42)

# 깔끔한 그래프 출력을 위해
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# 그림을 저장할 위치
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "deep"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("그림 저장:", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

# 12.2 넘파이처럼 텐서플로 사용하기

### 12.2.1 텐서와 연산

In [None]:
#텐서 2행 3열
tf.constant([[1.,2.,3.],[4.,5.,6.]])


In [None]:
#스칼라
tf.constant(42)

In [None]:
t=tf.constant([[1.,2.,3.],[4.,5.,6.]])

In [None]:
t.shape

In [None]:
t.dtype

In [None]:
t[:,1:]

In [None]:
t[:,1,tf.newaxis]

단순한 연산 가능

In [None]:
t+10

In [None]:
tf.square(t)

In [None]:
# @ --> 행렬곱셈
#2x3 @ 3x2 = 2x2

t @ tf.transpose(t) 

### 12.2.2 텐서와 넘파이

In [None]:
a = np.array([2.,4.,5.])

In [None]:
tf.constant(a)

In [None]:
t.numpy()

In [None]:
tf.square(a)

### 12.2.3 타입 변환

성능저하를 방지하기 위해 텐서플로우는 타입변환을 자동으로하지않음.

In [None]:
t2 = tf.constant(40., dtype=tf.float64)

In [None]:
#tf.cast를 이용해 형변환 가능
tf.constant(2.0) + tf.cast(t2,tf.float32)

### 12.2.4 변수

텐서와 다르게 변수는 .assign()을 이용해 값을 바꿀수있음

In [None]:
v = tf.Variable([1.,2.,3.],[4.,5.,6.])

In [None]:
v.assign(2*v)

# 12.3 사용자 정의 모델과 훈련 알고리즘

### 12.3.1 사용자 정의 손실 함수

In [None]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

In [None]:
#후버손실(임계값에 따라 mse,mae 사용)

def huber_fn(y_true,y_pred):
  error = y_true =y_pred
  is_small_error =tf.abs(error)<1
  squared_loss =tf.square(error) /2
  linear_loss = tf.abs(error) -0.5
  return tf.where(is_small_error, squared_loss,linear_loss)

In [None]:
plt.figure(figsize=(8, 3.5))
z = np.linspace(-4, 4, 200)
plt.plot(z, huber_fn(0, z), "b-", linewidth=2, label="huber($z$)")
plt.plot(z, z**2 / 2, "b:", linewidth=1, label=r"$\frac{1}{2}z^2$")
plt.plot([-1, -1], [0, huber_fn(0., -1.)], "r--")
plt.plot([1, 1], [0, huber_fn(0., 1.)], "r--")
plt.gca().axhline(y=0, color='k')
plt.gca().axvline(x=0, color='k')
plt.axis([-4, 4, 0, 4])
plt.grid(True)
plt.xlabel("$z$")
plt.legend(fontsize=14)
plt.title("Huber loss", fontsize=14)
plt.show()

In [None]:
input_shape = X_train.shape[1:]

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [None]:
model.compile(loss=huber_fn, optimizer= 'nadam')

In [None]:
model = keras.models.Sequential(
    [keras.layers.Dense(30,activation="selu", kernel_initializer="lecun_normal",
                        input_shape=[8]),
     keras.layers.Dense(1)
                                      
                        ]
)

In [None]:
#후버로스 사용
model.compile(loss=huber_fn, optimizer= keras.optimizers.Nadam(),
              metrics=["mae"])

In [None]:
model.fit(X_train_scaled,y_train,epochs=2,
          validation_data = (X_valid_scaled,y_valid))

### 12.3.2 사용자 정의 요소를 가진 모델을 저장하고 로드하기

In [None]:
model.save("my_model_with_a_custom_loss.h5")

In [None]:
#저장은 문제가 없지만
#로드는 함수이름과 실제 함수를 매핑한 딕셔너리를 전달해야함
model = keras.models.load_model("my_model_with_a_custom_loss.h5",
                                custom_objects={"huber_fn":huber_fn})

파라미터를 받는 로스 만들기

In [None]:
def create_huber(threshold=1.0):
  def huber_fn(y_true,y_pred):
    error = y_true- y_pred
    is_small_error = tf.abs(error)<threshold
    squared_loss = tf.square(error)/2
    linear_loss = threshold * tf.abs(error) - threshold**2 / 2
    return tf.where(is_small_error,squared_loss,linear_loss)
  return huber_fn

In [None]:
model.compile(loss=create_huber(2.0), optimizer="nadam",metrics=["mae"])


In [None]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

In [None]:
model.save("my_model_with_a_custom_loss_threshold_2.h5")

In [None]:
model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5",
                                custom_objects={"huber_fn":create_huber(2.0)})
#새로 정의한 함수이름이 아닌 케라스 모델에서 사용했던 함수이름 huber_fn 사용

In [None]:
model.fit(X_train_scaled,y_train,epochs=2,
          validation_data =(X_valid_scaled,y_valid))

keras.losses.Loss 클래스를 상속하여 키 이름지정 해결가능

In [None]:
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    #get_config는 부모의 하이퍼파라미터 매핑된 딕셔너리 반환
    #이 딕셔너리에 새로운 하이퍼 파라미터 추가
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [None]:
input_shape = X_train_scaled.shape[1:]
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [None]:
model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"])

In [None]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

In [None]:
model.save("my_model_with_a_custom_loss_class.h5")

In [None]:
model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",
                                custom_objects={"HuberLoss": HuberLoss})
#클래스 이름으로 가져옴

In [None]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))


### 12.3.3 활성화 함수, 초기화, 규제, 제한을 커스터마이징 하기

In [None]:
#사용자 정의 활성화함수
def my_softplus(z):
  return tf.math.log(tf.exp(z)+1.0)
#사용자 정의 글로럿초기화(==xavier, 1/fan_avg)
def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

#사용자 정의 l1규제
def my_l1_regularizer(weights):
  return tf.reduce_sum(tf.abs(0.01*weights))
#사용자 정의 양수 가중치만 남기기
def my_positive_weights(weights):
  return tf.where(weights<0., tf.zeros_like(weights), weights)

In [None]:
layer = keras.layers.Dense(1, 
                           activation=my_softplus,
                           kernel_initializer=my_glorot_initializer,
                           kernel_regularizer=my_l1_regularizer,
                           kernel_constraint=my_positive_weights)

모델과 저장할 파라미터가 있을때.

In [None]:
class MyL1Regularizer(keras.regularizers.Regularizer):
  def __init__(self,factor):
    self.factor = factor
  def call(self,weights):
    return tf.reduce_sum(tf.abs(self.factor*weights))
  def get_config(self):
    return {"factor":self.factor}
    #다른 규제클래스를 상속하여 만들지 않았기떄문에 부모의 get_config 필요없음.

### 12.3.4 사용자 정의 지표

precision과 같은 스트리밍 지표는 누적되어야 함.

In [None]:
precision = tf.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])

In [None]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])
#누적되어 0.4가 아닌 0.5

In [None]:
#variable 을 통해 변수 확인
precision.variables

In [None]:
#스트리밍 변수 초기화
precision.reset_states()

스트리밍 지표 만들기

In [None]:
#전체 후버 손실과 처리한 샘플 수 기록
class HuberMetric(keras.metrics.Metric):
  def __init__(self,threshold=1.0, **kwargs):
    super().__init__(**kwargs)
    self.threshold = threshold
    self.huber_fn = create_huber(threshold)
    self.total = self.add_weight("total", initializer="zeros")
    self.count = self.add_weight("count", initializer = "zeros")
  def update_state(self,y_true,y_pred,sample_weight=None):
    metric = self.huber_fn(y_true,y_pred)
    self.total.assign_add(tf.reduce_sum(metric))
    self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
  def result(self):
    return self.total / self.count
  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "threshold":self.threshold}

In [None]:
m = HuberMetric(2.)
# total = 2 * |10 - 2| - 2²/2 = 14
# count = 1
# result = 14 / 1 = 14
m(tf.constant([[2.]]), tf.constant([[10.]]))

12.3.5 사용자 정의 층

가중치가 없는 층은 keras.layers.lambda 사용 ex) Flatten,relu

In [None]:
exponential_layer =keras.layers.Lambda(lambda x: tf.exp(x))

In [None]:
model = keras.models.Sequential([
                                 keras.layers.Dense(30,activation="relu",input_shape= input_shape),
                                 keras.layers.Dense(1),
                                 #사용
                                 exponential_layer
                                 
])

상태가 있는 층(가중치가 있는 층)은 keras.layers.Layer 상속

In [None]:
class MyDense(keras.layers.Layer):
  def __init__(self,units, activation=None, **kwargs):
    super().__init__(**kwargs)
    self.units = units
    self.activation = keras.activations.get(activation)
  def build(self, batch_input_shape):
    self.kernel = self.add_weight(
        name="kernel", shape = [batch_input_shape[-1], self.units],
        initializer = 'glorot_normal')
    self.bias = self.add_weight(
        name='bias', shape = [self.units],
        initializer = "zeros"
    )
    super().build(batch_input_shape)  
  def call(self,X):
    return self.activation(X @ self.kernel + self.bias)
  def comput_output_shape(self, batch_input_shape):
    return tf.TensorShape(batch_input_shape.as_list()[:-1]+
                          [self.units])
    
  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "units": self.units,
            "activation": keras.activations.serialize(self.activation)}


In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential(
    MyDense(30,activation='relu', input_shape=input_shape),
    MyDense(1)
)

여러가지 입력을 받는 층 ex)Concatenate

In [None]:
class MyMultiLayer(keras.layers.Layer):
  def call(self,X):
    X1,X2 = X
    return X1+X2, X1*X2
  def compute_output_shape(self,batch_input_shape):
    batch_input_shape1, batch_input_shape2 = batch_input_shape
    return [batch_input_shape1,batch_input_shape2]


In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
inputs1 = keras.layers.Input(shape=[2])
inputs2 = keras.layers.Input(shape=[2])
#함수형 API와 서브클래싱 API에서만 사용가능. 시퀀셜에서 사용불가
outputs1,outputs2 = MyMultiLayer()((inputs1,inputs2))


훈련과 테스트에서 다르게 동작하는 층

In [None]:
class MyGaussianNoise(keras.layers.Layer):
  def __init__(self,stddev,**kwargs):
    super().__init__(kwargs)
    self.stddev = stddev
  
  #call() 메서드에 trainning 매개변수를 추가하여 활용
  def call(self,X,training=None):
    if training:
      noise = tf.random.normal(tf.shape(X), stddev = self.stddev)
      return X+noise
    else:
      return X

  def compute_output_shape(self,batch_input_shape):
    return batch_input_shape

### 12.3.6 사용자 정의 모델

사용자 정의 모델을 통해 ResidualBlock을 가진 모델 만들기

블록 재사용을 위한 ResidualBlock

In [None]:
class ResidualBlock(keras.layers.Layer):
  def __init__(self,n_layers,n_neurons,**kwargs):
    super().__init__(**kwargs)
    self.hidden=[keras.layers.Dense(n_neurons,
                        activation="elu",
                        kernel_initializer=keras.initializers.he_normal)
    
    for layer in range(n_layers)]

  def call(self,inputs):
    Z=inputs
    for layer in self.hidden:
      Z = layer(Z)
      #쭉 쌓아줌 layer(layzer(Z))
    return inputs+Z
      #마지막에 인풋과 함께 리턴




ResidualBlock을 사용한 ResidualRegressor 모델

In [None]:
class ResidualRegressor(keras.models.Model):
  def __init__(self,output_dim,**kwargs):
    super().__init__(kwargs)
    self.hidden1 = keras.layers.Dense(30,activation='elu',
                                      kernel_initializer='he_normal')
    self.block1 = ResidualBlock(2,30)
    self.block2 = ResidualBlock(2,30)
    self.out = keras.layers.Dense(output_dim)
  def call(self,inputs):
    Z = self.hidden1(inputs)
    #블럭1 4개
    for _ in range(1+3):
      Z = self.block1(Z)
    #블럭2 1개
    Z = self.block2(Z)
    return self.out(Z)

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = ResidualRegressor(1)
model.compile(loss="mse",optimizer= 'nadam')
history = model.fit(X_train_scaled, y_train,epochs=5)



#일반 다른 모델과 같은 기능 가능.
score = model.evaluate(X_test_scaled,y_test)

X_new_scaled = X_test_scaled
y_pred = model.predict(X_new_scaled)

### 12.3.7 모델 구성 요소에 기반한 손실과 지표

In [None]:
class ReconstructionRegressor(keras.Model):
  def __init__(self,output_dim,**kwargs):
    super().__init__(kwargs)
    self.hidden = [keras.layers.Dense(30,activation = "selu",
                                      kernel_initializer = "lecun_normal")
                   for _ in range(5)]
    self.out = keras.layers.Dense(output_dim)


  def build(self,batch_input_shape):
    n_inputs = batch_input_shape[-1]
    self.reconstruct = keras.layers.Dense(n_inputs)
    super().build(batch_input_shape)
  
  def call(self,inputs):
    Z=inputs
    for layer in self.hidden:
      Z=layer(Z)
    reconstruction = self.reconstruct(Z)
    recon_loss =tf.reduce_mean(tf.square(reconstruction - inputs))
    self.add_loss(0.05* recon_loss)
    return self.out(Z)

### 12.3.8 자동 미분을 사용하여 그레디언트 계산하기

In [None]:
def f(w1, w2):
    return 3 * w1 ** 2 + 2 * w1 * w2

In [None]:
w1,w2 = 5,3

In [None]:
eps=1e-6

5,3 에서 w1에 대한 도함수

In [None]:
(f(w1+eps,w2) - f(w1,w2))/eps

5,3에서 W2에 대한 도함수

In [None]:
(f(w1,w2+eps)-f(w1,w2))/eps

텐서플로에서

In [None]:
w1,w2 = tf.Variable(5.),tf.Variable(3.)
with tf.GradientTape() as tape:
  z = f(w1,w2)

gradient = tape.gradient(z, [w1,w2])

In [None]:
gradient
#일치~!

In [None]:
w1,w2 = tf.Variable(5.),tf.Variable(3.)
with tf.GradientTape() as tape:
  z = f(w1,w2)

테이프는 gradient() 메서드가 호출된 후 자동으로 지워짐.

따라서 한번이상 gradient()를 호출해야 한다면 persistent=True 후 

테이프를 삭제하여 리소스를 해제

In [None]:
with tf.GradientTape(persistent=True) as tape:
  z = f(w1,w2)

dz_dw1 = tape.gradient(z,w1)
dz_dw2 = tape.gradient(z,w2)

del tape

tape는 변수가 포함된 연산만을 기록. 아닌 변수는 None 반환

In [None]:
c1,c2 = tf.constant(5.),tf.constant(3.)
#tf.Variable(변수)이 아닌 tf.constant(상수)



with tf.GradientTape() as tape:
  z = f(c1,c2)

gradients = tape.gradient(z, [c1,c2])

In [None]:
gradients

watch를 사용하여 필요한 경우 감시하여 연산을 강제할수있음

In [None]:
with tf.GradientTape() as tape:
  tape.watch(c1)
  tape.watch(c2)
  z = f(c1,c2)

gradients = tape.gradient(z,[c1,c2])

In [None]:
gradients

일부분에 역방향 전파만 무시시키는 방법

In [None]:
def f(w1,w2):
  return 3*w1**2 + tf.stop_gradient(2*w1*w2) #뒷부분은 역전파 무시 w2가 무시됨.

In [None]:
with tf.GradientTape() as tape:
  z = f(w1,w2)

In [None]:
gradients = tape.gradient(z,[w1,w2])

In [None]:
gradients
#w2값에 대한 역전파값이 None으로 반환.

데코레이터를 이용해 계산되지않는(NAN) 그레디언트 이슈 해결

ex) softplus는 큰 입력에 대해 NAN 발생

@ 데코레이터 사용법(python 문법)

In [None]:
def trace(func):                             # 호출할 함수를 매개변수로 받음
    def wrapper():
        print(func.__name__, '함수 시작')    # __name__으로 함수 이름 출력
        func()                               # 매개변수로 받은 함수를 호출
        print(func.__name__, '함수 끝')
    return wrapper                           # wrapper 함수 반환
 
@trace    # @데코레이터
def hello():
    print('hello')
 
@trace    # @데코레이터
def world():
    print('world')
 
hello()    # 함수를 그대로 호출
world()    # 함수를 그대로 호출

In [None]:
x = tf.Variable([100.])
with tf.GradientTape() as tape:
  z= my_softplus(x)


In [None]:
tape.gradient(z,[x])
#soft plus가 큰 입력(100)에 대하여 nan값 출력됨.

In [None]:
#데코레이터를 이용한 해결

@tf.custom_gradient
def my_better_softplus(z):
  exp = tf.exp(z)
  def my_softplus_gradients(grad):
    return grad / (1+1 / exp)
  return tf.math.log(exp+1), my_softplus_gradients


#이렇게하면 tf.custom_gradient 중에 이 함수가 실행됨.

하지만 여전히 지수함수 특성상 inf값이 발생함. --> tf.where을 쓰는방법.(이게편한듯)

In [None]:
def my_better_softplus(z):
    return tf.where(z > 30., z, tf.math.log(tf.exp(z) + 1.))

In [None]:
x = tf.Variable([1000.])
with tf.GradientTape() as tape:
    z = my_better_softplus(x)

z, tape.gradient(z, [x])

#1000 vs nan

12.3.9 사용자 정의 훈련 반복

fit 메소드의 유연성 제공, but 극도의 유연성이 아니면 추천x


ex) wide-deep 구조처럼 경로마다 다른 옵티마이저 사용할떄

In [None]:
#간단한모델
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal",
                       kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

In [None]:
#샘플 배치를 랜덤하게 추출하는 함수
def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

In [None]:
#현재 스텝수, 전체 스텝수, 에포크 시작부터 평균손실 등을 포함하여 훈련상태를 출력하는 함수

def print_status_bar(iteration, total, loss, metrics=None, size=30):
    metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
                         for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print("\r{}/{}-".format(iteration, total)+ metrics, end=end)

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

In [None]:
for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train_scaled, y_train)
        with tf.GradientTape() as tape:
            #배치 하나를 위한 예측
            y_pred = model(X_batch)

            #주 손실을 계산 .mse
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            
            #규제손실은 하나의 스칼라 값이므로 동일한 크기와 타입을 가진 텐서를 더하는 tf.add_n()을 사용하여 손실을 모두 더함.
            loss = tf.add_n([main_loss] + model.losses)

        #테이프를 사용해 그래디언트 계산
        gradients = tape.gradient(loss, model.trainable_variables)
        #옵티마이저에 적용해 경사하강 실행.
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))
        #현재 에폭에 대한 평균로스 계산(주 로스, 규제 로스).
        mean_loss(loss)

        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    #평균 손실과 지표값 초기화
    for metric in [mean_loss] + metrics:
        metric.reset_states()



# 12.4 텐서플로 함수와 그래프

파이썬 함수를 텐서플로 함수로 바꾸기

In [None]:
def cube(x):
  return x ** 3

In [None]:
cube(2)

In [None]:
cube(tf.constant(2.0))

In [None]:
tf_cube =tf.function(cube)
tf_cube

In [None]:
tf_cube(2)

In [None]:
tf_cube(tf.constant(2.0))

데코레이터를 이용하여 바꾸기

In [None]:
@tf.function
def tf_cube(x):
  return x**3

In [None]:
tf_cube(tf.constant(2.))

어렵당 ㅡ,.ㅡ

# 연습문제 12. 층 정규화를 수행하는 사용자 정의 층을 구현하시오


a.
_문제: build() 메서드에서 두 개의 훈련 가능한 가중치 α와 β를 정의합니다. 두 가중치 모두 크기가 input_shape[-1:]이고 데이터 타입은 tf.float32입니다. α는 1로 초기화되고 β는 0으로 초기화되어야 합니다._

b.
문제: call() 메서드는 샘플의 특성마다 평균 μ와 표준편차 σ를 계산해야 합니다. 이를 위해 전체 샘플의 평균 μ와 분산 σ2을 반환하는 tf.nn.moments(inputs, axes=-1, keepdims=True)을 사용할 수 있습니다(분산의 제곱근으로 표준편차를 계산합니다). 그다음 α⊗(X - μ)/(σ + ε) + β를 계산하여 반환합니다. 여기에서 ⊗는 원소별 곱셈(*)을 나타냅니다. ε은 안전을 위한 항입니다(0으로 나누어지는 것을 막기 위한 작은 상수. 예를 들면 0.001).

In [None]:
class LayerNormalization(keras.layers.Layer):
  def __init__(self,eps=0.001,**kwargs):
    super().__init__(**kwargs)
    self.eps = eps


  def build(self, batch_input_shape):
    self.alpha = self.add_weight(
        name='alpha',
        shape=batch_input_shape[-1],#(batchsize,8)인듯?.
        initializer ="ones"
    )
    

    self.beta = self.add_weight(
        name='beta',
        shape =batch_input_shape[-1],
        initializer = "zeros"
    )
    super().build(batch_input_shape)


  def call(self,inputs):
    
    mean,variance = tf.nn.moments(inputs,axes=-1,keepdims=True)#mean,variance를 구해주는 연산
    return self.alpha * (inputs-mean)/(tf.sqrt(variance+self.eps)) +self.beta

  def compute_output_shape(self,batch_input_shape):
    return batch_input_shape

  
  def get_config(self): #얘를 쓰는 이유는 뭐라고?? --> 모델 저장 시 파라미터까지 함께 저장하고 싶을떄.
    base_config = super().get_config()
    return {**base_config, "eps": self.eps}


c.
문제: 사용자 정의 층이 keras.layers.LayerNormalization 층과 동일한(또는 거의 동일한) 출력을 만드는지 확인하세요.
각 클래스의 객체를 만들고 데이터(예를 들면, 훈련 세트)를 적용해 보죠. 차이는 무시할 수 있는 수준입니다.

In [None]:
X= X_train.astype(np.float32)

custom_layer_norm = LayerNormalization()
keras_layer_norm = keras.layers.LayerNormalization()

tf.reduce_mean(keras.losses.mean_absolute_error(
    keras_layer_norm(X), custom_layer_norm(X)))



#두 값이 가까움. 5.496049e-08 --> 사용자 정의 층과 케라스 층이 비슷한 결과를 냄.

13. 사용자 정의 훈련 반복을 사용해 패션 MNIST 데이터셋으로 모델을 훈련해보세요.

데이터 준비

In [None]:
(X_train_full,y_train_full),(X_test,y_test) = keras.datasets.fashion_mnist.load_data()

X_train_full = X_train_full.astype(np.float32)/255.
X_test = X_test.astype(np.float32)/255.

X_train,X_valid =X_train_full[5000:],X_train_full[:5000]
y_train,y_valid = y_train_full[5000:],y_train_full[:5000]

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential([
                                 keras.layers.Flatten(input_shape=[28,28]),
                                 keras.layers.Dense(100,activation='relu'),
                                 keras.layers.Dense(10,activation='softmax')
])

In [None]:
n_epochs=5
batch_size=32
n_steps = len(X_train)//batch_size
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.sparse_categorical_crossentropy
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.SparseCategoricalAccuracy)]

In [None]:
from tqdm.notebook import trange
from collections import OrderedDict
#epoch tqdm
with trange(1,n_epochs+1, desc = "All epochs") as epochs:
  for epoch in epochs:
    #step tqdm
    with trange(1,n_steps+1,desc = "Epoch {}/{}".format(epoch,n_epochs)) as steps:
      for step in steps:
        X_batch,y_batch = random_batch(X_train,y_train)
        
        with tf.GradientTape() as tape:
          y_pred = model(X_batch)
          main_loss = tf.reduce_mean(loss_fn(y_batch,y_pred))
          loss = tf.add_n([main_loss]+model.losses)
        gradient = tape.gradient(loss,model.trainable_variables)
        optimizer.apply_gradients(zip(gradient,model.trainable_variables))

        
        #저장
        status =OrderedDict()
        mean_loss(loss)
        status["loss"] = mean_loss.result().numpy()

        for metric in metrics:
          
          metric(y_batch,y_pred)
          status[metric.name] = metric.result().numpy()
        steps.set_postfix(status) # tqdm 기록 해주는 함수인듯
      y_pred = model(X_valid)
      status["val_loss"] =np.mean(loss_fn(y_valid,y_pred))
      status["val_accuracy"] = np.mean(keras.metrics.sparse_categorical_accuracy(
          tf.constant(y_valid,dtype = np.float32), y_pred
      )) 
      steps.set_postfix(status) 
    for metric in [mean_loss] + metrics:
      metric.reset_states()


b.
문제: 상위 층과 하위 층에 학습률이 다른 옵티마이저를 따로 사용해보세요.

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

하위층 상위층 생성

In [None]:
lower_layers = keras.models.Sequential([
                                        
keras.layers.Flatten(input_shape=[28,28]),
keras.layers.Dense(100,activation='relu')
])

upper_layers = keras.models.Sequential([
                                        
keras.layers.Dense(10,activation='softmax'),

])

model = keras.models.Sequential([
  lower_layers,upper_layers
])

상위층 하위층 옵티마이저 생성

In [None]:
lower_optimizer= keras.optimizers.SGD(lr=1e-4)
upper_optimizer = keras.optimizers.Nadam(lr=1e-3)

In [None]:
n_epochs=5
batch_size=32
n_steps = len(X_train)//batch_size
loss_fn = keras.losses.sparse_categorical_crossentropy
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.SparseCategoricalAccuracy()]

사용자 정의 훈련 반복

In [None]:
#에폭 tqdm
with trange(1,n_epochs+1, desc="All epochs") as epochs:
  for epoch in epochs:
    with trange(1,n_steps+1, desc = "{}/{}".format(epoch,n_epochs)) as steps:
      for step in steps:
        X_batch,y_batch = random_batch(X_train,y_train)


              #두번 쓰기 위해 persistent = True 설정
        with tf.GradientTape(persistent=True) as tape:
          y_pred = model(X_batch)
          main_loss = tf.reduce_mean(loss_fn(y_batch,y_pred))
          loss = tf.add_n([main_loss]+model.losses)

        #옵티마이저 각각 적용
        for layers,optimizer in ((lower_layers,lower_optimizer),(upper_layers,upper_optimizer)):
          gradients = tape.gradient(loss,layers.trainable_variables)
          optimizer.apply_gradients(zip(gradients,layers.trainable_variables))
        #persistent=True 이므로 수동삭제
        del tape
      

        status = OrderedDict()

        mean_loss(loss)
        status["loss"] = mean_loss.result().numpy()

        for metric in metrics:
          metric(y_batch,y_pred)
          status[metric.name] = metric.result().numpy()
        
        steps.set_postfix(status)
      y_pred = model(X_valid)
      status["val_loss"] = np.mean(keras.metrics.sparse_categorical_accuracy(
          tf.constant(y_valid,dtype=np.float32),y_pred
      ))

      steps.set_postfix(status)

  for metric in [mean_loss] + metrics:
    metric.reset_states()