# 端到端tensorflow示例

本文探索并演示了如何实现端到端的使用tensorflow完成工程开发。

参考链接：
* https://medium.com/ml-book/train-tf-keras-model-using-feature-coulmn-8de12e65ddec
* https://www.tensorflow.org/tutorials/structured_data/feature_columns

## 引入依赖

In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

## 加载数据

In [2]:
# 读取数据
'''
PassengerId:乘客Id，无实在意义。
Survived:生存，1表示存活，0表示死亡
Pclass:客舱等级1>2>3
Name:名字
Sex:性别
Age:年龄
SibSp:在船兄弟姐妹/配偶数量
Parch:在船父母/子女数
Ticket:船票编号
Fare:船票价格
Cabin:客舱号
Embarked:登船港口
'''
data = pd.read_csv('train.csv')
data.head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [3]:
# 观察样本数量，共891个样本
data.shape

(891, 12)

## 缺失值处理

In [4]:
# 观察缺失值
data.isnull().sum(axis=0)

PassengerId      0
Survived         0
Pclass           0
Name             0
Sex              0
Age            177
SibSp            0
Parch            0
Ticket           0
Fare             0
Cabin          687
Embarked         2
dtype: int64

In [5]:
# 1）Cabin缺失687行，删除这列特征
# 2）Age缺失177行，用平均值填充
# 3）Embarked缺失2行，最高频填充

mean_age = data['Age'].mean()
mode_embarked = data['Embarked'].mode()[0] # 取众数(最高频)
data.fillna({'Age': mean_age, 'Embarked': mode_embarked}, inplace=True)
data.dropna(axis=1, inplace=True) # 删除带Nan的列（也就是Cabin)

data

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.000000,1,0,A/5 21171,7.2500,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.000000,1,0,PC 17599,71.2833,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.000000,0,0,STON/O2. 3101282,7.9250,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.000000,1,0,113803,53.1000,S
4,5,0,3,"Allen, Mr. William Henry",male,35.000000,0,0,373450,8.0500,S
...,...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,"Montvila, Rev. Juozas",male,27.000000,0,0,211536,13.0000,S
887,888,1,1,"Graham, Miss. Margaret Edith",female,19.000000,0,0,112053,30.0000,S
888,889,0,3,"Johnston, Miss. Catherine Helen ""Carrie""",female,29.699118,1,2,W./C. 6607,23.4500,S
889,890,1,1,"Behr, Mr. Karl Howell",male,26.000000,0,0,111369,30.0000,C


## 特征预处理

In [6]:
# 数据集有效列
valid_columns = {'Pclass':'int', 'Sex':'str', 'Age':'float',
                  'SibSp':'int', 'Parch':'int', 'Ticket':'str', 
                  'Fare':'float', 'Embarked':'str', 'Survived': 'int'}
# 原始输入特征
input_features = {'Pclass','Sex','Age','SibSp', 'Parch', 'Ticket', 'Fare', 'Embarked'}
output_label = 'Survived'

# 特征预处理
numeric_features = ['Age', 'Fare', 'Parch', 'SibSp'] # 连续值
bucket_features = ['Age'] # 连续值分桶
cat_onehot_features = ['Embarked', 'Pclass', 'Sex'] # # 类别onehot
cat_embedding_features = ['Ticket'] # 类别embedding
cat_crossed_onehot_features = [('Age_bucketized', 'Sex')]  # 类别交叉后onehot
cat_crossed_embedding_features = [('Age_bucketized', 'Pclass')]  # 类别交叉后embedding

# feature_column
model_feature_columns = {}   # 模型用到的feature column
cat_feature_columns = {}  # 类别column，留作交叉用

In [7]:
# 方便验证feature_column效果的函数
def test_feature_column(feature_column):
    feature_layer = tf.keras.layers.DenseFeatures(feature_column)
    return feature_layer(data.to_dict('list')).numpy()

### 连续值

In [8]:
# 标准化
def gen_norm(feature_name):
    min_value = data[feature_name].min()
    max_value = data[feature_name].max()
    range_value = max_value - min_value
    def norm(tensor):
        return tf.divide(tf.subtract(tensor, min_value), range_value)
    return norm

