In [1]:
from tensorflow.keras.layers import Layer
import tensorflow as tf

class Linear(Layer):
    def __init__(self, x_dim, units):
        super().__init__()
        #1
        #w_init = tf.random_normal_initializer()
        #self.w = tf.Variable(initial_value=w_init(shape=(x_dim, units), dtype=tf.float32), trainable=True)
        #2
        self.w = self.add_weight(shape=(x_dim, units), initializer='random_normal')
        #1
        #b_init = tf.zeros_initializer()
        #self.b = tf.Variable(initial_value=b_init(shape=(units,), dtype=tf.float32), trainable=True)
        #2
        self.b = self.add_weight(shape=(units,), initializer=tf.constant_initializer(), trainable=True)
        
    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

print(tf.__version__)
linear_layer = Linear(2, 4)
y = linear_layer(tf.ones((2, 2)))
print(y.shape)
assert(linear_layer.weights==[linear_layer.w, linear_layer.b])

2.0.1
(2, 4)


In [2]:
from tensorflow.keras.layers import Layer
import tensorflow as tf

class Linear(Layer):
    def __init__(self, units):
        super().__init__()
        self.units = units
        
    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units), initializer=tf.random_normal_initializer(), trainable=True)
        self.b = self.add_weight(shape=self.units, initializer=tf.constant_initializer(), trainable=True)
        
    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b
    
linear_layer = Linear(4)
y = linear_layer(tf.ones((2, 2)))
print(y.shape)

(2, 4)


In [3]:
from tensorflow.keras.layers import Layer
import tensorflow as tf

class ComputeSum(Layer):
    def __init__(self, x_dim):
        super().__init__()
        self.total = tf.Variable(initial_value=tf.zeros(shape=(x_dim,)), trainable=False)
    
    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return self.total
    
sum = ComputeSum(2)
y = sum(tf.ones((3,2)))
print(y)
y2 = sum(tf.ones((4,2)))
print(y2)

<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([3., 3.], dtype=float32)>
<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([7., 7.], dtype=float32)>


In [4]:
class MLP(Layer):
    def __init__(self):
        super().__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(10)
        
    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(inputs)
        x = tf.nn.relu(x)
        x = self.linear_3(inputs)
        return x
    
mlp = MLP()
y = mlp(tf.ones((3, 64)))
print(y.shape)
assert len(mlp.weights)==6

(3, 10)


In [5]:
class Dropout(Layer):
    def __init__(self, rate):
        super().__init__()
        self.rate = rate
        
    def call(self, inputs, training = None):
        if training:
            return tf.nn.dropout(inputs, rate = self.rate)
        return inputs
    
class MLPWithDropout(Layer):
    def __init__(self):
        super().__init__()
        self.linear_1 = Linear(32)
        self.dropout = Dropout(0.5)
        self.linear_2 = Linear(10)
        
    def call(self, inputs, training = None):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        self.dropout(x, training = training)
        x = self.linear_2(x)
        return x
    
mlp = MLPWithDropout()
y_train = mlp(tf.ones((2,2)), training = True)
y_test = mlp(tf.ones((2, 2)), training = False)


In [6]:
inputs = tf.keras.layers.Input(shape=(16,))
x = Linear(32)(inputs)
x = Dropout(0.5)(x)
outputs= Linear(10)(x)
model = tf.keras.models.Model(inputs=inputs, outputs=outputs)
y = model(tf.ones((2, 16)))
print(len(model.weights))
print(y.shape)

4
(2, 10)


In [7]:
from tensorflow.keras import Sequential
model = Sequential([
    Linear(32), Dropout(0.5), Linear(10)
])
y = model(tf.ones((2, 16)))
print(y.shape)

(2, 10)


In [8]:
bce = tf.keras.losses.BinaryCrossentropy()
y_true = [0., 0, 1., 1]
y_pred = [1., 1, 0., 0]
loss = bce(y_true, y_pred)
print(loss.numpy())


15.379094


In [9]:
m = tf.keras.metrics.AUC()
m.update_state([0, 1, 1, 1],[0, 1, 0, 0])
print("temp result:", m.result().numpy())
m.update_state([1, 1, 1, 1], [0, 1, 1, 0])
print("final result:", m.result().numpy())


temp result: 0.6666667
final result: 0.71428573


