<h2 style='color:darkblue; text-align:center;'><strong>Taxi Demand Forecasting on Vertex AI </strong></h2>

In [1]:
import os
import pandas as pd
import pprint
import shutil
import tempfile
import urllib.request
from absl import logging
from pathlib import Path

import tensorflow as tf
import tensorflow_data_validation as tfdv

import tensorflow_model_analysis as tfma
from tfx import v1 as tfx

# Setting up logging and pprint
# tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()
# Set TensorFlow logging to error-only
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # 0 = all, 1 = no info, 2 = no info and warnings, 3 = no info, warnings, and errors
tf.get_logger().setLevel('ERROR')

2023-12-13 02:58:38.568168: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io_plugins.so: undefined symbol: _ZN3tsl6Status12empty_stringB5cxx11Ev']
caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZNK10tensorflow4data11DatasetBase8FinalizeEPNS_15OpKernelContextESt8functionIFN3tsl8StatusOrISt10unique_ptrIS1_NS5_4core15RefCountDeleterEEEEvEE']


***Paths***

In [2]:
GOOGLE_CLOUD_PROJECT = 'mlops-363723'         
GOOGLE_CLOUD_PROJECT_NUMBER = '75674212269'  
GOOGLE_CLOUD_REGION = 'us-central1'          
GCS_BUCKET_NAME = 'chicago_taxitrips'  

PIPELINE_NAME = 'taxi-demand-prediction-management'
# Define the GCS_BUCKET_NAME
GCS_BUCKET_NAME = 'chicago_taxitrips'
DATA_DIRECTORY = 'gs://chicago_taxitrips/DATA_DIRECTORY'

# # Paths for users' data.
TRAIN_DATA_FILENAME = 'training_data.csv'
EVAL_DATA_FILENAME = 'validation_data.csv'
TEST_DATA_FILENAME = 'test_data.csv'

TRAIN_DATA_PATH = os.path.join(DATA_DIRECTORY, TRAIN_DATA_FILENAME)
EVAL_DATA_PATH = os.path.join(DATA_DIRECTORY, EVAL_DATA_FILENAME)
TEST_DATA_PATH = os.path.join(DATA_DIRECTORY, TEST_DATA_FILENAME)


# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)


SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
print('DATA_DIRECTORY:', DATA_DIRECTORY)
print('TRAIN_DATA_PATH:', TRAIN_DATA_PATH)
print('TEST_DATA_PATH:', TEST_DATA_PATH)
print('EVAL_DATA_PATH:', EVAL_DATA_PATH)
print('PIPELINE_ROOT:', PIPELINE_ROOT)
print('SERVING_MODEL_DIR:', SERVING_MODEL_DIR)

PIPELINE_ROOT: gs://chicago_taxitrips/pipeline_root/taxi-demand-prediction-management
DATA_DIRECTORY: gs://chicago_taxitrips/DATA_DIRECTORY
TRAIN_DATA_PATH: gs://chicago_taxitrips/DATA_DIRECTORY/training_data.csv
TEST_DATA_PATH: gs://chicago_taxitrips/DATA_DIRECTORY/test_data.csv
EVAL_DATA_PATH: gs://chicago_taxitrips/DATA_DIRECTORY/validation_data.csv
PIPELINE_ROOT: gs://chicago_taxitrips/pipeline_root/taxi-demand-prediction-management
SERVING_MODEL_DIR: gs://chicago_taxitrips/serving_model/taxi-demand-prediction-management


In [3]:
_taxi_constants_module_file = 'taxi_constants.py'

In [4]:
%%writefile {_taxi_constants_module_file}
NUMERICAL_FEATURES = [ 
    'temperature_2m',
    'relativehumidity_2m',
    'rain',
    'snowfall',
    'hour_sin',
    'hour_cos',
    'day_sin',
    'day_cos',
    'month_sin',
    'month_cos']



CATEGORICAL_NUMERICAL_FEATURES = [
    'public_holiday', 'weathercode','pickup_community_area', 'year'
]


# Keys
LABEL_KEY = 'demand'

def t_name(key):
    """
    Rename the feature keys so that they don't clash with the raw keys when
    running the Evaluator component.
    Args:
    key: The original feature key
    Returns:
    key with '_xf' appended
    """
    return key + '_xf'


