In [None]:
import kfp
from kfp import dsl
from kfp.dsl import Output, Metrics, HTML, Dataset

from typing import NamedTuple


@dsl.component(
    base_image="python:3.9",
    packages_to_install=["requests==2.32.3", "pandas==2.2.3"],
)
def call_model_inference(
    model_name: str,
    input_data_json: str,
    inference_result: Output[Dataset],
):
    """Send a KServe V2 inference request and save inputs plus predictions as CSV.

    Args:
        model_name: Name of the InferenceService. It is used both as the host
            name inside the ``kubeflow-user-example-com`` namespace and as the
            V2 model name in the request path.
        input_data_json: JSON string of the form ``{"data": [[...], ...]}``.
            Every inner list is one input row; the tensor shape sent to the
            model is ``[number of rows, length of the first row]``.
        inference_result: Output dataset that receives a CSV with one column
            per input feature plus a ``prediction`` column.

    Raises:
        ValueError: If ``input_data_json`` is not valid JSON, has no usable
            ``data`` field, the response contains no ``outputs``, or the
            number of predictions differs from the number of input rows.
        requests.HTTPError: If the inference endpoint returns an error status.
    """
    import requests
    import pandas as pd
    import json

    # Seconds to wait for the inference service before failing the step,
    # instead of blocking the pipeline indefinitely on an unresponsive endpoint.
    request_timeout = 60

    try:
        parsed_input = json.loads(input_data_json)
    except (TypeError, ValueError) as e:
        raise ValueError(f"Invalid input_data_json: {e}") from e

    input_data = parsed_input.get("data") if isinstance(parsed_input, dict) else None
    if input_data is None:
        raise ValueError(
            "Invalid input_data_json: 'data' field missing in input_data_json."
        )
    # The tensor shape is derived from the first row, so require a non-empty
    # list of rows up front rather than failing later with an opaque error.
    if (
        not isinstance(input_data, list)
        or not input_data
        or not isinstance(input_data[0], list)
    ):
        raise ValueError(
            "Invalid input_data_json: 'data' must be a non-empty list of rows."
        )

    payload = {
        "inputs": [
            {
                "name": "input-0",
                "shape": [len(input_data), len(input_data[0])],
                "datatype": "FP32",
                "data": input_data,
            }
        ]
    }

    model_url = f"http://{model_name}.kubeflow-user-example-com.svc.cluster.local/v2/models/{model_name}/infer"
    print("Sending inference request to:", model_url)
    print("Payload:")
    print(json.dumps(payload, indent=2))

    response = requests.post(model_url, json=payload, timeout=request_timeout)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")
    response.raise_for_status()

    outputs = response.json().get("outputs", [])
    if not outputs:
        raise ValueError("No 'outputs' found in inference response.")

    output_data = outputs[0].get("data", [])
    df = pd.DataFrame(input_data)
    # One prediction per input row is expected; fail with a clear message
    # instead of pandas' generic length-mismatch error.
    if len(output_data) != len(df):
        raise ValueError(
            f"Inference returned {len(output_data)} predictions "
            f"for {len(df)} input rows."
        )
    df["prediction"] = output_data
    df.to_csv(inference_result.path, index=False)
    inference_result.metadata["source"] = "KServe inference"
    inference_result.metadata["format"] = "csv"
    print(f"Inference results saved to: {inference_result.path}")

@dsl.component(base_image="python:3.10")
def show_broker_config(
    kafka_config: str = "kafka-headless.message-broker.svc.cluster.local:9092",
    topic: str = "output_1",
    model_name: str = "test-model-b",
):
    """Log the Kafka connection settings this pipeline run will use.

    Prints a header followed by one line each for the bootstrap servers,
    the output topic and the model name.
    """
    report = [
        "Kafka broker config:",
        f" - kafka_config: {kafka_config}",
        f" - topic: {topic}",
        f" - model_name: {model_name}",
    ]
    print("\n".join(report))


