# Data Transformation: Floor Price Optimisation

## Continuous Learning Experiment

- Author: Reshad Dernjani
- Source: TensorFlow Transform documentation https://www.tensorflow.org/tfx/transform/tutorials/TFT_census_example

## Usage
Provide your raw data as follows:
* 8% unoptimized traffic for 24 hours:
  * /notebooks/raw_data/[dsp_id]/dsp-[dsp_id]-train.csv
* 8% unoptimized traffic of the following day, for specific hours:
  * Next day, hour 9, with DSP bids reduced by 90%. Used for warm-start training:
    * /notebooks/raw_data/[dsp_id]/dsp-[dsp_id]-test-9-reduced.csv
  * Next day, hour 10, with DSP bids reduced by 90%. Used for the analysis:
    * /notebooks/raw_data/[dsp_id]/dsp-[dsp_id]-test-10-reduced.csv

- Replace [dsp_id] with the actual DSP ID.

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os
import pprint
import shutil
import tempfile
import time

try:
    import tensorflow_transform as tft
    import apache_beam as beam
except ImportError:
    # This will take a minute, ignore the warnings.
    !pip install -q tensorflow-transform
    !pip install -q apache_beam
    import tensorflow_transform as tft
    import apache_beam as beam

import tensorflow as tf
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

## Basic housekeeping

In [None]:
# Root directories: raw CSV input is read from RAW_PATH, transformed
# TFRecords and the transform function are written below OUTPUT_PATH.
RAW_PATH = '/notebooks/raw_data'
OUTPUT_PATH = '/notebooks/transformed'

# File-name prefixes of the transformed output files (Beam appends the
# shard suffix, e.g. "-00000-of-00001").
TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed'
TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed'

## Name our columns

In [None]:
# Column order of the raw CSV files. The schema itself carries no ordering,
# so the CSV coder relies on this list matching the files exactly.
ORDERED_COLUMNS = [
    'inventory_id', 'request_type', 'ex_floor_price', 'ex_bid_price',
    'state_code', 'country_code', 'city_code', 'device_os',
    'device_os_version', 'hour_of_day',
]

# Column used as the label; parsed as a float and passed through the
# transform unchanged.
LABEL_KEY = 'ex_bid_price'

# Dense float features, rescaled to [0, 1] during preprocessing.
NUMERIC_FEATURE_KEYS = ['ex_floor_price']

# Optional (possibly missing) float features. Intentionally empty: missing
# values were already handled in the data query (at least during research).
OPTIONAL_NUMERIC_FEATURE_KEYS = []

# String-valued features; a vocabulary file is generated for each of them.
CATEGORICAL_FEATURE_KEYS = [
    'inventory_id', 'request_type', 'state_code', 'country_code',
    'city_code', 'device_os', 'device_os_version', 'hour_of_day',
]

## Define our features and schema

In [None]:
def _build_raw_feature_spec():
    """Return the parsing spec for one raw CSV row.

    Categorical columns are scalar strings, numeric columns and the label are
    scalar floats, and optional numeric columns are variable-length floats
    (they arrive as SparseTensors in preprocessing_fn). Keys are inserted in
    that order; a later group overwrites an earlier one on a name clash.
    """
    spec = {}
    for column in CATEGORICAL_FEATURE_KEYS:
        spec[column] = tf.FixedLenFeature([], tf.string)
    for column in NUMERIC_FEATURE_KEYS:
        spec[column] = tf.FixedLenFeature([], tf.float32)
    for column in OPTIONAL_NUMERIC_FEATURE_KEYS:
        spec[column] = tf.VarLenFeature(tf.float32)
    spec[LABEL_KEY] = tf.FixedLenFeature([], tf.float32)
    return spec


RAW_DATA_FEATURE_SPEC = _build_raw_feature_spec()

# Metadata (schema) describing the raw data, shared by the CSV coder and the
# tf.Transform analysis/transform steps.
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))

## Create a Beam Transform for cleaning our input data

In [None]:
class MapAndFilterErrors(beam.PTransform):
    """Like beam.Map, but drops the elements for which ``fn`` raises.

    Each dropped element increments the Beam counter
    ``floor_price_optimisation/bad_elements``, so failures stay visible in the
    pipeline metrics instead of disappearing silently.

    Args:
        fn: Callable applied to every element of the input PCollection.
    """

    class _MapAndFilterErrorsDoFn(beam.DoFn):
        """Apply ``fn`` to each element; count and skip elements that raise."""

        def __init__(self, fn):
            # Initialise the beam.DoFn base class before adding our own state.
            super(MapAndFilterErrors._MapAndFilterErrorsDoFn, self).__init__()
            self._fn = fn
            # Counter measuring the number of bad (dropped) elements.
            self._bad_elements_counter = beam.metrics.Metrics.counter(
                'floor_price_optimisation', 'bad_elements')

        def process(self, element):
            # Keep the try body minimal: only the user function can produce a
            # "bad element"; the yield itself is outside the guarded region.
            try:
                result = self._fn(element)
            except Exception:  # Deliberately broad: any failure marks a bad row.
                self._bad_elements_counter.inc(1)
            else:
                yield result

    def __init__(self, fn):
        # Initialise the beam.PTransform base class (label handling etc.).
        super(MapAndFilterErrors, self).__init__()
        self._fn = fn

    def expand(self, pcoll):
        return pcoll | beam.ParDo(self._MapAndFilterErrorsDoFn(self._fn))

## Create a tf.Transform preprocessing_fn

In [None]:
def preprocessing_fn(inputs):
    """Map the raw feature tensors to the tensors written as transformed data.

    Args:
        inputs: dict from column name to the parsed raw tensor.

    Returns:
        A shallow copy of ``inputs`` in which
        * every NUMERIC_FEATURE_KEYS column is rescaled to [0, 1];
        * every OPTIONAL_NUMERIC_FEATURE_KEYS column is densified (missing
          values become 0.0), squeezed to one scalar per example and rescaled
          to [0, 1];
        * all other columns (categorical features and the label) are passed
          through unchanged.

    For each CATEGORICAL_FEATURE_KEYS column a vocabulary file named after the
    column is computed. The string values themselves are not mapped to ids
    here; that is left to the trainer's feature columns.
    """
    transformed = inputs.copy()

    for numeric_key in NUMERIC_FEATURE_KEYS:
        transformed[numeric_key] = tft.scale_to_0_1(transformed[numeric_key])

    # Only register the vocabulary analyzers; the returned tensors are unused.
    for categorical_key in CATEGORICAL_FEATURE_KEYS:
        tft.vocabulary(inputs[categorical_key], vocab_filename=categorical_key)

    for optional_key in OPTIONAL_NUMERIC_FEATURE_KEYS:
        sparse_in = transformed[optional_key]
        # Reinterpret the optional column as a [batch, 1] SparseTensor so that
        # densifying gives exactly one slot per example, filled with 0.0 when
        # the value is missing.
        as_column = tf.SparseTensor(
            sparse_in.indices, sparse_in.values, [sparse_in.dense_shape[0], 1])
        filled = tf.sparse.to_dense(
            as_column, default_value=0., validate_indices=True)
        # [batch, 1] -> [batch] before scaling.
        transformed[optional_key] = tft.scale_to_0_1(tf.squeeze(filled, axis=1))

    return transformed

## Transform the data

In [None]:
def get_raw_test_file_suffix(test_file):
    """Return the part of a raw test file's name between "test" and its extension.

    Example: '/notebooks/raw_data/47/dsp-47-test-10-reduced.csv' -> '-10-reduced'.

    Only the base name is inspected, so directory names that happen to contain
    "test" do not affect the result. If the base name contains no "test" at
    all, the whole stem is returned prefixed with '-', so that the transformed
    output files of different inputs remain distinct.
    """
    stem = os.path.splitext(os.path.basename(test_file))[0]
    marker = stem.find('test')
    if marker < 0:
        return '-' + stem
    return stem[marker + len('test'):]


