# Pipeline Orchestration

Nama    : Muhammad Ricky Rizaldi  
Email   : mrickyrizaldi@gmail.com

## Informasi Dataset

In [1]:
import pandas as pd

data = pd.read_csv("../data/diabetes.csv")
data.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


In [2]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 768 entries, 0 to 767
Data columns (total 9 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   Pregnancies               768 non-null    int64  
 1   Glucose                   768 non-null    int64  
 2   BloodPressure             768 non-null    int64  
 3   SkinThickness             768 non-null    int64  
 4   Insulin                   768 non-null    int64  
 5   BMI                       768 non-null    float64
 6   DiabetesPedigreeFunction  768 non-null    float64
 7   Age                       768 non-null    int64  
 8   Outcome                   768 non-null    int64  
dtypes: float64(2), int64(7)
memory usage: 54.1 KB


In [3]:
data.describe()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
count,768.0,768.0,768.0,768.0,768.0,768.0,768.0,768.0,768.0
mean,3.845052,120.894531,69.105469,20.536458,79.799479,31.992578,0.471876,33.240885,0.348958
std,3.369578,31.972618,19.355807,15.952218,115.244002,7.88416,0.331329,11.760232,0.476951
min,0.0,0.0,0.0,0.0,0.0,0.0,0.078,21.0,0.0
25%,1.0,99.0,62.0,0.0,0.0,27.3,0.24375,24.0,0.0
50%,3.0,117.0,72.0,23.0,30.5,32.0,0.3725,29.0,0.0
75%,6.0,140.25,80.0,32.0,127.25,36.6,0.62625,41.0,1.0
max,17.0,199.0,122.0,99.0,846.0,67.1,2.42,81.0,1.0


Dataset yang digunakan adalah dataset diabetes yang berisi data medis pasien dengan berbagai atribut seperti jumlah kehamilan (Pregnancies), kadar glukosa darah (Glucose), tekanan darah (BloodPressure), ketebalan kulit (SkinThickness), kadar insulin (Insulin), indeks massa tubuh (BMI), fungsi silsilah diabetes (DiabetesPedigreeFunction), dan usia (Age). Kolom Outcome merupakan label target dengan nilai 1 menandakan pasien terdiagnosis diabetes dan 0 menandakan tidak.

Sumber dataset: [Kaggle-Diabetes](https://www.kaggle.com/datasets/akshaydattatraykhare/diabetes-dataset/data)

Tujuan proyek ini adalah membangun model klasifikasi biner untuk memprediksi apakah seseorang berpotensi mengidap diabetes berdasarkan data medis tersebut. Solusi machine learning yang dikembangkan akan menggunakan pipeline TFX agar setiap tahap mulai dari data ingestion, preprocessing, training, hingga deployment dapat berjalan otomatis dan terstruktur. Target utama proyek ini adalah menghasilkan model yang akurat serta mudah direproduksi untuk evaluasi dan penerapan di lingkungan produksi.

## Import Library

In [4]:
import os
import tensorflow as tf
import logging
from absl import logging as absl_logging

# Komponen utama TFX
from tfx.components import (
    CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator,
    Transform, Trainer, Tuner, Evaluator, Pusher
)

# Orchestrator
from tfx.orchestration import pipeline as tfx_pipeline
from tfx.orchestration import metadata
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

# (opsional) jika nanti mau bikin PipelineOptions sendiri
# import apache_beam as beam

# Resolver untuk latest blessed model
from tfx.dsl.components.common.resolver import Resolver
from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import (
    LatestBlessedModelStrategy
)
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing

# Protos
from tfx.proto import example_gen_pb2
from tfx.proto import trainer_pb2
from tfx.proto import pusher_pb2

# Evaluasi model
import tensorflow_model_analysis as tfma

# Pengaturan logging
tf.get_logger().setLevel('ERROR')
absl_logging.set_verbosity(absl_logging.ERROR)
logging.getLogger('apache_beam').setLevel(logging.ERROR)


Pada bagian ini, saya memuat seluruh library yang dibutuhkan untuk membangun pipeline TFX secara menyeluruh. Komponen inti seperti CsvExampleGen, StatisticsGen, SchemaGen, Transform, Trainer, hingga Pusher digunakan untuk menjalankan setiap tahapan penting dalam alur machine learning, mulai dari membaca data mentah, menghasilkan statistik dan schema, melakukan preprocessing, melatih model, mengevaluasi performanya, hingga menyiapkan artefak model untuk proses deployment.

Saya juga mengimpor modul orkestrasi seperti tfx_pipeline, metadata, dan BeamDagRunner yang berfungsi sebagai eksekutor utama pipeline. Selain itu, Resolver dengan strategi LatestBlessedModelStrategy digunakan untuk memilih model terbaik yang sudah lolos proses evaluasi sebelumnya. Library tambahan seperti tfma membantu melakukan analisis evaluasi model yang lebih komprehensif. Pada akhir bagian, saya menyesuaikan level logging agar output notebook tetap bersih dan lebih mudah untuk dipantau.

## Set Variable

In [5]:
BASE_DIR = os.getcwd()   # Direktori kerja utama (lokasi notebook)
PROJECT_ROOT = os.path.dirname(BASE_DIR) # Direktori proyek (satu tingkat di atas)

# Nama komponen pipeline
PIPELINE_NAME = "mrickyr-pipeline"
SCHEMA_PIPELINE_NAME = "diabetes-tfdv-schema"

# Struktur direktori artefak pipeline
PIPELINE_ROOT = os.path.join(BASE_DIR, "pipelines", PIPELINE_NAME)
METADATA_PATH = os.path.join(BASE_DIR, "metadata", PIPELINE_NAME, "metadata.db")
SERVING_MODEL_DIR = os.path.join(BASE_DIR, "serving_model", PIPELINE_NAME)
MODULES_DIR = os.path.join(BASE_DIR, "modules")

# Direktori data
DATA_ROOT = os.path.join(PROJECT_ROOT, "data")

# Inisialisasi direktori yang diperlukan
for p in [
    PIPELINE_ROOT,
    os.path.dirname(METADATA_PATH),
    SERVING_MODEL_DIR,
    DATA_ROOT,
    MODULES_DIR
]:
    os.makedirs(p, exist_ok=True)

# Lokasi modul pipeline
TRANSFORM_MODULE_FILE = os.path.join(MODULES_DIR, "preprocessing.py")
TRAINER_MODULE_FILE = os.path.join(MODULES_DIR, "trainer.py")
TUNER_MODULE_FILE = os.path.join(MODULES_DIR, "tuner.py")

# Informasi konfigurasi
print("PIPELINE_ROOT     :", PIPELINE_ROOT)
print("METADATA_PATH     :", METADATA_PATH)
print("SERVING_MODEL_DIR :", SERVING_MODEL_DIR)
print("DATA_ROOT         :", DATA_ROOT)
print("MODULES_DIR       :", MODULES_DIR)


PIPELINE_ROOT     : d:\dicoding\Submission_MLOps_2\mrickyr-notebook\pipelines\mrickyr-pipeline
METADATA_PATH     : d:\dicoding\Submission_MLOps_2\mrickyr-notebook\metadata\mrickyr-pipeline\metadata.db
SERVING_MODEL_DIR : d:\dicoding\Submission_MLOps_2\mrickyr-notebook\serving_model\mrickyr-pipeline
DATA_ROOT         : d:\dicoding\Submission_MLOps_2\data
MODULES_DIR       : d:\dicoding\Submission_MLOps_2\mrickyr-notebook\modules


Pada bagian ini, saya menentukan seluruh variabel dasar yang akan digunakan sebagai fondasi struktur pipeline. Saya memulai dengan menetapkan direktori kerja utama, kemudian mendefinisikan nama pipeline yang akan dibangun serta nama pipeline terpisah yang digunakan khusus untuk proses pembuatan schema. Setelah itu, saya menyusun struktur direktori artefak seperti lokasi penyimpanan pipeline, metadata, model hasil training, data mentah, dan modul Python yang digunakan dalam proses preprocessing, training, dan tuning.

Agar pipeline dapat berjalan tanpa hambatan, seluruh direktori penting saya pastikan sudah tersedia dengan membuatnya secara otomatis apabila belum ada. Selanjutnya, saya menetapkan path lengkap untuk modul-modul kustom yang akan digunakan TFX dalam menjalankan transformasi data, proses pelatihan model, dan tuning hyperparameter. Pada akhir bagian ini, saya menampilkan seluruh konfigurasi utama untuk memastikan bahwa lokasi penyimpanan artefak sudah benar sebelum pipeline dijalankan.

## Data Ingestion

### ExampleGen

In [6]:
# konfigurasi pembagian data: 80% train, 20% eval (8:2)
output_config = example_gen_pb2.Output(
    split_config = example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=8),
        example_gen_pb2.SplitConfig.Split(name='eval',  hash_buckets=2),
    ])
)

