目标:
1. 使用CSV文件作为数据的来源，采用train_and_eval(keras自带在train当中)进行训练，
2. 进而使用数据验证模型，测试数据测试模型，然后运行一下预测；最后导出模型供使用。

In [2]:
import tensorflow as tf
import shutil
import math
from datetime import datetime
import multiprocessing
from tensorflow.python.feature_column import feature_column
sess = tf.Session()

In [20]:
MODEL_NAME = 'reg-model-02'

TRAIN_DATA_FILES_PATTERN = 'data/train-*.csv'
VALID_DATA_FILES_PATTERN = 'data/valid-*.csv'
TEST_DATA_FILES_PATTERN = 'data/test-*.csv'

RESUME_TRAINING = False
PROCESS_FEATURES = True
EXTEND_FEATURE_COLUMNS = True
MULTI_THREADING = True

# 基本步骤
1. 定义数据集的元数据(常量到处使用)
2. 定义读取CSV的输入函数及其解析(ETL)
3. 定义特征列
4. 定义估计器的创建函数
5. 运行试验，包括运行方案的定义
6. 运行模型的评估
7. 执行预测和部署已存的模型



## 1. 定义数据集元数据
- CSV的头和默认值
- 数据和类别特征名称
- 目标列名称
- 无用列名称

In [21]:
HEADER = ['key','x','y','alpha','beta','target']
HEADER_DEFAULTS = [[0], [0.0], [0.0], ['NA'], ['NA'], [0.0]]
NUMERIC_FEATURE_NAMES = ['x', 'y']  
CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY = {'alpha':['ax01', 'ax02'], 'beta':['bx01', 'bx02']}
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY.keys())
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
TARGET_NAME = 'target'
UNUSED_FEATURE_NAMES = list(set(HEADER) - set(FEATURE_NAMES) - {TARGET_NAME})

## 2. 定义数据数据输入函数
- 输入文件名称模式
- 使用dataset读入数据
- 解析特征
- 使用处理
- 返回(特征，目标)张量


In [22]:
def process_features(features):
    """输入数据时张量，采用tf-api，特征名-张量的字典"""
    features['x_2']=tf.square(features['x'])
    features["x_2"] = tf.square(features['x'])
    features["y_2"] = tf.square(features['y'])
    features["xy"] = tf.multiply(features['x'], features['y']) # features['x'] * features['y']
    features['dist_xy'] =  tf.sqrt(tf.squared_difference(features['x'],features['y']))

def parse_csv_row(csv_row):
    """返回特征字典-目标元组，供dataset"""
    columns=tf.decode_csv(csv_row,record_defaults=HEADER_DEFAULTS) #按顺序解析
    features=dict(zip(HEADER,columns))
    for col in UNUSED_FEATURE_NAMES:
        features.pop(col)
    target= features.pop(TARGET_NAME)
    return features,target
    
    
def csv_input_fn(files_name_pattern, mode=tf.estimator.ModeKeys.EVAL, 
                 skip_header_lines=0,num_epochs=None,batch_size=200):
    
    shuffle=True if mode==tf.estimator.ModeKeys.TRAIN else False
    input_file_names=tf.matching_files(files_name_pattern)
    dataset = tf.data.TextLineDataset(input_file_names)
    dataset = dataset.skip(skip_header_lines)
    dataset = dataset.map(parse_csv_row)
    if shuffle:
        dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)
    dataset = dataset.batch(batch_size)
    dataset = dataset.repeat(num_epochs)
    iterator = dataset.make_one_shot_iterator()
    features,target =iterator.get_next()
    return features,target

features, target = csv_input_fn(files_name_pattern="")
print("Feature read from CSV: {}".format(list(features.keys())))
print("Target read from CSV: {}".format(target))

Feature read from CSV: ['x', 'y', 'alpha', 'beta']
Target read from CSV: Tensor("IteratorGetNext_5:4", shape=(?,), dtype=float32)


In [23]:
ds=tf.data.Dataset.from_tensor_slices([[1,2],[4,9],[4,8]])
it= ds.make_one_shot_iterator()

