## TFX Chicago Taxi Example: Walkthrough

Original reference: https://www.kangwoo.kr/2020/09/13/tfx-%ec%8b%9c%ec%b9%b4%ea%b3%a0-%ed%83%9d%ec%8b%9c/

### 1. Prerequisites for running this notebook (.ipynb)
* A Kubeflow environment
* When creating the notebook server, make sure the following conditions are met.
    * Run the notebook server with TensorFlow 2.0 or later.
        * image : gcr.io/kubeflow-images-public/tensorflow-2.1.0-notebook-cpu:1.0.0
    * When creating the notebook server, create an additional PV for data management (ReadWriteMany).
        * name : nbserver-tfx-chicago-taxi-example-vol-1
        * Mounted at /home/jovyan/data-vol-1.


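### 2. Install the required packages
The packages must be installed in exactly the following order.
The reason is that, when installing with pip in the TensorFlow Jupyter notebook container provided by Kubeflow:
* The pip version installed by default is 19.x.
* With pip 20 or later, installing tfx hangs indefinitely.
* With pip below 20, installing kfp fails with an error.

So first install tfx under pip 19, then upgrade pip to the latest version, and then install kfp.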

In [1]:
# !pip install tfx==0.22.0 --user --no-cache-dir
# !pip install pip --upgrade --user --no-cache-dir
# !pip install kfp --user --no-cache-dir

### 3. Set environment variables
This is in response to the message printed during pip install.

In [2]:
!export PATH=${PATH}:/home/jovyan/.local/bin
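
Note that a `!` command runs in its own subshell, so an `export` there does not change the environment of the notebook kernel or of later cells. A minimal sketch of one alternative, assuming the same `/home/jovyan/.local/bin` directory, is to update `os.environ` from Python. The helper name `add_to_path` is hypothetical, and the change lasts only until the kernel restarts:

```python
import os

def add_to_path(directory: str) -> str:
    """Append `directory` to PATH for this kernel process if it is missing."""
    parts = [p for p in os.environ.get('PATH', '').split(os.pathsep) if p]
    if directory not in parts:
        parts.append(directory)
    os.environ['PATH'] = os.pathsep.join(parts)
    return os.environ['PATH']

# Directory named in the cell above; `pip install --user` puts scripts here.
add_to_path('/home/jovyan/.local/bin')
```

Child processes started from later cells (including `!` commands) inherit the updated value.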

To support re-runs, delete any existing data.

In [3]:
!rm -rf /home/jovyan/data-vol-1/tfx
!rm -rf /home/jovyan/data-vol-1/pipelines
!rm ./taxi_utils.py

rm: cannot remove '/home/jovyan/data-vol-1/tfx/__pycache__/taxi_utils.cpython-36.pyc': Permission denied
rm: cannot remove '/home/jovyan/data-vol-1/pipelines/tfx/kubeflow_chicago_taxi_pipeline_demo/CsvExampleGen/examples/59/train/data_tfrecord-00000-of-00016.gz': Permission denied
rm: cannot remove '/home/jovyan/data-vol-1/pipelines/tfx/kubeflow_chicago_taxi_pipeline_demo/CsvExampleGen/examples/59/train/data_tfrecord-00001-of-00016.gz': Permission denied
rm: cannot remove '/home/jovyan/data-vol-1/pipelines/tfx/kubeflow_chicago_taxi_pipeline_demo/CsvExampleGen/examples/59/train/data_tfrecord-00002-of-00016.gz': Permission denied
rm: cannot remove '/home/jovyan/data-vol-1/pipelines/tfx/kubeflow_chicago_taxi_pipeline_demo/CsvExampleGen/examples/59/train/data_tfrecord-00003-of-00016.gz': Permission denied
rm: cannot remove '/home/jovyan/data-vol-1/pipelines/tfx/kubeflow_chicago_taxi_pipeline_demo/CsvExampleGen/examples/59/train/data_tfrecord-00004-of-00016.gz': Permission denied
rm: cannot

rm: cannot remove './taxi_utils.py': No such file or directory


At this point, restart the kernel.


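### 4. Download the raw training data

The PV created along with the notebook server is of course mounted on the notebook server, but it will later also be mounted and used by the pods that run TFX.  
So that the TFX pods can read the raw data they need, download the file to the location mounted on the notebook server.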

In [4]:
!mkdir -p /home/jovyan/data-vol-1/tfx/data/ 
!wget -P /home/jovyan/data-vol-1/tfx/data/ https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv

--2021-04-01 02:47:23--  https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1922812 (1.8M) [text/plain]
Saving to: ‘/home/jovyan/data-vol-1/tfx/data/data.csv’


2021-04-01 02:47:24 (6.46 MB/s) - ‘/home/jovyan/data-vol-1/tfx/data/data.csv’ saved [1922812/1922812]
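
Before moving on, it can help to confirm that the download is a well-formed CSV. The following is a minimal sketch using only the standard library. The helper name `summarize_csv` is hypothetical, and the path in the usage comment assumes the download location used above:

```python
import csv
from typing import List, Tuple

def summarize_csv(path: str) -> Tuple[List[str], int]:
    """Return the header columns and the number of data rows in a CSV file."""
    with open(path, newline='') as f:
        reader = csv.reader(f)
        header = next(reader)               # first row holds the column names
        row_count = sum(1 for _ in reader)  # remaining rows are data
    return header, row_count

# Example usage in the notebook (path assumed from the wget command above):
# columns, n_rows = summarize_csv('/home/jovyan/data-vol-1/tfx/data/data.csv')
# print(len(columns), 'columns,', n_rows, 'rows')
```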



> Assumption: the data analyst has already written and tested the code needed for the data analysis,
> and the resulting TFX Python code is already prepared.
> That code is the following taxi_utils.py.

### 5. Create the taxi_utils.py code

The taxi_utils.py file is created in the same location as the ipynb file. (/home/jovyan/taxi_utils.py)

In [5]:
_taxi_utils_filename = 'taxi_utils.py'

In [6]:
%%writefile {_taxi_utils_filename}
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from typing import List, Text

import absl
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs

# Categorical features are assumed to each have a maximum value in the dataset.
_MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

_CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

_DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.transform for encoding each feature.
_FEATURE_BUCKET_COUNT = 10

_BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
_VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
_OOV_SIZE = 10

_VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Keys
_LABEL_KEY = 'tips'
_FARE_KEY = 'fare'

def _transformed_name(key):
  return key + '_xf'

def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]

def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)
    # TODO(b/148082271): Remove this line once TFT 0.22 is used.
    transformed_features.pop(_transformed_name(_LABEL_KEY), None)

    return model(transformed_features)

  return serve_tf_examples_fn

def _input_fn(file_pattern: Text,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: input tfrecord file pattern.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))

  return dataset

def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying taxi data.

  Args:
    hidden_units: [int], the layer sizes of the DNN (input layer first).

  Returns:
    A keras Model.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
      for key in _transformed_names(_VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
      for key in _transformed_names(_BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(  # pylint: disable=g-complex-comprehension
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              _transformed_names(_CATEGORICAL_FEATURE_KEYS),
              _MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  indicator_column = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]

  model = _wide_and_deep_classifier(
      wide_columns=indicator_column,
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])
  return model

def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
  """Build a simple keras wide and deep model.

  Args:
    wide_columns: Feature columns wrapped in indicator_column for wide (linear)
      part of the model.
    deep_columns: Feature columns for deep part of the model.
    dnn_hidden_units: [int], the layer sizes of the hidden DNN.

  Returns:
    A Wide and Deep Keras model
  """
  # The following values are hard-coded for simplicity in this example;
  # preferably, they should be passed in as hparams.

  # Keras needs the feature definitions at compile time.
  input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  }
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_BUCKET_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS)
  })

  # TODO(b/144500510): SparseFeatures for feature columns + Keras.
  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
  for numnodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(numnodes)(deep)
  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)

  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model

# TFX Transform will call this function.
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[_transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in _VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEYS:
    outputs[_transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]),
        _FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  outputs[_transformed_name(_LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs

# TFX Trainer will call this function.
# def trainer_fn(fn_args: TrainerFnArgs):
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  mirrored_strategy = tf.distribute.MirroredStrategy()
  with mirrored_strategy.scope():
    model = _build_keras_model(
        # Construct layer sizes with exponential decay
        hidden_units=[
            max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
            for i in range(num_dnn_layers)
        ])

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Writing taxi_utils.py
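
The label computed at the end of `preprocessing_fn` can be restated in plain Python. This is only an illustrative sketch of the rule (a tip greater than 20% of the fare counts as a big tip, and a NaN fare yields 0), not the code that TFX runs. The function name `is_big_tipper` is hypothetical:

```python
import math

def is_big_tipper(fare: float, tips: float) -> int:
    """Return 1 if the tip exceeds 20% of the fare, else 0."""
    if math.isnan(fare):
        # Mirrors the tf.where branch: a NaN fare produces label 0.
        return 0
    return int(tips > fare * 0.2)

print(is_big_tipper(10.0, 2.5))          # tip is 25% of the fare
print(is_big_tipper(10.0, 1.0))          # tip is 10% of the fare
print(is_big_tipper(float('nan'), 5.0))  # fare is NaN
```

The transformed label is stored under the key `tips_xf`, since `_transformed_name` appends `_xf` to the raw key.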


### 6. Create the pipeline code

#### Import libraries

In [7]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
from typing import Text

from kfp import onprem
import tensorflow_model_analysis as tfma

from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import InfraValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import infra_validator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input



#### Set variables

Specify the pipeline name.

In [8]:
_pipeline_name = 'kubeflow_chicago_taxi_pipeline_demo'
_notebook_name = 'nb-demo-tfx'
_namespace_name = 'ns-sunghj'

"_persistent_volume_claim" is the PVC that was created along with the notebook server.   
* If you have forgotten the name, check it with "kubectl get pv -n ns-sunghj | grep nb-demo-tfx-vol-1 | awk '{print $1}'".  

To check the name for "_persistent_volume":
* "kubectl get pv | grep nbserver-tfx-chicago-taxi-example-vol-1"  


In [9]:
_persistent_volume_claim = f'{_notebook_name}-vol-1'
_persistent_volume = 'pvc-e89b7baf-e37e-462f-af24-e4c276b84d52'
_persistent_volume_mount = '/mnt'

Set the paths that the TFX pods will see.
* Input data location : /mnt/tfx/data
* Output (metadata) location : /mnt/pipelines/tfx 

In [10]:
# All input and output data are kept in the PV.
_input_base = os.path.join(_persistent_volume_mount, 'tfx')
_data_root = os.path.join(_input_base, 'data')
_output_base = os.path.join(_persistent_volume_mount, 'pipelines')
_tfx_root = os.path.join(_output_base, 'tfx')
_pipeline_root = os.path.join(_tfx_root, _pipeline_name)
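
Because the same PV is mounted at `/home/jovyan/data-vol-1` on the notebook server and at `/mnt` in the TFX pods, a file saved from the notebook appears under a different prefix inside the pipeline. A small sketch of that mapping follows. The helper `to_pod_path` and the constant names are hypothetical, introduced only for illustration:

```python
import posixpath

# Mount points taken from this walkthrough.
NOTEBOOK_MOUNT = '/home/jovyan/data-vol-1'
POD_MOUNT = '/mnt'

def to_pod_path(notebook_path: str) -> str:
    """Translate a notebook-side path on the shared PV to the pod-side path."""
    rel = posixpath.relpath(notebook_path, NOTEBOOK_MOUNT)
    if rel == '..' or rel.startswith('../'):
        raise ValueError(f'{notebook_path} is not on the shared volume')
    return posixpath.normpath(posixpath.join(POD_MOUNT, rel))

print(to_pod_path('/home/jovyan/data-vol-1/tfx/data/data.csv'))
print(to_pod_path('/home/jovyan/data-vol-1/tfx/taxi_utils.py'))
```

This is why the pipeline variables are built from `/mnt` even though the notebook cells read and write under `/home/jovyan/data-vol-1`.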

Move taxi_utils.py to the shared PV so that the TFX pods can use it later.

In [11]:
!mkdir ./data-vol-1/tfx 
!mv ./taxi_utils.py ./data-vol-1/tfx/

mkdir: cannot create directory ‘./data-vol-1/tfx’: File exists


In [12]:
_module_file = os.path.join(_input_base, 'taxi_utils.py')

Set the location where the trained model will be stored. The Pusher later places the trained model here.

In [13]:
_serving_model_dir = _output_base

Code that creates the TFX pipeline.

In [14]:
def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text,
                     module_file: Text, serving_model_dir: Text,
                     direct_num_workers: int) -> pipeline.Pipeline:
  """Implements the chicago taxi pipeline with TFX and Kubeflow Pipelines."""
  examples = external_input(data_root)

  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = CsvExampleGen(input=examples)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  # Generates schema based on statistics files.
  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False)

  # Performs anomaly detection based on statistics and data schema.
  example_validator = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'])

  # Performs transformations and feature engineering in training and serving.
  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=module_file)

  # Uses the user-provided Python function (run_fn in the module file) to
  # train the model with the generic Trainer executor.
  trainer = Trainer(
      module_file=module_file,
      custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
      examples=transform.outputs['transformed_examples'],
      transform_graph=transform.outputs['transform_graph'],
      schema=schema_gen.outputs['schema'],
      train_args=trainer_pb2.TrainArgs(num_steps=10000),
      eval_args=trainer_pb2.EvalArgs(num_steps=5000))
  

  # Uses TFMA to compute evaluation statistics over features of a model and
  # perform quality validation of a candidate model (compared to a baseline).
  eval_config = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and 
        # remove the label_key.
        tfma.ModelSpec(label_key='tips')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            # The metrics added here are in addition to those saved with the
            # model (assuming either a keras model or EvalSavedModel is used).
            # Any metrics added into the saved model (for example using
            # model.compile(..., metrics=[...]), etc) will be computed
            # automatically.
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount')
            ],
            # To add validation thresholds for metrics saved with the model,
            # add them keyed by metric name to the thresholds map.
            thresholds = {
                'binary_accuracy': tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    change_threshold=tfma.GenericChangeThreshold(
                       direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                       absolute={'value': -1e-10}))
            }
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(feature_keys=['trip_start_hour'])
    ])

  # Get the latest blessed model for model validation.
  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))

  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config)

  # Performs infra validation of a candidate model to prevent unservable model
  # from being pushed. In order to use InfraValidator component, persistent
  # volume and its claim that the pipeline is using should be a ReadWriteMany
  # access mode.
  infra_validator = InfraValidator(
      model=trainer.outputs['model'],
      examples=example_gen.outputs['examples'],
      serving_spec=infra_validator_pb2.ServingSpec(
          tensorflow_serving=infra_validator_pb2.TensorFlowServing(
              tags=['latest']),
          kubernetes=infra_validator_pb2.KubernetesConfig()),
      request_spec=infra_validator_pb2.RequestSpec(
          tensorflow_serving=infra_validator_pb2.TensorFlowServingRequestSpec())
  )

  # Checks whether the model passed the validation steps and pushes the model
  # to the filesystem destination (serving_model_dir) if the checks passed.
  pusher = Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],
      infra_blessing=infra_validator.outputs['blessing'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen,
          statistics_gen,
          schema_gen,
          example_validator,
          transform,
          trainer,
          model_resolver,
          evaluator,
          infra_validator,
          pusher,
      ],
      beam_pipeline_args=['--direct_num_workers=%d' % direct_num_workers],
  )

Configure the metadata store used by TFX. This uses Kubeflow's metadata store.

In [15]:
# from tfx.orchestration.kubeflow.proto import kubeflow_pb2

# metadata_config = kubeflow_pb2.KubeflowMetadataConfig()
metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()
metadata_config.grpc_config.grpc_service_host.value = 'metadata-grpc-service.kubeflow'
metadata_config.grpc_config.grpc_service_port.value = '8080'
# metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()
print(metadata_config)

grpc_config {
  grpc_service_host {
    value: "metadata-grpc-service.kubeflow"
  }
  grpc_service_port {
    value: "8080"
  }
}



Use KubeflowDagRunner to compile the TFX pipeline defined above into a Kubeflow pipeline.   
After it runs, a Kubeflow pipeline package named kubeflow_chicago_taxi_pipeline_demo.tar.gz is generated.

In [16]:
if __name__ == '__main__':
    # This pipeline automatically injects the Kubeflow TFX image if the
    # environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
    # cli tool exports the environment variable to pass to the pipelines.
    # tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)
    tfx_image = 'gcr.io/tfx-oss-public/tfx:0.22.0'

    runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
      kubeflow_metadata_config=metadata_config,
      # Specify custom docker image to use.
      tfx_image=tfx_image,
      pipeline_operator_funcs=(
          [
              onprem.mount_pvc(_persistent_volume_claim, _persistent_volume,
                               _persistent_volume_mount)
          ]))

    kubeflow_dag_runner.KubeflowDagRunner(config=runner_config).run(
      _create_pipeline(
          pipeline_name=_pipeline_name,
          pipeline_root=_pipeline_root,
          data_root=_data_root,
          module_file=_module_file,
          serving_model_dir=_serving_model_dir,
          # 0 means auto-detect based on the number of CPUs available during
          # execution time.
          direct_num_workers=0))
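
Before submitting the run, one might check that the compiled package actually exists. The sketch below assumes the runner wrote `<pipeline_name>.tar.gz` into the current working directory, which matches the file name used in the run step of this notebook. The helper `find_pipeline_package` is hypothetical:

```python
import os

def find_pipeline_package(pipeline_name: str, directory: str = '.') -> str:
    """Return the path of the compiled pipeline package, or raise if missing."""
    path = os.path.join(directory, pipeline_name + '.tar.gz')
    if not os.path.isfile(path):
        raise FileNotFoundError(f'pipeline package not found: {path}')
    return path

# Example usage in the notebook:
# package_path = find_pipeline_package('kubeflow_chicago_taxi_pipeline_demo')
```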

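### 7. Run the pipeline

RBAC configuration for running the pipeline automatically.  
To send requests to the API of the service that runs pipelines, permission must be granted through a role binding for the current namespace.  
Check the settings, including the user namespace name "ns-sunghj", and fill them in.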

In [17]:
_rbac_file = f"""apiVersion: rbac.istio.io/v1alpha1
kind: ServiceRoleBinding
metadata:
  name: bind-ml-pipeline-nb-{_namespace_name}
  namespace: kubeflow
spec:
  roleRef:
    kind: ServiceRole
    name: ml-pipeline-services
  subjects:
    - properties:
        source.principal: cluster.local/ns/{_namespace_name}/sa/default-editor
"""
with open('rbac.yaml', 'w') as writer:
    writer.write(_rbac_file)

Envoy filter configuration for running the pipeline automatically.   
To send requests to the API of the service that runs pipelines, the kubeflow-userid must be sent in an HTTP header, but the current version omits it. To work around this, configure an Envoy filter that adds the HTTP header.  
Check the following settings and fill them in.
* namespace (ns-sunghj) : the user namespace
* header value (anonymous@kubeflow.org) : check with "kubectl describe profiles ns-sunghj" (the Owner value)
* notebook-name (nbserver-tfx-chicago-taxi-example) : check with "kubectl get pod -n ns-sunghj --show-labels" (the notebook-name label)

In [18]:
_envoyfilter_file = f"""apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: add-header-nbserver-tfx-chicago-taxi-example
  namespace: {_namespace_name}
spec:
  configPatches:
    - applyTo: VIRTUAL_HOST
      match:
        context: SIDECAR_OUTBOUND
        routeConfiguration:
          vhost:
            name: ml-pipeline.kubeflow.svc.cluster.local:8888
            route:
              name: default
      patch:
        operation: MERGE
        value:
          request_headers_to_add:
            - append: true
              header:
                key: kubeflow-userid
                value: anonymous@kubeflow.org
  workloadSelector:
    labels:
      notebook-name: {_notebook_name}
"""
with open('envoyfilter.yaml', 'w') as writer:
    writer.write(_envoyfilter_file)
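
The template in this cell hard-codes the resource name and the header value. If those differ in your cluster, a parameterized variant may be easier to reuse. This is a sketch only: the function `render_envoy_filter` and its parameters are hypothetical, and the YAML body simply mirrors the template in this cell:

```python
def render_envoy_filter(namespace: str, notebook_name: str, user_id: str) -> str:
    """Render an EnvoyFilter manifest that adds the kubeflow-userid header."""
    return f"""apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: add-header-{notebook_name}
  namespace: {namespace}
spec:
  configPatches:
    - applyTo: VIRTUAL_HOST
      match:
        context: SIDECAR_OUTBOUND
        routeConfiguration:
          vhost:
            name: ml-pipeline.kubeflow.svc.cluster.local:8888
            route:
              name: default
      patch:
        operation: MERGE
        value:
          request_headers_to_add:
            - append: true
              header:
                key: kubeflow-userid
                value: {user_id}
  workloadSelector:
    labels:
      notebook-name: {notebook_name}
"""

# Values below are the ones used in this walkthrough; replace them as needed.
manifest = render_envoy_filter('ns-sunghj', 'nb-demo-tfx', 'anonymous@kubeflow.org')
print(manifest)
```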

"kubectl apply -f rbac.yaml" cannot be run from this notebook, which lives in the user namespace and lacks the required permission.   
Run it from a shell that has the necessary access.

In [19]:
!kubectl apply -f envoyfilter.yaml

envoyfilter.networking.istio.io/add-header-nbserver-tfx-chicago-taxi-example unchanged


Run the pipeline.

In [20]:
import kfp
run_result = kfp.Client(
#     host=None  # replace with Kubeflow Pipelines endpoint if this notebook is run outside of the Kubeflow cluster.
#      host="ml-pipeline.kubeflow.svc.cluster.local:8888"
    
#     client_id="anonymous@kubeflow.org"
).create_run_from_pipeline_package('kubeflow_chicago_taxi_pipeline_demo.tar.gz', arguments={},namespace="ns-sunghj")