# komponen ExampleGen untuk membaca data CSV di folder DATA_ROOT
example_gen = CsvExampleGen(
    input_base=DATA_ROOT,
    output_config=output_config,
)


Pada tahap ini, saya menyiapkan proses data ingestion menggunakan komponen ExampleGen, yaitu komponen TFX yang bertanggung jawab membaca data mentah dan mengubahnya menjadi format TFRecord yang siap digunakan oleh pipeline. Saya terlebih dahulu menentukan konfigurasi pembagian data, yaitu 80% untuk pelatihan dan 20% untuk evaluasi, dengan memanfaatkan mekanisme hash bucket agar proses pemisahan tetap konsisten di setiap eksekusi.

Setelah konfigurasi pemisahan dibuat, saya menginisialisasi CsvExampleGen untuk membaca seluruh file CSV yang berada di direktori data. Komponen ini akan menghasilkan artefak Examples yang berisi data terstruktur untuk tahap-tahap berikutnya seperti pembuatan statistik, schema, dan transformasi. Dengan demikian, proses ingestion berjalan otomatis dan tetap terkontrol di dalam pipeline TFX.

## Data Validation

### StatisticsGen

In [7]:
# Komponen StatisticsGen untuk menghitung statistik data
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples']
)

Pada tahap ini, saya menggunakan komponen StatisticsGen untuk menghasilkan statistik deskriptif dari data yang telah diproses oleh ExampleGen. Komponen ini menghitung berbagai metrik penting seperti distribusi nilai, frekuensi, rerata, hingga deteksi nilai yang hilang. Statistik tersebut menjadi dasar yang sangat penting untuk memahami karakteristik data dan akan digunakan oleh komponen lain, seperti SchemaGen dan ExampleValidator, dalam proses validasi. 

