In [1]:
#!pip install -q tfx
#!pip install dill

# Our dataset

In [2]:
!head data/kdd-with-columns.csv
print("- "*100)
!head bad_data/bad_data.csv

duration,protocol_type,service,flag,src_bytes,dst_bytes,land,wrong_fragment,urgent,hot,num_failed_logins,logged_in,num_compromised,root_shell,su_attempted,num_root,num_file_creations,num_shells,num_access_files,num_outbound_cmds,is_host_login,is_guest_login,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,srv_diff_host_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate,outcome
0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,9,9,1.0,0.0,0.11,0.0,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,19,19,1.0,0.0,0.05,0.0,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,29,29,1.0,0.0,0.03,0.0,0.0,0.0,0.0,

In [3]:
import os
from tfx import v1 as tfx
PIPELINE_NAME = 'secure-local-validation'
PIPELINE_ROOT = os.path.join('.', PIPELINE_NAME)
DATA_DIR = './data'
METADATA_PATH = os.path.join(PIPELINE_ROOT, 'metadata.db')

# Remove all subfolders/artifacts in pipeline root. This refreshes the tfx state db
if os.path.exists(PIPELINE_ROOT):
    import shutil; shutil.rmtree('./secure-local-validation', ignore_errors=True)

2025-11-19 11:16:24.002943: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-11-19 11:16:24.017645: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-11-19 11:16:24.121367: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-11-19 11:16:24.256363: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:479] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-11-19 11:16:24.386226: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:10575] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registe

# Defining Our First TFX Components

* **`CsvExampleGen`**: Loads the raw `.csv` data.
* **`StatisticsGen`**: Calculates statistics on that data.
* **`SchemaGen`**: Infers a "data rules" schema from the statistics.

We will run these component *instances* in the next cell using the `InteractiveContext`. This is just a "showcase" to let us explore the artifacts one by one. Our final, automated pipeline will create its own instances later.

In [4]:
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

example_gen = CsvExampleGen(input_base=DATA_DIR)
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)

context = InteractiveContext(
    pipeline_root=PIPELINE_ROOT,
    metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
)

context.run(example_gen)
context.run(statistics_gen)
context.run(schema_gen)

print("âœ… Example Data Ingested (Artifact):")
context.show(example_gen.outputs['examples'])

print("ðŸ“Š Data Statistics (Artifact and inf):")
context.show(statistics_gen.outputs['statistics'])

print("ðŸ“‹ DATA CONTRACT - This defines what 'good' data looks like (Artifact):")
context.show(schema_gen.outputs['schema'])



âœ… Example Data Ingested (Artifact):


ðŸ“Š Data Statistics (Artifact and inf):


ðŸ“‹ DATA CONTRACT - This defines what 'good' data looks like (Artifact):


Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'count',INT,required,single,-
'diff_srv_rate',FLOAT,required,single,-
'dst_bytes',INT,required,single,-
'dst_host_count',INT,required,single,-
'dst_host_diff_srv_rate',FLOAT,required,single,-
'dst_host_rerror_rate',FLOAT,required,single,-
'dst_host_same_src_port_rate',FLOAT,required,single,-
'dst_host_same_srv_rate',FLOAT,required,single,-
'dst_host_serror_rate',FLOAT,required,single,-
'dst_host_srv_count',INT,required,single,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'flag',"'OTH', 'REJ', 'RSTO', 'RSTOS0', 'RSTR', 'S0', 'S1', 'S2', 'S3', 'SF', 'SH'"
'outcome',"'back.', 'buffer_overflow.', 'ftp_write.', 'guess_passwd.', 'imap.', 'ipsweep.', 'land.', 'loadmodule.', 'multihop.', 'neptune.', 'nmap.', 'normal.', 'perl.', 'phf.', 'pod.', 'portsweep.', 'rootkit.', 'satan.', 'smurf.', 'spy.', 'teardrop.', 'warezclient.', 'warezmaster.'"
'protocol_type',"'icmp', 'tcp', 'udp'"
'service',"'IRC', 'X11', 'Z39_50', 'auth', 'bgp', 'courier', 'csnet_ns', 'ctf', 'daytime', 'discard', 'domain', 'domain_u', 'echo', 'eco_i', 'ecr_i', 'efs', 'exec', 'finger', 'ftp', 'ftp_data', 'gopher', 'hostnames', 'http', 'http_443', 'imap4', 'iso_tsap', 'klogin', 'kshell', 'ldap', 'link', 'login', 'mtp', 'name', 'netbios_dgm', 'netbios_ns', 'netbios_ssn', 'netstat', 'nnsp', 'nntp', 'ntp_u', 'other', 'pm_dump', 'pop_2', 'pop_3', 'printer', 'private', 'red_i', 'remote_job', 'rje', 'shell', 'smtp', 'sql_net', 'ssh', 'sunrpc', 'supdup', 'systat', 'telnet', 'tim_i', 'time', 'urh_i', 'urp_i', 'uucp', 'uucp_path', 'vmnet', 'whois', 'tftp_u'"


