In [None]:
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import argparse
import os
import pprint
import tempfile
import urllib
import zipfile

In [None]:

# Locations of the raw Adult census CSV files, relative to the notebook's
# working directory: the training split first, then the test split.
train, test = (os.path.join('datasets', 'adult/' + fname)
               for fname in ('adult.data', 'adult.test'))

In [None]:
import tensorflow_transform as tft
import apache_beam as beam

In [None]:
import tensorflow as tf
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
# Only emit TensorFlow log messages at ERROR level or above (hides INFO/WARNING noise).
tf.logging.set_verbosity(tf.logging.ERROR)

In [None]:
# Raw census columns fed through the transform. Other columns of the dataset
# are listed in the "disabled" comments and can be switched back on by moving
# them into the corresponding list.

# String-valued columns that get a vocabulary.
CATEGORICAL_FEATURE_KEYS = ['workclass']
# disabled: 'education', 'marital-status', 'occupation', 'relationship',
#           'race', 'sex', 'native-country'

# Float-valued columns that are always present.
NUMERIC_FEATURE_KEYS = ['age', 'capital-gain']
# disabled: 'capital-loss', 'hours-per-week'

# Float-valued columns that may be missing (parsed as variable-length features).
OPTIONAL_NUMERIC_FEATURE_KEYS = []
# disabled: 'education-num'

# Name of the label column (not part of the raw feature spec at the moment).
LABEL_KEY = 'label'

In [None]:
# Feature spec for the raw columns in use:
#   * categorical columns -> scalar strings,
#   * numeric columns     -> scalar floats,
#   * optional numerics   -> variable-length floats (sparse when missing).
# The label column is not included.
RAW_DATA_FEATURE_SPEC = dict(
    (key, tf.FixedLenFeature([], tf.string))
    for key in CATEGORICAL_FEATURE_KEYS)
RAW_DATA_FEATURE_SPEC.update(
    (key, tf.FixedLenFeature([], tf.float32))
    for key in NUMERIC_FEATURE_KEYS)
RAW_DATA_FEATURE_SPEC.update(
    (key, tf.VarLenFeature(tf.float32))
    for key in OPTIONAL_NUMERIC_FEATURE_KEYS)

# tf.Transform metadata wrapping the spec above; used to decode the CSV and
# to describe the raw dataset handed to AnalyzeDataset.
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC)
)

In [None]:
# Flip `testing` to False for a full-size run; the tiny values only keep a
# smoke-test run fast.
testing = True
if not testing:
    # Instance counts are presumably the row counts of adult.data / adult.test.
    TRAIN_NUM_EPOCHS, NUM_TRAIN_INSTANCES = 16, 32561
    TRAIN_BATCH_SIZE, NUM_TEST_INSTANCES = 128, 16281
else:
    TRAIN_NUM_EPOCHS = NUM_TRAIN_INSTANCES = 1
    TRAIN_BATCH_SIZE = NUM_TEST_INSTANCES = 1

# Base names of temporary output files / directories.
TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed'
TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed'
EXPORTED_MODEL_DIR = 'exported_model_dir'

In [None]:
class MapAndFilterErrors(beam.PTransform):
  """A `beam.Map` variant that drops elements whose mapping fails.

  Each input element is passed through `fn`. If `fn` raises any `Exception`,
  the element is omitted from the output and the Beam counter
  ('census_example', 'bad_elements') is incremented instead.
  """

  def __init__(self, fn):
    self._fn = fn

  def expand(self, pcoll):
    filtering_do_fn = self._MapAndFilterErrorsDoFn(self._fn)
    return pcoll | beam.ParDo(filtering_do_fn)

  class _MapAndFilterErrorsDoFn(beam.DoFn):
    """Emits fn(element); counts (and drops) elements for which fn raises."""

    def __init__(self, fn):
      self._fn = fn
      # Beam metric so the number of dropped elements stays observable.
      self._bad_elements_counter = beam.metrics.Metrics.counter(
          'census_example', 'bad_elements')

    def process(self, element):
      try:
        yield self._fn(element)
      except Exception:  # pylint: disable=broad-except
        # Deliberately broad: any failure of fn marks the element as bad.
        self._bad_elements_counter.inc(1)

