In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

In [2]:
# SavedModel 형식

def get_model():
    """Build and compile a minimal functional regression model.

    Returns:
        A compiled `keras.Model` that maps a 32-feature input to a single
        Dense unit, using the "adam" optimizer and "mean_squared_error" loss.
    """
    feature_input = keras.Input(shape=(32,))
    prediction = keras.layers.Dense(1)(feature_input)
    regressor = keras.Model(feature_input, prediction)
    regressor.compile(optimizer="adam", loss="mean_squared_error")
    return regressor

# Build the compiled model and fit it on random data (128 samples, 32 features).
model = get_model()

test_input = np.random.random((128, 32))
test_target = np.random.random((128, 1))
model.fit(test_input, test_target)

# Write the whole model to the "my_model" directory, then load it back.
model.save("my_model")

reconstructed_model = keras.models.load_model("my_model")

# Compare the predictions of the original and the reloaded model;
# assert_allclose raises AssertionError if they do not match.
np.testing.assert_allclose(
    model.predict(test_input), reconstructed_model.predict(test_input)
)

# fit() is called directly on the reloaded model, with no compile() call in between.
reconstructed_model.fit(test_input, test_target)

INFO:tensorflow:Assets written to: my_model/assets


<tensorflow.python.keras.callbacks.History at 0x7f7f1a438f70>

## SavedModel에 포함되는 것

### assets
   

### saved_model.pb
모델 아키텍쳐 및 훈련 구성(옵티마이저, 손실 및 메트릭 포함)이 포함됨.

### variables
가중치가 저장

In [3]:
# SavedModel이 사용자 정의 객체를 처리하는 방법

class CustomModel(keras.Model):
    """Subclassed model that applies a stack of Dense layers in sequence.

    Args:
        hidden_units: Iterable of unit counts; one Dense layer is created
            per entry, in the given order.
    """

    def __init__(self, hidden_units):
        super(CustomModel, self).__init__()
        stack = []
        for width in hidden_units:
            stack.append(keras.layers.Dense(width))
        self.dense_layers = stack

    def call(self, inputs):
        """Feed `inputs` through every Dense layer and return the final result."""
        activations = inputs
        for dense in self.dense_layers:
            activations = dense(activations)
        return activations
    
    
# Instantiate the subclassed model and call it once on a (1, 5) input;
# the outputs are kept for the comparison after reloading.
model = CustomModel([16, 16, 10])
input_arr = tf.random.uniform((1, 5))
outputs = model(input_arr)
model.save("my_model")

# Remove the class definition so the load below cannot rely on it.
del CustomModel

# The reloaded object must reproduce the outputs of the original model.
loaded = keras.models.load_model("my_model")
np.testing.assert_allclose(loaded(input_arr), outputs)

# Print both objects to show which classes they are instances of.
print("Orinal model:", model)
print("Loaded model:", loaded)

INFO:tensorflow:Assets written to: my_model/assets
Orinal model: <__main__.CustomModel object at 0x7f7f17170c70>
Loaded model: <tensorflow.python.keras.saving.saved_model.load.CustomModel object at 0x7f7f1a4fa310>


In [4]:
# Build a fresh compiled model and fit it on random data (128 samples, 32 features).
model = get_model()

test_input = np.random.random((128, 32))
test_target = np.random.random((128, 1))
model.fit(test_input, test_target)

# Save to a single "my_h5_model.h5" file and load it back.
# NOTE(review): Keras is expected to select the HDF5 format from the .h5 suffix — confirm.
model.save("my_h5_model.h5")

reconstructed_model = keras.models.load_model("my_h5_model.h5")

# The original and the reloaded model must produce matching predictions.
np.testing.assert_allclose(
    model.predict(test_input), reconstructed_model.predict(test_input)
)

# fit() is called directly on the reloaded model, with no compile() call in between.
reconstructed_model.fit(test_input, test_target)



