# Membuat Machine Learning Pipeline

In [1]:
import os
import sys
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

## Set Variable

In [2]:
# Name used for the TFX pipeline and for its directory under OUTPUT_BASE.
PIPELINE_NAME = "jelvin_krisna_putra-pipeline"

# Pipeline inputs: the data location and the user-module files that are
# handed to the component factory when the pipeline is run.
DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/restaurant_review_transform.py"
TRAINER_MODULE_FILE = "modules/restaurant_review_trainer.py"

# Pipeline outputs: everything is written below OUTPUT_BASE.
OUTPUT_BASE = "output"
# Root directory for this pipeline's outputs.
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
# Directory passed on as the serving-model location.
serving_model_dir = os.path.join(OUTPUT_BASE, "serving_model")
# SQLite file backing the ML Metadata store; lives inside pipeline_root.
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")

## Create init_local_pipeline function

In [3]:
def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:
    """Assemble the TFX pipeline object for a local Apache Beam run.

    Args:
        components: The TFX components to run; passed through unchanged
            to ``pipeline.Pipeline``.
        pipeline_root: Directory under which the pipeline stores its
            outputs.

    Returns:
        A ``pipeline.Pipeline`` named ``PIPELINE_NAME`` with caching
        enabled, an SQLite metadata store at the module-level
        ``metadata_path``, and the Beam arguments defined below.

    Note:
        The metadata store location comes from the module-level
        ``metadata_path``, not from the ``pipeline_root`` argument.
    """
    logging.info(f"Pipeline root set to: {pipeline_root}")

    # Each flag must be its own list element: without the separating comma
    # Python concatenates adjacent string literals into one malformed flag.
    beam_args = [
        "--direct_running_mode=multi_processing",
        # 0 auto-detects based on the number of CPUs available
        # during execution time.
        "--direct_num_workers=0",
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        # Correct keyword spelling so the Beam options reach the pipeline.
        beam_pipeline_args=beam_args,
    )

## Run the Machine Learning Operation Pipelines

In [None]:
if __name__ == "__main__":
    # Show INFO-level absl logs so component progress is visible.
    logging.set_verbosity(logging.INFO)

    # Project-local factory that builds the list of TFX components.
    from modules.components import init_components

    components = init_components(
        DATA_ROOT,
        training_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        training_steps=5000,
        eval_steps=1000,
        serving_model_dir=serving_model_dir,
    )

    # Do not bind the result to `pipeline`: that name is the imported
    # tfx.orchestration.pipeline module, which init_local_pipeline relies on.
    # Rebinding it would make a second run of this cell (or of the function
    # definition cell) fail when it looks up `pipeline.Pipeline`.
    local_pipeline = init_local_pipeline(components, pipeline_root)
    BeamDagRunner().run(pipeline=local_pipeline)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Pipeline root set to: output\jelvin_krisna_putra-pipeline
INFO:absl:Generating ephemeral wheel package for 'E:\\UserKrisna\\Github Repos\\MLops-Beam\\modules\\restaurant_review_transform.py' (including modules: ['components', 'restaurant_review_trainer', 'restaurant_review_transform']).
INFO:absl:User module package has hash fingerprint version b63686f5da36d098dd07329b354c9ff54f2e11c5390713bba85baa6f2c8bff14.
INFO:absl:Executing: ['C:\\Users\\krisna\\.conda\\envs\\mlops-tfx\\python.exe', 'C:\\Users\\krisna\\AppData\\Local\\Temp\\tmp_n1svugi\\_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', 'C:\\Users\\krisna\\AppData\\Local\\Temp\\tmpy4o4urn2', '--dist-dir', 'C:\\Users\\krisna\\AppData\\Local\\Temp\\tmpo3v8jhf2']
INFO:absl:Successfully built user code wheel distribution at 'out

