In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from tensorflowonspark import TFCluster

import argparse
import shutil
import functools
import os
import sys
import json
#import logging
import tensorflow as tf
import numpy as np
import scipy
import time

#FORMAT = '[%(asctime)-15s] [%(levelname)s] - %(message)s'
#logging.basicConfig(level=logging.INFO, format=FORMAT)
#logger = logging.getLogger(__name__)

def get_arg_parser():
    parser = argparse.ArgumentParser()
    parser.register("type", "bool", lambda v: v.lower() == "true")
    
    parser.add_argument('--spark_mode', type=int, default=0, help='Run in Spark Mode')
    parser.add_argument('--clean_model', type=int, default=1, help='Clean Up Previous Existing Model Dir')
    parser.add_argument('--model_dir', type=str, default='tfmodel', help='Base directory for the model.')
    parser.add_argument('--model_dict', type=str, default='dict.tsv', help='Feature Count')
    
    parser.add_argument('--train_batch_size', type=int, default=1000, help='Number of examples per batch for training')
    parser.add_argument('--train_records', type=int, default=0, help='Total Number of Records; If 0 then determine based on input FileSize')
    parser.add_argument('--train_steps', type=int, default=0, help='Number of training steps; If 0 then determine based on input FileSize')
    parser.add_argument('--train_epochs', type=int, default=1, help='Number of training epochs.')

    parser.add_argument('--test_batch_size', type=int, default=5000, help='Number of examples per batch for testing')
    parser.add_argument('--test_steps', type=int, default=10, help='Number of testing steps')

    parser.add_argument('--record_count_tsv', type=str, default='counter.tsv', help='Path to the training data.')
    parser.add_argument('--train_data', type=str, default='train.csv', help='Path to the training data.')
    parser.add_argument('--test_data', type=str, default='test.csv', help='Path to the test data.')
    
    parser.add_argument('--model_type', type=str, default='deep', help='deep, wide, wide_deep')
    parser.add_argument('--optimizer', type=str, default='adagrad', help='Optimizer (adagrad, adam)')
    parser.add_argument('--learning_rate', type=float, default=0.001, help='Learning Rate')
    parser.add_argument('--l1', type=float, default=0.0, help='l1 regularization strength')
    parser.add_argument('--l2', type=float, default=0.000001, help='l2 regularization strength')
    parser.add_argument('--dropout', type=float, default=0.0, help='Drop Out Probability between 0 to 1')

    parser.add_argument('--h1', type=int, default=16, help='Number of Hidden Units - Layer1')
    parser.add_argument('--h2', type=int, default=16, help='Number of Hidden Units - Layer2')
    parser.add_argument('--h3', type=int, default=8, help='Number of Hidden Units - Layer3')
    parser.add_argument('--h4', type=int, default=8, help='Number of Hidden Units - Layer4')
    
    parser.add_argument('--num_ps', type=int, default=1, help='Number of PS Servers')
    parser.add_argument('--app_name', type=str, default="HappiCFDnn", help='Name of Spark Application')
    parser.add_argument('--summary', type=str, default="summary.txt", help='Summary of the experiment and results')
    return parser

CSV_FEATURE_NAMES = []
CSV_FEATURE_DEFAULTS = []

################################################################################################################
class FeatureColumType:
    def __init__(self, coltype, name, vtype, size=0, dimension=0, max_bucket=0):
        self.coltype = coltype
        self.name = name
        self.vtype = vtype
        self.size = size
        self.dimension = dimension
        self.max_bucket = max_bucket
        
feature_columns = []
feature_columns_info = []
fdict = {}

def fetch_column_info(model_dict):
    global CSV_FEATURE_NAMES
    global CSV_FEATURE_DEFAULTS
    global fdict

    CSV_ALL_COLS = [
        'event_guid', 'click',
        'campaign_objective', 'adv_sub_category',
        'ad_id', 'ad_cmp', 'ad_adv', 'ad_adg',
        'ad_sponsored_by', 'ad_title',
        'dayofweek', 'daypart',
        'user', 'user_age', 'user_gender', 'user_os', 'device_type',
        'state_woeid', 'country_woeid',
        'page_section_id']

    print('Reading Dictionary: %s' % model_dict)
    tfdict = tf.gfile.GFile(model_dict)

    for line in tfdict.readlines():
        fields = line.strip().split('\t')
        name = fields[1]
        if name in fdict:
            fdict[name] = fdict[name] + 1
        else:
            fdict[name] = 1
    print('fdict: %s' % fdict)

    for f in CSV_ALL_COLS:
        if f == 'event_guid' or f == 'click' or f in fdict:
            CSV_FEATURE_NAMES.append(f)
            CSV_FEATURE_DEFAULTS.append(['0'] if (f == 'event_guid' or f == 'ad_title') else [0])
    print('CSV_FEATURE_NAMES: %s' % CSV_FEATURE_NAMES)