In [None]:
def preprocessing_fn(inputs):
    """Build the tf.Transform preprocessing graph for the selected features.

    Every selected feature becomes a float32 column of shape [batch], and all
    columns are stacked into one matrix:

    * NUMERIC_FEATURE_KEYS: scaled to [0, 1] with `tft.scale_to_0_1`.
    * OPTIONAL_NUMERIC_FEATURE_KEYS: SparseTensors (VarLenFeature); missing
      values are filled with 0, then scaled to [0, 1].
    * CATEGORICAL_FEATURE_KEYS: mapped to integer ids through a vocabulary
      (vocab_filename = feature name) and cast to float32.

    Args:
      inputs: dict mapping raw feature name to its batched Tensor (or
        SparseTensor for optional features).

    Returns:
      A dict with the single key "XXALLXX": a float32 tensor of shape
      [batch, num_features] (op name "output_all"). Its columns are ordered
      numeric, then optional numeric, then categorical, each group in the
      order of its key list.
    """
    # Transformed column per feature name.
    columns = {}

    for key in NUMERIC_FEATURE_KEYS:
        columns[key] = tft.scale_to_0_1(inputs[key], name=key)

    for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
        # Optional values arrive as a SparseTensor; densify it, using 0 where
        # the value is missing.
        sparse = inputs[key]
        dense = tf.sparse_to_dense(sparse.indices,
                                   [sparse.dense_shape[0], 1],
                                   sparse.values, default_value=0.)
        # Reshape the batch of size-1 vectors into a batch of scalars.
        dense = tf.squeeze(dense, axis=1)
        columns[key] = tft.scale_to_0_1(dense, name=key)

    for key in CATEGORICAL_FEATURE_KEYS:
        # Replace each string by its vocabulary index; with num_oov_buckets=0,
        # out-of-vocabulary values map to default_value (-1).
        ids = tft.compute_and_apply_vocabulary(
            inputs[key],
            default_value=-1,
            top_k=100,
            # NOTE(review): tft documents frequency_threshold as an absolute
            # occurrence count, so 0.05 filters nothing; confirm whether a
            # fraction was intended.
            frequency_threshold=0.05,
            num_oov_buckets=0,
            vocab_filename=key,
            weights=None,
            labels=None,
            use_adjusted_mutual_info=False,
            min_diff_from_avg=0.0,
            coverage_top_k=None,
            coverage_frequency_threshold=None,
            key_fn=None,
            # The op name scope is relied on when looking ops up by name in
            # the exported graph.
            name=key
        )
        columns[key] = tf.cast(ids, tf.float32)

    # Stack in an explicit, documented order instead of dict iteration order,
    # which is arbitrary before Python 3.7 and would make the column layout of
    # the output matrix unstable.
    ordered_keys = (list(NUMERIC_FEATURE_KEYS) +
                    list(OPTIONAL_NUMERIC_FEATURE_KEYS) +
                    list(CATEGORICAL_FEATURE_KEYS))
    return {"XXALLXX": tf.stack([columns[key] for key in ordered_keys],
                                axis=1, name="output_all")}

In [None]:
def transform_data(train_data_file, test_data_file, working_dir):
  """Analyze the training CSV with tf.Transform and write the transform_fn.

  Reads `train_data_file` line by line and decodes each line with a CSV coder
  built from RAW_DATA_METADATA. Lines that fail to decode are dropped and
  counted. It then runs `tft_beam.AnalyzeDataset(preprocessing_fn)` to compute
  the analyzer results (e.g. min/max for scaling, vocabularies) and writes the
  resulting transform_fn to `working_dir` with `tft_beam.WriteTransformFn`.

  No data is transformed or written as TFRecords here, and the test data is
  never read.

  Args:
    train_data_file: Path of the training CSV to analyze.
    test_data_file: Currently unused; kept so existing callers keep working.
    working_dir: Directory that WriteTransformFn writes the transform_fn to.

  Returns:
    The `transform_fn` PCollection. The pipeline has already run when this
    returns, so the value is only useful for inspection.
  """
  # The "with" block builds the pipeline and runs it when the block exits.
  with beam.Pipeline() as pipeline:
    # The temp dir is intentionally not removed. The exported graph_def embeds
    # paths into it (a .../tftransform_tmp/<feature> vocabulary file), so code
    # that imports the raw graph_def needs it to still exist.
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
      # The schema does not record column positions, so list every CSV column
      # in file order. Columns absent from the schema (e.g. 'fnlwgt', 'label')
      # are presumably ignored by the coder.
      ordered_columns = [
          'age', 'workclass', 'fnlwgt', 'education', 'education-num',
          'marital-status', 'occupation', 'relationship', 'race', 'sex',
          'capital-gain', 'capital-loss', 'hours-per-week', 'native-country',
          'label'
      ]
      converter = tft.coders.CsvCoder(ordered_columns, RAW_DATA_METADATA.schema)

      # These Beam steps run outside tf.Transform's methods, so they are not
      # encoded in the exported TF graph. They only turn ", " into "," so the
      # CSV coder can parse each line. MapAndFilterErrors drops lines that
      # converter.decode cannot parse (expected only for the trailing blank
      # line).
      raw_data = (
          pipeline
          | 'ReadTrainData' >> beam.io.ReadFromText(train_data_file)
          | 'FixCommasTrainData' >> beam.Map(
              lambda line: line.replace(', ', ','))
          | 'DecodeTrainData' >> MapAndFilterErrors(converter.decode))

      # AnalyzeDataset takes the data paired with the metadata describing it.
      raw_dataset = (raw_data, RAW_DATA_METADATA)
      transform_fn = raw_dataset | tft_beam.AnalyzeDataset(preprocessing_fn)

      print("workdir", working_dir)
      _ = (
          transform_fn
          | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))

  # Returning after both "with" blocks makes it explicit that the pipeline has
  # already been executed at this point.
  return transform_fn
        

