In [32]:
# Personal setting: put your own name here, e.g. yourname = 'vult5'.
yourname = 'vult5'

# Every run below is created in this namespace and experiment.
NAMESPACE = yourname
EXPERIMENT = f'{yourname}-Examples'

# Base image for all lightweight components (hosted in our own registry).
IMAGE = 'asia.gcr.io/vinid-data-science-np/vu-example/base:1.0.0'
# GCS prefix that the artifact examples upload their files to.
GCS_URI = 'gs://vinid-data-science-rec-ecart-np/kubeflow_artifact_sample'

# Example 1 - Lightweight component

## Simple pipeline 

A lightweight component is just like a `dsl.ContainerOp`.<br>
The difference is that it turns a Python function into the entrypoint of a predefined image.<br>
Conceptually, it works like this:
```bash
docker run --name <op_name> <base_image> python3 -c "import random; print(random.randint(0, 10))"
```

In [2]:
import kfp
from kfp import components, dsl
import easykubeflow as ekf

In [3]:
# Connect to the Kubeflow Pipelines API.
client = kfp.Client()
# The returned Utils object is not stored; NOTE(review): presumably easykubeflow
# patches/extends `client` here -- confirm against easykubeflow.Utils.
ekf.Utils(client)

<easykubeflow.utils.Utils at 0x7fa750403550>

In [4]:
@components.func_to_container_op
def print_op(message):
    """Print `message` prefixed by the current timestamp.

    No base_image is passed, so kfp runs this op on its default image.
    """
    from datetime import datetime
    stamp = datetime.now()
    print(f'{stamp} -- message: {message}')

The approach above is not recommended in environments with restricted internet access,<br>
because the Kubeflow host would pull kfp's default image, which is not available in our registry.<br>
To replace the default image, we have to do it like this:

In [6]:
def print_op(message):
    """Print `message` prefixed by the current timestamp."""
    from datetime import datetime
    stamp = datetime.now()
    print(f'{stamp} -- message: {message}')

# Wrap explicitly so the op runs on IMAGE instead of kfp's default image.
print_op = components.func_to_container_op(print_op, base_image=IMAGE)

More conveniently, `easykubeflow` provides the same function but lets<br>
you set the base image directly in decorator form:

In [7]:
@ekf.func_to_container_op(base_image=IMAGE)
def print_op(message):
    """Print `message` prefixed by the current timestamp (base image set via the decorator)."""
    from datetime import datetime
    stamp = datetime.now()
    print(f'{stamp} -- message: {message}')

Now define a sample pipeline:

In [8]:
@dsl.pipeline(name="Sample pipeline", description="Just a simple test for lightweight components")
def pipeline(yourname:str):
    """Single-step pipeline: print the `yourname` parameter through print_op."""
    print_op(yourname)

In [9]:
RUN = 'Lightweight Component'
# Submit `pipeline` as a single run in this notebook's experiment and namespace.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments={'yourname': yourname},
    run_name=RUN,
    experiment_name=EXPERIMENT,
    namespace=NAMESPACE,
)

In [10]:
# Delete the run submitted above, identified by its run_id.
client.runs.delete_run(run_result.run_id)

{}

## Lightweight component with output

In [11]:
import kfp
from kfp import components, dsl
import easykubeflow as ekf
from typing import NamedTuple

In [12]:
# Connect to the Kubeflow Pipelines API.
client = kfp.Client()
# The returned Utils object is not stored; NOTE(review): presumably easykubeflow
# patches/extends `client` here -- confirm against easykubeflow.Utils.
ekf.Utils(client)

<easykubeflow.utils.Utils at 0x7fa72887a3c8>

Lightweight components require type annotations for their outputs.
There are 2 steps:
- Declare the op's output type with `NamedTuple` in the return annotation.
- Return the outputs as a `namedtuple` with the same field names at the end of your function.

