# In [75]:
import kfp.dsl as dsl
import yaml, kfp
from kubernetes import client as k8s
from kfp import onprem

# Name of the PersistentVolumeClaim shared by every pipeline step and by the
# Seldon serving containers.
PVC_NAME = "nlp-example-pvc"
# Path at which PVC_NAME is mounted, both in the pipeline steps and (via DATA
# below) in the Seldon containers, so the two always agree.
MOUNT = "/mnt"

# Please update the tags below if you rebuild the images using the build.sh script.
EXTRACT_STEP_IMAGE = "prawat5/features_extractor:1.0"
CLEAN_STEP_IMAGE = "prawat5/clean_text_transformer:1.0"
TOKENIZE_STEP_IMAGE = "prawat5/spacy_tokenizer:1.0"
VECTORIZE_STEP_IMAGE = "prawat5/tfidf_vectorizer:1.0"
PREDICT_STEP_IMAGE = "prawat5/lr_text_classifier:1.0"

# Values substituted into SELDON_DEPLOYMENT_YAML by str.format. Keys the
# template does not reference (e.g. EXTRACT_STEP_IMAGE) are simply ignored.
DATA = {
    "EXTRACT_STEP_IMAGE": EXTRACT_STEP_IMAGE,
    "CLEAN_STEP_IMAGE": CLEAN_STEP_IMAGE,
    "TOKENIZE_STEP_IMAGE": TOKENIZE_STEP_IMAGE,
    "VECTORIZE_STEP_IMAGE": VECTORIZE_STEP_IMAGE,
    "PREDICT_STEP_IMAGE": PREDICT_STEP_IMAGE,
    "PVC_NAME": PVC_NAME,
    "MOUNT": MOUNT,
}

# SeldonDeployment manifest chaining cleantext -> spacytokenizer ->
# tfidfvectorizer -> lrclassifier. The vectorizer and classifier containers
# mount PVC_NAME at MOUNT.
# `{{{{workflow.name}}}}` survives str.format as `{{workflow.name}}`; that
# literal is presumably substituted by the workflow engine at run time
# (TODO confirm for the KFP/Argo version in use).
SELDON_DEPLOYMENT_YAML = """
apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  labels:
    app: seldon
  name: "seldon-deployment-{{{{workflow.name}}}}"
  namespace: kubeflow
spec:
  annotations:
    project_name: NLP Pipeline
    deployment_version: v1
  name: "seldon-deployment-{{{{workflow.name}}}}"
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - image: {CLEAN_STEP_IMAGE}
          imagePullPolicy: IfNotPresent
          name: cleantext
          resources:
            requests:
              memory: 1Mi
        - image: {TOKENIZE_STEP_IMAGE}
          imagePullPolicy: IfNotPresent
          name: spacytokenizer
        - image: {VECTORIZE_STEP_IMAGE}
          imagePullPolicy: IfNotPresent
          name: tfidfvectorizer
          volumeMounts:
          - name: mypvc
            mountPath: {MOUNT}
        - image: {PREDICT_STEP_IMAGE}
          imagePullPolicy: IfNotPresent
          name: lrclassifier
          volumeMounts:
          - name: mypvc
            mountPath: {MOUNT}
        terminationGracePeriodSeconds: 20
        volumes:
        - name: mypvc
          persistentVolumeClaim:
            claimName: "{PVC_NAME}"
    graph:
      children:
      - name: spacytokenizer
        endpoint:
          type: REST
        type: MODEL
        children:
        - name: tfidfvectorizer
          endpoint:
            type: REST
          type: MODEL
          children:
          - name: lrclassifier
            endpoint:
              type: REST
            type: MODEL
            children: []
      name: cleantext
      endpoint:
        type: REST
      type: MODEL
    name: single-model
    replicas: 1
    annotations:
      predictor_version: v1
"""

# Render the template once at import time; the pipeline parses this string.
SELDON_DEPLOYMENT_YAML = SELDON_DEPLOYMENT_YAML.format(**DATA)


# Volume name given to PVC_NAME when it is attached to each pipeline step
# (second positional argument of onprem.mount_pvc).
_PVC_VOLUME_NAME = 'local-storage'


def _on_mount(path):
    """Return ``path`` prefixed with the shared PVC mount point ``MOUNT``.

    ``str()`` is applied first so non-string values can be concatenated
    (presumably kfp pipeline-parameter objects at compile time -- TODO
    confirm for the kfp version in use).
    """
    return MOUNT + "/" + str(path)


def _with_shared_pvc(op):
    """Mount ``PVC_NAME`` at ``MOUNT`` on ``op``; return the result of ``op.apply``."""
    return op.apply(onprem.mount_pvc(PVC_NAME, _PVC_VOLUME_NAME, MOUNT))


