# TFX PIPELINE: MÔ HÌNH DNN PHÂN LOẠI CÁC LOÀI CHIM CÁNH CỤT

### Import thư viện và kiểm tra version

In [None]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.11.1
TFX version: 1.12.0


## 01. Set up:

### 01.1. Set up các biến:

Set up các biến liên quan đến pipeline:
- Tên pipeline (`penguin`).
- Path đến thư mục chứa các artifacts của từng component (`./pipelines/penguin/`).
- Path đến tập tin metadata.db (`./metadata/penguin/metadata.db`).
- Path đến thư mục serving model (`./serving_model/penguin/`).

Các biến path sẽ nằm trong thư mục hiện tại.

In [None]:
import os

# Name of pipeline
PIPELINE_NAME = "penguin"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)

# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

### 01.2. Chuẩn bị data file:

Ta sẽ lấy dữ liệu từ [Palmer Penguins dataset](https://allisonhorst.github.io/palmerpenguins/articles/intro.html), đây là dữ liệu raw.

Các biến liên quan đến data:
- Path đến thư mục chứa data (`/tmp/tfx-data.../`).
- URL để lấy data dưới dạng CSV.
- Path đến tập tin data.csv (`/tmp/tfx-data.../data.csv`).

In [None]:
import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_path = 'https://storage.googleapis.com/download.tensorflow.org/data/palmer_penguins/penguins_size.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_path, _data_filepath)

('/tmp/tfx-datatund6htb/data.csv', <http.client.HTTPMessage at 0x7f3dc9f55c40>)

Ta sẽ xem một vài dòng đầu của dataset.

In [None]:
!head {_data_filepath}

species,island,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g,sex
Adelie,Torgersen,39.1,18.7,181,3750,MALE
Adelie,Torgersen,39.5,17.4,186,3800,FEMALE
Adelie,Torgersen,40.3,18,195,3250,FEMALE
Adelie,Torgersen,NA,NA,NA,NA,NA
Adelie,Torgersen,36.7,19.3,193,3450,FEMALE
Adelie,Torgersen,39.3,20.6,190,3650,MALE
Adelie,Torgersen,38.9,17.8,181,3625,FEMALE
Adelie,Torgersen,39.2,19.6,195,4675,MALE
Adelie,Torgersen,34.1,18.1,193,3475,NA


Dataset là dữ liệu raw chưa được xử lý, ở đây ta sẽ xóa những dòng có chứa NA.

In [None]:
!sed -i '/\bNA\b/d' {_data_filepath}
!head {_data_filepath}

species,island,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g,sex
Adelie,Torgersen,39.1,18.7,181,3750,MALE
Adelie,Torgersen,39.5,17.4,186,3800,FEMALE
Adelie,Torgersen,40.3,18,195,3250,FEMALE
Adelie,Torgersen,36.7,19.3,193,3450,FEMALE
Adelie,Torgersen,39.3,20.6,190,3650,MALE
Adelie,Torgersen,38.9,17.8,181,3625,FEMALE
Adelie,Torgersen,39.2,19.6,195,4675,MALE
Adelie,Torgersen,41.1,17.6,182,3200,FEMALE
Adelie,Torgersen,38.6,21.2,191,3800,MALE


Ở ta thấy có tổng cộng 6 thuộc tính và 1 class, nhưng chúng ta chỉ cần 4 thuộc tính số để dự đoán loài chim cánh cụt, vì vậy sẽ cần tiền xử lí ở `Tranform` để làm việc này.

### 01.3. Chuẩn bị schema file:

Để có được schema của dataset ta có thể tạo một pipeline khác bằng cách thưc hiện các thành phần: `CsvExampleGen`, `StatisticsGen`, `SchemaGen`. Nhưng ở đây để ngắn gọn nhóm em lấy schema đã có sẵn.

Các biến liên quan đến schema:
- Path đến thư mục chứa schema (`./schema/`).
- Path đến tập tin schema.pbtxt (`./schema/schema.pbtxt`).

Thực hiện tạo thư mục schema.

In [None]:
import shutil

SCHEMA_PATH = 'schema'

_schema_filename = 'schema.pbtxt'
_schema_filepath = os.path.join(SCHEMA_PATH, _schema_filename)
!mkdir {SCHEMA_PATH}

Ghi schema vào file.

In [None]:
%%writefile {_schema_filepath}
# proto-file: tensorflow_metadata/proto/v0/schema.proto
# proto-message: Schema
feature {
  name: "body_mass_g"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_depth_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "flipper_length_mm"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "island"
  type: BYTES
  domain: "island"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "sex"
  type: BYTES
  domain: "sex"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "species"
  type: BYTES
  domain: "species"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
string_domain {
  name: "island"
  value: "Biscoe"
  value: "Dream"
  value: "Torgersen"
}
string_domain {
  name: "sex"
  value: "FEMALE"
  value: "MALE"
  value: "."
}
string_domain {
  name: "species"
  value: "Adelie"
  value: "Chinstrap"
  value: "Gentoo"
}
# generate_legacy_feature_spec: false


Writing schema/schema.pbtxt


## 02. Tạo pipeline:

### 02.1. Thực hiện code thực thi cho thành phần `Transform` và `Trainer`:

Các thành phần sau sẽ cần tạo code thực thi trong file riêng biệt:
- `Transform`: thực hiện tiền xử lí model phục vụ cho input của training và serving, preprocess code sẽ dược thực hiện trong Python file tách biệt. Ở đây `Transform` sẽ gọi hàm `preprocessing_fn` trong Python file.
    - Các phương thức tiền xử lí trong `preprocessing_fn` bao gồm:
        - `tft.scale_to_z_score`: chuẩn hóa Z score cho 4 thuộc tính continuous numeric.
        - `tf.lookup.StaticHashTable`: convert chuỗi thành giá trị chỉ số đối với thuộc tính phân loại.
        - Loại bỏ các thuộc tính không dùng cho dự đoán.
    - Các hàm khác như:
        - `_apply_preprocessing`: thực hiện transform cho training và serving.
        - `_get_serve_tf_examples_fn`: nhận vào chuỗi input và thực hiện tiền xử lí, áp dụng model cho inference.
- `Trainer`: training DNN model sử dụng Keras API, training code sẽ dược thực hiện trong Python file tách biệt (trong cùng file với preprocess code). Ở đây ta dùng **Generic Trainer** mà sẽ gọi hàm `run_fn` trong Python file. Các phương thức thuộc `Trainer` gồm:
    - `_input_fn`: tạo thuộc tính và label cho training.
    - `_build_keras_model`: tạo DNN Keras model cho việc phân loại.
    - `run_fn`: nhận vào các đối số của training, gọi `_input_fn` trên tập train và eval, tạo ra model từ đó training trên model đó, đồng thời cũng lưu model với signature tương ứng.

Tên Python file sẽ là `penguin_utils.py` mà sẽ được lưu trong thư mục hiện tại.

In [None]:
_module_file = 'penguin_utils.py'

Ghi preprocess, training code (như đã đề cập ở trên) vào file.

In [None]:
%%writefile {_module_file}
from typing import List, Text
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

# Specify features that we will use.
_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10


# TFX Transform will call this function.
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature.
  """
  outputs = {}

  # Uses features defined in _FEATURE_KEYS only.
  for key in _FEATURE_KEYS:
    # tft.scale_to_z_score computes the mean and variance of the given feature
    # and scales the output based on the result.
    outputs[key] = tft.scale_to_z_score(inputs[key])

  # For the label column we provide the mapping from string to index.
  # We could instead use `tft.compute_and_apply_vocabulary()` in order to
  # compute the vocabulary dynamically and perform a lookup.
  # Since in this example there are only 3 possible values, we use a hard-coded
  # table for simplicity.
  table_keys = ['Adelie', 'Chinstrap', 'Gentoo']
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=table_keys,
      values=tf.cast(tf.range(len(table_keys)), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  outputs[_LABEL_KEY] = table.lookup(inputs[_LABEL_KEY])

  return outputs


# This function will apply the same transform operation to training data
#      and serving requests.
def _apply_preprocessing(raw_features, tft_layer):
  transformed_features = tft_layer(raw_features)
  if _LABEL_KEY in raw_features:
    transformed_label = transformed_features.pop(_LABEL_KEY)
    return transformed_features, transformed_label
  else:
    return transformed_features, None


# This function will create a handler function which gets a serialized
#      tf.example, preprocess and run an inference with it.
def _get_serve_tf_examples_fn(model, tf_transform_output):
  # We must save the tft_layer to the model to ensure its assets are kept and
  # tracked.
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def serve_tf_examples_fn(serialized_tf_examples):
    # Expected input is a string which is serialized tf.Example format.
    feature_spec = tf_transform_output.raw_feature_spec()
    # Because input schema includes unnecessary fields like 'species' and
    # 'island', we filter feature_spec to include required keys only.
    required_feature_spec = {
        k: v for k, v in feature_spec.items() if k in _FEATURE_KEYS
    }
    parsed_features = tf.io.parse_example(serialized_tf_examples,
                                          required_feature_spec)

    # Preprocess parsed input with transform operation defined in
    # preprocessing_fn().
    transformed_features, _ = _apply_preprocessing(parsed_features,
                                                   model.tft_layer)
    # Run inference with ML model.
    return model(transformed_features)

  return serve_tf_examples_fn


def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(batch_size=batch_size),
      schema=tf_transform_output.raw_metadata.schema)

  transform_layer = tf_transform_output.transform_features_layer()
  def apply_transform(raw_features):
    return _apply_preprocessing(raw_features, transform_layer)

  return dataset.map(apply_transform).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [
      keras.layers.Input(shape=(1,), name=key)
      for key in _FEATURE_KEYS
  ]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # Save a computation graph including transform layer.
  signatures = {
      'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Writing penguin_utils.py


### 02.2. Định nghĩa pipeline:

Ta sẽ tạo pipeline với các thành phần sau:
- `CsvExampleGen`: đọc CSV file.
- `StatisticsGen`: có các số liệu thống kê trên dataset.
- `Importer`: nhập schema file đã tạo ở trên.
- `ExampleValidator`: phát hiện ra các bất thường dữ liệu dựa và số liệu thống kê và schema.
- `Transform`.
- `Trainer`.
- `Pusher`: lấy model đã được train và thực hiện triển khai, sau bước này, model sẽ được lưu trong thư mục `./serving_model/penguin` với các version khác nhau.

In [None]:
import tensorflow_model_analysis as tfma

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a pipeline for schema generation."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  # Import the schema.
  schema_importer = tfx.dsl.Importer(
      source_uri=schema_path,
      artifact_type=tfx.types.standard_artifacts.Schema).with_id(
          'schema_importer')

  # Performs anomaly detection based on statistics and data schema.
  example_validator = tfx.components.ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_importer.outputs['result'])
  
  # Transforms input data using preprocessing_fn in the 'module_file'.
  transform = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_importer.outputs['result'],
      materialize=False,
      module_file=module_file)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],

      # Pass transform_graph to the trainer.
      transform_graph=transform.outputs['transform_graph'],

      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      # model_blessing=evaluator.outputs['blessing'], # Pass an evaluation result.
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))
  
  components = [
      example_gen,
      statistics_gen,
      schema_importer,
      example_validator,
      transform,
      trainer,
      pusher
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

### 02.3. Run pipeline:

Ta sẽ dùng `LocalDagRunner` nhận vào đối tường pipeline và thực hiện run pipeline đó.

Có thể xem kết quả ở các bước run từng thành phần, ở dòng cuối cùng ta thấy `Pusher` - thành phần cuối cùng của pipeline đã kết thúc (nghĩa là đã run thành công) tương đương với cả pipeline đã run thành công.

In [None]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      schema_path=SCHEMA_PATH,
      module_file=_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/content/penguin_utils.py' (including modules: ['penguin_utils']).
INFO:absl:User module package has hash fingerprint version e88c42a59434fcef742385c0726b84c5962378e48160e116b85b4c60cbf912a5.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmproqachdz/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmp2uxe4bny', '--dist-dir', '/tmp/tmpcouvxh6d']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin/_wheels/tfx_user_code_Transform-0.0+e88c42a59434fcef742385c0726b84c5962378e48160e116b85b4c60cbf912a5-py3-none-any.whl'; target user module is 'penguin_utils'.
INFO:absl:Full user module path is 'penguin_utils@pipelines/penguin/_wheels/tfx_user_code_Transform-0.0+e88c42a59434fcef742385c0726b84c5962378e48160e116b85b4c60cbf912a5-py3-none-any.whl'
INFO:absl:Generating ephemeral w

INFO:absl:Processing input csv data /tmp/tfx-datatund6htb/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:13161,xor_checksum:1681841112,sum_checksum:1681841112"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key



INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature island has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature sex has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Training complete. Model written to pipelines/penguin/Trainer/model/6/Format-Serving. ModelRun written to pipelines/penguin/Trainer/model_run/6
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "pipelines/pengui

Sau khi model được push, ta sẽ thấy các version đã được train trong `./serving_mode/penguin`.

In [None]:
!find {SERVING_MODEL_DIR}

serving_model/penguin
serving_model/penguin/1681841210
serving_model/penguin/1681841210/keras_metadata.pb
serving_model/penguin/1681841210/saved_model.pb
serving_model/penguin/1681841210/variables
serving_model/penguin/1681841210/variables/variables.data-00000-of-00001
serving_model/penguin/1681841210/variables/variables.index
serving_model/penguin/1681841210/assets
serving_model/penguin/1681841210/fingerprint.pb


Ta cũng có thể kiểm tra sinature của model thông qua lệnh dưới đây.

In [None]:
!saved_model_cli show --dir {SERVING_MODEL_DIR}/$(ls -1 {SERVING_MODEL_DIR} | sort -nr | head -1) --tag_set serve --signature_def serving_default

2023-04-18 18:07:08.881687: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/nvidia/lib:/usr/local/nvidia/lib64
2023-04-18 18:07:08.881882: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/nvidia/lib:/usr/local/nvidia/lib64
The given SavedModel SignatureDef contains the following input(s):
  inputs['examples'] tensor_info:
      dtype: DT_STRING
      shape: (-1)
      name: serving_default_examples:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['output_0'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 3)
      name: StatefulPartitionedCall_1:0
Method name i

## 03. Kiểm tra các artifacts từ các thành phần:

Ta sẽ định nghĩa các hàm:
- `get_latest_artifacts`: để có artifacts cuối cùng từ thành phần xác định của model đã train.
- `visualize_artifacts`: trực quan hóa các artifacts lấy được ở trên.

In [None]:
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_output_artifacts(metadata, latest_execution.id)

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

Lấy artifacts từ 2 thành phần `StatisticsGen` và `ExampleValidator`.

In [None]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
                                         'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]
  
  ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
                                   'ExampleValidator')
  anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]

INFO:absl:MetadataStore with DB connection initialized


Visualize các artifacts:
- Với artifacts từ `StatisticsGen`: kết quả visualize ta có thể xem (ở dạng số liệu hoặc đồ thị): count, missing, mean/median/std, min/max trên từng tập train và eval cũng như trên các thuộc tính khác nhau.
- Với artifacts từ `ExampleValidator`: kết quả visualize ta có thể thấy các bất thường của dữ liệu (trường hợp ở đây không có bất thường nào bởi bộ dữ liệu dùng để tạo ra schema và bộ dữ liệu dùng để train ở đây là giống nhau).

In [None]:
# docs-infra: no-execute
visualize_artifacts(stats_artifacts)

In [None]:
visualize_artifacts(anomalies_artifacts)

## 04. Serving model:

### 04.1. Serving trực tiếp đến model đã lưu:

- Tìm model cuối cùng đã được train.
- Load model.
- Tạo chức năng inference mà đầu vào là input, đầu ra là kết quả dự đoán.

In [None]:
# Find a model with the latest timestamp.
model_dirs = (item for item in os.scandir(SERVING_MODEL_DIR) if item.is_dir())
model_path = max(model_dirs, key=lambda i: int(i.name)).path

loaded_model = tf.keras.models.load_model(model_path)
inference_fn = loaded_model.signatures['serving_default']



- Input gồm 4 thuộc tính sẽ được chuyển về kiểu chuỗi và thông qua chức năng inference ta sẽ được output tương ứng.
- Kết quả dự đoán dưới đây cho thấy phần tử số 3 cho kết quả cao nhất tương đương chỉ số là 2 (thuộc loài **Gentoo**).

In [None]:
# Prepare an example and run inference.
features = {
  'culmen_length_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[49.9])),
  'culmen_depth_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[16.1])),
  'flipper_length_mm': tf.train.Feature(int64_list=tf.train.Int64List(value=[213])),
  'body_mass_g': tf.train.Feature(int64_list=tf.train.Int64List(value=[5400])),
}
example_proto = tf.train.Example(features=tf.train.Features(feature=features))
examples = example_proto.SerializeToString()

result = inference_fn(examples=tf.constant([examples]))
print(result['output_0'].numpy())

[[-4.2751994 -2.7406695  3.0294216]]


## 04.2 Serving đến model thông qua model server:

- Ở đây nhóm em dùng `tensorflow_model_server` để serving đến server (ngoài ra có thể sử dụng `docker` để serving).
- Ba dòng code dưới đây được thực hiện trên **terminal**.
    - Định nghĩa các biến model name và đường dẫn tuyệt đối đên model.
    - Thực hiện khởi chạy server bằng `tensorflow_model_server` trên các port 8500 (cho giao thức gRPC) và 8501 (cho giao thức REST API).

In [None]:
# Bash
export MODEL_NAME='penguin_model' && \
export MODEL_BASE_PATH='/home/ntpshin/Documents/TensorflowExtended/SOURCE/demo/serving_model/penguin' && \
tensorflow_model_server --port=8500 --rest_api_port=8501 \
  --model_name=$MODEL_NAME --model_base_path=$MODEL_BASE_PATH

[warn] getaddrinfo: address family for nodename not supported
[evhttp_server.cc : 245] NET_LOG: Entering the event loop ...
^C


- Thực hiện curl đến server với phương thức là predict, vì input là serialized string của tf.Examples, không thể truyền kiểu float, nên ở đây ta dùng biến `examples` ở trên và được encode để truyền vào curl.
- Ta có thể thấy output tương tự như thực hiện predict ngay trên model.

In [None]:
import base64
print(base64.b64encode(examples).decode('utf-8'))

Cm8KGwoRZmxpcHBlcl9sZW5ndGhfbW0SBhoECgLVAQoVCgtib2R5X21hc3NfZxIGGgQKApgqChwKEGN1bG1lbl9sZW5ndGhfbW0SCBIGCgSamUdCChsKD2N1bG1lbl9kZXB0aF9tbRIIEgYKBM3MgEE=


In [None]:
# Bash
export MODEL_NAME='penguin_model' && \
curl -d '{"instances": [{"b64": "Cm8KGwoRZmxpcHBlcl9sZW5ndGhfbW0SBhoECgLVAQoVCgtib2R5X21hc3NfZxIGGgQKApgqChwKEGN1bG1lbl9sZW5ndGhfbW0SCBIGCgSamUdCChsKD2N1bG1lbl9kZXB0aF9tbRIIEgYKBM3MgEE="}]}' \
  -X POST http://localhost:8501/v1/models/$MODEL_NAME:predict