### SchemaGen

In [8]:
# Komponen SchemaGen untuk menghasilkan schema data
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True
)

Pada bagian ini, saya menggunakan komponen SchemaGen untuk menghasilkan schema otomatis berdasarkan statistik yang telah dibuat sebelumnya. Schema ini berisi definisi struktural dari data, seperti tipe fitur, rentang nilai yang diharapkan, serta apakah suatu fitur bersifat wajib atau opsional. Dengan mengaktifkan infer_feature_shape=True, saya meminta TFX untuk turut menafsirkan bentuk atau dimensi fitur jika memungkinkan. Hasil schema ini menjadi acuan penting bagi pipeline, terutama untuk mendeteksi anomali pada data baru dan memastikan proses preprocessing serta training berjalan dengan konsisten.

### ExampleValidator

In [9]:
# Komponen ExampleValidator untuk memvalidasi data berdasarkan schema
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)

Pada tahap ini, saya menggunakan komponen ExampleValidator untuk memeriksa kualitas data dengan membandingkannya terhadap schema yang telah dihasilkan sebelumnya. Komponen ini membantu mendeteksi berbagai anomali seperti nilai yang berada di luar rentang wajar, fitur yang hilang, atau ketidaksesuaian tipe data. Dengan melakukan validasi ini sejak awal, saya dapat memastikan bahwa data yang masuk ke tahap preprocessing dan training memiliki kualitas yang baik dan konsisten, sehingga potensi error atau bias selama pelatihan model dapat diminimalkan.

## Data Preprocessing

### Transform

In [10]:
%%writefile {TRANSFORM_MODULE_FILE}
import tensorflow as tf
import tensorflow_transform as tft

# fitur numerik
NUMERIC_FEATURE_KEYS = [
    'Age',
    'BMI',
    'BloodPressure',
    'DiabetesPedigreeFunction',
    'Glucose',
    'Insulin',
    'Pregnancies',
    'SkinThickness'
]

# label (target)
LABEL_KEY = 'Outcome'

# fitur bernilai 0 tidak masuk akal (anggap saja missing)
ZERO_AS_MISSING = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']

def _xf(name): 
    '''Tambahkan suffix _xf pada nama fitur hasil transformasi.'''
    return f"{name}_xf"

def _impute_zero_with_nonzero_mean(x: tf.Tensor) -> tf.Tensor:
    """
    Mengganti nilai nol dengan rata-rata dari nilai bukan nol menggunakan analyzer TFT. 
    ...
    """
    x = tf.cast(x, tf.float32)
    x = tf.where(tf.math.is_nan(x), tf.zeros_like(x), x)

    is_zero = tf.equal(x, 0.0)
    masked_sum = tf.where(is_zero, tf.zeros_like(x), x)

    sum_all = tft.mean(masked_sum)
    zero_ratio = tft.mean(tf.cast(is_zero, tf.float32))
    nonzero_ratio = 1.0 - zero_ratio
    mean_nonzero = sum_all / tf.maximum(nonzero_ratio, 1e-6)

    return tf.where(is_zero, mean_nonzero, x)


def preprocessing_fn(inputs):
    """
    Fungsi preprocessing...
    """
    outputs = {}

    for key in NUMERIC_FEATURE_KEYS:
        val = tf.cast(inputs[key], tf.float32)
        val = tf.where(tf.math.is_nan(val), tf.zeros_like(val), val)

        if key in ZERO_AS_MISSING:
            val = _impute_zero_with_nonzero_mean(val)

        # Standardisasi
        outputs[_xf(key)] = tft.scale_to_z_score(val)

    # Label wajib int64
    outputs[_xf(LABEL_KEY)] = tf.cast(inputs[LABEL_KEY], tf.int64)
    return outputs


Writing d:\dicoding\Submission_MLOps_2\mrickyr-notebook\modules\preprocessing.py


