# Local_Pipeline notebook

This notebook runs the pipeline orchestrator with Apache Beam, using all the modules created in **notebook.ipynb**. Running it produces the pipeline components and a serving_model, both stored in the output folder.

The code below defines a function that builds a pipeline to be run with Apache Beam.

In [1]:
import os
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

PIPELINE_NAME = "tasyaputrialiya-pipeline"

# pipeline inputs
DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/plant_growth_transform.py"
TRAINER_MODULE_FILE = "modules/plant_growth_trainer.py"
TUNER_MODULE_FILE = "modules/plant_growth_tuner.py"
# requirement_file = os.path.join(root, "requirements.txt")

# pipeline outputs
OUTPUT_BASE = "output"
serving_model_dir = os.path.join(OUTPUT_BASE, "serving_model")
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")

def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:
    
    logging.info(f"Pipeline root set to: {pipeline_root}")
    beam_args = [
        "--direct_running_mode=multi_processing",
        # 0 auto-detects the worker count from the number of CPUs
        # available at execution time.
        "--direct_num_workers=0",
    ]
    
    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        beam_pipeline_args=beam_args,
    )

if __name__ == "__main__":
    logging.set_verbosity(logging.INFO)
    
    from modules.components import init_components
    
    components = init_components(
        DATA_ROOT,
        training_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        tuner_module=TUNER_MODULE_FILE,
        training_steps=5000,
        eval_steps=1000,
        serving_model_dir=serving_model_dir,
    )
    
    # Use a distinct name so the imported `pipeline` module is not shadowed.
    local_pipeline = init_local_pipeline(components, pipeline_root)
    BeamDagRunner().run(pipeline=local_pipeline)
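
Because the Beam options are passed as a plain list of strings, a missing comma between two entries makes Python concatenate them into one malformed flag without raising an error. The sketch below builds the list with a hypothetical helper, `build_beam_args`, and validates it with a second hypothetical helper, `check_flags`. Neither is part of TFX or of this notebook; they only illustrate one way to catch such mistakes before the pipeline starts.

```python
from typing import List


def build_beam_args(num_workers: int = 0) -> List[str]:
    """Return DirectRunner flags as separate list items (hypothetical helper)."""
    return [
        "--direct_running_mode=multi_processing",
        # 0 lets Beam pick the worker count from the available CPUs.
        f"--direct_num_workers={num_workers}",
    ]


def check_flags(flags: List[str]) -> None:
    """Raise ValueError if an entry does not look like a single `--name=value` flag."""
    for flag in flags:
        starts_ok = flag.startswith("--") and not flag.startswith("---")
        # More than one "--" suggests two flags were merged by a missing comma.
        if not starts_ok or flag.count("--") != 1:
            raise ValueError(f"Malformed Beam flag: {flag!r}")


beam_args = build_beam_args()
check_flags(beam_args)
print(beam_args)
```

Running the check once at import time is cheap, and it fails loudly instead of letting Beam receive an option it cannot parse.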

Trial 30 Complete [00h 00m 01s]
val_binary_accuracy: 0.5121951103210449

Best val_binary_accuracy So Far: 0.6585366129875183
Total elapsed time: 00h 00m 42s
INFO:tensorflow:Oracle triggered exit
INFO:absl:Finished tuning... Tuner ID: tuner0
INFO:absl:Best HyperParameters: {'space': [{'class_name': 'Int', 'config': {'name': 'units_1', 'default': None, 'conditions': [], 'min_value': 32, 'max_value': 128, 'step': 32, 'sampling': None}}, {'class_name': 'Float', 'config': {'name': 'dropout_1', 'default': 0.0, 'conditions': [], 'min_value': 0.0, 'max_value': 0.5, 'step': 0.1, 'sampling': None}}, {'class_name': 'Int', 'config': {'name': 'units_2', 'default': None, 'conditions': [], 'min_value': 16, 'max_value': 64, 'step': 16, 'sampling': None}}, {'class_name': 'Float', 'config': {'name': 'dropout_2', 'default': 0.0, 'conditions': [], 'min_value': 0.0, 'max_value': 0.5, 'step': 0.1, 'sampling': None}}, {'class_name': 'Choice', 'config': {'name': 'learning_rate', 'default': 0.01, 'conditions': [], 'values': [0.01, 0.001, 0.0001], 'ordered': True}}], 'values': {'units_1': 64, 'dropout_1': 0.5, 'units_2': 64, 'dropout_2': 0.2, 'learning_rate': 0.000

Results summary
Results in output\tasyaputrialiya-pipeline\Tuner\.system\executor_execution\28\.temp\28\kt_hyperband
Showing 10 best trials
<keras_tuner.engine.objective.Objective object at 0x0000024A969F46A0>
Trial summary
Hyperparameters:
units_1: 64
dropout_1: 0.5
units_2: 64
dropout_2: 0.2
learning_rate: 0.0001
tuner/epochs: 2
tuner/initial_epoch: 0
tuner/bracket: 2
tuner/round: 0
Score: 0.6585366129875183
Trial summary
Hyperparameters:
units_1: 128
dropout_1: 0.0
units_2: 48
dropout_2: 0.4
learning_rate: 0.0001
tuner/epochs: 10
tuner/initial_epoch: 4
tuner/bracket: 1
tuner/round: 1
tuner/trial_id: 0019
Score: 0.6097561120986938
Trial summary
Hyperparameters:
units_1: 32
dropout_1: 0.30000000000000004
units_2: 32
dropout_2: 0.4
learning_rate: 0.0001
tuner/epochs: 10
tuner/initial_epoch: 0
tuner/bracket: 0
tuner/round: 0
Score: 0.6097561120986938
Trial summary
Hyperparameters:
units_1: 64
dropout_1: 0.5
units_2: 64
dropout_2: 0.2
learning_rate: 0.0001
tuner/epochs: 4
tuner/initial_e

INFO:absl:Best Hyperparameters are written to output\tasyaputrialiya-pipeline\Tuner\best_hyperparameters\28\best_hyperparameters.txt.
INFO:absl:Tuner results are written to output\tasyaputrialiya-pipeline\Tuner\tuner_results\28\tuner_results.json.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 28 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'tuner_results': [Artifact(artifact: uri: "output\\tasyaputrialiya-pipeline\\Tuner\\tuner_results\\28"
, artifact_type: name: "TunerResults"
)], 'best_hyperparameters': [Artifact(artifact: uri: "output\\tasyaputrialiya-pipeline\\Tuner\\best_hyperparameters\\28"
, artifact_type: name: "HyperParameters"
)]}) for execution 28
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Tuner is finished.
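
The Tuner log above reports the best hyperparameters as a dictionary with a `space` section (the search ranges) and a `values` section (the chosen settings). The sketch below extracts `values` from a JSON string shaped like that dictionary. That `best_hyperparameters.txt` stores the same structure as JSON is an assumption that has not been checked against the file, and the helper name `best_values` is hypothetical. The `space` list is left empty for brevity, and `learning_rate` is taken from the top trial summary because the logged dictionary is cut off.

```python
import json
from typing import Any, Dict

# Shaped like the "Best HyperParameters" dictionary in the log;
# the `space` entries are omitted here for brevity.
SAMPLE = (
    '{"space": [], "values": {"units_1": 64, "dropout_1": 0.5, '
    '"units_2": 64, "dropout_2": 0.2, "learning_rate": 0.0001}}'
)


def best_values(raw: str) -> Dict[str, Any]:
    """Return the chosen hyperparameter values from a JSON payload."""
    return json.loads(raw)["values"]


values = best_values(SAMPLE)
for name, value in values.items():
    print(f"{name}: {value}")
```
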
INFO:absl:node Evaluator is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.evaluator.component.Evaluat



INFO:absl:Using output\tasyaputrialiya-pipeline\Trainer\model\7\Format-Serving as baseline model.
INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
INFO:absl:udf_utils.get_fn {'eval_config': '{\n  "metrics_specs": [\n    {\n      "metrics": [\n        {\n          "class_name": "ExampleCount"\n        },\n        {\n          "class_name": "AUC"\n        },\n        {\n          "class_name": "FalsePositives"\n        },\n        {\n          "class_name": "TruePositives"\n        },\n        {\n          "class_name": "FalseNegatives"\n        },\n        {\n          "class_name": "TrueNegatives"\n        },\n        {\n          "class_name": "BinaryAccuracy",\n          "threshold": {\n            "change_threshold": {\n              "absolute": 0.0001,\n              "direction": "HIGHER_IS_BETTER"\n            },\n            "value_threshold": {\n              "lower_bound": 0.5\n            }\n          }\n        }\n      ]\n    }\n  ],\n  "model_specs": [\n    {\n      "label_key": "Growth_Milestone"\n    }\n  ],\n  "slic

INFO:absl:Evaluation complete. Results written to output\tasyaputrialiya-pipeline\Evaluator\evaluation\29.
INFO:absl:Checking validation results.


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
INFO:absl:Blessing result False written to output\tasyaputrialiya-pipeline\Evaluator\blessing\29.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 29 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'evaluation': [Artifact(artifact: uri: "output\\tasyaputrialiya-pipeline\\Evaluator\\evaluation\\29"
, artifact_type: name: "ModelEvaluation"
)], 'blessing': [Artifact(artifact: uri: "output\\tasyaputrialiya-pipeline\\Evaluator\\blessing\\29"
, artifact_type: name: "ModelBlessing"
)]}) for execution 29
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Evaluator is finished.
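
The `eval_config` logged above attaches two thresholds to `BinaryAccuracy`: a value threshold with `lower_bound` 0.5, and a change threshold of 0.0001 in the `HIGHER_IS_BETTER` direction against the baseline model. The following is a simplified sketch of how such a pair of thresholds can gate a blessing decision. It is not TFMA's implementation: the function name `is_blessed` is hypothetical, the inclusive comparisons are an assumption, and the accuracy numbers are illustrative rather than taken from this run.

```python
def is_blessed(
    candidate: float,
    baseline: float,
    lower_bound: float = 0.5,
    min_improvement: float = 0.0001,
) -> bool:
    """Return True only if both thresholds pass (simplified, hypothetical)."""
    # Value threshold: the candidate metric must reach the lower bound.
    meets_value = candidate >= lower_bound
    # Change threshold (higher is better): the candidate must beat the
    # baseline by at least the required absolute margin.
    meets_change = (candidate - baseline) >= min_improvement
    return meets_value and meets_change


print(is_blessed(0.66, 0.61))  # True: above the bound and better than baseline
print(is_blessed(0.60, 0.61))  # False: above the bound but worse than baseline
print(is_blessed(0.45, 0.40))  # False: better than baseline but below the bound
```

Under this reading, a log line such as `Blessing result False` would mean that at least one of the two conditions was not met.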
INFO:absl:node Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {
  contexts {
    type {
      nam