In [None]:
import tempfile
import os
import math
from io import BytesIO
from google.cloud import storage
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow_transform import coders
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
import tensorflow_transform.beam as tft_beam
import tensorflow_transform as tft
import apache_beam as beam
from sklearn import model_selection

In [None]:
# Fetch 'train.csv' from the 'ames-house-dataset' Cloud Storage bucket and
# load it into a pandas DataFrame.
client = storage.Client()
bucket = client.get_bucket('ames-house-dataset')
blob = storage.Blob('train.csv', bucket)
# NOTE(review): newer google-cloud-storage releases appear to document
# `download_as_bytes()` as the replacement for `download_as_string()` --
# confirm against the installed client version before switching.
content = blob.download_as_string()
# The downloaded payload is wrapped in BytesIO so pandas can read it as a
# file-like object.
data = pd.read_csv(BytesIO(content))

In [None]:
# Hold out 20% of the rows as a test split; the fixed random_state keeps the
# split reproducible across runs.
# NOTE(review): `train` and `test` are not referenced by any later cell visible
# here -- the Beam pipeline reads 'train.csv' and 'test.csv' straight from the
# bucket instead. Confirm whether these splits were meant to be written out
# and used as the pipeline inputs.
train, test = model_selection.train_test_split(data, test_size=0.2, random_state=42)

In [None]:
# Output location: the system temp directory. The pipeline cell writes the
# transformed TFRecords and the saved transform function under this path.
tempdir = tempfile.gettempdir()
# Bucket URI (with trailing slash) that the pipeline cell joins with
# 'train.csv' / 'test.csv' to locate its inputs.
gs_bucket = 'gs://ames-house-dataset/'
# File-name prefixes passed to WriteToTFRecord for the transformed outputs.
train_file_name = 'train_transformed'
test_file_name = 'test_transformed'

# Column names handed to the CSV coder, listed in order.
# NOTE(review): presumably this must match the column order of the CSV files
# exactly -- verify against the data. 'Id' appears here but in none of the
# feature lists below, so it receives no entry in RAW_DATA_FEATURE_SPEC.
INPUT_COLS = ['Id', 'MSSubClass', 'MSZoning', 'LotFrontage', 'LotArea', 'Street',
              'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig',
              'LandSlope', 'Neighborhood', 'Condition1', 'Condition2',
              'BldgType', 'HouseStyle', 'OverallQual', 'OverallCond', 'YearBuilt',
              'YearRemodAdd', 'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd',
              'MasVnrType', 'MasVnrArea', 'ExterQual', 'ExterCond', 'Foundation',
              'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinSF1',
              'BsmtFinType2', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', 'Heating',
              'HeatingQC', 'CentralAir', 'Electrical', '1stFlrSF', '2ndFlrSF',
              'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath',
              'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'KitchenQual',
              'TotRmsAbvGrd', 'Functional', 'Fireplaces', 'FireplaceQu', 'GarageType',
              'GarageYrBlt', 'GarageFinish', 'GarageCars', 'GarageArea', 'GarageQual',
              'GarageCond', 'PavedDrive', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch',
              '3SsnPorch', 'ScreenPorch', 'PoolArea', 'PoolQC', 'Fence', 'MiscFeature',
              'MiscVal', 'MoSold', 'YrSold', 'SaleType', 'SaleCondition', 'SalePrice']


# String columns parsed as variable-length features in RAW_DATA_FEATURE_SPEC
# (presumably because they may be missing -- the "OPT" prefix suggests
# "optional"; confirm against the data).
OPT_CAT_FEATURES = ['Alley', 'MasVnrType', 'BsmtQual', 'BsmtCond',
                    'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2',
                    'Electrical', 'FireplaceQu', 'GarageType', 'GarageFinish',
                    'GarageQual', 'GarageCond', 'PoolQC', 'Fence',
                    'MiscFeature']