Overwriting taxi_constants.py


In [5]:
TRANSFORM_MODULE_PATH = "chicago_taxi_transform.py"

In [6]:
%%writefile {TRANSFORM_MODULE_PATH}

import tensorflow as tf
import tensorflow_transform as tft
import taxi_constants

_NUMERICAL_FEATURES = taxi_constants.NUMERICAL_FEATURES
_CATEGORICAL_NUMERICAL_FEATURES = taxi_constants.CATEGORICAL_NUMERICAL_FEATURES
_LABEL_KEY = taxi_constants.LABEL_KEY

def _fill_in_missing(x):
    """Replace missing values in a SparseTensor."""
    default_value = '' if x.dtype == tf.string else 0
    if not isinstance(x, tf.sparse.SparseTensor):
        return x
    return tf.squeeze(
        tf.sparse.to_dense(
            tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
            default_value),
        axis=1)

def z_score_normalization(inputs, feature_name):
    """Apply Z-score normalization on a feature."""
    return tft.scale_to_z_score(_fill_in_missing(inputs[feature_name]))


def log_transformation(inputs, feature_name):
    """Apply log transformation on a feature."""
    # Ensure the input feature is in float32
    feature_values = tf.cast(inputs[feature_name], tf.float32)

    # Replace non-positive values with a small positive number
    feature_values_safe = tf.where(
        feature_values <= 0, 
        tf.constant(0.01, dtype=tf.float32),
        feature_values
    )

    return tf.math.log1p(feature_values_safe)

def square_root_transformation(inputs, feature_name):
    """Apply square root transformation on a feature."""
    feature_values = tf.where(inputs[feature_name] <= 0, tf.constant(0.01, dtype=tf.float32), inputs[feature_name])
    return tf.math.sqrt(feature_values)


def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    outputs = {}
    
    # Numerical Features: Apply Z-score normalization.
    for key in _NUMERICAL_FEATURES:
        outputs[taxi_constants.t_name(key)] = z_score_normalization(inputs, key)
    
    # Categorical Numerical Features
    for key in _CATEGORICAL_NUMERICAL_FEATURES:
        # Ensure the feature exists in the inputs
        if key in inputs:
            outputs[taxi_constants.t_name(key)] = _fill_in_missing(inputs[key])

    # Feature Transformations
    outputs['log_trip_total'] = log_transformation(inputs, 'trip_total')
    outputs['log_trip_miles'] = log_transformation(inputs, 'trip_miles')
    outputs['log_duration'] = log_transformation(inputs, 'duration')
    outputs['sqrt_precipitation'] = square_root_transformation(inputs, 'precipitation')
    
    if _LABEL_KEY in inputs:
        
        outputs[_LABEL_KEY]= log_transformation(inputs, _LABEL_KEY)
        
    return outputs

Overwriting chicago_taxi_transform.py


In [7]:
_taxi_trainer_module_file = 'taxi_trainer.py'

In [8]:
%%writefile {_taxi_trainer_module_file}

from typing import NamedTuple, Dict, List, Text, Any
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from tfx import v1 as tfx
from keras_tuner.engine import base_tuner
from keras_tuner import HyperParameters, RandomSearch
from tfx_bsl.public import tfxio
from tfx.components.trainer.fn_args_utils import FnArgs
import os
import taxi_constants

_LABEL_KEY = taxi_constants.LABEL_KEY


_BATCH_SIZE = 64

TunerFnResult = NamedTuple('TunerFnResult', [('tuner', RandomSearch), ('fit_kwargs', Dict[Text, Any])])  # Changed to Randomsearch


early_stopping = EarlyStopping(
    monitor='val_mean_absolute_error',
    patience=10,
    restore_best_weights=True
)

def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = _BATCH_SIZE) -> tf.data.Dataset:
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key=_LABEL_KEY),
        tf_transform_output.transformed_metadata.schema).repeat()


