In [343]:
import tensorflow as tf
import numpy as np

In [344]:
gpus = tf.config.experimental.list_physical_devices('GPU')
print("gpus: ", gpus)
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

gpus:  [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [345]:
from sklearn.datasets import load_iris

data = load_iris()
print("x shape: ", data.data.shape)
print("y shape: ", data.target.shape)

x shape:  (150, 4)
y shape:  (150,)


In [346]:
class MyDenseLayer(tf.keras.layers.Layer):
    def __init__(self, num_outputs, activation=None, name='my_dense_layer', **kwargs):
        super(MyDenseLayer, self).__init__(name=name, **kwargs)
        self.num_outputs = num_outputs
        self.activation = tf.keras.activations.get(activation)

    # creating variables in build enable late variable creation
    # based on the shape of inputs the layer will operate on.
    def build(self, input_shape):
        #w_init = tf.random_normal_initializer()
        self.kernel = tf.Variable(initial_value=( np.random.randint(10, size=(input_shape[-1], self.num_outputs)) ),
                                  trainable=True, dtype='float32')
        #b_init = tf.zeros_initializer()
        self.bias = tf.Variable(initial_value=( np.random.randint(10, size=(self.num_outputs)) ),
                                trainable=True, dtype='float32')

    def call(self, inputs):
        wx = tf.tensordot(inputs, self.kernel, [[-1], [0]])
        b = self.bias
        return  self.activation(wx + b)

    def get_config(self):
        config = super(MyDenseLayer, self).get_config()
        config.update({'units': self.num_outputs, 
                       'activation':tf.keras.activations.serialize(self.activation)})
        return config
# Invoke __init__    
my_layer = MyDenseLayer(3, activation='relu')

# Invoke build() and call()
my_layer(np.array([[1.0,2.0]]))

print("kernel:")
print(my_layer.variables[0])
print("")

print("bias")
print(my_layer.variables[1])
print("")

print("trainable_variables")
print(my_layer.trainable_variables)
print("")

# trainable_weights is the same as trainable_variables
print("trainable_weights")
print(my_layer.trainable_weights)
print("")

layer = MyDenseLayer(5, activation='relu')
layer_config = layer.get_config()
print("config")
print(layer_config)
print("")

kernel:
<tf.Variable 'my_dense_layer/Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[0., 5., 6.],
       [6., 7., 1.]], dtype=float32)>

bias
<tf.Variable 'my_dense_layer/Variable:0' shape=(3,) dtype=float32, numpy=array([0., 1., 0.], dtype=float32)>

trainable_variables
[<tf.Variable 'my_dense_layer/Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[0., 5., 6.],
       [6., 7., 1.]], dtype=float32)>, <tf.Variable 'my_dense_layer/Variable:0' shape=(3,) dtype=float32, numpy=array([0., 1., 0.], dtype=float32)>]

trainable_weights
[<tf.Variable 'my_dense_layer/Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[0., 5., 6.],
       [6., 7., 1.]], dtype=float32)>, <tf.Variable 'my_dense_layer/Variable:0' shape=(3,) dtype=float32, numpy=array([0., 1., 0.], dtype=float32)>]

config
{'name': 'my_dense_layer', 'trainable': True, 'dtype': 'float32', 'units': 5, 'activation': 'relu'}



In [347]:
a = tf.Variable(initial_value=( np.random.randint(10, size=(3,1)) ),
                                  trainable=True, dtype='float32')
m = tf.constant([[1.,2.,3.], [4.,5.,6.]])
n = tf.constant([[7., 8.]])
with tf.GradientTape(persistent=True) as tape:
    b = tf.matmul(m, a)
    c = tf.matmul(n, b)
gradients_b_a = tape.gradient(b, a)
gradients_c_b = tape.gradient(c, b)
gradients_c_a = tape.gradient(c, a)
print("gradients_b_a")
print(gradients_b_a)
print("")
# gradient returns a sum of Jacobian matrix along
# b-axis, rather than return a Jacobian matrix.
# gradinet is designed to sum all gradients for 
# each sample in a batch, so it makes sense.

print("gradients_c_b")
print(gradients_c_b)
print("")

print("gradients_c_a")
print(gradients_c_a)

gradients_b_a
tf.Tensor(
[[5.]
 [7.]
 [9.]], shape=(3, 1), dtype=float32)

gradients_c_b
tf.Tensor(
[[7.]
 [8.]], shape=(2, 1), dtype=float32)

gradients_c_a
tf.Tensor(
[[39.]
 [54.]
 [69.]], shape=(3, 1), dtype=float32)


In [348]:
class MyModel(tf.keras.Model):
    def __init__(self, **kwargs):
        super(MyModel, self).__init__(**kwargs)
        #self.in_shape = in_shape
        #self.input_layer = tf.keras.Input(in_shape)
        self.d1 = MyDenseLayer(3, activation=tf.nn.relu)
        self.d2 = MyDenseLayer(3, activation=tf.nn.softmax)
        #elf.out = self.call(self.input_layer)
    
    def call(self, x):
        x = self.d1(x)
        x = self.d2(x)
        return x

    #def get_config(self):
    #    config = super(MyModel, self).get_config()
    ##    config.update({"in_shape":self.in_shape})

