In [None]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# Spark and XGBoost Pipeline

This tutorial demonstrates building a machine learning pipeline with Spark and XGBoost. The pipeline 
- starts by creating a Google Cloud Dataproc cluster, and then runs analysis, transformation, distributed training and prediction in the created cluster. 
- Then a single node confusion-matrix and ROC aggregator is used (for classification case) to provide the confusion matrix data, and ROC data to the front end, respectively. 
- Finally, a delete cluster operation runs to destroy the cluster it creates in the beginning. The delete cluster operation is used as an exit handler, meaning it will run regardless of whether the pipeline fails or not.

**Please do not forget to enable the Dataproc API for your GCP project:** https://console.developers.google.com/apis/api/dataproc.googleapis.com/overview

In [22]:
import datetime
import json
import os
import posixpath
import subprocess

import kfp
import kfp.compiler as compiler
from kfp import components
from kfp import dsl
from kfp import gcp

import kubernetes as k8s

In [23]:
# GCP project ID; used as the default and submitted value of the pipeline's
# `project` argument.
PROJECT_ID='kubeflow-pipeline-fantasy'

In [24]:
# GCS bucket URI; used as the default and submitted value of the pipeline's
# `output` argument.
GCS_BUCKET='gs://kubeflow-pipeline-fantasy-kubeflow1-bucket'

In [None]:
# Connection settings used only when submitting from outside the Kubeflow
# cluster (see the "Create client" section below). Each value can be
# overridden through an environment variable so deployment-specific settings
# do not have to be edited into the notebook.
HOST = os.environ.get(
    "KFP_HOST",
    "https://kubeflow1.endpoints.kubeflow-pipeline-fantasy.cloud.goog/pipeline")
CLIENT_ID = os.environ.get(
    "KFP_CLIENT_ID",
    "493831447550-os23o55235htd9v45a9lsejv8d1plhd0.apps.googleusercontent.com")
OTHER_CLIENT_ID = os.environ.get(
    "KFP_OTHER_CLIENT_ID",
    "493831447550-iu24vv6id3ng5smhf2lboovv5qukuhbh.apps.googleusercontent.com")
# The OAuth client secret is a credential and must not be stored in the
# notebook. It is read from the environment and is None when the variable is
# unset (which is fine when running inside the cluster, where it is unused).
OTHER_CLIENT_SECRET = os.environ.get("KFP_OTHER_CLIENT_SECRET")

## Create client

**If you submit from outside the Kubeflow cluster, you need the following**
- host = "https://`<your-deployment>`.endpoints.`<your-project>`.cloud.goog/pipeline"
- And, you'll first need to create OAuth client ID credentials of type `Other` according to the tutorial [here](
https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app)

**If you run and submit within the kubeflow cluster**, the following is enough
```python
client = kfp.Client()
```

In [None]:
# Create the Kubeflow Pipelines client.
# Inside the cluster the in-cluster Kubernetes config loads successfully and
# the default client is enough; otherwise connect to HOST using the OAuth
# client settings defined in the previous cell.
try:
    k8s.config.load_incluster_config()
    in_cluster = True
except k8s.config.ConfigException:
    # The in-cluster configuration could not be loaded, so treat the notebook
    # as running outside the cluster. Other exceptions (including
    # KeyboardInterrupt) are deliberately not swallowed.
    in_cluster = False

if in_cluster:
    client = kfp.Client()
else:
    client = kfp.Client(host=HOST,
                        client_id=CLIENT_ID,
                        other_client_id=OTHER_CLIENT_ID,
                        other_client_secret=OTHER_CLIENT_SECRET)

## Load reusable components

In [25]:
# All reusable components are loaded from the same pinned commit of the
# kubeflow/pipelines repository. Keeping the commit in one place means a
# version bump cannot leave some components on a different revision.
_COMPONENTS_COMMIT = '4e7e6e866c1256e641b0c3effc55438e6e4b30f6'
_COMPONENTS_BASE_URL = (
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    + _COMPONENTS_COMMIT + '/components')