In [13]:
@ekf.func_to_container_op(base_image=IMAGE)
def random_op() -> NamedTuple('RandomOutput', [('random_number', int),('random_time', str)]):
    """Emit a random integer and the time it was drawn.

    Returns:
        RandomOutput(random_number, random_time): ``random.randint(0, 10)``
        and ``str(datetime.now())``.
    """
    import random
    from collections import namedtuple
    from datetime import datetime

    # Field names must match the ones declared in the return annotation.
    RandomOutput = namedtuple('RandomOutput', ['random_number', 'random_time'])
    return RandomOutput(
        random.randint(0, 10),   # random_number
        str(datetime.now()),     # random_time
    )

In [14]:
@ekf.func_to_container_op(base_image=IMAGE)
def print_op(message):
    """Print `message` with a ``message:`` prefix."""
    line = f'message: {message}'
    print(line)

In [15]:
@dsl.pipeline(name="Sample pipeline", description="Just a simple test for lightweight components")
def pipeline(yourname:str):
    """Draw a random number/time with random_op, then print each output in its own task.

    `yourname` is accepted because the run cell passes it, but it is not used.
    """
    random_outputs = random_op().outputs
    for output_name in ('random_number', 'random_time'):
        print_op(random_outputs[output_name])

In [16]:
RUN = 'Lightweight Component'
# Submit `pipeline` as a single run in this notebook's experiment and namespace.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments={'yourname': yourname},
    run_name=RUN,
    experiment_name=EXPERIMENT,
    namespace=NAMESPACE,
)

In [17]:
# Delete the run submitted above, identified by its run_id.
client.runs.delete_run(run_result.run_id)

{}

# Example 2 - Recursion

In this example, we use recursion to solve the multi-group problem in the P13N project.


The P13N project has only one pipeline graph, but it has to cover 2 user groups, A and D.<br>
So far, we have passed the group type as a pipeline parameter and scheduled the recurring job twice.<br>
This is not good enough if we want the groups to run serially, i.e. the group D flow should start
only after the group A flow is done: with two separate schedules we can only estimate the total
run time of group A and schedule group D to start after that estimate.


The example below shows how to run multiple user groups serially with a single graph.

In [18]:
import kfp
from kfp import dsl, components
from easykubeflow import Utils, func_to_container_op
from typing import NamedTuple

In [19]:
# Connect to the Kubeflow Pipelines API.
client = kfp.Client()
# The returned Utils object is not stored; NOTE(review): presumably easykubeflow
# patches/extends `client` here -- confirm against easykubeflow.Utils.
Utils(client)

<easykubeflow.utils.Utils at 0x7fa7285954a8>

The `decision_op` processes the list of user groups, which we define as a `|`-separated string. For instance: `"A|B|C"`

In [20]:
@func_to_container_op(base_image=IMAGE)
def decision_op(groups:str) -> NamedTuple('DecisionOutput', [('picked_group', str),('rest_group', str), ('stop', str)]):
    """Pop the first group off a ``|``-separated list of user groups.

    Args:
        groups: groups still to process, e.g. ``"A|B|C"``.

    Returns:
        DecisionOutput(picked_group, rest_group, stop):
        - picked_group: the first entry (``"A"``).
        - rest_group: the remaining entries joined by ``|`` (``"B|C"``), or the
          placeholder ``"empty"`` when nothing is left.
        - stop: ``"yes"`` when `picked_group` was the last entry, else ``"no"``.
    """
    from collections import namedtuple

    group_list = groups.split("|")
    count = len(group_list) - 1  # groups left after this pick
    picked_group = group_list[0]
    if count == 0:
        # Last group: tell the recursion to stop. The rest_group value is only a
        # placeholder; group_component does not use it once stop != "no".
        rest_group, stop = "empty", "yes"
    else:
        rest_group, stop = "|".join(group_list[1:]), "no"
    print(f'picked_group: {picked_group} -- rest_group: {rest_group} -- count: {count}')

    DecisionOutput = namedtuple('DecisionOutput', ['picked_group', 'rest_group', 'stop'])
    return DecisionOutput(picked_group, rest_group, stop)

