In [2]:
import io

import tensorflow as tf
from keras import Sequential, layers, metrics, optimizers
from matplotlib import pyplot as plt
from tensorflow import keras

(x, y), (x_test, y_test) = keras.datasets.fashion_mnist.load_data() # 返回numpy
print(f"x shape:{x.shape}, y shape:{y.shape}")

def preprocess(x, y):

    x = tf.cast(x, dtype=tf.float32) / 255.
    x = tf.reshape(x, [28*28])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.one_hot(y, depth=10)
    return x,y

batchsz = 128
db = tf.data.Dataset.from_tensor_slices((x,y))
db = db.map(preprocess).shuffle(10000).batch(batchsz)
db_iter = iter(db)
sample = next(db_iter)
print('batch:', sample[0].shape, sample[1].shape)

db_test = tf.data.Dataset.from_tensor_slices((x_test,y_test))
db_test = db_test.map(preprocess).batch(batchsz)

x shape:(60000, 28, 28), y shape:(60000,)
batch: (128, 784) (128, 10)


In [3]:
x = tf.random.normal([4, 784])
# Dense是全连接层
net = keras.layers.Dense(512) # 声明时并不会创建，在build时才会，因为不知道input的shape
# net.bias, net.weights  空的
net.build(input_shape=(None, 784))# 指定最后一维即可
net.bias, net.weights # 有值了
out = net(x)

out.shape

# w b
net.kernel.shape, net.bias.shape

(TensorShape([784, 512]), TensorShape([512]))

In [4]:
model = Sequential([
    layers.Dense(256, activation=tf.nn.relu), # [b, 784] => [b, 256]
    layers.Dense(128, activation=tf.nn.relu), # [b, 256] => [b, 128]
    layers.Dense(64, activation=tf.nn.relu), # [b, 128] => [b, 64]
    layers.Dense(32, activation=tf.nn.relu), # [b, 64] => [b, 32]
    layers.Dense(10) # [b, 32] => [b, 10], 330 = 32*10 + 10
])
model.build(input_shape=[None, 28*28])
model.summary()
# same to model.summary()
# 可以遍历到所有的参数
for p in model.trainable_variables:
	print(p.name, p.shape)
# w = w - lr*grad
optimizer = optimizers.Adam(learning_rate=1e-3)


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_1 (Dense)             (None, 256)               200960    
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dense_3 (Dense)             (None, 64)                8256      
                                                                 
 dense_4 (Dense)             (None, 32)                2080      
                                                                 
 dense_5 (Dense)             (None, 10)                330       
                                                                 
Total params: 244,522
Trainable params: 244,522
Non-trainable params: 0
_________________________________________________________________
dense_1/kernel:0 (784, 256)
dense_1/bias:0 (256,)


In [5]:
# metrics使用
# 1. build meter
acc_meter = metrics.Accuracy()
loss_meter = metrics.Mean()



In [6]:
for epoch in range(5):


    for step, (x,y) in enumerate(db):

        # x: [b, 28, 28] => [b, 784]
        # y: [b]

        with tf.GradientTape() as tape:
            # [b, 784] => [b, 10]
            logits = model(x)
            # [b]
            loss_mse = tf.reduce_mean(tf.losses.MSE(y, logits))
            loss_ce = tf.losses.categorical_crossentropy(y, logits, from_logits=True)
            loss_ce = tf.reduce_mean(loss_ce)
            # 2. metrics使用 update data
            loss_meter.update_state(loss_ce)

        grads = tape.gradient(loss_ce, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))


        if step % 100 == 0:
            print(epoch, step, 'loss:', float(loss_ce), float(loss_mse), loss_meter.result().numpy())
            # 3. metrics使用 重置states
            loss_meter.reset_states()


    # test
    total_correct = 0
    total_num = 0
    # 3. metrics使用 重置states
    acc_meter.reset_states()
    for x,y in db_test:
        # [b, 10]
        logits = model(x)
        # logits => prob, [b, 10]
        prob = tf.nn.softmax(logits, axis=1)
        # [b, 10] => [b], int64
        pred = tf.argmax(prob, axis=1)
        pred = tf.cast(pred, dtype=tf.int32)
        # pred:[b]
        # y: [b]
        # correct: [b], True: equal, False: not equal
        y = tf.argmax(y, axis=1)
        y = tf.cast(y, dtype=tf.int32)
        correct = tf.equal(pred, y)
        correct = tf.reduce_sum(tf.cast(correct, dtype=tf.int32))

        total_correct += int(correct)
        total_num += x.shape[0]
        acc_meter.update_state(y, pred)

    acc = total_correct / total_num
    print(epoch, 'test acc:', acc, acc_meter.result().numpy())