def _load_component(relative_dir):
    """Load the `component.yaml` under `relative_dir` at the pinned commit.

    :param relative_dir: Component directory relative to the repository's
        `components/` folder, e.g. 'gcp/dataproc/create_cluster'.
    :return: Whatever `components.load_component_from_url` returns for the
        resulting URL.
    """
    return components.load_component_from_url(
        '/'.join([_COMPONENTS_BASE_URL, relative_dir, 'component.yaml']))


confusion_matrix_op = _load_component('local/confusion_matrix')

roc_op = _load_component('local/roc')

dataproc_create_cluster_op = _load_component('gcp/dataproc/create_cluster')

dataproc_delete_cluster_op = _load_component('gcp/dataproc/delete_cluster')

dataproc_submit_pyspark_op = _load_component('gcp/dataproc/submit_pyspark_job')

dataproc_submit_spark_op = _load_component('gcp/dataproc/submit_spark_job')

## Define global variables

In [26]:
# GCS prefix under which the pyspark scripts (analyze_run.py, transform_run.py)
# and the cluster initialization script are looked up.
_PYSRC_PREFIX = 'gs://ml-pipeline-playground/dataproc-example' # Common path to python src.

# Jar passed as `jarFileUris` to the Spark training and prediction jobs.
_XGBOOST_PKG = 'gs://ml-pipeline-playground/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar'

# Main class of the Spark training job.
_TRAINER_MAIN_CLS = 'ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer'

# Main class of the Spark prediction job.
_PREDICTOR_MAIN_CLS = 'ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor'


def delete_directory_from_gcs(dir_path):
  """Delete a GCS dir recursively on a best-effort basis.

  Runs `gsutil -m rm -r <dir_path>`. The exit status of gsutil is ignored and
  errors raised while launching it (e.g. gsutil is not installed) are
  swallowed.

  :param dir_path: GCS path of the directory to delete.
  :return: None.
  """
  try:
    subprocess.call(['gsutil', '-m', 'rm', '-r', dir_path])
  except Exception:
    # Best effort: ignore ordinary failures, but (unlike a bare `except:`)
    # let KeyboardInterrupt / SystemExit propagate so the user can abort.
    pass

## Define data analyze and transform operation

In [27]:
def dataproc_analyze_op(
    project,
    region,
    cluster_name,
    schema,
    train_data,
    output):
    """Submit dataproc analyze as a pyspark job.

    :param project: GCP project ID.
    :param region: Region forwarded to the pyspark job submission.
    :param cluster_name: Name of the cluster.
    :param schema: GCS path to the schema.
    :param train_data: GCS path to the training data.
    :param output: GCS path to store the output.
    :return: The result of `dataproc_submit_pyspark_op` for the analyze job.
    """
    return dataproc_submit_pyspark_op(
        project_id=project,
        region=region,
        cluster_name=cluster_name,
        # GCS URIs always use '/', so join with posixpath instead of the
        # platform-dependent os.path (which yields '\\' on Windows).
        main_python_file_uri=posixpath.join(_PYSRC_PREFIX, 'analyze_run.py'),
        args=['--output', str(output), '--train', str(train_data), '--schema', str(schema)]
    )


def dataproc_transform_op(
    project,
    region,
    cluster_name,
    train_data,
    eval_data,
    target,
    analysis,
    output
):
    """Submit dataproc transform as a pyspark job.

    :param project: GCP project ID.
    :param region: Region forwarded to the pyspark job submission.
    :param cluster_name: Name of the cluster.
    :param train_data: GCS path to the training data.
    :param eval_data: GCS path of the eval csv file.
    :param target: Target column name.
    :param analysis: GCS path of the analysis results
    :param output: GCS path to use for output.
    :return: The result of `dataproc_submit_pyspark_op` for the transform job.
    """

    # Remove existing [output]/train and [output]/eval if they exist.
    # GCS URIs always use '/', so they are joined with posixpath rather than
    # the platform-dependent os.path.
    # NOTE(review): this cleanup runs when this function is called (i.e. while
    # the pipeline is being built), not when the transform job executes, and
    # the pipeline passes an `output` that contains run-time placeholders --
    # confirm the cleanup has the intended effect.
    delete_directory_from_gcs(posixpath.join(output, 'train'))
    delete_directory_from_gcs(posixpath.join(output, 'eval'))

    return dataproc_submit_pyspark_op(
        project_id=project,
        region=region,
        cluster_name=cluster_name,
        main_python_file_uri=posixpath.join(_PYSRC_PREFIX,
                                            'transform_run.py'),
        args=[
            '--output',
            str(output),
            '--analysis',
            str(analysis),
            '--target',
            str(target),
            '--train',
            str(train_data),
            '--eval',
            str(eval_data)
        ])