def _build_keras_model(hp, tf_transform_output: tft.TFTransformOutput) -> tf.keras.Model:
    # Define feature specs and create input layers
    feature_spec = tf_transform_output.transformed_feature_spec().copy()
    # Remove the label feature
    feature_spec.pop(_LABEL_KEY)
    inputs = {
        key: tf.keras.layers.Input(shape=(1,), name=key)
        for key in feature_spec.keys()
    }

    # Concatenate all input features
    concatenated_inputs = tf.keras.layers.Concatenate()(list(inputs.values()))

    num_layers = hp.Int('num_layers', 1, 5)
    activation_choice = hp.Choice('activation', ['relu', 'leaky_relu', 'elu', 'tanh', 'sigmoid'])

    for i in range(num_layers):
        units = hp.Int(f'units_{i}', min_value=32, max_value=512, step=32)
        concatenated_inputs = tf.keras.layers.Dense(
            units=units,
            activation=activation_choice,
            kernel_regularizer=tf.keras.regularizers.l2(hp.Float('l2_{i}', 1e-5, 1e-2, sampling='log'))
        )(concatenated_inputs)
        if hp.Boolean(f'dropout_{i}'):
            dropout_rate = hp.Float(f'dropout_rate_{i}', 0.1, 0.5)
            concatenated_inputs = tf.keras.layers.Dropout(dropout_rate)(concatenated_inputs)

    # Output layer for regression
    output = tf.keras.layers.Dense(1, activation='linear')(concatenated_inputs)

    # Create and compile the Keras model
    model = tf.keras.Model(inputs=inputs, outputs=output)
    learning_rate = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='log')
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate),
        loss='mean_squared_error',
        metrics=[
            tf.keras.metrics.MeanAbsoluteError(),
            tf.keras.metrics.RootMeanSquaredError()
        ]
    )
    model.summary()
    return model

def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
    tuner = RandomSearch(
        hypermodel=lambda hp: _build_keras_model(hp, tf_transform_output),
        objective='val_mean_absolute_error',
        max_trials=25,
        executions_per_trial=1,
        directory=fn_args.working_dir,
        project_name='taxi_trips_tuning_RandomSearch'
    )

    train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor, tf_transform_output, _BATCH_SIZE)
    eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor, tf_transform_output, _BATCH_SIZE)

    return TunerFnResult(
        tuner=tuner,
        fit_kwargs={
            'x': train_dataset,
            'validation_data': eval_dataset,
            'steps_per_epoch': 1500,  
            'validation_steps': 969,  
            'callbacks': [early_stopping]
        }
    )


def _get_tf_examples_serving_signature(model, tf_transform_output):
    model.tft_layer_inference = tf_transform_output.transform_features_layer()

    @tf.function(input_signature=[
        tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
    ])
    def serve_tf_examples_fn(serialized_tf_example):
        # Get the raw feature spec
        raw_feature_spec = tf_transform_output.raw_feature_spec()

        # Parse the raw features from the serialized example
        raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)

        # Exclude the 'demand' feature if it's present
        if 'demand' in raw_features:
            raw_features.pop('demand')

        # Apply the transformation to get the features for model prediction
        transformed_features = model.tft_layer_inference(raw_features)

        # Filter out any keys not used by the model
        model_input_keys = [layer.name for layer in model.layers if isinstance(layer, tf.keras.layers.InputLayer)]
        filtered_features = {key: value for key, value in transformed_features.items() if key in model_input_keys}

        # Make predictions with only the required inputs
        outputs = model(filtered_features)
        return {'outputs': outputs}

    return serve_tf_examples_fn


def _get_transform_features_signature(model, tf_transform_output):
    model.tft_layer_eval = tf_transform_output.transform_features_layer()

    @tf.function(input_signature=[
        tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
    ])
    def transform_features_fn(serialized_tf_example):
        raw_feature_spec = tf_transform_output.raw_feature_spec()
        raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
        transformed_features = model.tft_layer_eval(raw_features)
        return transformed_features

    return transform_features_fn

def export_serving_model(tf_transform_output, model, output_dir):
    print(f"Exporting the serving model to {output_dir}")
    model.tft_layer = tf_transform_output.transform_features_layer()
    signatures = {
        'serving_default': _get_tf_examples_serving_signature(model, tf_transform_output),
        'transform_features': _get_transform_features_signature(model, tf_transform_output),
    }
    model.save(output_dir, save_format='tf', signatures=signatures)
    print("Model exported successfully.")

    