In [11]:
# komponen Transform untuk melakukan preprocessing data
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=TRANSFORM_MODULE_FILE,
)


Pada bagian ini, saya mendefinisikan modul preprocessing yang akan digunakan oleh komponen Transform dalam TFX. Saya terlebih dahulu menentukan daftar fitur numerik yang akan diproses, label target, serta fitur-fitur yang memiliki nilai nol namun secara konteks dianggap tidak logis dan diperlakukan sebagai nilai hilang, seperti pada kolom Glucose, BloodPressure, SkinThickness, Insulin, dan BMI. Untuk menangani hal tersebut, saya membuat fungsi khusus _impute_zero_with_nonzero_mean yang menggunakan analyzer dari tensorflow_transform untuk menggantikan nilai nol dengan rata-rata dari nilai bukan nol, sehingga distribusi data menjadi lebih masuk akal secara statistik. Seluruh fitur numerik kemudian distandardisasi menggunakan tft.scale_to_z_score, sedangkan label Outcome dikonversi ke tipe int64 agar konsisten dengan kebutuhan tahap training.

Setelah fungsi preprocessing_fn didefinisikan, saya menghubungkannya ke dalam komponen Transform dengan memasukkan artefak Examples dan Schema sebagai input serta module_file sebagai lokasi modul preprocessing. Komponen Transform ini akan mengeksekusi seluruh logika preprocessing secara terkontrol di dalam pipeline, menghasilkan fitur-fitur yang sudah dibersihkan dan ditransformasi, sekaligus menyimpan graf transformasi yang nantinya dapat digunakan kembali saat serving model di lingkungan produksi.

## Model Training

### Trainer

In [12]:
%%writefile {TRAINER_MODULE_FILE}
import os
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow.keras import layers
from tfx.components.trainer.fn_args_utils import FnArgs

# Konstanta dan Helper Function
LABEL_KEY = "Outcome"  # label asli


def transformed_name(key: str) -> str:
    """Menambahkan suffix '_xf' untuk menandai fitur hasil transformasi."""
    return key + "_xf"


def gzip_reader_fn(filenames):
    """Membaca TFRecord hasil transformasi (kompresi GZIP)."""
    return tf.data.TFRecordDataset(filenames, compression_type='GZIP')


# Input Function
def input_fn(file_pattern, tf_transform_output, num_epochs=None, batch_size=64) -> tf.data.Dataset:
    """
    Membuat dataset TF dari file TFRecord hasil transformasi.

    Args:
        file_pattern (str): Pola path file TFRecord hasil transformasi.
        tf_transform_output (tft.TFTransformOutput): Objek hasil komponen Transform.
        num_epochs (int): Jumlah epoch untuk membaca data (None = infinite).
        batch_size (int): Ukuran batch.

    Returns:
        tf.data.Dataset: Dataset yang siap digunakan untuk training/evaluasi.
    """
    # Mendapatkan spesifikasi fitur hasil transformasi
    transform_feature_spec = tf_transform_output.transformed_feature_spec().copy()

    # Membuat dataset dalam bentuk batch
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transform_feature_spec,
        reader=gzip_reader_fn,
        num_epochs=num_epochs,
        label_key=transformed_name(LABEL_KEY)
    ).repeat()
    return dataset


def model_builder(hparams=None):
    """
    Membangun model jaringan saraf tiruan (MLP) sederhana untuk klasifikasi biner.

    Args:
        hparams (dict, optional): Hyperparameter dari komponen Tuner (jika ada).

    Returns:
        tf.keras.Model: Model Keras yang sudah dikompilasi.
    """
    # deteksi tipe parameter (dict atau HyperParameters)
    if hparams is None:
        # jika dipanggil tanpa tuner (Trainer biasa)
        hidden_units = 64
        dropout_rate = 0.2
        learning_rate = 1e-3

    elif hasattr(hparams, "Choice"):  # dipanggil oleh keras_tuner
        hp = hparams
        hidden_units = hp.Int("units", min_value=32, max_value=128, step=32)
        dropout_rate = hp.Float("dropout", min_value=0.1, max_value=0.5, step=0.1)
        # turunkan range learning rate biar tidak terlalu agresif
        learning_rate = hp.Choice("learning_rate", [1e-3, 5e-4, 1e-4])

    else:  # dipanggil oleh Trainer dengan dict hasil dari tuner_fn
        hidden_units = int(hparams.get("units", 64))
        dropout_rate = float(hparams.get("dropout", 0.2))
        learning_rate = float(hparams.get("learning_rate", 1e-3))

    # definisi input layer sesuai fitur hasil Transform
    inputs = {
        name: layers.Input(shape=(1,), name=name, dtype=tf.float32)
        for name in [
            'Age_xf', 'BMI_xf', 'BloodPressure_xf', 'DiabetesPedigreeFunction_xf',
            'Glucose_xf', 'Insulin_xf', 'Pregnancies_xf', 'SkinThickness_xf'
        ]
    }

    # Concatenate seluruh fitur numerik
    x = layers.Concatenate(name="concatenate_inputs")(list(inputs.values()))
    
    # hidden layers
    x = layers.Dense(hidden_units, activation='relu')(x)
    x = layers.Dropout(dropout_rate)(x)
    x = layers.Dense(hidden_units // 2, activation='relu')(x)

    # output layer
    outputs = layers.Dense(1, activation='sigmoid', name='probability')(x)

    # compile model
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate),
        loss='binary_crossentropy',
        metrics=[
            tf.keras.metrics.BinaryAccuracy(name='binary_accuracy'),
            tf.keras.metrics.AUC(name='auc'),
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall')
        ]
    )
    
    # tampilkan ringkasan model
    model.summary()
    return model


