<h1> 2c. Refactoring to add batching and feature-creation </h1>

In this notebook, we continue reading the same small dataset, but refactor our ML pipeline in two small, but significant, ways:
<ol>
<li> Refactor the input to read data in batches.
<li> Refactor the feature creation so that it is not one-to-one with inputs.
</ol>
The Pandas function in the previous notebook also batched, only after it had read the whole data into memory -- on a large dataset, this won't be an option.
<p><a herf=https://www.kaggle.com/c/new-york-city-taxi-fare-prediction/data>数据来源</a></p>

In [1]:
import google.cloud.bigquery as bq
import tensorflow as tf
import numpy as np
import shutil
import tensorflow as tf
print(tf.__version__)

1.13.1


<h2> 1. Refactor the input </h2>

Read data created in Lab1a, but this time make it more general and performant.  Instead of using Pandas, we will use TensorFlow's Dataset API.

In [2]:
CSV_COLUMNS = ['key','fare_amount','pickup_datetime','pickup_longitude',
               'pickup_latitude','dropoff_longitude','dropoff_latitude','passenger_count']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [['nokey'], [0.0], ['nokey'], [-74.0], [40.0], [-74.0], [40.7], [1.0]]

def read_dataset(filename, mode, batch_size = 512):
  def _input_fn():
    def decode_csv(value_column):
      print(f'============={value_column}=============')
      columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
      features = dict(zip(CSV_COLUMNS, columns))
      label = features.pop(LABEL_COLUMN)
      return features, label

    # Create list of files that match pattern
    file_list = tf.gfile.Glob(filename)

    # Create dataset from file list
    dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
    if mode == tf.estimator.ModeKeys.TRAIN:
        num_epochs = None # indefinitely
        dataset = dataset.shuffle(buffer_size = 10 * batch_size)
    else:
        num_epochs = 1 # end-of-input after this

    dataset = dataset.repeat(num_epochs).batch(batch_size) #batch 每次的数量
    return dataset.make_one_shot_iterator().get_next()
  return _input_fn
    

def get_train():
  return read_dataset('E:/new-york-city-taxi-fare-prediction/train.csv', mode = tf.estimator.ModeKeys.TRAIN)

# def get_valid():
#   return read_dataset('E:/new-york-city-taxi-fare-prediction/taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL)

def get_test():
  return read_dataset('E:/new-york-city-taxi-fare-prediction/test.csv', mode = tf.estimator.ModeKeys.EVAL)

<h2> 2. Refactor the way features are created. </h2>

For now, pass these through (same as previous lab).  However, refactoring this way will enable us to break the one-to-one relationship between inputs and features.

In [3]:
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]

def add_more_features(feats):
  # Nothing to add (yet!)
  return feats

feature_cols = add_more_features(INPUT_COLUMNS)

<h2> Create and train the model </h2>

Note that we train for num_steps * batch_size examples.

In [4]:
tf.logging.set_verbosity(tf.logging.INFO)
OUTDIR = 'taxi_trained'
shutil.rmtree(OUTDIR, ignore_errors = True) # start fresh each time
model = tf.estimator.LinearRegressor(
      feature_columns = feature_cols, model_dir = OUTDIR)
model.train(input_fn = get_train(), steps = 100);  # TODO: change the name of input_fn as needed

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': 'taxi_trained', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x0000020CD4C60668>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
Instructions for updating:
Colocations handled automatically by placer.
INFO:tensorflow:Calling model_fn.


ValueError: Feature dropofflat is not in features dictionary.

<h3> Evaluate model </h3>

As before, evaluate on the validation data.  We'll do the third refactoring (to move the evaluation into the training loop) in the next lab.

In [None]:
def print_rmse(model, name, input_fn):
  metrics = model.evaluate(input_fn = input_fn, steps = 1)
  print('RMSE on {} dataset = {}'.format(name, np.sqrt(metrics['average_loss'])))
print_rmse(model, 'validation', get_test())