def normalize_column(col):
    return (col - np.mean(col)) / np.std(col)

def fdim(total):
    return int(np.log2(total))+1

def build_wide_model_columns():
    global fdict
    global feature_columns
    global feature_columns_info
    ####################################################################
    for name in sorted(fdict.keys()):
        size = fdict[name]
        if size < 256:
            feature_columns.append(tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_identity(key=name, num_buckets=size)))
            feature_columns_info.append(FeatureColumType(coltype='INDICATOR', name=name, vtype='int32', size=size, dimension=0))
            print('NAME: %-20s TYPE: INDICATOR\t SIZE: %d' % (name, size))
        else:
            dimension = 32 if name == 'user' else fdim(size)
            feature_columns.append(tf.feature_column.embedding_column(tf.feature_column.categorical_column_with_identity(key=name, num_buckets=size), dimension))
            feature_columns_info.append(FeatureColumType(coltype='EMBEDDING', name=name, vtype='int32', size=size, dimension=dimension))
            print('NAME: %-20s TYPE: EMBEDDING\t SIZE: %d\t DIMENSION: %d' % (name, size, dimension))
    ####################################################################

    return feature_columns, feature_columns


def build_estimator(model_dir, config):
    deep_columns, wide_columns = build_wide_model_columns()

    hidden_units = []
    hidden_units.append(FLAGS.h1)
    if FLAGS.h2 > 0:
        hidden_units.append(FLAGS.h2)
    if FLAGS.h3 > 0:
        hidden_units.append(FLAGS.h3)
    if FLAGS.h4 > 0:
        hidden_units.append(FLAGS.h4)

    dropout = None
    if FLAGS.dropout > 0.0:
        dropout = FLAGS.dropout
    
    print('Using DNNClassifier with Hidden Units: %s' % hidden_units)
    print('Using Model Dir: %s' % model_dir)
    print('Using Learning Rate: %f' % FLAGS.learning_rate)
    print('Using L1 Regularization Strength: %f' % FLAGS.l1)
    print('Using L2 Regularization Strength: %f' % FLAGS.l2)
    print('Using ModelType: %s' % FLAGS.model_type)
    print('Using Dropout: %s' % dropout)
    
    optimzer = tf.train.AdamOptimizer(learning_rate=FLAGS.learning_rate) if FLAGS.optimizer == 'adam' else tf.train.ProximalAdagradOptimizer(learning_rate=FLAGS.learning_rate, l1_regularization_strength=FLAGS.l1, l2_regularization_strength=FLAGS.l2)
    
    if FLAGS.model_type == "wide":
        return tf.estimator.LinearClassifier(
        model_dir=model_dir,
        feature_columns=wide_columns,
        config=config)

    if FLAGS.model_type == "wide_deep":
        return tf.estimator.DNNLinearCombinedClassifier(
        model_dir=model_dir,
        linear_feature_columns=wide_columns,
        dnn_feature_columns=deep_columns,
        dnn_hidden_units=hidden_units,
        config=config,
        dnn_dropout=dropout,
        dnn_optimizer=optimzer)

    return tf.estimator.DNNClassifier(
        model_dir=model_dir,
        feature_columns=deep_columns,
        hidden_units=hidden_units,
        dropout=dropout,
        config=config,
        optimizer=optimzer)

def parse_multival(x):
    sparse_strings = tf.string_split(x, delimiter="\x02")
    return tf.SparseTensor(indices=sparse_strings.indices, values=tf.string_to_number(sparse_strings.values, out_type=tf.int32), dense_shape=sparse_strings.dense_shape)

def parse_input(value):
    columns = tf.decode_csv(value, record_defaults=CSV_FEATURE_DEFAULTS)
    features = dict(zip(CSV_FEATURE_NAMES, columns))

    if 'ad_title' in features:
        features['ad_title'] = parse_multival(features['ad_title'])

    features.pop('event_guid')
    label = features.pop('click')
    return features, label