# Serving Function (untuk model deployment)
def _get_serve_tf_examples_fn(model, tf_transform_output):
    """
    Mendefinisikan signature function agar model dapat menerima input mentah
    dalam format tf.Example saat deployment (TensorFlow Serving).
    """
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn


def run_fn(fn_args: FnArgs) -> None:
    """
    Fungsi utama untuk melatih model.
    Fungsi ini dijalankan oleh komponen Trainer TFX dan meliputi:
    - Membaca hasil transformasi,
    - Menyusun pipeline input,
    - Melatih model dengan callback,
    - Menyimpan model beserta serving signature.
    """
    # Muat hasil Transform
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)

    # Siapkan dataset training & evaluasi
    train_set = input_fn(fn_args.train_files, tf_transform_output, num_epochs=10)
    eval_set  = input_fn(fn_args.eval_files,  tf_transform_output, num_epochs=10)

    # Ambil hyperparameter jika Tuner digunakan
    hparams = None
    if getattr(fn_args, "hyperparameters", None):
        try:
            hparams = fn_args.hyperparameters.get("values") or fn_args.hyperparameters
        except Exception:
            hparams = None

    # Bangun model
    model = model_builder(hparams=hparams)

    # Siapkan callback
    log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, update_freq='epoch')
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_auc', mode='max', patience=10, restore_best_weights=True, verbose=1)
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_auc', mode='max', factor=0.5, patience=4, min_lr=1e-5, verbose=1)

    # Latih model
    model.fit(
        x=train_set,
        validation_data=eval_set,
        steps_per_epoch=fn_args.train_steps,
        validation_steps=fn_args.eval_steps,
        epochs=50,
        callbacks=[tensorboard_callback, early_stopping, reduce_lr],
        verbose=2
    )

    # Buat serving signature
    signatures = {
        'serving_default':
        _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(
            tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
        )
    }

    # Simpan model dalam format TensorFlow SavedModel
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)


Writing d:\dicoding\Submission_MLOps_2\mrickyr-notebook\modules\trainer.py


In [13]:
# komponen Trainer untuk melatih model
trainer = Trainer(
    module_file=TRAINER_MODULE_FILE,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    # train/eval steps bisa kamu naikkan nanti setelah semuanya jalan
    train_args=trainer_pb2.TrainArgs(splits=['train'], num_steps=10),
    eval_args=trainer_pb2.EvalArgs(splits=['eval'], num_steps=3),
)


Pada bagian ini, saya mendefinisikan seluruh logika pelatihan model yang akan dijalankan oleh komponen Trainer di dalam pipeline TFX. Saya mulai dengan membuat fungsi input_fn yang bertugas membaca data hasil Transform dalam format TFRecord terkompresi GZIP dan mengubahnya menjadi tf.data.Dataset yang siap digunakan untuk proses training dan evaluasi. Selanjutnya, saya membangun sebuah model jaringan saraf tiruan sederhana (MLP) melalui fungsi model_builder, dengan arsitektur yang memanfaatkan fitur-fitur numerik yang sudah ditransformasi, lapisan dense bertingkat, dropout untuk mencegah overfitting, serta output sigmoid untuk menangani kasus klasifikasi biner. Fungsi ini juga saya rancang agar fleksibel terhadap penggunaan hyperparameter, baik ketika dipanggil secara standar maupun terintegrasi dengan komponen Tuner.

Untuk keperluan deployment, saya menyiapkan fungsi _get_serve_tf_examples_fn yang mendefinisikan serving signature sehingga model dapat menerima input mentah dalam bentuk tf.Example dan menerapkan graf transformasi yang sama seperti saat training. Seluruh proses ini kemudian diorkestrasi di dalam run_fn, yang dijalankan oleh komponen Trainer: mulai dari memuat hasil Transform, menyiapkan dataset train dan eval, membangun model, hingga menjalankan pelatihan dengan callback seperti TensorBoard, early stopping, dan penyesuaian learning rate berbasis performa validasi. Pada akhir fungsi, model disimpan dalam format SavedModel lengkap dengan signature untuk serving. Terakhir, saya mengonfigurasi komponen Trainer di pipeline dengan menghubungkannya ke artefak transformed_examples, transform_graph, dan schema, serta menetapkan jumlah langkah training dan evaluasi awal agar proses pelatihan dapat berjalan otomatis di dalam pipeline TFX.