### 3.定义特征列
假设数值列规范化了或者同尺度，否则使用特征列构造器，
传递normlizer_fn及其normlisation params

In [27]:
def extend_feature_columns(feature_columns):
    """添加交叉列"""
    feature_columns['alpha_X_beta'] = tf.feature_column.crossed_column(
        [feature_columns['alpha'], feature_columns['beta']], 4)
    return feature_columns

def get_feature_columns():
    CONSTRUCTED_NUMERIC_FEATURES_NAMES = ['x_2', 'y_2', 'xy', 'dist_xy']
    all_numeric_feature_names = NUMERIC_FEATURE_NAMES.copy() 
    if PROCESS_FEATURES: # 进一步选择输入的列
        all_numeric_feature_names += CONSTRUCTED_NUMERIC_FEATURES_NAMES
    numeric_columns = {feature_name: tf.feature_column.numeric_column(feature_name)
                       for feature_name in all_numeric_feature_names}
    categorical_column_with_vocabulary = \
        {item[0]: tf.feature_column.categorical_column_with_vocabulary_list(item[0], item[1])
         for item in CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY.items()}
    # 将准备好的列全出整起来
    feature_columns = {}
    if numeric_columns is not None:
        feature_columns.update(numeric_columns)
    if categorical_column_with_vocabulary is not None:
        feature_columns.update(categorical_column_with_vocabulary)
    # 函数和常量一样是个模块全局对象没有必要传递，直接引用就好
    if EXTEND_FEATURE_COLUMNS:
        feature_columns = extend_feature_columns(feature_columns)
    return feature_columns

feature_columns = get_feature_columns()
print("Feature Columns: {}".format(feature_columns))