In [10]:
class BinaryTruePositives(tf.keras.metrics.Metric):
    def __init__(self, name="binary_true_positives", **kwargs):
        super().__init__(name=name, **kwargs)
        self.true_positives = self.add_weight(name='cp', initializer = tf.zeros_initializer())
        
    def update_state(self, y_true, y_pred, sample_weight = None):
        y_true = tf.cast(y_true, tf.bool)
        y_pred = tf.cast(y_pred, tf.bool)
        values = tf.logical_and(tf.equal(y_true, True), tf.equal(y_pred, True))
        values = tf.cast(values, self.dtype)
        if sample_weight is not None:
            sample_weight = tf.cast(sample_weight, self.dtype)
            sample_weight = tf.broadcast_weights(sample_weight, values)
            values = tf.multiple(sample_weight, values)
        self.true_positives.assign_add(tf.reduce_sum(values))
        
    def result(self):
        return self.true_positives
        
    def reset_state(self):
        self.true_positives.assign(0)
        
btp = BinaryTruePositives()
btp.update_state([0, 1, 1, 1], [0, 1, 0, 0])
print("temp result:", btp.result().numpy())
btp.update_state([1, 1, 1, 1], [0, 1, 1, 0])
print("final result:", btp.result().numpy())

temp result: 1.0
final result: 3.0


In [11]:
from tensorflow.keras import layers

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train[:].reshape(60000, 784).astype('float32') / 255
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.shuffle(buffer_size=256).batch(64)

model = tf.keras.models.Sequential([
    layers.Dense(256, activation='relu'),
    layers.Dense(256, activation='relu'),
    layers.Dense(10)
])

loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
optimizer = tf.keras.optimizers.Adam()

for step, (x, y) in enumerate(dataset):
    with tf.GradientTape() as tape:
        logits = model(x)
        loss_value = loss(y, logits)
    gradients = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    accuracy.update_state(y, logits)
    
    if step % 100 == 0:
        print("step:",step)
        print("Loss from lass step:%.3f" %(loss_value,))
        print("Total runing accuracy so far:%.3f" %(accuracy.result()))

step: 0
Loss from lass step:2.397
Total runing accuracy so far:0.109
step: 100
Loss from lass step:0.179
Total runing accuracy so far:0.826
step: 200
Loss from lass step:0.551
Total runing accuracy so far:0.869
step: 300
Loss from lass step:0.200
Total runing accuracy so far:0.890
step: 400
Loss from lass step:0.220
Total runing accuracy so far:0.904
step: 500
Loss from lass step:0.352
Total runing accuracy so far:0.912
step: 600
Loss from lass step:0.192
Total runing accuracy so far:0.919
step: 700
Loss from lass step:0.169
Total runing accuracy so far:0.924
step: 800
Loss from lass step:0.144
Total runing accuracy so far:0.927
step: 900
Loss from lass step:0.115
Total runing accuracy so far:0.931


In [12]:
x_test = x_test[:].reshape(10000, 784).astype('float32') / 255
test_data = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_data = test_data.batch(128)
accuracy.reset_states()
for step, (x_t, y_t) in enumerate(test_data):
    tlogits = model(x_t)
    accuracy.update_state(y,logits)
print("Final test accuracy: %.3f" %accuracy.result())

Final test accuracy: 1.000


In [30]:

class ActivityRegularization(Layer):
    """计算l2 regularization"""
    def __init__(self,rate=1e-2):
        super().__init__()
        self.rate=rate
    def call(self,inputs):
        """根据输入，使用`add_loss`来添加regularization loss"""
        self.add_loss(self.rate*tf.reduce_sum(tf.square(inputs)))
        return inputs

class SparseMLP(Layer):
    """堆叠多个linear layer,并且加正则化"""
    def __init__(self,output_dim):
        super().__init__()
        self.dense_1 = layers.Dense(32,activation=tf.nn.relu)
        self.regularization= ActivityRegularization(1e-2)
        self.dense_2 = layers.Dense(output_dim)
    def call(self,inputs):
        x = self.dense_1(inputs)
        x = self.regularization(x)
        x = self.dense_2(x)
        return x

(x_train,y_train),_=tf.keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices((x_train[:].reshape(60000,784).astype('float32')/255,y_train))

dataset.shuffle(buffer_size=1024).batch(64)

(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices(
    (x_train.reshape(60000, 784).astype('float32') / 255, y_train))
dataset = dataset.shuffle(buffer_size=1024).batch(64)

# 定义一个新的MLP
mlp = SparseMLP(10)

#losses and optimizer
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)

for step,(x,y) in enumerate(dataset):
    with tf.GradientTape() as tape:
        # 前向运算
        logits = mlp(x)
        # 求前向运算损失
        loss = loss_fn(y,logits)
        # 加上正则化损失
        loss = loss + sum(mlp.losses)
        # 求梯度
        gradients = tape.gradient(loss,mlp.trainable_weights)
    #更新参数
    optimizer.apply_gradients(zip(gradients,mlp.trainable_weights))
    #日志
    if step %100==0:
        print("Loss at step %d: %.3f" %(step,loss))

InvalidArgumentError: Cannot update variable with shape [2] using a Tensor with shape [], shapes must be equal. [Op:AssignAddVariableOp]