## Define training and prediction operation

In [28]:
def dataproc_train_op(
    project,
    region,
    cluster_name,
    train_data,
    eval_data,
    target,
    analysis,
    workers,
    rounds,
    output,
    is_classification=True
):
    """Submit the XGBoost trainer as a Spark job.

    :param project: Forwarded as `project_id` to the Spark job submission.
    :param region: Forwarded as `region` to the Spark job submission.
    :param cluster_name: Name of the cluster the job is submitted to.
    :param train_data: Training data location, passed to the trainer.
    :param eval_data: Evaluation data location, passed to the trainer.
    :param target: Target column name, passed to the trainer.
    :param analysis: Analysis results location, passed to the trainer.
    :param workers: Worker count, passed to the trainer.
    :param rounds: Round count, passed to the trainer.
    :param output: Output location, passed to the trainer.
    :param is_classification: When truthy the `trainconfcla.json` config is
        used, otherwise `trainconfreg.json`.
    :return: The result of `dataproc_submit_spark_op` for the training job.
    """
    # Select the trainer configuration file for the requested problem type.
    config = (
        'gs://ml-pipeline-playground/trainconfcla.json'
        if is_classification
        else 'gs://ml-pipeline-playground/trainconfreg.json'
    )

    # Trainer arguments are positional; every one is stringified and the
    # whole list is serialized as JSON.
    trainer_args = [
        config, rounds, workers, analysis, target,
        train_data, eval_data, output,
    ]
    jar_spec = json.dumps({'jarFileUris': [_XGBOOST_PKG]})

    return dataproc_submit_spark_op(
        project_id=project,
        region=region,
        cluster_name=cluster_name,
        main_class=_TRAINER_MAIN_CLS,
        spark_job=jar_spec,
        args=json.dumps([str(value) for value in trainer_args]))


def dataproc_predict_op(
    project,
    region,
    cluster_name,
    data,
    model,
    target,
    analysis,
    output
):
    """Submit the XGBoost predictor as a Spark job.

    :param project: Forwarded as `project_id` to the Spark job submission.
    :param region: Forwarded as `region` to the Spark job submission.
    :param cluster_name: Name of the cluster the job is submitted to.
    :param data: Location of the data to predict on, passed to the predictor.
    :param model: Model location, passed to the predictor.
    :param target: Target column name, passed to the predictor.
    :param analysis: Analysis results location, passed to the predictor.
    :param output: Output location, passed to the predictor.
    :return: The result of `dataproc_submit_spark_op` for the prediction job.
    """
    # Predictor arguments are positional; every one is stringified and the
    # whole list is serialized as JSON.
    predictor_args = [model, data, analysis, target, output]
    jar_spec = json.dumps({'jarFileUris': [_XGBOOST_PKG]})

    return dataproc_submit_spark_op(
        project_id=project,
        region=region,
        cluster_name=cluster_name,
        main_class=_PREDICTOR_MAIN_CLS,
        spark_job=jar_spec,
        args=json.dumps([str(value) for value in predictor_args]))

## Define the training pipeline

