# Data Processing Pipeline with sample code in GoLang

In this notebook, we will show how to:

* Interactively define a Kubeflow Pipeline using the Pipelines Python SDK
* Submit and run the pipeline
* Add a step to the pipeline


## Environment Setup

Before running any experiment, we need to set up and initialize the environment: make sure all required Python modules are installed and configured. The pipeline is responsible for building and scheduling custom steps implemented in Go. You will find the source code for the sample steps in `workspace/components/golang/csv.go`.
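
A quick way to confirm that the required packages are importable before running the rest of the notebook is to look each one up with `importlib.util.find_spec`. This is a minimal sketch: the package list in `REQUIRED_MODULES` is our assumption, based on the imports used in this notebook, and `missing_modules` is a hypothetical helper.

```python
import importlib.util

# Top-level packages imported by this notebook (assumed list; adjust to your environment)
REQUIRED_MODULES = ["kfp", "boto3", "pandas", "ipython_secrets", "nbextensions"]


def missing_modules(names):
    """Return the names that cannot be found on the current sys.path."""
    missing = []
    for name in names:
        try:
            # find_spec returns None when a top-level module is not installed
            if importlib.util.find_spec(name) is None:
                missing.append(name)
        except ModuleNotFoundError:
            # raised for dotted names whose parent package is missing
            missing.append(name)
    return missing


print(missing_modules(REQUIRED_MODULES) or "all required modules found")
```

Any name printed here has to be installed (for example with `pip`) before the imports cell will run.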

### Imports
Set up the Python modules.

In [None]:
%reload_ext autoreload
%autoreload 2

%reload_ext nbextensions
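# %load_nbvars is expected to define DOCKER_REGISTRY, DOCKER_REGISTRY_PULL_SECRET, MOUNT_PATH,
# BUCKET_NAME, BUCKET_ENDPOINT, BUCKET_PVC and APPLICATION_NAME, which are used below
%load_nbvars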

import kfp.dsl as dsl
import kfp.gcp as gcp
import pandas as pd
from ipython_secrets import get_secret
from kfp.compiler import Compiler
from kfp import components
from os import environ
import boto3, kfp

from nbextensions.pv import use_pvc
from nbextensions.kubernetes import use_pull_secret
from nbextensions.aws import upload_to_s3

import nbextensions.utils as utils
from datetime import datetime
from urllib.parse import urlparse

import warnings
warnings.filterwarnings('ignore')

### Define global variables

Initialize the global variables. It is good practice to place all global variables in one cell, so that the notebook can be configured all at once.

To improve readability, we advise capitalizing such variable names.
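
Because every path and image name in the next cell is derived from a few base values, changing `TAG` (or the registry and mount path) reconfigures everything at once. The sketch below makes that dependency explicit; `derive_paths` is a hypothetical helper, and the example values `registry.example.com` and `/mnt/s3` are made up.

```python
def derive_paths(docker_registry, mount_path, tag="latest"):
    """Build the derived global settings from a few base values."""
    training_root = f"{mount_path}/{tag}/training"
    return {
        "BUILD_CONTEXT": f"{tag}/buildcontext",
        "GOLANG_IMAGE": f"{docker_registry}/golang:{tag}",
        "FLASK_APP_IMAGE": f"{docker_registry}/flask:{tag}",
        "TRAINING_ROOT": training_root,
        "DATASET_FILE": f"{training_root}/go-in.csv",
        "OUT_FILE": f"{training_root}/go-out.csv",
    }


# Example with made-up base values
settings = derive_paths("registry.example.com", "/mnt/s3", tag="v8")
print(settings["GOLANG_IMAGE"], settings["DATASET_FILE"])
```

Calling `derive_paths(DOCKER_REGISTRY, MOUNT_PATH, TAG)` should yield the same values the cell defines one by one.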

In [None]:
USER = environ.get('NB_USER', 'John Doe')
TAG = 'latest'
# TAG = 'v8'

BUILD_CONTEXT = f"{TAG}/buildcontext"
GOLANG_IMAGE = f"{DOCKER_REGISTRY}/golang:{TAG}"
FLASK_APP_IMAGE = f"{DOCKER_REGISTRY}/flask:{TAG}"
TRAINING_ROOT = f"{MOUNT_PATH}/{TAG}/training"

DATASET_FILE = f"{TRAINING_ROOT}/go-in.csv"
OUT_FILE = f"{TRAINING_ROOT}/go-out.csv"

s3 = boto3.session.Session().client(
    service_name='s3',
    aws_access_key_id=get_secret('aws_access_key_id'),
    aws_secret_access_key=get_secret('aws_secret_access_key'),
    endpoint_url=BUCKET_ENDPOINT
)

client = kfp.Client()
# Reuse the experiment if it already exists; otherwise create it
try:
    exp = client.get_experiment(experiment_name=APPLICATION_NAME)
except Exception:
    exp = client.create_experiment(APPLICATION_NAME)

### Define the Docker image build pipeline

Define the build pipeline. Admittedly, we are using KFP to build the images that the final pipeline will then use.

We use [Kaniko](https://github.com/GoogleContainerTools/kaniko) and Kubernetes to handle build operations. Build status can be tracked in the KFP pipeline dashboard.

In fact, the image build job could even be combined with the primary pipeline, because each step runs in its own Kubernetes pod. However, for the sake of general-purpose efficiency, we schedule the build process as a separate pipeline.
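
The contents of `components/kaniko/deploy.yaml` are not shown here. Assuming it simply forwards its three inputs to the standard Kaniko executor, the build container would receive arguments along these lines. `kaniko_args` is a hypothetical helper written for illustration; `--dockerfile`, `--context` and `--destination` are real Kaniko executor flags.

```python
def kaniko_args(image, build_context, dockerfile="Dockerfile"):
    """Return Kaniko executor arguments for one image build (assumed mapping)."""
    context = build_context.rstrip("/")
    return [
        # absolute path to the Dockerfile inside the mounted build context
        f"--dockerfile={context}/{dockerfile}",
        # directory Kaniko uses as the build context
        f"--context={context}",
        # registry reference the built image is pushed to
        f"--destination={image}",
    ]


print(kaniko_args("registry.example.com/golang:latest",
                  "/mnt/s3/latest/buildcontext/components/golang/"))
```

Passing the Dockerfile as an absolute path inside the context avoids depending on how the executor resolves relative paths.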

In [None]:
kaniko_op = components.load_component_from_file('components/kaniko/deploy.yaml')

@dsl.pipeline(
  name='Pipeline images',
  description='Build images that will be used by the pipeline'
)
def build_image(
        image, 
        build_context=None, 
        dockerfile: dsl.PipelineParam=dsl.PipelineParam(name='dockerfile', value='Dockerfile')):
    kaniko_op(
        image=image,
        dockerfile=dockerfile,
        build_context=build_context
    ).apply(
        # Docker registry credentials
        use_pull_secret(secret_name=DOCKER_REGISTRY_PULL_SECRET)
    ).apply(
        # mount the S3 bucket persistent volume claim at MOUNT_PATH
        use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
    )
        
Compiler().compile(build_image, '.kaniko.tar.gz')

The compiler transforms the Python DSL into an [Argo Workflow](https://argoproj.github.io/docs/argo/readme.html) and stores the generated artifacts in [`.kaniko.tar.gz`](.kaniko.tar.gz), so the pipeline can be executed multiple times, possibly with different parameters.
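
To see what the compiler actually produced, the package can be opened with the standard `tarfile` module. In the SDK versions we have seen, the archive holds a single `pipeline.yaml`; the hypothetical helper `read_workflow` below just returns the first YAML member, so it does not depend on that exact name.

```python
import tarfile


def read_workflow(package_path):
    """Return (member name, text) of the first YAML file in a compiled package."""
    with tarfile.open(package_path, "r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile() and member.name.endswith((".yaml", ".yml")):
                data = tar.extractfile(member).read()
                return member.name, data.decode("utf-8")
    raise FileNotFoundError(f"no YAML workflow found in {package_path}")
```

For example, `name, text = read_workflow('.kaniko.tar.gz')` followed by `print(text[:500])` shows the beginning of the generated Argo `Workflow` manifest.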

## Data Processing Pipeline
The data processing pipeline will be executed outside of the Jupyter notebook and can use the full capacity of the current cluster. To achieve this, we need to perform the following actions:
* Build a Docker image for the data processing steps
* Define a data processing pipeline
* Run the pipeline

### Building a Docker image
Once a pipeline has been defined, we can reuse it multiple times by supplying different input parameters.

The next cell uploads all workspace files to S3 to share them with the pipeline. Files that should be ignored can be listed in [ignorefile.txt](components/kaniko/ignorefile.txt). To understand the upload scenario, you can review and modify [aws.py](./extensions/kaniko/aws.py).
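
We have not reproduced `aws.py` here, so the following is only a sketch of how an ignore file could filter the workspace before upload, assuming one glob pattern per line and `#` comments. `load_patterns` and `files_to_upload` are hypothetical names. A pattern matches either the relative path or the bare file or directory name, and ignored directories are pruned so their contents are skipped.

```python
import fnmatch
import os


def load_patterns(ignorefile):
    """Read glob patterns, skipping blank lines and # comments (assumed format)."""
    patterns = []
    with open(ignorefile) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#"):
                patterns.append(line)
    return patterns


def _ignored(rel_path, patterns):
    """True if the relative path or its last component matches any pattern."""
    rel_path = rel_path.replace(os.sep, "/")
    base = rel_path.rsplit("/", 1)[-1]
    return any(fnmatch.fnmatch(rel_path, p) or fnmatch.fnmatch(base, p) for p in patterns)


def files_to_upload(workspace, patterns):
    """Yield workspace-relative paths (with '/') that match none of the patterns."""
    for root, dirs, files in os.walk(workspace):
        rel_root = os.path.relpath(root, workspace)
        # prune ignored directories so os.walk does not descend into them
        dirs[:] = [d for d in dirs
                   if not _ignored(os.path.normpath(os.path.join(rel_root, d)), patterns)]
        for name in files:
            rel = os.path.normpath(os.path.join(rel_root, name))
            if not _ignored(rel, patterns):
                yield rel.replace(os.sep, "/")
```

Listing `sorted(files_to_upload('.', load_patterns('components/kaniko/ignorefile.txt')))` is a cheap way to preview what would be shipped as the build context.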

In [None]:
upload_to_s3(
    destination=f"s3://{BUCKET_NAME}/{BUILD_CONTEXT}",
    ignorefile='components/kaniko/ignorefile.txt',
    workspace='.',
    s3_client=s3
)

run = client.run_pipeline(
    exp.id, f'Build image: golang:{TAG}', '.kaniko.tar.gz', 
    params={
        'image': GOLANG_IMAGE,
        'build-context': f"{MOUNT_PATH}/{BUILD_CONTEXT}/components/golang"
    })

The build process can take a long time, because images used for data science tasks are often huge. In that case, you might want to adjust the `timeout` parameter (in seconds).
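
`wait_for_run_completion` blocks until the run reaches a final state or `timeout` seconds elapse. The polling idea behind such a call can be sketched with a deadline on `time.monotonic()`. This is not the KFP implementation: `wait_for` is a hypothetical helper, and the terminal status names in `done` are assumptions.

```python
import time


def wait_for(get_status, timeout, interval=5.0, done=("Succeeded", "Failed", "Error")):
    """Poll get_status() until it returns a terminal status or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in done:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still '{status}' after {timeout} s")
        time.sleep(interval)
```

A larger `timeout` only lengthens the wait; the `interval` controls how often the status is checked.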

In [None]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")

### Define Pipeline
We have extracted the code for the processing steps into a [component](components/golang). The Python code that defines `processing_op` as well as `http_download_op` is imported from the `components.golang` package.

Below we define a pipeline that runs the data processing steps as an experiment. Every processing operation (except the download) is encapsulated in the Go program. You can change the program at will; however, you will need to rebuild the Go image afterwards.

The pipeline does the following:

* Download the dataset over HTTP and verify its MD5 checksum
* Run a single Go processing step (`gocsv`) that writes `out_file`
* Run 16 Go processing steps in parallel, each writing its own `go-out<suffix>.csv` file
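
The download step receives an expected `md5sum`. We do not know exactly how `http_download_op` performs the check, but a chunked MD5 comparison like the following captures the idea; `md5_of` and `verify_download` are hypothetical helpers.

```python
import hashlib


def md5_of(path, chunk_size=1 << 20):
    """Compute the hex MD5 digest of a file, reading it in chunks to bound memory."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_download(path, expected_md5):
    """Raise ValueError if the file's MD5 does not match the expected checksum."""
    actual = md5_of(path)
    if actual != expected_md5.lower():
        raise ValueError(f"MD5 mismatch for {path}: expected {expected_md5}, got {actual}")
    return path
```

Reading in chunks keeps memory use flat, which matters for multi-gigabyte datasets.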

In [None]:
from components.golang import (http_download_op, processing_op)

@dsl.pipeline(
  name='Processing',
  description="""
  Download dataset, 
  Run data processing steps using Go 
  """
)
def processing_pipeline(
    import_from: dsl.PipelineParam, 
    dataset_file: dsl.PipelineParam,
    dataset_md5: dsl.PipelineParam,
    out_file: dsl.PipelineParam,
):  
    download = http_download_op(
        url=import_from,
        md5sum=dataset_md5,
        download_to=dataset_file
    ).apply(
        use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
    )
    
    # Run a single Go processing step.
    process = processing_op(
        script='gocsv',
        arguments=[
            dataset_file,
            out_file,
        ]
    ).apply(
        use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
    ).after(download)
    
    # Run multiple Go processing steps in parallel; each one depends only on the download
    suffixes = [f'f{n}' for n in range(1, 17)]  # 'f1' through 'f16'
    for suffix in suffixes:
        processed_file = f"{TRAINING_ROOT}/go-out{suffix}.csv"
        process = processing_op(
            script='gocsv',
            arguments=[
                dataset_file,
                processed_file,
            ]
        ).apply(
            use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
        ).after(download)
        # Optional resource requests for each processing step:
        # process.set_memory_request('2G')
        # process.set_cpu_request('1')

Compiler().compile(processing_pipeline, '.processing.tar.gz')
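
To experiment with the fan-out pattern of `processing_pipeline` without a cluster, it can be approximated locally with `concurrent.futures`: one task per output suffix, all reading the same input, just as every `processing_op` in the loop runs `.after(download)`. This is a swapped-in technique for local testing only; `process_csv` is a hypothetical stand-in for the Go `gocsv` step and merely copies the file.

```python
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def process_csv(src, dst):
    """Hypothetical stand-in for the Go `gocsv` step: copies src to dst unchanged."""
    shutil.copyfile(src, dst)
    return dst


def fan_out(dataset_file, out_root, suffixes, max_workers=4):
    """Run one processing task per suffix in parallel, mirroring the pipeline loop."""
    outputs = [str(Path(out_root) / f"go-out{s}.csv") for s in suffixes]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # every task reads the same input file; results come back in input order
        return list(pool.map(lambda dst: process_csv(dataset_file, dst), outputs))
```

On the cluster, KFP schedules each step as a separate pod instead, so the degree of parallelism is bounded by cluster capacity rather than `max_workers`.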

### Run the pipeline

The code below runs the pipeline and injects the pipeline parameters. We provide two versions of the dataset:
* `SAMPLE_DATASET`: a dataset of just over 2 megabytes. Not enough for sufficient training, but ideal for development because of the faster feedback.
* `FULL_DATASET`: a pre-created dataset with all GitHub issues, 3 gigabytes. Large enough for a sufficient model.

Depending on your needs, choose one dataset or the other and pass its URL and MD5 checksum as the pipeline parameters `import-from` and `dataset-md5`.
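
The keys in `params` are the pipeline function's argument names with underscores replaced by hyphens (`import_from` becomes `import-from`), which is the convention the `run_pipeline` calls in this notebook follow. A small sketch that builds the parameter dictionary and the timestamped run name; `run_params` and `run_name` are hypothetical helpers.

```python
from datetime import datetime


def run_params(**kwargs):
    """Map Python-style argument names to the hyphenated names used in `params=`."""
    return {name.replace("_", "-"): value for name, value in kwargs.items()}


def run_name(tag, now=None):
    """Build a run name such as 'Data processing latest: 0131-1745'."""
    now = now or datetime.now()
    return f"Data processing {tag}: {now:%m%d-%H%M}"
```

For example, `run_params(import_from=SAMPLE_DATASET, dataset_md5=SAMPLE_DATASET_MD5, dataset_file=DATASET_FILE, out_file=OUT_FILE)` produces the same dictionary as the next cell, once `SAMPLE_DATASET` and `SAMPLE_DATASET_MD5` are defined there. Switching datasets then only means passing a different URL and checksum.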

In [None]:
# GitHub issues, small: ~2 MB dataset (best for dev/test)
SAMPLE_DATASET = 'https://s3.us-east-2.amazonaws.com/asi-kubeflow-models/gh-issues/go-in.csv'
SAMPLE_DATASET_MD5 = '916af946f2fe1d1779b26205d4d8378f'

run = client.run_pipeline(exp.id, f'Data processing {TAG}: {datetime.now():%m%d-%H%M}', '.processing.tar.gz',
                          params={
                              'import-from': SAMPLE_DATASET,
                              'dataset-md5': SAMPLE_DATASET_MD5,
                              'dataset-file': DATASET_FILE,
                              'out-file': OUT_FILE,
                          })

In [None]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")