In [349]:
@tf.function
def normalize(x, y):
    mean = tf.math.reduce_mean(x, axis=0)
    std = tf.math.reduce_std(x, axis=0)
    x = (x - mean) / std
    return x, y
dataset = tf.data.Dataset.from_tensor_slices( (data.data, data.target) )
### Every time we iterate through the whole dataset, do not reshuffle the dataset
dataset = dataset.shuffle(len(dataset), seed=100, reshuffle_each_iteration=False)
dataset = dataset.map(lambda a, b: normalize(a, b))

### SparseCategorical does not need one-hot-encoding for y labels
#dataset = dataset.map( lambda a, b: (a, tf.one_hot(b, 3)) )

train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

#dataset = dataset.shuffle(len(dataset))
tr_ds = dataset.take(train_size)
te_ds = dataset.skip(train_size)
print("tr_ds size: ", len(tr_ds))
print("te_ds size: ", len(te_ds))
tr_ds = tr_ds.shuffle(len(tr_ds), seed=100)
tr_ds = tr_ds.batch(32)
te_ds = te_ds.batch(32)

tr_ds size:  120
te_ds size:  30


In [350]:
# If the input is not probability, from_logits=True and vice versa.
# SparseCategorical accepts label, probability as input
# ex: y_true = [1, 2], y_pred = [[0.05, 0.95, 0], [0.1, 0.8, 0.1]]
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy =tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

optimizer = tf.keras.optimizers.Adam(learning_rate=0.1)

model = MyModel()

# Use tf.function to make graphs out of your programs.
# This will help you create performant and portable models.
# tf.function works best with TF ops; Numpy and Python 
# are converted to constants.
@tf.function
def train_step(model, x, y):
    with tf.GradientTape() as tape:
        y_pred = model(x, training=True)
        loss = loss_object(y, y_pred)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss(loss)
    train_accuracy(y, y_pred)

@tf.function
def test_step(model, x, y):
    y_pred = model(x, training=False)
    loss = loss_object(y, y_pred)
    test_loss(loss)
    test_accuracy(y, y_pred)



In [351]:
EPOCHS = 20
for epoch in range(EPOCHS):
    # Reset the metrics at the start of the next epoch
    train_loss.reset_states()
    train_accuracy.reset_states()
    test_loss.reset_states()
    test_accuracy.reset_states()

    for tr_x, tr_y in tr_ds:
        train_step(model, tr_x, tr_y)

    for te_x, te_y in te_ds:
        test_step(model, te_x, te_y)  

    print(f'Epoch {epoch + 1}, ' 
          f'Loss: {train_loss.result():.4f}, '
          f'Accuracy: {train_accuracy.result() * 100:.2f},'
          f'Test Loss: {test_loss.result():.4f}, '
          f'Test Accuracy: {test_accuracy.result() * 100:.2f}')

Epoch 1, Loss: 0.9517, Accuracy: 59.17,Test Loss: 0.8936, Test Accuracy: 56.67
Epoch 2, Loss: 0.4633, Accuracy: 75.83,Test Loss: 0.3500, Test Accuracy: 86.67
Epoch 3, Loss: 0.3365, Accuracy: 89.17,Test Loss: 0.3628, Test Accuracy: 90.00
Epoch 4, Loss: 0.2181, Accuracy: 96.67,Test Loss: 0.5070, Test Accuracy: 86.67
Epoch 5, Loss: 0.2155, Accuracy: 94.17,Test Loss: 0.3379, Test Accuracy: 90.00
Epoch 6, Loss: 0.1840, Accuracy: 95.83,Test Loss: 0.3301, Test Accuracy: 90.00
Epoch 7, Loss: 0.1628, Accuracy: 95.83,Test Loss: 0.3484, Test Accuracy: 93.33
Epoch 8, Loss: 0.1637, Accuracy: 96.67,Test Loss: 0.3443, Test Accuracy: 93.33
Epoch 9, Loss: 0.1362, Accuracy: 95.83,Test Loss: 0.3025, Test Accuracy: 90.00
Epoch 10, Loss: 0.1408, Accuracy: 95.83,Test Loss: 0.3100, Test Accuracy: 90.00
Epoch 11, Loss: 0.1365, Accuracy: 95.00,Test Loss: 0.3433, Test Accuracy: 93.33
Epoch 12, Loss: 0.1344, Accuracy: 96.67,Test Loss: 0.2957, Test Accuracy: 90.00
Epoch 13, Loss: 0.1164, Accuracy: 95.83,Test Loss

In [352]:
model.summary()
model.save_weights("my_checkpoint")
model.save("my_model")
model.get_config()

Model: "my_model_45"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 my_dense_layer (MyDenseLaye  multiple                 15        
 r)                                                              
                                                                 
 my_dense_layer (MyDenseLaye  multiple                 12        
 r)                                                              
                                                                 
Total params: 27
Trainable params: 27
Non-trainable params: 0
_________________________________________________________________
INFO:tensorflow:Assets written to: my_model\assets


{}

In [353]:
new_model = tf.keras.models.load_model('my_model')

test_loss.reset_states()
test_accuracy.reset_states()
for te_x, te_y in te_ds:
        test_step(new_model, te_x, te_y) 
        break

print(f'Test Loss: {test_loss.result():.4f}, '
      f'Test Accuracy: {test_accuracy.result() * 100:.2f}')


Test Loss: 0.2774, Test Accuracy: 90.00