In [29]:
@dsl.pipeline(
    name='XGBoost Trainer',
    description='A trainer that does end-to-end distributed training for XGBoost models.'
)
def xgb_train_pipeline(
    output=GCS_BUCKET,
    project=PROJECT_ID,
    cluster_name='xgb-%s' % dsl.RUN_ID_PLACEHOLDER,
    region='us-central1',
    train_data='gs://ml-pipeline-playground/sfpd/train.csv',
    eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
    schema='gs://ml-pipeline-playground/sfpd/schema.json',
    target='resolution',
    rounds=200,
    workers=2,
    true_label='ACTION',
):
    """Create cluster -> analyze -> transform -> train -> predict -> metrics.

    The cluster delete op is registered as the exit handler of all other steps.

    :param output: Base GCS path; per-run data goes under
        `<output>/<run id>/data`.
    :param project: GCP project ID.
    :param cluster_name: Name of the Dataproc cluster to create and delete.
    :param region: Region used for the cluster and the submitted jobs.
    :param train_data: GCS path to the training data.
    :param eval_data: GCS path to the eval data.
    :param schema: GCS path to the schema.
    :param target: Target column name.
    :param rounds: Forwarded to the training step.
    :param workers: Forwarded to the training step.
    :param true_label: Passed to the ROC step as both `true_class` and
        `true_score_column`.
    """
    output_template = str(output) + '/' + dsl.RUN_ID_PLACEHOLDER + '/data'

    # Current GCP pyspark/spark op do not provide outputs as return values, instead,
    # we need to use strings to pass the uri around.
    # These are GCS URIs, so they are joined with posixpath ('/' on every
    # platform) rather than the platform-dependent os.path.
    analyze_output = output_template
    transform_output_train = posixpath.join(output_template, 'train', 'part-*')
    transform_output_eval = posixpath.join(output_template, 'eval', 'part-*')
    train_output = posixpath.join(output_template, 'train_output')
    predict_output = posixpath.join(output_template, 'predict_output')
    predictions_pattern = posixpath.join(predict_output, 'part-*.csv')

    with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op(
        project_id=project,
        region=region,
        name=cluster_name
    )):
        _create_cluster_op = dataproc_create_cluster_op(
            project_id=project,
            region=region,
            name=cluster_name,
            initialization_actions=[
              posixpath.join(_PYSRC_PREFIX,
                             'initialization_actions.sh'),
            ],
            image_version='1.2'
        )

        # The steps exchange data only through GCS paths, so their ordering is
        # made explicit with .after().
        _analyze_op = dataproc_analyze_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            schema=schema,
            train_data=train_data,
            output=analyze_output
        ).after(_create_cluster_op).set_display_name('Analyzer')

        _transform_op = dataproc_transform_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            train_data=train_data,
            eval_data=eval_data,
            target=target,
            analysis=analyze_output,
            output=output_template
        ).after(_analyze_op).set_display_name('Transformer')

        _train_op = dataproc_train_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            train_data=transform_output_train,
            eval_data=transform_output_eval,
            target=target,
            analysis=analyze_output,
            workers=workers,
            rounds=rounds,
            output=train_output
        ).after(_transform_op).set_display_name('Trainer')

        _predict_op = dataproc_predict_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            data=transform_output_eval,
            model=train_output,
            target=target,
            analysis=analyze_output,
            output=predict_output
        ).after(_train_op).set_display_name('Predictor')

        _cm_op = confusion_matrix_op(
            predictions=predictions_pattern,
            output_dir=output_template
        ).after(_predict_op)

        _roc_op = roc_op(
            predictions_dir=predictions_pattern,
            true_class=true_label,
            true_score_column=true_label,
            output_dir=output_template
        ).after(_predict_op)

    # Apply the 'user-gcp-sa' secret to every op in the pipeline.
    dsl.get_pipeline_conf().add_op_transformer(
        gcp.use_gcp_secret('user-gcp-sa'))

## Submit job

In [32]:
# Compile the pipeline function into a package file named after it.
pipeline_func = xgb_train_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

# Create the experiment the run is filed under.
experiment = client.create_experiment('Spark-and-XGBoost')

# Submit a pipeline run with the project and output bucket configured above.
run_name = pipeline_func.__name__ + ' run'
arguments = dict(project=PROJECT_ID, output=GCS_BUCKET)

run_result = client.run_pipeline(
    experiment.id,
    run_name,
    pipeline_filename,
    arguments,
)

