In [None]:
import os
import pandas as pd
import tensorflow_model_analysis as tfma
from tfx.components import (CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator,
                            Transform, Trainer, Tuner, Evaluator, Pusher)
from tfx.proto import example_gen_pb2, trainer_pb2, pusher_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.dsl.components.common.resolver import Resolver
from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import LatestBlessedModelStrategy
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing

In [None]:
PIPELINE_ROOT = "pipelines"
SCHEMA_PIPELINE_NAME = "fake-news-tfdv-schema"
SERVING_MODEL_DIR = "serving_model"

# METADATA_PATH = os.path.join("metadata", PIPELINE_NAME, "metadata.db")

In [None]:
real_df = pd.read_csv("data/True.csv")
real_df

In [None]:
fake_df = pd.read_csv("data/Fake.csv")
fake_df

In [None]:
real_df["class"] = 1
fake_df["class"] = 0

df = pd.concat([real_df, fake_df], axis=0)
df = df.drop(["title", "date", "subject"], axis=1)
df = df.rename(columns={"class": "is_real"})
df.to_csv("data/csv/news.csv", index=False)

In [None]:
DATA_ROOT = "data/csv"

In [None]:
interactive_context = InteractiveContext(pipeline_root=PIPELINE_ROOT)

In [None]:
output = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name="train", hash_buckets=8),
        example_gen_pb2.SplitConfig.Split(name="eval", hash_buckets=2),
    ])
)

example_gen = CsvExampleGen(input_base=DATA_ROOT, output_config=output)
interactive_context.run(example_gen)

In [None]:
statistics_gen = StatisticsGen(
    examples=example_gen.outputs["examples"]
)
interactive_context.run(statistics_gen)

In [None]:
interactive_context.show(statistics_gen.outputs["statistics"])

In [None]:
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs["statistics"]
)
interactive_context.run(schema_gen)

In [None]:
interactive_context.show(schema_gen.outputs["schema"])

In [None]:
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs["statistics"],
    schema=schema_gen.outputs["schema"],
)
interactive_context.run(example_validator)

In [None]:
interactive_context.show(example_validator.outputs["anomalies"])

In [None]:
TRANSFORM_MODULE_FILE = "news_transform.py"

In [None]:
%%writefile {TRANSFORM_MODULE_FILE}
import string
import tensorflow as tf
import tensorflow_transform as tft

LABEL_KEY = "is_real"
FEATURE_KEY = "text"

def transformed_name(key):
    return f"{key}_xf"

def preprocessing_fn(inputs):
    outputs = dict()
    
    outputs[transformed_name(FEATURE_KEY)] = tf.strings.lower(inputs[FEATURE_KEY])
    outputs[transformed_name(LABEL_KEY)] = tf.cast(inputs[LABEL_KEY], tf.int64)
    
    return outputs

In [17]:
transform = Transform(
    examples=example_gen.outputs["examples"],
    schema=schema_gen.outputs["schema"],
    module_file=os.path.abspath(TRANSFORM_MODULE_FILE)
)
interactive_context.run(transform)

KeyboardInterrupt: 

In [None]:
TRAINER_MODULE_FILE = "news_trainer.py"

In [None]:
%%writefile {TRAINER_MODULE_FILE}
import os
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_hub as hub
from tensorflow.keras import layers
from tfx.components.trainer.fn_args_utils import FnArgs

LABEL_KEY = "is_real"
FEATURE_KEY = "text"
NUM_EPOCHS = 30

def transformed_name(key): 
    return f"{key}_xf"

def gzip_reader_fn(filenames):
    return tf.data.TFRecordDataset(filenames, compression_type="GZIP")

def input_fn(file_pattern, tf_transform_output, num_epochs, batch_size=64)->tf.data.Dataset:
    transform_feature_spec = (
        tf_transform_output.transformed_feature_spec().copy()
    )
    
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transform_feature_spec,
        reader=gzip_reader_fn,
        num_epochs=num_epochs,
        label_key=transformed_name(LABEL_KEY),
    )
    
    return dataset

