In [1]:
#coding=utf-8
import time
import numpy as np

import logging
import traceback

from sklearn import metrics
import tensorflow as tf
print("tf version:", tf.__version__)

logging.basicConfig(level=logging.INFO)

cur_ts = str(int(time.time()))
tf.app.flags.DEFINE_string("train_paths", "/Users/aodandan/data/tfrecord/train/part-*", "HDFS paths to input files.")
tf.app.flags.DEFINE_string("eval_paths", "/Users/aodandan/data/tfrecord/eval/part-00093", "eval data path")
tf.app.flags.DEFINE_string("model_path", "/Users/aodandan/data/model/dnn_simple/"+cur_ts, "Where to write output files.")
tf.app.flags.DEFINE_string("last_model_path", "", "Model path for the previous run.")
tf.app.flags.DEFINE_integer("train_epochs", 5, "train epochs")
tf.app.flags.DEFINE_integer("batch_size", 64, "batch size")
tf.app.flags.DEFINE_float("learning_rate", 1e-3, "train learning rate")
tf.app.flags.DEFINE_float("dropout", 0.5, "dropout")
tf.app.flags.DEFINE_float("clip_norm", 10.0, "clip norm")
tf.app.flags.DEFINE_integer("num_cols", 264, "num cols")
tf.app.flags.DEFINE_string("f", "", "kernel")

FLAGS = tf.app.flags.FLAGS

def cal_sklearn_auc(all_y, all_pred, batch_labels, batch_probs):
    all_y.extend(batch_labels)
    all_pred.extend(batch_probs)
    auc = metrics.roc_auc_score(all_y, all_pred)
    acc = metrics.accuracy_score(all_y, np.around(all_pred).astype(int))
    return auc, acc

class DataIterator(object):
    def __init__(self, paths, shuffle, num_cols, batch_size):
        self.iterator = self.build_iterator(paths, shuffle=shuffle, num_cols=num_cols, batch_size=batch_size)
        self.initializer = self.iterator.initializer
        self.next_element = self.iterator.get_next()
        
    def build_iterator(self, paths, shuffle=True, num_cols=264, batch_size=2, buffer_size = 8 * 1024 * 1024, num_parallels=1):
        def parse(value):
            desc = {
                "slot_%s"%i: tf.io.FixedLenFeature([1], tf.float32, default_value=0.0) for i in range(0, num_cols)
                }
            desc["label"] = tf.io.FixedLenFeature([1], tf.int64, default_value=0)
            example = tf.io.parse_single_example(value, desc)
            label = example["label"]
            label = tf.cast(label,tf.int32)
            del example["label"]
            instance = []
            for i in range(num_cols):
                instance.append(example["slot_%s"%i])
            return instance, label
    
        logging.info("Build iterator from file: {}".format(paths))
        data_files = tf.data.Dataset.list_files(paths, shuffle=shuffle)
        dataset = tf.data.TFRecordDataset(data_files, buffer_size=buffer_size, 
                                          num_parallel_reads=num_parallels)
        if shuffle:
            dataset = dataset.shuffle(buffer_size=buffer_size)
        dataset = dataset.map(parse, num_parallel_calls=num_parallels).batch(batch_size)
        dataset = dataset.prefetch(buffer_size=2 * batch_size)
        return tf.compat.v1.data.make_initializable_iterator(dataset)