`do_task_op` stands in for the many ops that we use in the pipeline.

For example: `user_op`, `item_op`, `optim_op`, etc.

In [21]:
@func_to_container_op(base_image=IMAGE)
def do_task_op(group:str, rest_group:str, stop:str) -> NamedTuple('DoTaskOutput', [('rest_group', str), ('stop', str)]):
    """Placeholder for the real per-group work (user_op, item_op, optim_op, ...).

    `rest_group` and `stop` are forwarded unchanged so the next recursion level
    can take them from this task's outputs.
    """
    from collections import namedtuple
    DoTaskOutput = namedtuple('DoTaskOutput', ['rest_group', 'stop'])

    print(f'Do something with {group}')
    return DoTaskOutput(rest_group, stop)

In [22]:
@dsl.graph_component
def group_component(groups:str, stop:str):
    """Process one user group per recursion level, then recurse on the rest.

    Args:
        groups: ``|``-separated groups still to process, e.g. ``"A|B|C"``.
        stop: ``"no"`` to process `groups`; any other value skips the body,
            which ends the recursion.
    """
    # Base case of the recursion: once decision_op reports stop == "yes",
    # the next level's body is skipped.
    with dsl.Condition(stop == "no"):
        # Pick the next group and compute the remaining list.
        dec = decision_op(groups)
        # replace done_task with your pipeline logic here
        done_task = do_task_op(dec.outputs['picked_group'], dec.outputs['rest_group'], dec.outputs['stop'])
        # Recurse with the remaining groups and stop signal (computed by dec), but
        # taken from done_task's outputs, so the next level depends on -- and runs
        # after -- this level's work.
        group_component(done_task.outputs['rest_group'], done_task.outputs['stop'])

In [23]:
@dsl.pipeline(name="Sample pipeline", description="Just a simple test for lightweight components")
def pipeline(groups:str, stop:str):
    """Entry point of the recursion example.

    Args:
        groups: ``|``-separated user groups, e.g. ``"A|B|C"``.
        stop: pass ``"no"`` so group_component processes the groups.
    """
    group_component(groups, stop)

In [24]:
RUN = 'Recursion'
# Process groups A, B and C serially in a single run.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments={'groups': 'A|B|C', 'stop': 'no'},
    run_name=RUN,
    experiment_name=EXPERIMENT,
    namespace=NAMESPACE,
)

In [25]:
# Delete the run submitted above, identified by its run_id.
client.runs.delete_run(run_result.run_id)

{}

# Example 3 - ContainerOp Artifact

This example shows how to use artifact metadata in a ContainerOp; the artifacts are then visualized on the run's page in the Kubeflow UI.

## Confusion Matrix

In [29]:
import kfp
from kfp import dsl, components
from easykubeflow import func_to_container_op, Utils
from typing import NamedTuple

In [30]:
# Connect to the Kubeflow Pipelines API.
client = kfp.Client()
# The returned Utils object is not stored; NOTE(review): presumably easykubeflow
# patches/extends `client` here -- confirm against easykubeflow.Utils.
Utils(client)