for feature_name in numeric_features:
    col = tf.feature_column.numeric_column(feature_name, default_value=-1, normalizer_fn=gen_norm(feature_name))
    model_feature_columns[col.name] = col

In [9]:
# 验证numeric_column的效果
test_feature_column(model_feature_columns['Age'])[:5]

array([[0.27117366],
       [0.4722292 ],
       [0.32143754],
       [0.4345313 ],
       [0.4345313 ]], dtype=float32)

### 分桶（连续值）

In [10]:
for feature_name in bucket_features:
    col = tf.feature_column.numeric_column(feature_name, default_value=-1) # 连续值
    # 分桶
    bucket_col = tf.feature_column.bucketized_column(col, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
    cat_feature_columns[bucket_col.name] = bucket_col
    # 作为one-hot特征列
    model_feature_columns[bucket_col.name] = bucket_col

In [11]:
# 验证bucketized_column的效果
test_feature_column(cat_feature_columns['Age_bucketized'])[:5]

array([[0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.]], dtype=float32)

### 类别特征（onehot）

In [12]:
for feature_name in cat_onehot_features:
    # 该类别的所有可能取值
    vocabulary = data[feature_name].unique() 
    
    # 类别特征
    col = tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocabulary) 
    cat_feature_columns[col.name] = col
    
    # 类别onehot特征
    onehot_col = tf.feature_column.indicator_column(col) 
    model_feature_columns[onehot_col.name] = onehot_col

In [13]:
# 验证indicator column的效果
test_feature_column(model_feature_columns['Pclass_indicator'])[:5]

array([[1., 0., 0.],
       [0., 1., 0.],
       [1., 0., 0.],
       [0., 1., 0.],
       [1., 0., 0.]], dtype=float32)

### 类别特征（embedding）

embadding为每个类别生成1个词向量，适合类比比较多（不要太多）的情况。

In [14]:
for feature_name in cat_embedding_features:
    # 该类别的所有可能取值
    vocabulary = data[feature_name].unique() 
    
    # 类别特征
    col = tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocabulary)
    cat_feature_columns[col.name] = col
    
    # 类别embedding，词向量的维度10
    embedding_col = tf.feature_column.embedding_column(col, dimension=10) 
    model_feature_columns[embedding_col.name] = embedding_col

In [15]:
# 验证embedding column的效果
# 1）一共vocabulary个词向量，对应vocabulary个类别
# 2）词向量随着模型训练计算得到，没训练前是随机填充的
# 3）样本Ticket特征经过embedding找到对应词向量作为替代
test_feature_column(model_feature_columns['Ticket_embedding'])[:5]

array([[-0.5314496 ,  0.22284812, -0.10256241, -0.11512943,  0.3887074 ,
        -0.00736922, -0.04162008, -0.31262052, -0.06345925, -0.3141414 ],
       [ 0.04324598, -0.20080951, -0.40643996, -0.51189876, -0.45935574,
        -0.00388012,  0.5758124 , -0.43589   , -0.3594899 , -0.09482296],
       [-0.02293092, -0.3944286 , -0.33285603,  0.09458611,  0.20127569,
        -0.2474735 , -0.06500464,  0.13706218, -0.07010416, -0.3581399 ],
       [ 0.28103504, -0.38127196, -0.12628618,  0.3504441 ,  0.00938372,
         0.19014397,  0.09432404,  0.5664573 ,  0.1452246 , -0.03203008],
       [ 0.5045057 , -0.12331874,  0.4781773 ,  0.08953258,  0.4484633 ,
        -0.30678847, -0.20736401, -0.15818349,  0.15930215,  0.10670206]],
      dtype=float32)

### 类别特征交叉（onehot）

In [16]:
# 这是预处理得到的所有类别特征
cat_feature_columns.keys()

dict_keys(['Age_bucketized', 'Embarked', 'Pclass', 'Sex', 'Ticket'])

