In [None]:
!pip install tfx
!pip install autopep8
!pip install pylint

In [2]:
!mkdir modules

In [3]:
COMPONENTS_FILE = "modules/components.py"
PIPELINE_FILE = "modules/pipeline.py"

TRANSFORM_MODULE_FILE = "modules/transform.py"
TRAINER_MODULE_FILE =  "modules/trainer.py"

In [4]:
%%writefile {COMPONENTS_FILE}
import os
import tensorflow_model_analysis as tfma

from tfx.components import (
    CsvExampleGen,
    StatisticsGen,
    SchemaGen,
    ExampleValidator,
    Transform,
    Trainer,
    Evaluator,
    Pusher
)
from tfx.proto import example_gen_pb2, trainer_pb2, pusher_pb2
from tfx.types import Channel
from tfx.dsl.components.common.resolver import Resolver
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import (
    LatestBlessedModelStrategy
)


def init_components(
    data_dir,
    transform_module,
    training_module,
    training_steps,
    eval_steps,
    serving_model_dir,
):
    """
    Initializes TFX components required for building a pipeline for training and deploying a model.

    Args:
        data_dir (str): The directory containing the input data.
        transform_module (str): The path to the module containing the transformation logic.
        training_module (str): The path to the module containing the training logic.
        training_steps (int): The number of training steps.
        eval_steps (int): The number of evaluation steps.
        serving_model_dir (str): The directory where the trained model will be exported for serving.
    """

    output = example_gen_pb2.Output(
        split_config=example_gen_pb2.SplitConfig(splits=[
            example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=8),
            example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2)
        ])
    )

    example_gen = CsvExampleGen(
        input_base=data_dir,
        output_config=output
    )

    statistics_gen = StatisticsGen(
        examples=example_gen.outputs['examples']
    )

    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs["statistics"]
    )

    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema']
    )

    transform = Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        module_file=os.path.abspath(transform_module)
    )

    trainer = Trainer(
        module_file=os.path.abspath(training_module),
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        train_args=trainer_pb2.TrainArgs(
            splits=['train'],
            num_steps=training_steps
        ),
        eval_args=trainer_pb2.EvalArgs(
            splits=['eval'],
            num_steps=eval_steps
        )
    )

    model_resolver = Resolver(
        strategy_class=LatestBlessedModelStrategy,
        model=Channel(type=Model),
        model_blessing=Channel(type=ModelBlessing)
    ).with_id('Latest_blessed_model_resolver')

    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key='Liked')],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[
            tfma.MetricsSpec(metrics=[
                tfma.MetricConfig(class_name='ExampleCount'),
                tfma.MetricConfig(class_name='AUC'),
                tfma.MetricConfig(class_name='FalsePositives'),
                tfma.MetricConfig(class_name='TruePositives'),
                tfma.MetricConfig(class_name='FalseNegatives'),
                tfma.MetricConfig(class_name='TrueNegatives'),
                tfma.MetricConfig(class_name='BinaryAccuracy',
                    threshold=tfma.MetricThreshold(
                        value_threshold=tfma.GenericValueThreshold(
                            lower_bound={'value': 0.5}
                        ),
                        change_threshold=tfma.GenericChangeThreshold(
                            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                            absolute={'value': 0.0001}
                        )
                    )
                )
            ])
        ]
    )

    evaluator = Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config
    )

    pusher = Pusher(
        model=trainer.outputs["model"],
        model_blessing=evaluator.outputs["blessing"],
        push_destination=pusher_pb2.PushDestination(
            filesystem=pusher_pb2.PushDestination.Filesystem(
                base_directory=serving_model_dir
            )
        ),
    )

    components = (
        example_gen,
        statistics_gen,
        schema_gen,
        example_validator,
        transform,
        trainer,
        model_resolver,
        evaluator,
        pusher
    )

    return components

Writing modules/components.py


In [5]:
%%writefile {PIPELINE_FILE}
import os
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

PIPELINE_NAME = 'restaurant-reviews-pipeline'

DATA_ROOT = 'data'
TRANSFORM_MODULE_FILE = 'modules/transform.py'
TRAINER_MODULE_FILE = 'modules/trainer.py'

OUTPUT_BASE = 'output'
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, 'metadata.sqlite')


def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:
    """
    Initialize a local TFX pipeline.

    Args:
        components: A dictionary of TFX components to be included in the pipeline.
        pipeline_root: Root directory for pipeline output artifacts.

    Returns:
        A TFX pipeline.
    """
    logging.info(f'Pipeline root set to: {pipeline_root}')
    beam_args = [
        '--direct_running_mode=multi_processing'
        # 0 auto-detect based on the number of CPUs available
        # duraing execution time
        '----direct_num_workers=0'
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        eam_pipeline_args=beam_args
    )


if __name__ == '__main__':
    logging.set_verbosity(logging.INFO)

    from modules.components import init_components
    components = init_components(
        DATA_ROOT,
        training_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        training_steps=5000,
        eval_steps=1000,
        serving_model_dir=serving_model_dir,
    )

    pipeline = init_local_pipeline(components, pipeline_root)
    BeamDagRunner().run(pipeline=pipeline)