def export_input_fn():
    csv_row = tf.placeholder(dtype=tf.string, shape=[None], name='input_csv_tensor')
    receiver_tensors = {'csv_row': csv_row}
    
    columns = tf.decode_csv(csv_row, record_defaults=CSV_FEATURE_DEFAULTS)
    features = dict(zip(CSV_FEATURE_NAMES, columns))

    if 'ad_title' in features:
        features['ad_title'] = parse_multival(features['ad_title'])

    features.pop('event_guid')
    features.pop('click')
    return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

def get_all_input_files(data_file):
    input_files = []
    if tf.gfile.IsDirectory(data_file):
        filenames = tf.gfile.ListDirectory(data_file)
        for f in filenames:
            if str(f).startswith('part'):
                infile = data_file + '/' + f
                input_files.append(infile)
                print('Input Part File: %s' % infile)
    else:
        input_files = [data_file]
        print('Input Part File: %s' % data_file)
    return input_files

def get_all_input_size(data_file):
    total_input_size = 0
    if tf.gfile.IsDirectory(data_file):
        filenames = tf.gfile.ListDirectory(data_file)
        for f in filenames:
            if str(f).startswith('part'):
                infile = data_file + '/' + f
                fstats = tf.gfile.Stat(infile)
                total_input_size += fstats.length
                print('Input Part File Size: %10d => %s' % (fstats.length, infile))
    else:
        fstats = tf.gfile.Stat(data_file)
        total_input_size = fstats.length
        print('Input Part File: %10d => %s' % (fstats.length, data_file))
    return total_input_size


def input_fn(data_file, num_epochs, batch_size, task_index, total_workers):
    print('Input Function => Using data_file: %s' % (data_file))
    print('Input Function => Using num_epochs: %d' % (num_epochs))
    print('Input Function => Using task_index: %d' % (task_index))
    print('Input Function => Using total_workers: %d' % (total_workers))
    
    part_files = get_all_input_files(data_file)
    print('Total Workers: %d; Total Input Files: %d' % (total_workers, len(part_files)))
    
    input_files = []
    for epoch_index in range(0, num_epochs):
        tindex = (task_index + epoch_index) % total_workers
        
        file_id = 0
        for infile in part_files:
            if file_id % total_workers == tindex:
                input_files.append(infile)
                print('Worker Index: %d; Total Workers: %d; Adding File (%03d): %s' % (task_index, total_workers, file_id, infile))
            file_id = file_id + 1
    
    print('Worker Index: %d; Total Workers: %d; Total Files to process: %d' % (task_index, total_workers, len(input_files)))

    dataset = tf.data.TextLineDataset(input_files)
    if total_workers == 1:
        dataset = dataset.repeat(1)
    else:
        dataset = dataset.repeat()

    dataset = dataset.batch(batch_size)
    #dataset = dataset.map(parse_input)

    iterator = dataset.make_one_shot_iterator()
    #features, labels = iterator.get_next()
    features, labels = parse_input(iterator.get_next())
    return features, labels


def log_flags_info(out_file):
    for arg in vars(FLAGS):
        line = 'FLAGS\t' + arg + '\t' + str(getattr(FLAGS, arg))
        out_file.write(line + '\n')
        print(line)
    
def log_traing_info(out_file):
    steps_avg_loss = []
    eventfile = ""
    for f in tf.gfile.ListDirectory(FLAGS.model_dir + "/tf"):
        if f.startswith('events'):
            eventfile = FLAGS.model_dir + "/tf/" + f
    for summary in tf.train.summary_iterator(eventfile):
        for v in summary.summary.value:
            if v.tag == 'average_loss':
                steps_avg_loss.append(v.simple_value)
    
    steps_per_iteration = int(FLAGS.train_steps/FLAGS.train_epochs)
    line = 'TRAIN\tStepsPerIteration\t' + str(steps_per_iteration)
    out_file.write(line + '\n')
    print(line)

    step_log_count = int((len(steps_avg_loss) + FLAGS.train_epochs - 1) / FLAGS.train_epochs)
    for i in range(0, FLAGS.train_epochs):
        startRange = i * step_log_count
        endRange = min(startRange + step_log_count, len(steps_avg_loss))
        line = 'ITERATION\t' + ('%03d' % (i)) + '\tAverageLoss\t' + str(scipy.mean(steps_avg_loss[startRange:endRange]))
        out_file.write(line + '\n')
        print(line)
        
    line = 'TRAIN\tFinalAverageLoss\t' + str(scipy.mean(steps_avg_loss))
    out_file.write(line + '\n')
    print(line)

    line = 'TRAIN\tTrainPeriod\t' + str(int(round(time.time() * 1000)) - FLAGS.start_ts)
    out_file.write(line + '\n')
    print(line)

