# Imports

In [7]:
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

import pprint
import tempfile

print(f'TensorFlow version: {tf.__version__}')
print(f'TFX Transform version: {tft.__version__}')

TensorFlow version: 2.5.0
TFX Transform version: 1.1.0


# Collect raw data

In [8]:
# define sample data
raw_data = [
      {'x': 1, 'y': 1, 's': 'hello'},
      {'x': 2, 'y': 2, 's': 'world'},
      {'x': 3, 'y': 3, 's': 'hello'}
  ]


# Define the metadata

In [9]:
# define the schema as a DatasetMetadata object
raw_data_metadata = dataset_metadata.DatasetMetadata(
    
    # use convenience function to build a Schema protobuf
    schema_utils.schema_from_feature_spec({
        
        # define a dictionary mapping the keys to its feature spec type
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    }))

In [10]:
# preview the schema
print(raw_data_metadata._schema)

feature {
  name: "s"
  type: BYTES
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "y"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}



# Create a preprocessing function


In [11]:
def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    
    # extract the columns and assign to local variables
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']
    
    # data transformations using tft functions
    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = (x_centered * y_normalized)
    
    # return the transformed data
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        's_integerized': s_integerized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized,
    }

# Generate a constant graph with the required transformations

In [12]:
# Ignore the warnings
tf.get_logger().setLevel('ERROR')

# a temporary directory is needed when analyzing the data
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    
    # define the pipeline using Apache Beam syntax
    transformed_dataset, transform_fn = (
        
        # analyze and transform the dataset using the preprocessing function
        (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn)
    )

# unpack the transformed dataset
transformed_data, transformed_metadata = transformed_dataset

# print the results
print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))






Raw data:
[{'s': 'hello', 'x': 1, 'y': 1},
 {'s': 'world', 'x': 2, 'y': 2},
 {'s': 'hello', 'x': 3, 'y': 3}]

Transformed data:
[{'s_integerized': 0,
  'x_centered': -1.0,
  'x_centered_times_y_normalized': -0.0,
  'y_normalized': 0.0},
 {'s_integerized': 1,
  'x_centered': 0.0,
  'x_centered_times_y_normalized': 0.0,
  'y_normalized': 0.5},
 {'s_integerized': 0,
  'x_centered': 1.0,
  'x_centered_times_y_normalized': 1.0,
  'y_normalized': 1.0}]