@dsl.component(base_image="python:3.10", packages_to_install=["requests"])
def enqueue_inference(
    kafka_config: str = "kafka-headless.message-broker.svc.cluster.local:9092",
    topic: str = "output_1",
    model_name: str = "test-model-b",
) -> str:
    """Enqueue an asynchronous KServe V2 inference request on the async bridge.

    Builds one fixed input row, wraps a POST to the model's V2 ``infer``
    endpoint in a message that names ``topic`` as the output topic, and
    posts it to the async-bridge ``enqueue/ab-topic`` endpoint.

    Args:
        kafka_config: Not used by this component.
        topic: Kafka topic the bridge is asked to publish the result to.
        model_name: InferenceService name in ``kubeflow-user-example-com``.

    Returns:
        JSON string with the ``task_id`` returned by the bridge and the
        ``sent_data`` message that was enqueued.

    Raises:
        requests.HTTPError: If the bridge answers with an error status.
        ValueError: If the bridge response contains no ``task_id``.
    """
    import requests
    import json

    # Seconds to wait for the async bridge before failing the step.
    request_timeout = 30

    future_data_point = {
        "co_level": 17.783347,
        "latitude": 41.3851,
        "longitude": 2.1734,
        "hour_of_day": 1.0,
    }
    # Feature order of the single input row sent to the model.
    feature_order = ("co_level", "latitude", "longitude", "hour_of_day")
    input_data = [[future_data_point[feature] for feature in feature_order]]

    infer_url = f"http://{model_name}.kubeflow-user-example-com.svc.cluster.local/v2/models/{model_name}/infer"

    msg = {
        "output_topic": topic,
        "http_request": {
            "method": "POST",
            "url": infer_url,
            "json": {
                "id": "test-id",
                "parameters": {"explain": True},
                "inputs": [
                    {
                        "name": "input-0",
                        # Derived from the data so it cannot drift from feature_order.
                        "shape": [len(input_data), len(input_data[0])],
                        "datatype": "FP32",
                        "data": input_data,
                    }
                ],
            },
        },
    }

    print(f"Enqueuing inference message:\n{json.dumps(msg, indent=2)}")

    response = requests.post(
        "http://async-bridge.message-broker.svc.cluster.local/api/v1/async-bridge/enqueue/ab-topic",
        json=msg,
        timeout=request_timeout,
    )
    response.raise_for_status()
    response_json = response.json()

    print("Response from enqueue:", response_json)
    if not isinstance(response_json, dict) or "task_id" not in response_json:
        raise ValueError(f"Enqueue response has no 'task_id': {response_json}")
    result = {"task_id": response_json["task_id"], "sent_data": msg}

    return json.dumps(result)


@dsl.component(
    base_image="python:3.10", packages_to_install=["confluent-kafka==2.10.1"]
)
def consume_kafka(
    task_info: str,
    kafka_config: str = "kafka-headless.message-broker.svc.cluster.local:9092",
    topic: str = "output_1",
) -> str:
    """Block until one data message arrives on ``topic`` and return its payload.

    Args:
        task_info: Output of the enqueue step. It is not read here.
            NOTE(review): the first data message on ``topic`` is returned
            whether or not it belongs to this task.
        kafka_config: Kafka ``bootstrap.servers`` value.
        topic: Topic to subscribe to.

    Returns:
        The decoded JSON payload re-serialized as a JSON string, or the string
        ``"null"`` if the first data message could not be decoded as JSON.

    Raises:
        confluent_kafka.KafkaException: On a fatal Kafka client error.
    """
    from confluent_kafka import Consumer, KafkaException
    import json

    conf = {
        "bootstrap.servers": kafka_config,
        "group.id": "my-consumer-group",
        "auto.offset.reset": "earliest",
    }

    print("Creating Kafka consumer...")
    consumer = Consumer(conf)
    print("Kafka consumer created successfully.")

    payload_result = None
    try:
        consumer.subscribe([topic])
        print(f"Listening to topic '{topic}'...")

        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                print("No message received, continuing to poll...")
                continue

            error = msg.error()
            if error:
                # Error events carry no payload; stop only on fatal errors and
                # otherwise keep waiting for a real data message.
                if error.fatal():
                    raise KafkaException(error)
                print(f"Kafka error event, continuing to poll: {error}")
                continue

            try:
                payload_result = json.loads(msg.value().decode("utf-8"))
                print(f"Received payload: {json.dumps(payload_result, indent=4)}")
            except (AttributeError, ValueError) as e:
                # AttributeError: empty (None) value; ValueError covers both
                # UnicodeDecodeError and JSONDecodeError. Best effort: report
                # and finish with a null result.
                payload_result = None
                print(f"Error processing message: {e}")
            break
    finally:
        # Single close on every exit path.
        consumer.close()

    return json.dumps(payload_result)

    
