# Semantically Equivalent Sentences

Determine whether two sentences are semantically equivalent: from data to deployed model.
The training data is a corpus of sentence pairs and, for each pair, a label indicating whether the sentences in the pair are semantically equivalent.
The data stems from the "Microsoft Research Paraphrase Corpus" [1], which was extracted from online news sources and subsequently labeled by humans. It is loaded here via the `glue`/`mrpc` dataset on 🤗 Datasets [3].
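To make the record layout concrete, here is a hedged sketch of a single MRPC row in plain Python. It assumes the field names of the 🤗 `glue`/`mrpc` configuration (`sentence1`, `sentence2`, `label`, `idx`) and its label encoding (`0` = not equivalent, `1` = equivalent). The `MrpcExample` class and `label_name` helper are hypothetical and only illustrate the data shape. The sentences are made up.

```python
from dataclasses import dataclass

# Label names in the order assumed for the glue/mrpc ClassLabel feature.
LABEL_NAMES = ("not_equivalent", "equivalent")


@dataclass(frozen=True)
class MrpcExample:
    """Hypothetical stand-in for one row of the glue/mrpc dataset."""

    sentence1: str
    sentence2: str
    label: int  # 0 = not equivalent, 1 = equivalent
    idx: int  # running index of the pair within its split


def label_name(example: MrpcExample) -> str:
    """Map the integer label to a human-readable class name."""
    return LABEL_NAMES[example.label]


# Made-up pair used purely for illustration (not taken from the corpus).
pair = MrpcExample(
    sentence1="The company posted a profit of $10 million.",
    sentence2="The firm reported earnings of $10 million.",
    label=1,
    idx=0,
)
print(label_name(pair))  # equivalent
```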


This notebook was adapted from the quick start guide of 🤗 Datasets [2].

## Author
- Sebastian Lehrig <sebastian.lehrig1@ibm.com>

## License
Apache-2.0 License

## References
[1] Original data: https://www.microsoft.com/en-us/download/details.aspx?id=52398

[2] Original code snippets from 🤗 Datasets: https://huggingface.co/docs/datasets/quickstart#nlp

[3] 🤗 Dataset: https://huggingface.co/datasets/glue/viewer/mrpc

## 0.) Imports & Constants

In [None]:
import json
import kfp
from kfp.components import InputPath, OutputPath
import kfp.dsl as dsl
from kfp.dsl import PipelineConf, data_passing_methods
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource
import numpy as np
import os
import requests
from typing import List, NamedTuple

%load_ext lab_black

In [None]:
# Environment-specific configurations
#
# %env CLUSTER_CONFIGURATION_SECRET remote-power-cluster
# %env CLUSTER_CONFIGURATION_SECRET remote-x86-cluster
# %env CLUSTER_CONFIGURATION_SECRET remote-x86-telekom-cluster
# %env TRAINING_GPUS 1
# %env TRAINING_NODE_SELECTOR nvidia.com/gpu.product: "Tesla-V100-SXM2-32GB"
# %env TRAINING_NODE_SELECTOR kubernetes.io/hostname: "p114worker04.b2s001.pbm.ihost.com"
# %env TRAINING_NODE_SELECTOR kubernetes.io/hostname: node2
#
# Reset:
# del os.environ['CLUSTER_CONFIGURATION_SECRET']
# del os.environ['TRAINING_GPUS']
# del os.environ['TRAINING_NODE_SELECTOR']

In [None]:
COMPONENT_CATALOG_FOLDER = f"{os.getenv('HOME')}/components"
COMPONENT_CATALOG_GIT = "https://github.com/lehrig/kubeflow-ppc64le-components.git"
COMPONENT_CATALOG_RELEASE = "main"

LOAD_DATASET_COMPONENT = (
    f"{COMPONENT_CATALOG_FOLDER}/data-collection/load-dataset/component.yaml"
)
TRAIN_MODEL_COMPONENT = (
    f"{COMPONENT_CATALOG_FOLDER}/model-building/train-model-job/component.yaml"
)
PLOT_CONFUSION_MATRIX_COMPONENT = (
    f"{COMPONENT_CATALOG_FOLDER}/model-building/plot-confusion-matrix/component.yaml"
)
CONVERT_MODEL_TO_ONNX_COMPONENT = (
    f"{COMPONENT_CATALOG_FOLDER}/model-building/convert-to-onnx/component.yaml"
)
UPLOAD_MODEL_COMPONENT = (
    f"{COMPONENT_CATALOG_FOLDER}/model-building/upload-model/component.yaml"
)
DEPLOY_MODEL_WITH_KSERVE_COMPONENT = f"{COMPONENT_CATALOG_FOLDER}/model-deployment/deploy-model-with-kserve/component.yaml"

BASE_IMAGE = "quay.io/ibm/kubeflow-notebook-image-ppc64le:latest"

ARGUMENTS = {
    "blackboard": "artefacts",
    "dataset_url": "glue",
    "dataset_configuration": "mrpc",
    "dataset_label_columns": ["label"],
    "model_name": "equal-sentences",
    "cluster_configuration_secret": os.getenv(
        "CLUSTER_CONFIGURATION_SECRET", default=""
    ),
    "training_gpus": os.getenv("TRAINING_GPUS", default="1"),
    "training_node_selector": os.getenv("TRAINING_NODE_SELECTOR", default=""),
}
MODEL_NAME = ARGUMENTS["model_name"]

with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
    NAMESPACE = f.read()

ARGUMENTS
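The environment variables from the configuration cell only override the defaults in `ARGUMENTS` when they are set. Below is a minimal sketch of that fallback. It uses a plain dict in place of `os.environ`, so you can try it without touching the notebook's environment. `resolve_training_arguments` is a hypothetical helper, not part of the notebook or of KFP. Every value read from the environment is a string, so `training_gpus` arrives as `"1"` even though the pipeline declares it as `int`.

```python
from typing import Dict, Mapping


def resolve_training_arguments(env: Mapping[str, str]) -> Dict[str, str]:
    """Mirror the os.getenv(...) calls used to build ARGUMENTS.

    `env` stands in for os.environ; missing keys fall back to the same
    defaults as in the notebook.
    """
    return {
        "cluster_configuration_secret": env.get("CLUSTER_CONFIGURATION_SECRET", ""),
        "training_gpus": env.get("TRAINING_GPUS", "1"),
        "training_node_selector": env.get("TRAINING_NODE_SELECTOR", ""),
    }


# Nothing set: defaults apply (note that "1" is a string, not an int).
print(resolve_training_arguments({}))

# Remote x86 cluster with two GPUs on a specific node (illustrative values).
print(
    resolve_training_arguments(
        {
            "CLUSTER_CONFIGURATION_SECRET": "remote-x86-cluster",
            "TRAINING_GPUS": "2",
            "TRAINING_NODE_SELECTOR": "kubernetes.io/hostname: node2",
        }
    )
)
```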

## 1.) Let's start by creating a client object for interaction

In [None]:
client = kfp.Client()

## 2.) The main part consists of defining the end-to-end workflow functions and creating components from them
### 2.0) Load component catalog

In [None]:
!git clone --branch $COMPONENT_CATALOG_RELEASE $COMPONENT_CATALOG_GIT $COMPONENT_CATALOG_FOLDER

### 2.1) Load dataset (by reusing a Kubeflow component)

In [None]:
load_dataset_comp = kfp.components.load_component_from_file(LOAD_DATASET_COMPONENT)

### 2.2) Preprocess data (tokenize etc.)

In [None]:
def preprocess_dataset(
    dataset_dir: InputPath(str),
    train_dataset_dir: OutputPath(str),
    validation_dataset_dir: OutputPath(str),
    test_dataset_dir: OutputPath(str),
    batch_size: int = 200,
):
    """Tokenizes the dataset and prepares it for BertForSequenceClassification.

    Saves the train, validation, and test splits as TensorFlow datasets into
    `train_dataset_dir`, `validation_dataset_dir`, and `test_dataset_dir`."""

    from datasets import load_from_disk
    import os
    from transformers import AutoTokenizer, DataCollatorWithPadding

    print(f"Loading input dataset from {dataset_dir}...")
    dataset = load_from_disk(dataset_dir)

    print("Printing first training dataset entry (unprocessed):")
    print(dataset["train"][0])

    # Load the tokenizer that belongs to the pretrained "bert-base-cased" model
    # from the Hugging Face Transformers library.
    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    # tokenize the dataset & truncate and pad the text into tidy rectangular
    # tensors. The tokenizer generates three new columns in the dataset:
    # input_ids, token_type_ids, and an attention_mask.
    # These are the model inputs.
    def encode(examples):
        return tokenizer(
            examples["sentence1"],
            examples["sentence2"],
            truncation=True,
            padding="max_length",
        )

    prep_dataset = dataset.map(
        encode, batched=True, batch_size=batch_size, num_proc=2, keep_in_memory=True
    )

    print("Printing first training dataset entry (tokenized):")
    print(prep_dataset["train"][0])

    # Copy the "label" column into a new "labels" column, which is the input
    # name expected by BertForSequenceClassification:
    prep_dataset = prep_dataset.map(
        lambda examples: {"labels": examples["label"]},
        batched=True,
        batch_size=batch_size,
        num_proc=2,
        keep_in_memory=True,
    )

    def save_as_tfdataset(
        dataset, columns, label_columns, data_collator, directory, shuffle
    ):
        import tensorflow as tf

        tf_dataset = dataset.to_tf_dataset(
            columns=columns,
            label_cols=label_columns,
            shuffle=shuffle,
            batch_size=batch_size,
            collate_fn=data_collator,
        )

        print(f"Saving pre-processed dataset to '{directory}'...")
        if not os.path.exists(directory):
            os.makedirs(directory)
        tf.data.experimental.save(tf_dataset, directory)

        print(f"Pre-processed dataset saved. Contents of '{directory}':")
        print(os.listdir(directory))

    data_collator = DataCollatorWithPadding(tokenizer=tokenizer, return_tensors="tf")
    columns = ["input_ids", "token_type_ids", "attention_mask"]
    label_columns = ["labels"]
    save_as_tfdataset(
        prep_dataset["train"],
        columns,
        label_columns,
        data_collator,
        train_dataset_dir,
        True,
    )
    save_as_tfdataset(
        prep_dataset["validation"],
        columns,
        label_columns,
        data_collator,
        validation_dataset_dir,
        False,
    )
    save_as_tfdataset(
        prep_dataset["test"],
        columns,
        label_columns,
        data_collator,
        test_dataset_dir,
        False,
    )

    print("Finished.")


preprocess_dataset_comp = kfp.components.create_component_from_func(
    func=preprocess_dataset, base_image=BASE_IMAGE
)
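The `encode` step can be made more concrete with a toy version. For a sentence pair, a BERT tokenizer produces `[CLS] sentence1 [SEP] sentence2 [SEP]`. It marks the first segment with `token_type_ids` 0 and the second with 1. With `padding="max_length"`, it pads every row to a fixed length, and `attention_mask` flags real tokens with 1 and padding with 0. The sketch below reproduces that layout with a hypothetical whitespace tokenizer, toy vocabulary, and made-up special-token IDs. Its truncation loop roughly mirrors the "longest first" strategy. The real `bert-base-cased` tokenizer uses WordPiece sub-words and pads to 512 tokens, so actual IDs and lengths differ.

```python
from typing import Dict, List

# Hypothetical special-token IDs and toy vocabulary (not BERT's real IDs).
PAD_ID, CLS_ID, SEP_ID = 0, 1, 2
TOY_VOCAB: Dict[str, int] = {}


def toy_ids(sentence: str) -> List[int]:
    """Whitespace-tokenize and assign IDs on first sight (toy stand-in for WordPiece)."""
    return [TOY_VOCAB.setdefault(tok, len(TOY_VOCAB) + 3) for tok in sentence.split()]


def encode_pair(sentence1: str, sentence2: str, max_length: int) -> Dict[str, List[int]]:
    """Encode a pair as [CLS] a [SEP] b [SEP], truncated and padded to max_length."""
    a, b = toy_ids(sentence1), toy_ids(sentence2)
    # Longest-first truncation: drop tokens from the longer sentence until the
    # pair fits (3 slots are reserved for [CLS] and the two [SEP] tokens).
    while len(a) + len(b) > max_length - 3:
        if len(a) > len(b):
            a.pop()
        else:
            b.pop()
    input_ids = [CLS_ID] + a + [SEP_ID] + b + [SEP_ID]
    token_type_ids = [0] * (len(a) + 2) + [1] * (len(b) + 1)
    attention_mask = [1] * len(input_ids)
    # padding="max_length": pad every row to the same fixed width.
    pad = max_length - len(input_ids)
    return {
        "input_ids": input_ids + [PAD_ID] * pad,
        "token_type_ids": token_type_ids + [0] * pad,
        "attention_mask": attention_mask + [0] * pad,
    }


enc = encode_pair("Kubeflow rocks", "Kubeflow is great", max_length=10)
print(enc)
```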

### 2.3) Train the model

In [None]:
def train_model(
    train_dataset_dir: InputPath(str),
    validation_dataset_dir: InputPath(str),
    model_dir: OutputPath(str),
    epochs: int = 8,
    batch_size: int = 1,
):
    """Fine-tunes a pretrained BERT model for sequence classification. Once trained, the model is persisted to `model_dir`."""

    import os
    import tensorflow as tf
    from tensorflow.keras.callbacks import (
        EarlyStopping,
        ModelCheckpoint,
        ReduceLROnPlateau,
        TensorBoard,
    )
    import time
    from transformers import TFAutoModelForSequenceClassification

    def load_datasets():
        train_dataset = tf.data.experimental.load(train_dataset_dir)
        validation_dataset = tf.data.experimental.load(validation_dataset_dir)
        return (train_dataset, validation_dataset)

    def build_model():
        return TFAutoModelForSequenceClassification.from_pretrained("bert-base-cased")

    print("Loading datasets...")
    train_dataset, validation_dataset = load_datasets()

    print("Building model...")
    model = build_model()
    model.summary()  # prints the summary itself and returns None

    print("Compiling model...")
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE, from_logits=True
        ),
        optimizer="adam",
        metrics=["accuracy"],
    )

    print("Initializing training callbacks...")
    callbacks = [
        EarlyStopping(monitor="val_loss", patience=20, verbose=0, mode="min"),
        ModelCheckpoint(
            f"{model_dir}/best_model.keras",
            monitor="val_loss",
            save_best_only=True,
            mode="min",
        ),
        ReduceLROnPlateau(
            monitor="val_loss",
            factor=0.1,
            patience=7,
            verbose=1,
            min_delta=0.0001,
            mode="min",
        ),
        TensorBoard(log_dir="./logs", histogram_freq=1),
    ]
    # TODO
    # Connect TensorBoard to s3://minio-service-kubeflow.apps.b2s001.pbm.ihost.com/mlpipeline/tensorboard
    # Will need tensorflow-io[tensorflow] package for that
    # See: https://github.com/tensorflow/tensorflow/issues/40302#issuecomment-918917472

    print("Starting model training...")
    start = time.time()
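    hist = model.fit(
        train_dataset,
        validation_data=validation_dataset,
        epochs=epochs,
        # No fixed steps_per_epoch: the saved dataset is finite and not repeated,
        # so each epoch makes one full pass over all training batches (a fixed
        # value larger than the number of batches would exhaust the data early).
        callbacks=callbacks,
    )
    print(f"\n\nTraining took {time.time() - start:.1f} seconds")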

    print("Model train history:")
    print(hist.history)

    print(f"Saving model to: {model_dir}")
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    model.save(model_dir)
    print(f"Model saved to: {model_dir}")

    print("Finished.")


train_specification = kfp.components.func_to_component_text(func=train_model)
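With the default `epochs=8`, the callbacks' patience settings matter. `EarlyStopping(patience=20)` cannot fire within 8 epochs. `ReduceLROnPlateau(patience=7)` only fires if `val_loss` stops improving for 7 consecutive epochs.

The pure-Python sketch below simulates the plateau rule described in the `ReduceLROnPlateau` documentation: once the monitored metric has not improved by more than `min_delta` for `patience` epochs, the learning rate is multiplied by `factor`. It is a simplified illustration without cooldown or `min_lr`, not Keras' own implementation. It starts from Adam's default learning rate of 0.001, since the model is compiled with `optimizer="adam"`. The loss values are made up.

```python
from typing import List, Sequence


def simulate_reduce_lr(
    val_losses: Sequence[float],
    initial_lr: float = 1e-3,
    factor: float = 0.1,
    patience: int = 7,
    min_delta: float = 1e-4,
) -> List[float]:
    """Return the learning rate in effect after each epoch (simplified plateau rule)."""
    best = float("inf")
    wait = 0
    lr = initial_lr
    lrs = []
    for loss in val_losses:
        if loss < best - min_delta:  # significant improvement: reset the counter
            best = loss
            wait = 0
        else:
            wait += 1
            if wait >= patience:  # plateau reached: shrink the learning rate
                lr *= factor
                wait = 0
        lrs.append(lr)
    return lrs


# Made-up val_loss curve over 8 epochs: best value in epoch 1, no improvement after.
losses = [0.70, 0.71, 0.72, 0.70, 0.71, 0.72, 0.71, 0.70]
lrs = simulate_reduce_lr(losses)
print(lrs)  # the learning rate only drops in the final (8th) epoch
```

In other words, with 8 epochs the learning-rate reduction is effectively a last-epoch event, and early stopping never triggers. Larger `epochs` values are needed for either callback to have an effect.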

In [None]:
train_model_comp = kfp.components.load_component_from_file(TRAIN_MODEL_COMPONENT)
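As a quick sanity check on epoch length, assume the split sizes listed for `glue`/`mrpc` on 🤗 Datasets (3,668 training, 408 validation, and 1,725 test pairs) and the `batch_size=200` used in `preprocess_dataset`. Under those assumptions, one pass over the training split yields ceil(3668 / 200) = 19 batches (18 if the last partial batch were dropped). If `model.fit` receives a `steps_per_epoch` larger than that on a non-repeating dataset, Keras runs out of data and stops training early with a warning. The hypothetical `batches_per_epoch` helper below does the arithmetic.

```python
import math

# Split sizes assumed from the glue/mrpc dataset card; batch size from preprocess_dataset.
SPLIT_SIZES = {"train": 3668, "validation": 408, "test": 1725}
BATCH_SIZE = 200


def batches_per_epoch(num_examples: int, batch_size: int, drop_remainder: bool = False) -> int:
    """Number of batches a finite, non-repeated dataset yields in one pass."""
    if drop_remainder:
        return num_examples // batch_size
    return math.ceil(num_examples / batch_size)


for split, size in SPLIT_SIZES.items():
    print(split, batches_per_epoch(size, BATCH_SIZE))
# train 19
# validation 3
# test 9
```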

### 2.4) Evaluate model with test data

In [None]:
def evaluate_model(
    test_dataset_dir: InputPath(str), model_dir: InputPath(str), batch_size: int = 20
) -> NamedTuple("EvaluationOutput", [("mlpipeline_metrics", "Metrics")]):
    """Loads a saved model from file and evaluates it on the pre-processed test dataset.
    Model metrics are returned as the `mlpipeline_metrics` output for Kubeflow Pipelines metadata."""

    from collections import namedtuple
    import json
    import tensorflow as tf

    test_dataset = tf.data.experimental.load(test_dataset_dir)
    model = tf.keras.models.load_model(model_dir)
    (loss, accuracy) = model.evaluate(test_dataset)

    print((loss, accuracy))

    metrics = {
        "metrics": [
            {"name": "loss", "numberValue": float(loss), "format": "RAW"},
            {"name": "accuracy", "numberValue": float(accuracy), "format": "PERCENTAGE"},
        ]
    }

    out_tuple = namedtuple("EvaluationOutput", ["mlpipeline_metrics"])

    return out_tuple(json.dumps(metrics))


evaluate_model_comp = kfp.components.create_component_from_func(
    func=evaluate_model, base_image=BASE_IMAGE
)
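For reference, here is a minimal stdlib sketch of the metrics payload that `evaluate_model` returns. It assumes the Kubeflow Pipelines v1 `mlpipeline-metrics` layout: a `metrics` list whose entries carry a `name`, a numeric `numberValue`, and a `format` of `RAW` or `PERCENTAGE`. The `build_metrics` helper and the sample values are hypothetical. `PERCENTAGE` suits fractions such as accuracy, while a loss is better reported as `RAW`.

```python
import json
from typing import Iterable, Mapping


def build_metrics(values: Mapping[str, float], percentages: Iterable[str]) -> str:
    """Serialize metrics in the KFP v1 `mlpipeline-metrics` layout."""
    percentage_names = set(percentages)
    return json.dumps(
        {
            "metrics": [
                {
                    "name": name,
                    "numberValue": float(value),  # numeric, not a string
                    # Fractions such as accuracy as PERCENTAGE, everything else RAW.
                    "format": "PERCENTAGE" if name in percentage_names else "RAW",
                }
                for name, value in values.items()
            ]
        }
    )


payload = build_metrics({"loss": 0.52, "accuracy": 0.81}, percentages=["accuracy"])
print(payload)
```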

### 2.5) Create confusion matrix (by reusing a Kubeflow component)

In [None]:
plot_confusion_matrix_comp = kfp.components.load_component_from_file(
    PLOT_CONFUSION_MATRIX_COMPONENT
)
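The reused component's internals are not shown here. Conceptually, though, a confusion matrix for this binary task counts (true label, predicted label) pairs, where the prediction is the arg-max over the model's two logits. A pure-Python sketch under that assumption, with made-up labels and logits:

```python
from typing import List, Sequence


def confusion_matrix(
    labels: Sequence[int], logits: Sequence[Sequence[float]], num_classes: int = 2
) -> List[List[int]]:
    """Rows are true labels, columns are predicted labels (arg-max of logits)."""
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for label, row in zip(labels, logits):
        predicted = max(range(num_classes), key=lambda c: row[c])
        matrix[label][predicted] += 1
    return matrix


# Made-up logits for four sentence pairs (column 0: not equivalent, 1: equivalent).
labels = [1, 0, 1, 1]
logits = [[-1.2, 2.3], [0.4, -0.3], [0.9, 0.1], [-0.5, 0.8]]
cm = confusion_matrix(labels, logits)
print(cm)  # [[1, 0], [1, 2]]
```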

### 2.6) Convert model to ONNX (by reusing a Kubeflow component)

In [None]:
convert_model_to_onnx_comp = kfp.components.load_component_from_file(
    CONVERT_MODEL_TO_ONNX_COMPONENT
)

### 2.7) Upload model to MinIO artifact store (by reusing a Kubeflow component)

In [None]:
upload_model_comp = kfp.components.load_component_from_file(UPLOAD_MODEL_COMPONENT)

### 2.8) Deploy the model using KServe (by reusing a Kubeflow component)

In [None]:
deploy_model_with_kserve_comp = kfp.components.load_component_from_file(
    DEPLOY_MODEL_WITH_KSERVE_COMPONENT
)

## 3.) Create the actual pipeline by combining the components

In [None]:
@dsl.pipeline(
    name="Semantically Equal Sentences",
    description="An example pipeline that performs an NLP task (determining whether two sentences are semantically equivalent)",
)
def semantic_equivalence_pipeline(
    blackboard: str,
    dataset_url: str,
    dataset_configuration: str,
    dataset_label_columns: List[str],
    model_name: str,
    cluster_configuration_secret: str,
    training_gpus: int,
    training_node_selector: str,
):
    create_blackboard = dsl.VolumeOp(
        name="Create Artefacts Blackboard",
        resource_name=blackboard,
        modes=dsl.VOLUME_MODE_RWO,
        size="4Gi",
        set_owner_reference=True,
    )

    load_dataset_task = load_dataset_comp(
        path=dataset_url,
        configuration=dataset_configuration,
        label_columns=dataset_label_columns,
    )
    load_dataset_task.after(create_blackboard)

    preprocess_dataset_task = preprocess_dataset_comp(
        dataset_dir=load_dataset_task.outputs["dataset_dir"],
    )

    # Map each InputPath/OutputPath parameter of `train_model` (left-hand side,
    # e.g. "train_dataset_dir" or "model_dir") to the name of the corresponding
    # parameter of the train component (right-hand side).
    train_parameters = {
        "train_dataset_dir": "train_dataset_dir",
        "validation_dataset_dir": "validation_dataset_dir",
        "model_dir": "model_dir",
    }

    train_model_task = train_model_comp(
        preprocess_dataset_task.outputs["train_dataset_dir"],
        preprocess_dataset_task.outputs["validation_dataset_dir"],
        train_specification,
        train_parameters,
        model_name=model_name,
        gpus=training_gpus,
        node_selector=training_node_selector,
        cluster_configuration_secret=cluster_configuration_secret,
    )

    evaluate_model_comp(
        preprocess_dataset_task.outputs["test_dataset_dir"],
        train_model_task.outputs["model_dir"],
    )

    plot_confusion_matrix_comp(
        input_columns=["input_ids", "token_type_ids", "attention_mask"],
        label_columns=load_dataset_task.outputs["labels"],
        test_dataset_dir=preprocess_dataset_task.outputs["test_dataset_dir"],
        model_dir=train_model_task.outputs["model_dir"],
    )

    convert_model_to_onnx_task = convert_model_to_onnx_comp(
        train_model_task.outputs["model_dir"]
    )

    upload_model_task = upload_model_comp(
        convert_model_to_onnx_task.outputs["onnx_model_dir"], model_name=model_name
    )

    deploy_model_with_kserve_task = deploy_model_with_kserve_comp(model_name=model_name)

    deploy_model_with_kserve_task.after(upload_model_task)
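The `.after()` calls and output references above define the run order. As an illustration only, the dependency graph can be written down with Python's standard `graphlib` module (Python 3.9+) to list which tasks may run in parallel at each stage. The task names are shorthand for the tasks in `semantic_equivalence_pipeline`. Kubeflow Pipelines derives this graph itself when compiling the pipeline.

```python
from graphlib import TopologicalSorter

# task -> tasks it depends on (via .after() or by consuming their outputs)
DEPENDENCIES = {
    "create_blackboard": set(),
    "load_dataset": {"create_blackboard"},
    "preprocess_dataset": {"load_dataset"},
    "train_model": {"preprocess_dataset"},
    "evaluate_model": {"preprocess_dataset", "train_model"},
    "plot_confusion_matrix": {"load_dataset", "preprocess_dataset", "train_model"},
    "convert_model_to_onnx": {"train_model"},
    "upload_model": {"convert_model_to_onnx"},
    "deploy_model_with_kserve": {"upload_model"},
}

sorter = TopologicalSorter(DEPENDENCIES)
sorter.prepare()
stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
    stages.append(ready)
    sorter.done(*ready)

for index, ready in enumerate(stages):
    print(f"stage {index}: {ready}")
```

Evaluation, the confusion matrix, and the ONNX conversion all depend only on earlier stages, so they can run side by side once training finishes.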

## 4.) Run the pipeline within an experiment
Create a pipeline run using the client you initialized in step 1.

In [None]:
# See: https://www.kubeflow.org/docs/components/pipelines/overview/caching/#managing-caching-staleness
def disable_cache_transformer(op):
    if isinstance(op, dsl.ContainerOp):
        op.execution_options.caching_strategy.max_cache_staleness = "P0D"
    else:
        op.add_pod_annotation(
            name="pipelines.kubeflow.org/max_cache_staleness", value="P0D"
        )
    return op


pipeline_conf = PipelineConf()
pipeline_conf.add_op_transformer(disable_cache_transformer)
pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume(
    volume=V1Volume(
        name=ARGUMENTS["blackboard"],
        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
            "{{workflow.name}}-%s" % ARGUMENTS["blackboard"]
        ),
    ),
    path_prefix=f'{ARGUMENTS["blackboard"]}/',
)

client.create_run_from_pipeline_func(
    semantic_equivalence_pipeline,
    arguments=ARGUMENTS,
    namespace=NAMESPACE,
    pipeline_conf=pipeline_conf,
)

## 5.) Test model deployment
See API documentation: https://github.com/kserve/kserve/blob/master/docs/predict-api/v2/required_api.md

### 5.1) Check model endpoint availability

In [None]:
HOST = MODEL_NAME + "-predictor-default." + NAMESPACE
HEADERS = {"Host": HOST}
MODEL_ENDPOINT = "http://" + MODEL_NAME + "-predictor-default/v2/models/" + MODEL_NAME

res = requests.get(MODEL_ENDPOINT, headers=HEADERS)
response = json.loads(res.text)
response

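Note that you can also query the endpoint with curl from a notebook cell. The `!` prefix lets IPython substitute the Python variables `$HOST` and `$MODEL_ENDPOINT`:
`!curl -H "Host: $HOST" $MODEL_ENDPOINT`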
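The metadata request above follows the KServe v2 inference protocol linked at the start of section 5, which also defines health, readiness, and inference routes. Below is a hedged sketch that builds these URLs plus the `Host` header. The `-predictor-default` service naming mirrors the cell above and may differ across KServe versions. `kserve_v2_urls` is a hypothetical helper, and the namespace is illustrative.

```python
from typing import Dict


def kserve_v2_urls(model_name: str, namespace: str) -> Dict[str, str]:
    """Build v2 protocol URLs and the Host header for an in-cluster predictor."""
    service = f"{model_name}-predictor-default"
    base = f"http://{service}/v2"
    model = f"{base}/models/{model_name}"
    return {
        "host_header": f"{service}.{namespace}",
        "server_live": f"{base}/health/live",  # GET: is the server process up?
        "server_ready": f"{base}/health/ready",  # GET: can it serve requests?
        "model_metadata": model,  # GET: input/output names, datatypes, shapes
        "model_ready": f"{model}/ready",  # GET: is this model loaded?
        "infer": f"{model}/infer",  # POST: run inference
    }


urls = kserve_v2_urls("equal-sentences", "my-namespace")
for key, value in urls.items():
    print(f"{key}: {value}")
```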

### 5.2) Get example sentences

In [None]:
SENTENCE1 = "Kubeflow on IBM Power rocks!"
SENTENCE2 = "Empowering the most efficient & dependable MLOps platforms in the world via Kubeflow on Power!"

example = {"sentence1": SENTENCE1, "sentence2": SENTENCE2}

In [None]:
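from transformers import AutoTokenizer

# Batch-tokenize two unrelated sentences to see what the tokenizer returns;
# padding=True pads each sentence to the longest one in the batch.
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
raw_inputs = [
    "I've been waiting for a HuggingFace course my whole life.",
    "I hate this so much!",
]
inputs = tokenizer(raw_inputs, padding=True, truncation=True, return_tensors="tf")
print(inputs)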
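Unlike `padding="max_length"` in the preprocessing step, `padding=True` pads each batch only to its longest sequence. This is the same idea `DataCollatorWithPadding` applies per batch. Below is a pure-Python sketch of that rule on already-tokenized, made-up ID lists. `pad_batch` is hypothetical and assumes a padding ID of 0 with right-side padding.

```python
from typing import Dict, List, Sequence


def pad_batch(sequences: Sequence[Sequence[int]], pad_id: int = 0) -> Dict[str, List[List[int]]]:
    """Pad every sequence to the longest one in the batch and build attention masks."""
    longest = max(len(seq) for seq in sequences)
    input_ids, attention_mask = [], []
    for seq in sequences:
        pad = longest - len(seq)
        input_ids.append(list(seq) + [pad_id] * pad)
        attention_mask.append([1] * len(seq) + [0] * pad)
    return {"input_ids": input_ids, "attention_mask": attention_mask}


# Made-up token IDs for a batch of two sentences of different lengths.
batch = pad_batch([[5, 17, 42, 6], [5, 17, 6]])
print(batch)
# {'input_ids': [[5, 17, 42, 6], [5, 17, 6, 0]], 'attention_mask': [[1, 1, 1, 1], [1, 1, 1, 0]]}
```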

### 5.3) Encode example sentences

In [None]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")


def encode(example):
    return tokenizer(
        example["sentence1"],
        example["sentence2"],
        truncation=True,
        padding="max_length",
    )


encoded_example = encode(example)
encoded_example

In [None]:
list(encoded_example.data)

### 5.4) Score example sentences against model

In [None]:
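PREDICT_ENDPOINT = MODEL_ENDPOINT + "/infer"

# The v2 protocol expects a flat list of values plus the tensor shape.
# With padding="max_length", the sequence is padded to the tokenizer's maximum
# length (512 for bert-base-cased), so derive the shape instead of hard-coding it.
# Depending on the ONNX conversion, the model may also expect "token_type_ids"
# and "attention_mask" inputs; check the input names in the metadata from 5.1.
input_ids = encoded_example["input_ids"]
payload = {
    "inputs": [
        {
            "name": "input_ids/input_ids",
            "datatype": "INT32",
            "shape": [1, len(input_ids)],
            "data": input_ids,
        }
    ]
}

res = requests.post(PREDICT_ENDPOINT, headers=HEADERS, data=json.dumps(payload))
response = json.loads(res.text)
response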
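The model was trained on three inputs (`input_ids`, `token_type_ids`, `attention_mask`), so the converted ONNX model may expect all three, while the scoring cell above sends only `input_ids`. Below is a hedged sketch that assembles a v2 `infer` payload from an encoding dict for any set of inputs. The tensor names are assumptions: take the actual names (such as the `input_ids/input_ids` used above) from the model metadata returned in step 5.1. Whether `INT32` or `INT64` is required likewise depends on the converted model. `build_infer_payload` is hypothetical.

```python
import json
from typing import Dict, List, Mapping, Sequence


def build_infer_payload(
    encoding: Mapping[str, Sequence[int]],
    input_names: Mapping[str, str],
    datatype: str = "INT32",
) -> Dict[str, List[dict]]:
    """Wrap one encoded sentence pair as a batch of size 1 in v2 protocol format.

    `input_names` maps encoding keys (e.g. "input_ids") to the tensor names
    the deployed model expects; these names are assumptions to be checked
    against the model metadata.
    """
    inputs = []
    for key, tensor_name in input_names.items():
        values = [int(v) for v in encoding[key]]
        inputs.append(
            {
                "name": tensor_name,
                "datatype": datatype,
                "shape": [1, len(values)],  # batch of one, full sequence length
                "data": values,  # flat, row-major list of values
            }
        )
    return {"inputs": inputs}


# Toy encoding with made-up IDs; in the notebook, pass encoded_example instead.
toy_encoding = {
    "input_ids": [5, 17, 6, 0],
    "token_type_ids": [0, 0, 0, 0],
    "attention_mask": [1, 1, 1, 0],
}
names = {key: key for key in toy_encoding}  # hypothetical: tensor name == key
payload = build_infer_payload(toy_encoding, names)
print(json.dumps(payload))
```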