### Tutorial Pipeline: Artifact Generation and File Transfer

In [1]:
import kfp
import typing

#### Read and Write Files to GCS

In [2]:
from typing import NamedTuple
from kfp.components import *

def read_write_files_gcs(project_name: str, bucket_name: str, file_directory_path: str, filename: str, 
                         outfilename: str, outjsonname: str): 
    """Read a CSV from GCS, then write a sample CSV and a sample JSON file back to GCS.

    All files live in ``gs://<bucket_name>/<file_directory_path>/``.

    Args:
        project_name: GCP project used to authenticate the GCS filesystem.
        bucket_name: GCS bucket holding the files.
        file_directory_path: Directory inside the bucket.
        filename: Name of the CSV file to read.
        outfilename: Name of the CSV file to write.
        outjsonname: Name of the JSON file to write.
    """
    ## Import Required Libraries
    # Lightweight components must be self-contained, so imports and helpers stay inside the function.
    import json

    import gcsfs
    import pandas as pd

    def gcs_path(name):
        # gs://<bucket>/<directory>/<name>
        return 'gs://' + bucket_name + '/' + file_directory_path + '/' + name

    # GCSFS library is required to sign into the Project and access the Storage Bucket filesystem.
    # This needs to be done for each and every Pipeline block which needs to read/write to the bucket,
    # because each Pipeline block runs as an independent Kubernetes pod.
    fs = gcsfs.GCSFileSystem(project=project_name, token='cloud')

    # Read input: pd.read_csv works simply by giving the GCS filepath.
    df_sample = pd.read_csv(gcs_path(filename))

    # Printed outputs can be viewed in the Pipeline Block 'Logs' section.
    print("\n\nInput File:\n")
    print(df_sample)
    print('Shape: {}'.format(df_sample.shape))

    df = pd.DataFrame({'Name': ['Alex', 'Lina', 'Ameya', 'Martin'], 'Age': [21, 19, 20, 18],
                       'Plays Football': ['Yes', 'Yes', 'Yes', 'No']})

    print("\n\nOutput File:\n")
    print(df)
    print('Shape: {}'.format(df.shape))

    # Output csv. Keep index=False so an extra index column is not added each time
    # the file is read and written again.
    df.to_csv(gcs_path(outfilename), index=False)

    # Output json through the authenticated gcsfs filesystem created above. This avoids importing
    # TensorFlow's private tensorflow.python.lib.io.file_io module, which is not among the packages
    # installed for this component. Bytes mode is used so the write does not depend on gcsfs text-mode support.
    s = {'name': 'file1', 'outputs': 'None', 'Format': 'Json'}
    with fs.open(gcs_path(outjsonname), 'wb') as f:
        f.write(json.dumps(s).encode('utf-8'))



In [3]:
# func_to_container_op packages the python function into a Pipeline Block that runs as a Kubernetes Pod.
# The component definition is also written to the YAML file named by output_component_file.
# Be mindful of package versions and dependencies.
kfp_read_write_gcs = kfp.components.func_to_container_op(
    func=read_write_files_gcs,
    output_component_file='./read-write-gcs.yaml',
    packages_to_install=[
        'numpy==1.17.2',
        'pandas==1.0.3',
        'gcsfs',
    ],
)

#### Output files to Minio

In [4]:
from typing import NamedTuple


def exchange_files_within_pipeline_blocks1(project_name: str, bucket_name: str, file_directory_path: str, 
                                           filename: str, outfile: OutputPath()):
    """Copy a CSV from GCS into this block's file output so a downstream block can consume it.

    Args:
        project_name: GCP project used to authenticate the GCS filesystem.
        bucket_name: GCS bucket holding the CSV.
        file_directory_path: Directory inside the bucket.
        filename: Name of the CSV file to read.
        outfile: Local path supplied by Kubeflow; its contents become the 'outfile' output.
    """
    # OutputPath files are Output to the Minio pod's Persistent Volume.
    # Note that the OutputPath variable is still declared as a function input parameter.
    # Using parameter outputs and File Output (OutputPath()) at the same time does not work.
    import gcsfs
    import pandas as pd

    # Access the bucket file-system; every pipeline block runs in its own pod and signs in separately.
    fs = gcsfs.GCSFileSystem(project=project_name, token='cloud')

    file_path = 'gs://' + bucket_name + '/' + file_directory_path + '/' + filename

    # pd.read_csv works simply by giving the GCS filepath.
    df = pd.read_csv(file_path)

    # Output the file to Minio as OutputPath. index=False keeps the columns unchanged: writing the
    # RangeIndex would add an extra leading column ('Unnamed: 0' when read back), which shifts any
    # positional column selection done by the consuming block.
    df.to_csv(outfile, index=False)