### Tuner

In [14]:
%%writefile {TUNER_MODULE_FILE}
import tensorflow as tf
import tensorflow_transform as tft
import keras_tuner as kt
from tfx.components.trainer.fn_args_utils import FnArgs
from tfx.components.tuner.component import TunerFnResult

# Import fungsi dari trainer module
from trainer import input_fn, model_builder, transformed_name, LABEL_KEY


def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
    """
    Fungsi utama untuk menjalankan hyperparameter tuning.
    Menggunakan model_builder dan input_fn dari trainer.py.
    """
    # Muat hasil Transform
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)

    # Siapkan dataset training & evaluasi (pakai fungsi dari trainer.py)
    train_set = input_fn(fn_args.train_files, tf_transform_output, num_epochs=10)
    val_set   = input_fn(fn_args.eval_files,  tf_transform_output, num_epochs=10)

    # Callback early stopping
    stop_early = tf.keras.callbacks.EarlyStopping(
        monitor="val_auc", mode="max", patience=10, restore_best_weights=True
    )
    
    # callback reduce learning rate
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor="val_auc", mode="max", factor=0.5, patience=4, min_lr=1e-5, verbose=1
    )

    # Definisikan strategi tuning dengan Hyperband
    tuner = kt.Hyperband(
        model_builder,  # panggil fungsi model_builder dari trainer
        objective=kt.Objective("val_auc", direction="max"),
        max_epochs=20,
        factor=3,
        directory=fn_args.working_dir,
        project_name="keras_tuner_linked"
    )

    # Kembalikan hasil untuk komponen TFX Tuner
    return TunerFnResult(
        tuner=tuner,
        fit_kwargs={
            "x": train_set,
            "validation_data": val_set,
            "steps_per_epoch": fn_args.train_steps,
            "validation_steps": fn_args.eval_steps,
            "epochs": 50,
            "callbacks": [stop_early, reduce_lr],
            "verbose": 2
        }
    )


Writing d:\dicoding\Submission_MLOps_2\mrickyr-notebook\modules\tuner.py


In [15]:
# komponen Tuner untuk hyperparameter tuning
tuner = Tuner(
    module_file=TUNER_MODULE_FILE,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(splits=['train'], num_steps=10),
    eval_args=trainer_pb2.EvalArgs(splits=['eval'], num_steps=3),
)


Pada bagian ini, saya menyiapkan proses hyperparameter tuning dengan memanfaatkan komponen Tuner di TFX yang terintegrasi dengan Keras Tuner. Saya menggunakan kembali fungsi input_fn dan model_builder dari modul trainer.py agar arsitektur model dan alur input yang digunakan saat tuning konsisten dengan proses training utama. Melalui fungsi tuner_fn, saya terlebih dahulu memuat hasil Transform, lalu menyusun dataset train dan validasi dari artefak yang sama seperti yang digunakan Trainer.

Saya kemudian mendefinisikan dua callback penting, yaitu early stopping dan reduce learning rate, yang membantu menghentikan eksperimen lebih awal ketika performa tidak lagi membaik serta menyesuaikan laju pembelajaran secara adaptif. Untuk strategi pencarian hyperparameter, saya menggunakan Hyperband dengan tujuan mengoptimalkan metrik val_auc. Objek tuner beserta parameter pemanggilan fit dikembalikan dalam bentuk TunerFnResult, sehingga dapat dieksekusi secara otomatis oleh komponen TFX Tuner. Terakhir, saya mengonfigurasi komponen Tuner di pipeline dengan menghubungkannya ke artefak transformed_examples, transform_graph, dan schema, serta mendefinisikan jumlah langkah train dan eval. Dengan cara ini, proses pencarian kombinasi hyperparameter terbaik berjalan terstruktur dan tetap terintegrasi penuh di dalam pipeline TFX.

In [16]:
# Trainer dengan hyperparameter terbaik dari Tuner
trainer = Trainer(
    module_file=TRAINER_MODULE_FILE,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    hyperparameters=tuner.outputs['best_hyperparameters'],
    train_args=trainer_pb2.TrainArgs(splits=['train'], num_steps=10),
    eval_args=trainer_pb2.EvalArgs(splits=['eval'], num_steps=3),
)

Pada tahap ini, saya menjalankan kembali komponen Trainer dengan memanfaatkan hyperparameter terbaik yang diperoleh dari komponen Tuner. Alih-alih menggunakan nilai default, Trainer kini menerima artefak best_hyperparameters sebagai input sehingga arsitektur dan konfigurasi model yang dilatih sudah dioptimalkan berdasarkan proses pencarian sebelumnya. Saya tetap menggunakan hasil transformasi (transformed_examples dan transform_graph) serta schema yang sama agar alur data dan definisi fitur tetap konsisten. Dengan cara ini, proses retraining dilakukan secara lebih terarah karena model dilatih ulang menggunakan kombinasi hyperparameter yang sudah terbukti memberikan performa terbaik pada data validasi.

