In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [2]:
import tensorflow as tf

# x[2][3], y[2][1]
x = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
y = tf.constant([[10.0], [20.0]])

print(x.shape)
print(y.shape)

class Linear(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(
            units = 1,
            activation = None,
            kernel_initializer = tf.zeros_initializer(),
            bias_initializer = tf.zeros_initializer()
        )
        
        
    def call(self, input):
        output = self.dense(input)
        return output


# 
model = Linear()
optimizer = tf.keras.optimizers.SGD(learning_rate = 1e-3)

for i in range(100):
    with tf.GradientTape() as tape:
        y_pred = model(x)
        loss = tf.reduce_mean(tf.square(y_pred - y))
    grads = tape.gradient(loss, model.variables)
    
    optimizer.apply_gradients(grads_and_vars = zip(grads, model.variables))

print(model.variables)
        

Metal device set to: Apple M1 Pro
(2, 3)
(2, 1)
[<tf.Variable 'linear/dense/kernel:0' shape=(3, 1) dtype=float32, numpy=
array([[0.9032683],
       [1.2881179],
       [1.6729671]], dtype=float32)>, <tf.Variable 'linear/dense/bias:0' shape=(1,) dtype=float32, numpy=array([0.38484937], dtype=float32)>]


2022-04-17 21:15:10.826654: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2022-04-17 21:15:10.826758: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


# tf.keras.datasets

In [3]:
class MNISTLoader():
    def __init__(self):
        mnist = tf.keras.datasets.mnist
        (self.train_data, self.train_label), (self.test_data, self.test_label) = mnist.load_data()
        self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis = -1)
        self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis = -1)
        self.train_label = self.train_label.astype(np.int32)
        self.test_label = self.test_label.astype(np.int32)
        self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0]
        
    def get_batch(self, batch_size):
        index = np.random.randint(0, np.shape(self.train_data)[0], batch_size)
        return self.train_data[index, :], self.train_label[index]
        

## 3.4 tf RNN

In [6]:
import numpy as np
import tensorflow as tf

class DataLoader():
    def __init__(self):
        path = tf.keras.utils.get_file('nietzsche.txt',
                                       origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt')
        with open(path, encoding='utf-8') as f:
            self.raw_text = f.read().lower()
        
        self.chars = sorted(list(set(self.raw_text)))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        # indice list of text
        self.text = [self.char_indices[c] for c in self.raw_text]
    
    def get_batch(self, seq_length, batch_size):
        seq = []
        next_char = []
        for i in range(batch_size):
            index = np.random.randint(0, len(self.text) - seq_length)
            seq.append(self.text[index:index+seq_length])
            next_char.append(self.text[index+seq_length])
        return np.array(seq), np.array(next_char) # [batch_size, seq_length], [num_batch]

class RNN(tf.keras.Model):
    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars = num_chars
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.cell = tf.keras.layers.LSTMCell(units=256)
        self.dense = tf.keras.layers.Dense(units = self.num_chars)
    
    def call(self, inputs, from_logits = False):
        # [batch_size, seq_length, num_chars]
        inputs = tf.one_hot(inputs, depth=self.num_chars)
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype = tf.float32)
        for t in range(self.seq_length):
            output, state = self.cell(inputs[:, t, :], state)
        logits = self.dense(output)
        if from_logits:
            return logits
        else:
            return tf.nn.softmax(logits)
    
    # predict
    
    def predict(self, inputs, temperature=1.):
        batch_size, _ = tf.shape(inputs)
        logits = self(inputs, from_logits=True)
        prob = tf.nn.softmax(logits / temperature).numpy()
        return np.array([np.random.choice(self.num_chars, p=prob[i, :])
                     for i in range(batch_size.numpy())])
    

num_batches = 60
seq_length = 40
batch_size = 50
learning_rate = 1e-3
data_loader = DataLoader()
model = RNN(num_chars=len(data_loader.chars), batch_size=batch_size, seq_length=seq_length)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for batch_index in range(num_batches):
    X, y = data_loader.get_batch(seq_length, batch_size)
    with tf.GradientTape() as tape:
        y_pred = model(X)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
        loss = tf.reduce_mean(loss)
        print("batch %d: loss %f" % (batch_index, loss.numpy()))
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
    



X_, _ = data_loader.get_batch(seq_length, 1)
for diversity in [0.2, 0.5, 1.0, 1.2]:
    X = X_
    print("diversity %f: " % diversity)



for t in range(40):
    y_pred = model.predict(X, diversity)
    print(data_loader.indices_char[y_pred[0]], end='', flush=True)
    x = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)
print("\n")

                    
    

batch 0: loss 4.039315
batch 1: loss 4.020144
batch 2: loss 3.991930
batch 3: loss 3.955172
batch 4: loss 3.909860
batch 5: loss 3.831239
batch 6: loss 3.598962
batch 7: loss 3.638101
batch 8: loss 3.156896
batch 9: loss 3.159657
batch 10: loss 2.905585
batch 11: loss 3.192379
batch 12: loss 3.442448
batch 13: loss 3.281109
batch 14: loss 3.084947
batch 15: loss 3.427345
batch 16: loss 3.124683
batch 17: loss 3.114635
batch 18: loss 3.088225
batch 19: loss 3.056967
batch 20: loss 3.068408
batch 21: loss 3.369570
batch 22: loss 2.984977
batch 23: loss 2.845366
batch 24: loss 2.999393
batch 25: loss 2.947476
batch 26: loss 3.012032
batch 27: loss 3.350656
batch 28: loss 3.118740
batch 29: loss 3.130414
batch 30: loss 3.051334
batch 31: loss 2.775728
batch 32: loss 3.151231
batch 33: loss 3.394035
batch 34: loss 2.893868
batch 35: loss 2.909874
batch 36: loss 3.261985
batch 37: loss 3.158598
batch 38: loss 3.042449
batch 39: loss 3.030560
batch 40: loss 2.900105
batch 41: loss 3.009262
ba

In [7]:
import tensorflow as tf

with open("test_read.txt", "w") as f:
    f.write("hello tensorflow, this test case tf.io.read")
out=tf.io.read_file("test_read.txt")
print(out)

tf.Tensor(b'hello tensorflow, this test case tf.io.read', shape=(), dtype=string)


## 3.5 RL case

In [10]:
# pip install gym
import gym

# 实例化一个游戏环境，参数为游戏名称
env = gym.make('CartPole-v1')

# 初始化环境，获得初始状态
state = env.reset()
while True:
    env.render()
    action = model.predict(state)
    
    # 让环境执行动作，获得执行完动作的的下一个状态，动作的奖励，游戏是否已经结束及额外信息
    next_state, reward, done, info = env.step(action)
    if done:
        break
                            

ModuleNotFoundError: No module named 'pygame'