In [1]:
import tensorflow as tf
import numpy as np

In [2]:
class MyModel(tf.keras.Model):
    def __init__(self):
        super().__init__()     # Python 2 下使用 super(MyModel, self).__init__()
        # 此處添加初始化程式碼（包含 call 方法中會用到的層），例如
        # layer1 = tf.keras.layers.BuiltInLayer(...)
        # layer2 = MyCustomLayer(...)

    def call(self, input):
        # 此處添加模型呼叫的程式碼（處理輸入並返回輸出），例如
        # x = layer1(input)
        # output = layer2(x)
        return output

    # 還可以添加自定義的方法

In [3]:
class A:
     def add(self, x):
         y = x+1
         print(y)
class B(A):
    def add(self, x):
        super().add(x)
b = B()
b.add(2)  # 3

3


# linear regression

In [4]:
import tensorflow as tf

X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
y = tf.constant([[10.0], [20.0]])


class Linear(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(
            units=1,
            activation=None,
            kernel_initializer=tf.zeros_initializer(),
            bias_initializer=tf.zeros_initializer()
        )

    def call(self, input):
        output = self.dense(input)
        return output


# 以下程式碼結構與前一節類似
model = Linear()
optimizer = tf.keras.optimizers.SGD(lr=0.01)
for i in range(100):
    with tf.GradientTape() as tape:
        y_pred = model(X)      # 呼叫模型 y_pred = model(X) 而不是顯式寫出 y_pred = a * X + b
        loss = tf.reduce_mean(tf.square(y_pred - y))
    grads = tape.gradient(loss, model.variables)    # 使用 model.variables 這一屬性直接獲得模型中的所有變數
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
print(model.variables)

[<tf.Variable 'linear/dense/kernel:0' shape=(3, 1) dtype=float32, numpy=
array([[0.40784496],
       [1.191065  ],
       [1.9742855 ]], dtype=float32)>, <tf.Variable 'linear/dense/bias:0' shape=(1,) dtype=float32, numpy=array([0.78322077], dtype=float32)>]


# Multilayer Perceptron, MLP

In [5]:
class MNISTLoader():
    def __init__(self):
        mnist = tf.keras.datasets.mnist
        (self.train_data, self.train_label), (self.test_data, self.test_label) = mnist.load_data()
        # MNIST中的圖片預設為uint8（0-255的數字）。以下程式碼將其正規化到0-1之間的浮點數，並在最後增加一維作為顏色通道
        self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1)      # [60000, 28, 28, 1]
        self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1)        # [10000, 28, 28, 1]
        self.train_label = self.train_label.astype(np.int32)    # [60000]
        self.test_label = self.test_label.astype(np.int32)      # [10000]
        self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0]

    def get_batch(self, batch_size):
        # 從資料集中隨機取出batch_size個元素並返回
        index = np.random.randint(0, self.num_train_data, batch_size)
        return self.train_data[index, :], self.train_label[index]

In [6]:
class MLP(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()    # Flatten層將除第一維（batch_size）以外的維度展平
        self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):         # [batch_size, 28, 28, 1]
        x = self.flatten(inputs)    # [batch_size, 784]
        x = self.dense1(x)          # [batch_size, 100]
        x = self.dense2(x)          # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output

In [7]:
num_epochs = 5
batch_size = 50
learning_rate = 0.001

In [8]:
model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

In [9]:
    num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

batch 0: loss 2.402627
batch 1: loss 2.376355
batch 2: loss 2.239973
batch 3: loss 2.168834
batch 4: loss 2.037308
batch 5: loss 1.987093
batch 6: loss 1.977174
batch 7: loss 1.900226
batch 8: loss 1.809184
batch 9: loss 1.937359
batch 10: loss 1.800917
batch 11: loss 1.648825
batch 12: loss 1.529788
batch 13: loss 1.624029
batch 14: loss 1.635276
batch 15: loss 1.350808
batch 16: loss 1.574706
batch 17: loss 1.301706
batch 18: loss 1.341482
batch 19: loss 1.277082
batch 20: loss 1.414743
batch 21: loss 1.206796
batch 22: loss 1.226705
batch 23: loss 1.254171
batch 24: loss 1.220646
batch 25: loss 1.146522
batch 26: loss 1.004949
batch 27: loss 1.026992
batch 28: loss 1.027799
batch 29: loss 1.015100
batch 30: loss 1.072310
batch 31: loss 0.955938
batch 32: loss 0.873975
batch 33: loss 0.872546
batch 34: loss 0.923194
batch 35: loss 0.848913
batch 36: loss 0.818888
batch 37: loss 0.835473
batch 38: loss 0.845701
batch 39: loss 0.644325
batch 40: loss 0.603325
batch 41: loss 0.809062
ba