INFO:absl:Node CsvExampleGen depends on [].
INFO:absl:Node CsvExampleGen is scheduled.
INFO:absl:Node Latest_blessed_model_resolver depends on [].
INFO:absl:Node Latest_blessed_model_resolver is scheduled.
INFO:absl:Node StatisticsGen depends on ['Run[CsvExampleGen]'].
INFO:absl:Node StatisticsGen is scheduled.
INFO:absl:Node SchemaGen depends on ['Run[StatisticsGen]'].
INFO:absl:Node SchemaGen is scheduled.
INFO:absl:Node ExampleValidator depends on ['Run[SchemaGen]', 'Run[StatisticsGen]'].
INFO:absl:Node ExampleValidator is scheduled.
INFO:absl:Node Transform depends on ['Run[CsvExampleGen]', 'Run[SchemaGen]'].
INFO:absl:Node Transform is scheduled.
INFO:absl:Node Trainer depends on ['Run[SchemaGen]', 'Run[Transform]'].
INFO:absl:Node Trainer is scheduled.
INFO:absl:Node Evaluator depends on ['Run[CsvExampleGen]', 'Run[Latest_blessed_model_resolver]', 'Run[Trainer]'].
INFO:absl:Node Evaluator is scheduled.
INFO:absl:Node Pusher depends on ['Run[Evaluator]', 'Run[Trainer]'].
INFO:absl

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[StatisticsGen] Resolved inputs: ({'examples': [Artifact(artifact: id: 1
type_id: 16
uri: "output\\jelvin_krisna_putra-pipeline\\CsvExampleGen\\examples\\2"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:12278,xor_checksum:1624362428,sum_checksum:1624362428"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIV

INFO:absl:[ExampleValidator] Resolved inputs: ({'statistics': [Artifact(artifact: id: 2
type_id: 18
uri: "output\\jelvin_krisna_putra-pipeline\\StatisticsGen\\statistics\\3"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
create_time_since_epoch: 1686794559428
last_update_time_since_epoch: 1686794559428
, artifact_type: id: 18
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)], 'schema': [Artifact(artifact: id: 3
type_id: 20
uri: "output\\jelvin_krisna_putra-pipeline\\SchemaGen\\schema\\4"
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 40
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=40, input_dict={'schema': [Artifact(artifact: id: 3
type_id: 20
uri: "output\\jelvin_krisna_putra-pipeline\\SchemaGen\\schema\\4"
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
create_time_since_epoch: 1686794559868
last_update_time_since_epoch: 1686794559868
, artifact_type: id: 20
name: "Schema"
)], 'examples': [Artifact(artifact: id: 1
type_id: 16
uri: "output\\jelvin_krisna_putra-pipeline\\CsvExampleGen\\examples\\2"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  k

INFO:absl:Analyze the 'train' split and transform all splits when splits_config is not set.
INFO:absl:udf_utils.get_fn {'module_file': None, 'module_path': 'restaurant_review_transform@output\\jelvin_krisna_putra-pipeline\\_wheels\\tfx_user_code_Transform-0.0+b63686f5da36d098dd07329b354c9ff54f2e11c5390713bba85baa6f2c8bff14-py3-none-any.whl', 'preprocessing_fn': None} 'preprocessing_fn'
INFO:absl:Installing 'output\\jelvin_krisna_putra-pipeline\\_wheels\\tfx_user_code_Transform-0.0+b63686f5da36d098dd07329b354c9ff54f2e11c5390713bba85baa6f2c8bff14-py3-none-any.whl' to a temporary directory.
INFO:absl:Executing: ['C:\\Users\\krisna\\.conda\\envs\\mlops-tfx\\python.exe', '-m', 'pip', 'install', '--target', 'C:\\Users\\krisna\\AppData\\Local\\Temp\\tmp7vsonm6s', 'output\\jelvin_krisna_putra-pipeline\\_wheels\\tfx_user_code_Transform-0.0+b63686f5da36d098dd07329b354c9ff54f2e11c5390713bba85baa6f2c8bff14-py3-none-any.whl']
INFO:absl:Successfully installed 'output\\jelvin_krisna_putra-pipeline\\_

Instructions for updating:
Use ref() instead.


Instructions for updating:
Use ref() instead.
INFO:absl:Feature Ambiance has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Hygienic  has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Menu_variety has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Parking has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Pet_friendly has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Service has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Smoking_area has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Target has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Taste has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Vegan_options has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Worth_the_price has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:If the number

INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using

INFO:absl:Feature Ambiance has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Menu_variety has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Parking has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Pet_friendly has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Service has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Smoking_area has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Target has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Taste has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Vegan_options has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Worth_the_price has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Ambiance has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Hygienic  has a shape dim {
  size: 1
}
. Setting to

INFO:tensorflow:Assets written to: output\jelvin_krisna_putra-pipeline\Transform\transform_graph\40\.temp_path\tftransform_tmp\2c758017cfff4f8a9a013adeee485eb8\assets


INFO:tensorflow:Assets written to: output\jelvin_krisna_putra-pipeline\Transform\transform_graph\40\.temp_path\tftransform_tmp\2c758017cfff4f8a9a013adeee485eb8\assets


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or

INFO:tensorflow:Assets written to: output\jelvin_krisna_putra-pipeline\Transform\transform_graph\40\.temp_path\tftransform_tmp\e1a4246242bb4e6a9af9b383c27ba397\assets


INFO:tensorflow:Assets written to: output\jelvin_krisna_putra-pipeline\Transform\transform_graph\40\.temp_path\tftransform_tmp\e1a4246242bb4e6a9af9b383c27ba397\assets
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a potentially more efficient implementation.
INFO:absl:If the number of unique tokens is smaller than the provided top_k or approximation error is acceptable, consider using tft.experimental.approximate_vocabulary for a pote

INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 40 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'transform_graph': [Artifact(artifact: uri: "output\\jelvin_krisna_putra-pipeline\\Transform\\transform_graph\\40"
, artifact_type: name: "TransformGraph"
)], 'post_transform_schema': [Artifact(artifact: uri: "output\\jelvin_krisna_putra-pipeline\\Transform\\post_transform_schema\\40"
, artifact_type: name: "Schema"
)], 'post_transform_anomalies': [Artifact(artifact: uri: "output\\jelvin_krisna_putra-pipeline\\Transform\\post_transform_anomalies\\40"
, artifact_type: name: "ExampleAnomalies"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
)], 'pre_transform_stats': [Artifact(artifact: uri: "output\\jelvin_krisna_putra-pipeline\\Transform\\pre_transform_stats\\40"
, artifact_type: name: "ExampleStati

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 41
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=41, input_dict={'examples': [Artifact(artifact: id: 29
type_id: 16
uri: "output\\jelvin_krisna_putra-pipeline\\Transform\\transformed_examples\\40"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
create_time_since_epoch: 1686796573317
last_update_time_since_epoch: 1686796573317
, artifact_type: id: 16
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)], 'schema': [Artifact(artifact: id: 3
type_id: 20
uri: "outpu

INFO:absl:udf_utils.get_fn {'custom_config': 'null', 'train_args': '{\n  "num_steps": 5000,\n  "splits": [\n    "train"\n  ]\n}', 'module_path': 'restaurant_review_trainer@output\\jelvin_krisna_putra-pipeline\\_wheels\\tfx_user_code_Trainer-0.0+b63686f5da36d098dd07329b354c9ff54f2e11c5390713bba85baa6f2c8bff14-py3-none-any.whl', 'eval_args': '{\n  "num_steps": 1000,\n  "splits": [\n    "eval"\n  ]\n}'} 'run_fn'
INFO:absl:Installing 'output\\jelvin_krisna_putra-pipeline\\_wheels\\tfx_user_code_Trainer-0.0+b63686f5da36d098dd07329b354c9ff54f2e11c5390713bba85baa6f2c8bff14-py3-none-any.whl' to a temporary directory.
INFO:absl:Executing: ['C:\\Users\\krisna\\.conda\\envs\\mlops-tfx\\python.exe', '-m', 'pip', 'install', '--target', 'C:\\Users\\krisna\\AppData\\Local\\Temp\\tmpaypfd2xv', 'output\\jelvin_krisna_putra-pipeline\\_wheels\\tfx_user_code_Trainer-0.0+b63686f5da36d098dd07329b354c9ff54f2e11c5390713bba85baa6f2c8bff14-py3-none-any.whl']
INFO:absl:Successfully installed 'output\\jelvin_kris

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 Vegan_options_xf (InputLayer)  [(None, 3)]          0           []                               
                                                                                                  
 Smoking_area_xf (InputLayer)   [(None, 3)]          0           []                               
                                                                                                  
 Parking_xf (InputLayer)        [(None, 3)]          0           []                               
                                                                                                  
 Pet_friendly_xf (InputLayer)   [(None, 3)]          0           []                               
                                                                                              