In [23]:
# Imports and global variable setup
import os
import sys
from typing import Text
from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

PIPELINE_NAME = "churn-pipeline"

# pipeline inputs
DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/churn_transform.py"
TRAINER_MODULE_FILE = "modules/churn_trainer.py"

# pipeline outputs
OUTPUT_BASE = "san-limbong-pipeline"
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")


In [24]:
# Function to initialize the pipeline
def init_local_pipeline(components, pipeline_root: Text) -> pipeline.Pipeline:
    logging.info(f"Pipeline root set to: {pipeline_root}")
    beam_args = [
        "--direct_running_mode=multi_processing",
        # 0 auto-detect based on the number of CPUs available during execution time.
        "--direct_num_workers=0"
    ]
    
    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        beam_pipeline_args=beam_args
    )


In [25]:
# Initializing components and running the pipeline
logging.set_verbosity(logging.INFO)

# Assuming init_components is defined in modules.components
from modules.components import init_components

components = init_components(
    DATA_ROOT,
    training_module=TRAINER_MODULE_FILE,
    transform_module=TRANSFORM_MODULE_FILE,
    training_steps=5000,
    eval_steps=1000,
    serving_model_dir=serving_model_dir,
)

pipeline = init_local_pipeline(components, pipeline_root)
BeamDagRunner().run(pipeline=pipeline)


INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Pipeline root set to: san-limbong-pipeline\churn-pipeline
INFO:absl:Generating ephemeral wheel package for 'd:\\Repository\\Dicoding Intermediate\\ML Ops\\template\\notebook\\modules\\churn_transform.py' (including modules: ['churn_trainer', 'churn_transform', 'components']).
INFO:absl:User module package has hash fingerprint version 4168ce76787e6e9aae9739374345022c0153009619192039455c8e6d45618f3e.
INFO:absl:Executing: ['c:\\Users\\USER\\anaconda3\\envs\\a443-churn\\python.exe', 'C:\\Users\\USER\\AppData\\Local\\Temp\\tmpgeh0yymc\\_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', 'C:\\Users\\USER\\AppData\\Local\\Temp\\tmpd60h0ltd', '--dist-dir', 'C:\\Users\\USER\\AppData\\Local\\Temp\\tmpzyr3itfa']
INFO:absl:Successfully built user code wheel distribution at 'san-limbong-pipeli

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 Geography_xf (InputLayer)      [(None, 4)]          0           []                               
                                                                                                  
 Gender_xf (InputLayer)         [(None, 3)]          0           []                               
                                                                                                  
 CreditScore_xf (InputLayer)    [(None, 1)]          0           []                               
                                                                                                  
 Age_xf (InputLayer)            [(None, 1)]          0           []                               
                                                                                            

INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:Assets written to: san-limbong-pipeline\churn-pipeline\Trainer\model\7\Format-Serving\assets


INFO:tensorflow:Assets written to: san-limbong-pipeline\churn-pipeline\Trainer\model\7\Format-Serving\assets


You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.


INFO:absl:Training complete. Model written to san-limbong-pipeline\churn-pipeline\Trainer\model\7\Format-Serving. ModelRun written to san-limbong-pipeline\churn-pipeline\Trainer\model_run\7
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "san-limbong-pipeline\\churn-pipeline\\Trainer\\model\\7"
, artifact_type: name: "Model"
base_type: MODEL
)], 'model_run': [Artifact(artifact: uri: "san-limbong-pipeline\\churn-pipeline\\Trainer\\model_run\\7"
, artifact_type: name: "ModelRun"
)]}) for execution 7
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Trainer is finished.
INFO:absl:node Evaluator is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.evaluator.component.Evaluator"
    base_type: EVALUATE
  }
  id: "Evaluator"
}
contexts {
  contexts {
    typ



INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
INFO:absl:udf_utils.get_fn {'example_splits': 'null', 'fairness_indicator_thresholds': 'null', 'eval_config': '{\n  "metrics_specs": [\n    {\n      "metrics": [\n        {\n          "class_name": "AUC"\n        },\n        {\n          "class_name": "Precision"\n        },\n        {\n          "class_name": "Recall"\n        },\n        {\n          "class_name": "ExampleCount"\n        },\n        {\n          "class_name": "BinaryAccuracy",\n          "threshold": {\n            "change_threshold": {\n              "absolute": 0.0001,\n              "direction": "HIGHER_IS_BETTER"\n            },\n            "value_threshold": {\n              "lower_bound": 0.5\n            }\n          }\n        }\n      ]\n    }\n  ],\n  "model_specs": [\n    {\n      "label_key": "Exited"\n    }\n  ],\n  "slicing_specs": [\n    {},\n    {\n      "feature_keys": [\n        "gender",\n        "



INFO:absl:Evaluation complete. Results written to san-limbong-pipeline\churn-pipeline\Evaluator\evaluation\8.
INFO:absl:Checking validation results.
INFO:absl:Blessing result True written to san-limbong-pipeline\churn-pipeline\Evaluator\blessing\8.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'evaluation': [Artifact(artifact: uri: "san-limbong-pipeline\\churn-pipeline\\Evaluator\\evaluation\\8"
, artifact_type: name: "ModelEvaluation"
)], 'blessing': [Artifact(artifact: uri: "san-limbong-pipeline\\churn-pipeline\\Evaluator\\blessing\\8"
, artifact_type: name: "ModelBlessing"
)]}) for execution 8
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Evaluator is finished.
INFO:absl:node Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DE