def model_builder(vectorizer_layer):
    inputs = tf.keras.Input(shape=(1,), name=transformed_name(FEATURE_KEY), dtype=tf.string)

    x = vectorizer_layer(inputs)
    x = layers.Embedding(input_dim=5000, output_dim=128)(x)
    x = layers.LSTM(128)(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(6, activation='softmax')(x)
    
    model = tf.keras.Model(inputs=inputs, outputs = outputs)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[tf.keras.metrics.BinaryAccuracy()],
    )
    
    model.summary()
    
    return model

def _get_serve_tf_example_fn(model, tf_transform_output):
    model.tft_layer = tf_transform_output.transform_features_layer()
    
    @tf.function
    def serve_tf_example_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(LABEL_KEY)
        
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        
        # get predictions using transformed features
        return model(transformed_features)
    
    return serve_tf_example_fn

def run_fn(fn_args: FnArgs):
    log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), "logs")
    
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, update_freq="batch")
    early_stopping_callback = tf.keras.callbacks.EarlyStopping(
        monitor="val_binary_accuracy",
        mode="max",
        verbose=1,
        patience=10,
    )
    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        fn_args.serving_model_dir,
        monitor="val_binary_accuracy",
        mode="max",
        verbose=1,
        save_best_only=True,
    )
    callbacks = [
        tensorboard_callback, 
        early_stopping_callback, 
        model_checkpoint_callback
    ]
    
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
    
    train_set = input_fn(fn_args.train_files, tf_transform_output, NUM_EPOCHS)
    eval_set = input_fn(fn_args.eval_files, tf_transform_output, NUM_EPOCHS)
    
    vectorizer_dataset = train_set.map(lambda f, l: f[transformed_name(FEATURE_KEY)])
    
    vectorizer_layer = layers.TextVectorization(
        max_tokens=5000,
        output_mode="int",
        output_sequence_length=40,
    )
    vectorizer_layer.adapt(vectorizer_dataset)
    
    model = model_builder(vectorizer_layer)
    
    model.fit(
        x=train_set,
        validation_data=eval_set,
        callbacks=callbacks,
        epochs=NUM_EPOCHS,
        verbose=1,
    )
    
    signatures = {
        "serving_default": _get_serve_tf_example_fn(model, tf_transform_output).get_concrete_function(
            tf.TensorSpec(
                shape=[None],
                dtype=tf.string,
                name="examples",
            )
        )
    }
    
    model.save(fn_args.serving_model_dir, save_format="tf", signatures=signatures)

In [None]:
trainer = Trainer(
    module_file=os.path.abspath(TRAINER_MODULE_FILE),
    examples=transform.outputs["transformed_examples"],
    transform_graph=transform.outputs["transform_graph"],
    schema=schema_gen.outputs["schema"],
    train_args=trainer_pb2.TrainArgs(splits=["train"]),
    eval_args=trainer_pb2.EvalArgs(splits=["eval"]),
)
interactive_context.run(trainer)

In [None]:
TUNER_MODULE_FILE = "news_tuner.py"

In [None]:
%%writefile {TUNER_MODULE_FILE}
import keras_tuner as kt
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow.keras import layers
from tfx.components.trainer.fn_args_utils import FnArgs

LABEL_KEY = "emotions"
FEATURE_KEY = "text"
NUM_EPOCHS = 30

def transformed_name(key):
    return f"{key}_xf"

def gzip_reader_fn(filenames):
    return tf.data.TFRecordDataset(filenames, compression_type="GZIP")

def input_fn(file_pattern, tf_transform_output, num_epochs, batch_size=64)->tf.data.Dataset:
    transform_feature_spec = (
        tf_transform_output.transformed_feature_spec().copy()
    )
    
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transform_feature_spec,
        reader=gzip_reader_fn,
        num_epochs=num_epochs,
        label_key=transformed_name(LABEL_KEY),
    )
    
    return dataset