In [5]:
# Package the GCS -> Minio copy step as a Pipeline Block (component YAML saved alongside).
kfp_exchange_files1 = kfp.components.func_to_container_op(
    func=exchange_files_within_pipeline_blocks1,
    output_component_file='./exchange_files1.yaml',
    packages_to_install=[
        'numpy==1.17.2',
        'pandas==1.0.3',
        'gcsfs',
    ],
)


#### Read Files from Minio

In [6]:
def exchange_files_within_pipeline_blocks2(infile: InputPath(),
                                           outfile: OutputPath()): 
    """Read the CSV produced by the previous block, keep its first two columns and output the result.

    Args:
        infile: Local path of the CSV passed in from the upstream block's file output.
        outfile: Local path supplied by Kubeflow; its contents become the 'outfile' output.
    """
    # OutputPath files are Output to the Minio pod's Persistent Volume.
    # Note that the OutputPath variable is still declared as a function input parameter.
    import pandas as pd

    # Read the file output by the first container from the Minio Persistent Volume.
    df = pd.read_csv(infile)

    print('Input File: \n')
    print(df)

    # Do your processing: here, keep only the first two columns.
    df_trim = df.iloc[:, 0:2]

    print('Output File: \n')
    print(df_trim)

    # Output the processed file. index=False so consumers do not receive an extra
    # index column ('Unnamed: 0' when read back).
    df_trim.to_csv(outfile, index=False)

In [7]:
# Package the Minio -> Minio processing step as a Pipeline Block.
# This step only reads and writes local InputPath/OutputPath files, so gcsfs is not installed.
kfp_exchange_files2 = kfp.components.func_to_container_op(
    func=exchange_files_within_pipeline_blocks2,
    output_component_file='./exchange_files2.yaml',
    packages_to_install=['numpy==1.17.2', 'pandas==1.0.3'],
)


#### Artifacts Generation

In [8]:
def generate_artifacts(project_name: str, bucket_name: str, file_directory_path: str, markdown_filename: str,
                       html_filename: str, mlpipeline_ui_metadata: OutputPath(),
                       table_filename: str = 'Player_data.csv'): 
    """Write a markdown file and a seaborn HTML chart to GCS and emit Kubeflow UI metadata for them.

    To generate artifacts, a metadata json file is written both to the root level of the container's
    filesystem and to the ``mlpipeline_ui_metadata`` output. Supported artifact types are tables,
    static html, confusion matrix, markdown and ROC curve (tfdv and tfma are deprecated features).

    Args:
        project_name: GCP project used to authenticate the GCS filesystem.
        bucket_name: GCS bucket the artifacts are written to / read from.
        file_directory_path: Directory inside the bucket.
        markdown_filename: Name of the markdown file to create.
        html_filename: Name of the HTML chart file to create.
        mlpipeline_ui_metadata: Local output path the metadata json is written to.
        table_filename: Name of an existing CSV in the same bucket directory, shown as a 'table'
            artifact. The default matches the CSV read_write_files_gcs writes with the pipeline defaults.
    """
    # Lightweight components must be self-contained, so imports and helpers stay inside the function.
    import json

    import gcsfs
    import matplotlib.pyplot as plt
    import mpld3
    import pandas as pd
    import seaborn as sns

    # Enter the GCS filesystem.
    fs = gcsfs.GCSFileSystem(project=project_name, token='cloud')

    def gcs_path(name):
        # gs://<bucket>/<directory>/<name>
        return 'gs://' + bucket_name + '/' + file_directory_path + '/' + name

    def write_gcs_text(path, text):
        # Bytes mode so the write does not depend on gcsfs text-mode support.
        with fs.open(path, 'wb') as f:
            f.write(text.encode('utf-8'))

    # Create and write the markdown file to GCS. List items need a "* " prefix and must not be
    # indented by 4+ spaces after a blank line, otherwise markdown renders them as a code block.
    markdown_path = gcs_path(markdown_filename)
    markdown_text = (
        '### This is a Introductory Trainer pipeline to help you understand the basics of -\n\n'
        '* Ingesting and Uploading Data to Cloud Storage with Security credentials\n'
        '* Data Exchange between Pipeline blocks\n'
        '* Process for generating Artifacts\n\n'
        'Please Complete this Pipeline before Proceeding with the Telco Customer Churn pipelines\n'
    )
    write_gcs_text(markdown_path, markdown_text)

    # Create a seaborn chart, convert it to HTML with mpld3 and write it to GCS.
    df = pd.DataFrame({'Name': ['Alex', 'Lina', 'Ameya', 'Martin'], 'Age': [21, 19, 20, 18],
                       'Plays Football': ['Yes', 'Yes', 'Yes', 'No']})
    grid = sns.catplot(y="Plays Football", kind="count", data=df, height=2.0, aspect=3.0,
                       legend=True)
    html_path = gcs_path(html_filename)
    write_gcs_text(html_path, mpld3.fig_to_html(grid.fig))
    plt.close(grid.fig)

    # Metadata supports the following artifact types:
    #   Static HTML: 'web-app'
    #   ROC Curve: 'roc'
    #   Confusion Matrix: 'confusion_matrix'
    #   Tables: 'table'
    #   Markdown: 'markdown'
    # Storage can be 'gcs', 'minio', or 'inline'. A source file path (GCS or Minio) has to be
    # provided, except for 'inline' storage where the source is the content itself.

    # Some artifacts like Confusion Matrix, ROC curve and Table need a 'schema' field for visualisation:
    # the column headers ('name') and datatypes ('type', either 'NUMBER' or 'CATEGORY') of the data.
    # This schema describes the DataFrame created and stored to GCS in the 'read_write_files_gcs' block.
    schema = [{'name': 'Name', 'type': 'CATEGORY'}, {'name': 'Age', 'type': 'CATEGORY'},
              {'name': 'Plays Football', 'type': 'CATEGORY'}]

    metadata = {
        'version': 1,  # mlpipeline-ui-metadata format version
        'outputs': [
            # Markdown that is hardcoded inline
            {
                'storage': 'inline',
                'source': '# Welcome to the Trainer',
                'type': 'markdown',
            },
            # Markdown that is read from a file
            {
                'storage': 'gcs',
                'source': markdown_path,
                'type': 'markdown',
            },
            # Table read from a CSV in the same bucket directory as the other artifacts
            {
                'type': 'table',
                'storage': 'gcs',
                'format': 'csv',
                'header': [x['name'] for x in schema],
                'source': gcs_path(table_filename),
            },
            # HTML web-app (output of matplotlib/seaborn via mpld3)
            {
                'type': 'web-app',
                'storage': 'gcs',
                'source': html_path,
            },
        ],
    }

    # Dump the metadata to the root level of the container's filesystem and to the Minio output.
    # Both are local paths, so plain open() is sufficient.
    for local_path in ('/mlpipeline-ui-metadata.json', mlpipeline_ui_metadata):
        with open(local_path, 'w') as f:
            json.dump(metadata, f)