class VerifyDNNModel(object):
    def __init__(self, parameters):
        self.lr = parameters.get("lr", 0.001)
        self.dropout = parameters.get("dropout", 0.1)
        self.n_feature = parameters.get("n_feature", 264)
        self.n_classes = parameters.get("n_classes", 2)
        self.device = parameters.get("device", "/cpu:0")
        
        self.width_1th_layer = 256
        self.width_2th_layer = 64
        self.width_3th_layer = self.n_classes
        
        self.create_placeholders()
        
        self.build_graph()
                        
        self.create_summary()
    
    def create_summary(self):
        with tf.name_scope("dnn_metric") as scope, tf.device(self.device):
            loss_val, self.loss_op = tf.compat.v1.metrics.mean(self.loss, name="loss_metric")
            auc_val, self.auc_op = tf.compat.v1.metrics.auc(self.labels, self.prediction, name="auc_metric")
            pred_class = tf.cast(tf.round(self.prediction), tf.int32)
            acc_val, self.acc_op = tf.compat.v1.metrics.accuracy(self.labels, pred_class, name="acc_metric")
            tf.summary.scalar('loss', loss_val)
            tf.summary.scalar('auc', auc_val)
            tf.summary.scalar('acc', acc_val)        
            self.summary_merged = tf.summary.merge_all()
        
    def create_placeholders(self):
        self.X = tf.placeholder(tf.float32, shape=(None, self.n_feature, 1), name='X')
        self.Y = tf.placeholder(tf.int32, shape=(None, 1), name='Y')
            
    def build_graph(self):    
        with tf.name_scope("dnn_core") as scope, tf.device(self.device):
            input_X = tf.reshape(tf.compat.v1.squeeze(self.X), [-1, self.n_feature])
            onehot_Y = tf.reshape(tf.one_hot(tf.compat.v1.squeeze(self.Y), depth=self.n_classes), [-1, self.n_classes])
            self.labels = onehot_Y[:,1]
        
            w1 = tf.get_variable(name="w1", shape=[self.n_feature, self.width_1th_layer],
                             initializer=tf.contrib.layers.xavier_initializer())
            b1 = tf.get_variable(name="b1", shape=[1, self.width_1th_layer], initializer=tf.compat.v1.zeros_initializer)
        
            w2 = tf.get_variable(name="w2", shape=[self.width_1th_layer, self.width_2th_layer],
                             initializer=tf.contrib.layers.xavier_initializer())
            b2 = tf.get_variable(name="b2", shape=[1, self.width_2th_layer], initializer=tf.compat.v1.zeros_initializer)
        
            w3 = tf.get_variable(name="w3", shape=[self.width_2th_layer, self.width_3th_layer],
                             initializer=tf.contrib.layers.xavier_initializer())
            b3 = tf.get_variable(name="b3", shape=[1, self.width_3th_layer], initializer=tf.compat.v1.zeros_initializer)
        
            z1 = tf.add(tf.matmul(input_X, w1), b1)
            a1 = tf.nn.relu(z1)
            #a1 = tf.nn.dropout(a1, rate=self.dropout)
            z2 = tf.add(tf.matmul(a1, w2), b2)
            a2 = tf.nn.relu(z2)
            #a2 = tf.nn.dropout(a2, rate=self.dropout)
            logits = tf.add(tf.matmul(a2, w3), b3)
            probabilities = tf.nn.softmax(logits)
            self.prediction = probabilities[:,1]
        
            self.loss = tf.nn.softmax_cross_entropy_with_logits(
                        logits=logits,
                        labels=onehot_Y
                    )
            self.loss_rmean = tf.reduce_mean(self.loss)
        
            train_vars = tf.trainable_variables()
            for v in train_vars: logging.info("model train_var: {}, {}".format(v.name, v))
            grads, _ = tf.clip_by_global_norm(tf.gradients(self.loss_rmean, train_vars), clip_norm=5)
            optimizer = tf.train.AdamOptimizer(self.lr)
            #optimizer = tf.train.GradientDescentOptimizer(self.lr)
            self.train_op = optimizer.apply_gradients(zip(grads, train_vars))
        
    def get_train_task(self):
        task_ops = {
            "train_op": self.train_op,
            "loss_op": self.loss_op,
            "auc_op": self.auc_op,
            "acc_op": self.acc_op,
            "summary": self.summary_merged,
            "loss_rmean": self.loss_rmean,
        }
        return task_ops
    
    def get_eval_task(self):
        task_ops = {
            "loss_op": self.loss_op,
            "auc_op": self.auc_op,
            "acc_op": self.acc_op,
            "loss_rmean": self.loss_rmean,
            "prediction": self.prediction,
        }
        return task_ops