def log_feature_info(out_file):
    global feature_columns_info
    layer1_feature_count = 0
    for info in feature_columns_info:
        if info.coltype == "EMBEDDING":
            layer1_feature_count += info.dimension
        else:
            layer1_feature_count += info.size
        line = 'FEATURE\t' + info.coltype + '\t' + info.name + '\t' + info.vtype + '\t' + str(info.size) + '\t' + str(info.dimension) + '\t' + str(info.max_bucket)
        out_file.write(line + '\n')
        print(line)
    
    line = 'TRAIN\tLayer1FeatureCount\t' + str(layer1_feature_count)
    out_file.write(line + '\n')
    print(line)

def log_summary():
    print("+++++++++++++++++++++++++++++ LOGGING SUMMARY +++++++++++++++++++++++++++++")
    out_file = tf.gfile.GFile(FLAGS.summary, mode='w')
    
    log_flags_info(out_file)
    log_traing_info(out_file)
    log_feature_info(out_file)

    out_file.close()

def main_spark(unused_argv, ctx):
    cluster_spec = ctx.cluster_spec
    job_name = ctx.job_name
    task_index = ctx.task_index

    # Keep the original ones for distributing input files among workers
    FLAGS.task_index = task_index
    FLAGS.total_workers = len(cluster_spec["worker"])

    cluster_spec["master"] = [cluster_spec["worker"][0]]
    cluster_spec["worker"] = cluster_spec["worker"][1:]
    
    if task_index == 0 and job_name == "worker":
        job_name = "master"
    elif job_name == "worker":
        task_index = ctx.task_index - 1
    
    print('Current Node - Job Name: %s; Task Index: %s' % (job_name, task_index))
    print('Cluster specification is: %s' % cluster_spec)
    print('Cluster specification for Master is: %s' % cluster_spec["master"])

    print('Using FLAGS: %s' % (FLAGS))
    fetch_column_info(FLAGS.model_dict)

    if job_name == "ps":
        os.environ['TF_CONFIG'] = json.dumps({'cluster': cluster_spec, 'environment': 'cloud', 'task': {'type': 'ps', 'index': task_index}})
    elif job_name == "master":
        os.environ['TF_CONFIG'] = json.dumps({'cluster': cluster_spec, 'environment': 'cloud', 'task': {'type': 'master', 'index': task_index}})
    else:
        os.environ['TF_CONFIG'] = json.dumps({'cluster': cluster_spec, 'environment': 'cloud', 'task': {'type': 'worker', 'index': task_index}})
    
    print('Using TFConfig: %s' % os.environ['TF_CONFIG'])
    
    output_dir = FLAGS.model_dir + "/tf"

    config = tf.estimator.RunConfig(model_dir=output_dir)
    train_input_fn = functools.partial(input_fn, FLAGS.train_data, FLAGS.train_epochs, FLAGS.train_batch_size, FLAGS.task_index, FLAGS.total_workers)
    eval_input_fn = functools.partial(input_fn, FLAGS.test_data, 1, FLAGS.test_batch_size, 0, 1)
    estimator = build_estimator(output_dir, config)

    train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=FLAGS.train_steps)
    eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, steps=FLAGS.test_steps)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

    # Export_Saved Model
    if job_name == "master":
        print("Trying to save the model for serving through master node")
        servable_model_path = estimator.export_savedmodel(FLAGS.model_dir + "/serve", export_input_fn, as_text=False)
        print("++++++ Servable Model Path: %s" % servable_model_path)
        
        # Save Summary of this experiment
        log_summary()


