# Import Library

In [5]:
import os
import sys
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

# Set Variables

In [6]:

PIPELINE_NAME = "wahyuazizi-pipeline"

# Pipeline inputs
DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/employee_performance_transform.py"
TRAINER_MODULE_FILE = "modules/employee_performance_trainer.py"
TUNER_MODULE_FILE = "modules/employee_performance_tuner.py"

# Requirement_file = os.path.join(root, "requirements.txt")

# Pipeline outputs
OUTPUT_BASE = "output"
serving_model_dir = os.path.join(OUTPUT_BASE, "serving_model")
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")



# Run Pipeline

In [7]:

def init_local_pipeline(
        components, pipeline_root: Text
) -> pipeline.Pipeline:
    logging.info(f"Pipeline root set to: {pipeline_root}")
    beam_args = [
        "--direct_running_mode=multi_processing"
        # 0 auto-detect based on the number of CPUs available
        # during execution time.
        "---direct_num_workers=0"
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        eam_pipeline_args=beam_args
    )


if __name__ == "__main__":
    logging.set_verbosity(logging.INFO)

    from modules.components import init_components

    components = init_components(
        DATA_ROOT,
        training_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        tuning_module=TUNER_MODULE_FILE,
        training_steps=5000,
        eval_steps=1000,
        serving_model_dir=serving_model_dir,
    )

    pipeline = init_local_pipeline(components, pipeline_root)
    BeamDagRunner().run(pipeline=pipeline)


Trial 18 Complete [00h 00m 40s]
val_sparse_categorical_accuracy: 0.7825000286102295

Best val_sparse_categorical_accuracy So Far: 0.7827187776565552
Total elapsed time: 00h 07m 28s
INFO:tensorflow:Oracle triggered exit