def transform_data(train_data_file, test_data_files, working_dir,
                   transformed_train_file_suffix='', transformed_test_file_suffix=''):
    """Transform the raw CSV data and write it out as TFRecords.

    The transform (preprocessing_fn) is analysed on the training file only
    and then applied to the training file and to every test file. Numeric
    features are scaled to [0, 1]. A vocabulary is computed for each
    categorical column, but the categorical strings are left unchanged. The
    transform function is written to ``working_dir`` so it can be read with
    tft.TFTransformOutput.

    Args:
        train_data_file: Path of the CSV file containing the training data.
        test_data_files: Iterable of paths of CSV files containing test data.
            Each output is named after the part of the file name following
            "test" (see get_raw_test_file_suffix).
        working_dir: Directory to write the transformed data and metadata to.
        transformed_train_file_suffix: Suffix appended to the transformed
            training file name.
        transformed_test_file_suffix: Suffix appended to every transformed
            test file name.
    """
    # Scratch space for tf.Transform. It must outlive the Context block because
    # the pipeline only runs when the outer "with beam.Pipeline()" exits; it is
    # removed once the pipeline has finished (or failed).
    temp_dir = tempfile.mkdtemp()
    try:
        # The "with" block builds the pipeline and runs it to completion on exit.
        with beam.Pipeline() as pipeline:
            with tft_beam.Context(temp_dir=temp_dir):
                # The schema does not record the CSV column order, so the coder
                # needs the explicit ordered column list.
                converter = tft.coders.CsvCoder(ORDERED_COLUMNS, RAW_DATA_METADATA.schema)

                # MapAndFilterErrors (instead of beam.Map) drops and counts rows
                # that fail to decode instead of failing the whole pipeline.
                raw_data = (
                    pipeline
                    | 'ReadTrainData' >> beam.io.ReadFromText(train_data_file, skip_header_lines=1)
                    | 'DecodeTrainData' >> MapAndFilterErrors(converter.decode))

                # The schema was used to parse the CSV, but tf.Transform also
                # needs it to interpret raw_data.
                raw_dataset = (raw_data, RAW_DATA_METADATA)
                transformed_dataset, transform_fn = (
                    raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
                transformed_data, transformed_metadata = transformed_dataset
                transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)

                _ = (
                    transformed_data
                    | 'EncodeTrainData' >> beam.Map(transformed_data_coder.encode)
                    | 'WriteTrainData' >> beam.io.WriteToTFRecord(
                        os.path.join(
                            working_dir,
                            TRANSFORMED_TRAIN_DATA_FILEBASE + transformed_train_file_suffix)))

                for j, test_file in enumerate(test_data_files):
                    raw_test_file_suffix = get_raw_test_file_suffix(test_file)

                    # Step labels must be unique within a pipeline, hence the
                    # index suffix on every label below.
                    raw_test_data = (
                        pipeline
                        | 'ReadTestData' + str(j) >> beam.io.ReadFromText(test_file, skip_header_lines=1)
                        | 'DecodeTestData' + str(j) >> MapAndFilterErrors(converter.decode))

                    raw_test_dataset = (raw_test_data, RAW_DATA_METADATA)
                    # Apply the transform fitted on the training data only. The
                    # transformed schema equals the training one, so it is dropped.
                    transformed_test_data, _ = (
                        (raw_test_dataset, transform_fn)
                        | 'test' + str(j) >> tft_beam.TransformDataset())

                    _ = (
                        transformed_test_data
                        | 'EncodeTestData' + str(j) >> beam.Map(transformed_data_coder.encode)
                        | 'WriteTestData' + str(j) >> beam.io.WriteToTFRecord(
                            os.path.join(
                                working_dir,
                                TRANSFORMED_TEST_DATA_FILEBASE + raw_test_file_suffix
                                + transformed_test_file_suffix)))

                # Write the SavedModel and metadata to working_dir; they can be
                # read back with tft.TFTransformOutput.
                _ = (
                    transform_fn
                    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)

## Put it all together

In [None]:
# NOTE: After transforming the raw data you need to delete the file
# "transformed/<dsp>/train_transformed-00000-of-00001" and rename
# "transformed/<dsp>/test_transformed-9-reduced-00000-of-00001" to
# "transformed/<dsp>/train_transformed-9-reduced-00000-of-00001"
# in order to continue with the (warm-start) training!

dsps = ['47_warmstarting']
for dsp in dsps:
    final_output_path = os.path.join(OUTPUT_PATH, dsp)
    raw_dsp_dir = os.path.join(RAW_PATH, dsp)

    # Glob order is not guaranteed; sort so that test files (and therefore
    # their pipeline step labels) are processed in a deterministic order.
    test_data_paths = sorted(tf.gfile.Glob(os.path.join(raw_dsp_dir, 'dsp-' + dsp + '-test*')))

    train_pattern = os.path.join(raw_dsp_dir, 'dsp-' + dsp + '-train*')
    train_data_paths = sorted(tf.gfile.Glob(train_pattern))
    if not train_data_paths:
        raise IOError('No training file matches {!r} for DSP {}'.format(train_pattern, dsp))
    # Only a single training file is expected; take the first in sorted order.
    train_data_path = train_data_paths[0]

    start = time.time()
    transform_data(train_data_path, test_data_paths, final_output_path,
                   transformed_train_file_suffix='', transformed_test_file_suffix='')
    print('Transform took {:.2f} seconds for DSP {}'.format(time.time() - start, dsp))
    
# Ignore the warning "WARNING:tensorflow:Expected binary or unicode string, got type_url:",
# since it is a known bug and will be fixed soon.
# See: https://stackoverflow.com/questions/49394549/error-on-data-type-from-trainable-variables
# The same goes for "WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be."
# See: https://github.com/apache/beam/pull/6801
