In [None]:
import subprocess
import kfp
import kfp.client
from kfp import dsl
from kfp import compiler
from kfp import kubernetes


# Shared identifier: used as the pipeline name in `@dsl.pipeline` and as the
# experiment name when a run is created.
PIPELINE_NAME = "distance-prediction"

In [None]:
@dsl.component(
    base_image="image-registry.openshift-image-registry.svc:5000/redhat-ods-applications/runtime-tensorflow:tensorflow",
    packages_to_install=["psycopg2", "onnx==1.17.0", "onnxruntime==1.19.2", "tf2onnx==1.16.1"]
)
def train_model(fresh_run: str, output_path: dsl.OutputPath(bytes)):
    """Train the brake-amount model from PostgreSQL data and export it as ONNX.

    Loads every row of ``distance.approaching_vehicle`` whose
    ``EVENT_TIMESTAMP`` is earlier than ``NOW()``, fits a small dense Keras
    network on six feature columns to predict ``brake_amount``, converts the
    model to ONNX with tf2onnx and writes the serialized bytes to
    ``output_path``.

    Database connection settings are read from the ``DBNAME``, ``USER``,
    ``HOST`` and ``PASSWORD`` environment variables.

    Args:
        fresh_run: Value that is only printed by this component.
            NOTE(review): presumably used to vary the inputs between runs;
            confirm with the pipeline author.
        output_path: File path the serialized ONNX model is written to.

    Raises:
        Exception: Any error raised while connecting to the database is
            logged and re-raised unchanged.
    """
    # All imports stay inside the function body, as in the original component.
    import time
    from os import getenv

    import tensorflow as tf
    import tf2onnx
    from keras.layers import Activation, BatchNormalization, Dense, Dropout
    from keras.models import Sequential
    from pandas import read_sql_query
    from psycopg2 import connect
    from sklearn.model_selection import train_test_split

    seed = 513421

    print(fresh_run)
    try:
        conn = connect(dbname=getenv('DBNAME'), user=getenv('USER'), host=getenv('HOST'), password=getenv('PASSWORD'))
    except Exception as e:
        print("I am unable to connect to the database")
        print(e)
        raise

    sql_query = "SELECT * FROM distance.approaching_vehicle WHERE EVENT_TIMESTAMP < NOW()"
    try:
        data = read_sql_query(sql_query, con=conn)
    finally:
        # Release the connection even when the query fails.
        conn.close()

    print(data.head())

    train_columns = [
        "other_bottom",
        "other_left",
        "other_right",
        "other_top",
        "other_speed",
        "your_speed"
    ]
    prediction_column = "brake_amount"

    x = data[train_columns].values
    y = data[prediction_column].values

    # The seed must be handed to the splitter itself; creating a NumPy
    # generator and discarding it (as before) seeded nothing.
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=0.2, random_state=seed
    )
    print(x_train.shape, x_test.shape)

    model = Sequential()
    model.add(Dense(32, activation='relu', input_dim=len(train_columns)))
    model.add(Dropout(0.2))
    model.add(Dense(32))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(32))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))

    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    # Train the model and report how long it took.
    start = time.time()
    epochs = 2
    model.fit(
        x_train,
        y_train,
        epochs=epochs,
        validation_data=(x_test, y_test),
        verbose=True
    )
    end = time.time()
    print(f"Training of model is complete. Took {end-start} seconds")

    # Normally we would use tf2onnx.convert.from_keras.
    # Workaround for tf2onnx bug https://github.com/onnx/tensorflow-onnx/issues/2348:
    # wrap the model in a `tf.function` and convert that instead.
    input_spec = tf.TensorSpec([None, x_train.shape[1]], tf.float32, name='dense_input')

    @tf.function(input_signature=[input_spec])
    def model_fn(x):
        return model(x)

    # Convert the wrapped Keras model to ONNX.
    model_proto, _ = tf2onnx.convert.from_function(
        model_fn,
        input_signature=[input_spec]
    )

    with open(output_path, "wb") as f:
        f.write(model_proto.SerializeToString())
    print("Finished training")