In [None]:
# Clear TensorFlow's global default graph, then analyze the training data and
# write the transform_fn under /tmp/tft10 (the returned value is displayed).
tf.reset_default_graph()
transform_data(train_data_file=train, test_data_file=test,
               working_dir="/tmp/tft10")


**Now inspect the file /tmp/tft10/transform_fn/saved_model.pb (for example with vim). You will find a hard-coded temporary path such as:**   
`/var/folders/h3/zt26x1d93hq49vff996w04pc0000gq/T/tmpndp56yw3/tftransform_tmp/workclass`

In [None]:
import tensorflow as tf
import sys
from tensorflow.python.platform import gfile
tf.reset_default_graph()
from tensorflow.core.protobuf import saved_model_pb2
from tensorflow.python.util import compat
import tensorflow.contrib as tfc

# Load the SavedModel proto written by transform_data(...) and import its raw
# GraphDef into the (fresh) default graph. This bypasses the SavedModel
# loader, so the asset paths hard-coded inside the graph are used as-is.
model_filename = '/tmp/tft10/transform_fn/saved_model.pb'
with gfile.GFile(model_filename, 'rb') as f:
    data = compat.as_bytes(f.read())

sm = saved_model_pb2.SavedModel()
sm.ParseFromString(data)

# Fail with a real exception; sys.exit() inside a notebook kernel only
# produces a confusing SystemExit.
if len(sm.meta_graphs) != 1:
    raise ValueError('Expected exactly one meta graph in %s, found %d'
                     % (model_filename, len(sm.meta_graphs)))

with tf.Session() as sess:
    # Imported ops get the default "import/" name prefix.
    tf.import_graph_def(sm.meta_graphs[0].graph_def)

    # tf.tables_initializer() does not initialize the lookup tables here.
    # Presumably this is because import_graph_def copies only the GraphDef,
    # not the table-initializer collection stored in the MetaGraphDef. So run
    # the text-file table initializers directly. Selecting them by op type
    # (rather than one hard-coded op name) also covers any additional
    # categorical features.
    table_init_ops = [op for op in sess.graph.get_operations()
                      if op.type == 'InitializeTableFromTextFileV2']
    sess.run(table_init_ops)

    in1 = sess.graph.get_tensor_by_name("import/transform/inputs/age:0")
    in2 = sess.graph.get_tensor_by_name("import/transform/inputs/capital-gain:0")
    in3 = sess.graph.get_tensor_by_name("import/transform/inputs/workclass:0")
    out1 = sess.graph.get_tensor_by_name("import/transform/output_all:0")

    # To list every node (including the hard-coded vocabulary file path),
    # print each entry of sm.meta_graphs[0].graph_def.node.
    res = sess.run([out1], feed_dict={in1: [35, 60],
                                      in2: [100000, 80000],
                                      in3: ["Local-gov", "bbbb"]})
    print(res)
        

In [None]:
from tensorflow.graph_util import extract_sub_graph

![graph](graph_scale_0_1.png)