def model_builder(vectorizer_layer):
    inputs = tf.keras.Input(shape=(1,), name=transformed_name(FEATURE_KEY), dtype=tf.string)

    x = vectorizer_layer(inputs)
    x = layers.Embedding(input_dim=5000, output_dim=16)(x)
    x = layers.Bidirectional(layers.LSTM(64))(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(6, activation='softmax')(x)
    
    model = tf.keras.Model(inputs=inputs, outputs = outputs)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=["accuracy"],
    )
    
    model.summary()
    
    return model

def tuner_fn(fn_args: FnArgs):
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
    
    train_set = input_fn(fn_args.train_files, tf_transform_output, NUM_EPOCHS, 128)
    eval_set = input_fn(fn_args.eval_files, tf_transform_output, NUM_EPOCHS, 128)
    
    early_stopping_callback = tf.keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        mode="max",
        verbose=1,
        patience=10,
    )
    vectorizer_dataset = train_set.map(lambda f, l: f[transformed_name(FEATURE_KEY)])
    
    vectorizer_layer = layers.TextVectorization(
        max_tokens=5000,
        output_mode="int",
        output_sequence_length=20,
    )
    vectorizer_layer.adapt(vectorizer_dataset)
    
    tuner = kt.Hyperband(
        hypermodel=model_builder(vectorizer_layer),
        objective="val_accuracy",
        max_epochs=30,
        factor=3,
        directory=fn_args.working_dir,
        project_name="kt_hyperband",
    )
    
    return TunerFnResult(
        tuner=tuner,
        fit_kwargs={
            "x": train_set,
            "validation_data": eval_set,
            "steps_per_epoch": fn_args.train_steps,
            "validation_steps": fn_args.eval_steps,
            "callbacks": [early_stopping_callback],
        },
    )

In [None]:
tuner = Tuner(
    module_file=os.path.abspath(TUNER_MODULE_FILE),
    examples=transform.outputs["transformed_examples"],
    transform_graph=transform.outputs["transform_graph"],
    schema=schema_gen.outputs["schema"],
    train_args=trainer_pb2.TrainArgs(splits=["train"], num_steps=800),
    eval_args=trainer_pb2.EvalArgs(splits=["eval"], num_steps=400),
)
interactive_context.run(tuner)

In [None]:
model_resolver = Resolver(
    strategy_class=LatestBlessedModelStrategy,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing),
).with_id("Latest_blessed_model_resolver")

interactive_context.run(model_resolver)

In [None]:
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key="emotions")],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(class_name="ExampleCount"),
            tfma.MetricConfig(class_name="AUC"),
            tfma.MetricConfig(class_name="TruePositives"),
            tfma.MetricConfig(class_name="FalsePositives"),
            tfma.MetricConfig(class_name="TrueNegatives"),
            tfma.MetricConfig(class_name="FalseNegatives"),
            tfma.MetricConfig(class_name="Accuracy", threshold=tfma.MetricThreshold(
                value_threshold=tfma.GenericValueThreshold(
                    lower_bound={"value": .6},
                ),
                change_threshold=tfma.GenericChangeThreshold(
                    direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                    absolute={"value": 0.0001},
                ),
            )),
        ])
    ]
)

In [None]:
evaluator = Evaluator(
    examples=example_gen.outputs["examples"],
    model=trainer.outputs["model"],
    baseline_model=model_resolver.outputs["model"],
    eval_config=eval_config,
)
interactive_context.run(evaluator)

In [None]:
eval_result = evaluator.outputs["evaluation"].get()[0].uri
tfma_result = tfma.load_eval_result(eval_result)
tfma.addons.fairness.view.widget_view.render_fainess_indicator(tfma_result)

In [None]:
pusher = Pusher(
    model=trainer.outputs["model"],
    model_blessing=evaluator.outputs["blessing"],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=SERVING_MODEL_DIR,
        )
    )
)

interactive_context.run(pusher)