# Float columns parsed as variable-length features in RAW_DATA_FEATURE_SPEC.
OPT_NUM_FEATURES = ['LotFrontage', 'MasVnrArea', 'GarageYrBlt']


# String columns parsed as fixed-length scalar features.
CAT_FEATURES = ['MSZoning', 'Street', 'LotShape', 'LandContour', 'Utilities',
                'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1',
                'Condition2', 'BldgType', 'HouseStyle', 'RoofStyle',
                'RoofMatl', 'Exterior1st', 'Exterior2nd', 'ExterQual',
                'ExterCond', 'Foundation', 'Heating', 'HeatingQC',
                'CentralAir', 'KitchenQual', 'Functional', 'PavedDrive',
                'SaleType', 'SaleCondition']


# Columns parsed as fixed-length scalar float32 features.
NUM_FEATURES = ['MSSubClass', 'LotArea', 'OverallQual', 'OverallCond',
                'YearBuilt', 'YearRemodAdd', 'BsmtFinSF1', 'BsmtFinSF2',
                'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF',
                'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath',
                'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr',
                'TotRmsAbvGrd', 'Fireplaces', 'GarageCars', 'GarageArea',
                'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch',
                'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold']


# Label column; parsed as a fixed-length scalar float32 feature.
TARGET = 'SalePrice'


# Parsing spec for the raw CSV columns, keyed by column name:
#   * CAT_FEATURES      -> scalar string
#   * NUM_FEATURES      -> scalar float32
#   * OPT_CAT_FEATURES  -> variable-length string
#   * OPT_NUM_FEATURES  -> variable-length float32
#   * TARGET            -> scalar float32
RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.io.FixedLenFeature([], tf.string)) for name in CAT_FEATURES]
    + [(name, tf.io.FixedLenFeature([], tf.float32)) for name in NUM_FEATURES]
    + [(name, tf.io.VarLenFeature(tf.string)) for name in OPT_CAT_FEATURES]
    + [(name, tf.io.VarLenFeature(tf.float32)) for name in OPT_NUM_FEATURES]
    + [(TARGET, tf.io.FixedLenFeature([], tf.float32))])

# Derive the schema from the feature spec exactly once and share it between
# the dataset metadata and the CSV coder, so the two are always built from the
# same schema object instead of two separately constructed copies.
INPUT_SCHEMA = schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC)
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(INPUT_SCHEMA)
# Coder that decodes one CSV line (columns ordered as in INPUT_COLS) into a
# dict of raw feature values.
CONV_INPUT = coders.CsvCoder(INPUT_COLS, INPUT_SCHEMA)

In [None]:
class MapAndFilterErrors(beam.PTransform):
    """Map a function over a PCollection, dropping elements on which it raises.

    Behaves like ``beam.Map(fn)`` except that an element for which ``fn``
    raises an ``Exception`` produces no output instead of failing the
    pipeline. Each dropped element increments the Beam counter
    ``dataset/bad_elements``.
    """

    class _MapAndFilterErrorsDoFn(beam.DoFn):
        """DoFn that emits ``fn(element)`` and counts the elements that fail."""

        def __init__(self, fn):
            # Beam metric tracking how many elements were discarded.
            self._bad_elements_counter = beam.metrics.Metrics.counter('dataset', 'bad_elements')
            self._fn = fn

        def process(self, element):
            """Yield the mapped element, or nothing if the mapping raised."""
            try:
                mapped = self._fn(element)
            except Exception:  # pylint: disable=broad-except
                # Any failure in the mapping function means the element is
                # counted as bad and skipped.
                self._bad_elements_counter.inc(1)
            else:
                yield mapped

    def __init__(self, fn):
        self._fn = fn

    def expand(self, pcoll):
        """Apply the error-filtering DoFn to ``pcoll``."""
        filtering_do_fn = self._MapAndFilterErrorsDoFn(self._fn)
        return pcoll | beam.ParDo(filtering_do_fn)