In [9]:
# Package the artifact-generation step as a Pipeline Block; it additionally needs the plotting stack.
kfp_generate_artifacts = kfp.components.func_to_container_op(
    func=generate_artifacts,
    output_component_file='./generate_artifacts.yaml',
    packages_to_install=[
        'numpy==1.17.2',
        'pandas==1.0.3',
        'gcsfs',
        'mpld3==0.5.1',
        'seaborn==0.9.0',
        'matplotlib==3.1.1',
    ],
)


#### Defining the Pipeline Execution Sequence and Input-Output scheme

In [10]:
import kfp.dsl as dsl
# The kfp.dsl.pipeline decorator designates the Pipeline Definition.

@dsl.pipeline(name='Trainer Pipeline', description='Data and Artifacts processing')
def Trainer_func(project_name='YDataSynthetic', bucket_name='pipelines_artifacts', 
                 file_directory_path='Artifacts', filename='Data_Summary.csv', 
                 outfilename='Player_data.csv', outjsonname='sample.json', markdown_filename='Instructions.md',
                 html_filename='sample_graph.html'):
    """Wire the tutorial blocks together.

    The parameters are the inputs the user can control from the YData Pipelines Dashboard.
    """
    # Passing pipeline parameters as operation arguments.
    read_write_files_gcs_task = kfp_read_write_gcs(project_name, bucket_name, file_directory_path,
                                                   filename, outfilename, outjsonname)

    # This block reads `outfilename` from GCS, which read_write_files_gcs_task writes there. Data that
    # travels through GCS creates no dependency edge, so the execution order is declared with .after();
    # otherwise both blocks may start in parallel and read a missing or stale file.
    exchange_files_task1 = kfp_exchange_files1(project_name, bucket_name, file_directory_path, outfilename)
    exchange_files_task1.after(read_write_files_gcs_task)

    # Passing a file output (OutputPath) to the next block creates the dependency automatically.
    exchange_files_task2 = kfp_exchange_files2(exchange_files_task1.outputs['outfile'])

    # With the default parameters, the table artifact points at the CSV written by
    # read_write_files_gcs_task, so this block runs after it as well.
    generate_artifacts_task = kfp_generate_artifacts(project_name, bucket_name, file_directory_path,
                                                     markdown_filename, html_filename)
    generate_artifacts_task.after(read_write_files_gcs_task)

# For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax.
# For an operation with multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax.

#### Compiling the Pipeline

In [11]:
import kfp.compiler as comp  # compiles the Pipeline into .tar.gz format

# Output file is named after the pipeline function: '<function name>.pipeline.tar.gz'.
pipeline_func = Trainer_func
pipeline_filename = '{}.pipeline.tar.gz'.format(pipeline_func.__name__)
comp.Compiler().compile(pipeline_func, pipeline_filename)

# The ready-to-upload compiled file is generated in the same folder as this Jupyter notebook.
# Upload it to the YData Pipelines Dashboard to run it.