模型的評估： tf.keras.metrics
最後，我們使用測試集評估模型的性能。這裡，我們使用 tf.keras.metrics 中的 SparseCategoricalAccuracy 評量器來評估模型在測試集上的性能，該評量器能夠對模型預測的結果與真實結果進行比較，並輸出預測正確的樣本數占總樣本數的比例。我們疊代測試資料集，每次通過 update_state() 方法向評量器輸入兩個參數： y_pred 和 y_true ，即模型預測出的結果和真實結果。評量器具有內部變數來保存當前評估指標相關的參數數值（例如當前已傳入的累計樣本數和當前預測正確的樣本數）。疊代結束後，我們使用 result() 方法輸出最終的評量指標值（預測正確的樣本數占總樣本數的比例）。

在以下評量器程式碼中，我們提出了一個實例 tf.keras.metrics.SparseCategoricalAccuracy，並使用 For 循環疊代分批次傳入了測試集資料的預測結果與真實結果，並輸出訓練後的模型在測試資料集上的準確率。

In [10]:
    sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    num_batches = int(data_loader.num_test_data // batch_size)
    for batch_index in range(num_batches):
        start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
        y_pred = model.predict(data_loader.test_data[start_index: end_index])
        sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
    print("test accuracy: %f" % sparse_categorical_accuracy.result())

test accuracy: 0.974000


# 卷積神經網路（CNN）


In [11]:
class CNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=32,             # 卷積層神經元（卷積核）數目
            kernel_size=[5, 5],     # 接受區的大小
            padding='same',         # padding策略（vaild 或 same）
            activation=tf.nn.relu   # 激活函数
        )
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[5, 5],
            padding='same',
            activation=tf.nn.relu
        )
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.flatten = tf.keras.layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        x = self.conv1(inputs)                  # [batch_size, 28, 28, 32]
        x = self.pool1(x)                       # [batch_size, 14, 14, 32]
        x = self.conv2(x)                       # [batch_size, 14, 14, 64]
        x = self.pool2(x)                       # [batch_size, 7, 7, 64]
        x = self.flatten(x)                     # [batch_size, 7 * 7 * 64]
        x = self.dense1(x)                      # [batch_size, 1024]
        x = self.dense2(x)                      # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output

In [12]:
num_epochs = 5
batch_size = 50
learning_rate = 0.001

model = CNN()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
for batch_index in range(num_batches):
    X, y = data_loader.get_batch(batch_size)
    with tf.GradientTape() as tape:
        y_pred = model(X)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
        loss = tf.reduce_mean(loss)
        print("batch %d: loss %f" % (batch_index, loss.numpy()))
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

batch 0: loss 2.301937
batch 1: loss 2.222834
batch 2: loss 1.975661
batch 3: loss 1.982270
batch 4: loss 1.738940
batch 5: loss 1.592233
batch 6: loss 1.353394
batch 7: loss 1.416510
batch 8: loss 1.063882
batch 9: loss 0.811517
batch 10: loss 0.917445
batch 11: loss 0.622289
batch 12: loss 0.894822
batch 13: loss 0.627155
batch 14: loss 0.578612
batch 15: loss 0.482645
batch 16: loss 0.422136
batch 17: loss 0.427089
batch 18: loss 0.404176
batch 19: loss 0.491314
batch 20: loss 0.450208
batch 21: loss 0.590942
batch 22: loss 0.461819
batch 23: loss 0.648759
batch 24: loss 0.450375
batch 25: loss 0.508517
batch 26: loss 0.560419
batch 27: loss 0.187707
batch 28: loss 0.291256
batch 29: loss 0.611106
batch 30: loss 0.593267
batch 31: loss 0.287954
batch 32: loss 0.425068
batch 33: loss 0.464779
batch 34: loss 0.278601
batch 35: loss 0.442968
batch 36: loss 0.399355
batch 37: loss 0.264028
batch 38: loss 0.257739
batch 39: loss 0.261392
batch 40: loss 0.222210
batch 41: loss 0.399148
ba