@dsl.component(
    base_image='python:3.10',
    packages_to_install=['minio', 'numpy', 'keras', 'tensorflow']
)
def reshape_data():
    """
    Prepare MNIST for model building and store it in MinIO.

    Downloads MNIST through Keras and uploads four objects to the
    ``mlpipeline`` bucket:

    * ``y_train.npy`` / ``y_test.npy``: the labels, unchanged;
    * ``x_train.npy`` / ``x_test.npy``: the images reshaped to
      ``(-1, 28, 28, 1)`` and divided by 255.
    """
    print("reshaping data")

    from minio import Minio
    import numpy as np
    import keras

    minio_client = Minio(
        "10.244.0.40:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"

    def upload_array(array, object_name):
        """Save ``array`` as /tmp/<object_name> and upload it to the bucket."""
        local_path = f"/tmp/{object_name}"
        np.save(local_path, array)
        minio_client.fput_object(minio_bucket, object_name, local_path)

    # Load MNIST dataset directly from Keras
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    # Labels are stored as-is; model_building reads them without changes.
    upload_array(y_train, "y_train.npy")
    upload_array(y_test, "y_test.npy")

    # Transform the in-memory raw images instead of re-reading x_*.npy from
    # MinIO: those objects hold normalized data after a run, so reading them
    # back could normalize twice.
    # Reshape to 28x28 greyscale (one channel) as expected by the Keras model,
    # then scale pixel values from 0-255 to 0-1.
    x_train = x_train.reshape(-1, 28, 28, 1) / 255
    x_test = x_test.reshape(-1, 28, 28, 1) / 255

    upload_array(x_train, "x_train.npy")
    upload_array(x_test, "x_test.npy")

@dsl.component(
    base_image='tensorflow/tensorflow:2.13.0',
    packages_to_install=['minio', 'pandas', 'numpy']
)
def model_building(
    metrics: Output[Metrics],
    ui_metadata: Output[HTML],
    no_epochs:int = 1,
    optimizer: str = "adam"):
    """
    Train, evaluate and export the digit-recognition CNN.

    Reads the arrays written by ``reshape_data`` from the ``mlpipeline``
    MinIO bucket, trains a small Conv2D network, evaluates it on the test
    split and uploads the SavedModel to ``models/detect-digits/1/``.

    Args:
        metrics: Receives ``model_accuracy`` and ``model_loss`` via
            ``log_metric`` and, at its path, the same values as a JSON
            document in the KFP v1 ``metrics`` format.
        ui_metadata: Receives a JSON document in the KFP v1 UI-metadata format
            (confusion matrix plus markdown summary). NOTE(review): the
            content is JSON, not HTML — confirm how this artifact is consumed.
        no_epochs: Number of training epochs.
        optimizer: Keras optimizer identifier passed to ``model.compile``.
    """
    from tensorflow import keras
    import tensorflow as tf
    from minio import Minio
    import numpy as np
    import pandas as pd
    import json
    import os
    import posixpath

    num_classes = 10  # digits 0-9

    minio_client = Minio(
        "10.244.0.40:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"

    def load_array(object_name):
        """Download ``object_name`` to /tmp and load it with numpy."""
        local_path = f"/tmp/{object_name}"
        minio_client.fget_object(minio_bucket, object_name, local_path)
        return np.load(local_path)

    def upload_local_directory_to_minio(local_path, bucket_name, minio_path):
        """Upload every file under ``local_path`` below ``minio_path``, keeping relative paths."""
        if not os.path.isdir(local_path):
            raise NotADirectoryError(f"Not a directory: {local_path}")
        # Join with posixpath on a slash-stripped prefix so object keys never
        # contain "//" (e.g. "models/detect-digits/1/variables/...").
        prefix = minio_path.strip("/")
        for root, _dirs, files in os.walk(local_path):
            for file_name in files:
                local_file = os.path.join(root, file_name)
                relative = os.path.relpath(local_file, local_path).replace(os.sep, "/")
                remote_path = posixpath.join(prefix, relative)
                minio_client.fput_object(bucket_name, remote_path, local_file)

    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)))
    model.add(keras.layers.MaxPool2D(2, 2))

    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))

    model.add(keras.layers.Dense(32, activation='relu'))

    # one softmax output per digit class 0-9
    model.add(keras.layers.Dense(num_classes, activation='softmax'))

    # capture the model summary text for the markdown report
    stringlist = []
    model.summary(print_fn=lambda x: stringlist.append(x))
    metric_model_summary = "\n".join(stringlist)

    # multi-class classification with integer labels -> sparse categorical cross-entropy
    model.compile(optimizer=optimizer,
              loss="sparse_categorical_crossentropy",
              metrics=['accuracy'])

    x_train = load_array("x_train.npy")
    y_train = load_array("y_train.npy")

    model.fit(
      x=x_train,
      y=y_train,
      epochs=no_epochs,
      batch_size=20,
    )

    x_test = load_array("x_test.npy")
    y_test = load_array("y_test.npy")

    # Returns the loss value & metrics values for the model in test mode.
    model_loss, model_accuracy = model.evaluate(x=x_test,y=y_test)

    # The model outputs one probability per class; the predicted digit is the
    # index of the highest one.
    test_predictions = np.argmax(model.predict(x=x_test), axis=1)

    # Fixing num_classes keeps the matrix 10x10 even if a digit never occurs,
    # so row/column indices are exactly the digit labels.
    confusion_matrix = tf.math.confusion_matrix(
        labels=y_test, predictions=test_predictions, num_classes=num_classes
    ).numpy()
    data = [
        (target, predicted, count)
        for target, target_row in enumerate(confusion_matrix)
        for predicted, count in enumerate(target_row)
    ]

    df_cm = pd.DataFrame(data, columns=['target', 'predicted', 'count'])
    cm_csv = df_cm.to_csv(header=False, index=False)

    markdown_summary = '''# Model Overview
## Model Summary

```
{}
```

## Model Performance

**Accuracy**: {}
**Loss**: {}

'''.format(metric_model_summary,model_accuracy,model_loss)

    metadata_dict = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {'name': 'target', 'type': 'CATEGORY'},
                    {'name': 'predicted', 'type': 'CATEGORY'},
                    {'name': 'count', 'type': 'NUMBER'},
                  ],
                # must name columns declared in "schema" above
                "target_col" : "target",
                "predicted_col" : "predicted",
                "source": cm_csv,
                "storage": "inline",
                "labels": list(range(num_classes))
            },
            {
                'storage': 'inline',
                'source': markdown_summary,
                'type': 'markdown',
            }
        ]
    }

    metrics_dict = {
      'metrics': [{
          'name': 'model_accuracy',
          'numberValue':  float(model_accuracy),
          'format' : "PERCENTAGE"
        },{
          'name': 'model_loss',
          'numberValue':  float(model_loss),
          # loss is an unbounded value, not a fraction
          'format' : "RAW"
        }]}

    # Record scalars through the KFP v2 Metrics API as well.
    metrics.log_metric("model_accuracy", float(model_accuracy))
    metrics.log_metric("model_loss", float(model_loss))

    ### Save model to minIO
    local_model_dir = "/tmp/detect-digits"
    keras.models.save_model(model, local_model_dir)
    upload_local_directory_to_minio(local_model_dir, minio_bucket, "models/detect-digits/1/")

    print("Saved model to minIO")

    with open(metrics.path, "w") as f:
        json.dump(metrics_dict, f)

    with open(ui_metadata.path, "w") as f:
        json.dump(metadata_dict, f)