# Manually Creating Our "Golden" Schema with TFDV

In the previous cell, we saw TFX components like `StatisticsGen` and `SchemaGen` create artifacts. TFDV is the lower-level library that does all the actual work of calculating stats, inferring schemas, and finding anomalies.

### Why Use TFDV Directly?

`SchemaGen` is great at making a *draft* of our schema. But for a secure, production pipeline, we need to create a **configured schema**.

In this cell, we will act as the ML engineer and use the TFDV library directly to:
1.  **Generate Statistics** for both our "good" and "bad" data (this is what `StatisticsGen` does).
2.  **Infer a Schema** from the "good" data's statistics (this is what `SchemaGen` does).
3.  **Configure the Schema** by *manually adding* sensitive **drift detection thresholds**. (This is the critical step `SchemaGen` can't do by itself).
4.  **Save** this configured schema to a file (`golden_schema/schema.pbtxt`). This file is now our **"Golden Artifact"**.
5.  **Validate** our "bad" data against our new "golden" schema to prove that our security works and immediately find the anomalies and drift.

In [5]:
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

# Paths to our CSVs
good_csv = './data/kdd-with-columns.csv'
bad_csv = './bad_data/bad_data.csv'

print("Generating stats for 'good' and 'bad' data...")
stats_good = tfdv.generate_statistics_from_csv(data_location=good_csv)
stats_bad = tfdv.generate_statistics_from_csv(data_location=bad_csv)

print("Inferring schema from 'good' data...")
schema = tfdv.infer_schema(stats_good)

print("Configuring schema with drift thresholds...")
for feature in schema.feature:
    if feature.type in (schema_pb2.INT, schema_pb2.FLOAT):
        feature.drift_comparator.jensen_shannon_divergence.threshold = 1

drift_anomalies = tfdv.validate_statistics(
    statistics=stats_bad,
    schema=schema,
    previous_statistics=stats_good
)

# --- SAVE THE CONFIGURED "GOLDEN" SCHEMA ---
SCHEMA_DIR = './golden_schema'
os.makedirs(SCHEMA_DIR, exist_ok=True)
CONFIGURED_SCHEMA_PATH = os.path.join(SCHEMA_DIR, 'schema.pbtxt')
tfdv.write_schema_text(schema, CONFIGURED_SCHEMA_PATH)
print(f"\nâœ… Configured 'golden' schema saved to: {CONFIGURED_SCHEMA_PATH}\n")

print("--- ðŸš¨ Anomaly Report (Manual Check) ---")
tfdv.display_anomalies(drift_anomalies)
print("\n--- ðŸ“Š Statistics Visualization (Manual Check) ---")
tfdv.visualize_statistics(
    lhs_statistics=stats_good, 
    rhs_statistics=stats_bad, 
    lhs_name='Good', 
    rhs_name='Bad'
)

Generating stats for 'good' and 'bad' data...
Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Inferring schema from 'good' data...
Configuring schema with drift thresholds...

âœ… Configured 'golden' schema saved to: ./golden_schema/schema.pbtxt

--- ðŸš¨ Anomaly Report (Manual Check) ---


Unnamed: 0_level_0,Anomaly short description,Anomaly long description
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1
'num_root',High approximate Jensen-Shannon divergence between current and previous,"The approximate Jensen-Shannon divergence between current and previous is 1 (up to six significant digits), above the threshold 1."
'srv_serror_rate',High approximate Jensen-Shannon divergence between current and previous,"The approximate Jensen-Shannon divergence between current and previous is 1 (up to six significant digits), above the threshold 1."
'num_failed_logins',High approximate Jensen-Shannon divergence between current and previous,"The approximate Jensen-Shannon divergence between current and previous is 1 (up to six significant digits), above the threshold 1."
'dst_host_srv_diff_host_rate',High approximate Jensen-Shannon divergence between current and previous,"The approximate Jensen-Shannon divergence between current and previous is 1 (up to six significant digits), above the threshold 1."
'outcome',Unexpected string values,Examples contain values missing from the schema: toy. (~33%).



--- ðŸ“Š Statistics Visualization (Manual Check) ---


###  Training-Serving Skew (The "scaler.pkl" Vulnerability)

Instead of saving a `.pkl` file, the `Transform` component saves the preprocessing logic (like the *mean* and *standard deviation* of our scaler) as a **`transform_graph`**â€”a safe, versioned artifact.

This cell does one simple thing: It writes the `preprocessing.py` file. This file is our **preprocessing "recipe"** that the `Transform` component will use to create that secure artifact.

In [15]:
print("ðŸ”§ Setting up TensorFlow Transform (TFT) to solve training-serving skew...")

os.makedirs('modules', exist_ok=True)
with open('modules/preprocessing.py', 'w') as f:
    f.write("""
import tensorflow as tf
import tensorflow_transform as tft
def preprocessing_fn(inputs):
    num_cols = [key for key, value in inputs.items() if value.dtype in (tf.float32, tf.int64) and key not in ('outcome', 'labels')]
    cat_cols = [key for key, value in inputs.items() if value.dtype == tf.string and key not in ('outcome', 'labels')]

    # z-score normalization, like sklearn's StandardScaler
    #    To match MinMaxScaler, use tft.scale_to_0_1 instead.
    num_scaled = [tft.scale_to_z_score(tf.cast(inputs[k], tf.float32)) for k in num_cols]

    # One-hot encode, like sklearn's OneHotEncoder
    cat_onehot = []
    for k in cat_cols:
        idx = tft.compute_and_apply_vocabulary(inputs[k], num_oov_buckets=1, vocab_filename=k + "_vocab")
        depth = tft.experimental.get_vocabulary_size_by_name(k + "_vocab") + 1
        depth = tf.cast(depth, tf.int32)
        onehot = tf.one_hot(idx, depth=depth, dtype=tf.float32)
        cat_onehot.append(tf.reshape(onehot, [-1, tf.shape(onehot)[-1]]))

    features = tf.concat(num_scaled + cat_onehot, axis=-1)
    return {"features": features}
""")
print("ðŸ”’ This bridges TFX validation with sklearn training")
print("ðŸŽ¯ Solves the scaler.pkl vulnerability from Section 2!")

ðŸ”§ Setting up TensorFlow Transform (TFT) to solve training-serving skew...
ðŸ”’ This bridges TFX validation with sklearn training
ðŸŽ¯ Solves the scaler.pkl vulnerability from Section 2!


# Running the Full, Automated MLOps Pipeline

This is the final, automated pipeline. We will use `LocalDagRunner` to run the entire end-to-end process.
* **Input:** The `data` directory (our trusted data).
* **Action:** It will load our **"Golden Schema"** (the one we configured with drift rules) and use `ExampleValidator` to check the data.
* **Expected Result:** The `ExampleValidator` will find no anomalies. The pipeline will **SUCCEED**, and the `Transform` component will run, creating our secure preprocessing artifacts.


In [16]:
from tfx.v1.components import (
    CsvExampleGen,
    StatisticsGen,
    Transform,
    ExampleValidator,
    ImportSchemaGen,
)
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

# --- CONFIG ---
CONFIGURED_SCHEMA_PATH = './golden_schema/schema.pbtxt'
PREPROCESSING_FILE_PATH = os.path.abspath('modules/preprocessing.py')
DATA_DIR = './data' # Or './bad_data'
PIPELINE_NAME = 'SecureValidationPipeline'
PIPELINE_ROOT = './SecureValidationPipeline'
METADATA_PATH = os.path.join(PIPELINE_ROOT, 'metadata.sqlite')

# --- PIPELINE DEFINITION ---
example_gen = CsvExampleGen(input_base=DATA_DIR)
stats_gen = StatisticsGen(examples=example_gen.outputs['examples'])
schema_importer = ImportSchemaGen(schema_file=CONFIGURED_SCHEMA_PATH)
validator = ExampleValidator(
    statistics=stats_gen.outputs['statistics'],
    schema=schema_importer.outputs['schema'],
)
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_importer.outputs['schema'],
    module_file=PREPROCESSING_FILE_PATH,
)