In [9]:
    sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    num_batches = int(data_loader.num_test_data // batch_size)
    for batch_index in range(num_batches):
        start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
        y_pred = model.predict(data_loader.test_data[start_index: end_index])
        sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
    print("test accuracy: %f" % sparse_categorical_accuracy.result())

test accuracy: 0.992900


# 使用 Keras 中預定義的典型卷積神經網路結構
tf.keras.applications 中有一些預定義好的典型卷積神經網路結構，如 VGG16 、 VGG19 、 ResNet 、 MobileNet 等。我們可以直接呼叫這些典型的卷積神經網路結構（甚至載入預訓練的參數），而無需手動定義網路結構。

例如，我們可以使用以下代碼來實例化一個 MobileNetV2 網路結構：

以下展示一個例子，使用 MobileNetV2 網路在 tf_flowers 五種分類數據集上進行訓練（為了程式碼的簡短高效，在該範例中我們使用了 TensorFlow Datasets 和 tf.data 載入和預處理資料）。通過將 weights 設置為 None ，我們隨機初始化變數而不使用預訓練權重值。同時將 classes 設置為 5，對應於 5 種分類的資料集。

In [5]:
import tensorflow as tf
import tensorflow_datasets as tfds


Virtual devices cannot be modified after being initialized


In [10]:

num_epoch = 5
batch_size = 20
learning_rate = 0.001

dataset = tfds.load("tf_flowers", split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(lambda img, label: (tf.image.resize(img, (224, 224)) / 255.0, label)).shuffle(1024).batch(batch_size)
model = tf.keras.applications.MobileNetV2(weights=None, classes=5)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)


In [11]:


for e in range(num_epoch):
    for images, labels in dataset:
        with tf.GradientTape() as tape:
            labels_pred = model(images, training=True)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=labels, y_pred=labels_pred)
            loss = tf.reduce_mean(loss)
            print("loss %f" % loss.numpy())
            
            
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.trainable_variables))
        
        
    print(labels_pred)

ResourceExhaustedError: OOM when allocating tensor with shape[20,14,14,576] and type float on /job:localhost/replica:0/task:0/device:GPU:0 by allocator GPU_0_bfc [Op:Conv2D]

# 循環神經網路（RNN）
循環神經網路（Recurrent Neural Network, RNN）是一種適宜於處理序列資料的神經網路，被廣泛用於語言模型、文本生成、機器翻譯等

In [17]:
import tensorflow as tf
import numpy as np

class DataLoader():
    def __init__(self):
        path = tf.keras.utils.get_file('nietzsche.txt',
            origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt')
        with open(path, encoding='utf-8') as f:
            self.raw_text = f.read().lower()
        self.chars = sorted(list(set(self.raw_text)))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        self.text = [self.char_indices[c] for c in self.raw_text]

    def get_batch(self, seq_length, batch_size):
        seq = []
        next_char = []
        for i in range(batch_size):
            index = np.random.randint(0, len(self.text) - seq_length)
            seq.append(self.text[index:index+seq_length])
            next_char.append(self.text[index+seq_length])
        return np.array(seq), np.array(next_char)       # [batch_size, seq_length], [num_batch]

In [18]:
class RNN(tf.keras.Model):
    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars = num_chars
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.cell = tf.keras.layers.LSTMCell(units=256)
        self.dense = tf.keras.layers.Dense(units=self.num_chars)

    def call(self, inputs, from_logits=False):
        inputs = tf.one_hot(inputs, depth=self.num_chars)       # [batch_size, seq_length, num_chars]
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32)   # 获得 RNN 的初始状态
        for t in range(self.seq_length):
            output, state = self.cell(inputs[:, t, :], state)   # 通过当前输入和前一时刻的状态，得到输出和当前时刻的状态
        logits = self.dense(output)
        if from_logits:                     # from_logits 参数控制输出是否通过 softmax 函数进行归一化
            return logits
        else:
            return tf.nn.softmax(logits)

    def predict(self, inputs, temperature=1.):
        batch_size, _ = tf.shape(inputs)
        logits = self(inputs, from_logits=True)                         # 调用训练好的RNN模型，预测下一个字符的概率分布
        prob = tf.nn.softmax(logits / temperature).numpy()              # 使用带 temperature 参数的 softmax 函数获得归一化的概率分布值
        return np.array([np.random.choice(self.num_chars, p=prob[i, :]) # 使用 np.random.choice 函数，
                         for i in range(batch_size.numpy())])           # 在预测的概率分布 prob 上进行随机取样