class DNNTrainer(object):
    def __init__(self):
        self.log_path = FLAGS.model_path + "/log/" # tensorboard -–logdir
        self.checkpoint_path = FLAGS.model_path + "/ckpt/dnn"
        self.last_checkpoint_path = FLAGS.last_model_path
        self.train_epochs = FLAGS.train_epochs
        
        parameters = {}
        parameters["lr"] = FLAGS.learning_rate
        parameters["dropout"] = FLAGS.dropout
        parameters["n_feature"] = FLAGS.num_cols
        self.dnn_model = VerifyDNNModel(parameters)
        
        self.train_iterator = DataIterator(paths=FLAGS.train_paths, num_cols=FLAGS.num_cols, 
                                                        batch_size=FLAGS.batch_size, shuffle=True)
        self.eval_iterator = DataIterator(paths=FLAGS.eval_paths, num_cols=FLAGS.num_cols,
                                                       batch_size=FLAGS.batch_size, shuffle=False)
        self.saver = tf.train.Saver(tf.global_variables(), max_to_keep=FLAGS.train_epochs)

    def get_sess_config(self):
        sess_config = tf.ConfigProto()
        sess_config.log_device_placement = True # log device placement
        sess_config.gpu_options.allow_growth = True # dynamic allocate mem
        sess_config.allow_soft_placement = True # auto select device
        return sess_config
    
    def reset_running_variables(self, sess, scope):
        running_vars = tf.get_collection(tf.GraphKeys.LOCAL_VARIABLES, scope=scope)
        for v in running_vars: logging.info("run var: {}, {}".format(v.name, v))
        running_vars_initializer = tf.variables_initializer(var_list=running_vars)
        sess.run(running_vars_initializer)

    def train_one_epoch(self, sess, log_writer, epoch, step):
        sess.run(self.train_iterator.initializer)
        self.reset_running_variables(sess, "dnn_metric") # accumulative, reset for each epoch
        next_element = self.train_iterator.next_element
        task_ops = self.dnn_model.get_train_task()
        while True:
            try:
                batch_instances, batch_labels = sess.run(next_element)
                feed = {self.dnn_model.X:batch_instances, self.dnn_model.Y:batch_labels}
                
                results = sess.run(task_ops, feed_dict=feed)
        
                step += 1
                log_writer.add_summary(results['summary'], step)

                if step % 100 == 0:
                    logging.info("Epoch-{}, step-{}: batch_loss={}, loss={}, auc={}, acc={}".format(
                        epoch, step, results["loss_rmean"], results["loss_op"], results["auc_op"], results["acc_op"])
                     )
            except tf.errors.OutOfRangeError:
                logging.info("Epoch-{}: consumed all examples.".format(epoch))
                break
            except Exception as e:
                err_msg = traceback.format_exc()
                looging.error("err: {}".format(err_msg))
                break
        return step

    def eval_one(self, sess, step=0):
        sess.run(self.eval_iterator.initializer)
        self.reset_running_variables(sess, "dnn_metric") # accumulative, reset for each epoch       
        next_element = self.eval_iterator.next_element
        task_ops = self.dnn_model.get_eval_task()
        all_y, all_pred = [], []
        while True:
            try:
                batch_instances, batch_labels = sess.run(next_element)
                feed = {self.dnn_model.X:batch_instances, self.dnn_model.Y:batch_labels}
                
                results = sess.run(task_ops, feed_dict=feed)
                auc, acc = cal_sklearn_auc(all_y, all_pred, batch_labels, results["prediction"])
        
                step += 1
                if step % 100 == 0:
                    logging.info("Eval, step-{}: batch_loss={}, loss={}, auc={}, acc={}, skauc={}, skacc={}".format(
                        step, results["loss_rmean"], results["loss_op"], results["auc_op"], results["acc_op"], auc, acc)
                     )
            except tf.errors.OutOfRangeError:
                logging.info("Eval: consumed all examples.")
                break
            except Exception as e:
                err_msg = traceback.format_exc()
                looging.error("err: {}".format(err_msg))
                break
        return step
    
    def train(self):
        with tf.Session(config=self.get_sess_config()) as sess:
            if self.last_checkpoint_path:
                logging.info("Restore variable from file: {}".format(self.last_checkpoint_path))
                self.saver.restore(sess, self.last_checkpoint_path)
            else:
                logging.info("Init global variable: {}".format([v.name for v in tf.global_variables()]))
                sess.run(tf.global_variables_initializer())
            logging.info("Init local variable: {}".format([v.name for v in tf.local_variables()]))
            sess.run(tf.local_variables_initializer())
        
            log_writer = tf.summary.FileWriter(self.log_path, sess.graph)
            step = 0
            for epoch in range(self.train_epochs):
                # train model
                step = self.train_one_epoch(sess, log_writer, epoch, step)
            
                # save model
                last_store_path = self.saver.save(sess, self.checkpoint_path, global_step=step)
                logging.info("Store model to {}".format(last_store_path))
            
                # eval model
                self.eval_one(sess)
                
            log_writer.close()
                
trainer = DNNTrainer()
trainer.train()


tf version: 1.15.0
The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.



The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.



Instructions for updating:

Future major versions of TensorFlow will allow gradients to flow
into the labels input on backprop by default.

See `tf.nn.softmax_cross_entropy_with_logits_v2`.



Instructions for updating:

Future major versions of TensorFlow will allow gradients to flow
into the labels input on backprop by default.

See `tf.nn.softmax_cross_entropy_with_logits_v2`.

INFO:root:model train_var: w1:0, <tf.Variable 'w1:0' shape=(264, 256) dtype=float32_ref>
INFO:root:model train_var: b1:0, <tf.Variable 'b1:0' shape=(1, 256) dtype=float32_ref>
INFO:root:model train_var: w2:0, <tf.Variable 'w2:0' shape=(256, 64) dtype=float32_ref>
INFO:root:model train_var: b2:0, <tf.Variable 'b2:0' shape=(1, 64) dtype=float32_ref>
INFO:root:model train_var: w3:0, <tf.Variable 'w3:0' shape=(64, 2) dtype=float32_ref>
INFO:root:model train_var: b3:0, <tf.Variable 'b3:0' shape=(1, 2) dtype=float32_ref>


Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