## Model Analysis and Validation

### Resolver

In [17]:
# Resolver component

model_resolver = Resolver(
    strategy_class=LatestBlessedModelStrategy,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing),
).with_id("latest_blessed_model_resolver")


Pada bagian ini, saya menambahkan komponen Resolver untuk memilih model terbaik yang telah lolos proses evaluasi pada eksekusi sebelumnya. Dengan menggunakan strategi LatestBlessedModelStrategy, komponen ini secara otomatis mencari artefak model paling mutakhir yang telah menerima status “blessed”, yakni model yang dinilai memenuhi kriteria performa oleh komponen Evaluator. Informasi ini kemudian digunakan oleh tahap pushing agar hanya model yang sudah tervalidasi yang diteruskan ke proses deployment. Dengan menambahkan Resolver, pipeline menjadi lebih aman dan stabil karena mencegah model yang kurang baik atau belum diverifikasi masuk ke tahap produksi.

### Evaluator

In [18]:
# konfigurasi Evaluator
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='Outcome')],
    slicing_specs=[tfma.SlicingSpec()],  # overall
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(class_name='ExampleCount'),

            # kemampuan diskriminasi menyeluruh
            tfma.MetricConfig(
                class_name='AUC',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.80}
                    ),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -0.01}
                    )
                )
            ),

            # minimalkan false negative
            tfma.MetricConfig(
                class_name='Recall',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.50}
                    ),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -0.01}
                    )
                )
            ),

            # Precision tetap dijaga agar tidak terlalu banyak false positive
            tfma.MetricConfig(
                class_name='Precision',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.50}
                    ),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -0.01}
                    )
                )
            ),

            # Akurasi sebagai sanity check
            tfma.MetricConfig(
                class_name='BinaryAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.70}
                    ),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -0.01}
                    )
                )
            ),

            # Komponen confusion matrix (tanpa threshold)
            tfma.MetricConfig(class_name='TruePositives'),
            tfma.MetricConfig(class_name='FalsePositives'),
            tfma.MetricConfig(class_name='TrueNegatives'),
            tfma.MetricConfig(class_name='FalseNegatives')
        ])
    ]
)

Pada bagian ini, saya menyusun konfigurasi TFMA Evaluator untuk menilai kualitas model secara lebih terarah sesuai tujuan proyek, yaitu meminimalkan kasus pasien yang sebenarnya mengidap diabetes tetapi diprediksi tidak (false negative). Saya mendefinisikan EvalConfig dengan label Outcome sebagai target, lalu menambahkan beberapa metrik utama. Metrik AUC digunakan untuk mengukur kemampuan model membedakan kelas sehat dan diabetes secara keseluruhan, dengan batas minimal 0,80. Untuk mengendalikan false negative, saya memberi perhatian khusus pada metrik Recall dengan ambang minimal 0,50, sehingga model diharapkan cukup sensitif dalam menangkap kasus diabetes. Di sisi lain, Precision juga dijaga di atas 0,50 agar jumlah prediksi positif yang keliru (false positive) tidak terlalu banyak. BinaryAccuracy ditambahkan sebagai sanity check dengan batas minimal 0,70 untuk memastikan performa global tetap wajar.

Selain itu, saya ikut merekam komponen-komponen confusion matrix seperti True Positives, False Positives, True Negatives, dan False Negatives tanpa threshold tambahan. Informasi ini membantu saya menganalisis secara lebih rinci pola kesalahan model, terutama untuk memastikan bahwa jumlah false negative benar-benar dapat ditekan sesuai dengan tujuan bisnis dan konteks medis dari kasus diabetes.

In [19]:
# Evaluator component (untuk Apache Beam pipeline)
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],  # boleh kosong di run pertama
    eval_config=eval_config,
)

Pada tahap ini, saya mengaktifkan komponen Evaluator untuk menjalankan evaluasi model menggunakan Apache Beam dan konfigurasi metrik yang telah ditetapkan sebelumnya. Komponen ini menerima artefak examples sebagai data evaluasi dan model hasil training sebagai objek yang akan dianalisis. Jika tersedia, saya juga memasukkan baseline_model dari Resolver, sehingga Evaluator dapat membandingkan performa model baru terhadap model terbaik yang pernah dipromosikan sebelumnya. Dengan cara ini, pipeline dapat memastikan bahwa model baru tidak hanya memenuhi ambang kinerja yang ditentukan, tetapi juga tidak mengalami penurunan performa dibandingkan baseline. Hasil evaluasi inilah yang nantinya menentukan apakah model baru layak diberi status “blessed” dan diteruskan ke tahap deployment.