def main_local():
    FLAGS.task_index = 0
    FLAGS.total_workers = 1
    print('Using FLAGS: %s' % (FLAGS))
    
    fetch_column_info(FLAGS.model_dict)

    output_dir = FLAGS.model_dir + "/tf"
    
    config = tf.estimator.RunConfig(model_dir=output_dir)
    train_input_fn = functools.partial(input_fn, FLAGS.train_data, FLAGS.train_epochs, FLAGS.train_batch_size, FLAGS.task_index, FLAGS.total_workers)
    eval_input_fn = functools.partial(input_fn, FLAGS.test_data, 1, FLAGS.test_batch_size, 0, 1)
    estimator = build_estimator(output_dir, config)

    train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=FLAGS.train_steps)
    eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, steps=FLAGS.test_steps)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

    # Export_Saved Model
    servable_model_path = estimator.export_savedmodel(FLAGS.model_dir + "/serve", export_input_fn, as_text=True)
    print('++++++ Servable Model Path: %s' % servable_model_path)

    # Save Summary of this experiment
    log_summary()


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    FLAGS, unparsed = get_arg_parser().parse_known_args()

    if FLAGS.clean_model == 1:
        print('Cleaning up prev model path: %s' % (FLAGS.model_dir))
        shutil.rmtree(FLAGS.model_dir, ignore_errors=True)
    FLAGS.start_ts = int(round(time.time() * 1000))

    total_input_size = 0
    total_input_records = 0
    total_test_records = 0
    total_input_steps_per_epoch = 0
    total_input_steps_global = 0
    if FLAGS.train_records == 0 and FLAGS.train_steps == 0:
        if tf.gfile.Exists(FLAGS.record_count_tsv):
            counterTsv = tf.gfile.GFile(FLAGS.record_count_tsv)
            for line in counterTsv.readlines():
                fields = line.strip().split('\t')
                if fields[0] == 'train':
                    total_input_records = int(fields[1])
                if fields[0] == 'test':
                    total_test_records = int(fields[1])
        else:
            total_input_size = get_all_input_size(FLAGS.train_data)
            total_input_records = int(total_input_size / 150)

    if FLAGS.train_steps == 0:
        total_input_steps_per_epoch = int(total_input_records / FLAGS.train_batch_size)
        total_input_steps_global = total_input_steps_per_epoch * FLAGS.train_epochs
        FLAGS.train_steps = total_input_steps_global

    if FLAGS.test_steps == 0:
        FLAGS.test_steps = int(total_test_records / FLAGS.test_batch_size)

    print('Total Train Input Bytes: %d' % (total_input_size))
    print('Total Train Input Records: %d' % (total_input_records))
    print('Using Train Batch Size: %d' % (FLAGS.train_batch_size))
    print('Using Train Epochs: %d' % (FLAGS.train_epochs))
    
    print('Using Train Steps Per Epoch: %d' % (total_input_steps_per_epoch))
    print('Using Train Steps Global: %d' % (total_input_steps_global))
    print('Using Test Steps Global: %d' % (FLAGS.test_steps))

    print('Using SparkMode: %d' % (FLAGS.spark_mode))
    
    if FLAGS.spark_mode == 1:
        global sc
        sc = SparkContext(conf=SparkConf().setAppName(FLAGS.app_name))
    
        num_executors_str = sc._conf.get("spark.executor.instances")
        num_executors = int(3 if not num_executors_str else num_executors_str)
        
        num_ps = FLAGS.num_ps
        print('Total Executors: %d' % num_executors)
        print('Total PS: %d' % num_ps)
    
        tf_cluster = TFCluster.run(sc, main_spark, [sys.argv[0]] + unparsed, num_executors, num_ps, False, input_mode=TFCluster.InputMode.TENSORFLOW)
        tf_cluster.shutdown()
    else:
        main_local()


  from ._conv import register_converters as _register_converters


Cleaning up prev model path: tfmodel
Input Part File:  214092831 => train.csv
Total Train Input Bytes: 214092831
Total Train Input Records: 1427285
Using Train Batch Size: 1000
Using Train Epochs: 1
Using Train Steps Per Epoch: 1427
Using Train Steps Global: 1427
Using Test Steps Global: 10
Using SparkMode: 0
Using FLAGS: Namespace(app_name='HappiCFDnn', clean_model=1, dropout=0.0, h1=16, h2=16, h3=8, h4=8, l1=0.0, l2=1e-06, learning_rate=0.001, model_dict='dict.tsv', model_dir='tfmodel', model_type='deep', num_ps=1, optimizer='adagrad', record_count_tsv='counter.tsv', spark_mode=0, start_ts=1555890801501, summary='summary.txt', task_index=0, test_batch_size=5000, test_data='test.csv', test_steps=10, total_workers=1, train_batch_size=1000, train_data='train.csv', train_epochs=1, train_records=0, train_steps=1427)
Reading Dictionary: dict.tsv