pipeline = tfx.dsl.Pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    components=[
        example_gen,
        stats_gen,
        schema_importer,
        validator,
        transform,
    ],
    metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
        METADATA_PATH
    ),
)
LocalDagRunner().run(pipeline)

running bdist_wheel
running build
running build_py
creating build/lib
copying preprocessing.py -> build/lib
installing to /tmp/tmph13lh287
running install
running install_lib
copying build/lib/preprocessing.py -> /tmp/tmph13lh287/.
running install_egg_info
running egg_info
creating tfx_user_code_Transform.egg-info
writing tfx_user_code_Transform.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Transform.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Transform.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
Copying tfx_user_code_Transform.egg-info to /tmp/tmph13lh287/./tfx_user_code_Transform-0.0+fcb0833c8c78a18f812dfc66130828d607fbff75686c6596effd8a3fba8534f9-py3.10.egg-info
running install_scripts
creating /tmp/tmph13lh287/tfx_user_code_transform-0.0+fcb0833c8c78a18f812df

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        This deprecation is overdue, please update your project and remove deprecated
        calls to avoid build errors in the future.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


Processing ./SecureValidationPipeline/_wheels/tfx_user_code_transform-0.0+fcb0833c8c78a18f812dfc66130828d607fbff75686c6596effd8a3fba8534f9-py3-none-any.whl
Installing collected packages: tfx-user-code-transform
Successfully installed tfx-user-code-transform-0.0+fcb0833c8c78a18f812dfc66130828d607fbff75686c6596effd8a3fba8534f9



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