def run_fn(fn_args: FnArgs):
    print(f"Model run directory: {fn_args.model_run_dir}")
    
    model_dir = os.path.join(fn_args.model_run_dir, 'model')

    early_stopping_callback = tf.keras.callbacks.EarlyStopping(
        monitor='val_mean_absolute_error', 
        mode='min', 
        patience=3,
        restore_best_weights=True
    )

    
    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(model_dir, 'best_model'),
        monitor='val_mean_absolute_error', 
        mode='min', 
        save_best_only=True,
        verbose=1
    )

 
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)

    # Set up the tuner for hyperparameter tuning
    tuner_fn_result = tuner_fn(fn_args)
    tuner = tuner_fn_result.tuner

    # Create datasets for training and evaluation
    train_dataset = _input_fn(
        fn_args.train_files, 
        fn_args.data_accessor, 
        tf_transform_output, 
        _BATCH_SIZE
    )
    eval_dataset = _input_fn(
        fn_args.eval_files, 
        fn_args.data_accessor, 
        tf_transform_output, 
        _BATCH_SIZE
    )

    # Perform hyperparameter search using the tuner
    tuner.search(**tuner_fn_result.fit_kwargs)

    # Retrieve the best hyperparameters
    best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

    # Build the best Keras model based on the best hyperparameters
    model = _build_keras_model(best_hps, tf_transform_output)

    # Determine the number of steps per epoch
    total_train_examples = fn_args.train_steps * _BATCH_SIZE
    steps_per_epoch = total_train_examples // _BATCH_SIZE

    # Fit the model with the callbacks
    model.fit(
        train_dataset,
        steps_per_epoch=steps_per_epoch,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        epochs=fn_args.train_steps // steps_per_epoch,
        callbacks=[
            # tensorboard_callback,
            early_stopping_callback,
            model_checkpoint_callback
        ]
    )
  
    signatures = {
      'serving_default': _get_tf_examples_serving_signature(model, tf_transform_output),
    }
    

    # At the end of the run_fn function
    export_serving_model(tf_transform_output, model, fn_args.serving_model_dir)



Overwriting taxi_trainer.py


In [9]:
!gsutil cp {_taxi_trainer_module_file} {MODULE_ROOT}/

Copying file://taxi_trainer.py [Content-Type=text/x-python]...
/ [1 files][ 10.1 KiB/ 10.1 KiB]                                                
Operation completed over 1 objects/10.1 KiB.                                     


***Pipeline Definition***