2019-04-21 16:53:30,163 INFO (MainThread-97467) Using config: {'_model_dir': 'tfmodel/tf', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0xb1f8acf98>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
2019-04-21 16:53:30,165 INFO (MainThread-97467) Running training and evaluation locally (non-distributed).
2019-04-21 16:53:30,165 INFO (MainThread-97467) Start train and evaluate loop. The evaluate will happen after 600 secs (eval_spec.throttle_secs) or training is finished.
2019-04-21 16:53:30,225 INFO (MainThread-97467) Calling mo

fdict: {'ad_adg': 65744, 'ad_adv': 8296, 'ad_cmp': 49681, 'ad_id': 105469, 'ad_sponsored_by': 7504, 'ad_title': 27931, 'country_woeid': 191, 'page_section_id': 85, 'state_woeid': 1472, 'user': 2785208, 'campaign_objective': 9, 'adv_sub_category': 134, 'dayofweek': 7, 'daypart': 4, 'user_age': 11, 'user_gender': 3, 'user_os': 6, 'device_type': 2}
CSV_FEATURE_NAMES: ['event_guid', 'click', 'campaign_objective', 'adv_sub_category', 'ad_id', 'ad_cmp', 'ad_adv', 'ad_adg', 'ad_sponsored_by', 'ad_title', 'dayofweek', 'daypart', 'user', 'user_age', 'user_gender', 'user_os', 'device_type', 'state_woeid', 'country_woeid', 'page_section_id']
NAME: ad_adg               TYPE: EMBEDDING	 SIZE: 65744	 DIMENSION: 17
NAME: ad_adv               TYPE: EMBEDDING	 SIZE: 8296	 DIMENSION: 14
NAME: ad_cmp               TYPE: EMBEDDING	 SIZE: 49681	 DIMENSION: 16
NAME: ad_id                TYPE: EMBEDDING	 SIZE: 105469	 DIMENSION: 17
NAME: ad_sponsored_by      TYPE: EMBEDDING	 SIZE: 7504	 DIMENSION: 13
NAME: a

2019-04-21 16:53:31,671 INFO (MainThread-97467) Done calling model_fn.
2019-04-21 16:53:31,673 INFO (MainThread-97467) Create CheckpointSaverHook.
2019-04-21 16:53:32,206 INFO (MainThread-97467) Graph was finalized.
2019-04-21 16:53:35,218 INFO (MainThread-97467) Running local_init_op.
2019-04-21 16:53:35,242 INFO (MainThread-97467) Done running local_init_op.
2019-04-21 16:53:36,206 INFO (MainThread-97467) Saving checkpoints for 0 into tfmodel/tf/model.ckpt.
2019-04-21 16:53:42,688 INFO (MainThread-97467) loss = 609.2002, step = 1
2019-04-21 16:53:45,294 INFO (MainThread-97467) global_step/sec: 38.3594
2019-04-21 16:53:45,296 INFO (MainThread-97467) loss = 284.28558, step = 101 (2.608 sec)
2019-04-21 16:53:47,043 INFO (MainThread-97467) global_step/sec: 57.1931
2019-04-21 16:53:47,044 INFO (MainThread-97467) loss = 162.46823, step = 201 (1.748 sec)
2019-04-21 16:53:48,776 INFO (MainThread-97467) global_step/sec: 57.6723
2019-04-21 16:53:48,778 INFO (MainThread-97467) loss = 109.745834

Input Function => Using data_file: test.csv
Input Function => Using num_epochs: 1
Input Function => Using task_index: 0
Input Function => Using total_workers: 1
Input Part File: test.csv
Total Workers: 1; Total Input Files: 1
Worker Index: 0; Total Workers: 1; Adding File (000): test.csv
Worker Index: 0; Total Workers: 1; Total Files to process: 1


2019-04-21 16:54:08,153 INFO (MainThread-97467) Done calling model_fn.
2019-04-21 16:54:08,172 INFO (MainThread-97467) Starting evaluation at 2019-04-21-23:54:08
2019-04-21 16:54:08,270 INFO (MainThread-97467) Graph was finalized.
2019-04-21 16:54:08,271 INFO (MainThread-97467) Restoring parameters from tfmodel/tf/model.ckpt-1000
2019-04-21 16:54:09,134 INFO (MainThread-97467) Running local_init_op.
2019-04-21 16:54:09,174 INFO (MainThread-97467) Done running local_init_op.


NotFoundError: test.csv; No such file or directory
	 [[Node: IteratorGetNext = IteratorGetNext[output_shapes=[[?]], output_types=[DT_STRING], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator)]]

Caused by op 'IteratorGetNext', defined at:
  File "/anaconda3/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/anaconda3/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/anaconda3/lib/python3.6/site-packages/ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/traitlets/config/application.py", line 658, in launch_instance
    app.start()
  File "/anaconda3/lib/python3.6/site-packages/ipykernel/kernelapp.py", line 486, in start
    self.io_loop.start()
  File "/anaconda3/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 127, in start
    self.asyncio_loop.run_forever()
  File "/anaconda3/lib/python3.6/asyncio/base_events.py", line 422, in run_forever
    self._run_once()
  File "/anaconda3/lib/python3.6/asyncio/base_events.py", line 1432, in _run_once
    handle._run()
  File "/anaconda3/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/anaconda3/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 117, in _handle_events
    handler_func(fileobj, events)
  File "/anaconda3/lib/python3.6/site-packages/tornado/stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "/anaconda3/lib/python3.6/site-packages/zmq/eventloop/zmqstream.py", line 450, in _handle_events
    self._handle_recv()
  File "/anaconda3/lib/python3.6/site-packages/zmq/eventloop/zmqstream.py", line 480, in _handle_recv
    self._run_callback(callback, msg)
  File "/anaconda3/lib/python3.6/site-packages/zmq/eventloop/zmqstream.py", line 432, in _run_callback
    callback(*args, **kwargs)
  File "/anaconda3/lib/python3.6/site-packages/tornado/stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "/anaconda3/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 283, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "/anaconda3/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 233, in dispatch_shell
    handler(stream, idents, msg)
  File "/anaconda3/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 399, in execute_request
    user_expressions, allow_stdin)
  File "/anaconda3/lib/python3.6/site-packages/ipykernel/ipkernel.py", line 208, in do_execute
    res = shell.run_cell(code, store_history=store_history, silent=silent)
  File "/anaconda3/lib/python3.6/site-packages/ipykernel/zmqshell.py", line 537, in run_cell
    return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2662, in run_cell
    raw_cell, store_history, silent, shell_futures)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2785, in _run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2901, in run_ast_nodes
    if self.run_code(code, result):
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2961, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-1-628db2604f4e>", line 491, in <module>
    main_local()
  File "<ipython-input-1-628db2604f4e>", line 421, in main_local
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/estimator/training.py", line 447, in train_and_evaluate
    return executor.run()
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/estimator/training.py", line 531, in run
    return self.run_local()
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/estimator/training.py", line 681, in run_local
    eval_result, export_results = evaluator.evaluate_and_export()
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/estimator/training.py", line 886, in evaluate_and_export
    hooks=self._eval_spec.hooks)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/estimator/estimator.py", line 453, in evaluate
    input_fn, hooks, checkpoint_path)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/estimator/estimator.py", line 1346, in _evaluate_build_graph
    model_fn_lib.ModeKeys.EVAL))
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/estimator/estimator.py", line 985, in _get_features_and_labels_from_input_fn
    result = self._call_input_fn(input_fn, mode)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/estimator/estimator.py", line 1074, in _call_input_fn
    return input_fn(**kwargs)
  File "<ipython-input-1-628db2604f4e>", line 283, in input_fn
    features, labels = parse_input(iterator.get_next())
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/data/ops/iterator_ops.py", line 373, in get_next
    name=name)), self._output_types,
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/ops/gen_dataset_ops.py", line 1745, in iterator_get_next
    output_shapes=output_shapes, name=name)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 3414, in create_op
    op_def=op_def)
  File "/Users/shsu33/tensorflow/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 1740, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access

NotFoundError (see above for traceback): test.csv; No such file or directory
	 [[Node: IteratorGetNext = IteratorGetNext[output_shapes=[[?]], output_types=[DT_STRING], _device="/job:localhost/replica:0/task:0/device:CPU:0"](OneShotIterator)]]
