# Example of Kubeflow Pipeline from notebook

In [14]:
# Installing extra libs for mlflow integration
!pip install mlflow boto3 awscli pyarrow sklearn mlflow



In [15]:
# Make sure to use up to date kfp (kubeflow pipeline python SDK)
!pip install kfp --upgrade -q

In [16]:
import kfp
import os

from kfp import dsl

In [17]:
# In ordser to write models to model registry we need credetials (we can get those from admission-webhook config)
# juju show-unit admission-webhook/0 | yq .admission-webhook/*.relation-info[0].application-data

os.environ['MLFLOW_TRACKING_URI'] = "http://mlflow-server.kubeflow.svc.cluster.local:5000"
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://minio.kubeflow.svc.cluster.local:9000"
os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "O9YHN0K2DC7BELNFEM3UZQOURGSRSM"

!aws --endpoint-url $MLFLOW_S3_ENDPOINT_URL s3 ls mlflow/0

In [18]:
# In airgapped environment upload data manually
!wget https://raw.githubusercontent.com/Barteus/kubeflow-examples/main/e2e-wine-kfp-mlflow/winequality-red.csv

--2022-11-01 09:12:29--  https://raw.githubusercontent.com/Barteus/kubeflow-examples/main/e2e-wine-kfp-mlflow/winequality-red.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84199 (82K) [text/plain]
Saving to: ‘winequality-red.csv.1’


2022-11-01 09:12:34 (74.8 MB/s) - ‘winequality-red.csv.1’ saved [84199/84199]



In [19]:
# A pipeline component can be built from a remote component spec with the SDK
WEB_DOWNLOADER_COMPONENT_URL = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml'
web_downloader_op = kfp.components.load_component_from_url(WEB_DOWNLOADER_COMPONENT_URL)

In [20]:
# Next we can create as many data preprocessing tasks as we need. The imports go inside the function.
def preprocess(file_path: kfp.components.InputPath('CSV'),
              output_file: kfp.components.OutputPath('parquet')):
    """Convert the semicolon-separated CSV into a parquet file.

    Column names are lower-cased and their spaces are replaced by underscores.

    Args:
        file_path: Path of the input CSV (first row is the header, ';' separated).
        output_file: Path the resulting parquet file is written to.
    """
    import pandas as pd

    def normalise(column_name):
        # e.g. "fixed acidity" -> "fixed_acidity"
        return column_name.lower().replace(' ', '_')

    raw_frame = pd.read_csv(file_path, sep=";", header=0)
    tidy_frame = raw_frame.rename(columns=normalise)
    tidy_frame.to_parquet(output_file)

In [21]:
# Run the task locally (outside any pipeline) to check it before building a component
preprocess(file_path='winequality-red.csv', output_file='preprocessed.parquet')

In [22]:
# Wrap the Python function into a pipeline component with the SDK
preprocess_op = kfp.components.create_component_from_func(
    func=preprocess,
    base_image='python:3.9',  # Any base image we need
    packages_to_install=['pandas', 'pyarrow'],  # Install additional packages
    # Optional: saves the component spec for future use.
    output_component_file='preprocess-component.yaml',
)

In [23]:
def trainning(file_path: kfp.components.InputPath('parquet'))->str:
    """Train an ElasticNet model on the preprocessed data and register it in MLflow.

    The MLflow tracking/registry configuration is taken from the environment of the
    process running this function.

    Args:
        file_path: Path of the preprocessed parquet file. It must contain a
            ``quality`` column, which is used as the target.

    Returns:
        The artifact URI of the logged model: the run's artifact URI followed by
        the model's artifact path.
    """
    import pandas as pd
    import mlflow
    from sklearn.linear_model import ElasticNet
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(file_path)

    target_column = 'quality'
    features = df.drop(columns=[target_column])
    target = df[target_column]
    # Only the training partition is used for fitting; the stratified 25% hold-out
    # is discarded here, so it is not bound to a name.
    train_x, _, train_y, _ = train_test_split(features, target, test_size=.25,
                                              random_state=1337, stratify=target)

    with mlflow.start_run(run_name='elastic_net_models'):
        alpha = 0.5
        l1_ratio = 0.5
        # Record the hyperparameters with the run so the registered model is traceable.
        mlflow.log_params({"alpha": alpha, "l1_ratio": l1_ratio})
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)
        result = mlflow.sklearn.log_model(lr, "model", registered_model_name="wine-elasticnet")
        return f"{mlflow.get_artifact_uri()}/{result.artifact_path}"

In [24]:
# Local test: train and register the model from this notebook's kernel
trainning(file_path='preprocessed.parquet')

Successfully registered model 'wine-elasticnet'.
2022/11/01 09:12:43 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: wine-elasticnet, version 1
Created version '1' of model 'wine-elasticnet'.


's3://mlflow/0/d1f5fc2469fa40de8bd885acb9acf49e/artifacts/model'

In [25]:
# Same component creation as above
training_op = kfp.components.create_component_from_func(
        func=trainning,
        output_component_file='train-component.yaml', # This is optional. It saves the component spec for future use.
        base_image='python:3.9.13',
        # 'scikit-learn' is the PyPI distribution that provides the `sklearn` module;
        # the 'sklearn' PyPI name is a deprecated alias.
        packages_to_install=['pandas', 'pyarrow', 'scikit-learn', 'mlflow', 'boto3'])