Writing modules/pipeline.py


In [6]:
%%writefile {TRANSFORM_MODULE_FILE}

"""
This module contains functions for transforming restaurant reviews data.
"""

import tensorflow as tf

LABEL_KEY_NEW = "Liked"
FEATURE_KEY_NEW = "Review"

def transformed_name(key):
    """
    Transform the given key.

    Args:
        key (str): Input key to transform.

    Returns:
        str: Transformed key.
    """
    return key + "_xf"

def preprocessing_fn(inputs):
    """
    Preprocess input data.

    Args:
        inputs (dict): Input data dictionary containing 'Liked' and 'Review' keys.

    Returns:
        dict: Transformed output data dictionary.
    """
    outputs = {}
    outputs[transformed_name(LABEL_KEY_NEW)] = tf.cast(inputs[LABEL_KEY_NEW], tf.int64)
    outputs[transformed_name(FEATURE_KEY_NEW)] = tf.strings.lower(inputs[FEATURE_KEY_NEW])
    return outputs

Writing modules/transform.py


In [7]:
%%writefile {TRAINER_MODULE_FILE}

"""
This module contains functions for training a stress model using TensorFlow and TensorFlow Transform.
"""

import os
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow.keras import layers
from tfx.components.trainer.fn_args_utils import FnArgs

# Define constants
LABEL_KEY = "Liked"
FEATURE_KEY = "Review"
EMBEDDING_DIM = 16

# Function to rename transformed features
def transformed_name(key):
    """
    Transform the given key.

    Args:
        key (str): Input key to transform.

    Returns:
        str: Transformed key.
    """
    return key + "_xf"

# Function to read data from compressed TFRecord files
def gzip_reader_fn(filenames):
    return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

# Input function to create transformed features and batch data
def input_fn(file_pattern, tf_transform_output, num_epochs, batch_size=64):
    """
    Create input function for training data.

    Args:
        file_pattern (str): File pattern for input data.
        tf_transform_output (tensorflow_transform.TFTransformOutput): TensorFlow Transform output.
        num_epochs (int): Number of epochs.
        batch_size (int): Batch size.

    Returns:
        tf.data.Dataset: Input dataset.
    """
    transform_feature_spec = tf_transform_output.transformed_feature_spec().copy()
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transform_feature_spec,
        reader=gzip_reader_fn,
        num_epochs=num_epochs,
        label_key=transformed_name(LABEL_KEY)
    )
    return dataset

# Text vectorization layer for tokenization and data standardization
vectorize_layer = layers.TextVectorization(
    standardize="lower_and_strip_punctuation",
    max_tokens=10000,
    output_mode='int',
    output_sequence_length=100
)

# Function to build the machine learning model
def model_builder():
    """
    Build the machine learning model.

    Returns:
        tf.keras.Model: Compiled Keras model.
    """
    inputs = tf.keras.Input(shape=(1,), name=transformed_name(FEATURE_KEY), dtype=tf.string)
    reshaped_input = tf.reshape(inputs, [-1])
    x = vectorize_layer(reshaped_input)
    x = layers.Embedding(10000, EMBEDDING_DIM, name="embedding")(x)
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(64, activation="relu")(x)
    x = layers.Dense(32, activation="relu")(x)
    outputs = layers.Dense(1, activation="sigmoid")(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    model.compile(
        loss='binary_crossentropy',
        optimizer=tf.keras.optimizers.Adam(0.01),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )

    model.summary()
    return model

# Function to preprocess raw request data for deployment
def _get_serve_tf_examples_fn(model, tf_transform_output):
    """
    Get serving function for TensorFlow Serving.

    Args:
        model (tf.keras.Model): Trained Keras model.
        tf_transform_output (tensorflow_transform.TFTransformOutput): TensorFlow Transform output.

    Returns:
        Callable: Serve function for TensorFlow Serving.
    """
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn

# Function to run the training process
def run_fn(fn_args: FnArgs) -> None:
    """
    Run the training process.

    Args:
        fn_args (FnArgs): Function arguments.
    """
    log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir, update_freq='batch'
    )

    es = tf.keras.callbacks.EarlyStopping(
        monitor='val_binary_accuracy', mode='max', verbose=1, patience=10
    )
    mc = tf.keras.callbacks.ModelCheckpoint(
        fn_args.serving_model_dir, monitor='val_binary_accuracy', mode='max', verbose=1, save_best_only=True
    )

    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)

    train_set = input_fn(fn_args.train_files, tf_transform_output, 10)
    val_set = input_fn(fn_args.eval_files, tf_transform_output, 10)
    vectorize_layer.adapt(
        [j[0].numpy()[0] for j in [i[0][transformed_name(FEATURE_KEY)] for i in list(train_set)]]
    )

    model = model_builder()

    model.fit(
        x=train_set,
        validation_data=val_set,
        callbacks=[tensorboard_callback, es, mc],
        steps_per_epoch=1000,
        validation_steps=1000,
        epochs=10
    )

    signatures = {
        'serving_default':
        _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(
            tf.TensorSpec(
                shape=[None],
                dtype=tf.string,
                name='examples'
            )
        )
    }
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Writing modules/trainer.py