In [8]:
num_batches = 1000
seq_length = 40
batch_size = 50
learning_rate = 1e-3

訓練過程與前節基本一致，在此不再贅述：

從 DataLoader 中隨機取一批訓練資料；

將這批資料送入模型，計算出模型的預測值；

將模型預測值與真實值進行比較，計算損失函數（loss）；

計算損失函數關於模型變數的導數；

使用優化器更新模型參數以最小化損失函數。

In [9]:
data_loader = DataLoader()
model = RNN(num_chars=len(data_loader.chars), batch_size=batch_size, seq_length=seq_length)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for batch_index in range(num_batches):
    X, y = data_loader.get_batch(seq_length, batch_size)
    with tf.GradientTape() as tape:
        y_pred = model(X)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
        loss = tf.reduce_mean(loss)
        print("batch %d: loss %f" % (batch_index, loss.numpy()))
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

batch 0: loss 4.036938
batch 1: loss 4.020241
batch 2: loss 3.995863
batch 3: loss 3.979443
batch 4: loss 3.933723
batch 5: loss 3.844876
batch 6: loss 3.720203
batch 7: loss 3.147954
batch 8: loss 3.681298
batch 9: loss 3.044731
batch 10: loss 3.345973
batch 11: loss 2.880627
batch 12: loss 3.048766
batch 13: loss 3.168972
batch 14: loss 3.336415
batch 15: loss 3.113639
batch 16: loss 2.930874
batch 17: loss 3.050998
batch 18: loss 3.122120
batch 19: loss 3.300528
batch 20: loss 2.820194
batch 21: loss 3.309232
batch 22: loss 2.638648
batch 23: loss 2.983929
batch 24: loss 3.433769
batch 25: loss 3.165073
batch 26: loss 3.042756
batch 27: loss 2.875337
batch 28: loss 2.862890
batch 29: loss 2.971168
batch 30: loss 3.247563
batch 31: loss 3.186876
batch 32: loss 3.120157
batch 33: loss 3.162197
batch 34: loss 3.149079
batch 35: loss 3.033498
batch 36: loss 3.206494
batch 37: loss 3.240856
batch 38: loss 2.985980
batch 39: loss 2.887172
batch 40: loss 2.995800
batch 41: loss 3.142247
ba

關於文本生成的過程有一點需要特別注意。之前，我們一直使用 tf.argmax() 函數，將對應機率最大的值作為預測值。然而對於文本生成而言，這樣的預測方式過於絕對，會使得生成的文本失去豐富性。於是，我們使用 np.random.choice() 函數按照生成的機率分佈取樣。這樣，即使是對應機率較小的字元，也有機會被取樣到。同時，我們加入一個 temperature 參數控制分佈的形狀，參數值越大則分佈越平緩（最大值和最小值的差值越小），生成文本的豐富度越高；參數值越小則分佈越陡峭，生成文本的豐富度越低。

In [19]:
    X_, _ = data_loader.get_batch(seq_length, 1)
    for diversity in [0.2, 0.5, 1.0, 1.2]:      # 丰富度（即temperature）分别设置为从小到大的 4 个值
        X = X_
        print("diversity %f:" % diversity)
        for t in range(400):
            y_pred = model.predict(X, diversity)    # 预测下一个字符的编号
            print(data_loader.indices_char[y_pred[0]], end='', flush=True)  # 输出预测的字符
            X = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)     # 将预测的字符接在输入 X 的末尾，并截断 X 的第一个字符，以保证 X 的长度不变
        print("\n")

diversity 0.200000:


TypeError: Expected int32 passed to parameter 'size' of op 'Slice', got [0.8] of type 'list' instead. Error: Expected int32, got 0.8 of type 'float' instead.

In [16]:
y_pred= model.predict(X, diversity)
y_pred 

TypeError: Expected int32 passed to parameter 'size' of op 'Slice', got [0.8] of type 'list' instead. Error: Expected int32, got 0.8 of type 'float' instead.