In [None]:
def preprocessing_fn(inputs):
    """tf.Transform preprocessing function for the housing features.

    Args:
        inputs: dict mapping raw feature names to tensors.

    Returns:
        A dict with five entries: the natural log of '1stFlrSF', 'GrLivArea'
        and 'LotArea', plus 'Neighborhood' and 'SalePrice' passed through
        unchanged.
    """
    transformed = {
        name: tf.math.log(inputs[name])
        for name in ('1stFlrSF', 'GrLivArea', 'LotArea')
    }

    # Request a vocabulary over 'Neighborhood' under the file name
    # 'Neighborhood'. The return value is discarded and the vocabulary is not
    # applied here: the feature is emitted exactly as it came in.
    tft.vocabulary(inputs['Neighborhood'], vocab_filename='Neighborhood')

    transformed['Neighborhood'] = inputs['Neighborhood']
    transformed['SalePrice'] = inputs['SalePrice']
    return transformed

In [None]:
# Preprocess the train and test CSVs with tf.Transform and write the results
# (transformed TFRecords plus the transform function) under `tempdir`.
#
# The scratch directory given to tf.Transform is the OUTERMOST context manager
# on purpose: `beam.Pipeline` runs when its `with` block exits, so the scratch
# directory must still exist at that point. It is deleted once the pipeline
# has finished, instead of leaking one `mkdtemp()` directory per run.
with tempfile.TemporaryDirectory() as tft_temp_dir:
    with beam.Pipeline() as p:
        with tft_beam.Context(temp_dir=tft_temp_dir):
            # Decodes one CSV line into a dict of raw features.
            converter = tft.coders.CsvCoder(INPUT_COLS, RAW_DATA_METADATA.schema)

            # Rows that fail to decode are dropped (and counted) by
            # MapAndFilterErrors rather than failing the pipeline.
            train_data = (p
                          | 'read_train_data' >> beam.io.ReadFromText(os.path.join(gs_bucket, 'train.csv'), skip_header_lines=1)
                          | 'decode_train_data' >> MapAndFilterErrors(converter.decode))

            # Analyze the training data and transform it in one pass; this
            # also yields the transform function reused for the test data.
            train_dataset = (train_data, RAW_DATA_METADATA)
            transformed_train_dataset, transform_fn = (train_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
            transformed_train_data, transformed_metadata = transformed_train_dataset

            # Serializes transformed rows using the transformed schema.
            transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)

            _ = (transformed_train_data
                 | 'encode_train_data' >> beam.Map(transformed_data_coder.encode)
                 | 'write_train_data' >> beam.io.WriteToTFRecord(os.path.join(tempdir, train_file_name)))

            # NOTE(review): this reads 'test.csv' from the bucket, not the
            # `test` split created earlier in the notebook. Any row that does
            # not decode against the raw schema (which includes 'SalePrice')
            # is silently dropped -- check the 'bad_elements' counter and
            # confirm this is the intended input.
            test_data = (p
                         | 'read_test_data' >> beam.io.ReadFromText(os.path.join(gs_bucket, 'test.csv'), skip_header_lines=1)
                         | 'decode_test_data' >> MapAndFilterErrors(converter.decode))

            # Apply the transform function produced from the training data to
            # the test data.
            test_dataset = (test_data, RAW_DATA_METADATA)
            transformed_test_dataset = ((test_dataset, transform_fn) | tft_beam.TransformDataset())
            transformed_test_data, _ = transformed_test_dataset

            _ = (transformed_test_data
                 | 'encode_test_data' >> beam.Map(transformed_data_coder.encode)
                 | 'write_test_data' >> beam.io.WriteToTFRecord(os.path.join(tempdir, test_file_name)))

            # Persist the transform function so it can be reused later.
            _ = (transform_fn
                 | 'save_transform_function' >> tft_beam.WriteTransformFn(tempdir))