Processing ./SecureValidationPipeline/_wheels/tfx_user_code_transform-0.0+fcb0833c8c78a18f812dfc66130828d607fbff75686c6596effd8a3fba8534f9-py3-none-any.whl
Installing collected packages: tfx-user-code-transform
Successfully installed tfx-user-code-transform-0.0+fcb0833c8c78a18f812dfc66130828d607fbff75686c6596effd8a3fba8534f9



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


Processing ./SecureValidationPipeline/_wheels/tfx_user_code_transform-0.0+fcb0833c8c78a18f812dfc66130828d607fbff75686c6596effd8a3fba8534f9-py3-none-any.whl
Installing collected packages: tfx-user-code-transform
Successfully installed tfx-user-code-transform-0.0+fcb0833c8c78a18f812dfc66130828d607fbff75686c6596effd8a3fba8534f9



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


INFO:tensorflow:Assets written to: ./SecureValidationPipeline/Transform/transform_graph/32/.temp_path/tftransform_tmp/e87be691e38d4e49b518b0e97c07b6d7/assets


INFO:tensorflow:Assets written to: ./SecureValidationPipeline/Transform/transform_graph/32/.temp_path/tftransform_tmp/e87be691e38d4e49b518b0e97c07b6d7/assets


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:Assets written to: ./SecureValidationPipeline/Transform/transform_graph/32/.temp_path/tftransform_tmp/f0d7d8bd69004f7d95bcb3fa98ea058a/assets


INFO:tensorflow:Assets written to: ./SecureValidationPipeline/Transform/transform_graph/32/.temp_path/tftransform_tmp/f0d7d8bd69004f7d95bcb3fa98ea058a/assets


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


In [22]:
import tensorflow as tf

file_path = "SecureValidationPipeline/Transform/transformed_examples/32/Split-train/transformed_examples-00000-of-00001.gz"
raw_dataset = tf.data.TFRecordDataset([file_path], compression_type='GZIP')

for raw_record in raw_dataset.take(1):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)

features {
  feature {
    key: "features"
    value {
      float_list {
        value: -0.09612267464399338
        value: -0.00372927519492805
        value: 0.07442012429237366
        value: -0.010465835221111774
        value: -0.06635283678770065
        value: -0.00395552720874548
        value: -0.0653456524014473
        value: -0.014181556180119514
        value: 1.5293927192687988
        value: -0.009705637581646442
        value: -0.015406275168061256
        value: -0.0070493160746991634
        value: -0.009025363251566887
        value: -0.01571056619286537
        value: -0.013825825415551662
        value: -0.04092235490679741
        value: 0.0
        value: 0.0
        value: -0.052920930087566376
        value: -0.8863224983215332
        value: -0.4064725935459137
        value: -0.7440078258514404
        value: -0.7429863214492798
        value: -0.36361297965049744
        value: -0.36487483978271484
        value: 0.9037188291549683
        value: -0.3791199

2025-11-19 12:15:22.408233: W tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