In [31]:
# Create the missing secret for mlflow
!kubectl create secret generic seldon-init-container-secret -n user123 \
  --from-literal=RCLONE_CONFIG_S3_TYPE='s3' \
  --from-literal=RCLONE_CONFIG_S3_PROVIDER='minio' \
  --from-literal=RCLONE_CONFIG_S3_ENV_AUTH=false \
  --from-literal=RCLONE_CONFIG_S3_ENDPOINT='http://minio.kubeflow.svc.cluster.local:9000' \
  --from-literal=RCLONE_CONFIG_S3_ACCESS_KEY_ID='minio' \
  --from-literal=RCLONE_CONFIG_S3_SECRET_ACCESS_KEY='O9YHN0K2DC7BELNFEM3UZQOURGSRSM'

error: failed to create secret secrets "seldon-init-container-secret" already exists


In [26]:
# This function deploys the model stored in MLflow directly to the cluster as a Seldon deployment
def deploy(model_uri:str = "default_model_uri"):
    """Deploy a model to the cluster as a SeldonDeployment.

    The manifest is written to /tmp/manifest.yaml and applied with ``kubectl``
    in the ``user123`` namespace.

    Args:
        model_uri: URI of the model artifacts; it becomes the ``modelUri`` of
            the predictor graph.

    Raises:
        subprocess.CalledProcessError: If ``kubectl apply`` exits with a
            non-zero status.
    """
    import json
    import subprocess

    # A JSON string literal is also a valid YAML double-quoted scalar, so quoting the
    # URI this way keeps characters such as ": " or "#" from corrupting the manifest.
    quoted_model_uri = json.dumps(model_uri)

    manifest = f"""
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: mlflow
spec:
  name: wines
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - name: classifier
          image: seldonio/mlflowserver:1.14.0-dev
          imagePullPolicy: Always
          livenessProbe:
            initialDelaySeconds: 80
            failureThreshold: 200
            periodSeconds: 5
            successThreshold: 1
            httpGet:
              path: /health/ping
              port: http
              scheme: HTTP
          readinessProbe:
            initialDelaySeconds: 80
            failureThreshold: 200
            periodSeconds: 5
            successThreshold: 1
            httpGet:
              path: /health/ping
              port: http
              scheme: HTTP
    graph:
      children: []
      implementation: MLFLOW_SERVER
      modelUri: {quoted_model_uri}
      envSecretRefName: seldon-init-container-secret
      name: classifier
    name: wine-super-model
    replicas: 1
"""
    with open("/tmp/manifest.yaml", "w") as f:
        f.write(manifest)

    # Example of how to talk directly to kubeapi.
    # check=True raises on a non-zero exit status; unlike `assert`, it is not
    # stripped when Python runs with -O.
    subprocess.run(['kubectl', 'apply', '-f', '/tmp/manifest.yaml', '-n', 'user123'], check=True)

In [27]:
# Component for the deploy step; no extra packages are installed on top of the image
deploy_op = kfp.components.create_component_from_func(
    func=deploy,
    base_image='bponieckiklotz/seldon-deploy:0.1',
    packages_to_install=[],
    # Optional: saves the component spec for future use.
    output_component_file='deploy-component.yaml',
)

In [28]:
from kubernetes.client.models import V1EnvVar
from kfp.onprem import use_k8s_secret

@dsl.pipeline(
    name="e2e_wine_pipeline",
    description="WINE pipeline",
)
def wine_pipeline(url):
    """Chain the download, preprocess, training and deploy components.

    Args:
        url: Value passed to the web downloader component as its ``url`` input.
    """
    download_step = web_downloader_op(url=url)
    preprocess_step = preprocess_op(file=download_step.outputs['data'])

    # Plain environment variables set on the training container.
    mlflow_env = (
        ('MLFLOW_TRACKING_URI', 'http://mlflow-server.kubeflow.svc.cluster.local:5000'),
        ('MLFLOW_S3_ENDPOINT_URL', 'http://minio.kubeflow.svc.cluster.local:9000'),
    )
    train_step = training_op(file=preprocess_step.outputs['output'])
    for env_name, env_value in mlflow_env:
        train_step = train_step.add_env_variable(V1EnvVar(name=env_name, value=env_value))

    # Credentials are mapped from a Kubernetes secret into environment variables:
    # https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.extensions.html#kfp.onprem.use_k8s_secret
    secret_key_to_env = {
        'accesskey': 'AWS_ACCESS_KEY_ID',
        'secretkey': 'AWS_SECRET_ACCESS_KEY',
    }
    train_step = train_step.apply(
        use_k8s_secret(secret_name='mlpipeline-minio-artifact',
                       k8s_secret_key_to_env=secret_key_to_env))

    # The deploy step consumes the string returned by the training step.
    deploy_op(model_uri=train_step.output)

In [29]:
# Submit the pipeline function for a run, passing the dataset URL as its only argument
pipeline_arguments = {
    "url": "https://raw.githubusercontent.com/Barteus/kubeflow-examples/main/e2e-wine-kfp-mlflow/winequality-red.csv",
}
client = kfp.Client()
client.create_run_from_pipeline_func(wine_pipeline, arguments=pipeline_arguments)

RunPipelineResult(run_id=d11c73be-6fbb-4a67-a711-98e64284b5f3)

In [30]:
!curl  -s http://10.152.183.242:8000/api/v0.1/predictions \
  -H "Content-Type: application/json" \
  -d '{"data":{"ndarray":[[10.1, 0.37, 0.34, 2.4, 0.085, 5.0, 17.0, 0.99683, 3.17, 0.65, 10.6]]}}'

{"data":{"names":[],"ndarray":[5.741928028712652]},"meta":{"requestPath":{"classifier":"seldonio/mlflowserver:1.14.0-dev"}}}