In [17]:
# 交叉类别特征并Hash到新类别，然后对新类别做onehot
for crossed_feature_names in cat_crossed_onehot_features:
    # 取出要交叉的类别feature column
    cols = []
    for f_name in crossed_feature_names:
        cols.append(cat_feature_columns[f_name])
    
    # 2个类别相乘做hash取模10得到新类别
    crossed_col = tf.feature_column.crossed_column(cols, hash_bucket_size=10) 
    
    # 对新类别做onehot
    onehot_crossed_col = tf.feature_column.indicator_column(crossed_col)
    model_feature_columns[onehot_crossed_col.name] = onehot_crossed_col

In [18]:
# 验证类别交叉特征
test_feature_column(model_feature_columns['Age_bucketized_X_Sex_indicator'])[:5]

array([[0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 1., 0., 0., 0.]], dtype=float32)

### 类别交叉特征（embedding）

In [19]:
#  交叉类别特征并Hash到新类别，然后对新类别做embedding
for crossed_feature_names in cat_crossed_embedding_features:
    # 取出要交叉的类别feature column
    cols = []
    for f_name in crossed_feature_names:
        cols.append(cat_feature_columns[f_name])
    
    # 2个类别相乘做hash取模10得到新类别
    crossed_col = tf.feature_column.crossed_column(cols, hash_bucket_size=10) 
    
    # 新类别embedding，词向量的维度10
    embedding_crossed_col = tf.feature_column.embedding_column(crossed_col, dimension=10)
    model_feature_columns[embedding_crossed_col.name] = embedding_crossed_col

In [20]:
# 验证类别交叉特征
test_feature_column(model_feature_columns['Age_bucketized_X_Pclass_embedding'])[:5]

array([[-0.37412336,  0.02996962, -0.24474321, -0.35264003, -0.40175837,
         0.17377801, -0.22526294, -0.10493472, -0.3369941 , -0.37105986],
       [ 0.38651082,  0.39749685, -0.2492073 , -0.30518344, -0.4056563 ,
        -0.03413704, -0.11457369, -0.27169058,  0.12370556, -0.11518823],
       [ 0.52233106,  0.53510404,  0.04754441, -0.09479497, -0.19274408,
        -0.3149597 , -0.02996827,  0.12042738, -0.39143303,  0.02296387],
       [ 0.38651082,  0.39749685, -0.2492073 , -0.30518344, -0.4056563 ,
        -0.03413704, -0.11457369, -0.27169058,  0.12370556, -0.11518823],
       [ 0.1063731 , -0.1019956 ,  0.20581587,  0.32311597, -0.1785489 ,
         0.41357297,  0.13034934,  0.3760741 , -0.02352649, -0.18046921]],
      dtype=float32)

## 准备数据

### 加载sklearn数据集到内存

In [21]:
train, test = train_test_split(data, test_size=0.15, random_state=41)
train, val = train_test_split(train, test_size=0.15, random_state=41)

### 生成tfrecord格式的样本文件

