# Local Pipeline Module

## Import Library

In [2]:
import os
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner



## Set Variable

In [3]:
PIPELINE_NAME = "hate-pipeline"

# Pipeline inputs
DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/hate_transform.py"
TRAINER_MODULE_FILE = "modules/hate_trainer.py"

# Pipeline outputs
OUTPUT_BASE = "output"
SERVING_MODEL_DIR = os.path.join(OUTPUT_BASE, 'serving_model')
PIPELINE_ROOT = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
METADATA_PATH = os.path.join(PIPELINE_ROOT, "metadata.sqlite")

# Combine TFX Components

In [4]:
def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:
    """Combine TFX Components"""

    logging.info(f"Pipeline root set to: {pipeline_root}")
    beam_args = [
        "--direct_running_mode=multi_processing"
        # 0 auto-detect based on on the number of CPUs available
        # during execution time.
        "----direct_num_workers=0"
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            METADATA_PATH
        ),
        eam_pipeline_args=beam_args
    )

In [6]:
if __name__ == "__main__":
    logging.set_verbosity(logging.INFO)

    from modules.components import init_components

    components = init_components(
        DATA_ROOT,
        training_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        training_steps=5000,
        eval_steps=1000,
        serving_model_dir=SERVING_MODEL_DIR,
    )

    pipeline_instance = init_local_pipeline(components, PIPELINE_ROOT)
    BeamDagRunner().run(pipeline=pipeline_instance)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Pipeline root set to: output\hate-pipeline
INFO:absl:Generating ephemeral wheel package for 'c:\\Users\\Fira\\Documents\\SEMESTER6\\BANGKIT2024\\Hate-CCPipeline - Copy\\modules\\hate_transform.py' (including modules: ['components', 'hate_trainer', 'hate_transform', 'local_pipeline']).
INFO:absl:User module package has hash fingerprint version 884fa90d86762b269152fc6e6180dbe9b4024c227b67386995ca17f5940e0941.
INFO:absl:Executing: ['c:\\Users\\Fira\\miniconda3\\envs\\ccmodel\\python.exe', 'C:\\Users\\Fira\\AppData\\Local\\Temp\\tmphkrmxg38\\_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', 'C:\\Users\\Fira\\AppData\\Local\\Temp\\tmpxlzqk7d9', '--dist-dir', 'C:\\Users\\Fira\\AppData\\Local\\Temp\\tmplzc5f1sa']
INFO:absl:Successfully built user code wheel distribution at 'output\\hat

INFO:absl:Node CsvExampleGen depends on [].
INFO:absl:Node CsvExampleGen is scheduled.
INFO:absl:Node Latest_blessed_model_resolver depends on [].
INFO:absl:Node Latest_blessed_model_resolver is scheduled.
INFO:absl:Node StatisticsGen depends on ['Run[CsvExampleGen]'].
INFO:absl:Node StatisticsGen is scheduled.
INFO:absl:Node SchemaGen depends on ['Run[StatisticsGen]'].
INFO:absl:Node SchemaGen is scheduled.
INFO:absl:Node ExampleValidator depends on ['Run[SchemaGen]', 'Run[StatisticsGen]'].
INFO:absl:Node ExampleValidator is scheduled.
INFO:absl:Node Transform depends on ['Run[CsvExampleGen]', 'Run[SchemaGen]'].
INFO:absl:Node Transform is scheduled.
INFO:absl:Node Trainer depends on ['Run[SchemaGen]', 'Run[Transform]'].
INFO:absl:Node Trainer is scheduled.
INFO:absl:Node Evaluator depends on ['Run[CsvExampleGen]', 'Run[Latest_blessed_model_resolver]', 'Run[Trainer]'].
INFO:absl:Node Evaluator is scheduled.
INFO:absl:Node Pusher depends on ['Run[Evaluator]', 'Run[Trainer]'].
INFO:absl

Instructions for updating:
Use ref() instead.


Instructions for updating:
Use ref() instead.
INFO:absl:Feature recalled has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature text has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature recalled has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature text has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature recalled has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature text has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature recalled has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature text has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature recalled has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature text has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature recalled has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature text has a shape dim {
  size: 1
}
. Settin