In [41]:
@func_to_container_op(base_image=IMAGE)
def confusion_matrix_op(gcs_uri:str) -> NamedTuple('ConfusionMatrixOutput', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    """Build a toy confusion matrix, upload it to GCS and expose it in the KFP UI.

    Args:
        gcs_uri: GCS prefix (e.g. ``gs://bucket/path``) that receives
            ``confusion_matrix.csv``. A trailing ``/`` is tolerated.

    Returns:
        A namedtuple whose ``mlpipeline_ui_metadata`` field is the JSON string
        describing a ``confusion_matrix`` output that points at the uploaded CSV.

    Raises:
        subprocess.CalledProcessError: if installing the dependencies or the
            ``gsutil cp`` upload fails.
    """
    import subprocess
    import sys

    # Running pip as a subprocess is the supported way to install at runtime;
    # pip.main / pip._internal.main are private and change between pip releases.
    # The PyPI package is `scikit-learn`: the `sklearn` alias is deprecated and
    # refuses to install.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'pandas', 'scikit-learn'])

    import json
    import os
    import tempfile
    from collections import namedtuple

    import pandas as pd
    from sklearn.metrics import confusion_matrix

    # init confusion matrix
    y_true = ["cat", "ant", "cat", "cat", "ant", "bird"]
    y_pred = ["ant", "ant", "cat", "cat", "ant", "cat"]
    vocab = ["ant", "bird", "cat"]
    cm = confusion_matrix(y_true, y_pred, labels=vocab)

    # Flatten the matrix into (actual, predict, count) rows, matching the
    # `schema` declared in the metadata below.
    data = [
        (vocab[actual_idx], vocab[predict_idx], count)
        for actual_idx, actual_row in enumerate(cm)
        for predict_idx, count in enumerate(actual_row)
    ]
    df_cm = pd.DataFrame(data, columns=['actual', 'predict', 'count'])

    cm_name = 'confusion_matrix.csv'
    # Upload to the exact object path. `gsutil cp file gs://bucket/prefix`
    # creates an object literally named `prefix` when that "directory" does not
    # exist yet, and then the metadata `source` would point at a missing file.
    cm_uri = f"{gcs_uri.rstrip('/')}/{cm_name}"
    # Use a temp dir rather than `/`, which non-root containers cannot write to.
    with tempfile.TemporaryDirectory() as tmp_dir:
        cm_file = os.path.join(tmp_dir, cm_name)
        df_cm.to_csv(cm_file, columns=['actual', 'predict', 'count'], header=False, index=False)
        # check=True surfaces upload failures instead of silently ignoring them.
        subprocess.run(['gsutil', 'cp', cm_file, cm_uri], check=True)

    # define metadata
    metadata = {
        "version": 1,
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "source": cm_uri,
                "schema": [
                    {"name": "actual", "type": "CATEGORY"},
                    {"name": "predict", "type": "CATEGORY"},
                    {"name": "count", "type": "NUMBER"},
                ],
                "labels": list(map(str, vocab)),
            }
        ],
    }

    confusion_matrix_output = namedtuple('ConfusionMatrixOutput', ['mlpipeline_ui_metadata'])
    return confusion_matrix_output(json.dumps(metadata))

In [43]:
@dsl.pipeline(name="Lightweight pipeline", description="artifact")
def pipeline(gcs_uri:str):
    """One-step pipeline that renders the confusion-matrix artifact stored under `gcs_uri`."""
    confusion_matrix_task = confusion_matrix_op(gcs_uri=gcs_uri)

In [44]:
RUN = "Artifact-Confusion-matrix"
# Submit `pipeline` as a single run, uploading its artifact under GCS_URI.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments={'gcs_uri': GCS_URI},
    run_name=RUN,
    experiment_name=EXPERIMENT,
    namespace=NAMESPACE,
)

In [42]:
# Delete the run submitted above, identified by its run_id.
client.runs.delete_run(run_result.run_id)

{}

## Markdown

In [45]:
import kfp
from kfp import dsl, components
from easykubeflow import func_to_container_op, Utils
from typing import NamedTuple

In [46]:
# Connect to the Kubeflow Pipelines API.
client = kfp.Client()
# The returned Utils object is not stored; NOTE(review): presumably easykubeflow
# patches/extends `client` here -- confirm against easykubeflow.Utils.
Utils(client)