In [11]:
import os
import tensorflow_model_analysis as tfma
import tfx
from tfx import v1 as tfx
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, Pusher, Evaluator, Tuner
from tfx.orchestration import pipeline
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from tfx.proto import example_gen_pb2, trainer_pb2, pusher_pb2
from tfx.dsl.components.common.resolver import Resolver
from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import LatestBlessedModelStrategy
from keras_tuner.engine import base_tuner
from keras_tuner import HyperParameters, RandomSearch
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing
import taxi_constants

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, serving_model_dir: str) -> tfx.dsl.Pipeline:
    # Define input data
    input_config = example_gen_pb2.Input(splits=[
        example_gen_pb2.Input.Split(name='train', pattern=TRAIN_DATA_FILENAME),
        example_gen_pb2.Input.Split(name='eval', pattern=EVAL_DATA_FILENAME)
    ])
    example_gen = CsvExampleGen(input_base=data_root, input_config=input_config)

    # Components for data analysis and validation
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
    schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema']
    )

    # Component for data transformation
    transform = Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        module_file=os.path.abspath(TRANSFORM_MODULE_PATH)
    )
    
    # Configuration for Vertex AI Training
    vertex_job_spec = {
        'project': project_id,
        'worker_pool_specs': [{
            'machine_spec': {
                'machine_type': 'n1-standard-4'
            },
            'replica_count': 1,
            'container_spec': {
                'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
            },
        }],
    }
    
    # Tuner component
    tuner = tfx.extensions.google_cloud_ai_platform.Tuner(
        module_file=_taxi_trainer_module_file,
        examples=transform.outputs['transformed_examples'],
        schema=schema_gen.outputs['schema'],
        transform_graph=transform.outputs['transform_graph'],
        train_args=trainer_pb2.TrainArgs(splits=['train'], num_steps=100),
        eval_args=trainer_pb2.EvalArgs(splits=['eval'], num_steps=5),
         custom_config={
        tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY: True,
        tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY: region,
        tfx.extensions.google_cloud_ai_platform.experimental.TUNING_ARGS_KEY: vertex_job_spec
        # tfx.extensions.google_cloud_ai_platform.experimental.REMOTE_TRIALS_WORKING_DIR_KEY: os.path.join('gs://', GCS_BUCKET_NAME, 'tuner_trials')
    }
   )
    
    
    # Trainer component
    trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
        module_file=os.path.abspath(module_file),
        transformed_examples=transform.outputs['transformed_examples'],
        schema=schema_gen.outputs['schema'],
        transform_graph=transform.outputs['transform_graph'],
        hyperparameters=tuner.outputs['best_hyperparameters'],
        train_args=tfx.proto.TrainArgs(splits=['train'], num_steps=10160),
        eval_args=tfx.proto.EvalArgs(splits=['eval'], num_steps=5716),
        custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec
    
        }
    )
    
    vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      'machine_type': 'n1-standard-4',
  }
    # serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest'
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest'

    # Evaluator component configuration
    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(signature_name='serving_default', label_key=taxi_constants.LABEL_KEY)],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[
            tfma.MetricsSpec(
                metrics=[
                    tfma.MetricConfig(class_name='MeanAbsoluteError'),
                    tfma.MetricConfig(
                        class_name='MeanAbsoluteError',
                        threshold=tfma.MetricThreshold(
                            value_threshold=tfma.GenericValueThreshold(lower_bound={'value': 0.8159}),
                            change_threshold=tfma.GenericChangeThreshold(
                                direction=tfma.MetricDirection.LOWER_IS_BETTER,
                                absolute={'value': 0.02}
                            )
                        )
                    )
                ]
            )
        ]
    )

    # Model resolver component
    model_resolver = Resolver(
        strategy_class=LatestBlessedModelStrategy,
        model=Channel(type=Model),
        model_blessing=Channel(type=ModelBlessing)
    ).with_id('latest_blessed_model_resolver')

    # Evaluator component
    evaluator = Evaluator(
        examples=example_gen.outputs['examples'],
        example_splits=['eval'],  
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config
    )

    # Pusher component
    pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
        })

    # Define the pipeline
    components = [
        example_gen,
        statistics_gen,
        schema_gen,
        example_validator,
        transform,
        tuner,
        trainer,
        model_resolver,
        evaluator,
        pusher
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components
    )


In [12]:
import tensorflow as tf
print(tf.__version__)


2.13.1


In [13]:
import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_DIRECTORY,
        module_file=_taxi_trainer_module_file,
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        serving_model_dir=SERVING_MODEL_DIR,
        
       
    ))



running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying chicago_taxi_transform.py -> build/lib
copying taxi_transform.py -> build/lib
copying taxi_trainer.py -> build/lib
copying taxi_constants.py -> build/lib
copying taxi_tuner.py -> build/lib
installing to /tmp/tmpx5_h3zx2
running install
running install_lib
copying build/lib/taxi_transform.py -> /tmp/tmpx5_h3zx2
copying build/lib/taxi_tuner.py -> /tmp/tmpx5_h3zx2
copying build/lib/taxi_constants.py -> /tmp/tmpx5_h3zx2
copying build/lib/taxi_trainer.py -> /tmp/tmpx5_h3zx2
copying build/lib/chicago_taxi_transform.py -> /tmp/tmpx5_h3zx2
running install_egg_info
running egg_info
creating tfx_user_code_Transform.egg-info
writing tfx_user_code_Transform.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Transform.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Transform.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
reading ma

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