In [8]:
import os
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from modules.pipeline import init_local_pipeline
from modules.components import init_components

In [9]:
PIPELINE_NAME = 'restaurant-reviews-pipeline'

DATA_ROOT = 'data'
TRANSFORM_MODULE_FILE = 'modules/transform.py'
TRAINER_MODULE_FILE = 'modules/trainer.py'

OUTPUT_BASE = 'output'
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, 'metadata.sqlite')

In [10]:
components = init_components(
    data_dir=DATA_ROOT,
    transform_module=TRANSFORM_MODULE_FILE,
    training_module=TRAINER_MODULE_FILE,
    training_steps=5000,
    eval_steps=1000,
    serving_model_dir=serving_model_dir
)

pipeline = init_local_pipeline(components, pipeline_root)
BeamDagRunner().run(pipeline)



Instructions for updating:
Use `tf.data.Dataset.map(tf.io.parse_example(...))` instead.


Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 Review_xf (InputLayer)      [(None, 1)]               0         
                                                                 
 tf.reshape (TFOpLambda)     (None,)                   0         
                                                                 
 text_vectorization (TextVe  (None, 100)               0         
 ctorization)                                                    
                                                                 
 embedding (Embedding)       (None, 100, 16)           160000    
                                                                 
 global_average_pooling1d (  (None, 16)                0         
 GlobalAveragePooling1D)                                         
                                                                 
 dense (Dense)               (None, 64)                1088  




Epoch 1: val_binary_accuracy improved from -inf to 0.76119, saving model to output/restaurant-reviews-pipeline/Trainer/model/7/Format-Serving


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


# Refactoring Code dan Penilaian Kode

In [17]:
!autopep8 --in-place --aggressive --aggressive modules/components.py modules/pipeline.py modules/transform.py modules/trainer.py
!pylint modules/components.py modules/pipeline.py modules/transform.py modules/trainer.py

************* Module components
modules/components.py:1:0: C0114: Missing module docstring (missing-module-docstring)
modules/components.py:23:0: R0913: Too many arguments (6/5) (too-many-arguments)
modules/components.py:23:0: R0914: Too many local variables (18/15) (too-many-locals)
modules/components.py:43:13: E1101: Module 'tfx.proto.example_gen_pb2' has no 'Output' member (no-member)
modules/components.py:44:21: E1101: Module 'tfx.proto.example_gen_pb2' has no 'SplitConfig' member (no-member)
modules/components.py:45:12: E1101: Module 'tfx.proto.example_gen_pb2' has no 'SplitConfig' member (no-member)
modules/components.py:46:12: E1101: Module 'tfx.proto.example_gen_pb2' has no 'SplitConfig' member (no-member)
modules/components.py:79:19: E1101: Module 'tfx.proto.trainer_pb2' has no 'TrainArgs' member (no-member)
modules/components.py:83:18: E1101: Module 'tfx.proto.trainer_pb2' has no 'EvalArgs' member (no-member)
modules/components.py:131:25: E1101: Module 'tfx.proto.pusher_pb2' 

In [18]:
!pip freeze > requirements.txt

In [19]:
import os
os.listdir('/content')

['.config',
 'modules',
 'output',
 '.ipynb_checkpoints',
 'data',
 'requirements.txt',
 'sample_data']

In [20]:
!zip -r content_folder.zip /content

  adding: content/ (stored 0%)
  adding: content/.config/ (stored 0%)
  adding: content/.config/default_configs.db (deflated 98%)
  adding: content/.config/gce (stored 0%)
  adding: content/.config/config_sentinel (stored 0%)
  adding: content/.config/.last_survey_prompt.yaml (stored 0%)
  adding: content/.config/active_config (stored 0%)
  adding: content/.config/.last_opt_in_prompt.yaml (stored 0%)
  adding: content/.config/.last_update_check.json (deflated 22%)
  adding: content/.config/logs/ (stored 0%)
  adding: content/.config/logs/2024.04.18/ (stored 0%)
  adding: content/.config/logs/2024.04.18/13.24.29.138246.log (deflated 58%)
  adding: content/.config/logs/2024.04.18/13.24.58.719263.log (deflated 56%)
  adding: content/.config/logs/2024.04.18/13.24.39.243263.log (deflated 86%)
  adding: content/.config/logs/2024.04.18/13.24.02.104562.log (deflated 91%)
  adding: content/.config/logs/2024.04.18/13.24.47.706251.log (deflated 58%)
  adding: content/.config/logs/2024.04.18/13.24

In [23]:
from google.colab import files
files.download('content_folder.zip')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>