@dsl.component(
    base_image='python:3.10',
    packages_to_install=['kubernetes', 'kserve']
)
def model_serving():
    """
    Deploy the exported digits model as a KServe TensorFlow InferenceService.

    Creates ``digits-recognizer-event`` in the namespace reported by
    ``utils.get_default_target_namespace()``, serving
    ``s3://mlpipeline/models/detect-digits/`` with the ``sa-minio-kserve``
    service account and Istio sidecar injection disabled.
    """
    from kubernetes import client
    from kserve import (
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        V1beta1TFServingSpec,
        constants,
        utils,
    )

    target_namespace = utils.get_default_target_namespace()
    service_name = 'digits-recognizer-event'
    api_version = f"{constants.KSERVE_GROUP}/v1beta1"

    service_metadata = client.V1ObjectMeta(
        name=service_name,
        namespace=target_namespace,
        annotations={'sidecar.istio.io/inject': 'false'},
    )
    tf_serving = V1beta1TFServingSpec(
        storage_uri="s3://mlpipeline/models/detect-digits/"
    )
    predictor = V1beta1PredictorSpec(
        service_account_name="sa-minio-kserve",
        tensorflow=tf_serving,
    )
    inference_service = V1beta1InferenceService(
        api_version=api_version,
        kind="InferenceService",
        metadata=service_metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )

    kserve_client = KServeClient()
    kserve_client.create(inference_service)