creating '/tmp/tmpc_lawmam/tfx_user_code_Transform-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de-py3-none-any.whl' and adding '/tmp/tmpx5_h3zx2' to it
adding 'chicago_taxi_transform.py'
adding 'taxi_constants.py'
adding 'taxi_trainer.py'
adding 'taxi_transform.py'
adding 'taxi_tuner.py'
adding 'tfx_user_code_Transform-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/METADATA'
adding 'tfx_user_code_Transform-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/WHEEL'
adding 'tfx_user_code_Transform-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/top_level.txt'
adding 'tfx_user_code_Transform-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/RECORD'
removing /tmp/tmpx5_h3zx2
running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying chicago_taxi_transform.py -> build/lib
copying taxi_transform.py -> build/lib
copying 

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


writing manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
Copying tfx_user_code_Tuner.egg-info to /tmp/tmp91m96p5_/tfx_user_code_Tuner-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de-py3.10.egg-info
running install_scripts
creating /tmp/tmp91m96p5_/tfx_user_code_Tuner-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/WHEEL
creating '/tmp/tmp1otndb59/tfx_user_code_Tuner-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de-py3-none-any.whl' and adding '/tmp/tmp91m96p5_' to it
adding 'chicago_taxi_transform.py'
adding 'taxi_constants.py'
adding 'taxi_trainer.py'
adding 'taxi_transform.py'
adding 'taxi_tuner.py'
adding 'tfx_user_code_Tuner-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/METADATA'
adding 'tfx_user_code_Tuner-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/WHEEL'
adding 'tfx_user_code_Tuner-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a4248

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


creating /tmp/tmp8xcbqs_2/tfx_user_code_Trainer-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/WHEEL
creating '/tmp/tmprhq92uo3/tfx_user_code_Trainer-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de-py3-none-any.whl' and adding '/tmp/tmp8xcbqs_2' to it
adding 'chicago_taxi_transform.py'
adding 'taxi_constants.py'
adding 'taxi_trainer.py'
adding 'taxi_transform.py'
adding 'taxi_tuner.py'
adding 'tfx_user_code_Trainer-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/METADATA'
adding 'tfx_user_code_Trainer-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/WHEEL'
adding 'tfx_user_code_Trainer-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/top_level.txt'
adding 'tfx_user_code_Trainer-0.0+38c9a72244295a98276b06a0cc23c38920807fdf5f9a424850ed1523130d21de.dist-info/RECORD'
removing /tmp/tmp8xcbqs_2


In [14]:
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/75674212269/locations/us-central1/pipelineJobs/taxi-demand-prediction-management-20231213030329


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/75674212269/locations/us-central1/pipelineJobs/taxi-demand-prediction-management-20231213030329


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/75674212269/locations/us-central1/pipelineJobs/taxi-demand-prediction-management-20231213030329')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/75674212269/locations/us-central1/pipelineJobs/taxi-demand-prediction-management-20231213030329')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/taxi-demand-prediction-management-20231213030329?project=75674212269


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/taxi-demand-prediction-management-20231213030329?project=75674212269


----
<h3 style='color:darkblue;text-align:center;'><strong>Conclusion and Future Directions</strong></h3>
<p style='color:black; font-size:16px; text-align:justify;'>
Our journey in developing the Taxi Demand Prediction and Management project has been a significant step forward in integrating advanced analytics and machine learning into practical applications. With this latest update, we are initiating the deployment of our pipeline in a cloud environment, utilizing the resources and services available through Vertex AI. This is an important phase in making our Taxi Demand Prediction model more scalable and robust, ready for real-world challenges in urban transport.

</p>

---

<strong>Key Achievements:</strong>
- <em style='color:darkgreen;'>Enhanced Operational Efficiency:</em> Our predictive model significantly improves taxi distribution, aligning with our operational goal of optimizing resource allocation and ensuring availability during peak demand.
- <em style='color:darkgreen;'>Streamlined Taxi Distribution:</em> Focused on reducing customer wait times, our project has successfully laid the groundwork for responsive and efficient taxi services.
- <em style='color:darkgreen;'>Contribution to Urban Mobility:</em> Beyond taxi services, our work contributes to the broader urban transportation ecosystem, enhancing overall urban mobility.
- <em style='color:darkgreen;'>Foundation for Future Exploration:</em> We've established a solid base for ongoing advancements in AI and machine learning, showcasing the potential for real-world applications and industry-wide transformations.

</p>