In [None]:
@dsl.component(
    base_image="image-registry.openshift-image-registry.svc:5000/redhat-ods-applications/runtime-tensorflow:tensorflow",
    packages_to_install=["boto3==1.35.55", "botocore==1.35.55"]
)
def save_model(fresh_run: str, input_path: dsl.InputPath(bytes)) -> str:
    """Upload the model file to S3 under the next free version number.

    The object is stored at ``models/distance/<version>/model.onnx`` in the
    bucket named by ``AWS_S3_BUCKET``. The version is one greater than the
    highest integer version already present under ``models/distance/``, or
    ``"1"`` when none exists yet.

    Connection settings are read from ``AWS_ACCESS_KEY_ID``,
    ``AWS_SECRET_ACCESS_KEY``, ``AWS_S3_ENDPOINT``, ``AWS_DEFAULT_REGION`` and
    ``AWS_S3_BUCKET``. TLS certificate verification is disabled
    (``verify=False``).

    Args:
        fresh_run: Value that is only printed by this component.
        input_path: Path of the local model file to upload.

    Returns:
        The version string the model was uploaded under.

    Raises:
        ValueError: If any of the required environment variables is unset or
            empty; the names of the missing variables are printed first.
    """
    import os
    import re

    import boto3
    import botocore

    print(fresh_run)
    print("Starting to save the model")

    required_vars = [
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
        'AWS_S3_ENDPOINT',
        'AWS_DEFAULT_REGION',
        'AWS_S3_BUCKET',
    ]
    settings = {name: os.environ.get(name) for name in required_vars}
    missing = [name for name in required_vars if not settings[name]]
    for name in missing:
        print(name)
    if missing:
        raise ValueError("One or more connection variables are empty.  "
                         "Please check your connection to an S3 bucket.")

    session = boto3.session.Session(
        aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'])

    # `verify` is an argument of `resource()`, not of the Session constructor.
    s3_resource = session.resource(
        's3',
        config=botocore.client.Config(signature_version='s3v4'),
        endpoint_url=settings['AWS_S3_ENDPOINT'],
        region_name=settings['AWS_DEFAULT_REGION'],
        verify=False)

    bucket = s3_resource.Bucket(settings['AWS_S3_BUCKET'])
    print("Got the connection to the bucket")

    def list_objects(prefix):
        """Return the keys of all objects in the bucket that start with `prefix`."""
        return [obj.key for obj in bucket.objects.filter(Prefix=prefix)]

    def get_next_version(keys, model_prefix):
        """Return the next version as a string, or "1" when no version exists.

        Only keys shaped like `<model_prefix><integer>/...` are considered, so
        unrelated objects cannot break the version lookup.
        """
        pattern = re.compile(re.escape(model_prefix) + r"(\d+)/")
        versions = [int(m.group(1)) for m in map(pattern.match, keys) if m]
        return str(max(versions, default=0) + 1)

    def upload_model_to_s3(local_file, s3_prefix):
        """Upload `local_file` as the next model version and return that version."""
        # S3 keys always use "/" regardless of the local path separator.
        model_prefix = f"{s3_prefix}/distance/"
        next_version = get_next_version(list_objects(model_prefix), model_prefix)
        s3_key = f"{model_prefix}{next_version}/model.onnx"
        bucket.upload_file(local_file, s3_key)
        return next_version

    next_version = upload_model_to_s3(input_path, "models")
    print("Finished uploading the model")
    return next_version


In [None]:
@dsl.component(
    base_image="image-registry.openshift-image-registry.svc:5000/redhat-ods-applications/runtime-tensorflow:tensorflow",
    packages_to_install=["boto3==1.35.55", "botocore==1.35.55"]
)
def publish_to_registry(user_token: str, version: str):
    """Register a model version and its artifact in the model registry.

    Looks up the registered model named ``distance`` (creating it when it is
    absent), creates a model version named ``version`` under it, and attaches
    a model artifact that points at ``s3://my-storage/models/distance/<version>``.

    The registry host is built from the ``BASE_URL`` environment variable; the
    artifact URI also embeds ``AWS_S3_ENDPOINT`` and ``AWS_DEFAULT_REGION``.

    Args:
        user_token: Bearer token sent in the ``Authorization`` header.
        version: Name of the model version to create.

    Raises:
        ValueError: If ``BASE_URL`` is unset or empty.
        requests.HTTPError: If any registry call returns an error status.
    """
    from os import getenv

    from requests import get, post

    headers = { "Authorization": f"Bearer {user_token}" }
    base_url = getenv("BASE_URL")
    if not base_url:
        # Without this the URL would silently contain the text "None".
        raise ValueError("BASE_URL environment variable is not set.")
    api_root = f"https://distance-prediction-rest{base_url}/api/model_registry/v1alpha3"

    model_name = 'distance'
    response = get(url=f"{api_root}/registered_models", headers=headers)
    print(response)
    # Fail here with the HTTP status instead of a KeyError on the body below.
    response.raise_for_status()
    print("Creating the model if we need to")
    models = response.json()['items']
    print(models)
    model_exists = [x for x in models if x['name'] == model_name]
    print(model_exists)

    if not model_exists:
        data = {
            "name": model_name
        }

        new_model = post(url=f"{api_root}/registered_models",
                         headers=headers, json=data)
        new_model.raise_for_status()
        print("Created new model")
        model_id = new_model.json()['id']
    else:
        model_id = model_exists[0]['id']
    print(model_id)

    data = {
        "author": "kube:admin",
        "name": version,
        "registeredModelId": model_id,
        "state": "LIVE"
    }

    print("Creating the model version", version)
    response = post(url=f"{api_root}/model_versions",
                    headers=headers,
                    json=data)
    print(response)
    response.raise_for_status()
    model_version_id = response.json()['id']

    data = {
        'artifactType': 'model-artifact',
        "storageKey": "aws-connection-my-storage",
        "name": version,
        "modelFormatName": "onnx",
        "modelFormatVersion": "1",
        "storagePath": "models/distance",
        "uri": f"s3://my-storage/models/distance/{version}?endpoint={getenv('AWS_S3_ENDPOINT')}&defaultRegion={getenv('AWS_DEFAULT_REGION')}",
        "description": "Distance Prediction Model",
        "state": "LIVE"
    }

    print("Creating the model artifact")
    response = post(url=f"{api_root}/model_versions/{model_version_id}/artifacts",
                    headers=headers,
                    json=data)
    print(response)
    # Previously a rejected artifact was only printed and the step still succeeded.
    response.raise_for_status()

In [None]:
# Three-step pipeline: train the model, upload it to S3, then publish the
# uploaded version to the model registry. Each step receives its settings as
# environment variables sourced from Kubernetes secrets.
@dsl.pipeline(name=PIPELINE_NAME)
def distance_model_pipeline(user_token: str, fresh_run: str):
    postgres_keys = ("DBNAME", "HOST", "PASSWORD", "USER")
    aws_keys = (
        "AWS_ACCESS_KEY_ID",
        "AWS_DEFAULT_REGION",
        "AWS_S3_BUCKET",
        "AWS_S3_ENDPOINT",
        "AWS_SECRET_ACCESS_KEY",
    )

    def expose_secret_keys(task, secret, keys):
        # One call per key, each mapping the secret key to an environment
        # variable of the same name.
        for key in keys:
            kubernetes.use_secret_as_env(task, secret_name=secret, secret_key_to_env={key: key})

    # Step 1: train the model using the PostgreSQL credentials.
    train_model_task = train_model(fresh_run=fresh_run)
    expose_secret_keys(train_model_task, 'postgres-creds', postgres_keys)

    # Step 2: upload the trained model using the S3 connection.
    save_model_task = save_model(fresh_run=fresh_run, input_path=train_model_task.output)
    expose_secret_keys(save_model_task, 'aws-connection-my-storage', aws_keys)

    # Step 3: publish the uploaded version to the model registry.
    publish_task = publish_to_registry(user_token=user_token, version=save_model_task.output)
    expose_secret_keys(publish_task, 'aws-connection-my-storage', aws_keys)
    expose_secret_keys(publish_task, 'urls', ("BASE_URL",))


In [None]:
# Connect to the pipeline server using the token of the current `oc` session.
from os import getenv

print("Connecting to pipeline server")
whoami_output = subprocess.check_output("oc whoami -t", shell=True, text=True)
token = whoami_output.strip()
kfp_client = kfp.Client(
    host=getenv("PIPELINES_URL"),
    existing_token=token,
    verify_ssl=False,
)

# Submit a run of the pipeline; the experiment shares the pipeline's name.
print("Running Pipeline")
run_arguments = {
    "user_token": getenv("TOKEN"),
    "fresh_run": "",
}
kfp_client.create_run_from_pipeline_func(
    distance_model_pipeline,
    experiment_name=PIPELINE_NAME,
    arguments=run_arguments,
)

In [None]:
# Compile the pipeline function into a YAML package on disk.
print("Compiling Pipeline")
compiler.Compiler().compile(
    pipeline_func=distance_model_pipeline,
    package_path='distance_model_pipeline.yaml',
)

In [None]:
# Upload the compiled YAML package to the pipeline server.
pipeline = kfp_client.upload_pipeline(
    pipeline_package_path='distance_model_pipeline.yaml',
)