Feature Columns: {'x': _NumericColumn(key='x', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'y': _NumericColumn(key='y', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'x_2': _NumericColumn(key='x_2', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'y_2': _NumericColumn(key='y_2', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'xy': _NumericColumn(key='xy', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'dist_xy': _NumericColumn(key='dist_xy', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'alpha': _VocabularyListCategoricalColumn(key='alpha', vocabulary_list=('ax01', 'ax02'), dtype=tf.string, default_value=-1, num_oov_buckets=0), 'beta': _VocabularyListCategoricalColumn(key='beta', vocabulary_list=('bx01', 'bx02'), dtype=tf.string, default_value=-1, num_oov_buckets=0), 'alpha_X_beta': _CrossedColumn(keys=(_VocabularyListCategoricalColumn(key=

### 4. 定义估计器创建函数
- 从特征列中获取数值特征列
- 将类型特征列转化成
- 使用dense + indicator feature columns + params创建估计器

In [33]:
def create_estimator(run_config,hparams):
    feature_columns = list(get_feature_columns())
    dense_columns=list(filter(lambda col:isinstance(col,feature_column._NumericColumn),feature_columns))
    categorial_columns=list(filter(lambda col:isinstance(col,(feature_column._VocabularyListCategoricalColumn,feature_column._BucketizedColumn)),
                                  feature_columns))
    indicator_columns = list(map(lambda column: tf.feature_column.indicator_column(column),categorical_columns))
    
    estimator=tf.estimator.DNNRegressor(feature_columns=dense_columns+indicator_columns,
                                       hidden_units=hparams.hidden_units,optimizer=tf.train.AdadeltaOptimizer(),
                                       activation_fn=tf.nn.elu,dropout= hparams.dropout_prob,config= run_config)
    return estimator

### 5. 运行试验
- a. 设置运行参数和模型超参数HParam and RunConfig
- b. 定义Serving Function
- c. 定义一个Early Stopping Monitor (Hook)
- d. 定义TrainSpec and EvaluSpec方案

进行了这个多定义，现在模块空间有多少对象？函数对象和实例对象

In [37]:
TRAIN_SIZE=12000
NUM_EPOCHS=1000
BATCH_SIZE=500
EVAL_AFTER_SEC=15
TOTAL_STEPS=(TRAIN_SIZE/BATCH_SIZE)*NUM_EPOCHS
hparams= tf.contrib.training.HParams(
    num_epochs = NUM_EPOCHS,
    batch_size = BATCH_SIZE,
    hidden_units=[16, 12, 8],
    num_buckets = 6,
    embedding_size = 3,
    max_steps = TOTAL_STEPS,
    dropout_prob = 0.001)  #正如spark中paramap

run_config=tf.estimator.RunConfig(model_dir='trained_models/{}'.format(MODEL_NAME),tf_random_seed=1983061)



In [48]:
def csv_serving_input():
    SERVING_HEADER = ['x','y','alpha','beta']
    SERVING_HEADER_DEFAULTS = [[0.0], [0.0], ['NA'], ['NA']]
    rows_string_tensor = tf.placeholder(dtype=tf.string,shape=[None], name='csv_rows')
    receiver_tensor = {'csv_rows': rows_string_tensor}
    row_columns = tf.expand_dims(rows_string_tensor, -1) #变成2阶，另一axis来放置特征
    columns = tf.decode_csv(row_columns, record_defaults=SERVING_HEADER_DEFAULTS)
    features = dict(zip(SERVING_HEADER, columns))
    if PROCESS_FEATURES: features = process_features(features)
    return tf.estimator.export.ServingInputReceiver(features, receiver_tensor)
    

In [49]:
class EarlyStoppingHook(tf.train.SessionRunHook):
    def __init__(self, early_stopping_rounds=1):
        self._best_loss = None
        self._early_stopping_rounds = early_stopping_rounds
        self._counter = 0
        print("*** Early Stopping Hook: - Created")
        print("*** Early Stopping Hook:: Early Stopping Rounds: {}".format(self._early_stopping_rounds))
    def before_run(self, run_context):  # 调度程序返回给方法的对象
        graph = run_context.session.graph
        loss_tensor = graph.get_collection(tf.GraphKeys.LOSSES)[1]
        return tf.train.SessionRunArgs(loss_tensor)

    def after_run(self, run_context, run_values):
        last_loss = run_values.results
        print("************************")
        print("** Evaluation Monitor - Early Stopping **")
        print("Early Stopping Hook: Current loss: {}".format(str(last_loss)))
        print("Early Stopping Hook: Best loss: {}".format(str(self._best_loss)))

        if self._best_loss is None:
            self._best_loss = last_loss
        elif last_loss > self._best_loss:
            self._counter += 1
            print("Early Stopping Hook: No improvment! Counter: {}".format(self._counter))
            if self._counter == self._early_stopping_rounds:
                run_context.request_stop()
                print("Early Stopping Hook: Stop Requested: {}".format(run_context.stop_requested))
        else:
            self._best_loss = last_loss
            self._counter = 0
        print("************************")

In [None]:
files_name_pattern, mode=tf.estimator.ModeKeys.EVAL, 
                 skip_header_lines=0,num_epochs=None,batch_size=200)

In [50]:
import functools

In [46]:
train_input_fn=functools.partial(csv_input_fn,
                                 files_name_pattern=TRAIN_DATA_FILES_PATTERN,
                                 mode=tf.estimator.ModeKeys.TRAIN,
                                 num_epochs=hparams.num_epochs
                                 batch_size=hparams.batch_size)
train_spec=tf.estimator.TrainSpec(train_input_fn,max_steps=hparams.max_steps,hooks=None)

valid_input_fn =functools.partial(csv_input_fn,
                                  files_name_pattern=VALID_DATA_FILES_PATTERN,
                                  mode=tf.estimator.ModeKeys.EVAL,
                                  num_epochs=hparams.num_epochs,
                                  batch_size=hparams.batch_size),

eval_spec = tf.estimator.EvalSpec(valid_input_fn,
                                  steps=None,
                                  throttle_secs = EVAL_AFTER_SEC)# evalute after each 15 training seconds!

In [47]:
features = dict(zip(SERVING_HEADER, columns))

NameError: name 'columns' is not defined