In [47]:
@func_to_container_op(base_image=IMAGE)
def markdown_op(gcs_uri:str) -> NamedTuple('MarkdownOutput', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    """Upload a Markdown file to GCS and render it, plus an inline snippet, in the KFP UI.

    Args:
        gcs_uri: GCS prefix that receives ``README.md``; a trailing ``/`` is tolerated.

    Returns:
        A namedtuple whose ``mlpipeline_ui_metadata`` field is the JSON string
        describing two ``markdown`` outputs: the uploaded file and an inline one.

    Raises:
        subprocess.CalledProcessError: if the ``gsutil cp`` upload fails.
    """
    import json
    import os
    import subprocess
    import tempfile
    from collections import namedtuple

    # Build the content line by line. A triple-quoted literal indented with the
    # code would prefix every line after the first with 4 spaces, which Markdown
    # renders as a code block instead of a heading/paragraph.
    markdown_content = "\n".join([
        "# Markdown from GCS",
        "## Heading2",
        "blah blah",
        "",
    ])

    markdown_name = 'README.md'
    # Upload to the exact object path so it always matches the metadata `source`.
    markdown_uri = f"{gcs_uri.rstrip('/')}/{markdown_name}"
    # Use a temp dir rather than `/`, which non-root containers cannot write to.
    with tempfile.TemporaryDirectory() as tmp_dir:
        markdown_file = os.path.join(tmp_dir, markdown_name)
        with open(markdown_file, 'w') as f:
            f.write(markdown_content)
        # check=True surfaces upload failures instead of silently ignoring them.
        subprocess.run(['gsutil', 'cp', markdown_file, markdown_uri], check=True)

    metadata = {
        "version": 1,
        "outputs": [
            {
                "type": "markdown",
                "source": markdown_uri,
            },
            {
                "storage": "inline",
                "source": "# Inline Markdown\n[Link to kubeflow homepage](https://kubeflow.iap.vinid.dev/)",
                "type": "markdown",
            },
        ],
    }

    markdown_output = namedtuple('MarkdownOutput', ['mlpipeline_ui_metadata'])
    return markdown_output(json.dumps(metadata))

In [48]:
@dsl.pipeline(name="Lightweight pipeline", description="artifact")
def pipeline(gcs_uri:str):
    """One-step pipeline that renders the Markdown artifacts stored under `gcs_uri`."""
    markdown_task = markdown_op(gcs_uri=gcs_uri)

In [49]:
RUN = "Artifact-Markdown"
# Submit `pipeline` as a single run, uploading its artifact under GCS_URI.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments={'gcs_uri': GCS_URI},
    run_name=RUN,
    experiment_name=EXPERIMENT,
    namespace=NAMESPACE,
)

In [52]:
# Delete the run submitted above, identified by its run_id.
client.runs.delete_run(run_result.run_id)

{}

## ROC Curve

In [29]:
import kfp
from kfp import dsl, components
from easykubeflow import func_to_container_op, Utils
from typing import NamedTuple

In [30]:
# Connect to the Kubeflow Pipelines API.
client = kfp.Client()
# The returned Utils object is not stored; NOTE(review): presumably easykubeflow
# patches/extends `client` here -- confirm against easykubeflow.Utils.
Utils(client)