0 0 loss: 2.3338937759399414 0.17064492404460907 2.3338938
0 100 loss: 0.46646273136138916 23.90788459777832 0.80284345
0 200 loss: 0.5061992406845093 25.445873260498047 0.51248395
0 300 loss: 0.43094614148139954 27.841815948486328 0.46819782
0 400 loss: 0.6341909766197205 26.187862396240234 0.4508484
0 test acc: 0.8465 0.8465
1 0 loss: 0.38558927178382874 28.730772018432617 0.42150962
1 100 loss: 0.4585285484790802 29.783458709716797 0.3896769
1 200 loss: 0.3725021183490753 31.84864044189453 0.37938073
1 300 loss: 0.2814905047416687 25.101472854614258 0.37032977
1 400 loss: 0.3613806366920471 27.357162475585938 0.3632713
1 test acc: 0.8608 0.8608
2 0 loss: 0.27347150444984436 30.221534729003906 0.3673601
2 100 loss: 0.36720848083496094 36.19776916503906 0.3359236
2 200 loss: 0.3515879511833191 31.31053352355957 0.3502842
2 300 loss: 0.3477889597415924 38.70899200439453 0.32736564
2 400 loss: 0.285636842250824 30.92763328552246 0.3302698
2 test acc: 0.8761 0.8761
3 0 loss: 0.2768584787

In [7]:
# 使用compile和fit进行训练
model.compile(optimizer=optimizers.Adam(learning_rate=1e-3),
    loss=tf.losses.CategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
    )
# 同时设置测试数据集，指定验证频率，每2个epoch做一次validation
# model内部会自动调用每一层的call  model.__call__() -> layer.call()
model.fit(db, epochs=6, validation_data=db_test, validation_freq=2)

Epoch 1/6
Epoch 2/6
Epoch 3/6
Epoch 4/6
Epoch 5/6
Epoch 6/6


<keras.callbacks.History at 0x1a28ab9a1c0>

In [8]:
# 测试  这和上面fit时指定validation是一样的，但是fit指定validation可以让fit提前结束
model.evaluate(db_test)

sample = next(iter(db_test))
x = sample[0]
y = sample[1] # one-hot
# model(x)与model.predict(x)意思一样
pred = model.predict(x) # [b, 10]
# convert back to number 
y = tf.argmax(y, axis=1)
pred = tf.argmax(pred, axis=1)

print(pred)
print(y)

tf.Tensor(
[9 2 1 1 6 1 4 6 5 7 4 5 5 3 4 1 2 2 8 0 2 5 7 5 1 2 6 0 9 3 8 8 3 3 8 0 7
 5 7 9 0 1 6 7 6 7 2 1 2 6 4 2 5 8 2 2 8 6 8 0 7 7 8 5 1 1 6 4 7 8 7 0 2 6
 2 3 1 2 8 4 1 8 5 9 5 0 3 2 0 2 5 3 6 7 1 8 0 1 2 2 3 6 7 2 7 8 5 7 9 4 2
 5 7 0 5 2 8 6 7 8 0 0 9 9 3 0 8 2], shape=(128,), dtype=int64)
tf.Tensor(
[9 2 1 1 6 1 4 6 5 7 4 5 7 3 4 1 2 4 8 0 2 5 7 9 1 4 6 0 9 3 8 8 3 3 8 0 7
 5 7 9 6 1 3 7 6 7 2 1 2 2 4 4 5 8 2 2 8 4 8 0 7 7 8 5 1 1 2 3 9 8 7 0 2 6
 2 3 1 2 8 4 1 8 5 9 5 0 3 2 0 6 5 3 6 7 1 8 0 1 4 2 3 6 7 2 7 8 5 9 9 4 2
 5 7 0 5 2 8 6 7 8 0 0 9 9 3 0 8 4], shape=(128,), dtype=int64)


In [23]:
# custom layer/model
# keras.Model与keras.layers.layer类似，都需要继承父类，覆盖__init__, call方法。Model多出了compile/fit/evaluate/predict函数

# Custom layer
class MyDense(layers.Layer):

	def __init__(self, inp_dim, outp_dim):
		super(MyDense, self).__init__() # 必须写
		self.inp_dim = inp_dim
		self.outp_dim = outp_dim
        # w, b自定义，注意由于框架集成原因，kernel，bias定义时不能用tf.variable等直接定义
		self.kernel = self.add_weight('w', [inp_dim, outp_dim])
		self.bias = self.add_weight('b', [outp_dim])

	def call(self, inputs, training=None):

		out = inputs @ self.kernel + self.bias

		return out 

	def get_config(self):
		# have to define get_config to be able to use model_from_json
		config = {
			'inp_dim': self.inp_dim,
			'outp_dim': self.outp_dim
		}
		base_config = super().get_config()
		return dict(list(base_config.items()) + list(config.items()))