## Model Deployment

### Pusher

In [20]:
# Pusher component
pusher = Pusher(
    model=trainer.outputs["model"],
    model_blessing=evaluator.outputs["blessing"],  # hanya push kalau model "blessed"
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=SERVING_MODEL_DIR
        )
    ),
)

Pada bagian ini, saya menambahkan komponen Pusher yang bertugas melakukan proses deployment model ke direktori tujuan. Komponen ini hanya akan mengekspor model apabila artefak blessing dari Evaluator menyatakan bahwa model layak digunakan, sehingga hanya model yang memenuhi ambang performa dan tidak mengalami penurunan kualitas yang dapat diteruskan ke tahap produksi. Saya menentukan lokasi penyimpanan menggunakan PushDestination berbasis filesystem, yaitu pada direktori SERVING_MODEL_DIR. Ketika sebuah model dinyatakan “blessed”, Pusher akan menyimpan versi terbaru dalam format SavedModel, sehingga model tersebut siap digunakan untuk inference atau integrasi dengan sistem serving seperti TensorFlow Serving. Dengan mekanisme ini, pipeline memastikan bahwa proses deployment berlangsung aman, terkontrol, dan sepenuhnya otomatis.

## Run TFX Pipeline

In [21]:
# konfigurasi pipeline TFX
components = [
    example_gen,
    statistics_gen,
    schema_gen,
    example_validator,
    transform,
    tuner,
    trainer,
    model_resolver,
    evaluator,
    pusher,
]

# Argumen untuk Apache Beam (direct runner)
beam_pipeline_args = [
    "--direct_running_mode=in_memory",
    "--direct_num_workers=1",
]

# Buat objek Pipeline TFX
pipeline = tfx_pipeline.Pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    components=components,
    enable_cache=True,
    metadata_connection_config=metadata.sqlite_metadata_connection_config(
        METADATA_PATH
    ),
    beam_pipeline_args=beam_pipeline_args,
)

# Jalankan pipeline dengan Apache Beam
BeamDagRunner().run(pipeline)


Trial 30 Complete [00h 00m 07s]
val_auc: 0.8633618354797363

Best val_auc So Far: 0.8740265965461731
Total elapsed time: 00h 01m 50s
Results summary
Results in d:\dicoding\Submission_MLOps_2\mrickyr-notebook\pipelines\mrickyr-pipeline\Tuner\.system\executor_execution\7\.temp\7\keras_tuner_linked
Showing 10 best trials
<keras_tuner.engine.objective.Objective object at 0x000001A291214D30>
Trial summary
Hyperparameters:
units: 128
dropout: 0.5
learning_rate: 0.0005
tuner/epochs: 20
tuner/initial_epoch: 7
tuner/bracket: 1
tuner/round: 1
tuner/trial_id: 0019
Score: 0.8740265965461731
Trial summary
Hyperparameters:
units: 128
dropout: 0.4
learning_rate: 0.001
tuner/epochs: 20
tuner/initial_epoch: 7
tuner/bracket: 2
tuner/round: 2
tuner/trial_id: 0014
Score: 0.8714698553085327
Trial summary
Hyperparameters:
units: 96
dropout: 0.4
learning_rate: 0.001
tuner/epochs: 20
tuner/initial_epoch: 7
tuner/bracket: 2
tuner/round: 2
tuner/trial_id: 0012
Score: 0.8692817687988281
Trial summary
Hyperparame

Pada tahap terakhir ini, saya menyusun dan mengeksekusi keseluruhan pipeline TFX. Saya terlebih dahulu menggabungkan seluruh komponen mulai dari ingestion, validasi, transformasi, tuning, training, evaluasi, hingga deployment ke dalam sebuah daftar components. Konfigurasi ini memastikan bahwa setiap komponen akan dijalankan secara berurutan dan saling terhubung melalui artefak yang mereka hasilkan.

Selanjutnya, saya menetapkan argumen Apache Beam menggunakan direct runner, yang memungkinkan pipeline dijalankan secara lokal tanpa kluster terdistribusi. Dengan konfigurasi tersebut, saya kemudian membangun objek Pipeline yang mencakup nama pipeline, lokasi penyimpanan artefak, pengaturan metadata, serta daftar komponen yang akan dieksekusi. Fitur enable_cache=True saya aktifkan agar komponen tidak perlu dijalankan ulang apabila artefak yang sama sudah tersedia, sehingga proses eksekusi menjadi lebih efisien.

Setelah seluruh konfigurasi siap, pipeline dijalankan menggunakan BeamDagRunner. Langkah ini mengeksekusi setiap komponen secara otomatis menggunakan Apache Beam, sehingga keseluruhan alur mulai dari data mentah hingga model siap deployment dapat berlangsung secara terstruktur, reproducible, dan sepenuhnya end-to-end.