@dsl.pipeline(
    name='event-susbcription-digits-recognizer-pipeline',
    description='Detect digits'
)
def event_subscription_pipeline(no_epochs:int,
                optimizer:str,
                kafka_config: str = "kafka-headless.message-broker.svc.cluster.local:9092",
                topic: str = "output_1",
                model_name: str = "digits-reconizer",
                input_data_json: str = '{"data": [[6.8, 2.8, 4.8, 1.4], [6.0, 3.4, 4.5, 1.6]]}'
                ):
    # Runs strictly in sequence:
    #   show broker config -> enqueue async inference -> consume Kafka result
    #   -> reshape MNIST -> train/export model -> deploy with KServe.
    # NOTE(review): input_data_json is accepted but not passed to any step, and
    # the model_name default differs from the service name model_serving
    # deploys ('digits-recognizer-event') — confirm both are intended.
    config_task = show_broker_config(
        kafka_config=kafka_config, topic=topic, model_name=model_name
    )
    enqueue_task = enqueue_inference(
        kafka_config=kafka_config, topic=topic, model_name=model_name
    )
    consume_task = consume_kafka(
        task_info=enqueue_task.output, kafka_config=kafka_config, topic=topic
    )
    reshape_task = reshape_data()
    building_task = model_building(no_epochs=no_epochs, optimizer=optimizer)
    serving_task = model_serving()

    # Wire each task to wait for its predecessor.
    ordered_tasks = [
        config_task,
        enqueue_task,
        consume_task,
        reshape_task,
        building_task,
        serving_task,
    ]
    for previous_task, next_task in zip(ordered_tasks, ordered_tasks[1:]):
        next_task.after(previous_task)


if __name__ == "__main__":
    # Import the compiler explicitly rather than relying on `kfp.compiler`
    # having been loaded as a side effect of another kfp import.
    from kfp import compiler

    # True: start a run right away. False: compile to YAML and upload it as a
    # new pipeline version instead.
    run_directly = True

    client = kfp.Client()

    arguments = {
        "no_epochs": 1,
        "optimizer": "adam"
    }

    if run_directly:
        client.create_run_from_pipeline_func(
            event_subscription_pipeline,
            arguments=arguments,
            experiment_name="event",
            run_name="digits-recognizer-event-subscription",
        )
    else:
        package_path = 'event_subscription_pipeline.yaml'
        compiler.Compiler().compile(
            pipeline_func=event_subscription_pipeline,
            package_path=package_path,
        )
        client.upload_pipeline_version(
            pipeline_package_path=package_path,
            pipeline_version_name="0.4",
            pipeline_name='event-susbcription-digits-recognizer-pipeline',
            description="just for testing",
        )