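# MLOps Dataflow Pipeline

This notebook writes an Apache Beam script that loads the advertising CSV file from Cloud Storage into BigQuery, uploads the script to Cloud Storage, and runs it as a Dataflow job from a Vertex AI pipeline.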

In [1]:
!pip install --user --quiet --upgrade google-cloud-pipeline-components apache-beam[gcp,interactive]
from datetime import datetime
import os

from google.cloud import aiplatform as aip

from typing import NamedTuple

from kfp.v2 import dsl
from kfp.v2 import compiler

from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.types import artifact_types

In [2]:
PROJECT = 'sandbox-michael-menzel'
REGION = 'europe-west4'
BUCKET = 'gs://sandbox-michael-menzel-training-europe-west4/staging'

DATA_FILE = 'gs://sandbox-michael-menzel-data-europe-west4/advertising-regression-data/advertising.csv'

PIPELINE_DISPLAY_NAME = f'dataflow-pipeline-{int(datetime.now().timestamp())}'
PIPELINE_ROOT = os.path.join(BUCKET, 'pipeline_root/dataflow_example')

# Local directory for the generated Beam script and the compiled pipeline spec;
# `%%writefile` does not create missing directories, so a fresh checkout would fail.
os.makedirs('build', exist_ok=True)

# Pass the project explicitly so the Vertex AI pipeline runs in the same project
# as the Dataflow job (which receives PROJECT as `project_id`), instead of falling
# back to whatever default project the environment is configured with.
aip.init(project=PROJECT, location=REGION, staging_bucket=BUCKET)

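## Dataflow Pipeline Script

The Apache Beam script below reads the CSV file given by `--input`, parses each line into a row, and writes the rows to the BigQuery table `example_dataset.advertising_regression`, replacing the table's previous contents.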

In [3]:
%%writefile build/dataflow_pipeline.py
import argparse
from datetime import datetime
import logging
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# BigQuery schema for the output table. All four columns are parsed as numbers by
# parse_file, so all of them are FLOAT (Radio was previously declared STRING,
# inconsistent with the other columns and with the parser).
table_schema = {
    'fields': [
        {
            "name": "TV",
            "type": "FLOAT",
            "mode": "REQUIRED"
        },
        {
            "name": "Radio",
            "type": "FLOAT",
            "mode": "REQUIRED"
        },
        {
            "name": "Newspaper",
            "type": "FLOAT",
            "mode": "REQUIRED"
        },
        {
            "name": "Sales",
            "type": "FLOAT",
            "mode": "REQUIRED"
        },
    ]}

def parse_file(element):
    """Parse one CSV line into a row dict matching ``table_schema``.

    Args:
        element: a single line of text from the input CSV, expected to hold four
            comma-separated values in the order TV, Radio, Newspaper, Sales.

    Returns:
        A dict with keys 'TV', 'Radio', 'Newspaper', 'Sales' and float values,
        matching the FLOAT columns declared in ``table_schema``.

    Raises:
        ValueError: if the line does not contain exactly four fields, or a field
            is not numeric (for example a CSV header line that was not skipped).
    """
    # Local import keeps the function self-contained (presumably so it works when
    # shipped to Beam workers).
    import csv
    reader = csv.reader([element], quotechar='"', delimiter=',',
                        quoting=csv.QUOTE_ALL, skipinitialspace=True)
    # An empty line yields no row (or an empty row); treat both as malformed
    # instead of silently returning None.
    fields = next(reader, [])
    if len(fields) != 4:
        raise ValueError(f'Expected 4 CSV fields, got {len(fields)}: {element!r}')
    tv, radio, newspaper, sales = (float(value) for value in fields)
    return {'TV': tv, 'Radio': radio, 'Newspaper': newspaper, 'Sales': sales}

    
def run(argv=None):
    """Build and launch the Beam pipeline that loads the advertising CSV into BigQuery.

    Args:
        argv: command-line arguments. ``--input`` (path of the CSV file, required),
            ``--output`` (BigQuery table, default
            ``example_dataset.advertising_regression``) and ``--skip_header_lines``
            (default 0) are consumed here; all remaining arguments are forwarded
            to ``PipelineOptions``.

    Returns:
        The result of ``pipeline.run()``. ``wait_until_finish()`` is not called
        here, so waiting (if any) is left to the caller.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        required=True,
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default='example_dataset.advertising_regression',
                        help='BigQuery table (DATASET.TABLE) to write to.')
    parser.add_argument('--skip_header_lines',
                        dest='skip_header_lines',
                        type=int,
                        default=0,
                        help='Number of leading lines (e.g. a CSV header) to skip.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(
        pipeline_args,
        # Timestamp suffix gives every launch a distinct job name.
        job_name=f'import-advertising-csv-pipeline-{int(datetime.now().timestamp())}',
        save_main_session=True,
    )

    pipeline = beam.Pipeline(options=options)
    (
        pipeline
        | 'Read CSV from GCS' >> beam.io.ReadFromText(
            known_args.input, skip_header_lines=known_args.skip_header_lines)
        | 'Parse CSV file' >> beam.Map(parse_file)
        | 'Write To BigQuery' >> beam.io.WriteToBigQuery(
            known_args.output,
            schema=table_schema,
            # Replace the table's contents on every run; create it if missing.
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )
    return pipeline.run()
    
if __name__ == '__main__':
    # The root logger defaults to WARNING; lower it so INFO records are emitted.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()


Overwriting build/dataflow_pipeline.py


In [4]:
# Upload the generated Beam script to BUCKET; the pipeline run below passes
# os.path.join(BUCKET, 'dataflow_pipeline.py') as the `dataflow_script` parameter.
!gsutil cp build/dataflow_pipeline.py "$BUCKET"

Copying file://build/dataflow_pipeline.py [Content-Type=text/x-python]...
/ [1 files][  1.9 KiB/  1.9 KiB]                                                
Operation completed over 1 objects/1.9 KiB.                                      


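## Run the Dataflow Job from a Vertex AI Pipeline

The pipeline launches the uploaded script with `DataflowPythonJobOp`, then waits on the resulting Dataflow job with `WaitGcpResourcesOp`.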

In [5]:
@dsl.pipeline(
    name=PIPELINE_DISPLAY_NAME,
    description="A simple pipeline with a dataflow job",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(project_id: str, location: str, dataflow_script: str):
    """Vertex AI pipeline that launches the Beam script on Dataflow and waits on it.

    Args:
        project_id: GCP project passed to the Dataflow launch step.
        location: region passed to the Dataflow launch step.
        dataflow_script: GCS path of the Beam script to execute.

    The temp location (BUCKET) and the script arguments (``--input DATA_FILE``)
    come from notebook globals captured when the pipeline is compiled; they are
    not pipeline parameters.
    """
    # The experimental Dataflow components are imported where they are used.
    from google_cloud_pipeline_components.experimental.dataflow import DataflowPythonJobOp
    from google_cloud_pipeline_components.experimental.wait_gcp_resources import WaitGcpResourcesOp

    launch_task = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=dataflow_script,
        temp_location=BUCKET,
        args=['--input', DATA_FILE],
    )

    # Wait on the GCP resource (the Dataflow job) reported by the launch step.
    WaitGcpResourcesOp(gcp_resources=launch_task.outputs["gcp_resources"])
    
    
# Single source of truth for where the compiled pipeline spec is written and read.
PIPELINE_SPEC_PATH = "build/dataflow_pipeline.json"

# Compile the pipeline function into a spec that Vertex AI can execute.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=PIPELINE_SPEC_PATH,
    type_check=False,
)

# Caching is disabled: the script is re-uploaded to the same GCS path and the step
# arguments are identical on every run, so a cache hit would silently skip the
# Dataflow job even when the script or the input CSV has changed.
job = aip.PipelineJob(
    display_name=PIPELINE_DISPLAY_NAME,
    template_path=PIPELINE_SPEC_PATH,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        'project_id': PROJECT,
        'location': REGION,
        'dataflow_script': os.path.join(BUCKET, 'dataflow_pipeline.py'),
    },
    enable_caching=False,
)

job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob




INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/928871478446/locations/europe-west4/pipelineJobs/dataflow-pipeline-1642951363-20220123152245
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/928871478446/locations/europe-west4/pipelineJobs/dataflow-pipeline-1642951363-20220123152245')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west4/pipelines/runs/dataflow-pipeline-1642951363-20220123152245?project=928871478446
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/928871478446/locations/europe-west4/pipelineJobs/dataflow-pipeline-1642951363-20220123152245 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/928871478446/locations/europe-west4/pipelineJobs/dataflow-pip