@dsl.pipeline(
    name='NLP',
    description='A pipeline demonstrating reproducible steps for NLP'
)
def nlp_pipeline(
        csv_url="reddit_train.csv",
        csv_encoding="ISO-8859-1",
        features_column="BODY",
        labels_column="REMOVED",
        raw_text_path='text.data',
        labels_path='labels.data',
        clean_text_path='clean.data',
        spacy_tokens_path='tokens.data',
        tfidf_vectors_path='tfidf.data',
        lr_prediction_path='prediction.data',
        tfidf_model_path='tfidf.model',
        lr_model_path='lr.model',
        lr_c_param=0.1,
        tfidf_max_features=10000,
        tfidf_ngram_range=3,
        batch_size='100'):
    """Extract, clean, tokenize, vectorize, train, then deploy via Seldon.

    The five container steps run strictly one after another. Each mounts
    the shared PVC at ``MOUNT`` and runs ``/microservice/pipeline_step.py``
    in its image. A final ResourceOp creates the SeldonDeployment described
    by ``SELDON_DEPLOYMENT_YAML``.

    All file/path parameters (``csv_url`` and every ``*_path``) are names
    relative to ``MOUNT``; they are prefixed with ``MOUNT + "/"`` before use.

    Args:
        csv_url: Input CSV file on the PVC; passed as ``--csv-url``.
        csv_encoding: Passed as ``--csv-encoding`` to the extract step.
        features_column: CSV column passed as ``--features-column``.
        labels_column: CSV column passed as ``--labels-column``.
        raw_text_path: Extract-step features output, clean-step input.
        labels_path: Extract-step labels output, read by the LR step.
        clean_text_path: Clean-step output, tokenize-step input.
        spacy_tokens_path: Tokenize-step output, vectorize-step input.
        tfidf_vectors_path: Vectorize-step output, LR-step input.
        lr_prediction_path: LR-step ``--out-path``.
        tfidf_model_path: ``--model-path`` of the TF-IDF train step.
        lr_model_path: ``--model-path`` of the LR train step.
        lr_c_param: Passed as ``--c-param`` to the LR step.
        tfidf_max_features: Passed as ``--max-features``.
        tfidf_ngram_range: Passed as ``--ngram-range``.
        batch_size: NOTE(review): not referenced by any step here; kept
            only so the pipeline's parameter list is unchanged.
    """
    csv_url = _on_mount(csv_url)
    raw_text_path = _on_mount(raw_text_path)
    labels_path = _on_mount(labels_path)
    clean_text_path = _on_mount(clean_text_path)
    spacy_tokens_path = _on_mount(spacy_tokens_path)
    tfidf_vectors_path = _on_mount(tfidf_vectors_path)
    lr_prediction_path = _on_mount(lr_prediction_path)
    tfidf_model_path = _on_mount(tfidf_model_path)
    lr_model_path = _on_mount(lr_model_path)

    extract_step = _with_shared_pvc(
        dsl.ContainerOp(
            name='Extract Features and Labels',
            image=EXTRACT_STEP_IMAGE,
            command="python",
            arguments=[
                "/microservice/pipeline_step.py",
                "--labels-path", labels_path,
                "--features-path", raw_text_path,
                "--csv-url", csv_url,
                "--csv-encoding", csv_encoding,
                "--features-column", features_column,
                "--labels-column", labels_column
            ]
        )
    )

    clean_step = _with_shared_pvc(
        dsl.ContainerOp(
            name='Clean The Data',
            image=CLEAN_STEP_IMAGE,
            command="python",
            arguments=[
                "/microservice/pipeline_step.py",
                "--in-path", raw_text_path,
                "--out-path", clean_text_path,
            ]
        )
    )
    # Steps communicate only through files on the PVC, so ordering must be
    # declared explicitly.
    clean_step.after(extract_step)

    tokenize_step = _with_shared_pvc(
        dsl.ContainerOp(
            name='Tokenize the Data',
            image=TOKENIZE_STEP_IMAGE,
            command="python",
            arguments=[
                "/microservice/pipeline_step.py",
                "--in-path", clean_text_path,
                "--out-path", spacy_tokens_path,
            ]
        )
    )
    tokenize_step.after(clean_step)

    vectorize_step = _with_shared_pvc(
        dsl.ContainerOp(
            name='Train using TF-IDF Vectorizer',
            image=VECTORIZE_STEP_IMAGE,
            command="python",
            arguments=[
                "/microservice/pipeline_step.py",
                "--in-path", spacy_tokens_path,
                "--out-path", tfidf_vectors_path,
                "--max-features", tfidf_max_features,
                "--ngram-range", tfidf_ngram_range,
                "--action", "train",
                "--model-path", tfidf_model_path,
            ]
        )
    )
    vectorize_step.after(tokenize_step)

    predict_step = _with_shared_pvc(
        dsl.ContainerOp(
            name='Predict-Logistic Regression Classifier',
            image=PREDICT_STEP_IMAGE,
            command="python",
            arguments=[
                "/microservice/pipeline_step.py",
                "--in-path", tfidf_vectors_path,
                "--labels-path", labels_path,
                "--out-path", lr_prediction_path,
                "--c-param", lr_c_param,
                "--action", "train",
                "--model-path", lr_model_path,
            ]
        )
    )
    predict_step.after(vectorize_step)

    # The manifest is already rendered at module level; parse it into the
    # dict form passed to ResourceOp.
    seldon_config = yaml.safe_load(SELDON_DEPLOYMENT_YAML)

    deploy_step = dsl.ResourceOp(
        name="Seldon-Deployment",
        k8s_resource=seldon_config,
        attribute_outputs={"name": "{.metadata.name}"})

    deploy_step.after(predict_step)
    

# Submit nlp_pipeline as a run in the "Test" experiment. The empty
# arguments dict means every pipeline parameter keeps its default value.
# This stays the last expression so the notebook displays the run result.
kfp.Client().create_run_from_pipeline_func(
    pipeline_func=nlp_pipeline,
    arguments={},
    experiment_name="Test",
)

# Out: RunPipelineResult(run_id=1372ea07-e009-463c-9056-38d57d5b5c5b)