In [59]:
@func_to_container_op(base_image=IMAGE)
def roc_op(gcs_uri:str) -> NamedTuple('RocCurveOutput', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    """Write a toy ROC curve CSV to GCS and expose it as a KFP ``roc`` visualization.

    Args:
        gcs_uri: GCS prefix that receives ``roc.csv``; a trailing ``/`` is tolerated.

    Returns:
        A namedtuple whose ``mlpipeline_ui_metadata`` field is the JSON string
        describing the ``roc`` output.

    Raises:
        subprocess.CalledProcessError: if installing pandas or the upload fails.
    """
    import subprocess
    import sys

    # Supported way to install at runtime (pip.main / pip._internal are private APIs).
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'pandas'])

    import json
    import os
    import tempfile
    from collections import namedtuple

    import pandas as pd

    # Synthetic curve: fpr goes 0.00..0.99, tpr rises by 0.04 per step, capped at 1.
    fpr = [i/100 for i in range(100)]
    tpr = [i*0.04 if i*0.04 < 1 else 1 for i in range(100)]
    thresholds = 0.5  # same threshold on every row
    df_roc = pd.DataFrame({'fpr': fpr, 'tpr': tpr, 'thresholds': thresholds})

    roc_name = 'roc.csv'
    # Upload to the exact object path so it always matches the metadata `source`.
    roc_uri = f"{gcs_uri.rstrip('/')}/{roc_name}"
    # Use a temp dir rather than `/`, which non-root containers cannot write to.
    with tempfile.TemporaryDirectory() as tmp_dir:
        roc_file = os.path.join(tmp_dir, roc_name)
        df_roc.to_csv(roc_file, columns=['fpr', 'tpr', 'thresholds'], header=False, index=False)
        # check=True surfaces upload failures instead of silently ignoring them.
        subprocess.run(['gsutil', 'cp', roc_file, roc_uri], check=True)

    metadata = {
        'version': 1,
        'outputs': [
            {
                'type': 'roc',
                'format': 'csv',
                'schema': [
                    {'name': 'fpr', 'type': 'NUMBER'},
                    {'name': 'tpr', 'type': 'NUMBER'},
                    {'name': 'thresholds', 'type': 'NUMBER'},
                ],
                'source': roc_uri,
            }
        ],
    }

    roc_output = namedtuple('RocCurveOutput', ['mlpipeline_ui_metadata'])
    return roc_output(json.dumps(metadata))

In [60]:
@dsl.pipeline(name="Lightweight pipeline", description="artifact")
def pipeline(gcs_uri:str):
    """One-step pipeline that renders the ROC-curve artifact stored under `gcs_uri`."""
    roc_task = roc_op(gcs_uri=gcs_uri)

In [61]:
RUN = "Artifact-ROC-curve"
# Submit `pipeline` as a single run, uploading its artifact under GCS_URI.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments={'gcs_uri': GCS_URI},
    run_name=RUN,
    experiment_name=EXPERIMENT,
    namespace=NAMESPACE,
)

In [62]:
# Delete the run submitted above, identified by its run_id.
client.runs.delete_run(run_result.run_id)

{}

## Table

In [63]:
import kfp
from kfp import dsl, components
from easykubeflow import func_to_container_op, Utils
from typing import NamedTuple

In [64]:
# Connect to the Kubeflow Pipelines API.
client = kfp.Client()
# The returned Utils object is not stored; NOTE(review): presumably easykubeflow
# patches/extends `client` here -- confirm against easykubeflow.Utils.
Utils(client)

In [65]:
@func_to_container_op(base_image=IMAGE)
def table_op(gcs_uri:str) -> NamedTuple('TableOutput', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    """Write a small 3-column CSV to GCS and expose it as a KFP ``table`` visualization.

    Args:
        gcs_uri: GCS prefix that receives ``table.csv``; a trailing ``/`` is tolerated.

    Returns:
        A namedtuple whose ``mlpipeline_ui_metadata`` field is the JSON string
        describing the ``table`` output.

    Raises:
        subprocess.CalledProcessError: if installing pandas or the upload fails.
    """
    import subprocess
    import sys

    # Supported way to install at runtime (pip.main / pip._internal are private APIs).
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'pandas'])

    import json
    import os
    import tempfile
    from collections import namedtuple

    import pandas as pd

    x1 = list(range(10))
    x2 = list(range(10, 20))
    x3 = list(range(0, 100, 10))
    df_table = pd.DataFrame({'col1': x1, 'col2': x2, 'col3': x3})

    # Own file name: the previous copy-pasted '/roc.csv' overwrote roc_op's
    # roc.csv under the same GCS prefix.
    table_name = 'table.csv'
    # Upload to the exact object path so it always matches the metadata `source`.
    table_uri = f"{gcs_uri.rstrip('/')}/{table_name}"
    # Use a temp dir rather than `/`, which non-root containers cannot write to.
    with tempfile.TemporaryDirectory() as tmp_dir:
        table_file = os.path.join(tmp_dir, table_name)
        df_table.to_csv(table_file, columns=['col1', 'col2', 'col3'], header=False, index=False)
        # check=True surfaces upload failures instead of silently ignoring them.
        subprocess.run(['gsutil', 'cp', table_file, table_uri], check=True)

    metadata = {
        'version': 1,
        'outputs': [
            {
                'type': 'table',
                'format': 'csv',
                'header': ['col1', 'col2', 'col3'],
                'source': table_uri,
            }
        ],
    }

    table_output = namedtuple('TableOutput', ['mlpipeline_ui_metadata'])
    return table_output(json.dumps(metadata))