<tensorflow.python.keras.callbacks.History at 0x7f7f1ab75d30>

In [5]:
# Layer example: round-trip a Dense layer through its config dict.
layer = keras.layers.Dense(3, activation="relu")
layer_config = layer.get_config()
print(layer_config)
# Build a new Dense layer from the config dict printed above.
new_layer = keras.layers.Dense.from_config(layer_config)

{'name': 'dense_5', 'trainable': True, 'dtype': 'float32', 'units': 3, 'activation': 'relu', 'use_bias': True, 'kernel_initializer': {'class_name': 'GlorotUniform', 'config': {'seed': None}}, 'bias_initializer': {'class_name': 'Zeros', 'config': {}}, 'kernel_regularizer': None, 'bias_regularizer': None, 'activity_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None}


In [6]:
# Sequential model example
model = keras.Sequential([keras.Input((32, )), keras.layers.Dense(1)])
# Serialize: extract the model's config.
config = model.get_config()
# Deserialize: build a new Sequential model from that config.
new_model = keras.Sequential.from_config(config)



In [7]:
# Functional model example: build the model, extract its config, and
# construct a new model from that config.
inputs = keras.Input((32,))
outputs = keras.layers.Dense(1)(inputs)
model = keras.Model(inputs, outputs)
config = model.get_config()
new_model = keras.Model.from_config(config)

In [8]:
# to_json() converts the model to a JSON string, which can later be loaded
# without the original model class. This is for models, not for layers.
model = keras.Sequential([keras.Input((32, )), keras.layers.Dense(1)])
json_config = model.to_json()
new_model = keras.models.model_from_json(json_config)



In [9]:
# tensorflow 그래프만 로딩하기
'''
이 메서드의 몇가지 단점.
 - 사용하기 어렵다.
사용을 권장하지는 않지만, 사용자 정의 객체의 코드를 잃어버렸거나 tf.keras.models.load_model() 모델을 로드하는데 문제가 있는 경우에 유용.
'''

# Save the current `model`, then load it with the low-level
# tf.saved_model.load API instead of keras.models.load_model.
model.save("my_model")
tensorflow_graph = tf.saved_model.load("my_model")
# The loaded object is called directly on a float32 batch of shape (4, 32),
# and the result is converted to a NumPy array.
x = np.random.uniform(size=(4, 32)).astype(np.float32)
predicted = tensorflow_graph(x).numpy()

INFO:tensorflow:Assets written to: my_model/assets


In [10]:
class CustomLayer(keras.layers.Layer):
    """Layer that multiplies its input by a stored variable in training mode only.

    Args:
        a: Initial value of the scaling variable.
    """

    def __init__(self, a):
        # The base Layer constructor must run before any attribute is assigned,
        # so that Keras can set up its internal state for this instance.
        super().__init__()
        self.var = tf.Variable(a, name="var_a")

    def call(self, inputs, training=False):
        """Return `inputs * self.var` when `training` is truthy, else `inputs` unchanged."""
        if training:
            return inputs * self.var
        else:
            return inputs

    def get_config(self):
        """Return the constructor argument, using the variable's current value for `a`."""
        return {"a": self.var.numpy()}

    @classmethod
    def from_config(cls, config):
        """Rebuild a layer by passing the config entries as keyword arguments."""
        return cls(**config)
    

# Create the layer with a=5, then overwrite its variable with 2.
layer = CustomLayer(5)
layer.var.assign(2)

# Serialize the layer and rebuild it; custom_objects maps the name
# "CustomLayer" to the class so it can be resolved during deserialization.
serialized_layer = keras.layers.serialize(layer)
new_layer = keras.layers.deserialize(
    serialized_layer, custom_objects={"CustomLayer": CustomLayer}
)


In [11]:
# 사용자 정의 클래스 등록. 

class CustomLayer(keras.layers.Layer):
    """Affine layer computing `inputs @ w + b` with lazily created weights.

    Args:
        units: Size of the last output dimension (default 32).
        **kwargs: Forwarded to the base `Layer` constructor.
    """

    def __init__(self, units=32, **kwargs):
        super(CustomLayer, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        """Create the kernel `w` and bias `b` from the last input dimension."""
        in_features = input_shape[-1]
        self.w = self.add_weight(
            shape=(in_features, self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer="random_normal",
            trainable=True,
        )

    def call(self, inputs):
        """Apply the affine transform to `inputs`."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

    def get_config(self):
        """Return the base layer config extended with `units`."""
        base_config = super(CustomLayer, self).get_config()
        base_config["units"] = self.units
        return base_config

    
def custom_activation(x):
    """Return the element-wise square of tanh(x)."""
    squashed = tf.nn.tanh(x)
    return squashed ** 2

# Functional model that uses both the custom layer and the custom activation.
inputs = keras.Input((32,))
x = CustomLayer(32)(inputs)
outputs = keras.layers.Activation(custom_activation)(x)
model = keras.Model(inputs, outputs)

config = model.get_config()

# Rebuild the model from its config inside a custom_object_scope that maps
# the custom names to the Python objects defined in this cell.
custom_objects = {"CustomLayer": CustomLayer, "custom_activation": custom_activation}
with keras.utils.custom_object_scope(custom_objects):
    new_model = keras.Model.from_config(config)

In [12]:
# 메모리에서 레이어 간 가중치 전이하기
def create_layer():
    """Return a 64-unit ReLU Dense layer named "dense_2", already built for 784 input features."""
    dense = keras.layers.Dense(units=64, activation="relu", name="dense_2")
    dense.build((None, 784))
    return dense

# Two independently created layers with the same configuration.
layer_1 = create_layer()
layer_2 = create_layer()

# Copy the weights of layer_1 into layer_2.
layer_2.set_weights(layer_1.get_weights())

In [23]:
# Transfer weights in memory between models with compatible architectures.

# Functional model: 784 inputs -> Dense(64, relu) -> Dense(64, relu) -> Dense(10).
inputs = keras.Input(shape=(784, ), name="digits")
x = keras.layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = keras.layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = keras.layers.Dense(10, name="predictions")(x)
functional_model = keras.Model(inputs=inputs, outputs=outputs, name="3_layer_mlp")


class SubclassedModel(keras.Model):
    """Subclassed three-layer MLP: two 64-unit ReLU layers and an output layer.

    Args:
        output_dim: Number of units in the final Dense layer.
        name: Optional model name, forwarded to `keras.Model`.
    """

    def __init__(self, output_dim, name=None):
        super(SubclassedModel, self).__init__(name=name)
        self.output_dim = output_dim
        # Attribute order is kept as dense_1, dense_2, dense_3.
        self.dense_1 = keras.layers.Dense(units=64, activation="relu", name="dense_1")
        self.dense_2 = keras.layers.Dense(units=64, activation="relu", name="dense_2")
        self.dense_3 = keras.layers.Dense(units=output_dim, name="predictions")

    def call(self, inputs):
        """Run `inputs` through dense_1, dense_2 and dense_3 in that order."""
        hidden = self.dense_2(self.dense_1(inputs))
        return self.dense_3(hidden)

    def get_config(self):
        """Return the constructor arguments needed to recreate this model."""
        return dict(output_dim=self.output_dim, name=self.name)

subclassed_model = SubclassedModel(10)
# Call the model once on a (1, 784) batch before set_weights is used
# (presumably so that its layer weights are created — confirm).
subclassed_model(tf.ones((1, 784)))

# Copy all weights from the functional model into the subclassed model.
subclassed_model.set_weights(functional_model.get_weights())

assert len(functional_model.weights) == len(subclassed_model.weights)
# np.testing.assert_allclose(a, b) raises AssertionError when a and b are not equal.
for a, b in zip(functional_model.weights, subclassed_model.weights):
    np.testing.assert_allclose(a.numpy(), b.numpy())


In [25]:
# Reference functional model: 784 inputs -> Dense(64) -> Dense(64) -> Dense(10).
inputs = keras.Input(shape=(784,), name="digits")
x = keras.layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = keras.layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = keras.layers.Dense(10, name="predictions")(x)
functional_model = keras.Model(inputs=inputs, outputs=outputs, name="3_layer_mlp")

# Same stack again, with a Dropout layer inserted before the output layer.
inputs = keras.Input(shape=(784, ), name="digits")
x = keras.layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = keras.layers.Dense(64, activation="relu", name="dense_2")(x)

x = keras.layers.Dropout(0.5)(x)
outputs = keras.layers.Dense(10, name="predictions")(x)
functional_model_with_dropout = keras.Model(
    inputs=inputs, outputs=outputs, name="3_layer_mlp"
)

# Copy the weights across the two models.
# NOTE(review): presumably valid because Dropout contributes no weights — confirm.
functional_model_with_dropout.set_weights(functional_model.get_weights())

In [27]:
# Sequential three-layer MLP on 784 input features.
sequential_model = keras.Sequential(
    [
        keras.Input(shape=(784, ), name="digits"),
        keras.layers.Dense(64, activation="relu", name="dense_1"),
        keras.layers.Dense(64, activation="relu", name="dense_2"),
        keras.layers.Dense(10, name="predictions"),
    ]
)

# Save the weights under the "ckpt" prefix and load them back into the same model;
# load_weights returns a status object here.
sequential_model.save_weights("ckpt")
load_status = sequential_model.load_weights("ckpt")

# NOTE(review): assert_consumed is expected to fail if any checkpoint value
# was left unmatched — confirm against the TensorFlow checkpoint docs.
load_status.assert_consumed()



<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f7efb612790>

In [29]:
class CustomLayer(keras.layers.Layer):
    """Minimal layer that holds a single variable.

    Args:
        a: Initial value of the variable.
    """

    def __init__(self, a):
        # The base Layer constructor must run before any attribute is assigned,
        # so that Keras can set up its internal state for this instance.
        super().__init__()
        self.var = tf.Variable(a, name="var_a")

# Save the layer through tf.train.Checkpoint; save() returns the checkpoint path.
layer = CustomLayer(5)
layer_ckpt = tf.train.Checkpoint(layer=layer).save("custom_layer")

# Open the saved checkpoint with a reader.
ckpt_reader = tf.train.load_checkpoint(layer_ckpt)

# Display the mapping from stored variable keys to their dtypes.
ckpt_reader.get_variable_to_dtype_map()

{'save_counter/.ATTRIBUTES/VARIABLE_VALUE': tf.int64,
 'layer/var/.ATTRIBUTES/VARIABLE_VALUE': tf.int32,
 '_CHECKPOINTABLE_OBJECT_GRAPH': tf.string}

In [42]:
# The output layer is handled separately from the rest of the architecture.

# Full functional model: 784 inputs -> Dense(64) -> Dense(64) -> Dense(10).
inputs = keras.Input(shape=(784,), name="digits")
x = keras.layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = keras.layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = keras.layers.Dense(10, name="predictions")(x)
functional_model = keras.Model(inputs=inputs, outputs=outputs, name="3_layer_mlp")

# Sub-model that ends at the input of the last layer, i.e. everything
# except the "predictions" layer.
pretrained = keras.Model(
    functional_model.inputs, functional_model.layers[-1].input, name="pretrainde_model"
)

print(functional_model.inputs)
print(functional_model.layers[-1].input)

# Overwrite the sub-model weights with random normal values, then save them
# under the "pretrained_ckpt" prefix.
for w in pretrained.weights:
    w.assign(tf.random.normal(w.shape))
pretrained.save_weights("pretrained_ckpt")
pretrained.summary()

# New model with the same two hidden layers but a 5-unit output layer.
inputs = keras.Input(shape=(784, ), name="digits")
x = keras.layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = keras.layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = keras.layers.Dense(5, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs, name="new_model")

# Load the saved sub-model weights into the new model.
model.load_weights("pretrained_ckpt")

# zip stops at the shorter list, so only the weights that `pretrained` has
# are compared against the leading weights of `model`.
for a, b in zip(pretrained.weights, model.weights):
    np.testing.assert_allclose(a.numpy(), b.numpy())

print("\n", "-" * 50)
model.summary()

# Alternative: rebuild the hidden stack as its own model and wrap it in a
# Sequential model together with a new 5-unit output layer.
inputs = keras.Input(shape=(784,), name='digits')
x = keras.layers.Dense(64, activation="relu", name='dense_1')(inputs)
x = keras.layers.Dense(64, activation="relu", name="dense_2")(x)
pretrained_model = keras.Model(inputs=inputs, outputs=x, name="pretrained")

model = keras.Sequential([pretrained_model, keras.layers.Dense(5, name="predictions")])
model.summary()

# Load the saved weights into the inner hidden-stack model only.
pretrained_model.load_weights("pretrained_ckpt")

[<KerasTensor: shape=(None, 784) dtype=float32 (created by layer 'digits')>]
KerasTensor(type_spec=TensorSpec(shape=(None, 64), dtype=tf.float32, name=None), name='dense_2/Relu:0', description="created by layer 'dense_2'")
Model: "pretrainde_model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
digits (InputLayer)          [(None, 784)]             0         
_________________________________________________________________
dense_1 (Dense)              (None, 64)                50240     
_________________________________________________________________
dense_2 (Dense)              (None, 64)                4160      
Total params: 54,400
Trainable params: 54,400
Non-trainable params: 0
_________________________________________________________________

 --------------------------------------------------
Model: "new_model"
_________________________________________________________________
Layer (type)   

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f7efc41ac10>

In [45]:
# Pick the layer at index 1 and the last layer of the functional model.
first_dense = functional_model.layers[1]
last_dense = functional_model.layers[-1]
# Save a checkpoint with three named entries: the whole first Dense layer,
# and the kernel and bias of the last layer as individual variables.
ckpt_path = tf.train.Checkpoint(
    dense=first_dense, kernel=last_dense.kernel, bias=last_dense.bias
).save("ckpt")

class ContrivedModel(keras.Model):
    """Model with one Dense layer followed by a hand-written affine output.

    The output transform uses a (64, 10) `kernel` and a (10,) `bias` created
    directly on the model rather than through a second Dense layer.
    """

    def __init__(self):
        super(ContrivedModel, self).__init__()
        self.first_dense = keras.layers.Dense(64)
        # add_weight replaces the deprecated add_variable alias; keyword
        # arguments avoid relying on the positional order of name and shape.
        self.kernel = self.add_weight(name="kernel", shape=(64, 10))
        self.bias = self.add_weight(name="bias", shape=(10,))

    def call(self, inputs):
        """Apply `first_dense`, then compute `x @ kernel + bias`."""
        x = self.first_dense(inputs)
        return tf.matmul(x, self.kernel) + self.bias

model = ContrivedModel()
# Call the model once on a (1, 784) batch before restoring
# (presumably so that first_dense creates its weights — confirm).
_ = model(tf.ones((1, 784)))

# Restore using the same entry names (dense / kernel / bias) as the saved
# checkpoint, then call assert_consumed() on the returned status.
tf.train.Checkpoint(
    dense=model.first_dense, kernel=model.kernel, bias=model.bias
).restore(ckpt_path).assert_consumed()

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f7f19580220>

In [46]:
# Sequential three-layer MLP on 784 input features.
sequential_model = keras.Sequential(
    [
        keras.Input(shape=(784, ), name="digits"),
        keras.layers.Dense(64, activation="relu", name="dense_1"),
        keras.layers.Dense(64, activation="relu", name="dense_2"),
        keras.layers.Dense(10, name="predictions"),
    ]
)
# Save the weights to "weights.h5" and load them back into the same model.
# NOTE(review): the .h5 suffix is expected to select the HDF5 weights format — confirm.
sequential_model.save_weights("weights.h5")
sequential_model.load_weights("weights.h5")



In [47]:
class NestedDenseLayer(keras.layers.Layer):
    """Layer that chains two inner Dense layers of equal width.

    Args:
        units: Number of units for both inner Dense layers.
        name: Optional layer name, forwarded to the base `Layer`.
    """

    def __init__(self, units, name=None):
        super(NestedDenseLayer, self).__init__(name=name)
        self.dense_1 = keras.layers.Dense(units=units, name="dense_1")
        self.dense_2 = keras.layers.Dense(units=units, name="dense_2")

    def call(self, inputs):
        """Apply dense_1 and then dense_2 to `inputs`."""
        hidden = self.dense_1(inputs)
        return self.dense_2(hidden)
    
# Wrap the nested layer in a Sequential model and record its weight names.
nested_model = keras.Sequential([keras.Input((784, )), NestedDenseLayer(10, "nested")])
variable_names = [v.name for v in nested_model.weights]
print("variables: {}".format(variable_names))

# Freeze only the first inner Dense layer.
print("\nChanging trainable status of one of the nested layers...")
nested_model.get_layer("nested").dense_1.trainable=False

# Record the weight names again and report whether the ordering differs.
variable_names_2 = [v.name for v in nested_model.weights]
print("\nvariables: {}".format(variable_names_2))
print("variable ordering changed:", variable_names != variable_names_2)

variables: ['nested/dense_1/kernel:0', 'nested/dense_1/bias:0', 'nested/dense_2/kernel:0', 'nested/dense_2/bias:0']

Changing trainable status of one of the nested layers...

variables: ['nested/dense_2/kernel:0', 'nested/dense_2/bias:0', 'nested/dense_1/kernel:0', 'nested/dense_1/bias:0']
variable ordering changed: True


In [51]:
def create_functional_model():
    """Build the "3_layer_mlp" functional model.

    Returns:
        An uncompiled `keras.Model` with a 784-feature input named "digits",
        two 64-unit ReLU Dense layers ("dense_1", "dense_2") and a 10-unit
        Dense output layer named "predictions".
    """
    digits = keras.Input(shape=(784,), name="digits")
    hidden = digits
    for layer_name in ("dense_1", "dense_2"):
        hidden = keras.layers.Dense(64, activation="relu", name=layer_name)(hidden)
    predictions = keras.layers.Dense(10, name="predictions")(hidden)
    return keras.Model(inputs=digits, outputs=predictions, name="3_layer_mlp")

# Save the weights of a freshly created model to "pretrained_weights.h5".
functional_model = create_functional_model()
functional_model.save_weights("pretrained_weights.h5")

# In a second model with the same architecture, load those weights.
pretrained_model = create_functional_model()
pretrained_model.load_weights("pretrained_weights.h5")

# Reuse every layer except the last one, append a new 5-unit Dense layer,
# and assemble the result into a Sequential model.
extracted_layers = pretrained_model.layers[:-1]
extracted_layers.append(keras.layers.Dense(5, name="dense_3"))
model = keras.Sequential(extracted_layers)
model.summary()


Model: "sequential_13"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 64)                50240     
_________________________________________________________________
dense_2 (Dense)              (None, 64)                4160      
_________________________________________________________________
dense_3 (Dense)              (None, 5)                 325       
Total params: 54,725
Trainable params: 54,725
Non-trainable params: 0
_________________________________________________________________