# custom model
class MyModel(keras.Model):

	def __init__(self):
		super(MyModel, self).__init__()
        # 构建model结构
		self.fc1 = MyDense(28*28, 256)
		self.fc2 = MyDense(256, 128)
		self.fc3 = MyDense(128, 64)
		self.fc4 = MyDense(64, 32)
		self.fc5 = MyDense(32, 10)

	def call(self, inputs, training=None):
        # 实现x在layers间的传递过程
		x = self.fc1(inputs)
		x = tf.nn.relu(x)
		x = self.fc2(x)
		x = tf.nn.relu(x)
		x = self.fc3(x)
		x = tf.nn.relu(x)
		x = self.fc4(x)
		x = tf.nn.relu(x)
		x = self.fc5(x) 

		return x

In [10]:
model = MyModel()
model.compile(optimizer=optimizers.Adam(learning_rate=1e-3),
    loss=tf.losses.CategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
    )
# 同时设置测试数据集，指定验证频率，每2个epoch做一次validation
# model内部会自动调用每一层的call  model.__call__() -> layer.call()
model.fit(db, epochs=6, validation_data=db_test, validation_freq=2)

Epoch 1/6
Epoch 2/6
Epoch 3/6
Epoch 4/6
Epoch 5/6
Epoch 6/6


<keras.callbacks.History at 0x1a28abcd910>

In [14]:
# 模型保存与加载  遵循ONNX标准
# 权重保存与加载
import os
import shutil

dir_path = 'weights/fashion_minist'
if os.path.exists(dir_path):
    shutil.rmtree(dir_path)
# os.makedirs(dir_path)

weight_path = os.path.join(dir_path, 'weights.ckpt')
model.save_weights(weight_path)
model.evaluate(db_test)
print('saved weights.')
del model

model = MyModel()
model.compile(optimizer=optimizers.Adam(learning_rate=1e-3),
    loss=tf.losses.CategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
    )
model.load_weights(weight_path)
print('loaded weights!')
model.evaluate(db_test)


saved weights.
loaded weights!


[0.33221739530563354, 0.881600022315979]

In [33]:
# 尝试使用自定义layer

class MyLayer(layers.Layer):

	def __init__(self, inp_dim, outp_dim, **kwargs):
		super(MyLayer, self).__init__() # 必须写
		self.inp_dim = inp_dim
		self.outp_dim = outp_dim
        # w, b自定义，注意由于框架集成原因，kernel，bias定义时不能用tf.variable等直接定义
		self.kernel = self.add_weight('w', [inp_dim, outp_dim])
		self.bias = self.add_weight('b', [outp_dim])

	def call(self, inputs, training=None):

		out = inputs @ self.kernel + self.bias

		return out 

	def get_config(self):
		# have to define get_config to be able to use model_from_json
		config = {
			'inp_dim': self.inp_dim,
			'outp_dim': self.outp_dim
		}
		base_config = super().get_config()
		return dict(list(base_config.items()) + list(config.items()))

model = Sequential([
    MyLayer(28*28, 256),
    layers.ReLU(),
    # layers.Dense(256, activation=tf.nn.relu), # [b, 784] => [b, 256]
    layers.Dense(128, activation=tf.nn.relu), # [b, 256] => [b, 128]
    layers.Dense(64, activation=tf.nn.relu), # [b, 128] => [b, 64]
    layers.Dense(32, activation=tf.nn.relu), # [b, 64] => [b, 32]
    layers.Dense(10) # [b, 32] => [b, 10], 330 = 32*10 + 10
])

# 使用compile和fit进行训练
model.compile(optimizer=optimizers.Adam(learning_rate=1e-3),
    loss=tf.losses.CategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
    )
# 同时设置测试数据集，指定验证频率，每2个epoch做一次validation
# model内部会自动调用每一层的call  model.__call__() -> layer.call()
model.fit(db, epochs=6, validation_data=db_test, validation_freq=2)

Epoch 1/6
Epoch 2/6
Epoch 3/6
Epoch 4/6
Epoch 5/6
Epoch 6/6


<keras.callbacks.History at 0x1a29a2f0dc0>

In [34]:
import os
import shutil

dir_path = 'models/fashion_minist'
if os.path.exists(dir_path):
    shutil.rmtree(dir_path)
# os.makedirs(dir_path)

model_path = os.path.join(dir_path, 'model.h5')
model.save(model_path) # 默认h5格式不支持自定义model, 可以保存weights或者指定tf格式:save_format="tf"
print('saved total model.')
del model

print('loaded model from file.')
model = tf.keras.models.load_model(model_path, compile=False, custom_objects={'MyLayer': MyLayer})
model.compile(optimizer=optimizers.Adam(learning_rate=1e-3),
    loss=tf.losses.CategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
    )
model.evaluate(db_test)

saved total model.
loaded model from file.


[0.34546780586242676, 0.8756999969482422]