In [66]:
@dsl.pipeline(name="Lightweight pipeline", description="artifact")
def pipeline(gcs_uri:str):
    """One-step pipeline that renders the table artifact stored under `gcs_uri`."""
    table_task = table_op(gcs_uri=gcs_uri)

In [67]:
RUN = "Artifact-Table"
# Submit `pipeline` as a single run, uploading its artifact under GCS_URI.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments={'gcs_uri': GCS_URI},
    run_name=RUN,
    experiment_name=EXPERIMENT,
    namespace=NAMESPACE,
)

In [68]:
# Delete the run submitted above, identified by its run_id.
client.runs.delete_run(run_result.run_id)

{}

## Tensorboard (not completed)

In [69]:
import kfp
from kfp import dsl, components
from easykubeflow import func_to_container_op, Utils
from typing import NamedTuple

In [70]:
# Connect to the Kubeflow Pipelines API.
client = kfp.Client()
# The returned Utils object is not stored; NOTE(review): presumably easykubeflow
# patches/extends `client` here -- confirm against easykubeflow.Utils.
Utils(client)

In [71]:
@func_to_container_op(base_image=IMAGE)
def tensorboard_op(gcs_uri:str) -> NamedTuple('TensorboardOutput', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    """Point the KFP UI's TensorBoard viewer at ``<gcs_uri>/logs``.

    Work in progress: this op writes nothing to that location itself, so the
    logs must be produced elsewhere.

    Args:
        gcs_uri: GCS prefix whose ``logs`` sub-path holds the TensorBoard logs;
            a trailing ``/`` is tolerated.

    Returns:
        A namedtuple whose ``mlpipeline_ui_metadata`` field is the JSON string
        describing the ``tensorboard`` output.
    """
    import json
    from collections import namedtuple

    metadata = {
        'version': 1,
        'outputs': [
            {
                'type': 'tensorboard',
                # rstrip avoids a '//logs' path when gcs_uri ends with '/'.
                'source': f"{gcs_uri.rstrip('/')}/logs",
            }
        ],
    }

    tensorboard_output = namedtuple('TensorboardOutput', ['mlpipeline_ui_metadata'])
    return tensorboard_output(json.dumps(metadata))

In [72]:
@dsl.pipeline(name="Lightweight pipeline", description="artifact")
def pipeline(gcs_uri:str):
    """One-step pipeline that points the TensorBoard viewer at logs under `gcs_uri`."""
    tensorboard_task = tensorboard_op(gcs_uri=gcs_uri)

In [73]:
RUN = "Artifact-Tensorboard"
# Submit `pipeline` as a single run that reads its logs under GCS_URI.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments={'gcs_uri': GCS_URI},
    run_name=RUN,
    experiment_name=EXPERIMENT,
    namespace=NAMESPACE,
)

In [74]:
# Delete the run submitted above, identified by its run_id.
client.runs.delete_run(run_result.run_id)

{}

## Static HTML (updating)

# Example 4 - Metadata

In [5]:
GLOWING_STAR = '\U0001F31F'  # U+1F31F GLOWING STAR emoji
print(GLOWING_STAR)

🌟


In [10]:
    # Closing note: ask readers to star the project and link to the examples.
    print('Give easykubeflow a \U0001F31F on github\nLink to Examples https://github.com/vule24/easykubeflow/tree/master/examples')


Give easykubeflow a 🌟 on github
Link to Examples https://github.com/vule24/easykubeflow/tree/master/examples