INFO:tensorflow:Assets written to: output\hate-pipeline\Transform\transform_graph\6\.temp_path\tftransform_tmp\e146a1024ac34d85aed2848626cb6014\assets


INFO:tensorflow:Assets written to: output\hate-pipeline\Transform\transform_graph\6\.temp_path\tftransform_tmp\e146a1024ac34d85aed2848626cb6014\assets


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'post_transform_schema': [Artifact(artifact: uri: "output\\hate-pipeline\\Transform\\post_transform_schema\\6"
, artifact_type: name: "Schema"
)], 'post_transform_stats': [Artifact(artifact: uri: "output\\hate-pipeline\\Transform\\post_transform_stats\\6"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)], 'post_transform_anomalies': [Artifact(artifact: uri: "output\\hate-pipeline\\Transform\\post_transform_anomalies\\6"
, artifact_type: name: "ExampleAnomalies"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
)], 'pre_transform_schema': [Artifact(artifact: uri: "output\\hate-pipeline\\Transf

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 text_xf (InputLayer)        [(None, 1)]               0         
                                                                 
 tf.reshape (TFOpLambda)     (None,)                   0         
                                                                 
 text_vectorization (TextVec  (None, 100)              0         
 torization)                                                     
                                                                 
 embedding (Embedding)       (None, 100, 16)           160000    
                                                                 
 global_average_pooling1d (G  (None, 16)               0         
 lobalAveragePooling1D)                                          
                                                                 
 dense (Dense)               (None, 64)                1088  








Epoch 1: val_binary_accuracy improved from -inf to 0.90099, saving model to output\hate-pipeline\Trainer\model\7\Format-Serving




INFO:tensorflow:Assets written to: output\hate-pipeline\Trainer\model\7\Format-Serving\assets


INFO:tensorflow:Assets written to: output\hate-pipeline\Trainer\model\7\Format-Serving\assets


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:Assets written to: output\hate-pipeline\Trainer\model\7\Format-Serving\assets


INFO:tensorflow:Assets written to: output\hate-pipeline\Trainer\model\7\Format-Serving\assets
INFO:absl:Training complete. Model written to output\hate-pipeline\Trainer\model\7\Format-Serving. ModelRun written to output\hate-pipeline\Trainer\model_run\7
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "output\\hate-pipeline\\Trainer\\model_run\\7"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "output\\hate-pipeline\\Trainer\\model\\7"
, artifact_type: name: "Model"
base_type: MODEL
)]}) for execution 7
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Trainer is finished.
INFO:absl:node Evaluator is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.evaluator.component.Evaluator"
    base_type: EVALUATE
  }
  id: "Evaluator"



INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
INFO:absl:udf_utils.get_fn {'example_splits': 'null', 'fairness_indicator_thresholds': 'null', 'eval_config': '{\n  "metrics_specs": [\n    {\n      "metrics": [\n        {\n          "class_name": "ExampleCount"\n        },\n        {\n          "class_name": "AUC"\n        },\n        {\n          "class_name": "FalsePositives"\n        },\n        {\n          "class_name": "TruePositives"\n        },\n        {\n          "class_name": "FalseNegatives"\n        },\n        {\n          "class_name": "TrueNegatives"\n        },\n        {\n          "class_name": "BinaryAccuracy",\n          "threshold": {\n            "change_threshold": {\n              "absolute": 0.0001,\n              "direction": "HIGHER_IS_BETTER"\n            },\n            "value_threshold": {\n              "lower_bound": 0.5\n            }\n          }\n        }\n      ]\n    }\n  ],\n  "model_specs": [\



























INFO:absl:Evaluation complete. Results written to output\hate-pipeline\Evaluator\evaluation\8.
INFO:absl:Checking validation results.


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
INFO:absl:Blessing result True written to output\hate-pipeline\Evaluator\blessing\8.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'blessing': [Artifact(artifact: uri: "output\\hate-pipeline\\Evaluator\\blessing\\8"
, artifact_type: name: "ModelBlessing"
)], 'evaluation': [Artifact(artifact: uri: "output\\hate-pipeline\\Evaluator\\evaluation\\8"
, artifact_type: name: "ModelEvaluation"
)]}) for execution 8
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Evaluator is finished.
INFO:absl:node Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      fi