Instructions for updating:
Deprecated in favor of operator or tf.math.divide.


Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
INFO:root:Build iterator from file: /Users/aodandan/data/tfrecord/train/part-*
INFO:root:Build iterator from file: /Users/aodandan/data/tfrecord/eval/part-00093


Device mapping:
/job:localhost/replica:0/task:0/device:XLA_CPU:0 -> device: XLA_CPU device



INFO:root:Init global variable: ['w1:0', 'b1:0', 'w2:0', 'b2:0', 'w3:0', 'b3:0', 'dnn_core/beta1_power:0', 'dnn_core/beta2_power:0', 'w1/Adam:0', 'w1/Adam_1:0', 'b1/Adam:0', 'b1/Adam_1:0', 'w2/Adam:0', 'w2/Adam_1:0', 'b2/Adam:0', 'b2/Adam_1:0', 'w3/Adam:0', 'w3/Adam_1:0', 'b3/Adam:0', 'b3/Adam_1:0']
INFO:root:Init local variable: ['dnn_metric/loss_metric/total:0', 'dnn_metric/loss_metric/count:0', 'dnn_metric/auc_metric/true_positives:0', 'dnn_metric/auc_metric/false_negatives:0', 'dnn_metric/auc_metric/true_negatives:0', 'dnn_metric/auc_metric/false_positives:0', 'dnn_metric/acc_metric/total:0', 'dnn_metric/acc_metric/count:0']
INFO:root:run var: dnn_metric/loss_metric/total:0, <tf.Variable 'dnn_metric/loss_metric/total:0' shape=() dtype=float32_ref>
INFO:root:run var: dnn_metric/loss_metric/count:0, <tf.Variable 'dnn_metric/loss_metric/count:0' shape=() dtype=float32_ref>
INFO:root:run var: dnn_metric/auc_metric/true_positives:0, <tf.Variable 'dnn_metric/auc_metric/true_positives:0' 

INFO:root:run var: dnn_metric/loss_metric/total:0, <tf.Variable 'dnn_metric/loss_metric/total:0' shape=() dtype=float32_ref>
INFO:root:run var: dnn_metric/loss_metric/count:0, <tf.Variable 'dnn_metric/loss_metric/count:0' shape=() dtype=float32_ref>
INFO:root:run var: dnn_metric/auc_metric/true_positives:0, <tf.Variable 'dnn_metric/auc_metric/true_positives:0' shape=(200,) dtype=float32_ref>
INFO:root:run var: dnn_metric/auc_metric/false_negatives:0, <tf.Variable 'dnn_metric/auc_metric/false_negatives:0' shape=(200,) dtype=float32_ref>
INFO:root:run var: dnn_metric/auc_metric/true_negatives:0, <tf.Variable 'dnn_metric/auc_metric/true_negatives:0' shape=(200,) dtype=float32_ref>
INFO:root:run var: dnn_metric/auc_metric/false_positives:0, <tf.Variable 'dnn_metric/auc_metric/false_positives:0' shape=(200,) dtype=float32_ref>
INFO:root:run var: dnn_metric/acc_metric/total:0, <tf.Variable 'dnn_metric/acc_metric/total:0' shape=() dtype=float32_ref>
INFO:root:run var: dnn_metric/acc_metric/co

INFO:root:run var: dnn_metric/auc_metric/false_positives:0, <tf.Variable 'dnn_metric/auc_metric/false_positives:0' shape=(200,) dtype=float32_ref>
INFO:root:run var: dnn_metric/acc_metric/total:0, <tf.Variable 'dnn_metric/acc_metric/total:0' shape=() dtype=float32_ref>
INFO:root:run var: dnn_metric/acc_metric/count:0, <tf.Variable 'dnn_metric/acc_metric/count:0' shape=() dtype=float32_ref>
INFO:root:Epoch-4, step-3900: batch_loss=121442.609375, loss=85523.234375, auc=0.5790203809738159, acc=0.5943667888641357
INFO:root:Epoch-4, step-4000: batch_loss=58704.32421875, loss=90401.40625, auc=0.5876289010047913, acc=0.5980113744735718
INFO:root:Epoch-4, step-4100: batch_loss=77977.53125, loss=82041.53125, auc=0.5887019634246826, acc=0.5992413759231567
INFO:root:Epoch-4, step-4200: batch_loss=13217.666015625, loss=93715.9375, auc=0.5755914449691772, acc=0.5954122543334961
INFO:root:Epoch-4, step-4300: batch_loss=40527.2734375, loss=89723.5390625, auc=0.5715029239654541, acc=0.5896139740943909