In [22]:
def dataframe_to_tfrecords(dataframe, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for _, row in dataframe.iterrows():
            feature_dict = {}
            for f_name, f_type in valid_columns.items():
                if f_type =='int':
                    feature = tf.train.Feature(int64_list=tf.train.Int64List(value=[row[f_name]]))
                elif f_type =='float':
                    feature = tf.train.Feature(float_list=tf.train.FloatList(value=[row[f_name]]))
                elif f_type == 'str':
                    feature = tf.train.Feature(bytes_list=tf.train.BytesList(value=[row[f_name].encode('utf-8')]))
                feature_dict[f_name] = feature
            features = tf.train.Features(feature=feature_dict)
            example = tf.train.Example(features=features)
            writer.write(example.SerializeToString())

In [23]:
dataframe_to_tfrecords(train, './train.tfrecords')
dataframe_to_tfrecords(val, './val.tfrecords')
dataframe_to_tfrecords(test, './test.tfrecords')

### 使用dataset加载tfrecords文件

In [24]:
def tfrecords_to_dataset(filename, shuffle_buffer_size=64, batch_size=32):
    def parse_example(example_str):
        feature_dict = {}
        for f_name, f_type in valid_columns.items():
            if f_type == 'int':
                feature = tf.io.FixedLenFeature([1], tf.int64, default_value=-1)
            elif f_type == 'float':
                feature = tf.io.FixedLenFeature([1], tf.float32, default_value=-1)
            elif f_type == 'str':
                feature = tf.io.FixedLenFeature([1], tf.string, default_value='')
            feature_dict[f_name] = feature
        features = tf.io.parse_single_example(example_str, feature_dict)
        label = features[output_label]
        del features[output_label]
        return features, label
    
    dataset = tf.data.TFRecordDataset(filename)
    dataset = dataset.map(parse_example)
    if shuffle_buffer_size:
        dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.batch(batch_size)
    return dataset
    
train_ds = tfrecords_to_dataset('./train.tfrecords')
val_ds = tfrecords_to_dataset('./val.tfrecords')
test_ds = tfrecords_to_dataset('./test.tfrecords')

In [25]:
# 观察tf.data
for feature_batch, label_batch in train_ds:
    print('输入特征：', list(feature_batch.keys()))
    print('Age特征：', feature_batch['Age'])
    print('标签：', label_batch)

输入特征： ['Age', 'Embarked', 'Fare', 'Parch', 'Pclass', 'Sex', 'SibSp', 'Ticket']
Age特征： tf.Tensor(
[[29.      ]
 [29.699118]
 [25.      ]
 [ 4.      ]
 [22.      ]
 [28.      ]
 [22.      ]
 [21.      ]
 [62.      ]
 [58.      ]
 [21.      ]
 [21.      ]
 [24.      ]
 [25.      ]
 [33.      ]
 [30.      ]
 [32.      ]
 [29.699118]
 [48.      ]
 [ 3.      ]
 [29.699118]
 [48.      ]
 [54.      ]
 [28.      ]
 [17.      ]
 [58.      ]
 [17.      ]
 [42.      ]
 [27.      ]
 [27.      ]
 [22.      ]
 [29.699118]], shape=(32, 1), dtype=float32)
标签： tf.Tensor(
[[1]
 [1]
 [0]
 [0]
 [0]
 [0]
 [1]
 [0]
 [1]
 [1]
 [0]
 [0]
 [0]
 [0]
 [0]
 [0]
 [1]
 [0]
 [1]
 [1]
 [1]
 [1]
 [1]
 [0]
 [0]
 [0]
 [0]
 [0]
 [0]
 [1]
 [0]
 [1]], shape=(32, 1), dtype=int64)
输入特征： ['Age', 'Embarked', 'Fare', 'Parch', 'Pclass', 'Sex', 'SibSp', 'Ticket']
Age特征： tf.Tensor(
[[26.      ]
 [29.699118]
 [18.      ]
 [50.      ]
 [61.      ]
 [ 3.      ]
 [39.      ]
 [44.      ]
 [58.      ]
 [18.      ]
 [74.      ]
 [30.5    

## 训练模型

### 定义模型

Sequential定义方式：

In [26]:
model = tf.keras.Sequential([
    tf.keras.layers.DenseFeatures(list(model_feature_columns.values())), # 预处理层
    tf.keras.layers.Dense(8, activation='relu'),
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid'), # sigmoid激活到0~1以拟合目标值
])

Functional定义方式：

In [37]:
# 用法借鉴自：https://github.com/tensorflow/tensorflow/issues/27416#issuecomment-502218673

# 模型结构：input_layer -> feature_layer -> dense layer -> dense layer -> dense layer


# 输入层
input_layer = {} 
for f_name in input_features:
    dtype = valid_columns[f_name]
    if dtype == 'int':
        dtype = tf.int64
    elif dtype == 'float':
        dtype = tf.float32
    else:
        dtype = tf.string
    input_layer[f_name] = tf.keras.Input(shape=(1,), name=f_name, dtype=dtype)

# 定义layer
feature_layer = tf.keras.layers.DenseFeatures(list(model_feature_columns.values()))
dense_layer1 = tf.keras.layers.Dense(8, activation='relu')
dense_layer2 = tf.keras.layers.Dense(16, activation='relu')
dense_layer3 = tf.keras.layers.Dense(1, activation='sigmoid', name='output_1')

# 定义model
output = feature_layer(input_layer)
output = dense_layer1(output)
output = dense_layer2(output)
output = dense_layer3(output)
model = tf.keras.Model(inputs=list(input_layer.values()), outputs=[output])


### 训练模型

In [38]:
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])  # 用交叉熵算损失
model.fit(train_ds, validation_data=val_ds, epochs=100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

<tensorflow.python.keras.callbacks.History at 0x7f8a0c1d3bd0>

In [29]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
Age (InputLayer)                [(None, 1)]          0                                            
__________________________________________________________________________________________________
Embarked (InputLayer)           [(None, 1)]          0                                            
__________________________________________________________________________________________________
Fare (InputLayer)               [(None, 1)]          0                                            
__________________________________________________________________________________________________
Parch (InputLayer)              [(None, 1)]          0                                            
______________________________________________________________________________________________

### 验证模型

In [39]:
loss, accuracy = model.evaluate(test_ds)



### 保存模型

In [40]:
tf.saved_model.save(model, 'demo/1')

INFO:tensorflow:Assets written to: demo/1/assets


### 线上服务

tensorflow serving加载模型，在8500端口监听grpc协议，8501端口监听HTTP协议:

```
tensorflow_model_server  --rest_api_port=8501 --port=8500  --model_name=demo --model_base_path=`realpath ./demo` --enable_batching=true
```

查看model输入输出格式：

```
curl 'http://localhost:8501/v1/models/demo/versions/1/metadata' 
```


In [32]:
# 取训练集前5行，每行是dict，由feature name -> value构成
records = train[:5].to_dict('list')
records

{'PassengerId': [370, 779, 321, 856, 257],
 'Survived': [1, 0, 0, 1, 1],
 'Pclass': [1, 3, 3, 3, 1],
 'Name': ['Aubart, Mme. Leontine Pauline',
  'Kilgannon, Mr. Thomas J',
  'Dennis, Mr. Samuel',
  'Aks, Mrs. Sam (Leah Rosen)',
  'Thorne, Mrs. Gertrude Maybelle'],
 'Sex': ['female', 'male', 'male', 'female', 'female'],
 'Age': [24.0, 29.69911764705882, 22.0, 18.0, 29.69911764705882],
 'SibSp': [0, 0, 0, 0, 0],
 'Parch': [0, 0, 0, 1, 0],
 'Ticket': ['PC 17477', '36865', 'A/5 21172', '392091', 'PC 17585'],
 'Fare': [69.3, 7.7375, 7.25, 9.35, 79.2],
 'Embarked': ['C', 'Q', 'S', 'S', 'C']}

In [42]:
# https://stackoverflow.com/questions/58057708/issue-with-embedding-layer-when-serving-a-tensorflow-keras-model-with-tf-2-0/62955213#62955213

from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
import grpc

# GRPC连接
channel = grpc.insecure_channel('127.0.0.1:8500')
# 客户端
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
# 创建请求
request = predict_pb2.PredictRequest()
request.model_spec.name = 'demo'
request.model_spec.signature_name = 'serving_default'
# 特征填充到请求
for feature_name in records:
    if feature_name not in input_features:
        continue
    values = records[feature_name]
    dtype = valid_columns[feature_name]
    if dtype == 'int':
        dtype = np.int64
    elif dtype == 'float':
        dtype = np.float32
    else:
        dtype = np.object
    tensor_pb = tf.make_tensor_proto(values, shape=(len(values),1), dtype=dtype)
    request.inputs[feature_name].CopyFrom(tensor_pb)
# 发起调用
response = stub.Predict(request)
# 打印模型输出
print(response.outputs['output_1'].float_val)
# 打印真实标签
print(records['Survived'])

[0.9999998807907104, 0.00016692138160578907, 3.3320251532131806e-05, 0.9997161030769348, 0.999953031539917]
[1, 0, 0, 1, 1]