INFO:tensorflow:Oracle triggered exit
INFO:absl:Finished tuning... Tuner ID: tuner0
INFO:absl:Best HyperParameters: {'space': [{'class_name': 'Int', 'config': {'name': 'fc_units', 'default': None, 'conditions': [], 'min_value': 16, 'max_value': 64, 'step': 16, 'sampling': None}}, {'class_name': 'Choice', 'config': {'name': 'lr', 'default': 0.01, 'conditions': [], 'values': [0.01, 0.001, 0.0001], 'ordered': True}}], 'values': {'fc_units': 16, 'lr': 0.0001, 'tuner/epochs': 3, 'tuner/initial_epoch': 0, 'tuner/bracket': 2, 'tuner/round': 0}}
INFO:absl:Best Hyperparameters are written to output\wahyuazizi-pipeline\Tuner\best_hyperparameters\7\best_hyperparameters.txt.
INFO:absl:Tuner results are written to output\wahyuazizi-pipeline\Tuner\tuner_results\7\tuner_results.json.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'tuner_results': [Artifac

Results summary
Results in output\wahyuazizi-pipeline\Tuner\.system\executor_execution\7\.temp\7\kt_hyperband
Showing 10 best trials
<keras_tuner.engine.objective.Objective object at 0x000002552E7A7D00>
Trial summary
Hyperparameters:
fc_units: 16
lr: 0.0001
tuner/epochs: 3
tuner/initial_epoch: 0
tuner/bracket: 2
tuner/round: 0
Score: 0.7827187776565552
Trial summary
Hyperparameters:
fc_units: 16
lr: 0.0001
tuner/epochs: 9
tuner/initial_epoch: 3
tuner/bracket: 2
tuner/round: 1
tuner/trial_id: 0007
Score: 0.7826562523841858
Trial summary
Hyperparameters:
fc_units: 64
lr: 0.0001
tuner/epochs: 3
tuner/initial_epoch: 0
tuner/bracket: 2
tuner/round: 0
Score: 0.7826250195503235
Trial summary
Hyperparameters:
fc_units: 48
lr: 0.0001
tuner/epochs: 3
tuner/initial_epoch: 0
tuner/bracket: 2
tuner/round: 0
Score: 0.7825937271118164
Trial summary
Hyperparameters:
fc_units: 64
lr: 0.0001
tuner/epochs: 9
tuner/initial_epoch: 3
tuner/bracket: 2
tuner/round: 1
tuner/trial_id: 0011
Score: 0.782593727111

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Tuner is finished.
INFO:absl:node Trainer is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.trainer.component.Trainer"
    base_type: TRAIN
  }
  id: "Trainer"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "wahyuazizi-pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "20250320-112345.778709"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "wahyuazizi-pipeline.Trainer"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "Transform"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
    

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 BusinessUnit_xf (InputLayer)   [(None, 11)]         0           []                               
                                                                                                  
 EmployeeStatus_xf (InputLayer)  [(None, 3)]         0           []                               
                                                                                                  
 EmployeeType_xf (InputLayer)   [(None, 4)]          0           []                               
                                                                                                  
 PayZone_xf (InputLayer)        [(None, 4)]          0           []                               
                                                                                            

  output, from_logits = _get_logits(


Epoch 1: val_sparse_categorical_accuracy improved from -inf to 0.76500, saving model to output\wahyuazizi-pipeline\Trainer\model\8\Format-Serving




INFO:tensorflow:Assets written to: output\wahyuazizi-pipeline\Trainer\model\8\Format-Serving\assets


INFO:tensorflow:Assets written to: output\wahyuazizi-pipeline\Trainer\model\8\Format-Serving\assets


Epoch 2/15
Epoch 2: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 3/15
Epoch 3: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 4/15
Epoch 4: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 5/15
Epoch 5: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 6/15
Epoch 6: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 7/15
Epoch 7: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 8/15
Epoch 8: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 9/15
Epoch 9: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 10/15
Epoch 10: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 11/15
Epoch 11: val_sparse_categorical_accuracy did not improve from 0.76500
Epoch 11: early stopping
INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:Assets written to: output\wahyuazizi-pipeline\Trainer\model\8\Format-Serving\assets


INFO:tensorflow:Assets written to: output\wahyuazizi-pipeline\Trainer\model\8\Format-Serving\assets
INFO:absl:Training complete. Model written to output\wahyuazizi-pipeline\Trainer\model\8\Format-Serving. ModelRun written to output\wahyuazizi-pipeline\Trainer\model_run\8
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "output\\wahyuazizi-pipeline\\Trainer\\model\\8"
, artifact_type: name: "Model"
base_type: MODEL
)], 'model_run': [Artifact(artifact: uri: "output\\wahyuazizi-pipeline\\Trainer\\model_run\\8"
, artifact_type: name: "ModelRun"
)]}) for execution 8
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Trainer is finished.
INFO:absl:node Evaluator is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.evaluator.component.Evaluator"
    base_type: 



INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
INFO:absl:udf_utils.get_fn {'example_splits': 'null', 'eval_config': '{\n  "metrics_specs": [\n    {\n      "metrics": [\n        {\n          "class_name": "AUC"\n        },\n        {\n          "class_name": "Precision"\n        },\n        {\n          "class_name": "Recall"\n        },\n        {\n          "class_name": "ExampleCount"\n        },\n        {\n          "class_name": "SparseCategoricalAccuracy",\n          "threshold": {\n            "change_threshold": {\n              "absolute": 0.0001,\n              "direction": "HIGHER_IS_BETTER"\n            },\n            "value_threshold": {\n              "lower_bound": 0.5\n            }\n          }\n        }\n      ]\n    }\n  ],\n  "model_specs": [\n    {\n      "label_key": "PerformanceScore"\n    }\n  ],\n  "slicing_specs": [\n    {},\n    {\n      "feature_keys": [\n        "EmployeeClassificationType",\n        "



























INFO:absl:Evaluation complete. Results written to output\wahyuazizi-pipeline\Evaluator\evaluation\9.
INFO:absl:Checking validation results.
INFO:absl:Blessing result True written to output\wahyuazizi-pipeline\Evaluator\blessing\9.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 9 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'evaluation': [Artifact(artifact: uri: "output\\wahyuazizi-pipeline\\Evaluator\\evaluation\\9"
, artifact_type: name: "ModelEvaluation"
)], 'blessing': [Artifact(artifact: uri: "output\\wahyuazizi-pipeline\\Evaluator\\blessing\\9"
, artifact_type: name: "ModelBlessing"
)]}) for execution 9
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Evaluator is finished.
INFO:absl:node Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {